@@ -3,9 +3,10 @@
|
|||||||
exports = module.exports = {
|
exports = module.exports = {
|
||||||
upload: upload,
|
upload: upload,
|
||||||
download: download,
|
download: download,
|
||||||
downloadDir: downloadDir,
|
|
||||||
copy: copy,
|
copy: copy,
|
||||||
|
|
||||||
|
listDir: listDir,
|
||||||
|
|
||||||
remove: remove,
|
remove: remove,
|
||||||
removeDir: removeDir,
|
removeDir: removeDir,
|
||||||
|
|
||||||
@@ -24,9 +25,7 @@ var assert = require('assert'),
|
|||||||
config = require('../config.js'),
|
config = require('../config.js'),
|
||||||
debug = require('debug')('box:storage/s3'),
|
debug = require('debug')('box:storage/s3'),
|
||||||
EventEmitter = require('events'),
|
EventEmitter = require('events'),
|
||||||
fs = require('fs'),
|
|
||||||
https = require('https'),
|
https = require('https'),
|
||||||
mkdirp = require('mkdirp'),
|
|
||||||
PassThrough = require('stream').PassThrough,
|
PassThrough = require('stream').PassThrough,
|
||||||
path = require('path'),
|
path = require('path'),
|
||||||
S3BlockReadStream = require('s3-block-read-stream'),
|
S3BlockReadStream = require('s3-block-read-stream'),
|
||||||
@@ -185,14 +184,21 @@ function download(apiConfig, backupFilePath, callback) {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
function listDir(apiConfig, dir, iteratorCallback, callback) {
|
function listDir(apiConfig, dir, batchSize, iteratorCallback, callback) {
|
||||||
|
assert.strictEqual(typeof apiConfig, 'object');
|
||||||
|
assert.strictEqual(typeof dir, 'string');
|
||||||
|
assert.strictEqual(typeof batchSize, 'number');
|
||||||
|
assert.strictEqual(typeof iteratorCallback, 'function');
|
||||||
|
assert.strictEqual(typeof callback, 'function');
|
||||||
|
|
||||||
getS3Config(apiConfig, function (error, credentials) {
|
getS3Config(apiConfig, function (error, credentials) {
|
||||||
if (error) return callback(error);
|
if (error) return callback(error);
|
||||||
|
|
||||||
var s3 = new AWS.S3(credentials);
|
var s3 = new AWS.S3(credentials);
|
||||||
var listParams = {
|
var listParams = {
|
||||||
Bucket: apiConfig.bucket,
|
Bucket: apiConfig.bucket,
|
||||||
Prefix: dir
|
Prefix: dir,
|
||||||
|
MaxKeys: batchSize
|
||||||
};
|
};
|
||||||
|
|
||||||
async.forever(function listAndDownload(foreverCallback) {
|
async.forever(function listAndDownload(foreverCallback) {
|
||||||
@@ -201,7 +207,10 @@ function listDir(apiConfig, dir, iteratorCallback, callback) {
|
|||||||
|
|
||||||
if (listData.Contents.length === 0) return foreverCallback(new Error('Done'));
|
if (listData.Contents.length === 0) return foreverCallback(new Error('Done'));
|
||||||
|
|
||||||
iteratorCallback(s3, listData.Contents, function (error) {
|
// TODO: remove s3
|
||||||
|
const entries = listData.Contents.map(function (c) { return { fullPath: c.Key, size: c.Size, s3: s3 }; });
|
||||||
|
|
||||||
|
iteratorCallback(entries, function (error) {
|
||||||
if (error) return foreverCallback(error);
|
if (error) return foreverCallback(error);
|
||||||
|
|
||||||
if (!listData.IsTruncated) return foreverCallback(new Error('Done'));
|
if (!listData.IsTruncated) return foreverCallback(new Error('Done'));
|
||||||
@@ -219,54 +228,6 @@ function listDir(apiConfig, dir, iteratorCallback, callback) {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
function downloadDir(apiConfig, backupFilePath, destDir) {
|
|
||||||
assert.strictEqual(typeof apiConfig, 'object');
|
|
||||||
assert.strictEqual(typeof backupFilePath, 'string');
|
|
||||||
assert.strictEqual(typeof destDir, 'string');
|
|
||||||
|
|
||||||
var events = new EventEmitter();
|
|
||||||
var total = 0;
|
|
||||||
|
|
||||||
function downloadFile(s3, content, iteratorCallback) {
|
|
||||||
var relativePath = path.relative(backupFilePath, content.Key);
|
|
||||||
|
|
||||||
events.emit('progress', `Downloading ${relativePath}`);
|
|
||||||
|
|
||||||
mkdirp(path.dirname(path.join(destDir, relativePath)), function (error) {
|
|
||||||
if (error) return iteratorCallback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message));
|
|
||||||
|
|
||||||
download(apiConfig, content.Key, function (error, sourceStream) {
|
|
||||||
if (error) return iteratorCallback(error);
|
|
||||||
|
|
||||||
var destStream = fs.createWriteStream(path.join(destDir, relativePath));
|
|
||||||
|
|
||||||
destStream.on('open', function () {
|
|
||||||
sourceStream.pipe(destStream);
|
|
||||||
});
|
|
||||||
|
|
||||||
destStream.on('error', function (error) {
|
|
||||||
return iteratorCallback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message));
|
|
||||||
});
|
|
||||||
|
|
||||||
destStream.on('finish', iteratorCallback);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
const concurrency = 10;
|
|
||||||
|
|
||||||
listDir(apiConfig, backupFilePath, function (s3, objects, done) {
|
|
||||||
total += objects.length;
|
|
||||||
async.eachLimit(objects, concurrency, downloadFile.bind(null, s3), done);
|
|
||||||
}, function (error) {
|
|
||||||
events.emit('progress', `Downloaded ${total} files`);
|
|
||||||
|
|
||||||
events.emit('done', error);
|
|
||||||
});
|
|
||||||
|
|
||||||
return events;
|
|
||||||
}
|
|
||||||
|
|
||||||
// https://github.com/aws/aws-sdk-js/blob/2b6bcbdec1f274fe931640c1b61ece999aae7a19/lib/util.js#L41
|
// https://github.com/aws/aws-sdk-js/blob/2b6bcbdec1f274fe931640c1b61ece999aae7a19/lib/util.js#L41
|
||||||
// https://github.com/GeorgePhillips/node-s3-url-encode/blob/master/index.js
|
// https://github.com/GeorgePhillips/node-s3-url-encode/blob/master/index.js
|
||||||
// See aws-sdk-js/issues/1302
|
// See aws-sdk-js/issues/1302
|
||||||
@@ -289,14 +250,15 @@ function copy(apiConfig, oldFilePath, newFilePath) {
|
|||||||
|
|
||||||
var events = new EventEmitter(), retryCount = 0;
|
var events = new EventEmitter(), retryCount = 0;
|
||||||
|
|
||||||
function copyFile(s3, content, iteratorCallback) {
|
function copyFile(entry, iteratorCallback) {
|
||||||
var relativePath = path.relative(oldFilePath, content.Key);
|
const s3 = entry.s3;
|
||||||
|
var relativePath = path.relative(oldFilePath, entry.fullPath);
|
||||||
|
|
||||||
function done(error) {
|
function done(error) {
|
||||||
if (error) debug(`copy: s3 copy error when copying ${content.Key}: ${error}`);
|
if (error) debug(`copy: s3 copy error when copying ${entry.fullPath}: ${error}`);
|
||||||
|
|
||||||
if (error && error.code === 'NoSuchKey') return iteratorCallback(new BackupsError(BackupsError.NOT_FOUND, `Old backup not found: ${content.Key}`));
|
if (error && error.code === 'NoSuchKey') return iteratorCallback(new BackupsError(BackupsError.NOT_FOUND, `Old backup not found: ${entry.fullPath}`));
|
||||||
if (error) return iteratorCallback(new BackupsError(BackupsError.EXTERNAL_ERROR, `Error copying ${content.Key} : ${error.code} ${error}`));
|
if (error) return iteratorCallback(new BackupsError(BackupsError.EXTERNAL_ERROR, `Error copying ${entry.fullPath} : ${error.code} ${error}`));
|
||||||
|
|
||||||
iteratorCallback(null);
|
iteratorCallback(null);
|
||||||
}
|
}
|
||||||
@@ -310,10 +272,10 @@ function copy(apiConfig, oldFilePath, newFilePath) {
|
|||||||
// Exoscale takes too long to copy 5GB
|
// Exoscale takes too long to copy 5GB
|
||||||
const largeFileLimit = apiConfig.provider === 'exoscale-sos' ? 1024 * 1024 * 1024 : 5 * 1024 * 1024 * 1024;
|
const largeFileLimit = apiConfig.provider === 'exoscale-sos' ? 1024 * 1024 * 1024 : 5 * 1024 * 1024 * 1024;
|
||||||
|
|
||||||
if (content.Size < largeFileLimit) {
|
if (entry.size < largeFileLimit) {
|
||||||
events.emit('progress', `Copying ${relativePath || oldFilePath}`);
|
events.emit('progress', `Copying ${relativePath || oldFilePath}`);
|
||||||
|
|
||||||
copyParams.CopySource = encodeCopySource(apiConfig.bucket, content.Key);
|
copyParams.CopySource = encodeCopySource(apiConfig.bucket, entry.fullPath);
|
||||||
s3.copyObject(copyParams, done).on('retry', function (response) {
|
s3.copyObject(copyParams, done).on('retry', function (response) {
|
||||||
++retryCount;
|
++retryCount;
|
||||||
events.emit('progress', `Retrying (${response.retryCount+1}) copy of ${relativePath || oldFilePath}. Error: ${response.error} ${response.httpResponse.statusCode}`);
|
events.emit('progress', `Retrying (${response.retryCount+1}) copy of ${relativePath || oldFilePath}. Error: ${response.error} ${response.httpResponse.statusCode}`);
|
||||||
@@ -336,7 +298,7 @@ function copy(apiConfig, oldFilePath, newFilePath) {
|
|||||||
var partNumber = 1;
|
var partNumber = 1;
|
||||||
var startBytes = 0;
|
var startBytes = 0;
|
||||||
var endBytes = 0;
|
var endBytes = 0;
|
||||||
var size = content.Size-1;
|
var size = entry.size-1;
|
||||||
|
|
||||||
function copyNextChunk() {
|
function copyNextChunk() {
|
||||||
endBytes = startBytes + chunkSize;
|
endBytes = startBytes + chunkSize;
|
||||||
@@ -345,7 +307,7 @@ function copy(apiConfig, oldFilePath, newFilePath) {
|
|||||||
var params = {
|
var params = {
|
||||||
Bucket: apiConfig.bucket,
|
Bucket: apiConfig.bucket,
|
||||||
Key: path.join(newFilePath, relativePath),
|
Key: path.join(newFilePath, relativePath),
|
||||||
CopySource: encodeCopySource(apiConfig.bucket, content.Key), // See aws-sdk-js/issues/1302
|
CopySource: encodeCopySource(apiConfig.bucket, entry.fullPath), // See aws-sdk-js/issues/1302
|
||||||
CopySourceRange: 'bytes=' + startBytes + '-' + endBytes,
|
CopySourceRange: 'bytes=' + startBytes + '-' + endBytes,
|
||||||
PartNumber: partNumber,
|
PartNumber: partNumber,
|
||||||
UploadId: uploadId
|
UploadId: uploadId
|
||||||
@@ -382,14 +344,14 @@ function copy(apiConfig, oldFilePath, newFilePath) {
|
|||||||
|
|
||||||
var total = 0, concurrency = 4;
|
var total = 0, concurrency = 4;
|
||||||
|
|
||||||
listDir(apiConfig, oldFilePath, function listDirIterator(s3, objects, done) {
|
listDir(apiConfig, oldFilePath, 1000, function listDirIterator(entries, done) {
|
||||||
total += objects.length;
|
total += entries.length;
|
||||||
|
|
||||||
if (retryCount === 0) concurrency = Math.min(concurrency + 1, 10); else concurrency = Math.max(concurrency - 1, 5);
|
if (retryCount === 0) concurrency = Math.min(concurrency + 1, 10); else concurrency = Math.max(concurrency - 1, 5);
|
||||||
events.emit('progress', `Copying ${total-objects.length}-${total}. ${retryCount} errors so far. concurrency set to ${concurrency}`);
|
events.emit('progress', `Copying ${total-entries.length}-${total}. ${retryCount} errors so far. concurrency set to ${concurrency}`);
|
||||||
retryCount = 0;
|
retryCount = 0;
|
||||||
|
|
||||||
async.eachLimit(objects, concurrency, copyFile.bind(null, s3), done);
|
async.eachLimit(entries, concurrency, copyFile, done);
|
||||||
}, function (error) {
|
}, function (error) {
|
||||||
events.emit('progress', `Copied ${total} files with error: ${error}`);
|
events.emit('progress', `Copied ${total} files with error: ${error}`);
|
||||||
|
|
||||||
@@ -432,21 +394,22 @@ function removeDir(apiConfig, pathPrefix) {
|
|||||||
var events = new EventEmitter();
|
var events = new EventEmitter();
|
||||||
var total = 0;
|
var total = 0;
|
||||||
|
|
||||||
listDir(apiConfig, pathPrefix, function listDirIterator(s3, objects, done) {
|
listDir(apiConfig, pathPrefix, 1000, function listDirIterator(entries, done) {
|
||||||
total += objects.length;
|
total += entries.length;
|
||||||
|
|
||||||
const chunkSize = apiConfig.provider !== 'digitalocean-spaces' ? 1000 : 100; // throttle objects in each request
|
const chunkSize = apiConfig.provider !== 'digitalocean-spaces' ? 1000 : 100; // throttle objects in each request
|
||||||
var chunks = chunk(objects, chunkSize);
|
var chunks = chunk(entries, chunkSize);
|
||||||
|
|
||||||
async.eachSeries(chunks, function deleteFiles(contents, iteratorCallback) {
|
async.eachSeries(chunks, function deleteFiles(objects, iteratorCallback) {
|
||||||
|
const s3 = objects[0].s3;
|
||||||
var deleteParams = {
|
var deleteParams = {
|
||||||
Bucket: apiConfig.bucket,
|
Bucket: apiConfig.bucket,
|
||||||
Delete: {
|
Delete: {
|
||||||
Objects: contents.map(function (c) { return { Key: c.Key }; })
|
Objects: objects.map(function (o) { return { Key: o.fullPath }; })
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
events.emit('progress', `Removing ${contents.length} files from ${contents[0].Key} to ${contents[contents.length-1].Key}`);
|
events.emit('progress', `Removing ${objects.length} files from ${objects[0].fullPath} to ${objects[objects.length-1].fullPath}`);
|
||||||
|
|
||||||
// deleteObjects does not return error if key is not found
|
// deleteObjects does not return error if key is not found
|
||||||
s3.deleteObjects(deleteParams, function (error /*, deleteData */) {
|
s3.deleteObjects(deleteParams, function (error /*, deleteData */) {
|
||||||
|
|||||||
Reference in New Issue
Block a user