Make restore/download logic have progress callbacks

This commit is contained in:
Girish Ramakrishnan
2018-11-27 11:55:53 -08:00
parent b2465dd2ee
commit 632ba69663
3 changed files with 25 additions and 21 deletions

View File

@@ -238,10 +238,6 @@ function createReadStream(sourceFile, key) {
ps.emit('error', new BackupsError(BackupsError.EXTERNAL_ERROR, error.message));
});
ps.on('progress', function(progress) {
debug('createReadStream: %s@%s (%s)', Math.round(progress.transferred/1024/1024) + 'M', Math.round(progress.speed/1024/1024) + 'Mbps', sourceFile);
});
if (key !== null) {
var encrypt = crypto.createCipher('aes-256-cbc', key);
encrypt.on('error', function (error) {
@@ -347,6 +343,9 @@ function sync(backupConfig, backupId, dataDir, progressCallback, callback) {
debug(`read stream error for ${task.path}: ${error.message}`);
retryCallback();
}); // ignore error if file disappears
stream.on('progress', function(progress) {
progressCallback({ message: `Uploading ${task.path}: ${Math.round(progress.transferred/1024/1024)}M@${Math.round(progress.speed/1024/1024)}` });
});
api(backupConfig.provider).upload(backupConfig, backupFilePath, stream, function (error) {
debug(error ? `Error uploading ${task.path} try ${retryCount}: ${error.message}` : `Uploaded ${task.path}`);
retryCallback(error);
@@ -399,7 +398,7 @@ function upload(backupId, format, dataDir, progressCallback, callback) {
var tarStream = createTarPackStream(dataDir, backupConfig.key || null);
tarStream.on('progress', function(progress) {
progressCallback({ message: `${Math.round(progress.transferred/1024/1024)}M@${Math.round(progress.speed/1024/1024)}Mbps` });
progressCallback({ message: `Uploading ${Math.round(progress.transferred/1024/1024)}M@${Math.round(progress.speed/1024/1024)}Mbps` });
});
tarStream.on('error', retryCallback); // already returns BackupsError
@@ -431,10 +430,6 @@ function tarExtract(inStream, destination, key, callback) {
callback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message));
});
ps.on('progress', function(progress) {
debug('tarExtract: %s@%s', Math.round(progress.transferred/1024/1024) + 'M', Math.round(progress.speed/1024/1024) + 'Mbps');
});
gunzip.on('error', function (error) {
debug('tarExtract: gunzip stream error.', error);
callback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message));
@@ -460,6 +455,8 @@ function tarExtract(inStream, destination, key, callback) {
} else {
inStream.pipe(ps).pipe(gunzip).pipe(extract);
}
return ps;
}
function restoreFsMetadata(appDataDir, callback) {
@@ -488,10 +485,11 @@ function restoreFsMetadata(appDataDir, callback) {
});
}
function downloadDir(backupConfig, backupFilePath, destDir, callback) {
function downloadDir(backupConfig, backupFilePath, destDir, progressCallback, callback) {
assert.strictEqual(typeof backupConfig, 'object');
assert.strictEqual(typeof backupFilePath, 'string');
assert.strictEqual(typeof destDir, 'string');
assert.strictEqual(typeof progressCallback, 'function');
assert.strictEqual(typeof callback, 'function');
debug(`downloadDir: ${backupFilePath} to ${destDir}`);
@@ -515,7 +513,7 @@ function downloadDir(backupConfig, backupFilePath, destDir, callback) {
let destStream = createWriteStream(destFilePath, backupConfig.key || null);
destStream.on('error', callback);
debug(`downloadDir: Copying ${entry.fullPath} to ${destFilePath}`);
progressCallback({ message: `Downloading ${entry.fullPath} to ${destFilePath}` });
sourceStream.pipe(destStream, { end: true }).on('finish', callback);
});
@@ -527,23 +525,27 @@ function downloadDir(backupConfig, backupFilePath, destDir, callback) {
}, callback);
}
function download(backupConfig, backupId, format, dataDir, callback) {
function download(backupConfig, backupId, format, dataDir, progressCallback, callback) {
assert.strictEqual(typeof backupConfig, 'object');
assert.strictEqual(typeof backupId, 'string');
assert.strictEqual(typeof format, 'string');
assert.strictEqual(typeof dataDir, 'string');
assert.strictEqual(typeof progressCallback, 'function');
assert.strictEqual(typeof callback, 'function');
tasks.setProgress(tasks.TASK_BACKUP, { detail: `Downloading ${backupId} of format ${format} to ${dataDir}` }, NOOP_CALLBACK);
debug(`download - Downloading ${backupId} of format ${format} to ${dataDir}`);
if (format === 'tgz') {
api(backupConfig.provider).download(backupConfig, getBackupFilePath(backupConfig, backupId, format), function (error, sourceStream) {
if (error) return callback(error);
tarExtract(sourceStream, dataDir, backupConfig.key || null, callback);
let ps = tarExtract(sourceStream, dataDir, backupConfig.key || null, callback);
ps.on('progress', function (progress) {
progressCallback({ message: `Downloading ${Math.round(progress.transferred/1024/1024)}M@${Math.round(progress.speed/1024/1024)}Mbps` });
});
});
} else {
downloadDir(backupConfig, getBackupFilePath(backupConfig, backupId, format), dataDir, function (error) {
downloadDir(backupConfig, getBackupFilePath(backupConfig, backupId, format), dataDir, progressCallback, function (error) {
if (error) return callback(error);
restoreFsMetadata(dataDir, callback);
@@ -551,12 +553,13 @@ function download(backupConfig, backupId, format, dataDir, callback) {
}
}
function restore(backupConfig, backupId, callback) {
function restore(backupConfig, backupId, progressCallback, callback) {
assert.strictEqual(typeof backupConfig, 'object');
assert.strictEqual(typeof backupId, 'string');
assert.strictEqual(typeof progressCallback, 'function');
assert.strictEqual(typeof callback, 'function');
download(backupConfig, backupId, backupConfig.format, paths.BOX_DATA_DIR, function (error) {
download(backupConfig, backupId, backupConfig.format, paths.BOX_DATA_DIR, progressCallback, function (error) {
if (error) return callback(error);
debug('restore: download completed, importing database');
@@ -571,10 +574,11 @@ function restore(backupConfig, backupId, callback) {
});
}
function restoreApp(app, addonsToRestore, restoreConfig, callback) {
function restoreApp(app, addonsToRestore, restoreConfig, progressCallback, callback) {
assert.strictEqual(typeof app, 'object');
assert.strictEqual(typeof addonsToRestore, 'object');
assert.strictEqual(typeof restoreConfig, 'object');
assert.strictEqual(typeof progressCallback, 'object');
assert.strictEqual(typeof callback, 'function');
var appDataDir = safe.fs.realpathSync(path.join(paths.APPS_DATA_DIR, app.id));
@@ -585,7 +589,7 @@ function restoreApp(app, addonsToRestore, restoreConfig, callback) {
if (error) return callback(new BackupsError(BackupsError.INTERNAL_ERROR, error));
async.series([
download.bind(null, backupConfig, restoreConfig.backupId, restoreConfig.backupFormat, appDataDir),
download.bind(null, backupConfig, restoreConfig.backupId, restoreConfig.backupFormat, appDataDir, progressCallback),
addons.restoreAddons.bind(null, app, addonsToRestore)
], function (error) {
debug('restoreApp: time: %s', (new Date() - startTime)/1000);