Refactoring gcs to match the new storage interface
This commit is contained in:
@@ -1,10 +1,13 @@
|
||||
'use strict';
|
||||
|
||||
exports = module.exports = {
|
||||
backup: backup,
|
||||
restore: restore,
|
||||
copyBackup: copyBackup,
|
||||
removeBackups: removeBackups,
|
||||
upload: upload,
|
||||
download: download,
|
||||
downloadDir: downloadDir,
|
||||
copy: copy,
|
||||
|
||||
remove: remove,
|
||||
removeDir: removeDir,
|
||||
|
||||
backupDone: backupDone,
|
||||
|
||||
@@ -18,12 +21,15 @@ exports = module.exports = {
|
||||
var assert = require('assert'),
|
||||
GCS = require('@google-cloud/storage'),
|
||||
BackupsError = require('../backups.js').BackupsError,
|
||||
chunk = require('lodash.chunk'),
|
||||
debug = require('debug')('box:storage/gcs'),
|
||||
once = require('once'),
|
||||
PassThrough = require('stream').PassThrough,
|
||||
EventEmitter = require('events'),
|
||||
mkdirp = require('mkdirp'),
|
||||
fs = require('fs'),
|
||||
path = require('path'),
|
||||
async = require('async'),
|
||||
targz = require('./targz.js');
|
||||
async = require('async');
|
||||
|
||||
// test only
|
||||
var originalGCS;
|
||||
@@ -54,111 +60,224 @@ function getBackupCredentials(backupConfig) {
|
||||
}
|
||||
return config;
|
||||
}
|
||||
|
||||
function getBucket(apiConfig) {
|
||||
var credentials = getBackupCredentials(apiConfig);
|
||||
return GCS(credentials).bucket(apiConfig.bucket);
|
||||
}
|
||||
|
||||
function getBackupFilePath(apiConfig, backupId) {
|
||||
assert.strictEqual(typeof apiConfig, 'object');
|
||||
assert.strictEqual(typeof backupId, 'string');
|
||||
|
||||
const FILE_TYPE = apiConfig.key ? '.tar.gz.enc' : '.tar.gz';
|
||||
|
||||
return path.join(apiConfig.prefix, backupId.endsWith(FILE_TYPE) ? backupId : backupId+FILE_TYPE);
|
||||
}
|
||||
|
||||
// storage api
|
||||
function backup(apiConfig, backupId, sourceDir, callback) {
|
||||
|
||||
|
||||
function upload(apiConfig, backupFilePath, sourceStream, callback) {
|
||||
assert.strictEqual(typeof apiConfig, 'object');
|
||||
assert.strictEqual(typeof backupId, 'string');
|
||||
assert.strictEqual(typeof sourceDir, 'string');
|
||||
assert.strictEqual(typeof backupFilePath, 'string');
|
||||
assert.strictEqual(typeof sourceStream, 'object');
|
||||
assert.strictEqual(typeof callback, 'function');
|
||||
|
||||
callback = once(callback);
|
||||
function done(error) {
|
||||
if (error) {
|
||||
debug('[%s] upload: gcp upload error.', backupFilePath, error);
|
||||
return callback(new BackupsError(BackupsError.EXTERNAL_ERROR, 'Error uploading ${backupFilePath}. Message: ${error.message} HTTP Code: ${error.code}'));
|
||||
}
|
||||
|
||||
var backupFilePath = getBackupFilePath(apiConfig, backupId);
|
||||
callback(null);
|
||||
}
|
||||
|
||||
debug('[%s] backup: %s -> %s', backupId, sourceDir, backupFilePath);
|
||||
|
||||
var bucket = getBucket(apiConfig);
|
||||
var uploadingFile = bucket.file(backupFilePath);
|
||||
|
||||
var uploadStream = uploadingFile.createWriteStream({resumable: false})
|
||||
.on('finish', callback.bind(null, null))
|
||||
.on('error', function(e){
|
||||
if (e) callback(new BackupsError(BackupsError.EXTERNAL_ERROR, e.message));
|
||||
})
|
||||
;
|
||||
targz.create([{ source: sourceDir, destination: '.' }], apiConfig.key || null, uploadStream, callback);
|
||||
return uploadStream;
|
||||
return sourceStream.pipe(
|
||||
getBucket(apiConfig)
|
||||
.file(backupFilePath)
|
||||
.createWriteStream({resumable: false})
|
||||
.on('finish', done)
|
||||
.on('error', function(e){
|
||||
if (e) done(e);
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
function restore(apiConfig, backupId, destination, callback) {
|
||||
function download(apiConfig, backupFilePath, callback) {
|
||||
assert.strictEqual(typeof apiConfig, 'object');
|
||||
assert.strictEqual(typeof backupId, 'string');
|
||||
assert.strictEqual(typeof destination, 'string');
|
||||
assert.strictEqual(typeof backupFilePath, 'string');
|
||||
assert.strictEqual(typeof callback, 'function');
|
||||
|
||||
callback = once(callback);
|
||||
|
||||
var backupFilePath = getBackupFilePath(apiConfig, backupId);
|
||||
|
||||
debug('[%s] restore: %s -> %s', backupId, backupFilePath, destination);
|
||||
|
||||
var file = getBucket(apiConfig).file(backupFilePath);
|
||||
|
||||
var ps = new PassThrough();
|
||||
var readStream = file.createReadStream()
|
||||
.on('error', function(e){
|
||||
if (e && e.code == 404) return callback(new BackupsError(BackupsError.NOT_FOUND, e));
|
||||
callback(new BackupsError(BackupsError.EXTERNAL_ERROR, e));
|
||||
.on('error', function(error){
|
||||
if (error && error.code == 404){
|
||||
ps.emit('error', new BackupsError(BackupsError.NOT_FOUND));
|
||||
} else {
|
||||
debug('[%s] download: gcp stream error.', backupFilePath, error);
|
||||
ps.emit('error', new BackupsError(BackupsError.EXTERNAL_ERROR, error));
|
||||
}
|
||||
})
|
||||
;
|
||||
targz.extract(readStream, destination, apiConfig.key || null, callback);
|
||||
readStream.pipe(ps);
|
||||
|
||||
callback(null, ps);
|
||||
}
|
||||
|
||||
function copyBackup(apiConfig, oldBackupId, newBackupId, callback) {
|
||||
assert.strictEqual(typeof apiConfig, 'object');
|
||||
assert.strictEqual(typeof oldBackupId, 'string');
|
||||
assert.strictEqual(typeof newBackupId, 'string');
|
||||
assert.strictEqual(typeof callback, 'function');
|
||||
|
||||
function listDir(apiConfig, backupFilePath, batchSize, iteratorCallback, callback){
|
||||
var bucket = getBucket(apiConfig);
|
||||
|
||||
bucket
|
||||
.file(getBackupFilePath(apiConfig, oldBackupId))
|
||||
.copy(getBackupFilePath(apiConfig, newBackupId), function(error, newFile, apiResponse){
|
||||
if (error && error.code == 404) return callback(new BackupsError(BackupsError.NOT_FOUND, 'Old backup not found'));
|
||||
var query = {prefix: backupFilePath, maxResults: batchSize, autoPaginate: false};
|
||||
|
||||
async.forever(function listAndDownload(foreverCallback) {
|
||||
bucket.getFiles(query, function (error, files, nextQuery) {
|
||||
if (error) {
|
||||
debug('copyBackup: gcs copy error.', e);
|
||||
return callback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message));
|
||||
debug('remove: Failed to list %s. Not fatal.', error);
|
||||
return foreverCallback(error);
|
||||
}
|
||||
|
||||
var arr = batchSize === 1 ? files : chunk(files, batchSize);
|
||||
if (arr.length === 0) return foreverCallback(new Error('Done'));
|
||||
|
||||
debug('emitting '+arr.length+' files found');
|
||||
iteratorCallback(arr, function (error) {
|
||||
if (error) return foreverCallback(error);
|
||||
|
||||
if (arr.length < batchSize) return foreverCallback(new Error('Done'));
|
||||
|
||||
query = nextQuery;
|
||||
foreverCallback();
|
||||
});
|
||||
});
|
||||
}, function (error) {
|
||||
if (error.message === 'Done') return callback(null);
|
||||
|
||||
callback(error);
|
||||
});
|
||||
}
|
||||
|
||||
function downloadDir(apiConfig, backupFilePath, destDir) {
|
||||
assert.strictEqual(typeof apiConfig, 'object');
|
||||
assert.strictEqual(typeof backupFilePath, 'string');
|
||||
assert.strictEqual(typeof destDir, 'string');
|
||||
|
||||
var events = new EventEmitter();
|
||||
var total = 0;
|
||||
|
||||
function downloadFile(file, iteratorCallback) {
|
||||
var relativePath = path.relative(backupFilePath, file.name);
|
||||
|
||||
events.emit('progress', 'Downloading ${relativePath}');
|
||||
|
||||
mkdirp(path.dirname(path.join(destDir, relativePath)), function (error) {
|
||||
if (error) return iteratorCallback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message));
|
||||
|
||||
download(apiConfig, file.name, function (error, sourceStream) {
|
||||
if (error) return iteratorCallback(error);
|
||||
|
||||
var destStream = fs.createWriteStream(path.join(destDir, relativePath));
|
||||
|
||||
destStream.on('open', function () {
|
||||
sourceStream.pipe(destStream);
|
||||
});
|
||||
|
||||
destStream.on('error', function (error) {
|
||||
return iteratorCallback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message));
|
||||
});
|
||||
|
||||
destStream.on('finish', iteratorCallback);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
const concurrency = 10, batchSize = 1;
|
||||
|
||||
listDir(apiConfig, backupFilePath, batchSize, function (objects, done) {
|
||||
total += objects.length;
|
||||
async.eachLimit(objects, concurrency, downloadFile, done);
|
||||
}, function (error) {
|
||||
events.emit('progress', 'Downloaded ${total} files');
|
||||
|
||||
events.emit('done', error);
|
||||
});
|
||||
|
||||
return events;
|
||||
}
|
||||
|
||||
function copy(apiConfig, oldFilePath, newFilePath) {
|
||||
assert.strictEqual(typeof apiConfig, 'object');
|
||||
assert.strictEqual(typeof oldFilePath, 'string');
|
||||
assert.strictEqual(typeof newFilePath, 'string');
|
||||
|
||||
var events = new EventEmitter(), retryCount = 0;
|
||||
|
||||
function copyFile(file, iteratorCallback){
|
||||
|
||||
var relativePath = path.relative(oldFilePath, file.name);
|
||||
|
||||
file.copy(path.join(newFilePath, relativePath), function(error, newFile, apiResponse){
|
||||
if (error && error.code == 404) return iteratorCallback(new BackupsError(BackupsError.NOT_FOUND, 'Old backup not found'));
|
||||
if (error) {
|
||||
debug('copyBackup: gcs copy error', error);
|
||||
return iteratorCallback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message));
|
||||
}
|
||||
iteratorCallback(null);
|
||||
});
|
||||
|
||||
events.emit('progress', 'Copying (multipart) ${relativePath}');
|
||||
}
|
||||
|
||||
const batchSize = 1;
|
||||
var total = 0, concurrency = 4;
|
||||
|
||||
listDir(apiConfig, oldFilePath, batchSize, function (files, done) {
|
||||
total += files.length;
|
||||
|
||||
if (retryCount === 0) concurrency = Math.min(concurrency + 1, 10); else concurrency = Math.max(concurrency - 1, 5);
|
||||
events.emit('progress', '${retryCount} errors. concurrency set to ${concurrency}');
|
||||
retryCount = 0;
|
||||
|
||||
async.eachLimit(files, concurrency, copyFile, done);
|
||||
}, function (error) {
|
||||
events.emit('progress', 'Copied ${total} files');
|
||||
|
||||
events.emit('done', error);
|
||||
});
|
||||
|
||||
return events;
|
||||
}
|
||||
|
||||
function remove(apiConfig, filename, callback) {
|
||||
assert.strictEqual(typeof apiConfig, 'object');
|
||||
assert.strictEqual(typeof filename, 'string');
|
||||
assert.strictEqual(typeof callback, 'function');
|
||||
|
||||
getBucket(apiConfig)
|
||||
.file(filename)
|
||||
.delete(function(e){
|
||||
if (e) debug('removeBackups: Unable to remove %s (%s). Not fatal.', filename, e.message);
|
||||
else debug('removeBackups: Deleted: %s', filename);
|
||||
callback(null);
|
||||
});
|
||||
}
|
||||
|
||||
function removeBackups(apiConfig, backupIds, callback) {
|
||||
function removeDir(apiConfig, pathPrefix) {
|
||||
assert.strictEqual(typeof apiConfig, 'object');
|
||||
assert(Array.isArray(backupIds));
|
||||
assert.strictEqual(typeof callback, 'function');
|
||||
assert.strictEqual(typeof pathPrefix, 'string');
|
||||
|
||||
var bucket = getBucket(apiConfig);
|
||||
var events = new EventEmitter(), retryCount = 0;
|
||||
|
||||
var removeQueue = [];
|
||||
backupIds.forEach(function (backupId) {
|
||||
removeQueue.push(function(cb){
|
||||
var filePath = getBackupFilePath(apiConfig, backupId);
|
||||
bucket.file(filePath).delete(function(e){
|
||||
if (e) debug('removeBackups: Unable to remove %s (%s). Not fatal.', filePath, e.message);
|
||||
else debug('removeBackups: Deleted: %s', filePath);
|
||||
cb(typeof e == 'undefined');
|
||||
});
|
||||
});
|
||||
const batchSize = 1;
|
||||
var total = 0, concurrency = 4;
|
||||
|
||||
listDir(apiConfig, oldFilePath, batchSize, function (files, done) {
|
||||
total += files.length;
|
||||
|
||||
if (retryCount === 0) concurrency = Math.min(concurrency + 1, 10); else concurrency = Math.max(concurrency - 1, 5);
|
||||
events.emit('progress', '${retryCount} errors. concurrency set to ${concurrency}');
|
||||
retryCount = 0;
|
||||
|
||||
async.eachLimit(files, concurrency, remove.bind(null, apiConfig), done);
|
||||
}, function (error) {
|
||||
events.emit('progress', 'Deleted ${total} files');
|
||||
|
||||
events.emit('done', error);
|
||||
});
|
||||
|
||||
async.series(removeQueue, callback);
|
||||
return events;
|
||||
}
|
||||
|
||||
function testConfig(apiConfig, callback) {
|
||||
|
||||
Reference in New Issue
Block a user