s3: do not rely on entry.s3
This commit is contained in:
@@ -207,8 +207,7 @@ function listDir(apiConfig, dir, batchSize, iteratorCallback, callback) {
|
||||
|
||||
if (listData.Contents.length === 0) return foreverCallback(new Error('Done'));
|
||||
|
||||
// TODO: remove s3
|
||||
const entries = listData.Contents.map(function (c) { return { fullPath: c.Key, size: c.Size, s3: s3 }; });
|
||||
const entries = listData.Contents.map(function (c) { return { fullPath: c.Key, size: c.Size }; });
|
||||
|
||||
iteratorCallback(entries, function (error) {
|
||||
if (error) return foreverCallback(error);
|
||||
@@ -251,94 +250,98 @@ function copy(apiConfig, oldFilePath, newFilePath) {
|
||||
var events = new EventEmitter(), retryCount = 0;
|
||||
|
||||
function copyFile(entry, iteratorCallback) {
|
||||
const s3 = entry.s3;
|
||||
var relativePath = path.relative(oldFilePath, entry.fullPath);
|
||||
getS3Config(apiConfig, function (error, credentials) {
|
||||
if (error) return iteratorCallback(error);
|
||||
|
||||
function done(error) {
|
||||
if (error) debug(`copy: s3 copy error when copying ${entry.fullPath}: ${error}`);
|
||||
var s3 = new AWS.S3(credentials);
|
||||
var relativePath = path.relative(oldFilePath, entry.fullPath);
|
||||
|
||||
if (error && error.code === 'NoSuchKey') return iteratorCallback(new BackupsError(BackupsError.NOT_FOUND, `Old backup not found: ${entry.fullPath}`));
|
||||
if (error) return iteratorCallback(new BackupsError(BackupsError.EXTERNAL_ERROR, `Error copying ${entry.fullPath} : ${error.code} ${error}`));
|
||||
function done(error) {
|
||||
if (error) debug(`copy: s3 copy error when copying ${entry.fullPath}: ${error}`);
|
||||
|
||||
iteratorCallback(null);
|
||||
}
|
||||
if (error && error.code === 'NoSuchKey') return iteratorCallback(new BackupsError(BackupsError.NOT_FOUND, `Old backup not found: ${entry.fullPath}`));
|
||||
if (error) return iteratorCallback(new BackupsError(BackupsError.EXTERNAL_ERROR, `Error copying ${entry.fullPath} : ${error.code} ${error}`));
|
||||
|
||||
var copyParams = {
|
||||
Bucket: apiConfig.bucket,
|
||||
Key: path.join(newFilePath, relativePath)
|
||||
};
|
||||
iteratorCallback(null);
|
||||
}
|
||||
|
||||
// S3 copyObject has a file size limit of 5GB so if we have larger files, we do a multipart copy
|
||||
// Exoscale takes too long to copy 5GB
|
||||
const largeFileLimit = apiConfig.provider === 'exoscale-sos' ? 1024 * 1024 * 1024 : 5 * 1024 * 1024 * 1024;
|
||||
var copyParams = {
|
||||
Bucket: apiConfig.bucket,
|
||||
Key: path.join(newFilePath, relativePath)
|
||||
};
|
||||
|
||||
if (entry.size < largeFileLimit) {
|
||||
events.emit('progress', `Copying ${relativePath || oldFilePath}`);
|
||||
// S3 copyObject has a file size limit of 5GB so if we have larger files, we do a multipart copy
|
||||
// Exoscale takes too long to copy 5GB
|
||||
const largeFileLimit = apiConfig.provider === 'exoscale-sos' ? 1024 * 1024 * 1024 : 5 * 1024 * 1024 * 1024;
|
||||
|
||||
copyParams.CopySource = encodeCopySource(apiConfig.bucket, entry.fullPath);
|
||||
s3.copyObject(copyParams, done).on('retry', function (response) {
|
||||
++retryCount;
|
||||
events.emit('progress', `Retrying (${response.retryCount+1}) copy of ${relativePath || oldFilePath}. Error: ${response.error} ${response.httpResponse.statusCode}`);
|
||||
// on DO, we get a random 408. these are not retried by the SDK
|
||||
if (response.error) response.error.retryable = true; // https://github.com/aws/aws-sdk-js/issues/412
|
||||
});
|
||||
if (entry.size < largeFileLimit) {
|
||||
events.emit('progress', `Copying ${relativePath || oldFilePath}`);
|
||||
|
||||
return;
|
||||
}
|
||||
copyParams.CopySource = encodeCopySource(apiConfig.bucket, entry.fullPath);
|
||||
s3.copyObject(copyParams, done).on('retry', function (response) {
|
||||
++retryCount;
|
||||
events.emit('progress', `Retrying (${response.retryCount+1}) copy of ${relativePath || oldFilePath}. Error: ${response.error} ${response.httpResponse.statusCode}`);
|
||||
// on DO, we get a random 408. these are not retried by the SDK
|
||||
if (response.error) response.error.retryable = true; // https://github.com/aws/aws-sdk-js/issues/412
|
||||
});
|
||||
|
||||
events.emit('progress', `Copying (multipart) ${relativePath || oldFilePath}`);
|
||||
return;
|
||||
}
|
||||
|
||||
s3.createMultipartUpload(copyParams, function (error, result) {
|
||||
if (error) return done(error);
|
||||
events.emit('progress', `Copying (multipart) ${relativePath || oldFilePath}`);
|
||||
|
||||
// Exoscale (96M) was suggested by exoscale. 1GB - rather random size for others
|
||||
const chunkSize = apiConfig.provider === 'exoscale-sos' ? 96 * 1024 * 1024 : 1024 * 1024 * 1024;
|
||||
var uploadId = result.UploadId;
|
||||
var uploadedParts = [];
|
||||
var partNumber = 1;
|
||||
var startBytes = 0;
|
||||
var endBytes = 0;
|
||||
var size = entry.size-1;
|
||||
s3.createMultipartUpload(copyParams, function (error, result) {
|
||||
if (error) return done(error);
|
||||
|
||||
function copyNextChunk() {
|
||||
endBytes = startBytes + chunkSize;
|
||||
if (endBytes > size) endBytes = size;
|
||||
// Exoscale (96M) was suggested by exoscale. 1GB - rather random size for others
|
||||
const chunkSize = apiConfig.provider === 'exoscale-sos' ? 96 * 1024 * 1024 : 1024 * 1024 * 1024;
|
||||
var uploadId = result.UploadId;
|
||||
var uploadedParts = [];
|
||||
var partNumber = 1;
|
||||
var startBytes = 0;
|
||||
var endBytes = 0;
|
||||
var size = entry.size-1;
|
||||
|
||||
var params = {
|
||||
Bucket: apiConfig.bucket,
|
||||
Key: path.join(newFilePath, relativePath),
|
||||
CopySource: encodeCopySource(apiConfig.bucket, entry.fullPath), // See aws-sdk-js/issues/1302
|
||||
CopySourceRange: 'bytes=' + startBytes + '-' + endBytes,
|
||||
PartNumber: partNumber,
|
||||
UploadId: uploadId
|
||||
};
|
||||
|
||||
s3.uploadPartCopy(params, function (error, result) {
|
||||
if (error) return done(error);
|
||||
|
||||
uploadedParts.push({ ETag: result.CopyPartResult.ETag, PartNumber: partNumber });
|
||||
|
||||
if (endBytes < size) {
|
||||
startBytes = endBytes + 1;
|
||||
partNumber++;
|
||||
return copyNextChunk();
|
||||
}
|
||||
function copyNextChunk() {
|
||||
endBytes = startBytes + chunkSize;
|
||||
if (endBytes > size) endBytes = size;
|
||||
|
||||
var params = {
|
||||
Bucket: apiConfig.bucket,
|
||||
Key: path.join(newFilePath, relativePath),
|
||||
MultipartUpload: { Parts: uploadedParts },
|
||||
CopySource: encodeCopySource(apiConfig.bucket, entry.fullPath), // See aws-sdk-js/issues/1302
|
||||
CopySourceRange: 'bytes=' + startBytes + '-' + endBytes,
|
||||
PartNumber: partNumber,
|
||||
UploadId: uploadId
|
||||
};
|
||||
|
||||
s3.completeMultipartUpload(params, done);
|
||||
}).on('retry', function (response) {
|
||||
++retryCount;
|
||||
events.emit('progress', `Retrying (${response.retryCount+1}) multipart copy of ${relativePath || oldFilePath}. Error: ${response.error} ${response.httpResponse.statusCode}`);
|
||||
});
|
||||
}
|
||||
s3.uploadPartCopy(params, function (error, result) {
|
||||
if (error) return done(error);
|
||||
|
||||
copyNextChunk();
|
||||
uploadedParts.push({ ETag: result.CopyPartResult.ETag, PartNumber: partNumber });
|
||||
|
||||
if (endBytes < size) {
|
||||
startBytes = endBytes + 1;
|
||||
partNumber++;
|
||||
return copyNextChunk();
|
||||
}
|
||||
|
||||
var params = {
|
||||
Bucket: apiConfig.bucket,
|
||||
Key: path.join(newFilePath, relativePath),
|
||||
MultipartUpload: { Parts: uploadedParts },
|
||||
UploadId: uploadId
|
||||
};
|
||||
|
||||
s3.completeMultipartUpload(params, done);
|
||||
}).on('retry', function (response) {
|
||||
++retryCount;
|
||||
events.emit('progress', `Retrying (${response.retryCount+1}) multipart copy of ${relativePath || oldFilePath}. Error: ${response.error} ${response.httpResponse.statusCode}`);
|
||||
});
|
||||
}
|
||||
|
||||
copyNextChunk();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
@@ -394,34 +397,39 @@ function removeDir(apiConfig, pathPrefix) {
|
||||
var events = new EventEmitter();
|
||||
var total = 0;
|
||||
|
||||
listDir(apiConfig, pathPrefix, 1000, function listDirIterator(entries, done) {
|
||||
total += entries.length;
|
||||
getS3Config(apiConfig, function (error, credentials) {
|
||||
if (error) events.emit('done', error);
|
||||
|
||||
const chunkSize = apiConfig.provider !== 'digitalocean-spaces' ? 1000 : 100; // throttle objects in each request
|
||||
var chunks = chunk(entries, chunkSize);
|
||||
var s3 = new AWS.S3(credentials);
|
||||
|
||||
async.eachSeries(chunks, function deleteFiles(objects, iteratorCallback) {
|
||||
const s3 = objects[0].s3;
|
||||
var deleteParams = {
|
||||
Bucket: apiConfig.bucket,
|
||||
Delete: {
|
||||
Objects: objects.map(function (o) { return { Key: o.fullPath }; })
|
||||
}
|
||||
};
|
||||
listDir(apiConfig, pathPrefix, 1000, function listDirIterator(entries, done) {
|
||||
total += entries.length;
|
||||
|
||||
events.emit('progress', `Removing ${objects.length} files from ${objects[0].fullPath} to ${objects[objects.length-1].Key}`);
|
||||
const chunkSize = apiConfig.provider !== 'digitalocean-spaces' ? 1000 : 100; // throttle objects in each request
|
||||
var chunks = chunk(entries, chunkSize);
|
||||
|
||||
// deleteObjects does not return error if key is not found
|
||||
s3.deleteObjects(deleteParams, function (error /*, deleteData */) {
|
||||
if (error) events.emit('progress', `Unable to remove ${deleteParams.Key} ${error.message}`);
|
||||
async.eachSeries(chunks, function deleteFiles(objects, iteratorCallback) {
|
||||
var deleteParams = {
|
||||
Bucket: apiConfig.bucket,
|
||||
Delete: {
|
||||
Objects: objects.map(function (o) { return { Key: o.fullPath }; })
|
||||
}
|
||||
};
|
||||
|
||||
iteratorCallback(error);
|
||||
});
|
||||
}, done);
|
||||
}, function (error) {
|
||||
events.emit('progress', `Removed ${total} files`);
|
||||
events.emit('progress', `Removing ${objects.length} files from ${objects[0].fullPath} to ${objects[objects.length-1].Key}`);
|
||||
|
||||
events.emit('done', error);
|
||||
// deleteObjects does not return error if key is not found
|
||||
s3.deleteObjects(deleteParams, function (error /*, deleteData */) {
|
||||
if (error) events.emit('progress', `Unable to remove ${deleteParams.Key} ${error.message}`);
|
||||
|
||||
iteratorCallback(error);
|
||||
});
|
||||
}, done);
|
||||
}, function (error) {
|
||||
events.emit('progress', `Removed ${total} files`);
|
||||
|
||||
events.emit('done', error);
|
||||
});
|
||||
});
|
||||
|
||||
return events;
|
||||
|
||||
Reference in New Issue
Block a user