s3: refactor out directory listing
This commit is contained in:
@@ -25,6 +25,7 @@ var assert = require('assert'),
|
||||
config = require('../config.js'),
|
||||
debug = require('debug')('box:storage/s3'),
|
||||
fs = require('fs'),
|
||||
chunk = require('lodash.chunk'),
|
||||
mkdirp = require('mkdirp'),
|
||||
PassThrough = require('stream').PassThrough,
|
||||
path = require('path'),
|
||||
@@ -151,12 +152,7 @@ function download(apiConfig, backupFilePath, callback) {
|
||||
});
|
||||
}
|
||||
|
||||
function downloadDir(apiConfig, backupFilePath, destDir, callback) {
|
||||
assert.strictEqual(typeof apiConfig, 'object');
|
||||
assert.strictEqual(typeof backupFilePath, 'string');
|
||||
assert.strictEqual(typeof destDir, 'string');
|
||||
assert.strictEqual(typeof callback, 'function');
|
||||
|
||||
function listDir(apiConfig, backupFilePath, options, iteratorCallback, callback) {
|
||||
getBackupCredentials(apiConfig, function (error, credentials) {
|
||||
if (error) return callback(error);
|
||||
|
||||
@@ -166,6 +162,8 @@ function downloadDir(apiConfig, backupFilePath, destDir, callback) {
|
||||
Prefix: backupFilePath
|
||||
};
|
||||
|
||||
var total = 0;
|
||||
|
||||
async.forever(function listAndDownload(foreverCallback) {
|
||||
s3.listObjectsV2(listParams, function (error, listData) {
|
||||
if (error) {
|
||||
@@ -173,20 +171,17 @@ function downloadDir(apiConfig, backupFilePath, destDir, callback) {
|
||||
return foreverCallback(error);
|
||||
}
|
||||
|
||||
async.eachLimit(listData.Contents, 10, function downloadFile(content, iteratorCallback) {
|
||||
var relativePath = path.relative(backupFilePath, content.Key);
|
||||
mkdirp(path.dirname(path.join(destDir, relativePath)), function (error) {
|
||||
if (error) return iteratorCallback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message));
|
||||
debug('listDir: processing %s files (processed %s so far)', listData.Contents.length, total);
|
||||
|
||||
var destStream = fs.createWriteStream(path.join(destDir, relativePath));
|
||||
destStream.on('error', function (error) {
|
||||
return iteratorCallback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message));
|
||||
});
|
||||
download(apiConfig, content.Key, destStream, iteratorCallback);
|
||||
});
|
||||
}, function doneCopying(error) {
|
||||
var arr = options.batchSize === 1 ? listData.Contents : chunk(listData.Contents, options.batchSize);
|
||||
|
||||
async.eachLimit(arr, options.concurrency, iteratorCallback.bind(null, s3), function iteratorDone(error) {
|
||||
if (error) return foreverCallback(error);
|
||||
|
||||
total += listData.Contents.length;
|
||||
|
||||
listParams.StartAfter = listData.Contents[listData.Contents.length - 1].Key; // NextMarker is returned only with delimiter
|
||||
|
||||
if (listData.IsTruncated) return foreverCallback();
|
||||
|
||||
foreverCallback(new Error('Done'));
|
||||
@@ -194,64 +189,57 @@ function downloadDir(apiConfig, backupFilePath, destDir, callback) {
|
||||
});
|
||||
}, function (error) {
|
||||
if (error.message === 'Done') return callback();
|
||||
|
||||
callback(error);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
function downloadDir(apiConfig, backupFilePath, destDir, callback) {
|
||||
assert.strictEqual(typeof apiConfig, 'object');
|
||||
assert.strictEqual(typeof backupFilePath, 'string');
|
||||
assert.strictEqual(typeof destDir, 'string');
|
||||
assert.strictEqual(typeof callback, 'function');
|
||||
|
||||
listDir(apiConfig, backupFilePath, { batchSize: 1, concurrency: 10 }, function downloadFile(s3, content, iteratorCallback) {
|
||||
var relativePath = path.relative(backupFilePath, content.Key);
|
||||
mkdirp(path.dirname(path.join(destDir, relativePath)), function (error) {
|
||||
if (error) return iteratorCallback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message));
|
||||
|
||||
var destStream = fs.createWriteStream(path.join(destDir, relativePath));
|
||||
destStream.on('error', function (error) {
|
||||
return iteratorCallback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message));
|
||||
});
|
||||
download(apiConfig, content.Key, destStream, iteratorCallback);
|
||||
});
|
||||
}, callback);
|
||||
}
|
||||
|
||||
function copy(apiConfig, oldFilePath, newFilePath, callback) {
|
||||
assert.strictEqual(typeof apiConfig, 'object');
|
||||
assert.strictEqual(typeof oldFilePath, 'string');
|
||||
assert.strictEqual(typeof newFilePath, 'string');
|
||||
assert.strictEqual(typeof callback, 'function');
|
||||
|
||||
getBackupCredentials(apiConfig, function (error, credentials) {
|
||||
if (error) return callback(error);
|
||||
listDir(apiConfig, oldFilePath, { batchSize: 1, concurrency: 10 }, function copyFile(s3, content, iteratorCallback) {
|
||||
var relativePath = path.relative(oldFilePath, content.Key);
|
||||
|
||||
var s3 = new AWS.S3(credentials);
|
||||
var listParams = {
|
||||
var copyParams = {
|
||||
Bucket: apiConfig.bucket,
|
||||
Prefix: oldFilePath
|
||||
Key: path.join(newFilePath, relativePath),
|
||||
CopySource: path.join(apiConfig.bucket, content.Key)
|
||||
};
|
||||
|
||||
async.forever(function listAndDelete(foreverCallback) {
|
||||
s3.listObjectsV2(listParams, function (error, listData) {
|
||||
if (error) {
|
||||
debug('remove: Failed to list %s. Not fatal.', error);
|
||||
return foreverCallback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message));
|
||||
}
|
||||
s3.copyObject(copyParams, function (error) {
|
||||
if (error && error.code === 'NoSuchKey') return iteratorCallback(new BackupsError(BackupsError.NOT_FOUND, 'Old backup not found'));
|
||||
if (error) {
|
||||
debug('copy: s3 copy error.', error);
|
||||
return iteratorCallback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message));
|
||||
}
|
||||
|
||||
async.eachLimit(listData.Contents, 10, function copyFile(content, iteratorCallback) {
|
||||
var relativePath = path.relative(oldFilePath, content.Key);
|
||||
|
||||
var copyParams = {
|
||||
Bucket: apiConfig.bucket,
|
||||
Key: path.join(newFilePath, relativePath),
|
||||
CopySource: path.join(apiConfig.bucket, content.Key)
|
||||
};
|
||||
|
||||
s3.copyObject(copyParams, function (error) {
|
||||
if (error && error.code === 'NoSuchKey') return iteratorCallback(new BackupsError(BackupsError.NOT_FOUND, 'Old backup not found'));
|
||||
if (error) {
|
||||
debug('copy: s3 copy error.', error);
|
||||
return iteratorCallback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message));
|
||||
}
|
||||
|
||||
iteratorCallback();
|
||||
});
|
||||
}, function doneCopying(error) {
|
||||
if (error) return foreverCallback(error);
|
||||
|
||||
if (listData.IsTruncated) return foreverCallback();
|
||||
|
||||
foreverCallback(new Error('Done'));
|
||||
});
|
||||
});
|
||||
}, function (error) {
|
||||
if (error.message === 'Done') return callback();
|
||||
callback(error);
|
||||
iteratorCallback();
|
||||
});
|
||||
});
|
||||
}, callback);
|
||||
}
|
||||
|
||||
function remove(apiConfig, filename, callback) {
|
||||
@@ -284,47 +272,24 @@ function removeDir(apiConfig, pathPrefix, callback) {
|
||||
assert.strictEqual(typeof pathPrefix, 'string');
|
||||
assert.strictEqual(typeof callback, 'function');
|
||||
|
||||
getBackupCredentials(apiConfig, function (error, credentials) {
|
||||
if (error) return callback(error);
|
||||
|
||||
var s3 = new AWS.S3(credentials);
|
||||
var listParams = {
|
||||
listDir(apiConfig, pathPrefix, { batchSize: 1000, concurrency: 10 }, function deleteFiles(s3, contents, iteratorCallback) {
|
||||
var deleteParams = {
|
||||
Bucket: apiConfig.bucket,
|
||||
Prefix: pathPrefix
|
||||
Delete: {
|
||||
Objects: contents.map(function (c) { return { Key: c.Key }; })
|
||||
}
|
||||
};
|
||||
|
||||
async.forever(function listAndDelete(iteratorCallback) {
|
||||
s3.listObjectsV2(listParams, function (error, listData) {
|
||||
if (error) {
|
||||
debug('removeDir: Failed to list %s. Not fatal.', error);
|
||||
return iteratorCallback(error);
|
||||
}
|
||||
s3.deleteObjects(deleteParams, function (error, deleteData) {
|
||||
if (error) {
|
||||
debug('removeDir: Unable to remove %s. Not fatal.', deleteParams.Key, error);
|
||||
return iteratorCallback(error);
|
||||
}
|
||||
debug('removeDir: Deleted: %j Errors: %j', deleteData.Deleted, deleteData.Errors);
|
||||
|
||||
var deleteParams = {
|
||||
Bucket: apiConfig.bucket,
|
||||
Delete: {
|
||||
Objects: listData.Contents.map(function (c) { return { Key: c.Key }; })
|
||||
}
|
||||
};
|
||||
|
||||
s3.deleteObjects(deleteParams, function (error, deleteData) {
|
||||
if (error) {
|
||||
debug('removeDir: Unable to remove %s. Not fatal.', deleteParams.Key, error);
|
||||
return iteratorCallback(error);
|
||||
}
|
||||
debug('removeDir: Deleted: %j Errors: %j', deleteData.Deleted, deleteData.Errors);
|
||||
|
||||
listParams.Marker = listData.Contents[listData.Contents.length - 1].Key; // NextMarker is returned only with delimiter
|
||||
|
||||
if (listData.IsTruncated) return iteratorCallback();
|
||||
|
||||
iteratorCallback(new Error('Done'));
|
||||
});
|
||||
});
|
||||
}, function (/*ignoredError*/) {
|
||||
callback(null);
|
||||
iteratorCallback();
|
||||
});
|
||||
});
|
||||
}, callback);
|
||||
}
|
||||
|
||||
function testConfig(apiConfig, callback) {
|
||||
|
||||
Reference in New Issue
Block a user