@@ -3,9 +3,10 @@
|
||||
exports = module.exports = {
|
||||
upload: upload,
|
||||
download: download,
|
||||
downloadDir: downloadDir,
|
||||
copy: copy,
|
||||
|
||||
listDir: listDir,
|
||||
|
||||
remove: remove,
|
||||
removeDir: removeDir,
|
||||
|
||||
@@ -21,9 +22,7 @@ var assert = require('assert'),
|
||||
BackupsError = require('../backups.js').BackupsError,
|
||||
debug = require('debug')('box:storage/gcs'),
|
||||
EventEmitter = require('events'),
|
||||
fs = require('fs'),
|
||||
GCS = require('@google-cloud/storage'),
|
||||
mkdirp = require('mkdirp'),
|
||||
PassThrough = require('stream').PassThrough,
|
||||
path = require('path');
|
||||
|
||||
@@ -115,24 +114,17 @@ function listDir(apiConfig, backupFilePath, batchSize, iteratorCallback, callbac
|
||||
|
||||
async.forever(function listAndDownload(foreverCallback) {
|
||||
bucket.getFiles(query, function (error, files, nextQuery) {
|
||||
if (error) {
|
||||
debug('remove: Failed to list %s. Not fatal.', error);
|
||||
return foreverCallback(error);
|
||||
}
|
||||
if (error) return foreverCallback(error);
|
||||
|
||||
if (files.length === 0) return foreverCallback(new Error('Done'));
|
||||
|
||||
debug('emitting ' + files.length + ' files found: ' + files.map(function(f) { return f.name; }).join(','));
|
||||
iteratorCallback(files, function (error) {
|
||||
if (error) {
|
||||
debug(`listDir page handled unsuccessfully ${error}`);
|
||||
return foreverCallback(error);
|
||||
}
|
||||
|
||||
const entries = files.map(function (f) { return { fullPath: f.name }; });
|
||||
iteratorCallback(entries, function (error) {
|
||||
if (error) return foreverCallback(error);
|
||||
if (!nextQuery) return foreverCallback(new Error('Done'));
|
||||
|
||||
query = nextQuery;
|
||||
debug(`listDir next page token ${query.pageToken}`);
|
||||
|
||||
foreverCallback();
|
||||
});
|
||||
});
|
||||
@@ -143,53 +135,6 @@ function listDir(apiConfig, backupFilePath, batchSize, iteratorCallback, callbac
|
||||
});
|
||||
}
|
||||
|
||||
function downloadDir(apiConfig, backupFilePath, destDir) {
|
||||
assert.strictEqual(typeof apiConfig, 'object');
|
||||
assert.strictEqual(typeof backupFilePath, 'string');
|
||||
assert.strictEqual(typeof destDir, 'string');
|
||||
|
||||
var events = new EventEmitter();
|
||||
var total = 0;
|
||||
|
||||
function downloadFile(file, iteratorCallback) {
|
||||
var relativePath = path.relative(backupFilePath, file.name);
|
||||
|
||||
events.emit('progress', `Downloading ${relativePath}`);
|
||||
|
||||
mkdirp(path.dirname(path.join(destDir, relativePath)), function (error) {
|
||||
if (error) return iteratorCallback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message));
|
||||
|
||||
download(apiConfig, file.name, function (error, sourceStream) {
|
||||
if (error) return iteratorCallback(error);
|
||||
|
||||
var destStream = fs.createWriteStream(path.join(destDir, relativePath));
|
||||
|
||||
destStream.on('open', function () {
|
||||
sourceStream.pipe(destStream);
|
||||
});
|
||||
|
||||
destStream.on('error', function (error) {
|
||||
iteratorCallback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message));
|
||||
});
|
||||
|
||||
destStream.on('finish', iteratorCallback);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
const concurrency = 10, batchSize = -1;
|
||||
|
||||
listDir(apiConfig, backupFilePath, batchSize, function (files, done) {
|
||||
total += files.length;
|
||||
async.eachLimit(files, concurrency, downloadFile, done);
|
||||
}, function (error) {
|
||||
events.emit('progress', `Downloaded ${total} files`);
|
||||
events.emit('done', error);
|
||||
});
|
||||
|
||||
return events;
|
||||
}
|
||||
|
||||
function copy(apiConfig, oldFilePath, newFilePath) {
|
||||
assert.strictEqual(typeof apiConfig, 'object');
|
||||
assert.strictEqual(typeof oldFilePath, 'string');
|
||||
@@ -197,16 +142,15 @@ function copy(apiConfig, oldFilePath, newFilePath) {
|
||||
|
||||
var events = new EventEmitter(), retryCount = 0;
|
||||
|
||||
function copyFile(file, iteratorCallback) {
|
||||
function copyFile(entry, iteratorCallback) {
|
||||
var relativePath = path.relative(oldFilePath, entry.fullPath);
|
||||
|
||||
var relativePath = path.relative(oldFilePath, file.name);
|
||||
getBucket(apiConfig).file(entry.fullPath).copy(path.join(newFilePath, relativePath), function(error) {
|
||||
if (error) debug('copyBackup: gcs copy error', error);
|
||||
|
||||
file.copy(path.join(newFilePath, relativePath), function(error) {
|
||||
if (error && error.code === 404) return iteratorCallback(new BackupsError(BackupsError.NOT_FOUND, 'Old backup not found'));
|
||||
if (error) {
|
||||
debug('copyBackup: gcs copy error', error);
|
||||
return iteratorCallback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message));
|
||||
}
|
||||
if (error) return iteratorCallback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message));
|
||||
|
||||
iteratorCallback(null);
|
||||
});
|
||||
|
||||
@@ -216,17 +160,16 @@ function copy(apiConfig, oldFilePath, newFilePath) {
|
||||
const batchSize = -1;
|
||||
var total = 0, concurrency = 4;
|
||||
|
||||
listDir(apiConfig, oldFilePath, batchSize, function (files, done) {
|
||||
total += files.length;
|
||||
listDir(apiConfig, oldFilePath, batchSize, function (entries, done) {
|
||||
total += entries.length;
|
||||
|
||||
if (retryCount === 0) concurrency = Math.min(concurrency + 1, 10); else concurrency = Math.max(concurrency - 1, 5);
|
||||
events.emit('progress', `${retryCount} errors. concurrency set to ${concurrency}`);
|
||||
retryCount = 0;
|
||||
|
||||
async.eachLimit(files, concurrency, copyFile, done);
|
||||
async.eachLimit(entries, concurrency, copyFile, done);
|
||||
}, function (error) {
|
||||
events.emit('progress', `Copied ${total} files`);
|
||||
|
||||
events.emit('done', error);
|
||||
});
|
||||
|
||||
@@ -240,9 +183,9 @@ function remove(apiConfig, filename, callback) {
|
||||
|
||||
getBucket(apiConfig)
|
||||
.file(filename)
|
||||
.delete(function(e) {
|
||||
if (e) debug('removeBackups: Unable to remove %s (%s). Not fatal.', filename, e.message);
|
||||
else debug('removeBackups: Deleted: %s', filename);
|
||||
.delete(function (error) {
|
||||
if (error) debug('removeBackups: Unable to remove %s (%s). Not fatal.', filename, error.message);
|
||||
|
||||
callback(null);
|
||||
});
|
||||
}
|
||||
@@ -256,14 +199,16 @@ function removeDir(apiConfig, pathPrefix) {
|
||||
const batchSize = 1;
|
||||
var total = 0, concurrency = 4;
|
||||
|
||||
listDir(apiConfig, pathPrefix, batchSize, function (files, done) {
|
||||
total += files.length;
|
||||
listDir(apiConfig, pathPrefix, batchSize, function (entries, done) {
|
||||
total += entries.length;
|
||||
|
||||
if (retryCount === 0) concurrency = Math.min(concurrency + 1, 10); else concurrency = Math.max(concurrency - 1, 5);
|
||||
events.emit('progress', `${retryCount} errors. concurrency set to ${concurrency}`);
|
||||
retryCount = 0;
|
||||
|
||||
async.eachLimit(files, concurrency, remove.bind(null, apiConfig), done);
|
||||
async.eachLimit(entries, concurrency, function (entry, iteratorCallback) {
|
||||
remove(apiConfig, entry.fullPath, iteratorCallback);
|
||||
}, done);
|
||||
}, function (error) {
|
||||
events.emit('progress', `Deleted ${total} files`);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user