diff --git a/src/backupformat/rsync.js b/src/backupformat/rsync.js index 699b73880..c24d26175 100644 --- a/src/backupformat/rsync.js +++ b/src/backupformat/rsync.js @@ -23,6 +23,7 @@ const assert = require('assert'), paths = require('../paths.js'), ProgressStream = require('../progress-stream.js'), promiseRetry = require('../promise-retry.js'), + { Readable } = require('stream'), safe = require('safetydance'), shell = require('../shell.js')('backupformat/rsync'), stream = require('stream/promises'), @@ -50,22 +51,25 @@ async function addFile(sourceFile, encryption, uploader, progressCallback) { progressCallback({ message: `Uploading ${sourceFile}: ${transferred}M@${speed}MBps` }); // 0M@0MBps looks wrong }); + const hash = crypto.createHash('sha256'); ps.on('data', (chunk) => hash.update(chunk)); // tap the bytes that reach the uploader; piping *through* a Hash would upload the 32-byte digest instead of the data + let pipeline = null; if (encryption) { const encryptStream = new EncryptStream(encryption); - pipeline = safe(stream.pipeline(sourceStream, encryptStream, ps, uploader.stream)); + pipeline = stream.pipeline(sourceStream, encryptStream, ps, uploader.stream); // no inner safe(): the promise is safe()'d again below, which would always yield a null error } else { - pipeline = safe(stream.pipeline(sourceStream, ps, uploader.stream)); + pipeline = stream.pipeline(sourceStream, ps, uploader.stream); } const [error] = await safe(pipeline); - if (error && error.message.includes('ENOENT')) { // ignore error if file disappears - } - - if (error) throw new BoxError(BoxError.EXTERNAL_ERROR, `tarPack pipeline error: ${error.message}`); + if (error && !error.message.includes('ENOENT')) throw new BoxError(BoxError.EXTERNAL_ERROR, `tarPack pipeline error: ${error.message}`); // ignore error if file disappears // debug(`addFile: pipeline finished: ${JSON.stringify(ps.stats())}`); await uploader.finish(); + return { + stats: ps.stats(), + integrity: { size: ps.stats().transferred, sha256: hash.digest('hex') } + }; } async function sync(backupTarget, remotePath, dataLayout, progressCallback) { @@ -76,10 +80,10 @@ async function sync(backupTarget, remotePath, dataLayout, progressCallback) { // the number
here has to take into account the s3.upload partSize (which is 10MB). So 20=200MB const concurrency = backupTarget.limits?.syncConcurrency || (backupTarget.provider === 's3' ? 20 : 10); - const cacheFile = path.join(paths.BACKUP_INFO_DIR, backupTarget.id, `${dataLayout.getBasename()}.sync.cache`); - const changes = await syncer.sync(dataLayout, { cacheFile }); - debug(`sync: processing ${changes.delQueue.length} deletes and ${changes.addQueue.length} additions`); + const cacheFile = path.join(paths.BACKUP_INFO_DIR, backupTarget.id, `${dataLayout.getBasename()}.sync.cache`); const { delQueue, addQueue, integrityMap } = await syncer.sync(dataLayout, cacheFile); // cacheFile must stay defined: it is used here and by syncer.finalize() below + debug(`sync: processing ${delQueue.length} deletes and ${addQueue.length} additions`); + const aggregatedStats = { added: addQueue.length, deleted: delQueue.length, size: 0, startTime: Date.now() }; async function processSyncerChange(change) { debug('sync: processing task: %j', change); @@ -99,20 +103,27 @@ async function sync(backupTarget, remotePath, dataLayout, progressCallback) { debug(`Adding ${change.path} position ${change.position} try ${retryCount}`); const uploader = await backupTargets.storageApi(backupTarget).upload(backupTarget.config, fullPath); - await addFile(dataLayout.toLocalPath('./' + change.path), backupTarget.encryption, uploader, progressCallback); + const { stats, integrity } = await addFile(dataLayout.toLocalPath('./' + change.path), backupTarget.encryption, uploader, progressCallback); + integrityMap.set(destPath, integrity); + aggregatedStats.size += stats.transferred; // ps.stats() reports the byte count as 'transferred', not 'size' (see addFile's integrity.size) }); } } - const [delError] = await safe(async.eachLimit(changes.delQueue, concurrency, async (change) => await processSyncerChange(change, backupTarget, remotePath, dataLayout, progressCallback))); + const [delError] = await safe(async.eachLimit(delQueue, concurrency, async (change) => await processSyncerChange(change, backupTarget, remotePath, dataLayout, progressCallback))); debug('sync: done processing deletes.
error: %o', delError); if (delError) throw delError; - const [addError] = await safe(async.eachLimit(changes.addQueue, concurrency, async (change) => await processSyncerChange(change, backupTarget, remotePath, dataLayout, progressCallback))); + const [addError] = await safe(async.eachLimit(addQueue, concurrency, async (change) => await processSyncerChange(change, backupTarget, remotePath, dataLayout, progressCallback))); debug('sync: done processing adds. error: %o', addError); if (addError) throw addError; - await syncer.finalize(changes); + await syncer.finalize(cacheFile); + + return { + stats: aggregatedStats, + integrity: [...integrityMap.entries()].sort(([a], [b]) => (a < b ? -1 : a > b ? 1 : 0)) // for readability, order the entries; the comparator must return a number — a boolean (a < b) never sorts downward + }; } // this is not part of 'snapshotting' because we need root access to traverse @@ -271,7 +282,15 @@ async function upload(backupTarget, remotePath, dataLayout, progressCallback) { assert.strictEqual(typeof progressCallback, 'function'); await saveFsMetadata(dataLayout, `${dataLayout.localRoot()}/fsmetadata.json`); - await sync(backupTarget, remotePath, dataLayout, progressCallback); + const { stats, integrity } = await sync(backupTarget, remotePath, dataLayout, progressCallback); + const integrityDataJsonString = JSON.stringify(integrity, null, 4); + const integrityDataStream = Readable.from(integrityDataJsonString); + const integrityUploader = await backupTargets.storageApi(backupTarget).upload(backupTarget.config, `${remotePath}.checksum`); + await stream.pipeline(integrityDataStream, integrityUploader.stream); + await integrityUploader.finish(); + + const signature = await crypto.sign(null /* algorithm */, integrityDataJsonString, backupTarget.integrityKeyPair.privateKey); + return { stats, integrity: { signature } }; } function getFileExtension(encryption) { diff --git a/src/backupformat/tgz.js b/src/backupformat/tgz.js index f7301ceb7..11b9a2ee1 100644 --- a/src/backupformat/tgz.js +++ b/src/backupformat/tgz.js @@ -257,15 +257,15 @@ async
function upload(backupTarget, remotePath, dataLayout, progressCallback) { const { stats, integrity } = await tarPack(dataLayout, backupTarget.encryption, uploader, progressCallback); const integrityMap = new Map(); - integrityMap.set(path.basename(remotePath), ...integrity); + integrityMap.set(path.basename(remotePath), integrity); const integrityDataJsonString = JSON.stringify([...integrityMap], null, 4); const integrityDataStream = Readable.from(integrityDataJsonString); const integrityUploader = await backupTargets.storageApi(backupTarget).upload(backupTarget.config, `${remotePath}.checksum`); await stream.pipeline(integrityDataStream, integrityUploader.stream); await integrityUploader.finish(); - integrity.signature = await crypto.sign(null /* algorithm */, integrityDataJsonString, backupTarget.integrityKeyPair.privateKey); - return { stats, integrity }; + const signature = await crypto.sign(null /* algorithm */, integrityDataJsonString, backupTarget.integrityKeyPair.privateKey); + return { stats, integrity: { signature } }; }); } diff --git a/src/syncer.js b/src/syncer.js index 4f882b4af..7c30dbf61 100644 --- a/src/syncer.js +++ b/src/syncer.js @@ -62,15 +62,14 @@ function ISFILE(x) { return (x & fs.constants.S_IFREG) === fs.constants.S_IFREG; } -async function sync(dataLayout, options) { +async function sync(dataLayout, cacheFile) { assert(dataLayout instanceof DataLayout, 'Expecting dataLayout to be a DataLayout'); - assert.strictEqual(typeof options, 'object'); + assert.strictEqual(typeof cacheFile, 'string'); const addQueue = [], delQueue = []; // separate queues. we have to process the del first and then the add let curCacheIndex = 0; - - const cacheFile = options.cacheFile, newCacheFile = `${options.cacheFile}.new`; - + const integrityMap = new Map(); + const newCacheFile = `${cacheFile}.new`; let cache = []; // if cache is missing or if we crashed/errored in previous run, start out empty. 
TODO: do a remote listDir and rebuild @@ -119,6 +118,7 @@ async function sync(dataLayout, options) { const cachePath = curCacheIndex === cache.length ? null : cache[curCacheIndex].path; const cacheStat = curCacheIndex === cache.length ? null : cache[curCacheIndex].stat; + integrityMap.set(entryPath, cachePath === entryPath ? cache[curCacheIndex].integrity : null); // only reuse cached integrity when the cache entry is actually this path; the index alone may point at a different file if (cachePath === null || cachePath > entryPath) { // new files appeared if (entryStat.isDirectory()) { @@ -155,15 +155,14 @@ async function sync(dataLayout, options) { return { delQueue, addQueue, - cacheFile, - newCacheFile + integrityMap }; } -async function finalize(changes) { - assert.strictEqual(typeof changes, 'object'); +async function finalize(cacheFile) { + assert.strictEqual(typeof cacheFile, 'string'); - safe.fs.unlinkSync(changes.cacheFile); - - if (!safe.fs.renameSync(changes.newCacheFile, changes.cacheFile)) debug('Unable to save new cache file'); + const newCacheFile = `${cacheFile}.new`; + safe.fs.unlinkSync(cacheFile); + if (!safe.fs.renameSync(newCacheFile, cacheFile)) debug('Unable to save new cache file'); } diff --git a/src/test/syncer-test.js b/src/test/syncer-test.js index 2fa27d1ec..16e3eb7cd 100644 --- a/src/test/syncer-test.js +++ b/src/test/syncer-test.js @@ -24,8 +24,8 @@ describe('Syncer', function () { }); async function getChanges(dataLayout) { - const changes = await syncer.sync(dataLayout, { cacheFile: gCacheFile }); - syncer.finalize(changes); + const changes = await syncer.sync(dataLayout, gCacheFile); + await syncer.finalize(gCacheFile); // finalize is async; without await the cache-file rename races with the next sync() in the test return changes.delQueue.concat(changes.addQueue); }