split up backupformat logic into separate files
This commit is contained in:
250
src/backupformat/rsync.js
Normal file
250
src/backupformat/rsync.js
Normal file
@@ -0,0 +1,250 @@
|
||||
'use strict';
|
||||
|
||||
exports = module.exports = {
|
||||
getBackupFilePath,
|
||||
download,
|
||||
upload,
|
||||
|
||||
_saveFsMetadata: saveFsMetadata,
|
||||
_restoreFsMetadata: restoreFsMetadata
|
||||
};
|
||||
|
||||
const assert = require('assert'),
|
||||
async = require('async'),
|
||||
BoxError = require('../boxerror.js'),
|
||||
DataLayout = require('../datalayout.js'),
|
||||
debug = require('debug')('box:backupformat/rsync'),
|
||||
fs = require('fs'),
|
||||
hush = require('../hush.js'),
|
||||
once = require('../once.js'),
|
||||
path = require('path'),
|
||||
safe = require('safetydance'),
|
||||
storage = require('../storage.js'),
|
||||
syncer = require('../syncer.js'),
|
||||
util = require('util');
|
||||
|
||||
function getBackupFilePath(backupConfig, remotePath) {
|
||||
assert.strictEqual(typeof backupConfig, 'object');
|
||||
assert.strictEqual(typeof remotePath, 'string');
|
||||
|
||||
const rootPath = storage.api(backupConfig.provider).getRootPath(backupConfig);
|
||||
return path.join(rootPath, remotePath);
|
||||
}
|
||||
|
||||
function sync(backupConfig, remotePath, dataLayout, progressCallback, callback) {
|
||||
assert.strictEqual(typeof backupConfig, 'object');
|
||||
assert.strictEqual(typeof remotePath, 'string');
|
||||
assert(dataLayout instanceof DataLayout, 'dataLayout must be a DataLayout');
|
||||
assert.strictEqual(typeof progressCallback, 'function');
|
||||
assert.strictEqual(typeof callback, 'function');
|
||||
|
||||
// the number here has to take into account the s3.upload partSize (which is 10MB). So 20=200MB
|
||||
const concurrency = backupConfig.syncConcurrency || (backupConfig.provider === 's3' ? 20 : 10);
|
||||
const removeDir = util.callbackify(storage.api(backupConfig.provider).removeDir);
|
||||
const remove = util.callbackify(storage.api(backupConfig.provider).remove);
|
||||
|
||||
syncer.sync(dataLayout, function processTask(task, iteratorCallback) {
|
||||
debug('sync: processing task: %j', task);
|
||||
// the empty task.path is special to signify the directory
|
||||
const destPath = task.path && backupConfig.encryption ? hush.encryptFilePath(task.path, backupConfig.encryption) : task.path;
|
||||
const backupFilePath = path.join(getBackupFilePath(backupConfig, remotePath), destPath);
|
||||
|
||||
if (task.operation === 'removedir') {
|
||||
debug(`Removing directory ${backupFilePath}`);
|
||||
return removeDir(backupConfig, backupFilePath, progressCallback, iteratorCallback);
|
||||
} else if (task.operation === 'remove') {
|
||||
debug(`Removing ${backupFilePath}`);
|
||||
return remove(backupConfig, backupFilePath, iteratorCallback);
|
||||
}
|
||||
|
||||
let retryCount = 0;
|
||||
async.retry({ times: 5, interval: 20000 }, function (retryCallback) {
|
||||
retryCallback = once(retryCallback); // protect again upload() erroring much later after read stream error
|
||||
|
||||
++retryCount;
|
||||
if (task.operation === 'add') {
|
||||
progressCallback({ message: `Adding ${task.path}` + (retryCount > 1 ? ` (Try ${retryCount})` : '') });
|
||||
debug(`Adding ${task.path} position ${task.position} try ${retryCount}`);
|
||||
const stream = hush.createReadStream(dataLayout.toLocalPath('./' + task.path), backupConfig.encryption);
|
||||
stream.on('error', (error) => retryCallback(error.message.includes('ENOENT') ? null : error)); // ignore error if file disappears
|
||||
stream.on('progress', function (progress) {
|
||||
const transferred = Math.round(progress.transferred/1024/1024), speed = Math.round(progress.speed/1024/1024);
|
||||
if (!transferred && !speed) return progressCallback({ message: `Uploading ${task.path}` }); // 0M@0MBps looks wrong
|
||||
progressCallback({ message: `Uploading ${task.path}: ${transferred}M@${speed}MBps` }); // 0M@0MBps looks wrong
|
||||
});
|
||||
// only create the destination path when we have confirmation that the source is available. otherwise, we end up with
|
||||
// files owned as 'root' and the cp later will fail
|
||||
stream.on('open', function () {
|
||||
storage.api(backupConfig.provider).upload(backupConfig, backupFilePath, stream, function (error) {
|
||||
debug(error ? `Error uploading ${task.path} try ${retryCount}: ${error.message}` : `Uploaded ${task.path}`);
|
||||
retryCallback(error);
|
||||
});
|
||||
});
|
||||
}
|
||||
}, iteratorCallback);
|
||||
}, concurrency, function (error) {
|
||||
if (error) return callback(new BoxError(BoxError.EXTERNAL_ERROR, error.message));
|
||||
|
||||
callback();
|
||||
});
|
||||
}
|
||||
|
||||
// this is not part of 'snapshotting' because we need root access to traverse
|
||||
async function saveFsMetadata(dataLayout, metadataFile) {
|
||||
assert(dataLayout instanceof DataLayout, 'dataLayout must be a DataLayout');
|
||||
assert.strictEqual(typeof metadataFile, 'string');
|
||||
|
||||
// contains paths prefixed with './'
|
||||
const metadata = {
|
||||
emptyDirs: [],
|
||||
execFiles: [],
|
||||
symlinks: []
|
||||
};
|
||||
|
||||
// we assume small number of files. spawnSync will raise a ENOBUFS error after maxBuffer
|
||||
for (let lp of dataLayout.localPaths()) {
|
||||
const emptyDirs = safe.child_process.execSync(`find ${lp} -type d -empty`, { encoding: 'utf8', maxBuffer: 1024 * 1024 * 30 });
|
||||
if (emptyDirs === null) throw new BoxError(BoxError.FS_ERROR, `Error finding empty dirs: ${safe.error.message}`);
|
||||
if (emptyDirs.length) metadata.emptyDirs = metadata.emptyDirs.concat(emptyDirs.trim().split('\n').map((ed) => dataLayout.toRemotePath(ed)));
|
||||
|
||||
const execFiles = safe.child_process.execSync(`find ${lp} -type f -executable`, { encoding: 'utf8', maxBuffer: 1024 * 1024 * 30 });
|
||||
if (execFiles === null) throw new BoxError(BoxError.FS_ERROR, `Error finding executables: ${safe.error.message}`);
|
||||
if (execFiles.length) metadata.execFiles = metadata.execFiles.concat(execFiles.trim().split('\n').map((ef) => dataLayout.toRemotePath(ef)));
|
||||
|
||||
const symlinks = safe.child_process.execSync(`find ${lp} -type l`, { encoding: 'utf8', maxBuffer: 1024 * 1024 * 30 });
|
||||
if (symlinks === null) throw new BoxError(BoxError.FS_ERROR, `Error finding symlinks: ${safe.error.message}`);
|
||||
if (symlinks.length) metadata.symlinks = metadata.symlinks.concat(symlinks.trim().split('\n').map((sl) => {
|
||||
const target = safe.fs.readlinkSync(sl);
|
||||
return { path: dataLayout.toRemotePath(sl), target };
|
||||
}));
|
||||
}
|
||||
|
||||
if (!safe.fs.writeFileSync(metadataFile, JSON.stringify(metadata, null, 4))) throw new BoxError(BoxError.FS_ERROR, `Error writing fs metadata: ${safe.error.message}`);
|
||||
}
|
||||
|
||||
async function restoreFsMetadata(dataLayout, metadataFile) {
|
||||
assert(dataLayout instanceof DataLayout, 'dataLayout must be a DataLayout');
|
||||
assert.strictEqual(typeof metadataFile, 'string');
|
||||
|
||||
debug(`Recreating empty directories in ${dataLayout.toString()}`);
|
||||
|
||||
const metadataJson = safe.fs.readFileSync(metadataFile, 'utf8');
|
||||
if (metadataJson === null) throw new BoxError(BoxError.EXTERNAL_ERROR, 'Error loading fsmetadata.json:' + safe.error.message);
|
||||
const metadata = safe.JSON.parse(metadataJson);
|
||||
if (metadata === null) throw new BoxError(BoxError.EXTERNAL_ERROR, 'Error parsing fsmetadata.json:' + safe.error.message);
|
||||
|
||||
for (const emptyDir of metadata.emptyDirs) {
|
||||
const [mkdirError] = await safe(fs.promises.mkdir(dataLayout.toLocalPath(emptyDir), { recursive: true }));
|
||||
if (mkdirError) throw new BoxError(BoxError.FS_ERROR, `unable to create path: ${mkdirError.message}`);
|
||||
}
|
||||
|
||||
for (const execFile of metadata.execFiles) {
|
||||
const [chmodError] = await safe(fs.promises.chmod(dataLayout.toLocalPath(execFile), parseInt('0755', 8)));
|
||||
if (chmodError) throw new BoxError(BoxError.FS_ERROR, `unable to chmod: ${chmodError.message}`);
|
||||
}
|
||||
|
||||
for (const symlink of (metadata.symlinks || [])) {
|
||||
if (!symlink.target) continue;
|
||||
// the path may not exist if we had a directory full of symlinks
|
||||
const [mkdirError] = await safe(fs.promises.mkdir(path.dirname(dataLayout.toLocalPath(symlink.path)), { recursive: true }));
|
||||
if (mkdirError) throw new BoxError(BoxError.FS_ERROR, `unable to symlink (mkdir): ${mkdirError.message}`);
|
||||
const [symlinkError] = await safe(fs.promises.symlink(symlink.target, dataLayout.toLocalPath(symlink.path), 'file'));
|
||||
if (symlinkError) throw new BoxError(BoxError.FS_ERROR, `unable to symlink: ${symlinkError.message}`);
|
||||
}
|
||||
}
|
||||
|
||||
function downloadDir(backupConfig, backupFilePath, dataLayout, progressCallback, callback) {
|
||||
assert.strictEqual(typeof backupConfig, 'object');
|
||||
assert.strictEqual(typeof backupFilePath, 'string');
|
||||
assert(dataLayout instanceof DataLayout, 'dataLayout must be a DataLayout');
|
||||
assert.strictEqual(typeof progressCallback, 'function');
|
||||
assert.strictEqual(typeof callback, 'function');
|
||||
|
||||
debug(`downloadDir: ${backupFilePath} to ${dataLayout.toString()}`);
|
||||
|
||||
function downloadFile(entry, done) {
|
||||
let relativePath = path.relative(backupFilePath, entry.fullPath);
|
||||
if (backupConfig.encryption) {
|
||||
const { error, result } = hush.decryptFilePath(relativePath, backupConfig.encryption);
|
||||
if (error) return done(new BoxError(BoxError.CRYPTO_ERROR, 'Unable to decrypt file'));
|
||||
relativePath = result;
|
||||
}
|
||||
const destFilePath = dataLayout.toLocalPath('./' + relativePath);
|
||||
|
||||
fs.mkdir(path.dirname(destFilePath), { recursive: true }, function (error) {
|
||||
if (error) return done(new BoxError(BoxError.FS_ERROR, error.message));
|
||||
|
||||
async.retry({ times: 5, interval: 20000 }, function (retryCallback) {
|
||||
storage.api(backupConfig.provider).download(backupConfig, entry.fullPath, function (error, sourceStream) {
|
||||
if (error) {
|
||||
progressCallback({ message: `Download ${entry.fullPath} to ${destFilePath} errored: ${error.message}` });
|
||||
return retryCallback(error);
|
||||
}
|
||||
|
||||
let destStream = hush.createWriteStream(destFilePath, backupConfig.encryption);
|
||||
|
||||
// protect against multiple errors. must destroy the write stream so that a previous retry does not write
|
||||
let closeAndRetry = once((error) => {
|
||||
if (error) progressCallback({ message: `Download ${entry.fullPath} to ${destFilePath} errored: ${error.message}` });
|
||||
else progressCallback({ message: `Download ${entry.fullPath} to ${destFilePath} finished` });
|
||||
sourceStream.destroy();
|
||||
destStream.destroy();
|
||||
retryCallback(error);
|
||||
});
|
||||
|
||||
destStream.on('progress', function (progress) {
|
||||
const transferred = Math.round(progress.transferred/1024/1024), speed = Math.round(progress.speed/1024/1024);
|
||||
if (!transferred && !speed) return progressCallback({ message: `Downloading ${entry.fullPath}` }); // 0M@0MBps looks wrong
|
||||
progressCallback({ message: `Downloading ${entry.fullPath}: ${transferred}M@${speed}MBps` });
|
||||
});
|
||||
destStream.on('error', closeAndRetry);
|
||||
|
||||
sourceStream.on('error', closeAndRetry);
|
||||
|
||||
progressCallback({ message: `Downloading ${entry.fullPath} to ${destFilePath}` });
|
||||
|
||||
sourceStream.pipe(destStream, { end: true }).on('done', closeAndRetry);
|
||||
});
|
||||
}, done);
|
||||
});
|
||||
}
|
||||
|
||||
storage.api(backupConfig.provider).listDir(backupConfig, backupFilePath, 1000, function (entries, iteratorDone) {
|
||||
// https://www.digitalocean.com/community/questions/rate-limiting-on-spaces?answer=40441
|
||||
const concurrency = backupConfig.downloadConcurrency || (backupConfig.provider === 's3' ? 30 : 10);
|
||||
|
||||
async.eachLimit(entries, concurrency, downloadFile, iteratorDone);
|
||||
}, callback);
|
||||
}
|
||||
|
||||
function download(backupConfig, remotePath, dataLayout, progressCallback, callback) {
|
||||
assert.strictEqual(typeof backupConfig, 'object');
|
||||
assert.strictEqual(typeof remotePath, 'string');
|
||||
assert(dataLayout instanceof DataLayout, 'dataLayout must be a DataLayout');
|
||||
assert.strictEqual(typeof progressCallback, 'function');
|
||||
assert.strictEqual(typeof callback, 'function');
|
||||
|
||||
debug(`download: Downloading ${remotePath} to ${dataLayout.toString()}`);
|
||||
|
||||
const backupFilePath = getBackupFilePath(backupConfig, remotePath);
|
||||
|
||||
downloadDir(backupConfig, backupFilePath, dataLayout, progressCallback, async function (error) {
|
||||
if (error) return callback(error);
|
||||
|
||||
[error] = await safe(restoreFsMetadata(dataLayout, `${dataLayout.localRoot()}/fsmetadata.json`));
|
||||
callback(error);
|
||||
});
|
||||
}
|
||||
|
||||
function upload(backupConfig, remotePath, dataLayout, progressCallback, callback) {
|
||||
assert.strictEqual(typeof backupConfig, 'object');
|
||||
assert.strictEqual(typeof remotePath, 'string');
|
||||
assert.strictEqual(typeof dataLayout, 'object');
|
||||
assert.strictEqual(typeof progressCallback, 'function');
|
||||
assert.strictEqual(typeof callback, 'function');
|
||||
|
||||
async.series([
|
||||
saveFsMetadata.bind(null, dataLayout, `${dataLayout.localRoot()}/fsmetadata.json`),
|
||||
sync.bind(null, backupConfig, remotePath, dataLayout, progressCallback)
|
||||
], callback);
|
||||
}
|
||||
Reference in New Issue
Block a user