rsync: add integrity information

This commit is contained in:
Girish Ramakrishnan
2025-08-13 09:39:36 +05:30
parent 28ac9e153e
commit 2fabfbe8f6
4 changed files with 49 additions and 31 deletions

View File

@@ -23,6 +23,7 @@ const assert = require('assert'),
paths = require('../paths.js'),
ProgressStream = require('../progress-stream.js'),
promiseRetry = require('../promise-retry.js'),
{ Readable } = require('stream'),
safe = require('safetydance'),
shell = require('../shell.js')('backupformat/rsync'),
stream = require('stream/promises'),
@@ -50,22 +51,25 @@ async function addFile(sourceFile, encryption, uploader, progressCallback) {
progressCallback({ message: `Uploading ${sourceFile}: ${transferred}M@${speed}MBps` }); // 0M@0MBps looks wrong
});
const hash = crypto.createHash('sha256');
let pipeline = null;
if (encryption) {
const encryptStream = new EncryptStream(encryption);
pipeline = safe(stream.pipeline(sourceStream, encryptStream, ps, uploader.stream));
pipeline = safe(stream.pipeline(sourceStream, encryptStream, ps, hash, uploader.stream));
} else {
pipeline = safe(stream.pipeline(sourceStream, ps, uploader.stream));
pipeline = safe(stream.pipeline(sourceStream, ps, hash, uploader.stream));
}
const [error] = await safe(pipeline);
if (error && error.message.includes('ENOENT')) { // ignore error if file disappears
}
if (error) throw new BoxError(BoxError.EXTERNAL_ERROR, `tarPack pipeline error: ${error.message}`);
if (error && !error.message.includes('ENOENT')) throw new BoxError(BoxError.EXTERNAL_ERROR, `tarPack pipeline error: ${error.message}`); // ignore error if file disappears
// debug(`addFile: pipeline finished: ${JSON.stringify(ps.stats())}`);
await uploader.finish();
return {
stats: ps.stats(),
integrity: { size: ps.stats().transferred, sha256: hash.digest('hex') }
};
}
async function sync(backupTarget, remotePath, dataLayout, progressCallback) {
@@ -76,10 +80,10 @@ async function sync(backupTarget, remotePath, dataLayout, progressCallback) {
// the number here has to take into account the s3.upload partSize (which is 10MB). So 20=200MB
const concurrency = backupTarget.limits?.syncConcurrency || (backupTarget.provider === 's3' ? 20 : 10);
const cacheFile = path.join(paths.BACKUP_INFO_DIR, backupTarget.id, `${dataLayout.getBasename()}.sync.cache`);
const changes = await syncer.sync(dataLayout, { cacheFile });
debug(`sync: processing ${changes.delQueue.length} deletes and ${changes.addQueue.length} additions`);
const { delQueue, addQueue, integrityMap } = await syncer.sync(dataLayout, cacheFile);
debug(`sync: processing ${delQueue.length} deletes and ${addQueue.length} additions`);
const aggredgatedStats = { added: addQueue.length, deleted: delQueue.length, size: 0, startTime: Date.now() };
async function processSyncerChange(change) {
debug('sync: processing task: %j', change);
@@ -99,20 +103,27 @@ async function sync(backupTarget, remotePath, dataLayout, progressCallback) {
debug(`Adding ${change.path} position ${change.position} try ${retryCount}`);
const uploader = await backupTargets.storageApi(backupTarget).upload(backupTarget.config, fullPath);
await addFile(dataLayout.toLocalPath('./' + change.path), backupTarget.encryption, uploader, progressCallback);
const { stats, integrity } = await addFile(dataLayout.toLocalPath('./' + change.path), backupTarget.encryption, uploader, progressCallback);
integrityMap.set(destPath, integrity);
aggredgatedStats.size += stats.size;
});
}
}
const [delError] = await safe(async.eachLimit(changes.delQueue, concurrency, async (change) => await processSyncerChange(change, backupTarget, remotePath, dataLayout, progressCallback)));
const [delError] = await safe(async.eachLimit(delQueue, concurrency, async (change) => await processSyncerChange(change, backupTarget, remotePath, dataLayout, progressCallback)));
debug('sync: done processing deletes. error: %o', delError);
if (delError) throw delError;
const [addError] = await safe(async.eachLimit(changes.addQueue, concurrency, async (change) => await processSyncerChange(change, backupTarget, remotePath, dataLayout, progressCallback)));
const [addError] = await safe(async.eachLimit(addQueue, concurrency, async (change) => await processSyncerChange(change, backupTarget, remotePath, dataLayout, progressCallback)));
debug('sync: done processing adds. error: %o', addError);
if (addError) throw addError;
await syncer.finalize(changes);
await syncer.finalize(cacheFile);
return {
stats: aggredgatedStats,
integrity: [...integrityMap.entries()].sort(([a], [b]) => a < b) // for readability, order the entries
};
}
// this is not part of 'snapshotting' because we need root access to traverse
@@ -271,7 +282,15 @@ async function upload(backupTarget, remotePath, dataLayout, progressCallback) {
assert.strictEqual(typeof progressCallback, 'function');
await saveFsMetadata(dataLayout, `${dataLayout.localRoot()}/fsmetadata.json`);
await sync(backupTarget, remotePath, dataLayout, progressCallback);
const { stats, integrity } = await sync(backupTarget, remotePath, dataLayout, progressCallback);
const integrityDataJsonString = JSON.stringify(integrity, null, 4);
const integrityDataStream = Readable.from(integrityDataJsonString);
const integrityUploader = await backupTargets.storageApi(backupTarget).upload(backupTarget.config, `${remotePath}.checksum`);
await stream.pipeline(integrityDataStream, integrityUploader.stream);
await integrityUploader.finish();
const signature = await crypto.sign(null /* algorithm */, integrityDataJsonString, backupTarget.integrityKeyPair.privateKey);
return { stats, integrity: { signature } };
}
function getFileExtension(encryption) {

View File

@@ -257,15 +257,15 @@ async function upload(backupTarget, remotePath, dataLayout, progressCallback) {
const { stats, integrity } = await tarPack(dataLayout, backupTarget.encryption, uploader, progressCallback);
const integrityMap = new Map();
integrityMap.set(path.basename(remotePath), ...integrity);
integrityMap.set(path.basename(remotePath), integrity);
const integrityDataJsonString = JSON.stringify([...integrityMap], null, 4);
const integrityDataStream = Readable.from(integrityDataJsonString);
const integrityUploader = await backupTargets.storageApi(backupTarget).upload(backupTarget.config, `${remotePath}.checksum`);
await stream.pipeline(integrityDataStream, integrityUploader.stream);
await integrityUploader.finish();
integrity.signature = await crypto.sign(null /* algorithm */, integrityDataJsonString, backupTarget.integrityKeyPair.privateKey);
return { stats, integrity };
const signature = await crypto.sign(null /* algorithm */, integrityDataJsonString, backupTarget.integrityKeyPair.privateKey);
return { stats, integrity: { signature } };
});
}