diff --git a/src/storage/caas.js b/src/storage/caas.js index d68e8d838..9ff034159 100644 --- a/src/storage/caas.js +++ b/src/storage/caas.js @@ -19,14 +19,11 @@ var assert = require('assert'), config = require('../config.js'), crypto = require('crypto'), debug = require('debug')('box:storage/caas'), - mkdirp = require('mkdirp'), once = require('once'), + PassThrough = require('stream').PassThrough, path = require('path'), - progress = require('progress-stream'), - spawn = require('child_process').spawn, superagent = require('superagent'), - tar = require('tar-fs'), - zlib = require('zlib'); + targz = require('./targz.js'); var FILE_TYPE = '.tar.gz.enc'; @@ -79,45 +76,12 @@ function backup(apiConfig, backupId, sourceDirectories, callback) { getBackupCredentials(apiConfig, function (error, credentials) { if (error) return callback(error); - var pack = tar.pack('/', { - entries: sourceDirectories.map(function (m) { return m.source; }), - map: function(header) { - sourceDirectories.forEach(function (m) { - header.name = header.name.replace(new RegExp('^' + m.source + '(/?)'), m.destination + '$1'); - }); - return header; - } - }); - - var gzip = zlib.createGzip({}); - var encrypt = crypto.createCipher('aes-256-cbc', apiConfig.key || ''); - var progressStream = progress({ time: 10000 }); // display a progress every 10 seconds - - pack.on('error', function (error) { - console.error('[%s] backup: tar stream error.', backupId, error); - callback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message)); - }); - - gzip.on('error', function (error) { - console.error('[%s] backup: gzip stream error.', backupId, error); - callback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message)); - }); - - encrypt.on('error', function (error) { - console.error('[%s] backup: encrypt stream error.', backupId, error); - callback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message)); - }); - - progressStream.on('progress', function(progress) { - debug('[%s] backup: %s @ %s', backupId, Math.round(progress.transferred/1024/1024) + 'M', Math.round(progress.speed/1024/1024) + 'Mbps'); - }); - - pack.pipe(gzip).pipe(encrypt).pipe(progressStream); + var passThrough = new PassThrough(); var params = { Bucket: apiConfig.bucket, Key: backupFilePath, - Body: progressStream + Body: passThrough }; var s3 = new AWS.S3(credentials); @@ -129,6 +93,8 @@ function backup(apiConfig, backupId, sourceDirectories, callback) { callback(null); }); + + targz.create(sourceDirectories, apiConfig.key, passThrough, callback); }); } @@ -138,6 +104,8 @@ function restore(apiConfig, backupId, destination, callback) { assert.strictEqual(typeof destination, 'string'); assert.strictEqual(typeof callback, 'function'); + callback = once(callback); + var isOldFormat = backupId.endsWith('.tar.gz'); var backupFilePath = isOldFormat ? path.join(apiConfig.prefix, backupId) : getBackupFilePath(apiConfig, backupId); @@ -146,69 +114,23 @@ function restore(apiConfig, backupId, destination, callback) { getBackupCredentials(apiConfig, function (error, credentials) { if (error) return callback(error); - mkdirp(destination, function (error) { - if (error) return callback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message)); + var params = { + Bucket: apiConfig.bucket, + Key: backupFilePath + }; - var params = { - Bucket: apiConfig.bucket, - Key: backupFilePath - }; + var s3 = new AWS.S3(credentials); + var s3get = s3.getObject(params).createReadStream(); - var s3 = new AWS.S3(credentials); - var s3get = s3.getObject(params).createReadStream(); + s3get.on('error', function (error) { + // TODO ENOENT for the mock, fix upstream! + if (error.code === 'NoSuchKey' || error.code === 'ENOENT') return callback(new BackupsError(BackupsError.NOT_FOUND)); - var decrypt; - - if (isOldFormat) { - let args = ['aes-256-cbc', '-d', '-pass', 'pass:' + apiConfig.key]; - decrypt = spawn('openssl', args, { stdio: [ 'pipe', 'pipe', process.stderr ]}); - } else { - decrypt = crypto.createDecipher('aes-256-cbc', apiConfig.key || ''); - } - - var gunzip = zlib.createGunzip({}); - var progressStream = progress({ time: 10000 }); // display a progress every 10 seconds - var extract = tar.extract(destination); - - s3get.on('error', function (error) { - // TODO ENOENT for the mock, fix upstream! - if (error.code === 'NoSuchKey' || error.code === 'ENOENT') return callback(new BackupsError(BackupsError.NOT_FOUND)); - - console.error('[%s] restore: s3 stream error.', backupId, error); - callback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message)); - }); - - progressStream.on('progress', function(progress) { - debug('[%s] restore: %s @ %s', backupId, Math.round(progress.transferred/1024/1024) + 'M', Math.round(progress.speed/1024/1024) + 'Mbps'); - }); - - decrypt.on('error', function (error) { - console.error('[%s] restore: decipher stream error.', error); - callback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message)); - }); - - gunzip.on('error', function (error) { - console.error('[%s] restore: gunzip stream error.', error); - callback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message)); - }); - - extract.on('error', function (error) { - console.error('[%s] restore: extract stream error.', error); - callback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message)); - }); - - extract.on('finish', function () { - debug('[%s] restore: done.', backupId); - callback(); - }); - - if (isOldFormat) { - s3get.pipe(progressStream).pipe(decrypt.stdin); - decrypt.stdout.pipe(gunzip).pipe(extract); - } else { - s3get.pipe(progressStream).pipe(decrypt).pipe(gunzip).pipe(extract); - } + console.error('[%s] restore: s3 stream error.', backupId, error); + callback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message)); }); + + targz.extract(s3get, isOldFormat, destination, apiConfig.key || '', callback); }); } diff --git a/src/storage/filesystem.js b/src/storage/filesystem.js index a80f0a4e7..a5f1470de 100644 --- a/src/storage/filesystem.js +++ b/src/storage/filesystem.js @@ -16,18 +16,14 @@ exports = module.exports = { var assert = require('assert'), async = require('async'), BackupsError = require('../backups.js').BackupsError, - crypto = require('crypto'), config = require('../config.js'), debug = require('debug')('box:storage/filesystem'), fs = require('fs'), mkdirp = require('mkdirp'), once = require('once'), path = require('path'), - progress = require('progress-stream'), safe = require('safetydance'), - spawn = require('child_process').spawn, - tar = require('tar-fs'), - zlib = require('zlib'); + targz = require('./targz.js'); var FALLBACK_BACKUP_FOLDER = '/var/backups'; var FILE_TYPE = '.tar.gz.enc'; @@ -57,40 +53,8 @@ function backup(apiConfig, backupId, sourceDirectories, callback) { mkdirp(path.dirname(backupFilePath), function (error) { if (error) return callback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message)); - var pack = tar.pack('/', { - entries: sourceDirectories.map(function (m) { return m.source; }), - map: function(header) { - sourceDirectories.forEach(function (m) { - header.name = header.name.replace(new RegExp('^' + m.source + '(/?)'), m.destination + '$1'); - }); - return header; - } - }); - - var gzip = zlib.createGzip({}); - var encrypt = crypto.createCipher('aes-256-cbc', apiConfig.key || ''); - var progressStream = progress({ time: 10000 }); // display a progress every 10 seconds var fileStream = fs.createWriteStream(backupFilePath); - pack.on('error', function (error) { - console.error('[%s] backup: tar stream error.', backupId, error); - callback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message)); - }); - - gzip.on('error', function (error) { - console.error('[%s] backup: gzip stream error.', backupId, error); - callback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message)); - }); - - encrypt.on('error', function (error) { - console.error('[%s] backup: encrypt stream error.', backupId, error); - callback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message)); - }); - - progressStream.on('progress', function(progress) { - debug('[%s] backup: %s @ %s', backupId, Math.round(progress.transferred/1024/1024) + 'M', Math.round(progress.speed/1024/1024) + 'Mbps'); - }); - fileStream.on('error', function (error) { console.error('[%s] backup: out stream error.', backupId, error); callback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message)); @@ -106,7 +70,7 @@ function backup(apiConfig, backupId, sourceDirectories, callback) { callback(null); }); - pack.pipe(gzip).pipe(encrypt).pipe(progressStream).pipe(fileStream); + targz.create(sourceDirectories, apiConfig.key || '', fileStream, callback); }); } @@ -116,6 +80,8 @@ function restore(apiConfig, backupId, destination, callback) { assert.strictEqual(typeof destination, 'string'); assert.strictEqual(typeof callback, 'function'); + callback = once(callback); + var isOldFormat = backupId.endsWith('.tar.gz'); var sourceFilePath = isOldFormat ? path.join(apiConfig.backupFolder || FALLBACK_BACKUP_FOLDER, backupId) : getBackupFilePath(apiConfig, backupId); @@ -123,59 +89,14 @@ function restore(apiConfig, backupId, destination, callback) { if (!fs.existsSync(sourceFilePath)) return callback(new BackupsError(BackupsError.NOT_FOUND, 'backup file does not exist')); - mkdirp(destination, function (error) { - if (error) return callback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message)); + var fileStream = fs.createReadStream(sourceFilePath); - var fileStream = fs.createReadStream(sourceFilePath); - var decrypt; - - if (isOldFormat) { - let args = ['aes-256-cbc', '-d', '-pass', 'pass:' + apiConfig.key]; - decrypt = spawn('openssl', args, { stdio: [ 'pipe', 'pipe', process.stderr ]}); - } else { - decrypt = crypto.createDecipher('aes-256-cbc', apiConfig.key || ''); - } - - var gunzip = zlib.createGunzip({}); - var progressStream = progress({ time: 10000 }); // display a progress every 10 seconds - var extract = tar.extract(destination); - - fileStream.on('error', function (error) { - console.error('[%s] restore: file stream error.', error); - callback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message)); - }); - - progressStream.on('progress', function(progress) { - debug('[%s] restore: %s @ %s', backupId, Math.round(progress.transferred/1024/1024) + 'M', Math.round(progress.speed/1024/1024) + 'Mbps'); - }); - - decrypt.on('error', function (error) { - console.error('[%s] restore: decrypt stream error.', error); - callback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message)); - }); - - gunzip.on('error', function (error) { - console.error('[%s] restore: gunzip stream error.', error); - callback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message)); - }); - - extract.on('error', function (error) { - console.error('[%s] restore: extract stream error.', error); - callback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message)); - }); - - extract.on('finish', function () { - debug('[%s] restore: %s done.', backupId); - callback(null); - }); - - if (isOldFormat) { - fileStream.pipe(progressStream).pipe(decrypt.stdin); - decrypt.stdout.pipe(gunzip).pipe(extract); - } else { - fileStream.pipe(progressStream).pipe(decrypt).pipe(gunzip).pipe(extract); - } + fileStream.on('error', function (error) { + console.error('restore: file stream error.', error); + callback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message)); }); + + targz.extract(fileStream, isOldFormat, destination, apiConfig.key || '', callback); } function copyBackup(apiConfig, oldBackupId, newBackupId, callback) { diff --git a/src/storage/s3.js b/src/storage/s3.js index f27b7891c..2da0c03b8 100644 --- a/src/storage/s3.js +++ b/src/storage/s3.js @@ -22,13 +22,10 @@ var assert = require('assert'), BackupsError = require('../backups.js').BackupsError, crypto = require('crypto'), debug = require('debug')('box:storage/s3'), - mkdirp = require('mkdirp'), once = require('once'), + PassThrough = require('stream').PassThrough, path = require('path'), - progress = require('progress-stream'), - spawn = require('child_process').spawn, - tar = require('tar-fs'), - zlib = require('zlib'); + targz = require('./targz.js'); var FILE_TYPE = '.tar.gz.enc'; @@ -86,45 +83,12 @@ function backup(apiConfig, backupId, sourceDirectories, callback) { getBackupCredentials(apiConfig, function (error, credentials) { if (error) return callback(error); - var pack = tar.pack('/', { - entries: sourceDirectories.map(function (m) { return m.source; }), - map: function(header) { - sourceDirectories.forEach(function (m) { - header.name = header.name.replace(new RegExp('^' + m.source + '(/?)'), m.destination + '$1'); - }); - return header; - } - }); - - var gzip = zlib.createGzip({}); - var encrypt = crypto.createCipher('aes-256-cbc', apiConfig.key || ''); - var progressStream = progress({ time: 10000 }); // display a progress every 10 seconds - - pack.on('error', function (error) { - console.error('[%s] backup: tar stream error.', backupId, error); - callback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message)); - }); - - gzip.on('error', function (error) { - console.error('[%s] backup: gzip stream error.', backupId, error); - callback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message)); - }); - - encrypt.on('error', function (error) { - console.error('[%s] backup: encrypt stream error.', backupId, error); - callback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message)); - }); - - progressStream.on('progress', function(progress) { - debug('[%s] backup: %s @ %s', backupId, Math.round(progress.transferred/1024/1024) + 'M', Math.round(progress.speed/1024/1024) + 'Mbps'); - }); - - pack.pipe(gzip).pipe(encrypt).pipe(progressStream); + var passThrough = new PassThrough(); var params = { Bucket: apiConfig.bucket, Key: backupFilePath, - Body: progressStream + Body: passThrough }; var s3 = new AWS.S3(credentials); @@ -136,6 +100,8 @@ function backup(apiConfig, backupId, sourceDirectories, callback) { callback(null); }); + + targz.create(sourceDirectories, apiConfig.key, passThrough, callback); }); } @@ -145,6 +111,8 @@ function restore(apiConfig, backupId, destination, callback) { assert.strictEqual(typeof destination, 'string'); assert.strictEqual(typeof callback, 'function'); + callback = once(callback); + var isOldFormat = backupId.endsWith('.tar.gz'); var backupFilePath = isOldFormat ? path.join(apiConfig.prefix, backupId) : getBackupFilePath(apiConfig, backupId); @@ -153,69 +121,24 @@ function restore(apiConfig, backupId, destination, callback) { getBackupCredentials(apiConfig, function (error, credentials) { if (error) return callback(error); - mkdirp(destination, function (error) { - if (error) return callback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message)); + var params = { + Bucket: apiConfig.bucket, + Key: backupFilePath + }; - var params = { - Bucket: apiConfig.bucket, - Key: backupFilePath - }; + var s3 = new AWS.S3(credentials); - var s3 = new AWS.S3(credentials); + var s3get = s3.getObject(params).createReadStream(); - var s3get = s3.getObject(params).createReadStream(); - var decrypt; + s3get.on('error', function (error) { + // TODO ENOENT for the mock, fix upstream! + if (error.code === 'NoSuchKey' || error.code === 'ENOENT') return callback(new BackupsError(BackupsError.NOT_FOUND)); - if (isOldFormat) { - let args = ['aes-256-cbc', '-d', '-pass', 'pass:' + apiConfig.key]; - decrypt = spawn('openssl', args, { stdio: [ 'pipe', 'pipe', process.stderr ]}); - } else { - decrypt = crypto.createDecipher('aes-256-cbc', apiConfig.key || ''); - } - - var gunzip = zlib.createGunzip({}); - var progressStream = progress({ time: 10000 }); // display a progress every 10 seconds - var extract = tar.extract(destination); - - s3get.on('error', function (error) { - // TODO ENOENT for the mock, fix upstream! - if (error.code === 'NoSuchKey' || error.code === 'ENOENT') return callback(new BackupsError(BackupsError.NOT_FOUND)); - - console.error('[%s] restore: s3 stream error.', backupId, error); - callback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message)); - }); - - progressStream.on('progress', function(progress) { - debug('[%s] restore: %s @ %s', backupId, Math.round(progress.transferred/1024/1024) + 'M', Math.round(progress.speed/1024/1024) + 'Mbps'); - }); - - decrypt.on('error', function (error) { - console.error('[%s] restore: decipher stream error.', error); - callback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message)); - }); - - gunzip.on('error', function (error) { - console.error('[%s] restore: gunzip stream error.', error); - callback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message)); - }); - - extract.on('error', function (error) { - console.error('[%s] restore: extract stream error.', error); - callback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message)); - }); - - extract.on('finish', function () { - debug('[%s] restore: done.', backupId); - callback(null); - }); - - if (isOldFormat) { - s3get.pipe(progressStream).pipe(decrypt.stdin); - decrypt.stdout.pipe(gunzip).pipe(extract); - } else { - s3get.pipe(progressStream).pipe(decrypt).pipe(gunzip).pipe(extract); - } + console.error('[%s] restore: s3 stream error.', backupId, error); + callback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message)); }); + + targz.extract(s3get, isOldFormat, destination, apiConfig.key || '', callback); }); } diff --git a/src/storage/targz.js b/src/storage/targz.js new file mode 100644 index 000000000..1df2c0bae --- /dev/null +++ b/src/storage/targz.js @@ -0,0 +1,112 @@ +'use strict'; + +exports = module.exports = { + create: create, + extract: extract +}; + +var assert = require('assert'), + BackupsError = require('../backups.js').BackupsError, + crypto = require('crypto'), + debug = require('debug')('box:storage/filesystem'), + mkdirp = require('mkdirp'), + progress = require('progress-stream'), + spawn = require('child_process').spawn, + tar = require('tar-fs'), + zlib = require('zlib'); + +function create(sourceDirectories, key, outStream, callback) { + assert(Array.isArray(sourceDirectories)); + assert.strictEqual(typeof key, 'string'); + assert.strictEqual(typeof callback, 'function'); + + var pack = tar.pack('/', { + entries: sourceDirectories.map(function (m) { return m.source; }), + map: function(header) { + sourceDirectories.forEach(function (m) { + header.name = header.name.replace(new RegExp('^' + m.source + '(/?)'), m.destination + '$1'); + }); + return header; + } + }); + + var gzip = zlib.createGzip({}); + var encrypt = crypto.createCipher('aes-256-cbc', key); + var progressStream = progress({ time: 10000 }); // display a progress every 10 seconds + + pack.on('error', function (error) { + console.error('backup: tar stream error.', error); + callback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message)); + }); + + gzip.on('error', function (error) { + console.error('backup: gzip stream error.', error); + callback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message)); + }); + + encrypt.on('error', function (error) { + console.error('backup: encrypt stream error.', error); + callback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message)); + }); + + progressStream.on('progress', function(progress) { + debug('backup: %s@%s', Math.round(progress.transferred/1024/1024) + 'M', Math.round(progress.speed/1024/1024) + 'Mbps'); + }); + + pack.pipe(gzip).pipe(encrypt).pipe(progressStream).pipe(outStream); +} + +function extract(inStream, isOldFormat, destination, key, callback) { + assert.strictEqual(typeof isOldFormat, 'boolean'); + assert.strictEqual(typeof destination, 'string'); + assert.strictEqual(typeof key, 'string'); + assert.strictEqual(typeof callback, 'function'); + + mkdirp(destination, function (error) { + if (error) return callback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message)); + + var decrypt; + + if (isOldFormat) { + let args = ['aes-256-cbc', '-d', '-pass', 'pass:' + key]; + decrypt = spawn('openssl', args, { stdio: [ 'pipe', 'pipe', process.stderr ]}); + } else { + decrypt = crypto.createDecipher('aes-256-cbc', key); + } + + var gunzip = zlib.createGunzip({}); + var progressStream = progress({ time: 10000 }); // display a progress every 10 seconds + var extract = tar.extract(destination); + + progressStream.on('progress', function(progress) { + debug('restore: %s@%s', Math.round(progress.transferred/1024/1024) + 'M', Math.round(progress.speed/1024/1024) + 'Mbps'); + }); + + decrypt.on('error', function (error) { + console.error('restore: decrypt stream error.', error); + callback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message)); + }); + + gunzip.on('error', function (error) { + console.error('restore: gunzip stream error.', error); + callback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message)); + }); + + extract.on('error', function (error) { + console.error('restore: extract stream error.', error); + callback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message)); + }); + + extract.on('finish', function () { + debug('restore: done.'); + callback(null); + }); + + if (isOldFormat) { + inStream.pipe(progressStream).pipe(decrypt.stdin); + decrypt.stdout.pipe(gunzip).pipe(extract); + } else { + inStream.pipe(progressStream).pipe(decrypt).pipe(gunzip).pipe(extract); + } + }); +}