Files
banks2ff/banks2ff/src/adapters/gocardless/client.rs
Jacob Kiers b85c366176 feat: implement account linking and management system
Add comprehensive account linking functionality to automatically match bank accounts to Firefly III accounts, with manual override options. This includes:

- New LinkStore module for persistent storage of account links with auto-linking based on IBAN matching
- Extended adapter traits with inspection methods (list_accounts, get_account_status, etc.) and discover_accounts for account discovery
- Integration of linking into sync logic to automatically discover and link accounts before syncing transactions
- CLI commands for managing account links (list, create, etc.)
- Updated README with new features and usage examples

This enables users to easily manage account mappings between sources and destinations, reducing manual configuration and improving sync reliability.
2025-11-27 21:59:29 +01:00

385 lines
14 KiB
Rust

use crate::adapters::gocardless::cache::AccountCache;
use crate::adapters::gocardless::mapper::map_transaction;
use crate::adapters::gocardless::transaction_cache::AccountTransactionCache;
use crate::core::models::{
Account, AccountStatus, AccountSummary, BankTransaction, CacheInfo, TransactionInfo,
};
use crate::core::ports::TransactionSource;
use anyhow::Result;
use async_trait::async_trait;
use chrono::NaiveDate;
use gocardless_client::client::GoCardlessClient;
use tracing::{debug, info, instrument, warn};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;
/// Adapter that serves bank accounts and transactions from GoCardless.
///
/// Every field is wrapped in `Arc<tokio::sync::Mutex<..>>`, so the adapter's
/// `&self` methods can mutate the client and both caches.
pub struct GoCardlessAdapter {
/// GoCardless API client; methods that talk to the API lock it first.
client: Arc<Mutex<GoCardlessClient>>,
/// Account cache used to look up an account's IBAN by its account id.
cache: Arc<Mutex<AccountCache>>,
/// Per-account transaction caches, keyed by account id; entries are created
/// on first use by `get_transactions`.
transaction_caches: Arc<Mutex<HashMap<String, AccountTransactionCache>>>,
}
impl GoCardlessAdapter {
    /// Builds an adapter around an existing GoCardless client.
    ///
    /// The account cache is obtained from `AccountCache::load()`; the map of
    /// per-account transaction caches starts out empty.
    pub fn new(client: GoCardlessClient) -> Self {
        let client = Arc::new(Mutex::new(client));
        let cache = Arc::new(Mutex::new(AccountCache::load()));
        let transaction_caches = Arc::new(Mutex::new(HashMap::new()));

        Self {
            client,
            cache,
            transaction_caches,
        }
    }
}
#[async_trait]
impl TransactionSource for GoCardlessAdapter {
/// Returns the accounts reachable through linked GoCardless requisitions.
///
/// * `wanted_ibans` - when `Some`, only accounts whose IBAN (compared with
///   spaces removed) is in the list are returned, and the search stops as soon
///   as every distinct wanted IBAN has been matched. When `None`, every account
///   is returned.
///
/// Requisitions whose status is not `"LN"`, whose agreement has expired, or
/// whose agreement check fails are skipped. Accounts whose details cannot be
/// fetched are skipped with a warning.
///
/// # Errors
///
/// Fails if the access token cannot be obtained or the requisition list
/// cannot be fetched.
#[instrument(skip(self))]
async fn get_accounts(&self, wanted_ibans: Option<Vec<String>>) -> Result<Vec<Account>> {
    let mut client = self.client.lock().await;
    let mut cache = self.cache.lock().await;

    // Ensure token
    client.obtain_access_token().await?;

    let requisitions = client.get_requisitions().await?;
    let mut accounts = Vec::new();

    // Normalise the wanted IBANs (spaces removed) into a set for fast lookup.
    let wanted_set = wanted_ibans.map(|list| {
        list.into_iter()
            .map(|i| i.replace(' ', ""))
            .collect::<std::collections::HashSet<_>>()
    });
    let target_count = wanted_set.as_ref().map_or(0, |s| s.len());

    // Distinct wanted IBANs matched so far. Counting distinct IBANs rather than
    // matched accounts matters: the same IBAN can show up under more than one
    // account id, and counting accounts would then end the search before the
    // remaining wanted IBANs were seen.
    let mut found_ibans = std::collections::HashSet::new();

    for req in requisitions.results {
        // Optimization: Only process Linked requisitions to avoid 401/403 on expired ones
        if req.status != "LN" {
            continue;
        }

        // Check if agreement is expired
        if let Some(agreement_id) = &req.agreement {
            match client.is_agreement_expired(agreement_id).await {
                Ok(true) => {
                    debug!(
                        "Skipping requisition {} - agreement {} has expired",
                        req.id, agreement_id
                    );
                    continue;
                }
                Ok(false) => {
                    // Agreement is valid, proceed
                }
                Err(e) => {
                    warn!(
                        "Failed to check agreement {} expiry: {}. Skipping requisition.",
                        agreement_id, e
                    );
                    continue;
                }
            }
        }

        if let Some(req_accounts) = req.accounts {
            for acc_id in req_accounts {
                // Resolve the IBAN from the cache, falling back to the API.
                let iban = match cache.get_iban(&acc_id) {
                    Some(cached) => cached,
                    None => match client.get_account(&acc_id).await {
                        Ok(details) => {
                            let new_iban = details.iban.unwrap_or_default();
                            cache.insert(acc_id.clone(), new_iban.clone());
                            cache.save();
                            new_iban
                        }
                        Err(e) => {
                            // Without the account details the account cannot
                            // be matched, so it is skipped.
                            warn!("Failed to fetch details for account {}: {}", acc_id, e);
                            continue;
                        }
                    },
                };

                if let Some(wanted) = &wanted_set {
                    let normalized = iban.replace(' ', "");
                    if !wanted.contains(&normalized) {
                        continue;
                    }
                    found_ibans.insert(normalized);
                }

                accounts.push(Account {
                    id: acc_id,
                    iban,
                    currency: "EUR".to_string(),
                });

                // Optimization: Stop if we found all wanted accounts
                if target_count > 0 && found_ibans.len() >= target_count {
                    info!(
                        "Found all {} wanted accounts. Stopping search.",
                        target_count
                    );
                    return Ok(accounts);
                }
            }
        }
    }

    info!("Found {} matching accounts in GoCardless", accounts.len());
    Ok(accounts)
}
#[instrument(skip(self))]
async fn get_transactions(
&self,
account_id: &str,
start: NaiveDate,
end: NaiveDate,
) -> Result<Vec<BankTransaction>> {
let mut client = self.client.lock().await;
client.obtain_access_token().await?;
// Load or get transaction cache
let mut caches = self.transaction_caches.lock().await;
let cache = caches.entry(account_id.to_string()).or_insert_with(|| {
AccountTransactionCache::load(account_id).unwrap_or_else(|_| AccountTransactionCache {
account_id: account_id.to_string(),
ranges: Vec::new(),
})
});
// Get cached transactions
let mut raw_transactions = cache.get_cached_transactions(start, end);
// Get uncovered ranges
let uncovered_ranges = cache.get_uncovered_ranges(start, end);
// Fetch missing ranges
for (range_start, range_end) in uncovered_ranges {
let response_result = client
.get_transactions(
account_id,
Some(&range_start.to_string()),
Some(&range_end.to_string()),
)
.await;
match response_result {
Ok(response) => {
let raw_txs = response.transactions.booked.clone();
raw_transactions.extend(raw_txs.clone());
cache.store_transactions(range_start, range_end, raw_txs);
info!(
"Fetched {} transactions for account {} in range {}-{}",
response.transactions.booked.len(),
account_id,
range_start,
range_end
);
}
Err(e) => {
let err_str = e.to_string();
if err_str.contains("429") {
warn!(
"Rate limit reached for account {} in range {}-{}. Skipping.",
account_id, range_start, range_end
);
continue;
}
if err_str.contains("401")
&& (err_str.contains("expired") || err_str.contains("EUA"))
{
debug!(
"EUA expired for account {} in range {}-{}. Skipping.",
account_id, range_start, range_end
);
continue;
}
return Err(e.into());
}
}
}
// Save cache
cache.save()?;
// Map to BankTransaction
let mut transactions = Vec::new();
for tx in raw_transactions {
match map_transaction(tx) {
Ok(t) => transactions.push(t),
Err(e) => tracing::error!("Failed to map transaction: {}", e),
}
}
info!(
"Total {} transactions for account {} in range {}-{}",
transactions.len(),
account_id,
start,
end
);
Ok(transactions)
}
#[instrument(skip(self))]
async fn list_accounts(&self) -> Result<Vec<AccountSummary>> {
let mut client = self.client.lock().await;
let mut cache = self.cache.lock().await;
client.obtain_access_token().await?;
let requisitions = client.get_requisitions().await?;
let mut summaries = Vec::new();
for req in requisitions.results {
if req.status != "LN" {
continue;
}
if let Some(agreement_id) = &req.agreement {
if client.is_agreement_expired(agreement_id).await? {
continue;
}
}
if let Some(req_accounts) = req.accounts {
for acc_id in req_accounts {
let iban = if let Some(iban) = cache.get_iban(&acc_id) {
iban
} else {
// Fetch if not cached
match client.get_account(&acc_id).await {
Ok(details) => {
let iban = details.iban.unwrap_or_default();
cache.insert(acc_id.clone(), iban.clone());
cache.save();
iban
}
Err(_) => "Unknown".to_string(),
}
};
summaries.push(AccountSummary {
id: acc_id,
iban,
currency: "EUR".to_string(), // Assuming EUR for now
status: "linked".to_string(),
});
}
}
}
Ok(summaries)
}
/// Reports the sync status of every account that has a transaction cache
/// loaded in this adapter.
///
/// For each account the status holds the IBAN from the account cache
/// (`"Unknown"` when absent), the number of cached transactions, the latest
/// cached range end date, and `"synced"` when at least one transaction is
/// cached, `"pending"` otherwise.
#[instrument(skip(self))]
async fn get_account_status(&self) -> Result<Vec<AccountStatus>> {
    // Lock order: account cache first, then transaction caches. Taking the
    // account cache while already holding the transaction caches would be the
    // reverse of the order used elsewhere in this adapter and could deadlock
    // when both methods run concurrently. Locking once up front also avoids
    // re-acquiring the account cache on every loop iteration.
    let account_cache = self.cache.lock().await;
    let caches = self.transaction_caches.lock().await;

    let mut statuses = Vec::with_capacity(caches.len());

    for (account_id, cache) in caches.iter() {
        let iban = account_cache
            .get_iban(account_id)
            .unwrap_or_else(|| "Unknown".to_string());

        let transaction_count = cache.ranges.iter().map(|r| r.transactions.len()).sum();
        let last_sync_date = cache.ranges.iter().map(|r| r.end_date).max();

        statuses.push(AccountStatus {
            account_id: account_id.clone(),
            iban,
            last_sync_date,
            transaction_count,
            status: if transaction_count > 0 {
                "synced"
            } else {
                "pending"
            }
            .to_string(),
        });
    }

    Ok(statuses)
}
/// Summarises the cached transactions of a single account.
///
/// When the adapter holds a transaction cache for `account_id`, the result
/// carries the number of cached transactions, the span from the earliest
/// range start to the latest range end, and the latest range end as
/// `last_updated`. Without a cache entry (or with no cached ranges) the count
/// is zero and both dates are `None`.
#[instrument(skip(self))]
async fn get_transaction_info(&self, account_id: &str) -> Result<TransactionInfo> {
    let guard = self.transaction_caches.lock().await;

    let info = match guard.get(account_id) {
        None => TransactionInfo {
            account_id: account_id.to_string(),
            total_count: 0,
            date_range: None,
            last_updated: None,
        },
        Some(entry) => {
            // Both are `None` when there are no ranges, which also makes the
            // zipped date range `None`.
            let earliest = entry.ranges.iter().map(|r| r.start_date).min();
            let latest = entry.ranges.iter().map(|r| r.end_date).max();

            TransactionInfo {
                account_id: account_id.to_string(),
                total_count: entry.ranges.iter().map(|r| r.transactions.len()).sum(),
                date_range: earliest.zip(latest),
                last_updated: latest,
            }
        }
    };

    Ok(info)
}
/// Describes the caches held by this adapter.
///
/// The first entry describes the account cache (no account id, entry count =
/// number of cached accounts). It is followed by one entry per loaded
/// transaction cache, whose entry count is the number of cached ranges and
/// whose `last_updated` is the latest range end date. Sizes are not tracked
/// and are always reported as `0`.
#[instrument(skip(self))]
async fn get_cache_info(&self) -> Result<Vec<CacheInfo>> {
    // Read what is needed from the account cache and release its lock before
    // taking the transaction-cache lock. Holding both at once, in the opposite
    // order to `get_account_status`, could deadlock under concurrent calls.
    let account_cache = self.cache.lock().await;
    let account_entries = account_cache.accounts.len();
    drop(account_cache);

    let transaction_caches = self.transaction_caches.lock().await;
    let mut infos = Vec::with_capacity(transaction_caches.len() + 1);

    // Account cache
    infos.push(CacheInfo {
        account_id: None,
        cache_type: "account".to_string(),
        entry_count: account_entries,
        total_size_bytes: 0, // Not tracking size
        last_updated: None,  // Not tracking
    });

    // Transaction caches
    infos.extend(transaction_caches.iter().map(|(account_id, cache)| CacheInfo {
        account_id: Some(account_id.clone()),
        cache_type: "transaction".to_string(),
        entry_count: cache.ranges.len(),
        total_size_bytes: 0, // Not tracking
        last_updated: cache.ranges.iter().map(|r| r.end_date).max(),
    }));

    Ok(infos)
}
/// Discovers every available account by delegating to `get_accounts`
/// without an IBAN filter.
#[instrument(skip(self))]
async fn discover_accounts(&self) -> Result<Vec<Account>> {
    // `None` means "do not filter by IBAN".
    let no_filter: Option<Vec<String>> = None;
    self.get_accounts(no_filter).await
}
}