diff --git a/src/backupformat/rsync.js b/src/backupformat/rsync.js
index f173f0ad9..4cd3a4d3b 100644
--- a/src/backupformat/rsync.js
+++ b/src/backupformat/rsync.js
@@ -10,13 +10,14 @@ import fs from 'node:fs';
 import HashStream from '../hash-stream.js';
 import path from 'node:path';
 import paths from '../paths.js';
+import { pipeline } from 'node:stream/promises';
 import ProgressStream from '../progress-stream.js';
 import promiseRetry from '../promise-retry.js';
 import safe from 'safetydance';
 import shellModule from '../shell.js';
-import stream from 'stream/promises';
 import syncer from '../syncer.js';
 import util from 'node:util';
+import { Writable } from 'node:stream';
 
 const debug = debugModule('box:backupformat/rsync');
 const shell = shellModule('backupformat/rsync');
@@ -50,9 +51,9 @@ async function addFile(sourceFile, encryption, uploader, progressCallback) {
-    let pipeline;
+    let pipelinePromise; // NOTE: must not be named 'pipeline' - that would shadow the stream/promises import
     if (encryption) {
         const encryptStream = new EncryptStream(encryption);
-        pipeline = safe(stream.pipeline(sourceStream, encryptStream, ps, hash, destStream));
+        pipelinePromise = safe(pipeline(sourceStream, encryptStream, ps, hash, destStream));
     } else {
-        pipeline = safe(stream.pipeline(sourceStream, ps, hash, destStream));
+        pipelinePromise = safe(pipeline(sourceStream, ps, hash, destStream));
     }
 
-    const [error] = await safe(pipeline);
+    const [error] = await safe(pipelinePromise);
@@ -264,7 +265,7 @@ async function downloadDir(backupSite, remotePath, dataLayout, progressCallback) {
         streams.push(destStream);
 
-        const [pipelineError] = await safe(stream.pipeline(streams));
+        const [pipelineError] = await safe(pipeline(streams));
         if (pipelineError) {
             progressCallback({ message: `Download error ${entry.path} to ${destFilePath}: ${pipelineError.message}` });
             throw pipelineError;
         }
@@ -308,6 +309,34 @@ async function upload(backupSite, remotePath, dataLayout, progressCallback) {
     return await sync(backupSite, remotePath, dataLayout, progressCallback); // { stats, integrityMap }
 }
 
+async function computeEntryIntegrity(backupSite, entry, progressCallback) {
+    const sourceStream = await backupSites.storageApi(backupSite).download(backupSite.config, entry.path);
+
+    const ps = new ProgressStream({ interval: 10000 }); // display a progress every 10 seconds
+    ps.on('progress', function (progress) {
+        const transferred = Math.round(progress.transferred/1024/1024), speed = Math.round(progress.speed/1024/1024);
+        if (!transferred && !speed) return progressCallback({ message: `Downloading ${entry.path}` }); // 0M@0MBps looks wrong
+        progressCallback({ message: `Downloading ${entry.path}: ${transferred}M@${speed}MBps` });
+    });
+
+    const streams = [ sourceStream, ps ];
+    const hash = new HashStream();
+    streams.push(hash);
+
+    if (backupSite.encryption) {
+        const decryptStream = new DecryptStream(backupSite.encryption);
+        streams.push(decryptStream);
+    }
+
+    const sink = new Writable({ write(chunk, enc, cb) { cb(); } });
+    streams.push(sink); // drain the stream
+
+    const [error] = await safe(pipeline(streams));
+    if (error) throw new BoxError(BoxError.EXTERNAL_ERROR, `pipeline error: ${error.message}`);
+
+    return { transferred: ps.stats().transferred, digest: hash.digest() };
+}
+
 async function verify(backupSite, remotePath, integrityMap, progressCallback) {
     assert.strictEqual(typeof backupSite, 'object');
     assert.strictEqual(typeof remotePath, 'string');
@@ -316,56 +345,30 @@ async function verify(backupSite, remotePath, integrityMap, progressCallback) {
 
     debug(`verify: Verifying ${remotePath}`);
-    let fileCount = 0;
-
-    async function validateFile(entry) {
-        ++fileCount;
-        const sourceStream = await backupSites.storageApi(backupSite).download(backupSite.config, entry.path);
-
-        const ps = new ProgressStream({ interval: 10000 }); // display a progress every 10 seconds
-        ps.on('progress', function (progress) {
-            const transferred = Math.round(progress.transferred/1024/1024), speed = Math.round(progress.speed/1024/1024);
-            if (!transferred && !speed) return progressCallback({ message: `Downloading ${entry.path}` }); // 0M@0MBps looks wrong
-            progressCallback({ message: `Downloading ${entry.path}: ${transferred}M@${speed}MBps` });
-        });
-
-        const streams = [ sourceStream, ps ];
-        const hash = new HashStream();
-        streams.push(hash);
-
-        if (backupSite.encryption) {
-            const decryptStream = new DecryptStream(backupSite.encryption);
-            streams.push(decryptStream);
-        }
-
-        await stream.pipeline(streams);
-
-        const integrity = integrityMap.get(entry.path);
-        if (ps.stats().transferred !== integrity.size) throw new BoxError(BoxError.BAD_STATE, `${entry.path} has size ${ps.stats().transferred}. Expecting ${integrity.size}`);
-        if (hash.digest() !== integrity.sha256) throw new BoxError(BoxError.BAD_STATE, `${entry.path} has size ${hash.digest()}. Expecting ${integrity.sha256}`);
-    }
-
-    debug(integrityMap.entries());
-
     // https://www.digitalocean.com/community/questions/rate-limiting-on-spaces?answer=40441
     const concurrency = backupSite.limits?.downloadConcurrency || (backupSite.provider === 's3' ? 30 : 10);
 
-    let marker = null;
+    let fileCount = 0, marker = null;
+    const messages = [];
     while (true) {
         const batch = await backupSites.storageApi(backupSite).listDir(backupSite.config, remotePath, marker === null ? 1 : 1000, marker); // try with one file first. if that works out, we continue faster
-        await async.eachLimit(batch.entries, concurrency, validateFile);
+        await async.eachLimit(batch.entries, concurrency, async function validateFile(entry) {
+            const [error, result] = await safe(computeEntryIntegrity(backupSite, entry, progressCallback));
+            if (error) return messages.push(`Could not verify integrity of ${entry.path}: ${error.message}`);
+
+            const relativePath = path.relative(remotePath, entry.path);
+            const integrity = integrityMap.get(relativePath);
+            debug(`verify: computed integrity of ${entry.path} as ${JSON.stringify(result)}`);
+            if (result.transferred !== integrity.size) messages.push(`${entry.path} has size ${result.transferred}. Expecting ${integrity.size}`);
+            else if (result.digest !== integrity.sha256) messages.push(`${entry.path} has digest ${result.digest}. Expecting ${integrity.sha256}`);
+        });
+        fileCount += batch.entries.length;
 
         if (!batch.marker) break;
         marker = batch.marker;
     }
 
-    const check = (x, y) => { return x === y ? { status: 'passed' } : { status: 'failed', message: `Expecting ${x} but got ${y}` }; };
+    if (integrityMap.size !== fileCount) messages.push(`Got ${fileCount} files. Expecting ${integrityMap.size} files`);
 
-    if (integrityMap.size !== fileCount) throw new BoxError(BoxError.BAD_STATE, `Got ${fileCount} files. Expecting ${integrityMap.size()} files`);
-
-    return {
-        size: { status: 'passed' },
-        fileCount: check(integrityMap.size, fileCount),
-        sha256: { status: 'passed' },
-    };
+    return messages;
 }
 export default {
diff --git a/src/backupintegrity.js b/src/backupintegrity.js
index b31adf68e..0dd244905 100644
--- a/src/backupintegrity.js
+++ b/src/backupintegrity.js
@@ -34,23 +34,23 @@ async function verify(backup, backupSite, progressCallback) {
     }
 
     const validSignature = crypto.verify(null /* algo */, backupInfoBuffer, backupSite.integrityKeyPair.publicKey, Buffer.from(backup.integrity.signature, 'hex'));
-    progressCallback({ message: `Signature valid? ${validSignature}`});
+    progressCallback({ message: `${backup.remotePath}.backupinfo has ${validSignature ? 'valid': 'invalid' } signature`});
 
     const backupInfo = JSON.parse(backupInfoBuffer.toString('utf8'));
     const integrityMap = new Map(Object.entries(backupInfo));
 
     const [verifyError, verifyMessages] = await safe(backupFormats.api(backupSite.format).verify(backupSite, backup.remotePath, integrityMap, progressCallback));
-    progressCallback({ message: 'Verification done' });
+    progressCallback({ message: `Verification of ${backup.remotePath} done` });
 
     const messages = [];
     if (!validSignature) messages.push(`${backup.remotePath}.backupinfo has invalid signature`);
     if (verifyError) messages.push(`Failed to verify ${backup.remotePath}: ${verifyError.message}`);
     if (verifyMessages) messages.push(...verifyMessages);
-    debug(`verified: ${JSON.stringify(verifyMessages, null, 4)}`);
+    debug(`verified: ${backup.remotePath} ${JSON.stringify(verifyMessages, null, 4)}`);
 
     stats.duration = Date.now() - stats.startTime;
-    return { stats, messages };
+    return { stats, messages: messages.slice(0, 50) }; // keep rsync fails to 50 to not overflow db
 }
 
 async function check(backupId, progressCallback) {
@@ -66,7 +66,7 @@ async function check(backupId, progressCallback) {
         const depBackup = await backups.get(depId);
         const result = await verify(depBackup, backupSite, progressCallback); // { stats, messages }
 
-        await backups.setIntegrityResult(backup, result.messages.length === 0 ? 'passed' : 'failed', result);
+        await backups.setIntegrityResult(depBackup, result.messages.length === 0 ? 'passed' : 'failed', result);
         if (result.messages.length) aggregatedMessages.push(`Integrity check of dependent backup ${depBackup.remotePath} failed`);
     }
 
diff --git a/src/backups.js b/src/backups.js
index 1bc324913..0ee33a3aa 100644
--- a/src/backups.js
+++ b/src/backups.js
@@ -217,7 +217,7 @@ async function startIntegrityCheck(backup, auditSource) {
 
     await eventlog.add(eventlog.ACTION_BACKUP_INTEGRITY_START, auditSource, { taskId, backupId: backup.id });
 
-    // background 
+    // background
     tasks.startTask(taskId, {})
         .then(async () => debug(`startIntegrityCheck: task completed`))
         .catch((error) => debug(`startIntegrityCheck: task error. ${error.message}`))
@@ -239,6 +239,7 @@ async function stopIntegrityCheck(backup, auditSource) {
 }
 
 async function clearTasks() {
+    debug('clearTasks: clearing task ids');
     await database.query('UPDATE backups SET integrityCheckTaskId = NULL');
 }
 