diff --git a/banks2ff/src/adapters/firefly/client.rs b/banks2ff/src/adapters/firefly/client.rs index a73f80d..a193d3f 100644 --- a/banks2ff/src/adapters/firefly/client.rs +++ b/banks2ff/src/adapters/firefly/client.rs @@ -1,15 +1,17 @@ -use async_trait::async_trait; -use anyhow::Result; -use tracing::instrument; -use crate::core::ports::{TransactionDestination, TransactionMatch}; use crate::core::models::BankTransaction; +use crate::core::ports::{TransactionDestination, TransactionMatch}; +use anyhow::Result; +use async_trait::async_trait; +use chrono::NaiveDate; use firefly_client::client::FireflyClient; -use firefly_client::models::{TransactionStore, TransactionSplitStore, TransactionUpdate, TransactionSplitUpdate}; -use std::sync::Arc; -use tokio::sync::Mutex; +use firefly_client::models::{ + TransactionSplitStore, TransactionSplitUpdate, TransactionStore, TransactionUpdate, +}; use rust_decimal::Decimal; use std::str::FromStr; -use chrono::NaiveDate; +use std::sync::Arc; +use tokio::sync::Mutex; +use tracing::instrument; pub struct FireflyAdapter { client: Arc>, @@ -29,7 +31,7 @@ impl TransactionDestination for FireflyAdapter { async fn resolve_account_id(&self, iban: &str) -> Result> { let client = self.client.lock().await; let accounts = client.search_accounts(iban).await?; - + // Look for exact match on IBAN, ensuring account is active for acc in accounts.data { // Filter for active accounts only (default is usually active, but let's check if attribute exists) @@ -42,11 +44,11 @@ impl TransactionDestination for FireflyAdapter { if let Some(acc_iban) = acc.attributes.iban { if acc_iban.replace(" ", "") == iban.replace(" ", "") { - return Ok(Some(acc.id)); + return Ok(Some(acc.id)); } } } - + Ok(None) } @@ -57,19 +59,19 @@ impl TransactionDestination for FireflyAdapter { // For typical users, 50 is enough. If needed we can loop pages. // The client `get_accounts` method hardcodes limit=default. We should probably expose a list_all method or loop here. // For now, let's assume page 1 covers it or use search. - + let accounts = client.get_accounts("").await?; // Argument ignored in current impl let mut ibans = Vec::new(); - + for acc in accounts.data { - let is_active = acc.attributes.active.unwrap_or(true); - if is_active { - if let Some(iban) = acc.attributes.iban { - if !iban.is_empty() { - ibans.push(iban); - } - } - } + let is_active = acc.attributes.active.unwrap_or(true); + if is_active { + if let Some(iban) = acc.attributes.iban { + if !iban.is_empty() { + ibans.push(iban); + } + } + } } Ok(ibans) } @@ -78,63 +80,71 @@ impl TransactionDestination for FireflyAdapter { async fn get_last_transaction_date(&self, account_id: &str) -> Result> { let client = self.client.lock().await; // Fetch latest 1 transaction - let tx_list = client.list_account_transactions(account_id, None, None).await?; - + let tx_list = client + .list_account_transactions(account_id, None, None) + .await?; + if let Some(first) = tx_list.data.first() { if let Some(split) = first.attributes.transactions.first() { - // Format is usually YYYY-MM-DDT... or YYYY-MM-DD - let date_str = split.date.split('T').next().unwrap_or(&split.date); - if let Ok(date) = NaiveDate::parse_from_str(date_str, "%Y-%m-%d") { - return Ok(Some(date)); - } + // Format is usually YYYY-MM-DDT... or YYYY-MM-DD + let date_str = split.date.split('T').next().unwrap_or(&split.date); + if let Ok(date) = NaiveDate::parse_from_str(date_str, "%Y-%m-%d") { + return Ok(Some(date)); + } } } Ok(None) } #[instrument(skip(self))] - async fn find_transaction(&self, account_id: &str, tx: &BankTransaction) -> Result> { + async fn find_transaction( + &self, + account_id: &str, + tx: &BankTransaction, + ) -> Result> { let client = self.client.lock().await; - + // Search window: +/- 3 days let start_date = tx.date - chrono::Duration::days(3); let end_date = tx.date + chrono::Duration::days(3); - - let tx_list = client.list_account_transactions( - account_id, - Some(&start_date.format("%Y-%m-%d").to_string()), - Some(&end_date.format("%Y-%m-%d").to_string()) - ).await?; + + let tx_list = client + .list_account_transactions( + account_id, + Some(&start_date.format("%Y-%m-%d").to_string()), + Some(&end_date.format("%Y-%m-%d").to_string()), + ) + .await?; // Filter logic for existing_tx in tx_list.data { for split in existing_tx.attributes.transactions { // 1. Check Amount (exact match absolute value) if let Ok(amount) = Decimal::from_str(&split.amount) { - if amount.abs() == tx.amount.abs() { - // 2. Check External ID - if let Some(ref ext_id) = split.external_id { - if ext_id == &tx.internal_id { - return Ok(Some(TransactionMatch { - id: existing_tx.id.clone(), - has_external_id: true, - })); - } - } else { - // 3. "Naked" transaction match (Heuristic) - // If currency matches - if let Some(ref code) = split.currency_code { - if code != &tx.currency { - continue; - } - } - - return Ok(Some(TransactionMatch { - id: existing_tx.id.clone(), - has_external_id: false, - })); - } - } + if amount.abs() == tx.amount.abs() { + // 2. Check External ID + if let Some(ref ext_id) = split.external_id { + if ext_id == &tx.internal_id { + return Ok(Some(TransactionMatch { + id: existing_tx.id.clone(), + has_external_id: true, + })); + } + } else { + // 3. "Naked" transaction match (Heuristic) + // If currency matches + if let Some(ref code) = split.currency_code { + if code != &tx.currency { + continue; + } + } + + return Ok(Some(TransactionMatch { + id: existing_tx.id.clone(), + has_external_id: false, + })); + } + } } } } @@ -149,22 +159,42 @@ impl TransactionDestination for FireflyAdapter { // Map to Firefly Transaction let is_credit = tx.amount.is_sign_positive(); let transaction_type = if is_credit { "deposit" } else { "withdrawal" }; - + let split = TransactionSplitStore { transaction_type: transaction_type.to_string(), date: tx.date.format("%Y-%m-%d").to_string(), amount: tx.amount.abs().to_string(), description: tx.description.clone(), - source_id: if !is_credit { Some(account_id.to_string()) } else { None }, - source_name: if is_credit { tx.counterparty_name.clone().or(Some("Unknown Sender".to_string())) } else { None }, - destination_id: if is_credit { Some(account_id.to_string()) } else { None }, - destination_name: if !is_credit { tx.counterparty_name.clone().or(Some("Unknown Recipient".to_string())) } else { None }, + source_id: if !is_credit { + Some(account_id.to_string()) + } else { + None + }, + source_name: if is_credit { + tx.counterparty_name + .clone() + .or(Some("Unknown Sender".to_string())) + } else { + None + }, + destination_id: if is_credit { + Some(account_id.to_string()) + } else { + None + }, + destination_name: if !is_credit { + tx.counterparty_name + .clone() + .or(Some("Unknown Recipient".to_string())) + } else { + None + }, currency_code: Some(tx.currency.clone()), foreign_amount: tx.foreign_amount.map(|d| d.abs().to_string()), foreign_currency_code: tx.foreign_currency.clone(), external_id: Some(tx.internal_id.clone()), }; - + let store = TransactionStore { transactions: vec![split], apply_rules: Some(true), @@ -183,6 +213,9 @@ impl TransactionDestination for FireflyAdapter { external_id: Some(external_id.to_string()), }], }; - client.update_transaction(id, update).await.map_err(|e| e.into()) + client + .update_transaction(id, update) + .await + .map_err(|e| e.into()) } } diff --git a/banks2ff/src/adapters/gocardless/cache.rs b/banks2ff/src/adapters/gocardless/cache.rs index 79968d5..5779186 100644 --- a/banks2ff/src/adapters/gocardless/cache.rs +++ b/banks2ff/src/adapters/gocardless/cache.rs @@ -1,9 +1,9 @@ +use crate::adapters::gocardless::encryption::Encryption; +use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::fs; use std::path::Path; -use serde::{Deserialize, Serialize}; use tracing::warn; -use crate::adapters::gocardless::encryption::Encryption; #[derive(Debug, Serialize, Deserialize, Default)] pub struct AccountCache { @@ -13,7 +13,8 @@ pub struct AccountCache { impl AccountCache { fn get_path() -> String { - let cache_dir = std::env::var("BANKS2FF_CACHE_DIR").unwrap_or_else(|_| "data/cache".to_string()); + let cache_dir = + std::env::var("BANKS2FF_CACHE_DIR").unwrap_or_else(|_| "data/cache".to_string()); format!("{}/accounts.enc", cache_dir) } @@ -39,7 +40,11 @@ impl AccountCache { if let Some(parent) = std::path::Path::new(&path).parent() { if let Err(e) = std::fs::create_dir_all(parent) { - warn!("Failed to create cache folder '{}': {}", parent.display(), e); + warn!( + "Failed to create cache folder '{}': {}", + parent.display(), + e + ); } } @@ -49,7 +54,7 @@ impl AccountCache { if let Err(e) = fs::write(&path, encrypted_data) { warn!("Failed to write cache file: {}", e); } - }, + } Err(e) => warn!("Failed to encrypt cache: {}", e), }, Err(e) => warn!("Failed to serialize cache: {}", e), diff --git a/banks2ff/src/adapters/gocardless/client.rs b/banks2ff/src/adapters/gocardless/client.rs index 06c0462..beb7aa0 100644 --- a/banks2ff/src/adapters/gocardless/client.rs +++ b/banks2ff/src/adapters/gocardless/client.rs @@ -1,16 +1,16 @@ +use crate::adapters::gocardless::cache::AccountCache; +use crate::adapters::gocardless::mapper::map_transaction; +use crate::adapters::gocardless::transaction_cache::AccountTransactionCache; +use crate::core::models::{Account, BankTransaction}; +use crate::core::ports::TransactionSource; +use anyhow::Result; use async_trait::async_trait; use chrono::NaiveDate; -use anyhow::Result; -use tracing::{debug, info, instrument, warn}; -use crate::core::ports::TransactionSource; -use crate::core::models::{Account, BankTransaction}; -use crate::adapters::gocardless::mapper::map_transaction; -use crate::adapters::gocardless::cache::AccountCache; -use crate::adapters::gocardless::transaction_cache::AccountTransactionCache; use gocardless_client::client::GoCardlessClient; +use tracing::{debug, info, instrument, warn}; -use std::sync::Arc; use std::collections::HashMap; +use std::sync::Arc; use tokio::sync::Mutex; pub struct GoCardlessAdapter { @@ -62,14 +62,20 @@ impl TransactionSource for GoCardlessAdapter { if let Some(agreement_id) = &req.agreement { match client.is_agreement_expired(agreement_id).await { Ok(true) => { - debug!("Skipping requisition {} - agreement {} has expired", req.id, agreement_id); + debug!( + "Skipping requisition {} - agreement {} has expired", + req.id, agreement_id + ); continue; } Ok(false) => { // Agreement is valid, proceed } Err(e) => { - warn!("Failed to check agreement {} expiry: {}. Skipping requisition.", agreement_id, e); + warn!( + "Failed to check agreement {} expiry: {}. Skipping requisition.", + agreement_id, e + ); continue; } } @@ -82,32 +88,32 @@ impl TransactionSource for GoCardlessAdapter { // 2. Fetch if missing if iban_opt.is_none() { - match client.get_account(&acc_id).await { - Ok(details) => { - let new_iban = details.iban.unwrap_or_default(); - cache.insert(acc_id.clone(), new_iban.clone()); - cache.save(); - iban_opt = Some(new_iban); - }, - Err(e) => { - // If rate limit hit here, we might want to skip this account and continue? - // But get_account is critical to identify the account. - // If we fail here, we can't match. - warn!("Failed to fetch details for account {}: {}", acc_id, e); - continue; - } - } + match client.get_account(&acc_id).await { + Ok(details) => { + let new_iban = details.iban.unwrap_or_default(); + cache.insert(acc_id.clone(), new_iban.clone()); + cache.save(); + iban_opt = Some(new_iban); + } + Err(e) => { + // If rate limit hit here, we might want to skip this account and continue? + // But get_account is critical to identify the account. + // If we fail here, we can't match. + warn!("Failed to fetch details for account {}: {}", acc_id, e); + continue; + } + } } let iban = iban_opt.unwrap_or_default(); let mut keep = true; if let Some(ref wanted) = wanted_set { - if !wanted.contains(&iban.replace(" ", "")) { - keep = false; - } else { - found_count += 1; - } + if !wanted.contains(&iban.replace(" ", "")) { + keep = false; + } else { + found_count += 1; + } } if keep { @@ -119,11 +125,13 @@ impl TransactionSource for GoCardlessAdapter { } // Optimization: Stop if we found all wanted accounts - if wanted_set.is_some() - && found_count >= target_count && target_count > 0 { - info!("Found all {} wanted accounts. Stopping search.", target_count); - return Ok(accounts); - } + if wanted_set.is_some() && found_count >= target_count && target_count > 0 { + info!( + "Found all {} wanted accounts. Stopping search.", + target_count + ); + return Ok(accounts); + } } } } @@ -133,7 +141,12 @@ impl TransactionSource for GoCardlessAdapter { } #[instrument(skip(self))] - async fn get_transactions(&self, account_id: &str, start: NaiveDate, end: NaiveDate) -> Result> { + async fn get_transactions( + &self, + account_id: &str, + start: NaiveDate, + end: NaiveDate, + ) -> Result> { let mut client = self.client.lock().await; client.obtain_access_token().await?; @@ -154,27 +167,43 @@ impl TransactionSource for GoCardlessAdapter { // Fetch missing ranges for (range_start, range_end) in uncovered_ranges { - let response_result = client.get_transactions( - account_id, - Some(&range_start.to_string()), - Some(&range_end.to_string()) - ).await; + let response_result = client + .get_transactions( + account_id, + Some(&range_start.to_string()), + Some(&range_end.to_string()), + ) + .await; match response_result { Ok(response) => { let raw_txs = response.transactions.booked.clone(); raw_transactions.extend(raw_txs.clone()); cache.store_transactions(range_start, range_end, raw_txs); - info!("Fetched {} transactions for account {} in range {}-{}", response.transactions.booked.len(), account_id, range_start, range_end); - }, + info!( + "Fetched {} transactions for account {} in range {}-{}", + response.transactions.booked.len(), + account_id, + range_start, + range_end + ); + } Err(e) => { let err_str = e.to_string(); if err_str.contains("429") { - warn!("Rate limit reached for account {} in range {}-{}. Skipping.", account_id, range_start, range_end); + warn!( + "Rate limit reached for account {} in range {}-{}. Skipping.", + account_id, range_start, range_end + ); continue; } - if err_str.contains("401") && (err_str.contains("expired") || err_str.contains("EUA")) { - debug!("EUA expired for account {} in range {}-{}. Skipping.", account_id, range_start, range_end); + if err_str.contains("401") + && (err_str.contains("expired") || err_str.contains("EUA")) + { + debug!( + "EUA expired for account {} in range {}-{}. Skipping.", + account_id, range_start, range_end + ); continue; } return Err(e.into()); @@ -194,7 +223,13 @@ impl TransactionSource for GoCardlessAdapter { } } - info!("Total {} transactions for account {} in range {}-{}", transactions.len(), account_id, start, end); + info!( + "Total {} transactions for account {} in range {}-{}", + transactions.len(), + account_id, + start, + end + ); Ok(transactions) } } diff --git a/banks2ff/src/adapters/gocardless/encryption.rs b/banks2ff/src/adapters/gocardless/encryption.rs index 60fec4b..8050c2a 100644 --- a/banks2ff/src/adapters/gocardless/encryption.rs +++ b/banks2ff/src/adapters/gocardless/encryption.rs @@ -27,13 +27,13 @@ //! - Key derivation: ~50-100ms (computed once per operation) //! - Memory: Minimal additional overhead -use aes_gcm::{Aes256Gcm, Key, Nonce}; use aes_gcm::aead::{Aead, KeyInit}; +use aes_gcm::{Aes256Gcm, Key, Nonce}; +use anyhow::{anyhow, Result}; use pbkdf2::pbkdf2_hmac; use rand::RngCore; use sha2::Sha256; use std::env; -use anyhow::{anyhow, Result}; const KEY_LEN: usize = 32; // 256-bit key const NONCE_LEN: usize = 12; // 96-bit nonce for AES-GCM @@ -72,7 +72,8 @@ impl Encryption { let nonce = Nonce::from_slice(&nonce_bytes); // Encrypt - let ciphertext = cipher.encrypt(nonce, data) + let ciphertext = cipher + .encrypt(nonce, data) .map_err(|e| anyhow!("Encryption failed: {}", e))?; // Prepend salt and nonce to ciphertext: [salt(16)][nonce(12)][ciphertext] @@ -100,7 +101,8 @@ impl Encryption { let cipher = Aes256Gcm::new(&key); // Decrypt - cipher.decrypt(nonce, ciphertext) + cipher + .decrypt(nonce, ciphertext) .map_err(|e| anyhow!("Decryption failed: {}", e)) } } @@ -170,4 +172,4 @@ mod tests { let decrypted = Encryption::decrypt(&encrypted).unwrap(); assert_eq!(data.to_vec(), decrypted); } -} \ No newline at end of file +} diff --git a/banks2ff/src/adapters/gocardless/mapper.rs b/banks2ff/src/adapters/gocardless/mapper.rs index e536dc2..14df657 100644 --- a/banks2ff/src/adapters/gocardless/mapper.rs +++ b/banks2ff/src/adapters/gocardless/mapper.rs @@ -1,15 +1,18 @@ -use rust_decimal::Decimal; -use rust_decimal::prelude::Signed; -use std::str::FromStr; -use anyhow::Result; use crate::core::models::BankTransaction; +use anyhow::Result; use gocardless_client::models::Transaction; +use rust_decimal::prelude::Signed; +use rust_decimal::Decimal; +use std::str::FromStr; pub fn map_transaction(tx: Transaction) -> Result { - let internal_id = tx.transaction_id + let internal_id = tx + .transaction_id .ok_or_else(|| anyhow::anyhow!("Transaction ID missing"))?; - let date_str = tx.booking_date.or(tx.value_date) + let date_str = tx + .booking_date + .or(tx.value_date) .ok_or_else(|| anyhow::anyhow!("Transaction date missing"))?; let date = chrono::NaiveDate::parse_from_str(&date_str, "%Y-%m-%d")?; @@ -23,7 +26,9 @@ pub fn map_transaction(tx: Transaction) -> Result { if let Some(exchanges) = tx.currency_exchange { if let Some(exchange) = exchanges.first() { - if let (Some(source_curr), Some(rate_str)) = (&exchange.source_currency, &exchange.exchange_rate) { + if let (Some(source_curr), Some(rate_str)) = + (&exchange.source_currency, &exchange.exchange_rate) + { foreign_currency = Some(source_curr.clone()); if let Ok(rate) = Decimal::from_str(rate_str) { let calc = amount.abs() * rate; @@ -42,7 +47,8 @@ pub fn map_transaction(tx: Transaction) -> Result { } // Fallback for description: Remittance Unstructured -> Debtor/Creditor Name -> "Unknown" - let description = tx.remittance_information_unstructured + let description = tx + .remittance_information_unstructured .or(tx.creditor_name.clone()) .or(tx.debtor_name.clone()) .unwrap_or_else(|| "Unknown Transaction".to_string()); @@ -56,14 +62,20 @@ pub fn map_transaction(tx: Transaction) -> Result { foreign_currency, description, counterparty_name: tx.creditor_name.or(tx.debtor_name), - counterparty_iban: tx.creditor_account.and_then(|a| a.iban).or(tx.debtor_account.and_then(|a| a.iban)), + counterparty_iban: tx + .creditor_account + .and_then(|a| a.iban) + .or(tx.debtor_account.and_then(|a| a.iban)), }) } fn validate_amount(amount: &Decimal) -> Result<()> { let abs = amount.abs(); if abs > Decimal::new(1_000_000_000, 0) { - return Err(anyhow::anyhow!("Amount exceeds reasonable bounds: {}", amount)); + return Err(anyhow::anyhow!( + "Amount exceeds reasonable bounds: {}", + amount + )); } if abs == Decimal::ZERO { return Err(anyhow::anyhow!("Amount cannot be zero")); @@ -73,10 +85,16 @@ fn validate_amount(amount: &Decimal) -> Result<()> { fn validate_currency(currency: &str) -> Result<()> { if currency.len() != 3 { - return Err(anyhow::anyhow!("Invalid currency code length: {}", currency)); + return Err(anyhow::anyhow!( + "Invalid currency code length: {}", + currency + )); } if !currency.chars().all(|c| c.is_ascii_uppercase()) { - return Err(anyhow::anyhow!("Invalid currency code format: {}", currency)); + return Err(anyhow::anyhow!( + "Invalid currency code format: {}", + currency + )); } Ok(()) } @@ -84,7 +102,7 @@ fn validate_currency(currency: &str) -> Result<()> { #[cfg(test)] mod tests { use super::*; - use gocardless_client::models::{TransactionAmount, CurrencyExchange}; + use gocardless_client::models::{CurrencyExchange, TransactionAmount}; #[test] fn test_map_normal_transaction() { @@ -161,8 +179,6 @@ mod tests { assert!(validate_amount(&amount).is_err()); } - - #[test] fn test_validate_currency_invalid_length() { assert!(validate_currency("EU").is_err()); diff --git a/banks2ff/src/adapters/gocardless/mod.rs b/banks2ff/src/adapters/gocardless/mod.rs index 2883edc..8569488 100644 --- a/banks2ff/src/adapters/gocardless/mod.rs +++ b/banks2ff/src/adapters/gocardless/mod.rs @@ -1,5 +1,5 @@ -pub mod client; -pub mod mapper; pub mod cache; +pub mod client; pub mod encryption; +pub mod mapper; pub mod transaction_cache; diff --git a/banks2ff/src/adapters/gocardless/transaction_cache.rs b/banks2ff/src/adapters/gocardless/transaction_cache.rs index 47a38dd..30fc14a 100644 --- a/banks2ff/src/adapters/gocardless/transaction_cache.rs +++ b/banks2ff/src/adapters/gocardless/transaction_cache.rs @@ -1,9 +1,9 @@ -use chrono::{NaiveDate, Days}; +use crate::adapters::gocardless::encryption::Encryption; +use anyhow::Result; +use chrono::{Days, NaiveDate}; +use gocardless_client::models::Transaction; use serde::{Deserialize, Serialize}; use std::path::Path; -use anyhow::Result; -use crate::adapters::gocardless::encryption::Encryption; -use gocardless_client::models::Transaction; #[derive(Serialize, Deserialize, Debug, Clone)] pub struct AccountTransactionCache { @@ -21,7 +21,8 @@ pub struct CachedRange { impl AccountTransactionCache { /// Get cache file path for an account fn get_cache_path(account_id: &str) -> String { - let cache_dir = std::env::var("BANKS2FF_CACHE_DIR").unwrap_or_else(|_| "data/cache".to_string()); + let cache_dir = + std::env::var("BANKS2FF_CACHE_DIR").unwrap_or_else(|_| "data/cache".to_string()); format!("{}/transactions/{}.enc", cache_dir, account_id) } @@ -70,7 +71,9 @@ impl AccountTransactionCache { if Self::ranges_overlap(range.start_date, range.end_date, start, end) { for tx in &range.transactions { if let Some(booking_date_str) = &tx.booking_date { - if let Ok(booking_date) = NaiveDate::parse_from_str(booking_date_str, "%Y-%m-%d") { + if let Ok(booking_date) = + NaiveDate::parse_from_str(booking_date_str, "%Y-%m-%d") + { if booking_date >= start && booking_date <= end { result.push(tx.clone()); } @@ -83,8 +86,13 @@ impl AccountTransactionCache { } /// Get uncovered date ranges within requested period - pub fn get_uncovered_ranges(&self, start: NaiveDate, end: NaiveDate) -> Vec<(NaiveDate, NaiveDate)> { - let mut covered_periods: Vec<(NaiveDate, NaiveDate)> = self.ranges + pub fn get_uncovered_ranges( + &self, + start: NaiveDate, + end: NaiveDate, + ) -> Vec<(NaiveDate, NaiveDate)> { + let mut covered_periods: Vec<(NaiveDate, NaiveDate)> = self + .ranges .iter() .filter_map(|range| { if Self::ranges_overlap(range.start_date, range.end_date, start, end) { @@ -134,7 +142,12 @@ impl AccountTransactionCache { } /// Store transactions for a date range, merging with existing cache - pub fn store_transactions(&mut self, start: NaiveDate, end: NaiveDate, mut transactions: Vec) { + pub fn store_transactions( + &mut self, + start: NaiveDate, + end: NaiveDate, + mut transactions: Vec, + ) { Self::deduplicate_transactions(&mut transactions); let new_range = CachedRange { start_date: start, @@ -151,7 +164,12 @@ impl AccountTransactionCache { let mut remaining = Vec::new(); for range in &self.ranges { - if Self::ranges_overlap_or_adjacent(range.start_date, range.end_date, new_range.start_date, new_range.end_date) { + if Self::ranges_overlap_or_adjacent( + range.start_date, + range.end_date, + new_range.start_date, + new_range.end_date, + ) { to_merge.push(range.clone()); } else { remaining.push(range.clone()); @@ -169,15 +187,25 @@ impl AccountTransactionCache { } /// Check if two date ranges overlap - fn ranges_overlap(start1: NaiveDate, end1: NaiveDate, start2: NaiveDate, end2: NaiveDate) -> bool { + fn ranges_overlap( + start1: NaiveDate, + end1: NaiveDate, + start2: NaiveDate, + end2: NaiveDate, + ) -> bool { start1 <= end2 && start2 <= end1 } /// Check if two date ranges overlap or are adjacent - fn ranges_overlap_or_adjacent(start1: NaiveDate, end1: NaiveDate, start2: NaiveDate, end2: NaiveDate) -> bool { - Self::ranges_overlap(start1, end1, start2, end2) || - (end1 + Days::new(1)) == start2 || - (end2 + Days::new(1)) == start1 + fn ranges_overlap_or_adjacent( + start1: NaiveDate, + end1: NaiveDate, + start2: NaiveDate, + end2: NaiveDate, + ) -> bool { + Self::ranges_overlap(start1, end1, start2, end2) + || (end1 + Days::new(1)) == start2 + || (end2 + Days::new(1)) == start1 } /// Merge a list of ranges into minimal set @@ -194,7 +222,12 @@ impl AccountTransactionCache { let mut current = sorted[0].clone(); for range in sorted.into_iter().skip(1) { - if Self::ranges_overlap_or_adjacent(current.start_date, current.end_date, range.start_date, range.end_date) { + if Self::ranges_overlap_or_adjacent( + current.start_date, + current.end_date, + range.start_date, + range.end_date, + ) { // Merge current.start_date = current.start_date.min(range.start_date); current.end_date = current.end_date.max(range.end_date); @@ -227,8 +260,8 @@ impl AccountTransactionCache { #[cfg(test)] mod tests { use super::*; - use std::env; use chrono::NaiveDate; + use std::env; fn setup_test_env(test_name: &str) -> String { env::set_var("BANKS2FF_CACHE_KEY", "test-cache-key"); @@ -239,7 +272,10 @@ mod tests { .duration_since(std::time::UNIX_EPOCH) .unwrap() .as_nanos(); - let cache_dir = format!("tmp/test-cache-{}-{}-{}", test_name, random_suffix, timestamp); + let cache_dir = format!( + "tmp/test-cache-{}-{}-{}", + test_name, random_suffix, timestamp + ); env::set_var("BANKS2FF_CACHE_DIR", cache_dir.clone()); cache_dir } @@ -261,8 +297,6 @@ mod tests { } } - - #[test] fn test_load_nonexistent_cache() { let cache_dir = setup_test_env("nonexistent"); @@ -291,7 +325,8 @@ mod tests { // Ensure env vars are set before load env::set_var("BANKS2FF_CACHE_KEY", "test-cache-key"); // Load - let loaded = AccountTransactionCache::load("test_account_empty").expect("Load should succeed"); + let loaded = + AccountTransactionCache::load("test_account_empty").expect("Load should succeed"); assert_eq!(loaded.account_id, "test_account_empty"); assert!(loaded.ranges.is_empty()); @@ -339,12 +374,16 @@ mod tests { // Ensure env vars are set before load env::set_var("BANKS2FF_CACHE_KEY", "test-cache-key"); // Load - let loaded = AccountTransactionCache::load("test_account_data").expect("Load should succeed"); + let loaded = + AccountTransactionCache::load("test_account_data").expect("Load should succeed"); assert_eq!(loaded.account_id, "test_account_data"); assert_eq!(loaded.ranges.len(), 1); assert_eq!(loaded.ranges[0].transactions.len(), 1); - assert_eq!(loaded.ranges[0].transactions[0].transaction_id, Some("test-tx-1".to_string())); + assert_eq!( + loaded.ranges[0].transactions[0].transaction_id, + Some("test-tx-1".to_string()) + ); cleanup_test_dir(&cache_dir); } @@ -426,8 +465,20 @@ mod tests { let end = NaiveDate::from_ymd_opt(2024, 1, 31).unwrap(); let uncovered = cache.get_uncovered_ranges(start, end); assert_eq!(uncovered.len(), 2); - assert_eq!(uncovered[0], (NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(), NaiveDate::from_ymd_opt(2024, 1, 9).unwrap())); - assert_eq!(uncovered[1], (NaiveDate::from_ymd_opt(2024, 1, 21).unwrap(), NaiveDate::from_ymd_opt(2024, 1, 31).unwrap())); + assert_eq!( + uncovered[0], + ( + NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(), + NaiveDate::from_ymd_opt(2024, 1, 9).unwrap() + ) + ); + assert_eq!( + uncovered[1], + ( + NaiveDate::from_ymd_opt(2024, 1, 21).unwrap(), + NaiveDate::from_ymd_opt(2024, 1, 31).unwrap() + ) + ); } #[test] @@ -552,4 +603,4 @@ mod tests { assert_eq!(cached.len(), 1); assert_eq!(cached[0].transaction_id, Some("tx1".to_string())); } -} \ No newline at end of file +} diff --git a/banks2ff/src/core/models.rs b/banks2ff/src/core/models.rs index 69f1f0e..787a52e 100644 --- a/banks2ff/src/core/models.rs +++ b/banks2ff/src/core/models.rs @@ -1,5 +1,5 @@ -use rust_decimal::Decimal; use chrono::NaiveDate; +use rust_decimal::Decimal; use std::fmt; use thiserror::Error; @@ -32,11 +32,20 @@ impl fmt::Debug for BankTransaction { .field("date", &self.date) .field("amount", &"[REDACTED]") .field("currency", &self.currency) - .field("foreign_amount", &self.foreign_amount.as_ref().map(|_| "[REDACTED]")) + .field( + "foreign_amount", + &self.foreign_amount.as_ref().map(|_| "[REDACTED]"), + ) .field("foreign_currency", &self.foreign_currency) .field("description", &"[REDACTED]") - .field("counterparty_name", &self.counterparty_name.as_ref().map(|_| "[REDACTED]")) - .field("counterparty_iban", &self.counterparty_iban.as_ref().map(|_| "[REDACTED]")) + .field( + "counterparty_name", + &self.counterparty_name.as_ref().map(|_| "[REDACTED]"), + ) + .field( + "counterparty_iban", + &self.counterparty_iban.as_ref().map(|_| "[REDACTED]"), + ) .finish() } } diff --git a/banks2ff/src/core/ports.rs b/banks2ff/src/core/ports.rs index 4a2ed4a..cda6dcc 100644 --- a/banks2ff/src/core/ports.rs +++ b/banks2ff/src/core/ports.rs @@ -1,9 +1,9 @@ +use crate::core::models::{Account, BankTransaction}; +use anyhow::Result; use async_trait::async_trait; use chrono::NaiveDate; -use anyhow::Result; #[cfg(test)] use mockall::automock; -use crate::core::models::{BankTransaction, Account}; #[derive(Debug, Default)] pub struct IngestResult { @@ -18,7 +18,12 @@ pub struct IngestResult { pub trait TransactionSource: Send + Sync { /// Fetch accounts. Optionally filter by a list of wanted IBANs to save requests. async fn get_accounts(&self, wanted_ibans: Option>) -> Result>; - async fn get_transactions(&self, account_id: &str, start: NaiveDate, end: NaiveDate) -> Result>; + async fn get_transactions( + &self, + account_id: &str, + start: NaiveDate, + end: NaiveDate, + ) -> Result>; } // Blanket implementation for references @@ -28,7 +33,12 @@ impl TransactionSource for &T { (**self).get_accounts(wanted_ibans).await } - async fn get_transactions(&self, account_id: &str, start: NaiveDate, end: NaiveDate) -> Result> { + async fn get_transactions( + &self, + account_id: &str, + start: NaiveDate, + end: NaiveDate, + ) -> Result> { (**self).get_transactions(account_id, start, end).await } } @@ -48,7 +58,11 @@ pub trait TransactionDestination: Send + Sync { // New granular methods for Healer Logic async fn get_last_transaction_date(&self, account_id: &str) -> Result>; - async fn find_transaction(&self, account_id: &str, transaction: &BankTransaction) -> Result>; + async fn find_transaction( + &self, + account_id: &str, + transaction: &BankTransaction, + ) -> Result>; async fn create_transaction(&self, account_id: &str, tx: &BankTransaction) -> Result<()>; async fn update_transaction_external_id(&self, id: &str, external_id: &str) -> Result<()>; } @@ -68,7 +82,11 @@ impl TransactionDestination for &T { (**self).get_last_transaction_date(account_id).await } - async fn find_transaction(&self, account_id: &str, transaction: &BankTransaction) -> Result> { + async fn find_transaction( + &self, + account_id: &str, + transaction: &BankTransaction, + ) -> Result> { (**self).find_transaction(account_id, transaction).await } @@ -77,6 +95,8 @@ impl TransactionDestination for &T { } async fn update_transaction_external_id(&self, id: &str, external_id: &str) -> Result<()> { - (**self).update_transaction_external_id(id, external_id).await + (**self) + .update_transaction_external_id(id, external_id) + .await } } diff --git a/banks2ff/src/core/sync.rs b/banks2ff/src/core/sync.rs index 8a9be8c..95c47b0 100644 --- a/banks2ff/src/core/sync.rs +++ b/banks2ff/src/core/sync.rs @@ -1,9 +1,8 @@ +use crate::core::models::{Account, SyncError}; +use crate::core::ports::{IngestResult, TransactionDestination, TransactionSource}; use anyhow::Result; -use tracing::{info, warn, instrument}; -use crate::core::ports::{IngestResult, TransactionSource, TransactionDestination}; -use crate::core::models::{SyncError, Account}; -use chrono::{NaiveDate, Local}; - +use chrono::{Local, NaiveDate}; +use tracing::{info, instrument, warn}; #[derive(Debug, Default)] pub struct SyncResult { @@ -24,14 +23,24 @@ pub async fn run_sync( info!("Starting synchronization..."); // Optimization: Get active Firefly IBANs first - let wanted_ibans = destination.get_active_account_ibans().await.map_err(SyncError::DestinationError)?; - info!("Syncing {} active accounts from Firefly III", wanted_ibans.len()); + let wanted_ibans = destination + .get_active_account_ibans() + .await + .map_err(SyncError::DestinationError)?; + info!( + "Syncing {} active accounts from Firefly III", + wanted_ibans.len() + ); - let accounts = source.get_accounts(Some(wanted_ibans)).await.map_err(SyncError::SourceError)?; + let accounts = source + .get_accounts(Some(wanted_ibans)) + .await + .map_err(SyncError::SourceError)?; info!("Found {} accounts from source", accounts.len()); // Default end date is Yesterday - let end_date = cli_end_date.unwrap_or_else(|| Local::now().date_naive() - chrono::Duration::days(1)); + let end_date = + cli_end_date.unwrap_or_else(|| Local::now().date_naive() - chrono::Duration::days(1)); let mut result = SyncResult::default(); @@ -42,19 +51,33 @@ pub async fn run_sync( info!("Processing account..."); // Process account with error handling - match process_single_account(&source, &destination, &account, cli_start_date, end_date, dry_run).await { + match process_single_account( + &source, + &destination, + &account, + cli_start_date, + end_date, + dry_run, + ) + .await + { Ok(stats) => { result.accounts_processed += 1; result.ingest.created += stats.created; result.ingest.healed += stats.healed; result.ingest.duplicates += stats.duplicates; result.ingest.errors += stats.errors; - info!("Account {} sync complete. Created: {}, Healed: {}, Duplicates: {}, Errors: {}", - account.id, stats.created, stats.healed, stats.duplicates, stats.errors); + info!( + "Account {} sync complete. Created: {}, Healed: {}, Duplicates: {}, Errors: {}", + account.id, stats.created, stats.healed, stats.duplicates, stats.errors + ); } Err(SyncError::AgreementExpired { agreement_id }) => { result.accounts_skipped_expired += 1; - warn!("Account {} skipped - associated agreement {} has expired", account.id, agreement_id); + warn!( + "Account {} skipped - associated agreement {} has expired", + account.id, agreement_id + ); } Err(SyncError::AccountSkipped { account_id, reason }) => { result.accounts_skipped_errors += 1; @@ -67,10 +90,14 @@ pub async fn run_sync( } } - info!("Synchronization finished. Processed: {}, Skipped (expired): {}, Skipped (errors): {}", - result.accounts_processed, result.accounts_skipped_expired, result.accounts_skipped_errors); - info!("Total transactions - Created: {}, Healed: {}, Duplicates: {}, Errors: {}", - result.ingest.created, result.ingest.healed, result.ingest.duplicates, result.ingest.errors); + info!( + "Synchronization finished. Processed: {}, Skipped (expired): {}, Skipped (errors): {}", + result.accounts_processed, result.accounts_skipped_expired, result.accounts_skipped_errors + ); + info!( + "Total transactions - Created: {}, Healed: {}, Duplicates: {}, Errors: {}", + result.ingest.created, result.ingest.healed, result.ingest.duplicates, result.ingest.errors + ); Ok(result) } @@ -83,7 +110,10 @@ async fn process_single_account( end_date: NaiveDate, dry_run: bool, ) -> Result { - let dest_id_opt = destination.resolve_account_id(&account.iban).await.map_err(SyncError::DestinationError)?; + let dest_id_opt = destination + .resolve_account_id(&account.iban) + .await + .map_err(SyncError::DestinationError)?; let Some(dest_id) = dest_id_opt else { return Err(SyncError::AccountSkipped { account_id: account.id.clone(), @@ -98,24 +128,34 @@ async fn process_single_account( d } else { // Default: Latest transaction date + 1 day - match destination.get_last_transaction_date(&dest_id).await.map_err(SyncError::DestinationError)? { + match destination + .get_last_transaction_date(&dest_id) + .await + .map_err(SyncError::DestinationError)? + { Some(last_date) => last_date + chrono::Duration::days(1), None => { // If no transaction exists in Firefly, we assume this is a fresh sync. // Default to syncing last 30 days. end_date - chrono::Duration::days(30) - }, + } } }; if start_date > end_date { - info!("Start date {} is after end date {}. Nothing to sync.", start_date, end_date); - return Ok(IngestResult::default()); + info!( + "Start date {} is after end date {}. Nothing to sync.", + start_date, end_date + ); + return Ok(IngestResult::default()); } info!("Syncing interval: {} to {}", start_date, end_date); - let transactions = match source.get_transactions(&account.id, start_date, end_date).await { + let transactions = match source + .get_transactions(&account.id, start_date, end_date) + .await + { Ok(txns) => txns, Err(e) => { let err_str = e.to_string(); @@ -139,49 +179,62 @@ async fn process_single_account( // Healer Logic Loop for tx in transactions { - // 1. Check if it exists - match destination.find_transaction(&dest_id, &tx).await.map_err(SyncError::DestinationError)? { - Some(existing) => { - if existing.has_external_id { - // Already synced properly - stats.duplicates += 1; - } else { - // Found "naked" transaction -> Heal it - if dry_run { - info!("[DRY RUN] Would heal transaction {} (Firefly ID: {})", tx.internal_id, existing.id); - stats.healed += 1; - } else { - info!("Healing transaction {} (Firefly ID: {})", tx.internal_id, existing.id); - if let Err(e) = destination.update_transaction_external_id(&existing.id, &tx.internal_id).await { - tracing::error!("Failed to heal transaction: {}", e); - stats.errors += 1; - } else { - stats.healed += 1; - } - } - } - }, - None => { - // New transaction - if dry_run { - info!("[DRY RUN] Would create transaction {}", tx.internal_id); - stats.created += 1; - } else if let Err(e) = destination.create_transaction(&dest_id, &tx).await { - // Firefly might still reject it as duplicate if hash matches, even if we didn't find it via heuristic - // (unlikely if heuristic is good, but possible) - let err_str = e.to_string(); - if err_str.contains("422") || err_str.contains("Duplicate") { - warn!("Duplicate rejected by Firefly: {}", tx.internal_id); - stats.duplicates += 1; - } else { - tracing::error!("Failed to create transaction: {}", e); - stats.errors += 1; - } - } else { - stats.created += 1; - } - } - } + // 1. Check if it exists + match destination + .find_transaction(&dest_id, &tx) + .await + .map_err(SyncError::DestinationError)? + { + Some(existing) => { + if existing.has_external_id { + // Already synced properly + stats.duplicates += 1; + } else { + // Found "naked" transaction -> Heal it + if dry_run { + info!( + "[DRY RUN] Would heal transaction {} (Firefly ID: {})", + tx.internal_id, existing.id + ); + stats.healed += 1; + } else { + info!( + "Healing transaction {} (Firefly ID: {})", + tx.internal_id, existing.id + ); + if let Err(e) = destination + .update_transaction_external_id(&existing.id, &tx.internal_id) + .await + { + tracing::error!("Failed to heal transaction: {}", e); + stats.errors += 1; + } else { + stats.healed += 1; + } + } + } + } + None => { + // New transaction + if dry_run { + info!("[DRY RUN] Would create transaction {}", tx.internal_id); + stats.created += 1; + } else if let Err(e) = destination.create_transaction(&dest_id, &tx).await { + // Firefly might still reject it as duplicate if hash matches, even if we didn't find it via heuristic + // (unlikely if heuristic is good, but possible) + let err_str = e.to_string(); + if err_str.contains("422") || err_str.contains("Duplicate") { + warn!("Duplicate rejected by Firefly: {}", tx.internal_id); + stats.duplicates += 1; + } else { + tracing::error!("Failed to create transaction: {}", e); + stats.errors += 1; + } + } else { + stats.created += 1; + } + } + } } Ok(stats) @@ -190,10 +243,10 @@ async fn process_single_account( #[cfg(test)] mod tests { use super::*; - use crate::core::ports::{MockTransactionSource, MockTransactionDestination, TransactionMatch}; use crate::core::models::{Account, BankTransaction}; - use rust_decimal::Decimal; + use crate::core::ports::{MockTransactionDestination, MockTransactionSource, TransactionMatch}; use mockall::predicate::*; + use rust_decimal::Decimal; #[tokio::test] async fn test_sync_flow_create_new() { @@ -201,13 +254,16 @@ mod tests { let mut dest = MockTransactionDestination::new(); // Source setup - source.expect_get_accounts() + source + .expect_get_accounts() .with(always()) // Match any argument - .returning(|_| Ok(vec![Account { - id: "src_1".to_string(), - iban: "NL01".to_string(), - currency: "EUR".to_string(), - }])); + .returning(|_| { + Ok(vec![Account { + id: "src_1".to_string(), + iban: "NL01".to_string(), + currency: "EUR".to_string(), + }]) + }); let tx = BankTransaction { internal_id: "tx1".into(), @@ -222,7 +278,8 @@ mod tests { }; let tx_clone = tx.clone(); - source.expect_get_transactions() + source + .expect_get_transactions() .returning(move |_, _, _| Ok(vec![tx.clone()])); // Destination setup @@ -231,7 +288,7 @@ mod tests { dest.expect_resolve_account_id() .returning(|_| Ok(Some("dest_1".into()))); - + dest.expect_get_last_transaction_date() .returning(|_| Ok(Some(NaiveDate::from_ymd_opt(2022, 12, 31).unwrap()))); @@ -239,7 +296,7 @@ mod tests { dest.expect_find_transaction() .times(1) .returning(|_, _| Ok(None)); - + // 2. Create -> Ok dest.expect_create_transaction() .with(eq("dest_1"), eq(tx_clone)) @@ -250,49 +307,50 @@ mod tests { let res = run_sync(&source, &dest, None, None, false).await; assert!(res.is_ok()); } - + #[tokio::test] async fn test_sync_flow_heal_existing() { let mut source = MockTransactionSource::new(); let mut dest = MockTransactionDestination::new(); - + dest.expect_get_active_account_ibans() .returning(|| Ok(vec!["NL01".to_string()])); - source.expect_get_accounts() - .with(always()) - .returning(|_| Ok(vec![Account { + source.expect_get_accounts().with(always()).returning(|_| { + Ok(vec![Account { id: "src_1".to_string(), iban: "NL01".to_string(), currency: "EUR".to_string(), - }])); + }]) + }); - source.expect_get_transactions() - .returning(|_, _, _| Ok(vec![ - BankTransaction { - internal_id: "tx1".into(), - date: NaiveDate::from_ymd_opt(2023, 1, 1).unwrap(), - amount: Decimal::new(100, 0), - currency: "EUR".into(), - foreign_amount: None, - foreign_currency: None, - description: "Test".into(), - counterparty_name: None, - counterparty_iban: None, - } - ])); + source.expect_get_transactions().returning(|_, _, _| { + Ok(vec![BankTransaction { + internal_id: "tx1".into(), + date: NaiveDate::from_ymd_opt(2023, 1, 1).unwrap(), + amount: Decimal::new(100, 0), + currency: "EUR".into(), + foreign_amount: None, + foreign_currency: None, + description: "Test".into(), + counterparty_name: None, + counterparty_iban: None, + }]) + }); - dest.expect_resolve_account_id().returning(|_| Ok(Some("dest_1".into()))); - dest.expect_get_last_transaction_date().returning(|_| Ok(Some(NaiveDate::from_ymd_opt(2022, 12, 31).unwrap()))); + dest.expect_resolve_account_id() + .returning(|_| Ok(Some("dest_1".into()))); + dest.expect_get_last_transaction_date() + .returning(|_| Ok(Some(NaiveDate::from_ymd_opt(2022, 12, 31).unwrap()))); // 1. Find -> Some(No External ID) - dest.expect_find_transaction() - .times(1) - .returning(|_, _| Ok(Some(TransactionMatch { + dest.expect_find_transaction().times(1).returning(|_, _| { + Ok(Some(TransactionMatch { id: "ff_tx_1".to_string(), has_external_id: false, - }))); - + })) + }); + // 2. Update -> Ok dest.expect_update_transaction_external_id() .with(eq("ff_tx_1"), eq("tx1")) @@ -302,22 +360,22 @@ mod tests { let res = run_sync(&source, &dest, None, None, false).await; assert!(res.is_ok()); } - + #[tokio::test] async fn test_sync_flow_dry_run() { let mut source = MockTransactionSource::new(); let mut dest = MockTransactionDestination::new(); - + dest.expect_get_active_account_ibans() .returning(|| Ok(vec!["NL01".to_string()])); - source.expect_get_accounts() - .with(always()) - .returning(|_| Ok(vec![Account { + source.expect_get_accounts().with(always()).returning(|_| { + Ok(vec![Account { id: "src_1".to_string(), iban: "NL01".to_string(), currency: "EUR".to_string(), - }])); + }]) + }); let tx = BankTransaction { internal_id: "tx1".into(), @@ -331,16 +389,18 @@ mod tests { counterparty_iban: None, }; - source.expect_get_transactions() + source + .expect_get_transactions() .returning(move |_, _, _| Ok(vec![tx.clone()])); - dest.expect_resolve_account_id().returning(|_| Ok(Some("dest_1".into()))); - dest.expect_get_last_transaction_date().returning(|_| Ok(Some(NaiveDate::from_ymd_opt(2022, 12, 31).unwrap()))); + dest.expect_resolve_account_id() + .returning(|_| Ok(Some("dest_1".into()))); + dest.expect_get_last_transaction_date() + .returning(|_| Ok(Some(NaiveDate::from_ymd_opt(2022, 12, 31).unwrap()))); // 1. Find -> None (New transaction) - dest.expect_find_transaction() - .returning(|_, _| Ok(None)); - + dest.expect_find_transaction().returning(|_, _| Ok(None)); + // 2. Create -> NEVER Called (Dry Run) dest.expect_create_transaction().never(); dest.expect_update_transaction_external_id().never(); @@ -348,4 +408,4 @@ mod tests { let res = run_sync(source, dest, None, None, true).await; assert!(res.is_ok()); } -} \ No newline at end of file +} diff --git a/banks2ff/src/debug.rs b/banks2ff/src/debug.rs index 7808966..6eec9e8 100644 --- a/banks2ff/src/debug.rs +++ b/banks2ff/src/debug.rs @@ -1,11 +1,11 @@ -use reqwest_middleware::{Middleware, Next}; -use task_local_extensions::Extensions; -use reqwest::{Request, Response}; -use std::sync::atomic::{AtomicU64, Ordering}; -use std::fs; -use std::path::Path; use chrono::Utc; use hyper::Body; +use reqwest::{Request, Response}; +use reqwest_middleware::{Middleware, Next}; +use std::fs; +use std::path::Path; +use std::sync::atomic::{AtomicU64, Ordering}; +use task_local_extensions::Extensions; static REQUEST_COUNTER: AtomicU64 = AtomicU64::new(0); @@ -51,7 +51,11 @@ impl Middleware for DebugLogger { log_content.push_str("# Request:\n"); log_content.push_str(&format!("{} {} HTTP/1.1\n", req.method(), req.url())); for (key, value) in req.headers() { - log_content.push_str(&format!("{}: {}\n", key, value.to_str().unwrap_or("[INVALID]"))); + log_content.push_str(&format!( + "{}: {}\n", + key, + value.to_str().unwrap_or("[INVALID]") + )); } if let Some(body) = req.body() { if let Some(bytes) = body.as_bytes() { @@ -70,13 +74,26 @@ impl Middleware for DebugLogger { // Response log_content.push_str("# Response:\n"); - log_content.push_str(&format!("HTTP/1.1 {} {}\n", status.as_u16(), status.canonical_reason().unwrap_or("Unknown"))); + log_content.push_str(&format!( + "HTTP/1.1 {} {}\n", + status.as_u16(), + status.canonical_reason().unwrap_or("Unknown") + )); for (key, value) in &headers { - log_content.push_str(&format!("{}: {}\n", key, value.to_str().unwrap_or("[INVALID]"))); + log_content.push_str(&format!( + "{}: {}\n", + key, + value.to_str().unwrap_or("[INVALID]") + )); } // Read body - let body_bytes = response.bytes().await.map_err(|e| reqwest_middleware::Error::Middleware(anyhow::anyhow!("Failed to read response body: {}", e)))?; + let body_bytes = response.bytes().await.map_err(|e| { + reqwest_middleware::Error::Middleware(anyhow::anyhow!( + "Failed to read response body: {}", + e + )) + })?; let body_str = String::from_utf8_lossy(&body_bytes); log_content.push_str(&format!("\n{}", body_str)); @@ -86,9 +103,7 @@ impl Middleware for DebugLogger { } // Reconstruct response - let mut builder = http::Response::builder() - .status(status) - .version(version); + let mut builder = http::Response::builder().status(status).version(version); for (key, value) in &headers { builder = builder.header(key, value); } @@ -113,4 +128,4 @@ fn build_curl_command(req: &Request) -> String { } curl -} \ No newline at end of file +} diff --git a/banks2ff/src/main.rs b/banks2ff/src/main.rs index 5d63826..f701b44 100644 --- a/banks2ff/src/main.rs +++ b/banks2ff/src/main.rs @@ -2,17 +2,17 @@ mod adapters; mod core; mod debug; -use clap::Parser; -use tracing::{info, error}; -use crate::adapters::gocardless::client::GoCardlessAdapter; use crate::adapters::firefly::client::FireflyAdapter; +use crate::adapters::gocardless::client::GoCardlessAdapter; use crate::core::sync::run_sync; use crate::debug::DebugLogger; -use gocardless_client::client::GoCardlessClient; +use chrono::NaiveDate; +use clap::Parser; use firefly_client::client::FireflyClient; +use gocardless_client::client::GoCardlessClient; use reqwest_middleware::ClientBuilder; use std::env; -use chrono::NaiveDate; +use tracing::{error, info}; #[derive(Parser, Debug)] #[command(author, version, about, long_about = None)] @@ -56,7 +56,8 @@ async fn main() -> anyhow::Result<()> { } // Config Load - let gc_url = env::var("GOCARDLESS_URL").unwrap_or_else(|_| "https://bankaccountdata.gocardless.com".to_string()); + let gc_url = env::var("GOCARDLESS_URL") + .unwrap_or_else(|_| "https://bankaccountdata.gocardless.com".to_string()); let gc_id = env::var("GOCARDLESS_ID").expect("GOCARDLESS_ID not set"); let gc_key = env::var("GOCARDLESS_KEY").expect("GOCARDLESS_KEY not set"); @@ -90,10 +91,19 @@ async fn main() -> anyhow::Result<()> { match run_sync(source, destination, args.start, args.end, args.dry_run).await { Ok(result) => { info!("Sync completed successfully."); - info!("Accounts processed: {}, skipped (expired): {}, skipped (errors): {}", - result.accounts_processed, result.accounts_skipped_expired, result.accounts_skipped_errors); - info!("Transactions - Created: {}, Healed: {}, Duplicates: {}, Errors: {}", - result.ingest.created, result.ingest.healed, result.ingest.duplicates, result.ingest.errors); + info!( + "Accounts processed: {}, skipped (expired): {}, skipped (errors): {}", + result.accounts_processed, + result.accounts_skipped_expired, + result.accounts_skipped_errors + ); + info!( + "Transactions - Created: {}, Healed: {}, Duplicates: {}, Errors: {}", + result.ingest.created, + result.ingest.healed, + result.ingest.duplicates, + result.ingest.errors + ); } Err(e) => error!("Sync failed: {}", e), } diff --git a/firefly-client/src/client.rs b/firefly-client/src/client.rs index f0dd43d..4f9b4da 100644 --- a/firefly-client/src/client.rs +++ b/firefly-client/src/client.rs @@ -1,9 +1,9 @@ +use crate::models::{AccountArray, TransactionArray, TransactionStore, TransactionUpdate}; use reqwest::Url; use reqwest_middleware::ClientWithMiddleware; use serde::de::DeserializeOwned; use thiserror::Error; use tracing::instrument; -use crate::models::{AccountArray, TransactionStore, TransactionArray, TransactionUpdate}; #[derive(Error, Debug)] pub enum FireflyError { @@ -28,10 +28,16 @@ impl FireflyClient { Self::with_client(base_url, access_token, None) } - pub fn with_client(base_url: &str, access_token: &str, client: Option) -> Result { + pub fn with_client( + base_url: &str, + access_token: &str, + client: Option, + ) -> Result { Ok(Self { base_url: Url::parse(base_url)?, - client: client.unwrap_or_else(|| reqwest_middleware::ClientBuilder::new(reqwest::Client::new()).build()), + client: client.unwrap_or_else(|| { + reqwest_middleware::ClientBuilder::new(reqwest::Client::new()).build() + }), access_token: access_token.to_string(), }) } @@ -39,12 +45,11 @@ impl FireflyClient { #[instrument(skip(self))] pub async fn get_accounts(&self, _iban: &str) -> Result { let mut url = self.base_url.join("/api/v1/accounts")?; - url.query_pairs_mut() - .append_pair("type", "asset"); - + url.query_pairs_mut().append_pair("type", "asset"); + self.get_authenticated(url).await } - + #[instrument(skip(self))] pub async fn search_accounts(&self, query: &str) -> Result { let mut url = self.base_url.join("/api/v1/search/accounts")?; @@ -52,15 +57,20 @@ impl FireflyClient { .append_pair("query", query) .append_pair("type", "asset") .append_pair("field", "all"); - + self.get_authenticated(url).await } #[instrument(skip(self, transaction))] - pub async fn store_transaction(&self, transaction: TransactionStore) -> Result<(), FireflyError> { + pub async fn store_transaction( + &self, + transaction: TransactionStore, + ) -> Result<(), FireflyError> { let url = self.base_url.join("/api/v1/transactions")?; - - let response = self.client.post(url) + + let response = self + .client + .post(url) .bearer_auth(&self.access_token) .header("accept", "application/json") .json(&transaction) @@ -70,15 +80,25 @@ impl FireflyClient { if !response.status().is_success() { let status = response.status(); let text = response.text().await?; - return Err(FireflyError::ApiError(format!("Store Transaction Failed {}: {}", status, text))); + return Err(FireflyError::ApiError(format!( + "Store Transaction Failed {}: {}", + status, text + ))); } - + Ok(()) } - + #[instrument(skip(self))] - pub async fn list_account_transactions(&self, account_id: &str, start: Option<&str>, end: Option<&str>) -> Result { - let mut url = self.base_url.join(&format!("/api/v1/accounts/{}/transactions", account_id))?; + pub async fn list_account_transactions( + &self, + account_id: &str, + start: Option<&str>, + end: Option<&str>, + ) -> Result { + let mut url = self + .base_url + .join(&format!("/api/v1/accounts/{}/transactions", account_id))?; { let mut pairs = url.query_pairs_mut(); if let Some(s) = start { @@ -88,17 +108,25 @@ impl FireflyClient { pairs.append_pair("end", e); } // Limit to 50, could be higher but safer to page if needed. For heuristic checks 50 is usually plenty per day range. - pairs.append_pair("limit", "50"); + pairs.append_pair("limit", "50"); } self.get_authenticated(url).await } #[instrument(skip(self, update))] - pub async fn update_transaction(&self, id: &str, update: TransactionUpdate) -> Result<(), FireflyError> { - let url = self.base_url.join(&format!("/api/v1/transactions/{}", id))?; + pub async fn update_transaction( + &self, + id: &str, + update: TransactionUpdate, + ) -> Result<(), FireflyError> { + let url = self + .base_url + .join(&format!("/api/v1/transactions/{}", id))?; - let response = self.client.put(url) + let response = self + .client + .put(url) .bearer_auth(&self.access_token) .header("accept", "application/json") .json(&update) @@ -106,25 +134,33 @@ impl FireflyClient { .await?; if !response.status().is_success() { - let status = response.status(); - let text = response.text().await?; - return Err(FireflyError::ApiError(format!("Update Transaction Failed {}: {}", status, text))); + let status = response.status(); + let text = response.text().await?; + return Err(FireflyError::ApiError(format!( + "Update Transaction Failed {}: {}", + status, text + ))); } Ok(()) } async fn get_authenticated(&self, url: Url) -> Result { - let response = self.client.get(url) + let response = self + .client + .get(url) .bearer_auth(&self.access_token) .header("accept", "application/json") .send() .await?; if !response.status().is_success() { - let status = response.status(); - let text = response.text().await?; - return Err(FireflyError::ApiError(format!("API request failed {}: {}", status, text))); + let status = response.status(); + let text = response.text().await?; + return Err(FireflyError::ApiError(format!( + "API request failed {}: {}", + status, text + ))); } let data = response.json().await?; diff --git a/firefly-client/tests/client_test.rs b/firefly-client/tests/client_test.rs index 1ed8ef5..f8477d3 100644 --- a/firefly-client/tests/client_test.rs +++ b/firefly-client/tests/client_test.rs @@ -1,8 +1,8 @@ use firefly_client::client::FireflyClient; -use firefly_client::models::{TransactionStore, TransactionSplitStore}; -use wiremock::matchers::{method, path, header}; -use wiremock::{Mock, MockServer, ResponseTemplate}; +use firefly_client::models::{TransactionSplitStore, TransactionStore}; use std::fs; +use wiremock::matchers::{header, method, path}; +use wiremock::{Mock, MockServer, ResponseTemplate}; #[tokio::test] async fn test_search_accounts() { @@ -21,7 +21,10 @@ async fn test_search_accounts() { assert_eq!(accounts.data.len(), 1); assert_eq!(accounts.data[0].attributes.name, "Checking Account"); - assert_eq!(accounts.data[0].attributes.iban.as_deref(), Some("NL01BANK0123456789")); + assert_eq!( + accounts.data[0].attributes.iban.as_deref(), + Some("NL01BANK0123456789") + ); } #[tokio::test] @@ -36,7 +39,7 @@ async fn test_store_transaction() { .await; let client = FireflyClient::new(&mock_server.uri(), "my-token").unwrap(); - + let tx = TransactionStore { transactions: vec![TransactionSplitStore { transaction_type: "withdrawal".to_string(), diff --git a/gocardless-client/src/client.rs b/gocardless-client/src/client.rs index bb1a991..d01d09e 100644 --- a/gocardless-client/src/client.rs +++ b/gocardless-client/src/client.rs @@ -1,9 +1,11 @@ +use crate::models::{ + Account, EndUserAgreement, PaginatedResponse, Requisition, TokenResponse, TransactionsResponse, +}; use reqwest::Url; use reqwest_middleware::ClientWithMiddleware; use serde::{Deserialize, Serialize}; use thiserror::Error; use tracing::{debug, instrument}; -use crate::models::{TokenResponse, PaginatedResponse, Requisition, Account, TransactionsResponse, EndUserAgreement}; #[derive(Error, Debug)] pub enum GoCardlessError { @@ -39,10 +41,17 @@ impl GoCardlessClient { Self::with_client(base_url, secret_id, secret_key, None) } - pub fn with_client(base_url: &str, secret_id: &str, secret_key: &str, client: Option) -> Result { + pub fn with_client( + base_url: &str, + secret_id: &str, + secret_key: &str, + client: Option, + ) -> Result { Ok(Self { base_url: Url::parse(base_url)?, - client: client.unwrap_or_else(|| reqwest_middleware::ClientBuilder::new(reqwest::Client::new()).build()), + client: client.unwrap_or_else(|| { + reqwest_middleware::ClientBuilder::new(reqwest::Client::new()).build() + }), secret_id: secret_id.to_string(), secret_key: secret_key.to_string(), access_token: None, @@ -67,40 +76,47 @@ impl GoCardlessClient { }; debug!("Requesting new access token"); - let response = self.client.post(url) - .json(&body) - .send() - .await?; + let response = self.client.post(url).json(&body).send().await?; if !response.status().is_success() { let status = response.status(); let text = response.text().await?; - return Err(GoCardlessError::ApiError(format!("Token request failed {}: {}", status, text))); + return Err(GoCardlessError::ApiError(format!( + "Token request failed {}: {}", + status, text + ))); } let token_resp: TokenResponse = response.json().await?; self.access_token = Some(token_resp.access); - self.access_expires_at = Some(chrono::Utc::now() + chrono::Duration::seconds(token_resp.access_expires as i64)); + self.access_expires_at = + Some(chrono::Utc::now() + chrono::Duration::seconds(token_resp.access_expires as i64)); debug!("Access token obtained"); - + Ok(()) } #[instrument(skip(self))] - pub async fn get_requisitions(&self) -> Result, GoCardlessError> { + pub async fn get_requisitions( + &self, + ) -> Result, GoCardlessError> { let url = self.base_url.join("/api/v2/requisitions/")?; self.get_authenticated(url).await } #[instrument(skip(self))] - pub async fn get_agreements(&self) -> Result, GoCardlessError> { + pub async fn get_agreements( + &self, + ) -> Result, GoCardlessError> { let url = self.base_url.join("/api/v2/agreements/enduser/")?; self.get_authenticated(url).await } #[instrument(skip(self))] pub async fn get_agreement(&self, id: &str) -> Result { - let url = self.base_url.join(&format!("/api/v2/agreements/enduser/{}/", id))?; + let url = self + .base_url + .join(&format!("/api/v2/agreements/enduser/{}/", id))?; self.get_authenticated(url).await } @@ -132,9 +148,16 @@ impl GoCardlessClient { } #[instrument(skip(self))] - pub async fn get_transactions(&self, account_id: &str, date_from: Option<&str>, date_to: Option<&str>) -> Result { - let mut url = self.base_url.join(&format!("/api/v2/accounts/{}/transactions/", account_id))?; - + pub async fn get_transactions( + &self, + account_id: &str, + date_from: Option<&str>, + date_to: Option<&str>, + ) -> Result { + let mut url = self + .base_url + .join(&format!("/api/v2/accounts/{}/transactions/", account_id))?; + { let mut pairs = url.query_pairs_mut(); if let Some(from) = date_from { @@ -148,19 +171,29 @@ impl GoCardlessClient { self.get_authenticated(url).await } - async fn get_authenticated Deserialize<'de>>(&self, url: Url) -> Result { - let token = self.access_token.as_ref().ok_or(GoCardlessError::ApiError("No access token available. Call obtain_access_token() first.".into()))?; + async fn get_authenticated Deserialize<'de>>( + &self, + url: Url, + ) -> Result { + let token = self.access_token.as_ref().ok_or(GoCardlessError::ApiError( + "No access token available. Call obtain_access_token() first.".into(), + ))?; - let response = self.client.get(url) + let response = self + .client + .get(url) .bearer_auth(token) .header("accept", "application/json") .send() .await?; if !response.status().is_success() { - let status = response.status(); - let text = response.text().await?; - return Err(GoCardlessError::ApiError(format!("API request failed {}: {}", status, text))); + let status = response.status(); + let text = response.text().await?; + return Err(GoCardlessError::ApiError(format!( + "API request failed {}: {}", + status, text + ))); } let data = response.json().await?;