diff --git a/src/backupformat/rsync.js b/src/backupformat/rsync.js index 35142c229..f9a28095a 100644 --- a/src/backupformat/rsync.js +++ b/src/backupformat/rsync.js @@ -43,7 +43,7 @@ async function addFile(sourceFile, encryption, uploader, progressCallback) { const [openError, sourceHandle] = await safe(fs.promises.open(sourceFile, 'r')); if (openError) { debug(`addFile: ignoring disappeared file: ${sourceFile}`); - return null; + return { integrity: null, stats: { transferred: 0 } }; } const sourceStream = sourceHandle.createReadStream(sourceFile, { autoClose: true }); @@ -71,7 +71,7 @@ async function addFile(sourceFile, encryption, uploader, progressCallback) { await uploader.finish(); return { - stats: ps.stats(), + stats: ps.stats(), // { startTime, totalMsecs, transferred } integrity: { size: ps.stats().transferred, sha256: hash.digest('hex') } }; } @@ -86,15 +86,21 @@ async function sync(backupSite, remotePath, dataLayout, progressCallback) { const concurrency = backupSite.limits?.syncConcurrency || (backupSite.provider === 's3' ? 
20 : 10); const cacheFile = path.join(paths.BACKUP_INFO_DIR, backupSite.id, `${dataLayout.getBasename()}.sync.cache`); const { delQueue, addQueue, integrityMap } = await syncer.sync(dataLayout, cacheFile); // integrityMap is unchanged files - debug(`sync: processing ${delQueue.length} deletes and ${addQueue.length} additions`); + debug(`sync: processing ${delQueue.length} deletes, ${addQueue.length} additions and ${integrityMap.size} unchanged`); const aggregatedStats = { transferred: 0, - size: [...integrityMap.values()].reduce((sum, { size }) => sum + size, 0), + size: [...integrityMap.values()].reduce((sum, integrity) => sum + (integrity?.size || 0), 0), // integrity can be null if file had disappeared during upload fileCount: addQueue.length + integrityMap.size, // final file count, not the transferred file count startTime: Date.now(), totalMsecs: 0 }; + const destPathIntegrityMap = new Map(); // unlike integrityMap which contains local filenames, this contains destination filenames (maybe encrypted) + for (const [entryPath, integrity] of integrityMap) { + const destPath = backupSite.encryption?.encryptedFilenames ? 
hush.encryptFilePath(entryPath, backupSite.encryption) : entryPath; + destPathIntegrityMap.set(destPath, integrity); + } + async function processSyncerChange(change) { debug('sync: processing task: %j', change); // the empty task.path is special to signify the directory @@ -102,20 +108,19 @@ async function sync(backupSite, remotePath, dataLayout, progressCallback) { const fullPath = path.join(remotePath, destPath); if (change.operation === 'removedir') { - debug(`Removing directory ${fullPath}`); + debug(`sync: removing directory ${fullPath}`); await backupSites.storageApi(backupSite).removeDir(backupSite.config, fullPath, progressCallback); } else if (change.operation === 'remove') { - debug(`Removing ${fullPath}`); + debug(`sync: removing ${fullPath}`); await backupSites.storageApi(backupSite).remove(backupSite.config, fullPath); } else if (change.operation === 'add') { await promiseRetry({ times: 5, interval: 20000, debug }, async (retryCount) => { progressCallback({ message: `Adding ${change.path}` + (retryCount > 1 ? 
` (Try ${retryCount})` : '') }); - debug(`Adding ${change.path} position ${change.position} try ${retryCount}`); - + debug(`sync: adding ${change.path} position ${change.position} try ${retryCount}`); const uploader = await backupSites.storageApi(backupSite).upload(backupSite.config, fullPath); const result = await addFile(dataLayout.toLocalPath('./' + change.path), backupSite.encryption, uploader, progressCallback); - if (!result) return; // this can happen if the file disappeared on us - integrityMap.set(destPath, result.integrity); + integrityMap.set(change.path, result.integrity); // .integrity can be null when file disappeared on us + destPathIntegrityMap.set(destPath, result.integrity); aggregatedStats.transferred += result.stats.transferred; aggregatedStats.size += result.stats.transferred; }); @@ -134,7 +139,7 @@ async function sync(backupSite, remotePath, dataLayout, progressCallback) { return { stats: { ...aggregatedStats, totalMsecs: Date.now()-aggregatedStats.startTime }, - integrityMap + integrityMap: destPathIntegrityMap }; } @@ -322,17 +327,9 @@ async function verify(backupSite, remotePath, integrityMap, progressCallback) { debug(`verify: Verifying ${remotePath}`); - const encryptedFilenames = backupSite.encryption?.encryptedFilenames || false; let fileCount = 0; async function validateFile(entry) { - let relativePath = path.relative(remotePath, entry.path); - if (encryptedFilenames) { - const { error, result } = hush.decryptFilePath(relativePath, backupSite.encryption); - if (error) throw new BoxError(BoxError.CRYPTO_ERROR, 'Unable to decrypt file'); - relativePath = result; - } - ++fileCount; const sourceStream = await backupSites.storageApi(backupSite).download(backupSite.config, entry.path); @@ -344,18 +341,17 @@ async function verify(backupSite, remotePath, integrityMap, progressCallback) { }); const streams = [ sourceStream, ps ]; + const hash = new HashStream(); + streams.push(hash); if (backupSite.encryption) { const decryptStream = new 
DecryptStream(backupSite.encryption); streams.push(decryptStream); } - const hash = new HashStream(); - streams.push(hash); - await stream.pipeline(streams); - const integrity = integrityMap.get(relativePath); + const integrity = integrityMap.get(entry.path); if (ps.stats().transferred !== integrity.size) throw new BoxError(BoxError.BAD_STATE, `${entry.path} has size ${ps.stats().transferred}. Expecting ${integrity.size}`); if (hash.digest() !== integrity.sha256) throw new BoxError(BoxError.BAD_STATE, `${entry.path} has size ${hash.digest()}. Expecting ${integrity.sha256}`); } diff --git a/src/backuptask.js b/src/backuptask.js index 152754ee1..2738ae33a 100644 --- a/src/backuptask.js +++ b/src/backuptask.js @@ -100,10 +100,14 @@ async function upload(remotePath, siteId, dataLayoutString, progressCallback) { await checkPreconditions(backupSite, dataLayout); + // integrityMap - { size, fileCount, sha256 } of each file. this is saved in .backupinfo file + // - tgz: only one entry named "." in the map. fileCount has the file count inside. + // - rsync: entry for each relative path. // integrity - { signature } of the uploaded .backupinfo . - // .backupinfo contains an integrityMap { size, fileCount, sha256 } of each file. for tgz, fileCount has the file count inside - // stats - { fileCount, size, startTime, totalMsecs, transferred } . size is the backup size .transferred is what was transferred. 
they differ for rsync - // fileCount and size in stats should match up .backupinfo + // stats - { fileCount, size, startTime, totalMsecs, transferred } + // - tgz: size (backup size) and transferred are the same + // - rsync: size (final backup size) will be different from what was transferred (only changed files) + // stats.fileCount and stats.size are stored in db and should match what is written into .backupinfo const { stats, integrityMap } = await backupFormats.api(backupSite.format).upload(backupSite, remotePath, dataLayout, progressCallback); progressCallback({ message: `Uploading integrity information to ${remotePath}.backupinfo` }); diff --git a/src/syncer.js b/src/syncer.js index eb0c189f9..94563f36a 100644 --- a/src/syncer.js +++ b/src/syncer.js @@ -76,10 +76,13 @@ async function sync(dataLayout, cacheFile) { // if cache is missing or if we crashed/errored in previous run, start out empty if (!safe.fs.existsSync(cacheFile)) { + debug(`sync: cache file ${cacheFile} is missing, starting afresh`); delQueue.push({ operation: 'removedir', path: '', reason: 'nocache' }); } else if (safe.fs.existsSync(newCacheFile)) { + debug(`sync: new cache file ${newCacheFile} exists. 
previous run crashed, starting afresh`); delQueue.push({ operation: 'removedir', path: '', reason: 'crash' }); } else { + debug(`sync: loading cache file ${cacheFile}`); cache = readCache(cacheFile); } @@ -168,7 +171,7 @@ async function finalize(integrityMap, cacheFile) { const newCacheFile = `${cacheFile}.new`, tempCacheFile = `${cacheFile}.tmp`; - debug('finalize: patching in integrity information'); + debug(`finalize: patching in integrity information into ${cacheFile}`); const tempCacheFd = safe.fs.openSync(tempCacheFile, 'w'); // truncates any existing file if (tempCacheFd === -1) throw new BoxError(BoxError.FS_ERROR, 'Error opening temp cache file: ' + safe.error.message); @@ -181,7 +184,10 @@ async function finalize(integrityMap, cacheFile) { for await (const line of rl) { if (!line) continue; const cacheEntry = JSON.parse(line); - cacheEntry.integrity = integrityMap.get(cacheEntry.path); + if (ISFILE(cacheEntry.stat.mode)) { + cacheEntry.integrity = integrityMap.get(cacheEntry.path); // { size, sha256 } + if (typeof cacheEntry.integrity === 'undefined') throw new BoxError(BoxError.INTERNAL_ERROR, `No integrity information for ${cacheEntry.path}`); + } safe.fs.appendFileSync(tempCacheFd, JSON.stringify(cacheEntry) + '\n'); }