storage: start migration of s3 api
This commit is contained in:
@@ -18,76 +18,67 @@ exports = module.exports = {
|
||||
injectPrivateFields,
|
||||
|
||||
// Used to mock AWS
|
||||
_mockInject: mockInject,
|
||||
_mockRestore: mockRestore,
|
||||
_chunk: chunk
|
||||
};
|
||||
|
||||
// https://github.com/aws/aws-sdk-js/issues/4354
|
||||
require('aws-sdk/lib/maintenance_mode_message').suppress = true;
|
||||
|
||||
const assert = require('assert'),
|
||||
async = require('async'),
|
||||
AwsSdk = require('aws-sdk'),
|
||||
BoxError = require('../boxerror.js'),
|
||||
{ ConfiguredRetryStrategy } = require('@smithy/util-retry'),
|
||||
constants = require('../constants.js'),
|
||||
debug = require('debug')('box:storage/s3'),
|
||||
http = require('http'),
|
||||
https = require('https'),
|
||||
{ NodeHttpHandler } = require('@smithy/node-http-handler'),
|
||||
{ PassThrough } = require('node:stream'),
|
||||
path = require('path'),
|
||||
Readable = require('stream').Readable,
|
||||
{ S3 } = require('@aws-sdk/client-s3'),
|
||||
safe = require('safetydance'),
|
||||
_ = require('underscore');
|
||||
|
||||
let aws = AwsSdk;
|
||||
|
||||
// test only
|
||||
let originalAWS;
|
||||
function mockInject(mock) {
|
||||
originalAWS = aws;
|
||||
aws = mock;
|
||||
}
|
||||
|
||||
function mockRestore() {
|
||||
aws = originalAWS;
|
||||
}
|
||||
{ Upload } = require('@aws-sdk/lib-storage');
|
||||
|
||||
function S3_NOT_FOUND(error) {
|
||||
return error.code === 'NoSuchKey' || error.code === 'NotFound' || error.code === 'ENOENT';
|
||||
}
|
||||
|
||||
function getS3Config(apiConfig) {
|
||||
const RETRY_STRATEGY = new ConfiguredRetryStrategy(10 /* max attempts */, (/* attempt */) => 20000 /* constant backoff */);
|
||||
|
||||
function createS3Client(apiConfig, options) {
|
||||
assert.strictEqual(typeof apiConfig, 'object');
|
||||
assert.strictEqual(typeof options, 'object');
|
||||
|
||||
const credentials = {
|
||||
signatureVersion: apiConfig.signatureVersion || 'v4',
|
||||
s3ForcePathStyle: false, // Use vhost style instead of path style - https://forums.aws.amazon.com/ann.jspa?annID=6776
|
||||
accessKeyId: apiConfig.accessKeyId,
|
||||
secretAccessKey: apiConfig.secretAccessKey,
|
||||
region: apiConfig.region || 'us-east-1',
|
||||
maxRetries: 10,
|
||||
retryDelayOptions: {
|
||||
customBackoff: (/* retryCount, error */) => 20000 // constant backoff - https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Config.html#retryDelayOptions-property
|
||||
},
|
||||
httpOptions: {
|
||||
connectTimeout: 60000, // https://github.com/aws/aws-sdk-js/pull/1446
|
||||
timeout: 20 * 60 * 1000 // https://github.com/aws/aws-sdk-js/issues/1704
|
||||
}
|
||||
secretAccessKey: apiConfig.secretAccessKey
|
||||
};
|
||||
|
||||
if (apiConfig.endpoint) credentials.endpoint = apiConfig.endpoint;
|
||||
const requestHandler = new NodeHttpHandler({
|
||||
connectionTimeout: 60000,
|
||||
socketTimeout: 20 * 60 * 1000
|
||||
});
|
||||
|
||||
if (apiConfig.s3ForcePathStyle === true) credentials.s3ForcePathStyle = true;
|
||||
// sdk v3 only has signature support v4
|
||||
const clientConfig = {
|
||||
forcePathStyle: apiConfig.s3ForcePathStyle === true ? true : false, // Use vhost style instead of path style - https://forums.aws.amazon.com/ann.jspa?annID=6776
|
||||
region: apiConfig.region || 'us-east-1',
|
||||
credentials,
|
||||
requestHandler: requestHandler
|
||||
};
|
||||
|
||||
if (options.retryStrategy) clientConfig.retryStrategy = options.retryStrategy;
|
||||
if (apiConfig.endpoint) clientConfig.endpoint = apiConfig.endpoint;
|
||||
|
||||
// s3 endpoint names come from the SDK
|
||||
const isHttps = (credentials.endpoint && credentials.endpoint.startsWith('https://')) || apiConfig.provider === 's3';
|
||||
if (isHttps) { // only set agent for https calls. otherwise, it crashes
|
||||
const isHttps = clientConfig.endpoint?.startsWith('https://') || apiConfig.provider === 's3';
|
||||
if (isHttps) {
|
||||
if (apiConfig.acceptSelfSignedCerts || apiConfig.bucket.includes('.')) {
|
||||
credentials.httpOptions.agent = new https.Agent({ rejectUnauthorized: false });
|
||||
requestHandler.agent = new https.Agent({ rejectUnauthorized: false });
|
||||
}
|
||||
} else { // http agent is required for http endpoints
|
||||
requestHandler.agent = new http.Agent({});
|
||||
}
|
||||
|
||||
return credentials;
|
||||
return constants.TEST ? new globalThis.S3Mock(clientConfig) : new S3(clientConfig);
|
||||
}
|
||||
|
||||
async function getAvailableSize(apiConfig) {
|
||||
@@ -100,8 +91,7 @@ async function upload(apiConfig, backupFilePath) {
|
||||
assert.strictEqual(typeof apiConfig, 'object');
|
||||
assert.strictEqual(typeof backupFilePath, 'string');
|
||||
|
||||
const credentials = getS3Config(apiConfig);
|
||||
const s3 = new aws.S3(credentials);
|
||||
const s3 = createS3Client(apiConfig, { retryStrategy: RETRY_STRATEGY });
|
||||
|
||||
// s3.upload automatically does a multi-part upload. we set queueSize to 3 to reduce memory usage
|
||||
// uploader will buffer at most queueSize * partSize bytes into memory at any given time.
|
||||
@@ -111,15 +101,21 @@ async function upload(apiConfig, backupFilePath) {
|
||||
|
||||
const passThrough = new PassThrough();
|
||||
|
||||
const params = {
|
||||
Bucket: apiConfig.bucket,
|
||||
Key: backupFilePath,
|
||||
Body: passThrough
|
||||
const options = {
|
||||
client: s3,
|
||||
params: {
|
||||
Bucket: apiConfig.bucket,
|
||||
Key: backupFilePath,
|
||||
Body: passThrough
|
||||
},
|
||||
partSize,
|
||||
queueSize: 3,
|
||||
leavePartsOnError: false
|
||||
};
|
||||
|
||||
const managedUpload = s3.upload(params, { partSize, queueSize: 3 });
|
||||
const managedUpload = constants.TEST ? new globalThis.S3MockUpload(options) : new Upload(options);
|
||||
managedUpload.on('httpUploadProgress', (progress) => debug(`Upload progress: ${JSON.stringify(progress)}`));
|
||||
const uploadPromise = managedUpload.promise();
|
||||
const uploadPromise = managedUpload.done();
|
||||
|
||||
return {
|
||||
stream: passThrough,
|
||||
@@ -135,9 +131,7 @@ async function exists(apiConfig, backupFilePath) {
|
||||
assert.strictEqual(typeof apiConfig, 'object');
|
||||
assert.strictEqual(typeof backupFilePath, 'string');
|
||||
|
||||
const credentials = getS3Config(apiConfig);
|
||||
|
||||
const s3 = new aws.S3(_.omit(credentials, 'retryDelayOptions', 'maxRetries'));
|
||||
const s3 = createS3Client(apiConfig, { retryStrategy: null });
|
||||
|
||||
if (!backupFilePath.endsWith('/')) { // check for file
|
||||
const params = {
|
||||
@@ -145,7 +139,7 @@ async function exists(apiConfig, backupFilePath) {
|
||||
Key: backupFilePath
|
||||
};
|
||||
|
||||
const [error, response] = await safe(s3.headObject(params).promise());
|
||||
const [error, response] = await safe(s3.headObject(params));
|
||||
if (error && S3_NOT_FOUND(error)) return false;
|
||||
if (error) throw new BoxError(BoxError.EXTERNAL_ERROR, `Error headObject ${backupFilePath}. Message: ${error.message} HTTP Code: ${error.code}`);
|
||||
if (!response || typeof response.Metadata !== 'object') throw new BoxError(BoxError.EXTERNAL_ERROR, 'not a s3 endpoint');
|
||||
@@ -158,7 +152,7 @@ async function exists(apiConfig, backupFilePath) {
|
||||
MaxKeys: 1
|
||||
};
|
||||
|
||||
const [error, listData] = await safe(s3.listObjects(listParams).promise());
|
||||
const [error, listData] = await safe(s3.listObjects(listParams));
|
||||
if (error) throw new BoxError(BoxError.EXTERNAL_ERROR, `Error listing objects ${backupFilePath}. Message: ${error.message} HTTP Code: ${error.code}`);
|
||||
|
||||
return listData.Contents.length !== 0;
|
||||
@@ -193,24 +187,23 @@ class S3MultipartDownloadStream extends Readable {
|
||||
}
|
||||
}
|
||||
|
||||
_downloadRange(offset, length) {
|
||||
async _downloadRange(offset, length) {
|
||||
const params = Object.assign({}, this._params);
|
||||
const lastPos = offset + length - 1;
|
||||
const range = `bytes=${offset}-${lastPos}`;
|
||||
params['Range'] = range;
|
||||
|
||||
this._s3.getObject(params, (error, data) => {
|
||||
if (error) return this._handleError(error);
|
||||
const [error, data] = await safe(this._s3.getObject(params));
|
||||
if (error) return this._handleError(error);
|
||||
|
||||
const length = parseInt(data.ContentLength, 10);
|
||||
const contentLength = parseInt(data.ContentLength, 10); // should be same as length
|
||||
|
||||
if (length > 0) {
|
||||
this._readSize += length;
|
||||
this.push(data.Body);
|
||||
} else {
|
||||
this._done();
|
||||
}
|
||||
});
|
||||
if (contentLength > 0) {
|
||||
this._readSize += contentLength;
|
||||
this.push(data.Body);
|
||||
} else {
|
||||
this._done();
|
||||
}
|
||||
}
|
||||
|
||||
_nextDownload() {
|
||||
@@ -223,22 +216,21 @@ class S3MultipartDownloadStream extends Readable {
|
||||
this._downloadRange(this._readSize, len);
|
||||
}
|
||||
|
||||
_fetchSize() {
|
||||
this._s3.headObject(this._params, (error, data) => {
|
||||
if (error) return this._handleError(error);
|
||||
async _fetchSize() {
|
||||
const [error, data] = await safe(this._s3.headObject(this._params));
|
||||
if (error) return this._handleError(error);
|
||||
|
||||
const length = parseInt(data.ContentLength, 10);
|
||||
const length = parseInt(data.ContentLength, 10);
|
||||
|
||||
if (length > 0) {
|
||||
this._fileSize = length;
|
||||
this._nextDownload();
|
||||
} else {
|
||||
this._done();
|
||||
}
|
||||
});
|
||||
if (length > 0) {
|
||||
this._fileSize = length;
|
||||
this._nextDownload();
|
||||
} else {
|
||||
this._done();
|
||||
}
|
||||
}
|
||||
|
||||
_read() {
|
||||
_read() { // reimp
|
||||
if (this._readSize === this._fileSize) return this._done();
|
||||
if (this._readSize === 0) return this._fetchSize();
|
||||
|
||||
@@ -250,14 +242,12 @@ async function download(apiConfig, backupFilePath) {
|
||||
assert.strictEqual(typeof apiConfig, 'object');
|
||||
assert.strictEqual(typeof backupFilePath, 'string');
|
||||
|
||||
const credentials = getS3Config(apiConfig);
|
||||
|
||||
const params = {
|
||||
Bucket: apiConfig.bucket,
|
||||
Key: backupFilePath
|
||||
};
|
||||
|
||||
const s3 = new aws.S3(credentials);
|
||||
const s3 = createS3Client(apiConfig, { retryStrategy: RETRY_STRATEGY });
|
||||
return new S3MultipartDownloadStream(s3, params, { blockSize: 64 * 1024 * 1024 });
|
||||
}
|
||||
|
||||
@@ -267,9 +257,7 @@ async function listDir(apiConfig, dir, batchSize, marker) {
|
||||
assert.strictEqual(typeof batchSize, 'number');
|
||||
assert(typeof marker !== 'undefined');
|
||||
|
||||
const credentials = getS3Config(apiConfig);
|
||||
|
||||
const s3 = new aws.S3(credentials);
|
||||
const s3 = createS3Client(apiConfig, { retryStrategy: RETRY_STRATEGY });
|
||||
const listParams = {
|
||||
Bucket: apiConfig.bucket,
|
||||
Prefix: dir,
|
||||
@@ -277,7 +265,7 @@ async function listDir(apiConfig, dir, batchSize, marker) {
|
||||
};
|
||||
if (marker) listParams.Marker = marker;
|
||||
|
||||
const [error, listData] = await safe(s3.listObjects(listParams).promise());
|
||||
const [error, listData] = await safe(s3.listObjects(listParams));
|
||||
if (error) throw new BoxError(BoxError.EXTERNAL_ERROR, `Error listing objects in ${dir}. Message: ${error.message} HTTP Code: ${error.code}`);
|
||||
if (listData.Contents.length === 0) return { entries: [], marker: null }; // no more
|
||||
const entries = listData.Contents.map(function (c) { return { fullPath: c.Key, size: c.Size }; });
|
||||
@@ -297,115 +285,107 @@ function encodeCopySource(bucket, path) {
|
||||
return `/${bucket}/${output}`;
|
||||
}
|
||||
|
||||
async function copyFile(apiConfig, oldFilePath, newFilePath, entry, progressCallback) {
|
||||
assert.strictEqual(typeof apiConfig, 'object');
|
||||
assert.strictEqual(typeof oldFilePath, 'string');
|
||||
assert.strictEqual(typeof newFilePath, 'string');
|
||||
assert.strictEqual(typeof entry, 'object');
|
||||
assert.strictEqual(typeof progressCallback, 'function');
|
||||
|
||||
const s3 = createS3Client(apiConfig, { retryStrategy: RETRY_STRATEGY }); // https://docs.aws.amazon.com/sdkref/latest/guide/feature-retry-behavior.html
|
||||
const relativePath = path.relative(oldFilePath, entry.fullPath);
|
||||
|
||||
function throwError(error) {
|
||||
if (error) debug(`copy: s3 copy error when copying ${entry.fullPath}: ${error}`);
|
||||
|
||||
if (error && S3_NOT_FOUND(error)) throw new BoxError(BoxError.NOT_FOUND, `Old backup not found: ${entry.fullPath}`);
|
||||
if (error) throw new BoxError(BoxError.EXTERNAL_ERROR, `Error copying ${entry.fullPath} (${entry.size} bytes): ${error.code || ''} ${error}`);
|
||||
}
|
||||
|
||||
const copyParams = {
|
||||
Bucket: apiConfig.bucket,
|
||||
Key: path.join(newFilePath, relativePath)
|
||||
};
|
||||
|
||||
// S3 copyObject has a file size limit of 5GB so if we have larger files, we do a multipart copy
|
||||
const largeFileLimit = (apiConfig.provider === 'vultr-objectstorage' || apiConfig.provider === 'exoscale-sos' || apiConfig.provider === 'backblaze-b2' || apiConfig.provider === 'digitalocean-spaces') ? 1024 * 1024 * 1024 : 3 * 1024 * 1024 * 1024;
|
||||
|
||||
if (entry.size < largeFileLimit) {
|
||||
progressCallback({ message: `Copying ${relativePath || oldFilePath}` });
|
||||
|
||||
copyParams.CopySource = encodeCopySource(apiConfig.bucket, entry.fullPath);
|
||||
const [copyError] = await safe(s3.copyObject(copyParams));
|
||||
if (copyError) return throwError(copyError);
|
||||
return;
|
||||
}
|
||||
|
||||
progressCallback({ message: `Copying (multipart) ${relativePath || oldFilePath}` });
|
||||
|
||||
const [createMultipartError, multipart] = await safe(s3.createMultipartUpload(copyParams));
|
||||
if (createMultipartError) return throwError(createMultipartError);
|
||||
|
||||
// Exoscale (96M) was suggested by exoscale. 1GB for others is arbitrary size
|
||||
const chunkSize = apiConfig.provider === 'exoscale-sos' ? 96 * 1024 * 1024 : 1024 * 1024 * 1024;
|
||||
const uploadId = multipart.UploadId;
|
||||
const uploadedParts = [], ranges = [];
|
||||
|
||||
let cur = 0;
|
||||
while (cur + chunkSize < entry.size) {
|
||||
ranges.push({ startBytes: cur, endBytes: cur + chunkSize - 1 });
|
||||
cur += chunkSize;
|
||||
}
|
||||
ranges.push({ startBytes: cur, endBytes: entry.size-1 });
|
||||
|
||||
const [copyError] = await safe(async.eachOfLimit(ranges, 3, async function copyChunk(range, index) {
|
||||
const partCopyParams = {
|
||||
Bucket: apiConfig.bucket,
|
||||
Key: path.join(newFilePath, relativePath),
|
||||
CopySource: encodeCopySource(apiConfig.bucket, entry.fullPath), // See aws-sdk-js/issues/1302
|
||||
CopySourceRange: 'bytes=' + range.startBytes + '-' + range.endBytes,
|
||||
PartNumber: index+1,
|
||||
UploadId: uploadId
|
||||
};
|
||||
|
||||
progressCallback({ message: `Copying part ${partCopyParams.PartNumber} - ${partCopyParams.CopySource} ${partCopyParams.CopySourceRange}` });
|
||||
|
||||
const part = await s3.uploadPartCopy(partCopyParams);
|
||||
progressCallback({ message: `Copied part ${partCopyParams.PartNumber} - Etag: ${part.CopyPartResult.ETag}` });
|
||||
|
||||
if (!part.CopyPartResult.ETag) throw new Error('Multi-part copy is broken or not implemented by the S3 storage provider');
|
||||
|
||||
uploadedParts[index] = { ETag: part.CopyPartResult.ETag, PartNumber: partCopyParams.PartNumber };
|
||||
}));
|
||||
|
||||
if (copyError) { // we must still recommend the user to set a AbortIncompleteMultipartUpload lifecycle rule
|
||||
const abortParams = {
|
||||
Bucket: apiConfig.bucket,
|
||||
Key: path.join(newFilePath, relativePath),
|
||||
UploadId: uploadId
|
||||
};
|
||||
progressCallback({ message: `Aborting multipart copy of ${relativePath || oldFilePath}` });
|
||||
await safe(s3.abortMultipartUpload(abortParams), { debug }); // ignore any abort errors
|
||||
return throwError(copyError);
|
||||
}
|
||||
|
||||
const completeMultipartParams = {
|
||||
Bucket: apiConfig.bucket,
|
||||
Key: path.join(newFilePath, relativePath),
|
||||
MultipartUpload: { Parts: uploadedParts },
|
||||
UploadId: uploadId
|
||||
};
|
||||
|
||||
progressCallback({ message: `Finishing multipart copy - ${completeMultipartParams.Key}` });
|
||||
|
||||
const [completeMultipartError] = await safe(s3.completeMultipartUpload(completeMultipartParams));
|
||||
if (completeMultipartError) return throwError(completeMultipartError);
|
||||
}
|
||||
|
||||
async function copy(apiConfig, oldFilePath, newFilePath, progressCallback) {
|
||||
assert.strictEqual(typeof apiConfig, 'object');
|
||||
assert.strictEqual(typeof oldFilePath, 'string');
|
||||
assert.strictEqual(typeof newFilePath, 'string');
|
||||
assert.strictEqual(typeof progressCallback, 'function');
|
||||
|
||||
function copyFile(entry, iteratorCallback) {
|
||||
const credentials = getS3Config(apiConfig);
|
||||
|
||||
const s3 = new aws.S3(credentials);
|
||||
const relativePath = path.relative(oldFilePath, entry.fullPath);
|
||||
|
||||
function done(error) {
|
||||
if (error) debug(`copy: s3 copy error when copying ${entry.fullPath}: ${error}`);
|
||||
|
||||
if (error && S3_NOT_FOUND(error)) return iteratorCallback(new BoxError(BoxError.NOT_FOUND, `Old backup not found: ${entry.fullPath}`));
|
||||
if (error) return iteratorCallback(new BoxError(BoxError.EXTERNAL_ERROR, `Error copying ${entry.fullPath} (${entry.size} bytes): ${error.code || ''} ${error}`));
|
||||
|
||||
iteratorCallback(null);
|
||||
}
|
||||
|
||||
const copyParams = {
|
||||
Bucket: apiConfig.bucket,
|
||||
Key: path.join(newFilePath, relativePath)
|
||||
};
|
||||
|
||||
// S3 copyObject has a file size limit of 5GB so if we have larger files, we do a multipart copy
|
||||
const largeFileLimit = (apiConfig.provider === 'vultr-objectstorage' || apiConfig.provider === 'exoscale-sos' || apiConfig.provider === 'backblaze-b2' || apiConfig.provider === 'digitalocean-spaces') ? 1024 * 1024 * 1024 : 3 * 1024 * 1024 * 1024;
|
||||
|
||||
if (entry.size < largeFileLimit) {
|
||||
progressCallback({ message: `Copying ${relativePath || oldFilePath}` });
|
||||
|
||||
copyParams.CopySource = encodeCopySource(apiConfig.bucket, entry.fullPath);
|
||||
s3.copyObject(copyParams, done).on('retry', function (response) {
|
||||
progressCallback({ message: `Retrying (${response.retryCount+1}) copy of ${relativePath || oldFilePath}. Error: ${response.error} ${response.httpResponse.statusCode}` });
|
||||
// on DO, we get a random 408. these are not retried by the SDK
|
||||
if (response.error) response.error.retryable = true; // https://github.com/aws/aws-sdk-js/issues/412
|
||||
});
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
progressCallback({ message: `Copying (multipart) ${relativePath || oldFilePath}` });
|
||||
|
||||
s3.createMultipartUpload(copyParams, function (error, multipart) {
|
||||
if (error) return done(error);
|
||||
|
||||
// Exoscale (96M) was suggested by exoscale. 1GB for others is arbitrary size
|
||||
const chunkSize = apiConfig.provider === 'exoscale-sos' ? 96 * 1024 * 1024 : 1024 * 1024 * 1024;
|
||||
const uploadId = multipart.UploadId;
|
||||
const uploadedParts = [], ranges = [];
|
||||
|
||||
let cur = 0;
|
||||
while (cur + chunkSize < entry.size) {
|
||||
ranges.push({ startBytes: cur, endBytes: cur + chunkSize - 1 });
|
||||
cur += chunkSize;
|
||||
}
|
||||
ranges.push({ startBytes: cur, endBytes: entry.size-1 });
|
||||
|
||||
async.eachOfLimit(ranges, 3, function copyChunk(range, index, iteratorDone) {
|
||||
const partCopyParams = {
|
||||
Bucket: apiConfig.bucket,
|
||||
Key: path.join(newFilePath, relativePath),
|
||||
CopySource: encodeCopySource(apiConfig.bucket, entry.fullPath), // See aws-sdk-js/issues/1302
|
||||
CopySourceRange: 'bytes=' + range.startBytes + '-' + range.endBytes,
|
||||
PartNumber: index+1,
|
||||
UploadId: uploadId
|
||||
};
|
||||
|
||||
progressCallback({ message: `Copying part ${partCopyParams.PartNumber} - ${partCopyParams.CopySource} ${partCopyParams.CopySourceRange}` });
|
||||
|
||||
s3.uploadPartCopy(partCopyParams, function (error, part) {
|
||||
if (error) return iteratorDone(error);
|
||||
|
||||
progressCallback({ message: `Copying part ${partCopyParams.PartNumber} - Etag: ${part.CopyPartResult.ETag}` });
|
||||
|
||||
if (!part.CopyPartResult.ETag) return iteratorDone(new Error('Multi-part copy is broken or not implemented by the S3 storage provider'));
|
||||
|
||||
uploadedParts[index] = { ETag: part.CopyPartResult.ETag, PartNumber: partCopyParams.PartNumber };
|
||||
|
||||
iteratorDone();
|
||||
}).on('retry', function (response) {
|
||||
progressCallback({ message: `Retrying (${response.retryCount+1}) multipart copy of ${relativePath || oldFilePath}. Error: ${response.error} ${response.httpResponse.statusCode}` });
|
||||
});
|
||||
}, function chunksCopied(error) {
|
||||
if (error) { // we must still recommend the user to set a AbortIncompleteMultipartUpload lifecycle rule
|
||||
const abortParams = {
|
||||
Bucket: apiConfig.bucket,
|
||||
Key: path.join(newFilePath, relativePath),
|
||||
UploadId: uploadId
|
||||
};
|
||||
progressCallback({ message: `Aborting multipart copy of ${relativePath || oldFilePath}` });
|
||||
return s3.abortMultipartUpload(abortParams, () => done(error)); // ignore any abort errors
|
||||
}
|
||||
|
||||
const completeMultipartParams = {
|
||||
Bucket: apiConfig.bucket,
|
||||
Key: path.join(newFilePath, relativePath),
|
||||
MultipartUpload: { Parts: uploadedParts },
|
||||
UploadId: uploadId
|
||||
};
|
||||
|
||||
progressCallback({ message: `Finishing multipart copy - ${completeMultipartParams.Key}` });
|
||||
|
||||
s3.completeMultipartUpload(completeMultipartParams, done);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
let total = 0;
|
||||
const concurrency = apiConfig.limits?.copyConcurrency || (apiConfig.provider === 's3' ? 500 : 10);
|
||||
progressCallback({ message: `Copying with concurrency of ${concurrency}` });
|
||||
@@ -415,7 +395,7 @@ async function copy(apiConfig, oldFilePath, newFilePath, progressCallback) {
|
||||
const batch = await listDir(apiConfig, oldFilePath, 1000, marker);
|
||||
total += batch.entries.length;
|
||||
progressCallback({ message: `Copying files from ${total-batch.entries.length}-${total}` });
|
||||
await async.eachLimit(batch.entries, concurrency, copyFile);
|
||||
await async.eachLimit(batch.entries, concurrency, async (entry) => await copyFile(apiConfig, oldFilePath, newFilePath, entry, progressCallback));
|
||||
if (!batch.marker) break;
|
||||
marker = batch.marker;
|
||||
}
|
||||
@@ -427,9 +407,7 @@ async function remove(apiConfig, filename) {
|
||||
assert.strictEqual(typeof apiConfig, 'object');
|
||||
assert.strictEqual(typeof filename, 'string');
|
||||
|
||||
const credentials = getS3Config(apiConfig);
|
||||
|
||||
const s3 = new aws.S3(credentials);
|
||||
const s3 = createS3Client(apiConfig, { retryStrategy: RETRY_STRATEGY });
|
||||
|
||||
const deleteParams = {
|
||||
Bucket: apiConfig.bucket,
|
||||
@@ -439,7 +417,7 @@ async function remove(apiConfig, filename) {
|
||||
};
|
||||
|
||||
// deleteObjects does not return error if key is not found
|
||||
const [error] = await safe(s3.deleteObjects(deleteParams).promise());
|
||||
const [error] = await safe(s3.deleteObjects(deleteParams));
|
||||
if (error) throw new BoxError(BoxError.EXTERNAL_ERROR, `Unable to remove ${deleteParams.Key}. error: ${error.message}`);
|
||||
}
|
||||
|
||||
@@ -464,8 +442,7 @@ async function removeDir(apiConfig, pathPrefix, progressCallback) {
|
||||
assert.strictEqual(typeof pathPrefix, 'string');
|
||||
assert.strictEqual(typeof progressCallback, 'function');
|
||||
|
||||
const credentials = getS3Config(apiConfig);
|
||||
const s3 = new aws.S3(credentials);
|
||||
const s3 = createS3Client(apiConfig, { retryStrategy: RETRY_STRATEGY });
|
||||
|
||||
let total = 0;
|
||||
let marker = null;
|
||||
@@ -489,7 +466,7 @@ async function removeDir(apiConfig, pathPrefix, progressCallback) {
|
||||
progressCallback({ message: `Removing ${objects.length} files from ${objects[0].fullPath} to ${objects[objects.length-1].fullPath}` });
|
||||
|
||||
// deleteObjects does not return error if key is not found
|
||||
const [error] = await safe(s3.deleteObjects(deleteParams).promise());
|
||||
const [error] = await safe(s3.deleteObjects(deleteParams));
|
||||
if (error) {
|
||||
progressCallback({ message: `Unable to remove ${deleteParams.Key} ${error.message || error.code}` });
|
||||
throw new BoxError(BoxError.EXTERNAL_ERROR, `Unable to remove ${deleteParams.Key}. error: ${error.message}`);
|
||||
@@ -523,17 +500,14 @@ async function testConfig(apiConfig) {
|
||||
if ('acceptSelfSignedCerts' in apiConfig && typeof apiConfig.acceptSelfSignedCerts !== 'boolean') throw new BoxError(BoxError.BAD_FIELD, 'acceptSelfSignedCerts must be a boolean');
|
||||
if ('s3ForcePathStyle' in apiConfig && typeof apiConfig.s3ForcePathStyle !== 'boolean') throw new BoxError(BoxError.BAD_FIELD, 's3ForcePathStyle must be a boolean');
|
||||
|
||||
// attempt to upload and delete a file with new credentials
|
||||
const credentials = getS3Config(apiConfig);
|
||||
|
||||
const putParams = {
|
||||
Bucket: apiConfig.bucket,
|
||||
Key: path.join(apiConfig.prefix, 'snapshot/cloudron-testfile'),
|
||||
Body: 'testcontent'
|
||||
};
|
||||
|
||||
const s3 = new aws.S3(_.omit(credentials, 'retryDelayOptions', 'maxRetries'));
|
||||
const [putError] = await safe(s3.putObject(putParams).promise());
|
||||
const s3 = createS3Client(apiConfig, {});
|
||||
const [putError] = await safe(s3.putObject(putParams));
|
||||
if (putError) throw new BoxError(BoxError.EXTERNAL_ERROR, `Error put object cloudron-testfile. Message: ${putError.message} HTTP Code: ${putError.code}`);
|
||||
|
||||
const listParams = {
|
||||
@@ -542,7 +516,7 @@ async function testConfig(apiConfig) {
|
||||
MaxKeys: 1
|
||||
};
|
||||
|
||||
const [listError] = await safe(s3.listObjects(listParams).promise());
|
||||
const [listError] = await safe(s3.listObjects(listParams));
|
||||
if (listError) throw new BoxError(BoxError.EXTERNAL_ERROR, `Error listing objects. Message: ${listError.message} HTTP Code: ${listError.code}`);
|
||||
|
||||
const delParams = {
|
||||
@@ -550,7 +524,7 @@ async function testConfig(apiConfig) {
|
||||
Key: path.join(apiConfig.prefix, 'snapshot/cloudron-testfile')
|
||||
};
|
||||
|
||||
const [delError] = await safe(s3.deleteObject(delParams).promise());
|
||||
const [delError] = await safe(s3.deleteObject(delParams));
|
||||
if (delError) throw new BoxError(BoxError.EXTERNAL_ERROR, `Error del object cloudron-testfile. Message: ${delError.message} HTTP Code: ${delError.code}`);
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user