storage: make copy async

This commit is contained in:
Girish Ramakrishnan
2022-04-30 16:01:42 -07:00
parent 8ceb80dc44
commit ea01586b52
7 changed files with 54 additions and 95 deletions

View File

@@ -33,7 +33,6 @@ const assert = require('assert'),
constants = require('../constants.js'),
DataLayout = require('../datalayout.js'),
debug = require('debug')('box:storage/s3'),
EventEmitter = require('events'),
https = require('https'),
path = require('path'),
Readable = require('stream').Readable,
@@ -323,12 +322,11 @@ function encodeCopySource(bucket, path) {
return `/${bucket}/${output}`;
}
function copy(apiConfig, oldFilePath, newFilePath) {
async function copy(apiConfig, oldFilePath, newFilePath, progressCallback) {
assert.strictEqual(typeof apiConfig, 'object');
assert.strictEqual(typeof oldFilePath, 'string');
assert.strictEqual(typeof newFilePath, 'string');
const events = new EventEmitter();
assert.strictEqual(typeof progressCallback, 'function');
function copyFile(entry, iteratorCallback) {
const credentials = getS3Config(apiConfig);
@@ -355,11 +353,11 @@ function copy(apiConfig, oldFilePath, newFilePath) {
const largeFileLimit = (apiConfig.provider === 'exoscale-sos' || apiConfig.provider === 'backblaze-b2' || apiConfig.provider === 'digitalocean-spaces') ? 1024 * 1024 * 1024 : 5 * 1024 * 1024 * 1024;
if (entry.size < largeFileLimit) {
events.emit('progress', `Copying ${relativePath || oldFilePath}`);
progressCallback({ message: `Copying ${relativePath || oldFilePath}` });
copyParams.CopySource = encodeCopySource(apiConfig.bucket, entry.fullPath);
s3.copyObject(copyParams, done).on('retry', function (response) {
events.emit('progress', `Retrying (${response.retryCount+1}) copy of ${relativePath || oldFilePath}. Error: ${response.error} ${response.httpResponse.statusCode}`);
progressCallback({ message: `Retrying (${response.retryCount+1}) copy of ${relativePath || oldFilePath}. Error: ${response.error} ${response.httpResponse.statusCode}` });
// On DO (DigitalOcean), we get random 408 responses. These are not retried by the SDK
if (response.error) response.error.retryable = true; // https://github.com/aws/aws-sdk-js/issues/412
});
@@ -367,7 +365,7 @@ function copy(apiConfig, oldFilePath, newFilePath) {
return;
}
events.emit('progress', `Copying (multipart) ${relativePath || oldFilePath}`);
progressCallback({ message: `Copying (multipart) ${relativePath || oldFilePath}` });
s3.createMultipartUpload(copyParams, function (error, multipart) {
if (error) return done(error);
@@ -394,12 +392,12 @@ function copy(apiConfig, oldFilePath, newFilePath) {
UploadId: uploadId
};
events.emit('progress', `Copying part ${partCopyParams.PartNumber} - ${partCopyParams.CopySource} ${partCopyParams.CopySourceRange}`);
progressCallback({ message: `Copying part ${partCopyParams.PartNumber} - ${partCopyParams.CopySource} ${partCopyParams.CopySourceRange}` });
s3.uploadPartCopy(partCopyParams, function (error, part) {
if (error) return iteratorDone(error);
events.emit('progress', `Copying part ${partCopyParams.PartNumber} - Etag: ${part.CopyPartResult.ETag}`);
progressCallback({ message: `Copying part ${partCopyParams.PartNumber} - Etag: ${part.CopyPartResult.ETag}` });
if (!part.CopyPartResult.ETag) return iteratorDone(new Error('Multi-part copy is broken or not implemented by the S3 storage provider'));
@@ -407,7 +405,7 @@ function copy(apiConfig, oldFilePath, newFilePath) {
iteratorDone();
}).on('retry', function (response) {
events.emit('progress', `Retrying (${response.retryCount+1}) multipart copy of ${relativePath || oldFilePath}. Error: ${response.error} ${response.httpResponse.statusCode}`);
progressCallback({ message: `Retrying (${response.retryCount+1}) multipart copy of ${relativePath || oldFilePath}. Error: ${response.error} ${response.httpResponse.statusCode}` });
});
}, function chunksCopied(error) {
if (error) { // we must still recommend the user to set a AbortIncompleteMultipartUpload lifecycle rule
@@ -416,7 +414,7 @@ function copy(apiConfig, oldFilePath, newFilePath) {
Key: path.join(newFilePath, relativePath),
UploadId: uploadId
};
events.emit('progress', `Aborting multipart copy of ${relativePath || oldFilePath}`);
progressCallback({ message: `Aborting multipart copy of ${relativePath || oldFilePath}` });
return s3.abortMultipartUpload(abortParams, () => done(error)); // ignore any abort errors
}
@@ -427,7 +425,7 @@ function copy(apiConfig, oldFilePath, newFilePath) {
UploadId: uploadId
};
events.emit('progress', `Finishing multipart copy - ${completeMultipartParams.Key}`);
progressCallback({ message: `Finishing multipart copy - ${completeMultipartParams.Key}` });
s3.completeMultipartUpload(completeMultipartParams, done);
});
@@ -436,21 +434,19 @@ function copy(apiConfig, oldFilePath, newFilePath) {
let total = 0;
const concurrency = apiConfig.copyConcurrency || (apiConfig.provider === 's3' ? 500 : 10);
events.emit('progress', `Copying with concurrency of ${concurrency}`);
progressCallback({ message: `Copying with concurrency of ${concurrency}` });
listDir(apiConfig, oldFilePath, 1000, function listDirIterator(entries, done) {
const listDirAsync = util.promisify(listDir);
const [copyError] = await safe(listDirAsync(apiConfig, oldFilePath, 1000, function listDirIterator(entries, done) {
total += entries.length;
events.emit('progress', `Copying files from ${total-entries.length}-${total}`);
progressCallback({ message: `Copying files from ${total-entries.length}-${total}` });
async.eachLimit(entries, concurrency, copyFile, done);
}, function (error) {
events.emit('progress', `Copied ${total} files with error: ${error}`);
}));
process.nextTick(() => events.emit('done', error));
});
return events;
progressCallback({ message: `Copied ${total} files with error: ${copyError}` });
}
async function remove(apiConfig, filename) {