Fixing listDir to support batchSize = -1 for non-chunked listings. Also strings extrapolation fix (ES6)

This commit is contained in:
Aleksandr Bogdanov
2017-10-31 11:40:00 +01:00
parent 51ca1c7384
commit 9b1f8febf1
2 changed files with 51 additions and 40 deletions

View File

@@ -66,18 +66,18 @@ function getBucket(apiConfig) {
}
// storage api
function upload(apiConfig, backupFilePath, sourceStream, callback) {
assert.strictEqual(typeof apiConfig, 'object');
assert.strictEqual(typeof backupFilePath, 'string');
assert.strictEqual(typeof sourceStream, 'object');
assert.strictEqual(typeof callback, 'function');
debug(`Uploading to ${backupFilePath}`);
function done(error) {
if (error) {
debug('[%s] upload: gcp upload error.', backupFilePath, error);
return callback(new BackupsError(BackupsError.EXTERNAL_ERROR, 'Error uploading ${backupFilePath}. Message: ${error.message} HTTP Code: ${error.code}'));
return callback(new BackupsError(BackupsError.EXTERNAL_ERROR, `Error uploading ${backupFilePath}. Message: ${error.message} HTTP Code: ${error.code}`));
}
callback(null);
@@ -99,6 +99,8 @@ function download(apiConfig, backupFilePath, callback) {
assert.strictEqual(typeof backupFilePath, 'string');
assert.strictEqual(typeof callback, 'function');
debug(`Download ${backupFilePath} starting`);
var file = getBucket(apiConfig).file(backupFilePath);
var ps = new PassThrough();
@@ -120,7 +122,10 @@ function download(apiConfig, backupFilePath, callback) {
function listDir(apiConfig, backupFilePath, batchSize, iteratorCallback, callback){
var bucket = getBucket(apiConfig);
var query = {prefix: backupFilePath, maxResults: batchSize, autoPaginate: false};
var query = {prefix: backupFilePath, autoPaginate: batchSize === -1};
if (batchSize > 0) {
query.maxResults = batchSize;
}
async.forever(function listAndDownload(foreverCallback) {
bucket.getFiles(query, function (error, files, nextQuery) {
@@ -129,16 +134,19 @@ function listDir(apiConfig, backupFilePath, batchSize, iteratorCallback, callbac
return foreverCallback(error);
}
var arr = batchSize === 1 ? files : chunk(files, batchSize);
if (arr.length === 0) return foreverCallback(new Error('Done'));
if (files.length === 0) return foreverCallback(new Error('Done'));
debug('emitting '+arr.length+' files found');
iteratorCallback(arr, function (error) {
if (error) return foreverCallback(error);
debug('emitting '+files.length+' files found: ' + files.map(function(f){return f.name}).join(','));
iteratorCallback(files, function (error) {
if (error) {
debug(`listDir page handled unsuccessfully ${error}`);
return foreverCallback(error);
}
if (arr.length < batchSize) return foreverCallback(new Error('Done'));
if (!nextQuery) return foreverCallback(new Error('Done'));
query = nextQuery;
debug(`listDir next page token ${query.pageToken}`);
foreverCallback();
});
});
@@ -160,7 +168,7 @@ function downloadDir(apiConfig, backupFilePath, destDir) {
function downloadFile(file, iteratorCallback) {
var relativePath = path.relative(backupFilePath, file.name);
events.emit('progress', 'Downloading ${relativePath}');
events.emit('progress', `Downloading ${relativePath}`);
mkdirp(path.dirname(path.join(destDir, relativePath)), function (error) {
if (error) return iteratorCallback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message));
@@ -183,14 +191,13 @@ function downloadDir(apiConfig, backupFilePath, destDir) {
});
}
const concurrency = 10, batchSize = 1;
const concurrency = 10, batchSize = -1;
listDir(apiConfig, backupFilePath, batchSize, function (objects, done) {
total += objects.length;
async.eachLimit(objects, concurrency, downloadFile, done);
listDir(apiConfig, backupFilePath, batchSize, function (files, done) {
total += files.length;
async.eachLimit(files, concurrency, downloadFile, done);
}, function (error) {
events.emit('progress', 'Downloaded ${total} files');
events.emit('progress', `Downloaded ${total} files`);
events.emit('done', error);
});
@@ -217,22 +224,22 @@ function copy(apiConfig, oldFilePath, newFilePath) {
iteratorCallback(null);
});
events.emit('progress', 'Copying (multipart) ${relativePath}');
events.emit('progress', `Copying (multipart) ${relativePath}`);
}
const batchSize = 1;
const batchSize = -1;
var total = 0, concurrency = 4;
listDir(apiConfig, oldFilePath, batchSize, function (files, done) {
total += files.length;
if (retryCount === 0) concurrency = Math.min(concurrency + 1, 10); else concurrency = Math.max(concurrency - 1, 5);
events.emit('progress', '${retryCount} errors. concurrency set to ${concurrency}');
events.emit('progress', `${retryCount} errors. concurrency set to ${concurrency}`);
retryCount = 0;
async.eachLimit(files, concurrency, copyFile, done);
}, function (error) {
events.emit('progress', 'Copied ${total} files');
events.emit('progress', `Copied ${total} files`);
events.emit('done', error);
});
@@ -267,12 +274,12 @@ function removeDir(apiConfig, pathPrefix) {
total += files.length;
if (retryCount === 0) concurrency = Math.min(concurrency + 1, 10); else concurrency = Math.max(concurrency - 1, 5);
events.emit('progress', '${retryCount} errors. concurrency set to ${concurrency}');
events.emit('progress', `${retryCount} errors. concurrency set to ${concurrency}`);
retryCount = 0;
async.eachLimit(files, concurrency, remove.bind(null, apiConfig), done);
}, function (error) {
events.emit('progress', 'Deleted ${total} files');
events.emit('progress', `Deleted ${total} files`);
events.emit('done', error);
});
@@ -296,31 +303,38 @@ function testConfig(apiConfig, callback) {
// attempt to upload and delete a file with new credentials
var bucket = getBucket(apiConfig);
var testFile = bucket.file(path.join(apiConfig.prefix, 'cloudron-testfile'));
var uploadStream = testFile.createWriteStream({resumable: false});
var uploadStream = testFile.createWriteStream({resumable: false})
.on('error', function(error){
debug('uploadStream failed uploading cloudron-testfile', error);
if (error && error.code && (error.code == 403 || error.code == 404)){
callback(new BackupsError(BackupsError.BAD_FIELD, error.message));
}
return callback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message));
})
;
var testfileStream = new PassThrough();
testfileStream.write("testfilecontents");
testfileStream.end();
testfileStream
.on('error', function(error){
debug('failed uploading cloudron-testfile', error);
return callback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message));
})
.on('end', function(){
debug('uploaded cloudron-testfile');
debug('uploadStream uploaded cloudron-testfile '+JSON.stringify(arguments));
testFile.delete(function(error){
if (error) return callback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message));
debug('deleted cloudron-testfile');
debug('testFileStream deleted cloudron-testfile');
callback();
});
})
.pipe(uploadStream);
.pipe(uploadStream);
}
function backupDone(backupId, appBackupIds, callback) {
function backupDone(apiConfig, backupId, appBackupIds, callback) {
assert.strictEqual(typeof apiConfig, 'object');
assert.strictEqual(typeof backupId, 'string');
assert(Array.isArray(appBackupIds));
assert.strictEqual(typeof callback, 'function');