Refactor backup strategy logic into backups.js

This commit is contained in:
Girish Ramakrishnan
2017-09-20 09:57:16 -07:00
parent cbddb79d15
commit 97da8717ca
8 changed files with 262 additions and 242 deletions

View File

@@ -16,12 +16,10 @@ var assert = require('assert'),
BackupsError = require('../backups.js').BackupsError,
config = require('../config.js'),
debug = require('debug')('box:storage/caas'),
once = require('once'),
PassThrough = require('stream').PassThrough,
path = require('path'),
S3BlockReadStream = require('s3-block-read-stream'),
superagent = require('superagent'),
targz = require('./targz.js');
superagent = require('superagent');
// internal only
function getBackupCredentials(apiConfig, callback) {
@@ -50,51 +48,41 @@ function getBackupCredentials(apiConfig, callback) {
}
// storage api
function upload(apiConfig, backupFilePath, sourceDir, callback) {
function upload(apiConfig, backupFilePath, sourceStream, callback) {
assert.strictEqual(typeof apiConfig, 'object');
assert.strictEqual(typeof backupFilePath, 'string');
assert.strictEqual(typeof sourceDir, 'string');
assert.strictEqual(typeof sourceStream, 'object');
assert.strictEqual(typeof callback, 'function');
callback = once(callback);
debug('backup: %s -> %s', sourceDir, backupFilePath);
debug('upload: %s', backupFilePath);
getBackupCredentials(apiConfig, function (error, credentials) {
if (error) return callback(error);
var passThrough = new PassThrough();
var params = {
Bucket: apiConfig.bucket,
Key: backupFilePath,
Body: passThrough
Body: sourceStream
};
var s3 = new AWS.S3(credentials);
// s3.upload automatically does a multi-part upload. we set queueSize to 1 to reduce memory usage
s3.upload(params, { partSize: 10 * 1024 * 1024, queueSize: 1 }, function (error) {
if (error) {
debug('[%s] backup: s3 upload error.', backupFilePath, error);
debug('[%s] upload: s3 upload error.', backupFilePath, error);
return callback(new BackupsError(BackupsError.EXTERNAL_ERROR, error));
}
callback(null);
});
targz.create(sourceDir, apiConfig.key || null, passThrough, callback);
});
}
function download(apiConfig, backupFilePath, destination, callback) {
function download(apiConfig, backupFilePath, callback) {
assert.strictEqual(typeof apiConfig, 'object');
assert.strictEqual(typeof backupFilePath, 'string');
assert.strictEqual(typeof destination, 'string');
assert.strictEqual(typeof callback, 'function');
callback = once(callback);
debug('restore: %s -> %s', backupFilePath, destination);
debug('download: %s', backupFilePath);
getBackupCredentials(apiConfig, function (error, credentials) {
if (error) return callback(error);
@@ -105,17 +93,23 @@ function download(apiConfig, backupFilePath, destination, callback) {
};
var s3 = new AWS.S3(credentials);
var ps = new PassThrough();
var multipartDownload = new S3BlockReadStream(s3, params, { blockSize: 64 * 1024 * 1024, logCallback: debug });
multipartDownload.on('error', function (error) {
// TODO ENOENT for the mock, fix upstream!
if (error.code === 'NoSuchKey' || error.code === 'ENOENT') return callback(new BackupsError(BackupsError.NOT_FOUND));
debug('[%s] restore: s3 stream error.', backupFilePath, error);
callback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message));
if (error.code === 'NoSuchKey' || error.code === 'ENOENT') {
ps.emit('error', new BackupsError(BackupsError.NOT_FOUND));
} else {
debug('[%s] download: s3 stream error.', backupFilePath, error);
ps.emit('error', new BackupsError(BackupsError.EXTERNAL_ERROR, error.message));
}
});
targz.extract(multipartDownload, destination, apiConfig.key || null, callback);
multipartDownload.pipe(ps);
callback(null, ps);
});
}
@@ -191,7 +185,7 @@ function backupDone(backupId, appBackupIds, callback) {
assert.strictEqual(typeof callback, 'function');
// Caas expects filenames instead of backupIds, this means no prefix but a file type extension
var FILE_TYPE = '.tar.gz.enc';
var FILE_TYPE = '.tar.gz.enc';
var boxBackupFilename = backupId + FILE_TYPE;
var appBackupFilenames = appBackupIds.map(function (id) { return id + FILE_TYPE; });

View File

@@ -19,24 +19,21 @@ var assert = require('assert'),
debug = require('debug')('box:storage/filesystem'),
fs = require('fs'),
mkdirp = require('mkdirp'),
once = require('once'),
PassThrough = require('stream').PassThrough,
path = require('path'),
safe = require('safetydance'),
shell = require('../shell.js'),
targz = require('./targz.js');
shell = require('../shell.js');
var BACKUP_USER = config.TEST ? process.env.USER : 'yellowtent';
// storage api
function upload(apiConfig, backupFilePath, sourceDir, callback) {
function upload(apiConfig, backupFilePath, sourceStream, callback) {
assert.strictEqual(typeof apiConfig, 'object');
assert.strictEqual(typeof backupFilePath, 'string');
assert.strictEqual(typeof sourceDir, 'string');
assert.strictEqual(typeof sourceStream, 'object');
assert.strictEqual(typeof callback, 'function');
callback = once(callback);
debug('backup: %s -> %s', sourceDir, backupFilePath);
debug('upload: %s', backupFilePath);
mkdirp(path.dirname(backupFilePath), function (error) {
if (error) return callback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message));
@@ -44,44 +41,38 @@ function upload(apiConfig, backupFilePath, sourceDir, callback) {
var fileStream = fs.createWriteStream(backupFilePath);
fileStream.on('error', function (error) {
debug('[%s] backup: out stream error.', backupFilePath, error);
debug('[%s] upload: out stream error.', backupFilePath, error);
callback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message));
});
fileStream.on('close', function () {
debug('[%s] backup: changing ownership.', backupFilePath);
debug('[%s] upload: changing ownership.', backupFilePath);
if (!safe.child_process.execSync('chown -R ' + BACKUP_USER + ':' + BACKUP_USER + ' ' + path.dirname(backupFilePath))) return callback(new BackupsError(BackupsError.INTERNAL_ERROR, safe.error.message));
debug('[%s] backup: done.', backupFilePath);
debug('[%s] upload: done.', backupFilePath);
callback(null);
});
targz.create(sourceDir, apiConfig.key || null, fileStream, callback);
sourceStream.pipe(fileStream);
});
}
function download(apiConfig, sourceFilePath, destination, callback) {
function download(apiConfig, sourceFilePath, callback) {
assert.strictEqual(typeof apiConfig, 'object');
assert.strictEqual(typeof sourceFilePath, 'string');
assert.strictEqual(typeof destination, 'string');
assert.strictEqual(typeof callback, 'function');
callback = once(callback);
debug('restore: %s -> %s', sourceFilePath, destination);
if (!fs.existsSync(sourceFilePath)) return callback(new BackupsError(BackupsError.NOT_FOUND, 'backup file does not exist'));
debug('download: %s', sourceFilePath);
var ps = new PassThrough();
var fileStream = fs.createReadStream(sourceFilePath);
fileStream.on('error', function (error) {
debug('restore: file stream error.', error);
callback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message));
ps.emit('error', new BackupsError(BackupsError.NOT_FOUND, error.message));
});
targz.extract(fileStream, destination, apiConfig.key || null, callback);
fileStream.pipe(ps);
callback(null, ps);
}
function copy(apiConfig, oldFilePath, newFilePath, callback) {
@@ -90,8 +81,6 @@ function copy(apiConfig, oldFilePath, newFilePath, callback) {
assert.strictEqual(typeof newFilePath, 'string');
assert.strictEqual(typeof callback, 'function');
callback = once(callback);
debug('copy: %s -> %s', oldFilePath, newFilePath);
mkdirp(path.dirname(newFilePath), function (error) {

View File

@@ -20,10 +20,10 @@ exports = module.exports = {
var assert = require('assert');
function upload(apiConfig, backupFilePath, sourceDir, callback) {
function upload(apiConfig, backupFilePath, sourceStream, callback) {
assert.strictEqual(typeof apiConfig, 'object');
assert.strictEqual(typeof backupFilePath, 'string');
assert.strictEqual(typeof sourceDir, 'string');
assert.strictEqual(typeof sourceStream, 'object');
assert.strictEqual(typeof callback, 'function');
// Result: none
@@ -31,14 +31,12 @@ function upload(apiConfig, backupFilePath, sourceDir, callback) {
callback(new Error('not implemented'));
}
function download(apiConfig, sourceFilePath, destination, callback) {
function download(apiConfig, backupFilePath, callback) {
assert.strictEqual(typeof apiConfig, 'object');
assert.strictEqual(typeof sourceFilePath, 'string');
assert.strictEqual(typeof destination, 'string');
assert.strictEqual(typeof backupFilePath, 'string');
assert.strictEqual(typeof callback, 'function');
// Result: none
// Result: download stream
callback(new Error('not implemented'));
}

View File

@@ -15,26 +15,25 @@ exports = module.exports = {
var assert = require('assert'),
debug = require('debug')('box:storage/noop');
function upload(apiConfig, backupFilePath, sourceDir, callback) {
function upload(apiConfig, backupFilePath, sourceStream, callback) {
assert.strictEqual(typeof apiConfig, 'object');
assert.strictEqual(typeof backupFilePath, 'string');
assert.strictEqual(typeof sourceDir, 'string');
assert.strictEqual(typeof sourceStream, 'object');
assert.strictEqual(typeof callback, 'function');
debug('backup: %s %s', backupFilePath, sourceDir);
debug('upload: %s', backupFilePath);
callback();
}
function download(apiConfig, sourceFilePath, destination, callback) {
function download(apiConfig, backupFilePath, callback) {
assert.strictEqual(typeof apiConfig, 'object');
assert.strictEqual(typeof sourceFilePath, 'string');
assert.strictEqual(typeof destination, 'string');
assert.strictEqual(typeof backupFilePath, 'string');
assert.strictEqual(typeof callback, 'function');
debug('restore: %s %s', sourceFilePath, destination);
debug('download: %s', backupFilePath);
callback(new Error('Cannot restore from noop backend'));
callback(new Error('Cannot download from noop backend'));
}
function copy(apiConfig, oldFilePath, newFilePath, callback) {

View File

@@ -20,11 +20,9 @@ var assert = require('assert'),
AWS = require('aws-sdk'),
BackupsError = require('../backups.js').BackupsError,
debug = require('debug')('box:storage/s3'),
once = require('once'),
PassThrough = require('stream').PassThrough,
path = require('path'),
S3BlockReadStream = require('s3-block-read-stream'),
targz = require('./targz.js');
S3BlockReadStream = require('s3-block-read-stream');
// test only
var originalAWS;
@@ -58,51 +56,42 @@ function getBackupCredentials(apiConfig, callback) {
}
// storage api
function upload(apiConfig, backupFilePath, sourceDir, callback) {
function upload(apiConfig, backupFilePath, sourceStream, callback) {
assert.strictEqual(typeof apiConfig, 'object');
assert.strictEqual(typeof backupFilePath, 'string');
assert.strictEqual(typeof sourceDir, 'string');
assert.strictEqual(typeof sourceStream, 'object');
assert.strictEqual(typeof callback, 'function');
callback = once(callback);
debug('backup: %s -> %s', sourceDir, backupFilePath);
debug('upload: %s', backupFilePath);
getBackupCredentials(apiConfig, function (error, credentials) {
if (error) return callback(error);
var passThrough = new PassThrough();
var params = {
Bucket: apiConfig.bucket,
Key: backupFilePath,
Body: passThrough
Body: sourceStream
};
var s3 = new AWS.S3(credentials);
// s3.upload automatically does a multi-part upload. we set queueSize to 1 to reduce memory usage
s3.upload(params, { partSize: 10 * 1024 * 1024, queueSize: 1 }, function (error) {
if (error) {
debug('[%s] backup: s3 upload error.', backupFilePath, error);
debug('[%s] upload: s3 upload error.', backupFilePath, error);
return callback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message));
}
callback(null);
});
targz.create(sourceDir, apiConfig.key || null, passThrough, callback);
});
}
function download(apiConfig, backupFilePath, destination, callback) {
function download(apiConfig, backupFilePath, callback) {
assert.strictEqual(typeof apiConfig, 'object');
assert.strictEqual(typeof backupFilePath, 'string');
assert.strictEqual(typeof destination, 'string');
assert.strictEqual(typeof callback, 'function');
callback = once(callback);
debug('restore: %s -> %s', backupFilePath, destination);
debug('download: %s', backupFilePath);
getBackupCredentials(apiConfig, function (error, credentials) {
if (error) return callback(error);
@@ -114,17 +103,22 @@ function download(apiConfig, backupFilePath, destination, callback) {
var s3 = new AWS.S3(credentials);
var ps = new PassThrough();
var multipartDownload = new S3BlockReadStream(s3, params, { blockSize: 64 * 1024 * 1024, logCallback: debug });
multipartDownload.on('error', function (error) {
// TODO ENOENT for the mock, fix upstream!
if (error.code === 'NoSuchKey' || error.code === 'ENOENT') return callback(new BackupsError(BackupsError.NOT_FOUND));
debug('[%s] restore: s3 stream error.', backupFilePath, error);
callback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message));
if (error.code === 'NoSuchKey' || error.code === 'ENOENT') {
ps.emit('error', new BackupsError(BackupsError.NOT_FOUND));
} else {
debug('[%s] download: s3 stream error.', backupFilePath, error);
ps.emit('error', new BackupsError(BackupsError.EXTERNAL_ERROR, error.message));
}
});
targz.extract(multipartDownload, destination, apiConfig.key || null, callback);
multipartDownload.pipe(ps);
callback(null, ps);
});
}

View File

@@ -1,104 +0,0 @@
'use strict';

// Helpers for creating/extracting (optionally AES-encrypted) .tar.gz archives.
// Shared by the storage backends (s3, caas, filesystem).
exports = module.exports = {
    create: create,
    extract: extract
};
var assert = require('assert'),
BackupsError = require('../backups.js').BackupsError,
crypto = require('crypto'),
debug = require('debug')('box:storage/targz'),
mkdirp = require('mkdirp'),
progress = require('progress-stream'),
tar = require('tar-fs'),
zlib = require('zlib');
// Packs sourceDir into a gzipped (and, when key is set, AES-256-CBC encrypted)
// tarball and pipes it into outStream. Archive entry paths are rewritten to be
// relative to sourceDir.
//
// key      - encryption passphrase, or null for an unencrypted archive
// callback - invoked with a BackupsError on stream failure ONLY. This function
//            never calls back on success; completion is signaled by outStream
//            finishing (callers attach their own 'close'/'finish' handlers and
//            are expected to wrap callback with once(), since more than one
//            stream in the pipeline can emit 'error').
function create(sourceDir, key, outStream, callback) {
    assert.strictEqual(typeof sourceDir, 'string');
    assert(key === null || typeof key === 'string');
    assert.strictEqual(typeof outStream, 'object'); // was missing; all other params are asserted
    assert.strictEqual(typeof callback, 'function');

    var pack = tar.pack('/', {
        dereference: false, // pack the symlink and not what it points to
        entries: [ sourceDir ],
        map: function (header) {
            header.name = header.name.replace(new RegExp('^' + sourceDir + '(/?)'), '.$1'); // make paths relative
            return header;
        },
        strict: false // do not error for unknown types (skip fifo, char/block devices)
    });

    var gzip = zlib.createGzip({});
    var progressStream = progress({ time: 10000 }); // display a progress every 10 seconds

    pack.on('error', function (error) {
        debug('backup: tar stream error.', error);
        callback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message));
    });

    gzip.on('error', function (error) {
        debug('backup: gzip stream error.', error);
        callback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message));
    });

    progressStream.on('progress', function (progress) {
        debug('backup: %s@%s', Math.round(progress.transferred/1024/1024) + 'M', Math.round(progress.speed/1024/1024) + 'Mbps');
    });

    if (key !== null) {
        // FIXME: createCipher derives the key with a weak, salt-less KDF; prefer
        // createCipheriv with a proper key derivation when the format can change.
        var encrypt = crypto.createCipher('aes-256-cbc', key);
        encrypt.on('error', function (error) {
            debug('backup: encrypt stream error.', error);
            callback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message));
        });
        pack.pipe(gzip).pipe(encrypt).pipe(progressStream).pipe(outStream);
    } else {
        pack.pipe(gzip).pipe(progressStream).pipe(outStream);
    }
}
// Unpacks a (optionally encrypted) gzipped tarball read from inStream into
// destination, creating the directory first.
//
// key      - decryption passphrase, or null for an unencrypted archive
// callback - invoked with a BackupsError on failure, or (null) once the tar
//            extraction finishes. Several streams can emit 'error'; callers
//            are expected to wrap callback with once().
function extract(inStream, destination, key, callback) {
    assert.strictEqual(typeof inStream, 'object'); // was missing; all other params are asserted
    assert.strictEqual(typeof destination, 'string');
    assert(key === null || typeof key === 'string');
    assert.strictEqual(typeof callback, 'function');

    mkdirp(destination, function (error) {
        if (error) return callback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message));

        var gunzip = zlib.createGunzip({});
        var progressStream = progress({ time: 10000 }); // display a progress every 10 seconds
        var extract = tar.extract(destination);

        progressStream.on('progress', function (progress) {
            debug('restore: %s@%s', Math.round(progress.transferred/1024/1024) + 'M', Math.round(progress.speed/1024/1024) + 'Mbps');
        });

        // without this handler a source read error would crash the process
        // instead of surfacing through the callback
        inStream.on('error', function (error) {
            debug('restore: input stream error.', error);
            callback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message));
        });

        gunzip.on('error', function (error) {
            debug('restore: gunzip stream error.', error);
            callback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message));
        });

        extract.on('error', function (error) {
            debug('restore: extract stream error.', error);
            callback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message));
        });

        extract.on('finish', function () {
            debug('restore: done.');
            callback(null);
        });

        if (key !== null) {
            var decrypt = crypto.createDecipher('aes-256-cbc', key);
            decrypt.on('error', function (error) {
                debug('restore: decrypt stream error.', error);
                callback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message));
            });
            inStream.pipe(progressStream).pipe(decrypt).pipe(gunzip).pipe(extract);
        } else {
            inStream.pipe(progressStream).pipe(gunzip).pipe(extract);
        }
    });
}