'use strict';
|
|
|
|
// rsync backup format: the backup is stored as a directory tree of individual files
// (optionally encrypted), synced incrementally. contrast with the tgz format which
// uploads a single archive.
exports = module.exports = {
    download,
    upload,
    getFileExtension,

    // underscore-prefixed exports are internal (presumably exposed for tests — TODO confirm)
    _saveFsMetadata: saveFsMetadata,
    _restoreFsMetadata: restoreFsMetadata
};
const assert = require('assert'),
|
|
async = require('async'),
|
|
backupTargets = require('../backuptargets.js'),
|
|
BoxError = require('../boxerror.js'),
|
|
crypto = require('crypto'),
|
|
DataLayout = require('../datalayout.js'),
|
|
{ DecryptStream } = require('../hush.js'),
|
|
debug = require('debug')('box:backupformat/rsync'),
|
|
{ EncryptStream } = require('../hush.js'),
|
|
fs = require('fs'),
|
|
hush = require('../hush.js'),
|
|
path = require('path'),
|
|
paths = require('../paths.js'),
|
|
ProgressStream = require('../progress-stream.js'),
|
|
promiseRetry = require('../promise-retry.js'),
|
|
{ Readable } = require('stream'),
|
|
safe = require('safetydance'),
|
|
shell = require('../shell.js')('backupformat/rsync'),
|
|
stream = require('stream/promises'),
|
|
syncer = require('../syncer.js');
|
|
|
|
// uploads a single local file via the provided uploader, optionally encrypting it.
// returns { stats, integrity } on success, or undefined if the file disappeared
// before the upload could start (best-effort semantics for live filesystems).
async function addFile(sourceFile, encryption, uploader, progressCallback) {
    assert.strictEqual(typeof sourceFile, 'string');
    assert.strictEqual(typeof encryption, 'object');
    assert.strictEqual(typeof uploader, 'object');
    assert.strictEqual(typeof progressCallback, 'function');

    // make sure file can be opened for reading before we start the pipeline. otherwise, we end up with
    // destinations dirs/file which are owned by root (this process id) and cannot be copied (run as normal user)
    const [openError, sourceHandle] = await safe(fs.promises.open(sourceFile, 'r'));
    if (openError) {
        debug(`addFile: ignoring disappeared file: ${sourceFile}`);
        return;
    }

    // filehandle.createReadStream() takes only an options object - the path comes from
    // the handle itself. passing the path as first argument is misparsed as options
    const sourceStream = sourceHandle.createReadStream({ autoClose: true });
    const ps = new ProgressStream({ interval: 10000 }); // display a progress every 10 seconds
    ps.on('progress', function (progress) {
        const transferred = Math.round(progress.transferred/1024/1024), speed = Math.round(progress.speed/1024/1024);
        if (!transferred && !speed) return progressCallback({ message: `Uploading ${sourceFile}` }); // 0M@0MBps looks wrong
        progressCallback({ message: `Uploading ${sourceFile}: ${transferred}M@${speed}MBps` });
    });

    // hash the bytes as they flow past ps (post-encryption, matching the transferred size).
    // the Hash object must NOT be a pipeline stage: its readable side emits the digest,
    // so piping through it would upload the 32-byte digest instead of the data
    const hash = crypto.createHash('sha256');
    ps.on('data', (chunk) => hash.update(chunk));

    // build the raw pipeline promise; it is safe()'d exactly once below.
    // wrapping it in safe() twice would make the error check a no-op
    let pipeline;
    if (encryption) {
        const encryptStream = new EncryptStream(encryption);
        pipeline = stream.pipeline(sourceStream, encryptStream, ps, uploader.stream);
    } else {
        pipeline = stream.pipeline(sourceStream, ps, uploader.stream);
    }

    const [error] = await safe(pipeline);
    if (error && !error.message.includes('ENOENT')) throw new BoxError(BoxError.EXTERNAL_ERROR, `addFile pipeline error: ${error.message}`); // ignore error if file disappears

    await uploader.finish();
    return {
        stats: ps.stats(),
        integrity: { size: ps.stats().transferred, sha256: hash.digest('hex') }
    };
}
// computes the delta against the previous backup (via syncer) and applies it to the
// remote: deletes first, then additions. returns aggregate stats and a sorted
// [path, integrity] entry list for the checksum file.
async function sync(backupTarget, remotePath, dataLayout, progressCallback) {
    assert.strictEqual(typeof backupTarget, 'object');
    assert.strictEqual(typeof remotePath, 'string');
    assert(dataLayout instanceof DataLayout, 'dataLayout must be a DataLayout');
    assert.strictEqual(typeof progressCallback, 'function');

    // the number here has to take into account the s3.upload partSize (which is 10MB). So 20=200MB
    const concurrency = backupTarget.limits?.syncConcurrency || (backupTarget.provider === 's3' ? 20 : 10);
    const cacheFile = path.join(paths.BACKUP_INFO_DIR, backupTarget.id, `${dataLayout.getBasename()}.sync.cache`);
    const { delQueue, addQueue, integrityMap } = await syncer.sync(dataLayout, cacheFile);
    debug(`sync: processing ${delQueue.length} deletes and ${addQueue.length} additions`);
    const aggregatedStats = { added: addQueue.length, deleted: delQueue.length, size: 0, startTime: Date.now() };

    // applies one syncer change (removedir/remove/add) against the remote
    async function processSyncerChange(change) {
        debug('sync: processing task: %j', change);
        // the empty task.path is special to signify the directory
        const destPath = change.path && backupTarget.encryption?.encryptedFilenames ? hush.encryptFilePath(change.path, backupTarget.encryption) : change.path;
        const fullPath = path.join(remotePath, destPath);

        if (change.operation === 'removedir') {
            debug(`Removing directory ${fullPath}`);
            await backupTargets.storageApi(backupTarget).removeDir(backupTarget.config, fullPath, progressCallback);
        } else if (change.operation === 'remove') {
            debug(`Removing ${fullPath}`);
            await backupTargets.storageApi(backupTarget).remove(backupTarget.config, fullPath);
        } else if (change.operation === 'add') {
            await promiseRetry({ times: 5, interval: 20000, debug }, async (retryCount) => {
                progressCallback({ message: `Adding ${change.path}` + (retryCount > 1 ? ` (Try ${retryCount})` : '') });
                debug(`Adding ${change.path} position ${change.position} try ${retryCount}`);

                const uploader = await backupTargets.storageApi(backupTarget).upload(backupTarget.config, fullPath);
                const result = await addFile(dataLayout.toLocalPath('./' + change.path), backupTarget.encryption, uploader, progressCallback);
                if (!result) return; // addFile returns undefined when the file disappeared between scan and upload
                integrityMap.set(destPath, result.integrity);
                aggregatedStats.size += result.integrity.size; // integrity.size is the transferred byte count
            });
        }
    }

    const [delError] = await safe(async.eachLimit(delQueue, concurrency, async (change) => await processSyncerChange(change)));
    debug('sync: done processing deletes. error: %o', delError);
    if (delError) throw delError;

    const [addError] = await safe(async.eachLimit(addQueue, concurrency, async (change) => await processSyncerChange(change)));
    debug('sync: done processing adds. error: %o', addError);
    if (addError) throw addError;

    await syncer.finalize(cacheFile);

    return {
        stats: aggregatedStats,
        // for readability, order the entries. the comparator must return a number -
        // a boolean comparator leaves the array effectively unsorted
        integrity: [...integrityMap.entries()].sort(([a], [b]) => a < b ? -1 : (a > b ? 1 : 0))
    };
}
// this is not part of 'snapshotting' because we need root access to traverse.
// records filesystem properties (empty dirs, exec bits, symlinks) that the per-file
// upload cannot preserve, so restoreFsMetadata() can re-apply them after download.
async function saveFsMetadata(dataLayout, metadataFile) {
    assert(dataLayout instanceof DataLayout, 'dataLayout must be a DataLayout');
    assert.strictEqual(typeof metadataFile, 'string');

    // runs find(1) with the given predicate args and returns the matching paths.
    // we assume a small number of results; bail out when the line cap is hit
    // (spawnSync would raise ENOBUFS after maxBuffer)
    async function findPaths(lp, findArgs, maxLines, description) {
        const [error, output] = await safe(shell.spawn('find', [lp, ...findArgs], { encoding: 'utf8', maxLines }));
        if (error && error.stdoutLineCount >= maxLines) throw new BoxError(BoxError.FS_ERROR, `Too many ${description}. Run "find ${lp} ${findArgs.join(' ')}" to investigate`);
        if (error) throw error;
        return output.length ? output.trim().split('\n') : [];
    }

    // contains paths prefixed with './'
    const metadata = {
        emptyDirs: [],
        execFiles: [],
        symlinks: []
    };

    for (const lp of dataLayout.localPaths()) {
        const emptyDirs = await findPaths(lp, ['-type', 'd', '-empty'], 50000, 'empty directories');
        metadata.emptyDirs.push(...emptyDirs.map((ed) => dataLayout.toRemotePath(ed)));

        const execFiles = await findPaths(lp, ['-type', 'f', '-executable'], 20000, 'executable files');
        metadata.execFiles.push(...execFiles.map((ef) => dataLayout.toRemotePath(ef)));

        const symlinks = await findPaths(lp, ['-type', 'l'], 20000, 'symlinks');
        metadata.symlinks.push(...symlinks.map((sl) => ({ path: dataLayout.toRemotePath(sl), target: safe.fs.readlinkSync(sl) })));
    }

    if (!safe.fs.writeFileSync(metadataFile, JSON.stringify(metadata, null, 2))) throw new BoxError(BoxError.FS_ERROR, `Error writing fs metadata: ${safe.error.message}`);
}
// re-applies the filesystem properties recorded by saveFsMetadata(): recreates
// empty directories, restores the exec bit and recreates symlinks.
async function restoreFsMetadata(dataLayout, metadataFile) {
    assert(dataLayout instanceof DataLayout, 'dataLayout must be a DataLayout');
    assert.strictEqual(typeof metadataFile, 'string');

    debug(`Recreating empty directories in ${dataLayout.toString()}`);

    // load and parse the metadata file written during upload
    const rawJson = safe.fs.readFileSync(metadataFile, 'utf8');
    if (rawJson === null) throw new BoxError(BoxError.EXTERNAL_ERROR, 'Error loading fsmetadata.json:' + safe.error.message);
    const fsMetadata = safe.JSON.parse(rawJson);
    if (fsMetadata === null) throw new BoxError(BoxError.EXTERNAL_ERROR, 'Error parsing fsmetadata.json:' + safe.error.message);

    // empty directories contain no files and are thus never part of the file sync
    for (const dir of fsMetadata.emptyDirs) {
        const [error] = await safe(fs.promises.mkdir(dataLayout.toLocalPath(dir), { recursive: true }));
        if (error) throw new BoxError(BoxError.FS_ERROR, `unable to create path: ${error.message}`);
    }

    // the exec bit is not preserved by the per-file upload; put it back
    for (const file of fsMetadata.execFiles) {
        const [error] = await safe(fs.promises.chmod(dataLayout.toLocalPath(file), 0o755));
        if (error) throw new BoxError(BoxError.FS_ERROR, `unable to chmod: ${error.message}`);
    }

    // symlinks are stored as { path, target } records instead of being uploaded
    for (const link of fsMetadata.symlinks || []) {
        if (!link.target) continue;
        // the path may not exist if we had a directory full of symlinks
        const [mkdirError] = await safe(fs.promises.mkdir(path.dirname(dataLayout.toLocalPath(link.path)), { recursive: true }));
        if (mkdirError) throw new BoxError(BoxError.FS_ERROR, `unable to symlink (mkdir): ${mkdirError.message}`);
        const [symlinkError] = await safe(fs.promises.symlink(link.target, dataLayout.toLocalPath(link.path), 'file'));
        if (symlinkError) throw new BoxError(BoxError.FS_ERROR, `unable to symlink: ${symlinkError.message}`);
    }
}
// downloads every remote file under backupFilePath into the local data layout.
// the remote listing is paginated via markers; files within a batch download
// concurrently with a per-provider concurrency cap.
async function downloadDir(backupTarget, backupFilePath, dataLayout, progressCallback) {
    assert.strictEqual(typeof backupTarget, 'object');
    assert.strictEqual(typeof backupFilePath, 'string');
    assert(dataLayout instanceof DataLayout, 'dataLayout must be a DataLayout');
    assert.strictEqual(typeof progressCallback, 'function');

    const encryptedFilenames = backupTarget.encryption?.encryptedFilenames || false;

    debug(`downloadDir: ${backupFilePath} to ${dataLayout.toString()}. encryption filenames: ${encryptedFilenames} content: ${!!backupTarget.encryption}`);

    // downloads a single listing entry to its mapped local path, retrying transient failures
    async function downloadFile(entry) {
        let relativePath = path.relative(backupFilePath, entry.path);
        if (encryptedFilenames) {
            // remote names are encrypted; recover the plaintext relative path
            const { error, result } = hush.decryptFilePath(relativePath, backupTarget.encryption);
            if (error) throw new BoxError(BoxError.CRYPTO_ERROR, 'Unable to decrypt file');
            relativePath = result;
        }
        const destFilePath = dataLayout.toLocalPath('./' + relativePath);

        // the parent dir is created outside the retry loop - it only needs to happen once
        const [mkdirError] = await safe(fs.promises.mkdir(path.dirname(destFilePath), { recursive: true }));
        if (mkdirError) throw new BoxError(BoxError.FS_ERROR, mkdirError.message);

        await promiseRetry({ times: 3, interval: 20000 }, async function () {
            const [downloadError, sourceStream] = await safe(backupTargets.storageApi(backupTarget).download(backupTarget.config, entry.path));
            if (downloadError) {
                progressCallback({ message: `Download ${entry.path} to ${destFilePath} errored: ${downloadError.message}` });
                throw downloadError; // propagates into promiseRetry which retries
            }

            const ps = new ProgressStream({ interval: 10000 }); // display a progress every 10 seconds
            ps.on('progress', function (progress) {
                const transferred = Math.round(progress.transferred/1024/1024), speed = Math.round(progress.speed/1024/1024);
                if (!transferred && !speed) return progressCallback({ message: `Downloading ${entry.path}` }); // 0M@0MBps looks wrong
                progressCallback({ message: `Downloading ${entry.path}: ${transferred}M@${speed}MBps` });
            });

            const destStream = fs.createWriteStream(destFilePath);

            // note: progress is measured on the raw downloaded bytes, before any decryption
            const streams = [ sourceStream, ps ];

            if (backupTarget.encryption) {
                const decryptStream = new DecryptStream(backupTarget.encryption);
                streams.push(decryptStream);
            }

            streams.push(destStream);

            progressCallback({ message: `Downloading ${entry.path} to ${destFilePath}` });

            const [pipelineError] = await safe(stream.pipeline(streams));
            if (pipelineError) {
                progressCallback({ message: `Download error ${entry.path} to ${destFilePath}: ${pipelineError.message}` });
                throw pipelineError;
            }
            progressCallback({ message: `Download finished ${entry.path} to ${destFilePath}` });
        });
    }

    // https://www.digitalocean.com/community/questions/rate-limiting-on-spaces?answer=40441
    const concurrency = backupTarget.limits?.downloadConcurrency || (backupTarget.provider === 's3' ? 30 : 10);
    let marker = null;
    while (true) {
        const batch = await backupTargets.storageApi(backupTarget).listDir(backupTarget.config, backupFilePath, marker === null ? 1 : 1000, marker); // try with one file first. if that works out, we continue faster
        await async.eachLimit(batch.entries, concurrency, downloadFile);
        if (!batch.marker) break;
        marker = batch.marker;
    }
}
// restores an rsync-format backup: fetches the remote tree into the local data
// layout, then re-applies the recorded filesystem metadata (empty dirs, exec
// bits, symlinks) from the downloaded fsmetadata.json.
async function download(backupTarget, remotePath, dataLayout, progressCallback) {
    assert.strictEqual(typeof backupTarget, 'object');
    assert.strictEqual(typeof remotePath, 'string');
    assert(dataLayout instanceof DataLayout, 'dataLayout must be a DataLayout');
    assert.strictEqual(typeof progressCallback, 'function');

    debug(`download: Downloading ${remotePath} to ${dataLayout.toString()}`);

    const fsMetadataFile = `${dataLayout.localRoot()}/fsmetadata.json`;

    await downloadDir(backupTarget, remotePath, dataLayout, progressCallback);
    await restoreFsMetadata(dataLayout, fsMetadataFile);
}
// creates/updates an rsync-format backup: records fs metadata, syncs the tree to
// the remote, then uploads a signed checksum manifest next to it.
// returns { stats, integrity: { signature } } where signature covers the manifest JSON.
async function upload(backupTarget, remotePath, dataLayout, progressCallback) {
    assert.strictEqual(typeof backupTarget, 'object');
    assert.strictEqual(typeof remotePath, 'string');
    assert(dataLayout instanceof DataLayout, 'dataLayout must be a DataLayout'); // consistent with sync()/download()
    assert.strictEqual(typeof progressCallback, 'function');

    await saveFsMetadata(dataLayout, `${dataLayout.localRoot()}/fsmetadata.json`);
    const { stats, integrity } = await sync(backupTarget, remotePath, dataLayout, progressCallback);

    // upload the per-file integrity entries as a readable JSON manifest
    const integrityDataJsonString = JSON.stringify(Object.fromEntries(integrity), null, 2);
    const integrityDataStream = Readable.from(integrityDataJsonString);
    const integrityUploader = await backupTargets.storageApi(backupTarget).upload(backupTarget.config, `${remotePath}.checksum`);
    await stream.pipeline(integrityDataStream, integrityUploader.stream);
    await integrityUploader.finish();

    // crypto.sign() requires Buffer/TypedArray data - a plain string throws ERR_INVALID_ARG_TYPE.
    // a null algorithm lets the key type (e.g. Ed25519) choose the digest. sign() without a
    // callback is synchronous, so no await is needed
    const signature = crypto.sign(null /* algorithm */, Buffer.from(integrityDataJsonString), backupTarget.integrityKeyPair.privateKey);
    return { stats, integrity: { signature } };
}
// rsync-format backups are directory trees, so entries never carry an archive
// extension. the empty string also signals to the backup cleaner that we are
// dealing with directories.
function getFileExtension(encryption) {
    assert.strictEqual(typeof encryption, 'boolean');

    return '';
}