diff --git a/src/backups.js b/src/backups.js index 7af352e28..5605fe400 100644 --- a/src/backups.js +++ b/src/backups.js @@ -27,8 +27,6 @@ exports = module.exports = { // for testing _getBackupFilePath: getBackupFilePath, - _createTarPackStream: createTarPackStream, - _tarExtract: tarExtract, _restoreFsMetadata: restoreFsMetadata, _saveFsMetadata: saveFsMetadata }; @@ -262,9 +260,10 @@ function createWriteStream(destFile, key) { } } -function createTarPackStream(sourceDir, key) { +function tarPack(sourceDir, key, callback) { assert.strictEqual(typeof sourceDir, 'string'); assert(key === null || typeof key === 'string'); + assert.strictEqual(typeof callback, 'function'); var pack = tar.pack('/', { dereference: false, // pack the symlink and not what it points to @@ -280,25 +279,27 @@ function createTarPackStream(sourceDir, key) { var ps = progressStream({ time: 10000 }); // emit 'pgoress' every 10 seconds pack.on('error', function (error) { - debug('createTarPackStream: tar stream error.', error); + debug('tarPack: tar stream error.', error); ps.emit('error', new BackupsError(BackupsError.EXTERNAL_ERROR, error.message)); }); gzip.on('error', function (error) { - debug('createTarPackStream: gzip stream error.', error); + debug('tarPack: gzip stream error.', error); ps.emit('error', new BackupsError(BackupsError.EXTERNAL_ERROR, error.message)); }); if (key !== null) { var encrypt = crypto.createCipher('aes-256-cbc', key); encrypt.on('error', function (error) { - debug('createTarPackStream: encrypt stream error.', error); + debug('tarPack: encrypt stream error.', error); ps.emit('error', new BackupsError(BackupsError.EXTERNAL_ERROR, error.message)); }); - return pack.pipe(gzip).pipe(encrypt).pipe(ps); + pack.pipe(gzip).pipe(encrypt).pipe(ps); } else { - return pack.pipe(gzip).pipe(ps); + pack.pipe(gzip).pipe(ps); } + + callback(null, ps); } function sync(backupConfig, backupId, dataDir, progressCallback, callback) { @@ -390,13 +391,16 @@ function upload(backupId, format, dataDir, progressCallback, callback) { async.retry({ times: 5, interval: 20000 }, function (retryCallback) { retryCallback = once(retryCallback); // protect again upload() erroring much later after tar stream error - var tarStream = createTarPackStream(dataDir, backupConfig.key || null); - tarStream.on('progress', function(progress) { - progressCallback({ message: `Uploading ${Math.round(progress.transferred/1024/1024)}M@${Math.round(progress.speed/1024/1024)}Mbps` }); - }); - tarStream.on('error', retryCallback); // already returns BackupsError + tarPack(dataDir, backupConfig.key || null, function (error, tarStream) { + if (error) return retryCallback(error); - api(backupConfig.provider).upload(backupConfig, getBackupFilePath(backupConfig, backupId, format), tarStream, retryCallback); + tarStream.on('progress', function(progress) { + progressCallback({ message: `Uploading ${Math.round(progress.transferred/1024/1024)}M@${Math.round(progress.speed/1024/1024)}Mbps` }); + }); + tarStream.on('error', retryCallback); // already returns BackupsError + + api(backupConfig.provider).upload(backupConfig, getBackupFilePath(backupConfig, backupId, format), tarStream, retryCallback); + }); }, callback); } else { async.series([ @@ -413,44 +417,44 @@ function tarExtract(inStream, destination, key, callback) { assert(key === null || typeof key === 'string'); assert.strictEqual(typeof callback, 'function'); - callback = once(callback); - var gunzip = zlib.createGunzip({}); var ps = progressStream({ time: 10000 }); // display a progress every 10 seconds var extract = tar.extract(destination); + const emitError = once((error) => ps.emit('error', error)); + inStream.on('error', function (error) { debug('tarExtract: input stream error.', error); - callback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message)); + emitError(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message)); }); gunzip.on('error', function (error) { debug('tarExtract: gunzip stream error.', error); - callback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message)); + emitError(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message)); }); extract.on('error', function (error) { debug('tarExtract: extract stream error.', error); - callback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message)); + emitError(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message)); }); extract.on('finish', function () { debug('tarExtract: done.'); - callback(null); + ps.emit('finish'); }); if (key !== null) { var decrypt = crypto.createDecipher('aes-256-cbc', key); decrypt.on('error', function (error) { debug('tarExtract: decrypt stream error.', error); - callback(new BackupsError(BackupsError.EXTERNAL_ERROR, `Failed to decrypt: ${error.message}`)); + emitError(new BackupsError(BackupsError.EXTERNAL_ERROR, `Failed to decrypt: ${error.message}`)); }); inStream.pipe(ps).pipe(decrypt).pipe(gunzip).pipe(extract); } else { inStream.pipe(ps).pipe(gunzip).pipe(extract); } - return ps; + callback(null, ps); } function restoreFsMetadata(appDataDir, callback) { @@ -533,9 +537,14 @@ function download(backupConfig, backupId, format, dataDir, progressCallback, cal api(backupConfig.provider).download(backupConfig, getBackupFilePath(backupConfig, backupId, format), function (error, sourceStream) { if (error) return callback(error); - let ps = tarExtract(sourceStream, dataDir, backupConfig.key || null, callback); - ps.on('progress', function (progress) { - progressCallback({ message: `Downloading ${Math.round(progress.transferred/1024/1024)}M@${Math.round(progress.speed/1024/1024)}Mbps` }); + tarExtract(sourceStream, dataDir, backupConfig.key || null, function (error, ps) { + if (error) return callback(error); + + ps.on('progress', function (progress) { + progressCallback({ message: `Downloading ${Math.round(progress.transferred/1024/1024)}M@${Math.round(progress.speed/1024/1024)}Mbps` }); + }); + ps.on('error', callback); + ps.on('finish', callback); }); }); } else {