Completely replace implementation #1
13
AGENTS.md
13
AGENTS.md
@@ -191,3 +191,16 @@ mod tests {
|
|||||||
- **Error Context**: Provide context in error messages for debugging
|
- **Error Context**: Provide context in error messages for debugging
|
||||||
- **Metrics**: Consider adding metrics for sync operations
|
- **Metrics**: Consider adding metrics for sync operations
|
||||||
- **Log Levels**: Use appropriate log levels (debug, info, warn, error)
|
- **Log Levels**: Use appropriate log levels (debug, info, warn, error)
|
||||||
|
|
||||||
|
## Documentation Guidelines
|
||||||
|
|
||||||
|
### README.md
|
||||||
|
- **Keep High-Level**: Focus on user benefits and key features, not technical implementation details
|
||||||
|
- **User-Centric**: Describe what the tool does and why users would want it
|
||||||
|
- **Skip Implementation Details**: Avoid technical jargon, architecture specifics, or internal implementation that users don't need to know
|
||||||
|
- **Feature Descriptions**: Use concise, benefit-focused language (e.g., "Robust Error Handling" rather than "Implements EUA expiry detection with multiple requisition fallback")
|
||||||
|
|
||||||
|
### Technical Documentation
|
||||||
|
- **docs/architecture.md**: Detailed technical specifications, implementation details, and developer-focused content
|
||||||
|
- **specs/**: Implementation planning, API specifications, and historical context
|
||||||
|
- **Code Comments**: Use for implementation details and complex logic explanations
|
||||||
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -174,6 +174,7 @@ dependencies = [
|
|||||||
"serde",
|
"serde",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
"task-local-extensions",
|
"task-local-extensions",
|
||||||
|
"thiserror",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tracing",
|
"tracing",
|
||||||
"tracing-subscriber",
|
"tracing-subscriber",
|
||||||
|
|||||||
105
README.md
105
README.md
@@ -2,84 +2,57 @@
|
|||||||
|
|
||||||
A robust command-line tool to synchronize bank transactions from GoCardless (formerly Nordigen) to Firefly III.
|
A robust command-line tool to synchronize bank transactions from GoCardless (formerly Nordigen) to Firefly III.
|
||||||
|
|
||||||
## Architecture
|
## ✨ Key Benefits
|
||||||
|
|
||||||
This project is a Rust Workspace consisting of:
|
- **Automatic Transaction Sync**: Keep your Firefly III finances up-to-date with your bank accounts
|
||||||
- `banks2ff`: The main CLI application (Hexagonal Architecture).
|
- **Multi-Currency Support**: Handles international transactions and foreign currencies correctly
|
||||||
- `gocardless-client`: A hand-crafted, strongly-typed library for the GoCardless Bank Account Data API.
|
- **Smart Duplicate Detection**: Avoids double-counting transactions automatically
|
||||||
- `firefly-client`: A hand-crafted, strongly-typed library for the Firefly III API.
|
- **Reliable Operation**: Continues working even when some accounts need attention
|
||||||
|
- **Safe Preview Mode**: Test changes before applying them to your finances
|
||||||
|
- **Rate Limit Aware**: Works within API limits to ensure consistent access
|
||||||
|
|
||||||
## Features
|
## 🚀 Quick Start
|
||||||
|
|
||||||
- **Multi-Currency Support**: Correctly handles foreign currency transactions by extracting exchange rate data.
|
### Prerequisites
|
||||||
- **Idempotency (Healer Mode)**:
|
|
||||||
- Detects duplicates using a windowed search (Date +/- 3 days, exact Amount).
|
|
||||||
- "Heals" historical transactions by updating them with the correct `external_id`.
|
|
||||||
- Skips transactions that already have a matching `external_id`.
|
|
||||||
- **Clean Architecture**: Decoupled core logic makes it reliable and testable.
|
|
||||||
- **Observability**: Structured logging via `tracing`.
|
|
||||||
- **Dry Run**: Preview changes without writing to Firefly III.
|
|
||||||
- **Rate Limit Protection**:
|
|
||||||
- Caches GoCardless account details to avoid unnecessary calls.
|
|
||||||
- Respects token expiry to minimize auth calls.
|
|
||||||
- Handles `429 Too Many Requests` gracefully by skipping affected accounts.
|
|
||||||
|
|
||||||
## Setup & Configuration
|
|
||||||
|
|
||||||
1. **Prerequisites**:
|
|
||||||
- Rust (latest stable)
|
- Rust (latest stable)
|
||||||
- An account with GoCardless Bank Account Data (get your `secret_id` and `secret_key`).
|
- GoCardless Bank Account Data account
|
||||||
- A running Firefly III instance (get your Personal Access Token).
|
- Running Firefly III instance
|
||||||
|
|
||||||
2. **Environment Variables**:
|
### Setup
|
||||||
Copy `env.example` to `.env` and fill in your details:
|
1. Copy environment template: `cp env.example .env`
|
||||||
|
2. Fill in your credentials in `.env`:
|
||||||
|
- `GOCARDLESS_ID`: Your GoCardless Secret ID
|
||||||
|
- `GOCARDLESS_KEY`: Your GoCardless Secret Key
|
||||||
|
- `FIREFLY_III_URL`: Your Firefly instance URL
|
||||||
|
- `FIREFLY_III_API_KEY`: Your Personal Access Token
|
||||||
|
|
||||||
|
### Usage
|
||||||
```bash
|
```bash
|
||||||
cp env.example .env
|
# Sync all accounts (automatic date range)
|
||||||
```
|
|
||||||
|
|
||||||
Required variables:
|
|
||||||
- `GOCARDLESS_ID`: Your GoCardless Secret ID.
|
|
||||||
- `GOCARDLESS_KEY`: Your GoCardless Secret Key.
|
|
||||||
- `FIREFLY_III_URL`: The base URL of your Firefly instance (e.g., `https://money.example.com`).
|
|
||||||
- `FIREFLY_III_API_KEY`: Your Personal Access Token.
|
|
||||||
|
|
||||||
Optional:
|
|
||||||
- `GOCARDLESS_URL`: Defaults to `https://bankaccountdata.gocardless.com`.
|
|
||||||
- `RUST_LOG`: Set log level (e.g., `info`, `debug`, `trace`).
|
|
||||||
|
|
||||||
## Testing
|
|
||||||
|
|
||||||
The project has a comprehensive test suite using `wiremock` for API clients and `mockall` for core logic.
|
|
||||||
|
|
||||||
To run all tests:
|
|
||||||
|
|
||||||
```bash
|
|
||||||
cargo test --workspace
|
|
||||||
```
|
|
||||||
|
|
||||||
## Usage
|
|
||||||
|
|
||||||
To run the synchronization:
|
|
||||||
|
|
||||||
```bash
|
|
||||||
# Run via cargo (defaults: Start = Last Firefly Date + 1, End = Yesterday)
|
|
||||||
cargo run -p banks2ff
|
cargo run -p banks2ff
|
||||||
|
|
||||||
# Dry Run (Read-only)
|
# Preview changes without saving
|
||||||
cargo run -p banks2ff -- --dry-run
|
cargo run -p banks2ff -- --dry-run
|
||||||
|
|
||||||
# Custom Date Range
|
# Sync specific date range
|
||||||
cargo run -p banks2ff -- --start 2023-01-01 --end 2023-01-31
|
cargo run -p banks2ff -- --start 2023-01-01 --end 2023-01-31
|
||||||
```
|
```
|
||||||
|
|
||||||
## How it works
|
## 📋 What It Does
|
||||||
|
|
||||||
1. **Fetch**: Retrieves active accounts from GoCardless (filtered by those present in Firefly III to save requests).
|
Banks2FF automatically:
|
||||||
2. **Match**: Resolves the destination account in Firefly III by matching the IBAN.
|
1. Connects to your bank accounts via GoCardless
|
||||||
3. **Sync Window**: Determines the start date automatically by finding the latest transaction in Firefly for that account.
|
2. Finds matching accounts in your Firefly III instance
|
||||||
4. **Process**: For each transaction:
|
3. Downloads new transactions since your last sync
|
||||||
- **Search**: Checks Firefly for an existing transaction (matching Amount and Date +/- 3 days).
|
4. Adds them to Firefly III (avoiding duplicates)
|
||||||
- **Heal**: If found but missing an `external_id`, it updates the transaction.
|
5. Handles errors gracefully - keeps working even if some accounts have issues
|
||||||
- **Skip**: If found and matches `external_id`, it skips.
|
|
||||||
- **Create**: If not found, it creates a new transaction.
|
## 🔧 Troubleshooting
|
||||||
|
|
||||||
|
- **Account not syncing?** Check that the IBAN matches between GoCardless and Firefly III
|
||||||
|
- **Missing transactions?** The tool syncs from the last transaction date forward
|
||||||
|
- **Rate limited?** The tool automatically handles API limits and retries appropriately
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
*For technical details, see [docs/architecture.md](docs/architecture.md)*
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ authors.workspace = true
|
|||||||
[dependencies]
|
[dependencies]
|
||||||
tokio = { workspace = true }
|
tokio = { workspace = true }
|
||||||
anyhow = { workspace = true }
|
anyhow = { workspace = true }
|
||||||
|
thiserror = { workspace = true }
|
||||||
tracing = { workspace = true }
|
tracing = { workspace = true }
|
||||||
tracing-subscriber = { workspace = true }
|
tracing-subscriber = { workspace = true }
|
||||||
serde = { workspace = true }
|
serde = { workspace = true }
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ use crate::core::models::{Account, BankTransaction};
|
|||||||
use crate::adapters::gocardless::mapper::map_transaction;
|
use crate::adapters::gocardless::mapper::map_transaction;
|
||||||
use crate::adapters::gocardless::cache::AccountCache;
|
use crate::adapters::gocardless::cache::AccountCache;
|
||||||
use gocardless_client::client::GoCardlessClient;
|
use gocardless_client::client::GoCardlessClient;
|
||||||
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::sync::Mutex;
|
use tokio::sync::Mutex;
|
||||||
|
|
||||||
@@ -53,6 +54,23 @@ impl TransactionSource for GoCardlessAdapter {
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Check if agreement is expired
|
||||||
|
if let Some(agreement_id) = &req.agreement {
|
||||||
|
match client.is_agreement_expired(agreement_id).await {
|
||||||
|
Ok(true) => {
|
||||||
|
warn!("Skipping requisition {} - agreement {} has expired", req.id, agreement_id);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
Ok(false) => {
|
||||||
|
// Agreement is valid, proceed
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
warn!("Failed to check agreement {} expiry: {}. Skipping requisition.", agreement_id, e);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if let Some(req_accounts) = req.accounts {
|
if let Some(req_accounts) = req.accounts {
|
||||||
for acc_id in req_accounts {
|
for acc_id in req_accounts {
|
||||||
// 1. Check Cache
|
// 1. Check Cache
|
||||||
@@ -145,6 +163,11 @@ impl TransactionSource for GoCardlessAdapter {
|
|||||||
// Returning empty list allows other accounts to potentially proceed if limits are per-account (which GC says they are!)
|
// Returning empty list allows other accounts to potentially proceed if limits are per-account (which GC says they are!)
|
||||||
return Ok(vec![]);
|
return Ok(vec![]);
|
||||||
}
|
}
|
||||||
|
if err_str.contains("401") && (err_str.contains("expired") || err_str.contains("EUA")) {
|
||||||
|
warn!("EUA expired for account {}. Skipping.", account_id);
|
||||||
|
// Return empty list to skip this account gracefully
|
||||||
|
return Ok(vec![]);
|
||||||
|
}
|
||||||
Err(e.into())
|
Err(e.into())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
use rust_decimal::Decimal;
|
use rust_decimal::Decimal;
|
||||||
use chrono::NaiveDate;
|
use chrono::NaiveDate;
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
|
use thiserror::Error;
|
||||||
|
|
||||||
#[derive(Clone, PartialEq)]
|
#[derive(Clone, PartialEq)]
|
||||||
pub struct BankTransaction {
|
pub struct BankTransaction {
|
||||||
@@ -104,3 +105,15 @@ mod tests {
|
|||||||
assert!(!debug_str.contains("DE1234567890"));
|
assert!(!debug_str.contains("DE1234567890"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Error, Debug)]
|
||||||
|
pub enum SyncError {
|
||||||
|
#[error("End User Agreement {agreement_id} has expired")]
|
||||||
|
AgreementExpired { agreement_id: String },
|
||||||
|
#[error("Account {account_id} skipped: {reason}")]
|
||||||
|
AccountSkipped { account_id: String, reason: String },
|
||||||
|
#[error("Source error: {0}")]
|
||||||
|
SourceError(anyhow::Error),
|
||||||
|
#[error("Destination error: {0}")]
|
||||||
|
DestinationError(anyhow::Error),
|
||||||
|
}
|
||||||
|
|||||||
@@ -21,6 +21,18 @@ pub trait TransactionSource: Send + Sync {
|
|||||||
async fn get_transactions(&self, account_id: &str, start: NaiveDate, end: NaiveDate) -> Result<Vec<BankTransaction>>;
|
async fn get_transactions(&self, account_id: &str, start: NaiveDate, end: NaiveDate) -> Result<Vec<BankTransaction>>;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Blanket implementation for references
|
||||||
|
#[async_trait]
|
||||||
|
impl<T: TransactionSource> TransactionSource for &T {
|
||||||
|
async fn get_accounts(&self, wanted_ibans: Option<Vec<String>>) -> Result<Vec<Account>> {
|
||||||
|
(**self).get_accounts(wanted_ibans).await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_transactions(&self, account_id: &str, start: NaiveDate, end: NaiveDate) -> Result<Vec<BankTransaction>> {
|
||||||
|
(**self).get_transactions(account_id, start, end).await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct TransactionMatch {
|
pub struct TransactionMatch {
|
||||||
pub id: String,
|
pub id: String,
|
||||||
@@ -40,3 +52,31 @@ pub trait TransactionDestination: Send + Sync {
|
|||||||
async fn create_transaction(&self, account_id: &str, tx: &BankTransaction) -> Result<()>;
|
async fn create_transaction(&self, account_id: &str, tx: &BankTransaction) -> Result<()>;
|
||||||
async fn update_transaction_external_id(&self, id: &str, external_id: &str) -> Result<()>;
|
async fn update_transaction_external_id(&self, id: &str, external_id: &str) -> Result<()>;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Blanket implementation for references
|
||||||
|
#[async_trait]
|
||||||
|
impl<T: TransactionDestination> TransactionDestination for &T {
|
||||||
|
async fn resolve_account_id(&self, iban: &str) -> Result<Option<String>> {
|
||||||
|
(**self).resolve_account_id(iban).await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_active_account_ibans(&self) -> Result<Vec<String>> {
|
||||||
|
(**self).get_active_account_ibans().await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_last_transaction_date(&self, account_id: &str) -> Result<Option<NaiveDate>> {
|
||||||
|
(**self).get_last_transaction_date(account_id).await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn find_transaction(&self, account_id: &str, transaction: &BankTransaction) -> Result<Option<TransactionMatch>> {
|
||||||
|
(**self).find_transaction(account_id, transaction).await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn create_transaction(&self, account_id: &str, tx: &BankTransaction) -> Result<()> {
|
||||||
|
(**self).create_transaction(account_id, tx).await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn update_transaction_external_id(&self, id: &str, external_id: &str) -> Result<()> {
|
||||||
|
(**self).update_transaction_external_id(id, external_id).await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -1,41 +1,94 @@
|
|||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use tracing::{info, warn, instrument};
|
use tracing::{info, warn, instrument};
|
||||||
use crate::core::ports::{TransactionSource, TransactionDestination, IngestResult};
|
use crate::core::ports::{IngestResult, TransactionSource, TransactionDestination};
|
||||||
|
use crate::core::models::{SyncError, Account};
|
||||||
use chrono::{NaiveDate, Local};
|
use chrono::{NaiveDate, Local};
|
||||||
|
|
||||||
|
|
||||||
|
#[derive(Debug, Default)]
|
||||||
|
pub struct SyncResult {
|
||||||
|
pub ingest: IngestResult,
|
||||||
|
pub accounts_processed: usize,
|
||||||
|
pub accounts_skipped_expired: usize,
|
||||||
|
pub accounts_skipped_errors: usize,
|
||||||
|
}
|
||||||
|
|
||||||
#[instrument(skip(source, destination))]
|
#[instrument(skip(source, destination))]
|
||||||
pub async fn run_sync<S, D>(
|
pub async fn run_sync(
|
||||||
source: &S,
|
source: impl TransactionSource,
|
||||||
destination: &D,
|
destination: impl TransactionDestination,
|
||||||
cli_start_date: Option<NaiveDate>,
|
cli_start_date: Option<NaiveDate>,
|
||||||
cli_end_date: Option<NaiveDate>,
|
cli_end_date: Option<NaiveDate>,
|
||||||
dry_run: bool,
|
dry_run: bool,
|
||||||
) -> Result<()>
|
) -> Result<SyncResult> {
|
||||||
where
|
|
||||||
S: TransactionSource,
|
|
||||||
D: TransactionDestination,
|
|
||||||
{
|
|
||||||
info!("Starting synchronization...");
|
info!("Starting synchronization...");
|
||||||
|
|
||||||
// Optimization: Get active Firefly IBANs first
|
// Optimization: Get active Firefly IBANs first
|
||||||
let wanted_ibans = destination.get_active_account_ibans().await?;
|
let wanted_ibans = destination.get_active_account_ibans().await.map_err(SyncError::DestinationError)?;
|
||||||
info!("Syncing {} active accounts from Firefly III", wanted_ibans.len());
|
info!("Syncing {} active accounts from Firefly III", wanted_ibans.len());
|
||||||
|
|
||||||
let accounts = source.get_accounts(Some(wanted_ibans)).await?;
|
let accounts = source.get_accounts(Some(wanted_ibans)).await.map_err(SyncError::SourceError)?;
|
||||||
|
info!("Found {} accounts from source", accounts.len());
|
||||||
|
|
||||||
// Default end date is Yesterday
|
// Default end date is Yesterday
|
||||||
let end_date = cli_end_date.unwrap_or_else(|| Local::now().date_naive() - chrono::Duration::days(1));
|
let end_date = cli_end_date.unwrap_or_else(|| Local::now().date_naive() - chrono::Duration::days(1));
|
||||||
|
|
||||||
|
let mut result = SyncResult::default();
|
||||||
|
|
||||||
for account in accounts {
|
for account in accounts {
|
||||||
let span = tracing::info_span!("sync_account", account_id = %account.id);
|
let span = tracing::info_span!("sync_account", account_id = %account.id);
|
||||||
let _enter = span.enter();
|
let _enter = span.enter();
|
||||||
|
|
||||||
info!("Processing account...");
|
info!("Processing account...");
|
||||||
|
|
||||||
let dest_id_opt = destination.resolve_account_id(&account.iban).await?;
|
// Process account with error handling
|
||||||
|
match process_single_account(&source, &destination, &account, cli_start_date, end_date, dry_run).await {
|
||||||
|
Ok(stats) => {
|
||||||
|
result.accounts_processed += 1;
|
||||||
|
result.ingest.created += stats.created;
|
||||||
|
result.ingest.healed += stats.healed;
|
||||||
|
result.ingest.duplicates += stats.duplicates;
|
||||||
|
result.ingest.errors += stats.errors;
|
||||||
|
info!("Account {} sync complete. Created: {}, Healed: {}, Duplicates: {}, Errors: {}",
|
||||||
|
account.id, stats.created, stats.healed, stats.duplicates, stats.errors);
|
||||||
|
}
|
||||||
|
Err(SyncError::AgreementExpired { agreement_id }) => {
|
||||||
|
result.accounts_skipped_expired += 1;
|
||||||
|
warn!("Account {} skipped - associated agreement {} has expired", account.id, agreement_id);
|
||||||
|
}
|
||||||
|
Err(SyncError::AccountSkipped { account_id, reason }) => {
|
||||||
|
result.accounts_skipped_errors += 1;
|
||||||
|
warn!("Account {} skipped: {}", account_id, reason);
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
result.accounts_skipped_errors += 1;
|
||||||
|
warn!("Account {} failed with error: {}", account.id, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
info!("Synchronization finished. Processed: {}, Skipped (expired): {}, Skipped (errors): {}",
|
||||||
|
result.accounts_processed, result.accounts_skipped_expired, result.accounts_skipped_errors);
|
||||||
|
info!("Total transactions - Created: {}, Healed: {}, Duplicates: {}, Errors: {}",
|
||||||
|
result.ingest.created, result.ingest.healed, result.ingest.duplicates, result.ingest.errors);
|
||||||
|
|
||||||
|
Ok(result)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn process_single_account(
|
||||||
|
source: &impl TransactionSource,
|
||||||
|
destination: &impl TransactionDestination,
|
||||||
|
account: &Account,
|
||||||
|
cli_start_date: Option<NaiveDate>,
|
||||||
|
end_date: NaiveDate,
|
||||||
|
dry_run: bool,
|
||||||
|
) -> Result<IngestResult, SyncError> {
|
||||||
|
let dest_id_opt = destination.resolve_account_id(&account.iban).await.map_err(SyncError::DestinationError)?;
|
||||||
let Some(dest_id) = dest_id_opt else {
|
let Some(dest_id) = dest_id_opt else {
|
||||||
warn!("Account {} not found in destination. Skipping.", account.id);
|
return Err(SyncError::AccountSkipped {
|
||||||
continue;
|
account_id: account.id.clone(),
|
||||||
|
reason: "Not found in destination".to_string(),
|
||||||
|
});
|
||||||
};
|
};
|
||||||
|
|
||||||
info!("Resolved destination ID: {}", dest_id);
|
info!("Resolved destination ID: {}", dest_id);
|
||||||
@@ -45,7 +98,7 @@ where
|
|||||||
d
|
d
|
||||||
} else {
|
} else {
|
||||||
// Default: Latest transaction date + 1 day
|
// Default: Latest transaction date + 1 day
|
||||||
match destination.get_last_transaction_date(&dest_id).await? {
|
match destination.get_last_transaction_date(&dest_id).await.map_err(SyncError::DestinationError)? {
|
||||||
Some(last_date) => last_date + chrono::Duration::days(1),
|
Some(last_date) => last_date + chrono::Duration::days(1),
|
||||||
None => {
|
None => {
|
||||||
// If no transaction exists in Firefly, we assume this is a fresh sync.
|
// If no transaction exists in Firefly, we assume this is a fresh sync.
|
||||||
@@ -57,21 +110,27 @@ where
|
|||||||
|
|
||||||
if start_date > end_date {
|
if start_date > end_date {
|
||||||
info!("Start date {} is after end date {}. Nothing to sync.", start_date, end_date);
|
info!("Start date {} is after end date {}. Nothing to sync.", start_date, end_date);
|
||||||
continue;
|
return Ok(IngestResult::default());
|
||||||
}
|
}
|
||||||
|
|
||||||
info!("Syncing interval: {} to {}", start_date, end_date);
|
info!("Syncing interval: {} to {}", start_date, end_date);
|
||||||
|
|
||||||
// Optimization: Only use active accounts is already filtered in resolve_account_id
|
let transactions = match source.get_transactions(&account.id, start_date, end_date).await {
|
||||||
// However, GoCardless requisitions can expire.
|
Ok(txns) => txns,
|
||||||
// We should check if we can optimize the GoCardless fetching side.
|
Err(e) => {
|
||||||
// But currently get_transactions takes an account_id.
|
let err_str = e.to_string();
|
||||||
|
if err_str.contains("401") && (err_str.contains("expired") || err_str.contains("EUA")) {
|
||||||
let transactions = source.get_transactions(&account.id, start_date, end_date).await?;
|
return Err(SyncError::AgreementExpired {
|
||||||
|
agreement_id: "unknown".to_string(), // We don't have the agreement ID here
|
||||||
|
});
|
||||||
|
}
|
||||||
|
return Err(SyncError::SourceError(e));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
if transactions.is_empty() {
|
if transactions.is_empty() {
|
||||||
info!("No transactions found for period.");
|
info!("No transactions found for period.");
|
||||||
continue;
|
return Ok(IngestResult::default());
|
||||||
}
|
}
|
||||||
|
|
||||||
info!("Fetched {} transactions from source.", transactions.len());
|
info!("Fetched {} transactions from source.", transactions.len());
|
||||||
@@ -81,7 +140,7 @@ where
|
|||||||
// Healer Logic Loop
|
// Healer Logic Loop
|
||||||
for tx in transactions {
|
for tx in transactions {
|
||||||
// 1. Check if it exists
|
// 1. Check if it exists
|
||||||
match destination.find_transaction(&dest_id, &tx).await? {
|
match destination.find_transaction(&dest_id, &tx).await.map_err(SyncError::DestinationError)? {
|
||||||
Some(existing) => {
|
Some(existing) => {
|
||||||
if existing.has_external_id {
|
if existing.has_external_id {
|
||||||
// Already synced properly
|
// Already synced properly
|
||||||
@@ -127,12 +186,7 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
info!("Sync complete. Created: {}, Healed: {}, Duplicates: {}, Errors: {}",
|
Ok(stats)
|
||||||
stats.created, stats.healed, stats.duplicates, stats.errors);
|
|
||||||
}
|
|
||||||
|
|
||||||
info!("Synchronization finished.");
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
@@ -293,7 +347,7 @@ mod tests {
|
|||||||
dest.expect_create_transaction().never();
|
dest.expect_create_transaction().never();
|
||||||
dest.expect_update_transaction_external_id().never();
|
dest.expect_update_transaction_external_id().never();
|
||||||
|
|
||||||
let res = run_sync(&source, &dest, None, None, true).await;
|
let res = run_sync(source, dest, None, None, true).await;
|
||||||
assert!(res.is_ok());
|
assert!(res.is_ok());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -87,8 +87,14 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
let destination = FireflyAdapter::new(ff_client);
|
let destination = FireflyAdapter::new(ff_client);
|
||||||
|
|
||||||
// Run
|
// Run
|
||||||
match run_sync(&source, &destination, args.start, args.end, args.dry_run).await {
|
match run_sync(source, destination, args.start, args.end, args.dry_run).await {
|
||||||
Ok(_) => info!("Sync completed successfully."),
|
Ok(result) => {
|
||||||
|
info!("Sync completed successfully.");
|
||||||
|
info!("Accounts processed: {}, skipped (expired): {}, skipped (errors): {}",
|
||||||
|
result.accounts_processed, result.accounts_skipped_expired, result.accounts_skipped_errors);
|
||||||
|
info!("Transactions - Created: {}, Healed: {}, Duplicates: {}, Errors: {}",
|
||||||
|
result.ingest.created, result.ingest.healed, result.ingest.duplicates, result.ingest.errors);
|
||||||
|
}
|
||||||
Err(e) => error!("Sync failed: {}", e),
|
Err(e) => error!("Sync failed: {}", e),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -58,16 +58,19 @@ Both clients are hand-crafted using `reqwest`:
|
|||||||
|
|
||||||
## Synchronization Process
|
## Synchronization Process
|
||||||
|
|
||||||
The "Healer" strategy ensures idempotency:
|
The "Healer" strategy ensures idempotency with robust error handling:
|
||||||
|
|
||||||
1. **Account Discovery**: Fetch active accounts from GoCardless
|
1. **Account Discovery**: Fetch active accounts from GoCardless (filtered by End User Agreement (EUA) validity)
|
||||||
2. **Account Matching**: Match GoCardless accounts to Firefly asset accounts by IBAN
|
2. **Agreement Validation**: Check EUA expiry status for each account's requisition
|
||||||
3. **Date Window**: Calculate sync range (Last Firefly transaction + 1 to Yesterday)
|
3. **Account Matching**: Match GoCardless accounts to Firefly asset accounts by IBAN
|
||||||
4. **Transaction Processing**:
|
4. **Error-Aware Processing**: Continue with valid accounts when some have expired agreements
|
||||||
|
5. **Date Window**: Calculate sync range (Last Firefly transaction + 1 to Yesterday)
|
||||||
|
6. **Transaction Processing** (with error recovery):
|
||||||
- **Search**: Look for existing transaction using windowed heuristic (date ± 3 days, exact amount)
|
- **Search**: Look for existing transaction using windowed heuristic (date ± 3 days, exact amount)
|
||||||
- **Heal**: If found without `external_id`, update with GoCardless transaction ID
|
- **Heal**: If found without `external_id`, update with GoCardless transaction ID
|
||||||
- **Skip**: If found with matching `external_id`, ignore
|
- **Skip**: If found with matching `external_id`, ignore
|
||||||
- **Create**: If not found, create new transaction in Firefly
|
- **Create**: If not found, create new transaction in Firefly
|
||||||
|
- **Error Handling**: Log issues but continue with other transactions/accounts
|
||||||
|
|
||||||
## Key Features
|
## Key Features
|
||||||
|
|
||||||
@@ -81,6 +84,13 @@ The "Healer" strategy ensures idempotency:
|
|||||||
- **Token Reuse**: Maintains tokens until expiry to minimize auth requests
|
- **Token Reuse**: Maintains tokens until expiry to minimize auth requests
|
||||||
- **Graceful Handling**: Continues sync for other accounts when encountering 429 errors
|
- **Graceful Handling**: Continues sync for other accounts when encountering 429 errors
|
||||||
|
|
||||||
|
### Agreement Expiry Handling
|
||||||
|
- **Proactive Validation**: Checks End User Agreement (EUA) expiry before making API calls to avoid unnecessary requests
|
||||||
|
- **Reactive Recovery**: Detects expired agreements from API 401 errors and skips affected accounts
|
||||||
|
- **Continued Operation**: Maintains partial sync success even when some accounts are inaccessible
|
||||||
|
- **User Feedback**: Provides detailed reporting on account status and re-authorization needs
|
||||||
|
- **Multiple Requisitions**: Supports accounts linked to multiple requisitions, using the most recent valid one
|
||||||
|
|
||||||
### Idempotency
|
### Idempotency
|
||||||
- GoCardless `transactionId` → Firefly `external_id` mapping
|
- GoCardless `transactionId` → Firefly `external_id` mapping
|
||||||
- Windowed duplicate detection prevents double-creation
|
- Windowed duplicate detection prevents double-creation
|
||||||
@@ -101,10 +111,11 @@ GoCardless API → GoCardlessAdapter → TransactionSource → SyncEngine → Tr
|
|||||||
|
|
||||||
## Error Handling
|
## Error Handling
|
||||||
|
|
||||||
- **Custom Errors**: `thiserror` for domain-specific error types
|
- **Custom Errors**: `thiserror` for domain-specific error types including End User Agreement (EUA) expiry (`SyncError::AgreementExpired`)
|
||||||
- **Propagation**: `anyhow` for error context across async boundaries
|
- **Propagation**: `anyhow` for error context across async boundaries
|
||||||
- **Graceful Degradation**: Rate limits and network issues don't crash entire sync
|
- **Graceful Degradation**: Rate limits, network issues, and expired agreements don't crash entire sync
|
||||||
- **Structured Logging**: `tracing` for observability and debugging
|
- **Partial Success**: Continues processing available accounts when some fail
|
||||||
|
- **Structured Logging**: `tracing` for observability and debugging with account-level context
|
||||||
|
|
||||||
## Configuration Management
|
## Configuration Management
|
||||||
|
|
||||||
|
|||||||
@@ -3,7 +3,7 @@ use reqwest_middleware::ClientWithMiddleware;
|
|||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
use tracing::{debug, instrument};
|
use tracing::{debug, instrument};
|
||||||
use crate::models::{TokenResponse, PaginatedResponse, Requisition, Account, TransactionsResponse};
|
use crate::models::{TokenResponse, PaginatedResponse, Requisition, Account, TransactionsResponse, EndUserAgreement};
|
||||||
|
|
||||||
#[derive(Error, Debug)]
|
#[derive(Error, Debug)]
|
||||||
pub enum GoCardlessError {
|
pub enum GoCardlessError {
|
||||||
@@ -92,6 +92,39 @@ impl GoCardlessClient {
|
|||||||
self.get_authenticated(url).await
|
self.get_authenticated(url).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[instrument(skip(self))]
|
||||||
|
pub async fn get_agreements(&self) -> Result<PaginatedResponse<EndUserAgreement>, GoCardlessError> {
|
||||||
|
let url = self.base_url.join("/api/v2/agreements/enduser/")?;
|
||||||
|
self.get_authenticated(url).await
|
||||||
|
}
|
||||||
|
|
||||||
|
#[instrument(skip(self))]
|
||||||
|
pub async fn get_agreement(&self, id: &str) -> Result<EndUserAgreement, GoCardlessError> {
|
||||||
|
let url = self.base_url.join(&format!("/api/v2/agreements/enduser/{}/", id))?;
|
||||||
|
self.get_authenticated(url).await
|
||||||
|
}
|
||||||
|
|
||||||
|
#[instrument(skip(self))]
|
||||||
|
pub async fn is_agreement_expired(&self, agreement_id: &str) -> Result<bool, GoCardlessError> {
|
||||||
|
let agreement = self.get_agreement(agreement_id).await?;
|
||||||
|
|
||||||
|
// If not accepted, it's not valid
|
||||||
|
let Some(accepted_str) = agreement.accepted else {
|
||||||
|
return Ok(true);
|
||||||
|
};
|
||||||
|
|
||||||
|
// Parse acceptance date
|
||||||
|
let accepted = chrono::DateTime::parse_from_rfc3339(&accepted_str)
|
||||||
|
.map_err(|e| GoCardlessError::ApiError(format!("Invalid date format: {}", e)))?
|
||||||
|
.with_timezone(&chrono::Utc);
|
||||||
|
|
||||||
|
// Get validity period (default 90 days)
|
||||||
|
let valid_days = agreement.access_valid_for_days.unwrap_or(90) as i64;
|
||||||
|
let expiry = accepted + chrono::Duration::days(valid_days);
|
||||||
|
|
||||||
|
Ok(chrono::Utc::now() > expiry)
|
||||||
|
}
|
||||||
|
|
||||||
#[instrument(skip(self))]
|
#[instrument(skip(self))]
|
||||||
pub async fn get_account(&self, id: &str) -> Result<Account, GoCardlessError> {
|
pub async fn get_account(&self, id: &str) -> Result<Account, GoCardlessError> {
|
||||||
let url = self.base_url.join(&format!("/api/v2/accounts/{}/", id))?;
|
let url = self.base_url.join(&format!("/api/v2/accounts/{}/", id))?;
|
||||||
|
|||||||
@@ -14,6 +14,16 @@ pub struct Requisition {
|
|||||||
pub status: String,
|
pub status: String,
|
||||||
pub accounts: Option<Vec<String>>,
|
pub accounts: Option<Vec<String>>,
|
||||||
pub reference: Option<String>,
|
pub reference: Option<String>,
|
||||||
|
pub agreement: Option<String>, // EUA ID associated with this requisition
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct EndUserAgreement {
|
||||||
|
pub id: String,
|
||||||
|
pub created: Option<String>,
|
||||||
|
pub accepted: Option<String>, // When user accepted the agreement
|
||||||
|
pub access_valid_for_days: Option<i32>, // Validity period (default 90)
|
||||||
|
pub institution_id: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
|||||||
@@ -13,6 +13,7 @@ Refactor the `bank2ff` application from a prototype script into a robust, testab
|
|||||||
- **Healer Strategy**: Detect and heal historical duplicates that lack external IDs.
|
- **Healer Strategy**: Detect and heal historical duplicates that lack external IDs.
|
||||||
- **Dry Run**: Safe mode to preview changes.
|
- **Dry Run**: Safe mode to preview changes.
|
||||||
- **Rate Limit Handling**: Smart caching and graceful skipping to respect 4 requests/day limits.
|
- **Rate Limit Handling**: Smart caching and graceful skipping to respect 4 requests/day limits.
|
||||||
|
- **Robust Agreement Handling**: Gracefully handle expired GoCardless EUAs without failing entire sync.
|
||||||
|
|
||||||
## 2. Architecture
|
## 2. Architecture
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user