diff --git a/CHANGES b/CHANGES
index a672a03fd..a0623166e 100644
--- a/CHANGES
+++ b/CHANGES
@@ -2085,4 +2085,5 @@
 * HTTP URLs now redirect directly to the HTTPS of the final domain
 * linode: Add singapore region
 * ovh: add sydney region
+* s3: make multi-part copies in parallel

diff --git a/src/storage/s3.js b/src/storage/s3.js
index 058020c5d..d80cf17a5 100644
--- a/src/storage/s3.js
+++ b/src/storage/s3.js
@@ -230,7 +230,7 @@ function copy(apiConfig, oldFilePath, newFilePath) {
     assert.strictEqual(typeof oldFilePath, 'string');
     assert.strictEqual(typeof newFilePath, 'string');

-    var events = new EventEmitter(), retryCount = 0;
+    var events = new EventEmitter();

     function copyFile(entry, iteratorCallback) {
         getS3Config(apiConfig, function (error, credentials) {
@@ -262,7 +262,6 @@ function copy(apiConfig, oldFilePath, newFilePath) {
                 copyParams.CopySource = encodeCopySource(apiConfig.bucket, entry.fullPath);

                 s3.copyObject(copyParams, done).on('retry', function (response) {
-                    ++retryCount;
                     events.emit('progress', `Retrying (${response.retryCount+1}) copy of ${relativePath || oldFilePath}. Error: ${response.error} ${response.httpResponse.statusCode}`);
                     // on DO, we get a random 408. these are not retried by the SDK
                     if (response.error) response.error.retryable = true; // https://github.com/aws/aws-sdk-js/issues/412
@@ -273,77 +272,80 @@ function copy(apiConfig, oldFilePath, newFilePath) {

             events.emit('progress', `Copying (multipart) ${relativePath || oldFilePath}`);

-            s3.createMultipartUpload(copyParams, function (error, result) {
+            s3.createMultipartUpload(copyParams, function (error, multipart) {
                 if (error) return done(error);

                 // Exoscale (96M) was suggested by exoscale. 1GB - rather random size for others
                 const chunkSize = apiConfig.provider === 'exoscale-sos' ? 96 * 1024 * 1024 : 1024 * 1024 * 1024;

-                var uploadId = result.UploadId;
-                var uploadedParts = [];
-                var partNumber = 1;
-                var startBytes = 0;
-                var endBytes = 0;
-                var size = entry.size-1;
+                const uploadId = multipart.UploadId;
+                let uploadedParts = [], ranges = [];

-                function copyNextChunk() {
-                    endBytes = startBytes + chunkSize;
-                    if (endBytes > size) endBytes = size;
+                let cur = 0;
+                while (cur + chunkSize < entry.size) {
+                    ranges.push({ startBytes: cur, endBytes: cur + chunkSize - 1 });
+                    cur += chunkSize;
+                }
+                ranges.push({ startBytes: cur, endBytes: entry.size-1 });

-                    var partCopyParams = {
+                async.eachOfLimit(ranges, 5, function copyChunk(range, index, iteratorDone) {
+                    const partCopyParams = {
                         Bucket: apiConfig.bucket,
                         Key: path.join(newFilePath, relativePath),
                         CopySource: encodeCopySource(apiConfig.bucket, entry.fullPath), // See aws-sdk-js/issues/1302
-                        CopySourceRange: 'bytes=' + startBytes + '-' + endBytes,
-                        PartNumber: partNumber,
+                        CopySourceRange: 'bytes=' + range.startBytes + '-' + range.endBytes,
+                        PartNumber: index+1,
                         UploadId: uploadId
                     };

                     events.emit('progress', `Copying part ${partCopyParams.PartNumber} - ${partCopyParams.CopySource} ${partCopyParams.CopySourceRange}`);

-                    s3.uploadPartCopy(partCopyParams, function (error, result) {
-                        if (error) return done(error);
+                    s3.uploadPartCopy(partCopyParams, function (error, part) {
+                        if (error) return iteratorDone(error);

-                        events.emit('progress', `Uploaded part ${partCopyParams.PartNumber} - Etag: ${result.CopyPartResult.ETag}`);
+                        events.emit('progress', `Uploaded part ${partCopyParams.PartNumber} - Etag: ${part.CopyPartResult.ETag}`);

-                        if (!result.CopyPartResult.ETag) return done(new Error('Multi-part copy is broken or not implemented by the S3 storage provider'));
+                        if (!part.CopyPartResult.ETag) return iteratorDone(new Error('Multi-part copy is broken or not implemented by the S3 storage provider'));

-                        uploadedParts.push({ ETag: result.CopyPartResult.ETag, PartNumber: partNumber });
+                        uploadedParts[index] = { ETag: part.CopyPartResult.ETag, PartNumber: partCopyParams.PartNumber };

-                        if (endBytes < size) {
-                            startBytes = endBytes + 1;
-                            partNumber++;
-                            return copyNextChunk();
-                        }
-
-                        var completeMultipartParams = {
-                            Bucket: apiConfig.bucket,
-                            Key: path.join(newFilePath, relativePath),
-                            MultipartUpload: { Parts: uploadedParts },
-                            UploadId: uploadId
-                        };
-
-                        events.emit('progress', `Finishing multipart copy - ${completeMultipartParams.Key}`);
-
-                        s3.completeMultipartUpload(completeMultipartParams, done);
+                        iteratorDone();
                     }).on('retry', function (response) {
-                        ++retryCount;
                         events.emit('progress', `Retrying (${response.retryCount+1}) multipart copy of ${relativePath || oldFilePath}. Error: ${response.error} ${response.httpResponse.statusCode}`);
                     });
-                }
+                }, function chunksCopied(error) {
+                    if (error) { // we must still recommend the user to set an AbortIncompleteMultipartUpload lifecycle rule
+                        const abortParams = {
+                            Bucket: apiConfig.bucket,
+                            Key: path.join(newFilePath, relativePath),
+                            UploadId: uploadId
+                        };
+                        events.emit('progress', `Aborting multipart copy of ${relativePath || oldFilePath}`);
+                        return s3.abortMultipartUpload(abortParams, () => done(error)); // ignore any abort errors
+                    }

-                copyNextChunk();
+                    const completeMultipartParams = {
+                        Bucket: apiConfig.bucket,
+                        Key: path.join(newFilePath, relativePath),
+                        MultipartUpload: { Parts: uploadedParts },
+                        UploadId: uploadId
+                    };
+
+                    events.emit('progress', `Finishing multipart copy - ${completeMultipartParams.Key}`);
+
+                    s3.completeMultipartUpload(completeMultipartParams, done);
+                });
             });
         });
     }

-    var total = 0;
+    let total = 0;
     const concurrency = apiConfig.copyConcurrency || (apiConfig.provider === 's3' ? 500 : 10);
+    events.emit('progress', `Copying with concurrency of ${concurrency}`);

     listDir(apiConfig, oldFilePath, 1000, function listDirIterator(entries, done) {
         total += entries.length;
-        events.emit('progress', `Copying ${total-entries.length}-${total}. ${retryCount} errors so far. concurrency set to ${concurrency}`);
-        retryCount = 0;
+        events.emit('progress', `Copying files from ${total-entries.length}-${total}`);

         async.eachLimit(entries, concurrency, copyFile, done);
     }, function (error) {
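
For reference, the core pattern this patch introduces (pre-computing the byte ranges of a large object, then copying the parts concurrently instead of sequentially) can be sketched in isolation. A minimal sketch follows, assuming the aws-sdk (v2) and async packages; the multipartCopy helper, the bucket and key names, the fixed concurrency of 5, and the fixed 1GB part size are illustrative stand-ins for the patch's configuration-driven values, and the patch's encodeCopySource helper is approximated with encodeURIComponent.

'use strict';

const AWS = require('aws-sdk');
const async = require('async');

const s3 = new AWS.S3();
const CHUNK_SIZE = 1024 * 1024 * 1024; // 1GB parts, as in the non-Exoscale case

// copies bucket/srcKey (srcSize bytes, srcSize > 0) to bucket/dstKey
// with up to 5 part copies in flight at once
function multipartCopy(bucket, srcKey, srcSize, dstKey, callback) {
    s3.createMultipartUpload({ Bucket: bucket, Key: dstKey }, function (error, multipart) {
        if (error) return callback(error);

        // pre-compute inclusive byte ranges; the last range takes the remainder
        let ranges = [], cur = 0;
        while (cur + CHUNK_SIZE < srcSize) {
            ranges.push({ start: cur, end: cur + CHUNK_SIZE - 1 });
            cur += CHUNK_SIZE;
        }
        ranges.push({ start: cur, end: srcSize - 1 });

        let parts = []; // indexed writes keep parts ordered despite out-of-order completion

        async.eachOfLimit(ranges, 5, function (range, index, iteratorDone) {
            const params = {
                Bucket: bucket,
                Key: dstKey,
                CopySource: `${bucket}/${encodeURIComponent(srcKey)}`, // stand-in for encodeCopySource()
                CopySourceRange: `bytes=${range.start}-${range.end}`,
                PartNumber: index + 1,
                UploadId: multipart.UploadId
            };

            s3.uploadPartCopy(params, function (error, part) {
                if (error) return iteratorDone(error);
                parts[index] = { ETag: part.CopyPartResult.ETag, PartNumber: index + 1 };
                iteratorDone();
            });
        }, function (error) {
            if (error) { // abort so the provider can garbage-collect the partial upload
                const abortParams = { Bucket: bucket, Key: dstKey, UploadId: multipart.UploadId };
                return s3.abortMultipartUpload(abortParams, () => callback(error)); // ignore abort errors
            }

            s3.completeMultipartUpload({
                Bucket: bucket,
                Key: dstKey,
                MultipartUpload: { Parts: parts },
                UploadId: multipart.UploadId
            }, callback);
        });
    });
}

// e.g. multipartCopy('my-bucket', 'backup/app.tar.gz', 7 * 1024 * 1024 * 1024, 'copy/app.tar.gz', console.error);

Writing each result to parts[index] instead of pushing is what makes the parallelism safe here: completeMultipartUpload requires Parts in ascending PartNumber order, and async.eachOfLimit finishes parts in arbitrary order. Aborting on failure matters because incomplete multipart uploads keep consuming (billed) storage until removed, which is also why the patch's comment recommends an AbortIncompleteMultipartUpload lifecycle rule as a backstop.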