diff --git a/src/backupformat/rsync.js b/src/backupformat/rsync.js
index 30fbd00d5..226fac13f 100644
--- a/src/backupformat/rsync.js
+++ b/src/backupformat/rsync.js
@@ -85,7 +85,12 @@ async function sync(backupTarget, remotePath, dataLayout, progressCallback) {
     const cacheFile = path.join(paths.BACKUP_INFO_DIR, backupTarget.id, `${dataLayout.getBasename()}.sync.cache`);
     const { delQueue, addQueue, integrityMap } = await syncer.sync(dataLayout, cacheFile);
     debug(`sync: processing ${delQueue.length} deletes and ${addQueue.length} additions`);
-    const aggredgatedStats = { added: addQueue.length, deleted: delQueue.length, size: 0, startTime: Date.now() };
+    const aggregatedStats = {
+        size: [...integrityMap.values()].reduce((sum, { size }) => sum + size, 0),
+        fileCount: integrityMap.size + addQueue.length,
+        startTime: Date.now(),
+        totalMsecs: 0
+    };
 
     async function processSyncerChange(change) {
         debug('sync: processing task: %j', change);
@@ -105,9 +110,9 @@ async function sync(backupTarget, remotePath, dataLayout, progressCallback) {
                 debug(`Adding ${change.path} position ${change.position} try ${retryCount}`);
 
                 const uploader = await backupTargets.storageApi(backupTarget).upload(backupTarget.config, fullPath);
-                const { stats, integrity } = await addFile(dataLayout.toLocalPath('./' + change.path), backupTarget.encryption, uploader, progressCallback);
+                const { integrity } = await addFile(dataLayout.toLocalPath('./' + change.path), backupTarget.encryption, uploader, progressCallback);
                 integrityMap.set(destPath, integrity);
-                aggredgatedStats.size += stats.size;
+                aggregatedStats.size += integrity.size;
             });
         }
     }
@@ -120,10 +125,10 @@ async function sync(backupTarget, remotePath, dataLayout, progressCallback) {
     debug('sync: done processing adds. error: %o', addError);
     if (addError) throw addError;
 
-    await syncer.finalize(cacheFile);
+    await syncer.finalize(integrityMap, cacheFile);
 
     return {
-        stats: aggredgatedStats,
+        stats: { ...aggregatedStats, totalMsecs: Date.now()-aggregatedStats.startTime },
         integrity: [...integrityMap.entries()].sort(([a], [b]) => a < b) // for readability, order the entries
     };
 }
diff --git a/src/progress-stream.js b/src/progress-stream.js
index 7a1f3efd6..237f1680a 100644
--- a/src/progress-stream.js
+++ b/src/progress-stream.js
@@ -22,7 +22,7 @@ class ProgressStream extends TransformStream {
     }
 
     stats() {
-        const totalMsecs = (Date.now() - this.#startTime)/1000;
+        const totalMsecs = Date.now() - this.#startTime;
         return { startTime: this.#startTime, totalMsecs, transferred: this.#transferred };
     }
 
diff --git a/src/syncer.js b/src/syncer.js
index 7c30dbf61..c4abcdc40 100644
--- a/src/syncer.js
+++ b/src/syncer.js
@@ -6,7 +6,9 @@ const assert = require('assert'),
     debug = require('debug')('box:syncer'),
     fs = require('fs'),
     path = require('path'),
-    safe = require('safetydance');
+    readline = require('node:readline'),
+    safe = require('safetydance'),
+    util = require('util');
 
 exports = module.exports = {
     sync,
@@ -17,7 +19,7 @@ function readCache(cacheFile) {
     assert.strictEqual(typeof cacheFile, 'string');
 
     const cache = safe.fs.readFileSync(cacheFile, 'utf8');
-    if (!cache) return [ ];
+    if (!cache) return [];
     const result = cache.trim().split('\n').map(JSON.parse);
     return result;
 }
@@ -26,7 +28,7 @@ function readTree(dirPath) {
     assert.strictEqual(typeof dirPath, 'string');
 
     const names = safe.fs.readdirSync(dirPath).sort();
-    if (!names) return [ ];
+    if (!names) return [];
 
     return names.map((name) => {
         const absolutePath = path.join(dirPath, name);
@@ -68,11 +70,11 @@
 async function sync(dataLayout, cacheFile) {
     const addQueue = [], delQueue = []; // separate queues. we have to process the del first and then the add
     let curCacheIndex = 0;
-    const integrityMap = new Map();
+    const integrityMap = new Map(); // integrity of unchanged files
    const newCacheFile = `${cacheFile}.new`;
     let cache = [];
 
-    // if cache is missing or if we crashed/errored in previous run, start out empty. TODO: do a remote listDir and rebuild
+    // if cache is missing or if we crashed/errored in previous run, start out empty
     if (!safe.fs.existsSync(cacheFile)) {
         delQueue.push({ operation: 'removedir', path: '', reason: 'nocache' });
     } else if (safe.fs.existsSync(newCacheFile)) {
@@ -118,7 +120,6 @@ async function sync(dataLayout, cacheFile) {
 
         const cachePath = curCacheIndex === cache.length ? null : cache[curCacheIndex].path;
         const cacheStat = curCacheIndex === cache.length ? null : cache[curCacheIndex].stat;
-        integrityMap.set(entryPath, curCacheIndex === cache.length ? null : cache[curCacheIndex].integrity);
 
         if (cachePath === null || cachePath > entryPath) { // new files appeared
             if (entryStat.isDirectory()) {
@@ -132,6 +133,8 @@
             } else if (ISFILE(cacheStat.mode) && entryStat.isFile()) { // file names match
                 if (entryStat.mtime.getTime() !== cacheStat.mtime || entryStat.size != cacheStat.size || entryStat.inode !== cacheStat.inode) { // file changed
                     addQueue.push({ operation: 'add', path: entryPath, reason: 'changed', position: addQueue.length });
+                } else {
+                    integrityMap.set(entryPath, cache[curCacheIndex].integrity);
                 }
                 ++curCacheIndex;
             } else if (entryStat.isDirectory()) { // was a file, now a directory
@@ -159,10 +162,32 @@
     };
 }
 
-async function finalize(cacheFile) {
+async function finalize(integrityMap, cacheFile) {
+    assert(util.types.isMap(integrityMap));
     assert.strictEqual(typeof cacheFile, 'string');
 
-    const newCacheFile = `${cacheFile}.new`;
+    const newCacheFile = `${cacheFile}.new`, tempCacheFile = `${cacheFile}.tmp`;
+
+    debug('finalize: patching in integrity information');
+
+    const tempCacheFd = safe.fs.openSync(tempCacheFile, 'w'); // truncates any existing file
+    if (tempCacheFd === -1) throw new BoxError(BoxError.FS_ERROR, 'Error opening temp cache file: ' + safe.error.message);
+
+    const rl = readline.createInterface({
+        input: fs.createReadStream(newCacheFile, { encoding: 'utf8' }),
+        crlfDelay: Infinity,
+    });
+
+    for await (const line of rl) {
+        if (!line) continue;
+        const cacheEntry = JSON.parse(line);
+        cacheEntry.integrity = integrityMap.get(cacheEntry.path);
+        safe.fs.appendFileSync(tempCacheFd, JSON.stringify(cacheEntry) + '\n');
+    }
+
+    safe.fs.closeSync(tempCacheFd);
+    safe.fs.unlinkSync(cacheFile);
 
-    if (!safe.fs.renameSync(newCacheFile, cacheFile)) debug('Unable to save new cache file');
+    safe.fs.unlinkSync(newCacheFile);
+    if (!safe.fs.renameSync(tempCacheFile, cacheFile)) debug('Unable to save new cache file');
 }