backuptask: make upload/download async
This commit is contained in:
@@ -217,34 +217,29 @@ function downloadDir(backupConfig, backupFilePath, dataLayout, progressCallback,
|
||||
}, callback);
|
||||
}
|
||||
|
||||
function download(backupConfig, remotePath, dataLayout, progressCallback, callback) {
|
||||
async function download(backupConfig, remotePath, dataLayout, progressCallback) {
|
||||
assert.strictEqual(typeof backupConfig, 'object');
|
||||
assert.strictEqual(typeof remotePath, 'string');
|
||||
assert(dataLayout instanceof DataLayout, 'dataLayout must be a DataLayout');
|
||||
assert.strictEqual(typeof progressCallback, 'function');
|
||||
assert.strictEqual(typeof callback, 'function');
|
||||
|
||||
debug(`download: Downloading ${remotePath} to ${dataLayout.toString()}`);
|
||||
|
||||
const backupFilePath = getBackupFilePath(backupConfig, remotePath);
|
||||
const downloadDirAsync = util.promisify(downloadDir);
|
||||
|
||||
downloadDir(backupConfig, backupFilePath, dataLayout, progressCallback, async function (error) {
|
||||
if (error) return callback(error);
|
||||
|
||||
[error] = await safe(restoreFsMetadata(dataLayout, `${dataLayout.localRoot()}/fsmetadata.json`));
|
||||
callback(error);
|
||||
});
|
||||
await downloadDirAsync(backupConfig, backupFilePath, dataLayout, progressCallback);
|
||||
await restoreFsMetadata(dataLayout, `${dataLayout.localRoot()}/fsmetadata.json`);
|
||||
}
|
||||
|
||||
function upload(backupConfig, remotePath, dataLayout, progressCallback, callback) {
|
||||
async function upload(backupConfig, remotePath, dataLayout, progressCallback) {
|
||||
assert.strictEqual(typeof backupConfig, 'object');
|
||||
assert.strictEqual(typeof remotePath, 'string');
|
||||
assert.strictEqual(typeof dataLayout, 'object');
|
||||
assert.strictEqual(typeof progressCallback, 'function');
|
||||
assert.strictEqual(typeof callback, 'function');
|
||||
|
||||
async.series([
|
||||
saveFsMetadata.bind(null, dataLayout, `${dataLayout.localRoot()}/fsmetadata.json`),
|
||||
sync.bind(null, backupConfig, remotePath, dataLayout, progressCallback)
|
||||
], callback);
|
||||
const syncAsync = util.promisify(sync);
|
||||
|
||||
await saveFsMetadata(dataLayout, `${dataLayout.localRoot()}/fsmetadata.json`);
|
||||
await syncAsync(backupConfig, remotePath, dataLayout, progressCallback);
|
||||
}
|
||||
|
||||
@@ -133,55 +133,63 @@ function tarExtract(inStream, dataLayout, encryption) {
|
||||
return ps;
|
||||
}
|
||||
|
||||
function download(backupConfig, remotePath, dataLayout, progressCallback, callback) {
|
||||
async function download(backupConfig, remotePath, dataLayout, progressCallback) {
|
||||
assert.strictEqual(typeof backupConfig, 'object');
|
||||
assert.strictEqual(typeof remotePath, 'string');
|
||||
assert(dataLayout instanceof DataLayout, 'dataLayout must be a DataLayout');
|
||||
assert.strictEqual(typeof progressCallback, 'function');
|
||||
assert.strictEqual(typeof callback, 'function');
|
||||
|
||||
debug(`download: Downloading ${remotePath} to ${dataLayout.toString()}`);
|
||||
|
||||
const backupFilePath = getBackupFilePath(backupConfig, remotePath);
|
||||
|
||||
async.retry({ times: 5, interval: 20000 }, function (retryCallback) {
|
||||
progressCallback({ message: `Downloading backup ${remotePath}` });
|
||||
return new Promise((resolve, reject) => {
|
||||
async.retry({ times: 5, interval: 20000 }, function (retryCallback) {
|
||||
progressCallback({ message: `Downloading backup ${remotePath}` });
|
||||
|
||||
storage.api(backupConfig.provider).download(backupConfig, backupFilePath, function (error, sourceStream) {
|
||||
if (error) return retryCallback(error);
|
||||
storage.api(backupConfig.provider).download(backupConfig, backupFilePath, function (error, sourceStream) {
|
||||
if (error) return retryCallback(error);
|
||||
|
||||
const ps = tarExtract(sourceStream, dataLayout, backupConfig.encryption);
|
||||
const ps = tarExtract(sourceStream, dataLayout, backupConfig.encryption);
|
||||
|
||||
ps.on('progress', function (progress) {
|
||||
const transferred = Math.round(progress.transferred/1024/1024), speed = Math.round(progress.speed/1024/1024);
|
||||
if (!transferred && !speed) return progressCallback({ message: 'Downloading backup' }); // 0M@0MBps looks wrong
|
||||
progressCallback({ message: `Downloading ${transferred}M@${speed}MBps` });
|
||||
ps.on('progress', function (progress) {
|
||||
const transferred = Math.round(progress.transferred/1024/1024), speed = Math.round(progress.speed/1024/1024);
|
||||
if (!transferred && !speed) return progressCallback({ message: 'Downloading backup' }); // 0M@0MBps looks wrong
|
||||
progressCallback({ message: `Downloading ${transferred}M@${speed}MBps` });
|
||||
});
|
||||
ps.on('error', retryCallback);
|
||||
ps.on('done', retryCallback);
|
||||
});
|
||||
ps.on('error', retryCallback);
|
||||
ps.on('done', retryCallback);
|
||||
}, (error) => {
|
||||
if (error) return reject(error);
|
||||
resolve();
|
||||
});
|
||||
}, callback);
|
||||
});
|
||||
}
|
||||
|
||||
function upload(backupConfig, remotePath, dataLayout, progressCallback, callback) {
|
||||
async function upload(backupConfig, remotePath, dataLayout, progressCallback) {
|
||||
assert.strictEqual(typeof backupConfig, 'object');
|
||||
assert.strictEqual(typeof remotePath, 'string');
|
||||
assert.strictEqual(typeof dataLayout, 'object');
|
||||
assert.strictEqual(typeof progressCallback, 'function');
|
||||
assert.strictEqual(typeof callback, 'function');
|
||||
|
||||
async.retry({ times: 5, interval: 20000 }, function (retryCallback) {
|
||||
retryCallback = once(retryCallback); // protect again upload() erroring much later after tar stream error
|
||||
return new Promise((resolve, reject) => {
|
||||
async.retry({ times: 5, interval: 20000 }, function (retryCallback) {
|
||||
retryCallback = once(retryCallback); // protect again upload() erroring much later after tar stream error
|
||||
|
||||
const tarStream = tarPack(dataLayout, backupConfig.encryption);
|
||||
const tarStream = tarPack(dataLayout, backupConfig.encryption);
|
||||
|
||||
tarStream.on('progress', function (progress) {
|
||||
const transferred = Math.round(progress.transferred/1024/1024), speed = Math.round(progress.speed/1024/1024);
|
||||
if (!transferred && !speed) return progressCallback({ message: 'Uploading backup' }); // 0M@0MBps looks wrong
|
||||
progressCallback({ message: `Uploading backup ${transferred}M@${speed}MBps` });
|
||||
tarStream.on('progress', function (progress) {
|
||||
const transferred = Math.round(progress.transferred/1024/1024), speed = Math.round(progress.speed/1024/1024);
|
||||
if (!transferred && !speed) return progressCallback({ message: 'Uploading backup' }); // 0M@0MBps looks wrong
|
||||
progressCallback({ message: `Uploading backup ${transferred}M@${speed}MBps` });
|
||||
});
|
||||
tarStream.on('error', retryCallback); // already returns BoxError
|
||||
|
||||
storage.api(backupConfig.provider).upload(backupConfig, getBackupFilePath(backupConfig, remotePath), tarStream, retryCallback);
|
||||
}, (error) => {
|
||||
if (error) return reject(error);
|
||||
resolve();
|
||||
});
|
||||
tarStream.on('error', retryCallback); // already returns BoxError
|
||||
|
||||
storage.api(backupConfig.provider).upload(backupConfig, getBackupFilePath(backupConfig, remotePath), tarStream, retryCallback);
|
||||
}, callback);
|
||||
});
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user