// cloudron-box/src/backupformat/rsync.js
'use strict';

// Backup format that stores a backup as a plain directory tree on the
// storage backend (rsync-style): one remote object per local file.
exports = module.exports = {
    download,
    upload,
    verify,
    getFileExtension,
    copy,
    _saveFsMetadata: saveFsMetadata,
    _restoreFsMetadata: restoreFsMetadata
};
// 'node:'-prefixed requires are Node.js builtins; '../' requires are project modules.
const assert = require('node:assert'),
    async = require('async'),
    backupSites = require('../backupsites.js'),
    BoxError = require('../boxerror.js'),
    DataLayout = require('../datalayout.js'),
    { DecryptStream } = require('../hush.js'),
    debug = require('debug')('box:backupformat/rsync'),
    { EncryptStream } = require('../hush.js'),
    fs = require('node:fs'),
    HashStream = require('../hash-stream.js'),
    hush = require('../hush.js'),
    path = require('node:path'),
    paths = require('../paths.js'),
    ProgressStream = require('../progress-stream.js'),
    promiseRetry = require('../promise-retry.js'),
    safe = require('safetydance'),
    shell = require('../shell.js')('backupformat/rsync'),
    stream = require('stream/promises'),
    syncer = require('../syncer.js'),
    util = require('node:util');
// Uploads a single local file through an optional encryption stage.
// Returns { stats, integrity: { size, sha256 } } or null when the source
// file disappeared before the upload could start.
async function addFile(sourceFile, encryption, uploader, progressCallback) {
    assert.strictEqual(typeof sourceFile, 'string');
    assert.strictEqual(typeof encryption, 'object');
    assert.strictEqual(typeof uploader, 'object');
    assert.strictEqual(typeof progressCallback, 'function');

    // make sure file can be opened for reading before we start the pipeline. otherwise, we end up with
    // destinations dirs/file which are owned by root (this process id) and cannot be copied (run as normal user)
    const [openError, sourceHandle] = await safe(fs.promises.open(sourceFile, 'r'));
    if (openError) {
        debug(`addFile: ignoring disappeared file: ${sourceFile}`);
        return null;
    }

    // FileHandle.createReadStream() takes only an options object; the handle already knows the path
    const sourceStream = sourceHandle.createReadStream({ autoClose: true });

    const ps = new ProgressStream({ interval: 10000 }); // display a progress every 10 seconds
    ps.on('progress', function (progress) {
        const transferred = Math.round(progress.transferred/1024/1024), speed = Math.round(progress.speed/1024/1024);
        if (!transferred && !speed) return progressCallback({ message: `Uploading ${sourceFile}` }); // 0M@0MBps looks wrong
        progressCallback({ message: `Uploading ${sourceFile}: ${transferred}M@${speed}MBps` });
    });

    const hash = new HashStream();

    // build the pipeline promise; it is awaited via a single safe() below.
    // (wrapping in safe() here AND below would always yield a null error)
    let pipelinePromise;
    if (encryption) {
        const encryptStream = new EncryptStream(encryption);
        pipelinePromise = stream.pipeline(sourceStream, encryptStream, ps, hash, uploader.stream);
    } else {
        pipelinePromise = stream.pipeline(sourceStream, ps, hash, uploader.stream);
    }

    const [error] = await safe(pipelinePromise);
    if (error && !error.message.includes('ENOENT')) throw new BoxError(BoxError.EXTERNAL_ERROR, `addFile pipeline error: ${error.message}`); // ignore error if file disappears

    // debug(`addFile: pipeline finished: ${JSON.stringify(ps.stats())}`);
    await uploader.finish();

    return {
        stats: ps.stats(),
        integrity: { size: ps.stats().transferred, sha256: hash.digest('hex') }
    };
}
// Incrementally syncs the local data layout to remotePath: removes files
// deleted since the last backup and uploads new/changed ones.
// Returns { stats, integrityMap } where integrityMap maps remote-relative
// paths to { size, sha256 }.
async function sync(backupSite, remotePath, dataLayout, progressCallback) {
    assert.strictEqual(typeof backupSite, 'object');
    assert.strictEqual(typeof remotePath, 'string');
    assert(dataLayout instanceof DataLayout, 'dataLayout must be a DataLayout');
    assert.strictEqual(typeof progressCallback, 'function');

    // the number here has to take into account the s3.upload partSize (which is 10MB). So 20=200MB
    const concurrency = backupSite.limits?.syncConcurrency || (backupSite.provider === 's3' ? 20 : 10);

    const cacheFile = path.join(paths.BACKUP_INFO_DIR, backupSite.id, `${dataLayout.getBasename()}.sync.cache`);
    const { delQueue, addQueue, integrityMap } = await syncer.sync(dataLayout, cacheFile); // integrityMap is unchanged files
    debug(`sync: processing ${delQueue.length} deletes and ${addQueue.length} additions`);

    const aggregatedStats = {
        transferred: 0,
        size: [...integrityMap.values()].reduce((sum, { size }) => sum + size, 0),
        fileCount: addQueue.length + integrityMap.size, // final file count, not the transferred file count
        startTime: Date.now(),
        totalMsecs: 0
    };

    // Closes over backupSite/remotePath/dataLayout/progressCallback; only the change is a parameter.
    async function processSyncerChange(change) {
        debug('sync: processing task: %j', change);
        // the empty task.path is special to signify the directory
        const destPath = change.path && backupSite.encryption?.encryptedFilenames ? hush.encryptFilePath(change.path, backupSite.encryption) : change.path;
        const fullPath = path.join(remotePath, destPath);

        if (change.operation === 'removedir') {
            debug(`Removing directory ${fullPath}`);
            await backupSites.storageApi(backupSite).removeDir(backupSite.config, fullPath, progressCallback);
        } else if (change.operation === 'remove') {
            debug(`Removing ${fullPath}`);
            await backupSites.storageApi(backupSite).remove(backupSite.config, fullPath);
        } else if (change.operation === 'add') {
            await promiseRetry({ times: 5, interval: 20000, debug }, async (retryCount) => {
                progressCallback({ message: `Adding ${change.path}` + (retryCount > 1 ? ` (Try ${retryCount})` : '') });
                debug(`Adding ${change.path} position ${change.position} try ${retryCount}`);
                const uploader = await backupSites.storageApi(backupSite).upload(backupSite.config, fullPath);
                const result = await addFile(dataLayout.toLocalPath('./' + change.path), backupSite.encryption, uploader, progressCallback);
                if (!result) return; // this can happen if the file disappeared on us
                integrityMap.set(destPath, result.integrity);
                aggregatedStats.transferred += result.stats.transferred;
                aggregatedStats.size += result.stats.transferred;
            });
        }
    }

    const [delError] = await safe(async.eachLimit(delQueue, concurrency, async (change) => await processSyncerChange(change)));
    debug('sync: done processing deletes. error: %o', delError);
    if (delError) throw delError;

    const [addError] = await safe(async.eachLimit(addQueue, concurrency, async (change) => await processSyncerChange(change)));
    debug('sync: done processing adds. error: %o', addError);
    if (addError) throw addError;

    await syncer.finalize(integrityMap, cacheFile);

    return {
        stats: { ...aggregatedStats, totalMsecs: Date.now() - aggregatedStats.startTime },
        integrityMap
    };
}
// this is not part of 'snapshotting' because we need root access to traverse.
// Records filesystem facts that object storage cannot represent (empty dirs,
// exec bits, symlink targets) into a JSON metadata file.
async function saveFsMetadata(dataLayout, metadataFile) {
    assert(dataLayout instanceof DataLayout, 'dataLayout must be a DataLayout');
    assert.strictEqual(typeof metadataFile, 'string');

    // contains paths prefixed with './'
    const metadata = { emptyDirs: [], execFiles: [], symlinks: [] };

    // we assume small number of files. spawnSync will raise a ENOBUFS error after maxBuffer
    const findLines = async (args, maxLines, tooManyMessage) => {
        const [findError, output] = await safe(shell.spawn('find', args, { encoding: 'utf8', maxLines }));
        if (findError && findError.stdoutLineCount >= maxLines) throw new BoxError(BoxError.FS_ERROR, tooManyMessage);
        if (findError) throw findError;
        return output.length ? output.trim().split('\n') : [];
    };

    for (const lp of dataLayout.localPaths()) {
        const emptyDirs = await findLines([lp, '-type', 'd', '-empty'], 50000, `Too many empty directories. Run "find ${lp} -type d -empty" to investigate`);
        for (const ed of emptyDirs) metadata.emptyDirs.push(dataLayout.toRemotePath(ed));

        const execFiles = await findLines([lp, '-type', 'f', '-executable'], 20000, `Too many executable files. Run "find ${lp} -type f -executable" to investigate`);
        for (const ef of execFiles) metadata.execFiles.push(dataLayout.toRemotePath(ef));

        const symlinks = await findLines([lp, '-type', 'l'], 20000, `Too many symlinks. Run "find ${lp} -type l" to investigate`);
        // 'site' is the symlink target; the key name is persisted in fsmetadata.json and must not change
        for (const sl of symlinks) metadata.symlinks.push({ path: dataLayout.toRemotePath(sl), site: safe.fs.readlinkSync(sl) });
    }

    if (!safe.fs.writeFileSync(metadataFile, JSON.stringify(metadata, null, 2))) throw new BoxError(BoxError.FS_ERROR, `Error writing fs metadata: ${safe.error.message}`);
}
// Re-applies the filesystem facts recorded by saveFsMetadata() after a
// restore: empty directories, executable bits and symlinks.
async function restoreFsMetadata(dataLayout, metadataFile) {
    assert(dataLayout instanceof DataLayout, 'dataLayout must be a DataLayout');
    assert.strictEqual(typeof metadataFile, 'string');

    debug(`Recreating empty directories in ${dataLayout.toString()}`);

    const metadataJson = safe.fs.readFileSync(metadataFile, 'utf8');
    if (metadataJson === null) throw new BoxError(BoxError.EXTERNAL_ERROR, 'Error loading fsmetadata.json:' + safe.error.message);
    const metadata = safe.JSON.parse(metadataJson);
    if (metadata === null) throw new BoxError(BoxError.EXTERNAL_ERROR, 'Error parsing fsmetadata.json:' + safe.error.message);

    // empty directories contain no files and were thus never uploaded
    for (const dir of metadata.emptyDirs) {
        const [error] = await safe(fs.promises.mkdir(dataLayout.toLocalPath(dir), { recursive: true }));
        if (error) throw new BoxError(BoxError.FS_ERROR, `unable to create path: ${error.message}`);
    }

    // object storage does not preserve the executable bit
    for (const file of metadata.execFiles) {
        const [error] = await safe(fs.promises.chmod(dataLayout.toLocalPath(file), 0o755));
        if (error) throw new BoxError(BoxError.FS_ERROR, `unable to chmod: ${error.message}`);
    }

    for (const symlink of (metadata.symlinks || [])) {
        if (!symlink.site) continue; // 'site' is the recorded symlink target; skip entries without one

        // the path may not exist if we had a directory full of symlinks
        const localPath = dataLayout.toLocalPath(symlink.path);
        const [mkdirError] = await safe(fs.promises.mkdir(path.dirname(localPath), { recursive: true }));
        if (mkdirError) throw new BoxError(BoxError.FS_ERROR, `unable to symlink (mkdir): ${mkdirError.message}`);

        const [symlinkError] = await safe(fs.promises.symlink(symlink.site, localPath, 'file'));
        if (symlinkError) throw new BoxError(BoxError.FS_ERROR, `unable to symlink: ${symlinkError.message}`);
    }
}
// Downloads every remote object under remotePath into the data layout,
// decrypting file names and contents when the backup site is encrypted.
async function downloadDir(backupSite, remotePath, dataLayout, progressCallback) {
    assert.strictEqual(typeof backupSite, 'object');
    assert.strictEqual(typeof remotePath, 'string');
    assert(dataLayout instanceof DataLayout, 'dataLayout must be a DataLayout');
    assert.strictEqual(typeof progressCallback, 'function');

    const encryptedFilenames = backupSite.encryption?.encryptedFilenames || false;

    debug(`downloadDir: ${remotePath} to ${dataLayout.toString()}. encryption filenames: ${encryptedFilenames}. encrypted files: ${!!backupSite.encryption}`);

    async function downloadFile(entry) {
        let relativePath = path.relative(remotePath, entry.path);
        if (encryptedFilenames) {
            const { error, result } = hush.decryptFilePath(relativePath, backupSite.encryption);
            if (error) throw new BoxError(BoxError.CRYPTO_ERROR, 'Unable to decrypt file');
            relativePath = result;
        }

        const destFilePath = dataLayout.toLocalPath('./' + relativePath);
        const [mkdirError] = await safe(fs.promises.mkdir(path.dirname(destFilePath), { recursive: true }));
        if (mkdirError) throw new BoxError(BoxError.FS_ERROR, mkdirError.message);

        await promiseRetry({ times: 3, interval: 20000 }, async function () {
            progressCallback({ message: `Downloading ${entry.path} to ${destFilePath}` });

            const [downloadError, sourceStream] = await safe(backupSites.storageApi(backupSite).download(backupSite.config, entry.path));
            if (downloadError) {
                progressCallback({ message: `Download ${entry.path} to ${destFilePath} errored: ${downloadError.message}` });
                throw downloadError;
            }

            const ps = new ProgressStream({ interval: 10000 }); // display a progress every 10 seconds
            ps.on('progress', (progress) => {
                const transferred = Math.round(progress.transferred/1024/1024), speed = Math.round(progress.speed/1024/1024);
                if (!transferred && !speed) return progressCallback({ message: `Downloading ${entry.path}` }); // 0M@0MBps looks wrong
                progressCallback({ message: `Downloading ${entry.path}: ${transferred}M@${speed}MBps` });
            });

            // source -> progress -> [decrypt] -> destination file
            const pipelineStages = [ sourceStream, ps ];
            if (backupSite.encryption) pipelineStages.push(new DecryptStream(backupSite.encryption));
            pipelineStages.push(fs.createWriteStream(destFilePath));

            const [pipelineError] = await safe(stream.pipeline(pipelineStages));
            if (pipelineError) {
                progressCallback({ message: `Download error ${entry.path} to ${destFilePath}: ${pipelineError.message}` });
                throw pipelineError;
            }

            progressCallback({ message: `Download finished ${entry.path} to ${destFilePath}` });
        });
    }

    // https://www.digitalocean.com/community/questions/rate-limiting-on-spaces?answer=40441
    const concurrency = backupSite.limits?.downloadConcurrency || (backupSite.provider === 's3' ? 30 : 10);

    let marker = null;
    do {
        const batch = await backupSites.storageApi(backupSite).listDir(backupSite.config, remotePath, marker === null ? 1 : 1000, marker); // try with one file first. if that works out, we continue faster
        await async.eachLimit(batch.entries, concurrency, downloadFile);
        marker = batch.marker;
    } while (marker);
}
// Restores a full backup: downloads all files, then re-applies the
// filesystem metadata (empty dirs, exec bits, symlinks).
async function download(backupSite, remotePath, dataLayout, progressCallback) {
    assert.strictEqual(typeof backupSite, 'object');
    assert.strictEqual(typeof remotePath, 'string');
    assert(dataLayout instanceof DataLayout, 'dataLayout must be a DataLayout');
    assert.strictEqual(typeof progressCallback, 'function');

    debug(`download: Downloading ${remotePath} to ${dataLayout.toString()}`);

    await downloadDir(backupSite, remotePath, dataLayout, progressCallback);

    const metadataFile = `${dataLayout.localRoot()}/fsmetadata.json`;
    await restoreFsMetadata(dataLayout, metadataFile);
}
// Saves the filesystem metadata and syncs the data layout to remotePath.
// Returns the { stats, integrityMap } result of sync().
async function upload(backupSite, remotePath, dataLayout, progressCallback) {
    assert.strictEqual(typeof backupSite, 'object');
    assert.strictEqual(typeof remotePath, 'string');
    assert(dataLayout instanceof DataLayout, 'dataLayout must be a DataLayout'); // consistent with download()/sync()
    assert.strictEqual(typeof progressCallback, 'function');

    await saveFsMetadata(dataLayout, `${dataLayout.localRoot()}/fsmetadata.json`);

    return await sync(backupSite, remotePath, dataLayout, progressCallback); // { stats, integrityMap }
}
// Server-side copy of an entire backup directory from fromPath to toPath.
async function copy(backupSite, fromPath, toPath, progressCallback) {
    assert.strictEqual(typeof backupSite, 'object');
    assert.strictEqual(typeof fromPath, 'string');
    assert.strictEqual(typeof toPath, 'string');
    assert.strictEqual(typeof progressCallback, 'function');

    const storage = backupSites.storageApi(backupSite);
    await storage.copyDir(backupSite.config, fromPath, toPath, progressCallback);
}
// The rsync format stores plain directory trees, so backup entries carry no
// file extension regardless of encryption. The empty string also signals to
// backupcleaner that we are dealing with directories.
function getFileExtension(encryption) {
    assert.strictEqual(typeof encryption, 'boolean');
    return '';
}
// Verifies that every remote file under remotePath matches the size and
// sha256 recorded in integrityMap and that the file counts agree.
// Throws BoxError(BAD_STATE) on any mismatch; returns a per-check report.
async function verify(backupSite, remotePath, integrityMap, progressCallback) {
    assert.strictEqual(typeof backupSite, 'object');
    assert.strictEqual(typeof remotePath, 'string');
    assert(util.types.isMap(integrityMap), 'integrityMap should be a Map');
    assert.strictEqual(typeof progressCallback, 'function');

    debug(`verify: Verifying ${remotePath}`);

    const encryptedFilenames = backupSite.encryption?.encryptedFilenames || false;
    let fileCount = 0;

    async function validateFile(entry) {
        let relativePath = path.relative(remotePath, entry.path);
        if (encryptedFilenames) {
            const { error, result } = hush.decryptFilePath(relativePath, backupSite.encryption);
            if (error) throw new BoxError(BoxError.CRYPTO_ERROR, 'Unable to decrypt file');
            relativePath = result;
        }

        ++fileCount;

        const sourceStream = await backupSites.storageApi(backupSite).download(backupSite.config, entry.path);

        const ps = new ProgressStream({ interval: 10000 }); // display a progress every 10 seconds
        ps.on('progress', function (progress) {
            const transferred = Math.round(progress.transferred/1024/1024), speed = Math.round(progress.speed/1024/1024);
            if (!transferred && !speed) return progressCallback({ message: `Downloading ${entry.path}` }); // 0M@0MBps looks wrong
            progressCallback({ message: `Downloading ${entry.path}: ${transferred}M@${speed}MBps` });
        });

        const streams = [ sourceStream, ps ];
        if (backupSite.encryption) {
            streams.push(new DecryptStream(backupSite.encryption));
        }
        const hash = new HashStream();
        streams.push(hash);

        await stream.pipeline(streams);

        const integrity = integrityMap.get(relativePath);
        if (!integrity) throw new BoxError(BoxError.BAD_STATE, `${entry.path} (${relativePath}) is not in the integrity map`); // would otherwise crash below
        if (ps.stats().transferred !== integrity.size) throw new BoxError(BoxError.BAD_STATE, `${entry.path} has size ${ps.stats().transferred}. Expecting ${integrity.size}`);
        const sha256 = hash.digest('hex'); // digest once ('hex' matches how addFile records it); a digest cannot be re-computed
        if (sha256 !== integrity.sha256) throw new BoxError(BoxError.BAD_STATE, `${entry.path} has sha256 ${sha256}. Expecting ${integrity.sha256}`);
    }

    debug(integrityMap.entries());

    // https://www.digitalocean.com/community/questions/rate-limiting-on-spaces?answer=40441
    const concurrency = backupSite.limits?.downloadConcurrency || (backupSite.provider === 's3' ? 30 : 10);

    let marker = null;
    while (true) {
        const batch = await backupSites.storageApi(backupSite).listDir(backupSite.config, remotePath, marker === null ? 1 : 1000, marker); // try with one file first. if that works out, we continue faster
        await async.eachLimit(batch.entries, concurrency, validateFile);
        if (!batch.marker) break;
        marker = batch.marker;
    }

    const check = (x, y) => { return x === y ? { status: 'passed' } : { status: 'failed', message: `Expecting ${x} but got ${y}` }; };
    // Map.size is a property, not a method (original called integrityMap.size() which throws)
    if (integrityMap.size !== fileCount) throw new BoxError(BoxError.BAD_STATE, `Got ${fileCount} files. Expecting ${integrityMap.size} files`);

    return {
        size: { status: 'passed' },
        fileCount: check(integrityMap.size, fileCount),
        sha256: { status: 'passed' },
    };
}