rsync: fix integrity check
This commit is contained in:
+48
-45
@@ -10,13 +10,14 @@ import fs from 'node:fs';
|
||||
import HashStream from '../hash-stream.js';
|
||||
import path from 'node:path';
|
||||
import paths from '../paths.js';
|
||||
import { pipeline } from 'node:stream/promises';
|
||||
import ProgressStream from '../progress-stream.js';
|
||||
import promiseRetry from '../promise-retry.js';
|
||||
import safe from 'safetydance';
|
||||
import shellModule from '../shell.js';
|
||||
import stream from 'stream/promises';
|
||||
import syncer from '../syncer.js';
|
||||
import util from 'node:util';
|
||||
import { Writable } from 'node:stream';
|
||||
|
||||
const debug = debugModule('box:backupformat/rsync');
|
||||
const shell = shellModule('backupformat/rsync');
|
||||
@@ -50,9 +51,9 @@ async function addFile(sourceFile, encryption, uploader, progressCallback) {
|
||||
let pipeline;
|
||||
if (encryption) {
|
||||
const encryptStream = new EncryptStream(encryption);
|
||||
pipeline = safe(stream.pipeline(sourceStream, encryptStream, ps, hash, destStream));
|
||||
pipeline = safe(pipeline(sourceStream, encryptStream, ps, hash, destStream));
|
||||
} else {
|
||||
pipeline = safe(stream.pipeline(sourceStream, ps, hash, destStream));
|
||||
pipeline = safe(pipeline(sourceStream, ps, hash, destStream));
|
||||
}
|
||||
|
||||
const [error] = await safe(pipeline);
|
||||
@@ -264,7 +265,7 @@ async function downloadDir(backupSite, remotePath, dataLayout, progressCallback)
|
||||
|
||||
streams.push(destStream);
|
||||
|
||||
const [pipelineError] = await safe(stream.pipeline(streams));
|
||||
const [pipelineError] = await safe(pipeline(streams));
|
||||
if (pipelineError) {
|
||||
progressCallback({ message: `Download error ${entry.path} to ${destFilePath}: ${pipelineError.message}` });
|
||||
throw pipelineError;
|
||||
@@ -308,6 +309,34 @@ async function upload(backupSite, remotePath, dataLayout, progressCallback) {
|
||||
return await sync(backupSite, remotePath, dataLayout, progressCallback); // { stats, integrityMap }
|
||||
}
|
||||
|
||||
/**
 * Downloads one remote entry, measures it and throws the data away.
 *
 * Stream chain: download -> progress stream -> hash stream -> decrypt stream (only when
 * backupSite.encryption is set) -> discarding sink. The hash stream is placed ahead of the
 * decrypt stream, i.e. it sees the data before decryption.
 *
 * @param {object} backupSite - passed to backupSites.storageApi(); its config and encryption fields are read
 * @param {object} entry - listing entry; only entry.path is read
 * @param {function} progressCallback - invoked with { message } on every 'progress' event
 * @returns {Promise<object>} { transferred, digest } taken from the progress stream's stats() and the hash stream's digest()
 * @throws {BoxError} EXTERNAL_ERROR when the pipeline fails; errors from download() itself propagate unchanged
 */
async function computeEntryIntegrity(backupSite, entry, progressCallback) {
    const remoteStream = await backupSites.storageApi(backupSite).download(backupSite.config, entry.path);

    const progressStream = new ProgressStream({ interval: 10000 }); // report progress every 10 seconds
    progressStream.on('progress', function (progress) {
        const transferred = Math.round(progress.transferred/1024/1024);
        const speed = Math.round(progress.speed/1024/1024);

        if (transferred || speed) {
            progressCallback({ message: `Downloading ${entry.path}: ${transferred}M@${speed}MBps` });
        } else {
            // 0M@0MBps looks wrong, so only the path is reported
            progressCallback({ message: `Downloading ${entry.path}` });
        }
    });

    const hashStream = new HashStream();
    const chain = [ remoteStream, progressStream, hashStream ];

    if (backupSite.encryption) chain.push(new DecryptStream(backupSite.encryption));

    // nothing is stored: the last stage acknowledges every chunk so the chain keeps draining
    chain.push(new Writable({ write(chunk, encoding, done) { done(); } }));

    const [pipelineError] = await safe(pipeline(chain));
    if (pipelineError) throw new BoxError(BoxError.EXTERNAL_ERROR, `pipeline error: ${pipelineError.message}`);

    return { transferred: progressStream.stats().transferred, digest: hashStream.digest() };
}
|
||||
|
||||
async function verify(backupSite, remotePath, integrityMap, progressCallback) {
|
||||
assert.strictEqual(typeof backupSite, 'object');
|
||||
assert.strictEqual(typeof remotePath, 'string');
|
||||
@@ -316,56 +345,30 @@ async function verify(backupSite, remotePath, integrityMap, progressCallback) {
|
||||
|
||||
debug(`verify: Verifying ${remotePath}`);
|
||||
|
||||
let fileCount = 0;
|
||||
|
||||
async function validateFile(entry) {
|
||||
++fileCount;
|
||||
const sourceStream = await backupSites.storageApi(backupSite).download(backupSite.config, entry.path);
|
||||
|
||||
const ps = new ProgressStream({ interval: 10000 }); // display a progress every 10 seconds
|
||||
ps.on('progress', function (progress) {
|
||||
const transferred = Math.round(progress.transferred/1024/1024), speed = Math.round(progress.speed/1024/1024);
|
||||
if (!transferred && !speed) return progressCallback({ message: `Downloading ${entry.path}` }); // 0M@0MBps looks wrong
|
||||
progressCallback({ message: `Downloading ${entry.path}: ${transferred}M@${speed}MBps` });
|
||||
});
|
||||
|
||||
const streams = [ sourceStream, ps ];
|
||||
const hash = new HashStream();
|
||||
streams.push(hash);
|
||||
|
||||
if (backupSite.encryption) {
|
||||
const decryptStream = new DecryptStream(backupSite.encryption);
|
||||
streams.push(decryptStream);
|
||||
}
|
||||
|
||||
await stream.pipeline(streams);
|
||||
|
||||
const integrity = integrityMap.get(entry.path);
|
||||
if (ps.stats().transferred !== integrity.size) throw new BoxError(BoxError.BAD_STATE, `${entry.path} has size ${ps.stats().transferred}. Expecting ${integrity.size}`);
|
||||
if (hash.digest() !== integrity.sha256) throw new BoxError(BoxError.BAD_STATE, `${entry.path} has size ${hash.digest()}. Expecting ${integrity.sha256}`);
|
||||
}
|
||||
|
||||
debug(integrityMap.entries());
|
||||
|
||||
// https://www.digitalocean.com/community/questions/rate-limiting-on-spaces?answer=40441
|
||||
const concurrency = backupSite.limits?.downloadConcurrency || (backupSite.provider === 's3' ? 30 : 10);
|
||||
let marker = null;
|
||||
let fileCount = 0, marker = null;
|
||||
const messages = [];
|
||||
while (true) {
|
||||
const batch = await backupSites.storageApi(backupSite).listDir(backupSite.config, remotePath, marker === null ? 1 : 1000, marker); // try with one file first. if that works out, we continue faster
|
||||
await async.eachLimit(batch.entries, concurrency, validateFile);
|
||||
await async.eachLimit(batch.entries, concurrency, async function validateFile(entry) {
    // Checks one listed entry against integrityMap. Problems are appended to `messages`
    // instead of being thrown, so the remaining entries of the batch are still checked.
    const [error, result] = await safe(computeEntryIntegrity(backupSite, entry, progressCallback));
    if (error) {
        messages.push(`Could not verify integrity of ${entry.path}: ${error.message}`);
        return;
    }

    // integrityMap is looked up with the path relative to remotePath
    const relativePath = path.relative(remotePath, entry.path);
    const integrity = integrityMap.get(relativePath);

    debug(`verify: computed integrity of ${entry.path} as ${JSON.stringify(result)}`);

    if (!integrity) {
        // a listed file with no entry in integrityMap would otherwise fail on integrity.size below
        messages.push(`${entry.path} has no integrity information`);
        return;
    }

    if (result.transferred !== integrity.size) {
        messages.push(`${entry.path} has size ${result.transferred}. Expecting ${integrity.size}`);
    } else if (result.digest !== integrity.sha256) {
        // result.digest is compared as a plain value above, so it must be interpolated, not called
        messages.push(`${entry.path} has digest ${result.digest}. Expecting ${integrity.sha256}`);
    }
});
|
||||
fileCount += batch.entries.length;
|
||||
if (!batch.marker) break;
|
||||
marker = batch.marker;
|
||||
}
|
||||
|
||||
const check = (x, y) => { return x === y ? { status: 'passed' } : { status: 'failed', message: `Expecting ${x} but got ${y}` }; };
|
||||
// size is read as a property in the comparison; calling it as integrityMap.size() in the message
// would throw a TypeError on a Map exactly when the file counts differ
if (integrityMap.size !== fileCount) messages.push(`Got ${fileCount} files. Expecting ${integrityMap.size} files`);
|
||||
|
||||
if (integrityMap.size !== fileCount) throw new BoxError(BoxError.BAD_STATE, `Got ${fileCount} files. Expecting ${integrityMap.size()} files`);
|
||||
|
||||
return {
|
||||
size: { status: 'passed' },
|
||||
fileCount: check(integrityMap.size, fileCount),
|
||||
sha256: { status: 'passed' },
|
||||
};
|
||||
return messages;
|
||||
}
|
||||
|
||||
export default {
|
||||
|
||||
Reference in New Issue
Block a user