syncer: simply return the changes

this is easier to test. the initial code wanted to make the changes a stream.
but this never happenned since the need never arose
This commit is contained in:
Girish Ramakrishnan
2025-02-13 17:05:35 +01:00
parent 6a303ae50a
commit b94ce542c3
3 changed files with 181 additions and 271 deletions

View File

@@ -77,6 +77,29 @@ async function addFile(sourceFile, encryption, uploader, progressCallback) {
await uploader.finish();
}
async function processChange(change, backupConfig, remotePath, dataLayout, progressCallback) {
debug('sync: processing task: %j', change);
// the empty task.path is special to signify the directory
const destPath = change.path && backupConfig.encryptedFilenames ? hush.encryptFilePath(change.path, backupConfig.encryption) : change.path;
const backupFilePath = path.join(getBackupFilePath(backupConfig, remotePath), destPath);
if (change.operation === 'removedir') {
debug(`Removing directory ${backupFilePath}`);
await storage.api(backupConfig.provider).removeDir(backupConfig, backupFilePath, progressCallback);
} else if (change.operation === 'remove') {
debug(`Removing ${backupFilePath}`);
await storage.api(backupConfig.provider).remove(backupConfig, backupFilePath);
} else if (change.operation === 'add') {
await promiseRetry({ times: 5, interval: 20000, debug }, async (retryCount) => {
progressCallback({ message: `Adding ${change.path}` + (retryCount > 1 ? ` (Try ${retryCount})` : '') });
debug(`Adding ${change.path} position ${change.position} try ${retryCount}`);
const uploader = await storage.api(backupConfig.provider).upload(backupConfig, backupFilePath);
await addFile(dataLayout.toLocalPath('./' + change.path), backupConfig.encryption, uploader, progressCallback);
});
}
}
async function sync(backupConfig, remotePath, dataLayout, progressCallback) {
assert.strictEqual(typeof backupConfig, 'object');
assert.strictEqual(typeof remotePath, 'string');
@@ -86,28 +109,17 @@ async function sync(backupConfig, remotePath, dataLayout, progressCallback) {
// the number here has to take into account the s3.upload partSize (which is 10MB). So 20=200MB
const concurrency = backupConfig.limits?.syncConcurrency || (backupConfig.provider === 's3' ? 20 : 10);
await syncer.sync(dataLayout, async function processTask(task) {
debug('sync: processing task: %j', task);
// the empty task.path is special to signify the directory
const destPath = task.path && backupConfig.encryptedFilenames ? hush.encryptFilePath(task.path, backupConfig.encryption) : task.path;
const backupFilePath = path.join(getBackupFilePath(backupConfig, remotePath), destPath);
const changes = await syncer.sync(dataLayout);
debug(`sync: processing ${changes.delQueue.length} deletes and ${changes.addQueue.length} additions`);
if (task.operation === 'removedir') {
debug(`Removing directory ${backupFilePath}`);
await storage.api(backupConfig.provider).removeDir(backupConfig, backupFilePath, progressCallback);
} else if (task.operation === 'remove') {
debug(`Removing ${backupFilePath}`);
await storage.api(backupConfig.provider).remove(backupConfig, backupFilePath);
} else if (task.operation === 'add') {
await promiseRetry({ times: 5, interval: 20000, debug }, async (retryCount) => {
progressCallback({ message: `Adding ${task.path}` + (retryCount > 1 ? ` (Try ${retryCount})` : '') });
debug(`Adding ${task.path} position ${task.position} try ${retryCount}`);
const [delError] = await safe(async.eachLimit(changes.delQueue, concurrency, async (task) => await processChange(task, backupConfig, remotePath, dataLayout, progressCallback)));
debug('sync: done processing deletes. error: %o', delError);
const uploader = await storage.api(backupConfig.provider).upload(backupConfig, backupFilePath);
await addFile(dataLayout.toLocalPath('./' + task.path), backupConfig.encryption, uploader, progressCallback);
});
}
}, concurrency);
const [addError] = await safe(async.eachLimit(changes.addQueue, concurrency, async (task) => await processChange(task, backupConfig, remotePath, dataLayout, progressCallback)));
debug('sync: done processing adds. error: %o', addError);
if (addError) throw addError;
await syncer.finalize(changes);
}
// this is not part of 'snapshotting' because we need root access to traverse