s3: copy parts in parallel

Girish Ramakrishnan
2020-09-02 22:32:42 -07:00
parent 639bddb4b7
commit fc08f9823e
2 changed files with 45 additions and 42 deletions
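
In short: instead of walking a file's chunks sequentially through a recursive copyNextChunk(), the multipart copy path now precomputes all byte ranges up front and copies up to 5 parts per file in parallel with async.eachOfLimit. On failure the multipart upload is now aborted explicitly, and the ad-hoc retryCount bookkeeping in the progress messages is dropped.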


@@ -230,7 +230,7 @@ function copy(apiConfig, oldFilePath, newFilePath) {
     assert.strictEqual(typeof oldFilePath, 'string');
     assert.strictEqual(typeof newFilePath, 'string');
 
-    var events = new EventEmitter(), retryCount = 0;
+    var events = new EventEmitter();
 
     function copyFile(entry, iteratorCallback) {
         getS3Config(apiConfig, function (error, credentials) {
@@ -262,7 +262,6 @@ function copy(apiConfig, oldFilePath, newFilePath) {
             copyParams.CopySource = encodeCopySource(apiConfig.bucket, entry.fullPath);
 
             s3.copyObject(copyParams, done).on('retry', function (response) {
-                ++retryCount;
                 events.emit('progress', `Retrying (${response.retryCount+1}) copy of ${relativePath || oldFilePath}. Error: ${response.error} ${response.httpResponse.statusCode}`);
                 // on DO, we get a random 408. these are not retried by the SDK
                 if (response.error) response.error.retryable = true; // https://github.com/aws/aws-sdk-js/issues/412
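
Aside: the .on('retry', ...) hook works because every aws-sdk v2 service call returns an AWS.Request, an EventEmitter that fires a 'retry' event before each retry decision. A minimal standalone sketch of the 408 workaround, assuming aws-sdk v2 with credentials from the environment (bucket and key names are illustrative):

const AWS = require('aws-sdk');

const s3 = new AWS.S3({ maxRetries: 5 });

// copyObject() returns an AWS.Request; 'retry' fires before each retry
// decision, and the handler may override the SDK's retryable verdict.
s3.copyObject({ Bucket: 'my-bucket', Key: 'dst', CopySource: 'my-bucket/src' }, function (error, data) {
    if (error) return console.error('copy failed after retries:', error);
    console.log('copied, ETag:', data.CopyObjectResult.ETag);
}).on('retry', function (response) {
    // some providers (DO, per the commit's comment) return sporadic 408s that
    // the SDK would not otherwise retry; force them to be retryable
    // https://github.com/aws/aws-sdk-js/issues/412
    if (response.error) response.error.retryable = true;
});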
@@ -273,77 +272,80 @@ function copy(apiConfig, oldFilePath, newFilePath) {
             events.emit('progress', `Copying (multipart) ${relativePath || oldFilePath}`);
-            s3.createMultipartUpload(copyParams, function (error, result) {
+            s3.createMultipartUpload(copyParams, function (error, multipart) {
                 if (error) return done(error);
 
                 // Exoscale (96M) was suggested by exoscale. 1GB - rather random size for others
                 const chunkSize = apiConfig.provider === 'exoscale-sos' ? 96 * 1024 * 1024 : 1024 * 1024 * 1024;
-                var uploadId = result.UploadId;
-                var uploadedParts = [];
-                var partNumber = 1;
-                var startBytes = 0;
-                var endBytes = 0;
-                var size = entry.size-1;
+                const uploadId = multipart.UploadId;
+                let uploadedParts = [], ranges = [];
 
-                function copyNextChunk() {
-                    endBytes = startBytes + chunkSize;
-                    if (endBytes > size) endBytes = size;
+                let cur = 0;
+                while (cur + chunkSize < entry.size) {
+                    ranges.push({ startBytes: cur, endBytes: cur + chunkSize - 1 });
+                    cur += chunkSize;
+                }
+                ranges.push({ startBytes: cur, endBytes: entry.size-1 });
 
-                    var partCopyParams = {
+                async.eachOfLimit(ranges, 5, function copyChunk(range, index, iteratorDone) {
+                    const partCopyParams = {
                         Bucket: apiConfig.bucket,
                         Key: path.join(newFilePath, relativePath),
                         CopySource: encodeCopySource(apiConfig.bucket, entry.fullPath), // See aws-sdk-js/issues/1302
-                        CopySourceRange: 'bytes=' + startBytes + '-' + endBytes,
-                        PartNumber: partNumber,
+                        CopySourceRange: 'bytes=' + range.startBytes + '-' + range.endBytes,
+                        PartNumber: index+1,
                         UploadId: uploadId
                     };
 
                     events.emit('progress', `Copying part ${partCopyParams.PartNumber} - ${partCopyParams.CopySource} ${partCopyParams.CopySourceRange}`);
-                    s3.uploadPartCopy(partCopyParams, function (error, result) {
-                        if (error) return done(error);
+                    s3.uploadPartCopy(partCopyParams, function (error, part) {
+                        if (error) return iteratorDone(error);
-                        events.emit('progress', `Uploaded part ${partCopyParams.PartNumber} - Etag: ${result.CopyPartResult.ETag}`);
+                        events.emit('progress', `Uploaded part ${partCopyParams.PartNumber} - Etag: ${part.CopyPartResult.ETag}`);
-                        if (!result.CopyPartResult.ETag) return done(new Error('Multi-part copy is broken or not implemented by the S3 storage provider'));
+                        if (!part.CopyPartResult.ETag) return iteratorDone(new Error('Multi-part copy is broken or not implemented by the S3 storage provider'));
-                        uploadedParts.push({ ETag: result.CopyPartResult.ETag, PartNumber: partNumber });
+                        uploadedParts[index] = { ETag: part.CopyPartResult.ETag, PartNumber: partCopyParams.PartNumber };
-                        if (endBytes < size) {
-                            startBytes = endBytes + 1;
-                            partNumber++;
-                            return copyNextChunk();
-                        }
-                        var completeMultipartParams = {
-                            Bucket: apiConfig.bucket,
-                            Key: path.join(newFilePath, relativePath),
-                            MultipartUpload: { Parts: uploadedParts },
-                            UploadId: uploadId
-                        };
-                        events.emit('progress', `Finishing multipart copy - ${completeMultipartParams.Key}`);
-                        s3.completeMultipartUpload(completeMultipartParams, done);
+                        iteratorDone();
                     }).on('retry', function (response) {
-                        ++retryCount;
                         events.emit('progress', `Retrying (${response.retryCount+1}) multipart copy of ${relativePath || oldFilePath}. Error: ${response.error} ${response.httpResponse.statusCode}`);
                     });
-                }
-                copyNextChunk();
+                }, function chunksCopied(error) {
+                    if (error) { // we must still recommend the user to set a AbortIncompleteMultipartUpload lifecycle rule
+                        const abortParams = {
+                            Bucket: apiConfig.bucket,
+                            Key: path.join(newFilePath, relativePath),
+                            UploadId: uploadId
+                        };
+                        events.emit('progress', `Aborting multipart copy of ${relativePath || oldFilePath}`);
+                        return s3.abortMultipartUpload(abortParams, () => done(error)); // ignore any abort errors
+                    }
+                    const completeMultipartParams = {
+                        Bucket: apiConfig.bucket,
+                        Key: path.join(newFilePath, relativePath),
+                        MultipartUpload: { Parts: uploadedParts },
+                        UploadId: uploadId
+                    };
+                    events.emit('progress', `Finishing multipart copy - ${completeMultipartParams.Key}`);
+                    s3.completeMultipartUpload(completeMultipartParams, done);
+                });
             });
         });
     }
 
-    var total = 0;
+    let total = 0;
     const concurrency = apiConfig.copyConcurrency || (apiConfig.provider === 's3' ? 500 : 10);
+    events.emit('progress', `Copying with concurrency of ${concurrency}`);
 
     listDir(apiConfig, oldFilePath, 1000, function listDirIterator(entries, done) {
         total += entries.length;
-        events.emit('progress', `Copying ${total-entries.length}-${total}. ${retryCount} errors so far. concurrency set to ${concurrency}`);
-        retryCount = 0;
+        events.emit('progress', `Copying files from ${total-entries.length}-${total}`);
 
         async.eachLimit(entries, concurrency, copyFile, done);
     }, function (error) {
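
Taken together, the new flow per large file is: precompute inclusive byte ranges, copy up to five parts in parallel, then complete the multipart upload (or abort it on error). A condensed, self-contained sketch of that flow, assuming aws-sdk v2 and the async library; copyObjectMultipart, PART_SIZE and PART_CONCURRENCY are illustrative names, not identifiers from this commit:

const AWS = require('aws-sdk');
const async = require('async');

const PART_SIZE = 1024 * 1024 * 1024; // 1GB, as in the commit (96M for exoscale-sos)
const PART_CONCURRENCY = 5;           // parts of one file copied in parallel

// assumes size > 0; the commit only takes the multipart path for large objects
function copyObjectMultipart(s3, bucket, srcKey, dstKey, size, callback) {
    s3.createMultipartUpload({ Bucket: bucket, Key: dstKey }, function (error, multipart) {
        if (error) return callback(error);

        // precompute all byte ranges up front; CopySourceRange bounds are inclusive
        let ranges = [], cur = 0;
        while (cur + PART_SIZE < size) {
            ranges.push({ startBytes: cur, endBytes: cur + PART_SIZE - 1 });
            cur += PART_SIZE;
        }
        ranges.push({ startBytes: cur, endBytes: size - 1 });

        let uploadedParts = [];
        async.eachOfLimit(ranges, PART_CONCURRENCY, function (range, index, iteratorDone) {
            const params = {
                Bucket: bucket,
                Key: dstKey,
                CopySource: encodeURIComponent(bucket + '/' + srcKey), // see aws-sdk-js/issues/1302
                CopySourceRange: 'bytes=' + range.startBytes + '-' + range.endBytes,
                PartNumber: index + 1,
                UploadId: multipart.UploadId
            };
            s3.uploadPartCopy(params, function (error, part) {
                if (error) return iteratorDone(error);
                // assign by index (not push) so part order survives out-of-order completion
                uploadedParts[index] = { ETag: part.CopyPartResult.ETag, PartNumber: index + 1 };
                iteratorDone();
            });
        }, function (error) {
            if (error) { // abort so the incomplete upload does not linger (and bill) forever
                const abortParams = { Bucket: bucket, Key: dstKey, UploadId: multipart.UploadId };
                return s3.abortMultipartUpload(abortParams, () => callback(error)); // ignore abort errors
            }
            s3.completeMultipartUpload({
                Bucket: bucket,
                Key: dstKey,
                MultipartUpload: { Parts: uploadedParts },
                UploadId: multipart.UploadId
            }, callback);
        });
    });
}

Note that the per-file part concurrency of 5 multiplies with the file-level concurrency of async.eachLimit above, so the default copyConcurrency of 500 on s3 can put up to 2500 part copies in flight at once.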