rsync: integrity fixes
This commit is contained in:
@@ -43,7 +43,7 @@ async function addFile(sourceFile, encryption, uploader, progressCallback) {
|
||||
const [openError, sourceHandle] = await safe(fs.promises.open(sourceFile, 'r'));
|
||||
if (openError) {
|
||||
debug(`addFile: ignoring disappeared file: ${sourceFile}`);
|
||||
return null;
|
||||
return { integrity: null, stats: { transferred: 0 } };
|
||||
}
|
||||
|
||||
const sourceStream = sourceHandle.createReadStream(sourceFile, { autoClose: true });
|
||||
@@ -71,7 +71,7 @@ async function addFile(sourceFile, encryption, uploader, progressCallback) {
|
||||
await uploader.finish();
|
||||
|
||||
return {
|
||||
stats: ps.stats(),
|
||||
stats: ps.stats(), // { startTime, totalMsecs, transferred }
|
||||
integrity: { size: ps.stats().transferred, sha256: hash.digest('hex') }
|
||||
};
|
||||
}
|
||||
@@ -86,15 +86,21 @@ async function sync(backupSite, remotePath, dataLayout, progressCallback) {
|
||||
const concurrency = backupSite.limits?.syncConcurrency || (backupSite.provider === 's3' ? 20 : 10);
|
||||
const cacheFile = path.join(paths.BACKUP_INFO_DIR, backupSite.id, `${dataLayout.getBasename()}.sync.cache`);
|
||||
const { delQueue, addQueue, integrityMap } = await syncer.sync(dataLayout, cacheFile); // integrityMap is unchanged files
|
||||
debug(`sync: processing ${delQueue.length} deletes and ${addQueue.length} additions`);
|
||||
debug(`sync: processing ${delQueue.length} deletes, ${addQueue.length} additions and ${integrityMap.size} unchanged`);
|
||||
const aggregatedStats = {
|
||||
transferred: 0,
|
||||
size: [...integrityMap.values()].reduce((sum, { size }) => sum + size, 0),
|
||||
size: [...integrityMap.values()].reduce((sum, integrity) => sum + (integrity?.size || 0), 0), // integrity can be null if file had disappeared during upload
|
||||
fileCount: addQueue.length + integrityMap.size, // final file count, not the transferred file count
|
||||
startTime: Date.now(),
|
||||
totalMsecs: 0
|
||||
};
|
||||
|
||||
const destPathIntegrityMap = new Map(); // unlike integrityMap which contains local filenames, this contains destination filenames (maybe encrypted)
|
||||
for (const [entryPath, integrity] of integrityMap) {
|
||||
const destPath = backupSite.encryption?.encryptedFilenames ? hush.encryptFilePath(entryPath, backupSite.encryption) : entryPath;
|
||||
destPathIntegrityMap.set(destPath, integrity);
|
||||
}
|
||||
|
||||
async function processSyncerChange(change) {
|
||||
debug('sync: processing task: %j', change);
|
||||
// the empty task.path is special to signify the directory
|
||||
@@ -102,20 +108,19 @@ async function sync(backupSite, remotePath, dataLayout, progressCallback) {
|
||||
const fullPath = path.join(remotePath, destPath);
|
||||
|
||||
if (change.operation === 'removedir') {
|
||||
debug(`Removing directory ${fullPath}`);
|
||||
debug(`sync: removing directory ${fullPath}`);
|
||||
await backupSites.storageApi(backupSite).removeDir(backupSite.config, fullPath, progressCallback);
|
||||
} else if (change.operation === 'remove') {
|
||||
debug(`Removing ${fullPath}`);
|
||||
debug(`sync: removing ${fullPath}`);
|
||||
await backupSites.storageApi(backupSite).remove(backupSite.config, fullPath);
|
||||
} else if (change.operation === 'add') {
|
||||
await promiseRetry({ times: 5, interval: 20000, debug }, async (retryCount) => {
|
||||
progressCallback({ message: `Adding ${change.path}` + (retryCount > 1 ? ` (Try ${retryCount})` : '') });
|
||||
debug(`Adding ${change.path} position ${change.position} try ${retryCount}`);
|
||||
|
||||
debug(`sync: adding ${change.path} position ${change.position} try ${retryCount}`);
|
||||
const uploader = await backupSites.storageApi(backupSite).upload(backupSite.config, fullPath);
|
||||
const result = await addFile(dataLayout.toLocalPath('./' + change.path), backupSite.encryption, uploader, progressCallback);
|
||||
if (!result) return; // this can happen if the file disappeared on us
|
||||
integrityMap.set(destPath, result.integrity);
|
||||
integrityMap.set(change.path, result.integrity); // .integrity can be null when file disappeared on us
|
||||
destPathIntegrityMap.set(destPath, result.integrity);
|
||||
aggregatedStats.transferred += result.stats.transferred;
|
||||
aggregatedStats.size += result.stats.transferred;
|
||||
});
|
||||
@@ -134,7 +139,7 @@ async function sync(backupSite, remotePath, dataLayout, progressCallback) {
|
||||
|
||||
return {
|
||||
stats: { ...aggregatedStats, totalMsecs: Date.now()-aggregatedStats.startTime },
|
||||
integrityMap
|
||||
integrityMap: destPathIntegrityMap
|
||||
};
|
||||
}
|
||||
|
||||
@@ -322,17 +327,9 @@ async function verify(backupSite, remotePath, integrityMap, progressCallback) {
|
||||
|
||||
debug(`verify: Verifying ${remotePath}`);
|
||||
|
||||
const encryptedFilenames = backupSite.encryption?.encryptedFilenames || false;
|
||||
let fileCount = 0;
|
||||
|
||||
async function validateFile(entry) {
|
||||
let relativePath = path.relative(remotePath, entry.path);
|
||||
if (encryptedFilenames) {
|
||||
const { error, result } = hush.decryptFilePath(relativePath, backupSite.encryption);
|
||||
if (error) throw new BoxError(BoxError.CRYPTO_ERROR, 'Unable to decrypt file');
|
||||
relativePath = result;
|
||||
}
|
||||
|
||||
++fileCount;
|
||||
const sourceStream = await backupSites.storageApi(backupSite).download(backupSite.config, entry.path);
|
||||
|
||||
@@ -344,18 +341,17 @@ async function verify(backupSite, remotePath, integrityMap, progressCallback) {
|
||||
});
|
||||
|
||||
const streams = [ sourceStream, ps ];
|
||||
const hash = new HashStream();
|
||||
streams.push(hash);
|
||||
|
||||
if (backupSite.encryption) {
|
||||
const decryptStream = new DecryptStream(backupSite.encryption);
|
||||
streams.push(decryptStream);
|
||||
}
|
||||
|
||||
const hash = new HashStream();
|
||||
streams.push(hash);
|
||||
|
||||
await stream.pipeline(streams);
|
||||
|
||||
const integrity = integrityMap.get(relativePath);
|
||||
const integrity = integrityMap.get(entry.path);
|
||||
if (ps.stats().transferred !== integrity.size) throw new BoxError(BoxError.BAD_STATE, `${entry.path} has size ${ps.stats().transferred}. Expecting ${integrity.size}`);
|
||||
if (hash.digest() !== integrity.sha256) throw new BoxError(BoxError.BAD_STATE, `${entry.path} has size ${hash.digest()}. Expecting ${integrity.sha256}`);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user