Various backup fixes

Girish Ramakrishnan
2017-10-11 13:57:05 -07:00
parent 28ce5f41e3
commit dd68c8f91f
4 changed files with 62 additions and 42 deletions


@@ -71,10 +71,10 @@ function getCaasConfig(apiConfig, callback) {
         secretAccessKey: result.body.credentials.SecretAccessKey,
         sessionToken: result.body.credentials.SessionToken,
         region: apiConfig.region || 'us-east-1',
         httpOptions: {
             timeout: 60000
         },
-        maxRetries: 5
+        maxRetries: 5,
+        retryDelayOptions: {
+            base: 20000 // 2^5 * 20 seconds
+        }
     };
     if (apiConfig.endpoint) credentials.endpoint = new AWS.Endpoint(apiConfig.endpoint);
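
For context on the new retryDelayOptions: in aws-sdk v2 the default backoff (when no customBackoff is set) is roughly base * 2^retryCount, which is what the "2^5 * 20 seconds" comment alludes to. A minimal sketch of the resulting schedule, assuming that default formula:

// Sketch only: the retry schedule implied by maxRetries: 5 and
// retryDelayOptions.base = 20000, assuming aws-sdk v2's default
// backoff of roughly base * 2^retryCount (no customBackoff).
const base = 20000; // ms
const maxRetries = 5;

for (let retryCount = 0; retryCount < maxRetries; retryCount++) {
    console.log(`retry ${retryCount + 1} after ~${(base * Math.pow(2, retryCount)) / 1000}s`);
}
// retry 1 after ~20s ... retry 5 after ~320s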
@@ -100,16 +100,16 @@ function getS3Config(apiConfig, callback) {
         accessKeyId: apiConfig.accessKeyId,
         secretAccessKey: apiConfig.secretAccessKey,
         region: apiConfig.region || 'us-east-1',
         httpOptions: {
             timeout: 60000
         },
-        maxRetries: 5
+        maxRetries: 5,
+        retryDelayOptions: {
+            base: 20000 // 2^5 * 20 seconds
+        }
     };
     if (apiConfig.endpoint) credentials.endpoint = apiConfig.endpoint;

     if (apiConfig.acceptSelfSignedCerts === true) {
-        credentials.httpOptions = {
-            agent: new https.Agent({ rejectUnauthorized: false })
-        };
+        credentials.httpOptions.agent = new https.Agent({ rejectUnauthorized: false });
     }
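
The acceptSelfSignedCerts change is subtle: the old code replaced the whole httpOptions object, silently discarding the timeout configured just above it. Mutating only the agent property keeps the rest intact. A minimal sketch of the difference:

const https = require('https');

// old behaviour: replacing the object drops the timeout
var httpOptions = { timeout: 60000 };
httpOptions = { agent: new https.Agent({ rejectUnauthorized: false }) };
console.log(httpOptions.timeout); // undefined

// fixed behaviour: mutate only the agent property
httpOptions = { timeout: 60000 };
httpOptions.agent = new https.Agent({ rejectUnauthorized: false });
console.log(httpOptions.timeout); // 60000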
@@ -190,7 +190,7 @@ function download(apiConfig, backupFilePath, callback) {
     });
 }

-function listDir(apiConfig, backupFilePath, options, iteratorCallback, callback) {
+function listDir(apiConfig, backupFilePath, batchSize, iteratorCallback, callback) {
     getS3Config(apiConfig, function (error, credentials) {
         if (error) return callback(error);
@@ -200,8 +200,6 @@ function listDir(apiConfig, backupFilePath, options, iteratorCallback, callback)
             Prefix: backupFilePath
         };

         var total = 0;

         async.forever(function listAndDownload(foreverCallback) {
             s3.listObjects(listParams, function (error, listData) {
                 if (error) {
@@ -209,15 +207,12 @@ function listDir(apiConfig, backupFilePath, options, iteratorCallback, callback)
                     return foreverCallback(error);
                 }

                 debug('listDir: processing %s files (processed %s so far). From "%s"', listData.Contents.length, total, listParams.Marker || '');

-                var arr = options.batchSize === 1 ? listData.Contents : chunk(listData.Contents, options.batchSize);
-                async.eachLimit(arr, options.concurrency, iteratorCallback.bind(null, s3), function iteratorDone(error) {
+                var arr = batchSize === 1 ? listData.Contents : chunk(listData.Contents, batchSize);
+                if (arr.length === 0) return foreverCallback(new Error('Done'));
+                iteratorCallback(s3, arr, function (error) {
                     if (error) return foreverCallback(error);

                     total += listData.Contents.length;

                     if (!listData.IsTruncated) return foreverCallback(new Error('Done'));
                     listParams.Marker = listData.Contents[listData.Contents.length - 1].Key; // NextMarker is returned only with delimiter
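
listDir now takes a plain batchSize instead of an options object, and hands the iterator a whole batch at a time, so each caller controls its own concurrency. The chunk() helper it calls is not shown in this diff; a minimal stand-in (the repo may well use lodash's _.chunk instead) looks like:

// Illustrative stand-in for the chunk() helper referenced above.
function chunk(array, size) {
    var result = [];
    for (var i = 0; i < array.length; i += size) {
        result.push(array.slice(i, i + size));
    }
    return result;
}

console.log(chunk(['a', 'b', 'c', 'd', 'e'], 2)); // [ [ 'a', 'b' ], [ 'c', 'd' ], [ 'e' ] ]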
@@ -241,10 +236,9 @@ function downloadDir(apiConfig, backupFilePath, destDir) {
     var events = new EventEmitter();
     var total = 0;

-    listDir(apiConfig, backupFilePath, { batchSize: 1, concurrency: 10 }, function downloadFile(s3, content, iteratorCallback) {
+    function downloadFile(s3, content, iteratorCallback) {
         var relativePath = path.relative(backupFilePath, content.Key);
-        ++total;

         events.emit('progress', `Downloading ${relativePath}`);

         mkdirp(path.dirname(path.join(destDir, relativePath)), function (error) {
@@ -266,6 +260,13 @@ function downloadDir(apiConfig, backupFilePath, destDir) {
             destStream.on('finish', iteratorCallback);
         });
     });
+    }
+
+    const concurrency = 10, batchSize = 1;
+
+    listDir(apiConfig, backupFilePath, batchSize, function (s3, objects, done) {
+        total += objects.length;
+        async.eachLimit(objects, concurrency, downloadFile.bind(null, s3), done);
     }, function (error) {
         events.emit('progress', `Downloaded ${total} files`);
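
downloadDir is the simplest consumer of the new contract: it receives each listed batch and fans it out with async.eachLimit, pre-binding the s3 client. A runnable sketch of that pattern, with a stub in place of the real client:

const async = require('async');

// Stand-ins for the real s3 client and download logic.
function downloadFile(s3, object, done) {
    setTimeout(function () { console.log(s3.name, 'downloaded', object.Key); done(); }, 10);
}

var s3 = { name: 'stub-s3' };
var objects = [{ Key: 'backup/a' }, { Key: 'backup/b' }, { Key: 'backup/c' }];

// At most 2 downloads in flight at once; the final callback fires when all finish.
async.eachLimit(objects, 2, downloadFile.bind(null, s3), function (error) {
    console.log(error ? error.message : 'batch done');
});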
@@ -280,19 +281,16 @@ function copy(apiConfig, oldFilePath, newFilePath) {
     assert.strictEqual(typeof oldFilePath, 'string');
     assert.strictEqual(typeof newFilePath, 'string');

-    var events = new EventEmitter();
-    var total = 0;
+    var events = new EventEmitter(), retryCount = 0;

-    listDir(apiConfig, oldFilePath, { batchSize: 1, concurrency: 10 }, function copyFile(s3, content, iteratorCallback) {
+    function copyFile(s3, content, iteratorCallback) {
         var relativePath = path.relative(oldFilePath, content.Key);
-        ++total;

         function done(error) {
             if (error && error.code === 'NoSuchKey') return iteratorCallback(new BackupsError(BackupsError.NOT_FOUND, `Old backup not found: ${content.Key}`));
             if (error) {
-                debug('copy: s3 copy error when copying %s', content.Key, error);
-                return iteratorCallback(new BackupsError(BackupsError.EXTERNAL_ERROR, `Error copying ${content.Key} : ${error.message}`));
+                debug('copy: s3 copy error when copying %s %s', content.Key, error);
+                return iteratorCallback(new BackupsError(BackupsError.EXTERNAL_ERROR, `Error copying ${content.Key} : ${error.message} ${error.code}`));
             }

             iteratorCallback(null);
@@ -304,12 +302,17 @@ function copy(apiConfig, oldFilePath, newFilePath) {
             };

             // S3 copyObject has a file size limit of 5GB so if we have larger files, we do a multipart copy
-            if (content.Size < 5 * 1024 * 1024 * 1024) {
+            if (content.Size < 5 * 1024 * 1024 * 1024 || apiConfig.provider === 'digitalocean-spaces') { // DO has not implemented this yet
                 events.emit('progress', `Copying ${relativePath}`);
                 // for exoscale, '/' should not be encoded
                 copyParams.CopySource = path.join(apiConfig.bucket, encodeURIComponent(content.Key)); // See aws-sdk-js/issues/1302
-                return s3.copyObject(copyParams, done);
+                s3.copyObject(copyParams, done).on('retry', function (response) {
+                    ++retryCount;
+                    events.emit('progress', `Retrying (${response.retryCount+1}) copy of ${relativePath}. Status code: ${response.httpResponse.statusCode}`);
+                });
+                return;
             }

             events.emit('progress', `Copying (multipart) ${relativePath}`);
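
The .on('retry', ...) handlers hook the 'retry' event that aws-sdk v2's AWS.Request emits before each retried attempt; response.retryCount and response.httpResponse.statusCode are taken straight from the handlers in this commit. A standalone sketch (bucket and key names here are made up):

const AWS = require('aws-sdk');

const s3 = new AWS.S3({ maxRetries: 5, retryDelayOptions: { base: 20000 } });
const params = { Bucket: 'my-bucket', CopySource: 'my-bucket/old/key', Key: 'new/key' };

s3.copyObject(params, function (error) {
    if (error) return console.error('copy failed after retries:', error.message);
    console.log('copied');
}).on('retry', function (response) {
    console.log('retry #%d, status %s', response.retryCount + 1, response.httpResponse.statusCode);
});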
@@ -357,11 +360,27 @@ function copy(apiConfig, oldFilePath, newFilePath) {
                     };

                     s3.completeMultipartUpload(params, done);
-                });
+                }).on('retry', function (response) {
+                    ++retryCount;
+                    events.emit('progress', `Retrying (${response.retryCount+1}) multipart copy of ${relativePath}. Status code: ${response.httpResponse.statusCode}`);
+                });
             }

             copyNextChunk();
         });
+    }
+
+    const batchSize = 1;
+    var total = 0, concurrency = 4;
+
+    listDir(apiConfig, oldFilePath, batchSize, function (s3, objects, done) {
+        total += objects.length;
+        if (retryCount === 0) concurrency = Math.min(concurrency + 1, 10); else concurrency = Math.max(concurrency - 1, 5);
+        events.emit('progress', `${retryCount} errors. concurrency set to ${concurrency}`);
+        retryCount = 0;
+        async.eachLimit(objects, concurrency, copyFile.bind(null, s3), done);
     }, function (error) {
         events.emit('progress', `Copied ${total} files`);
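
The batch callback above implements a simple adaptive throttle: each batch that completed without retries raises concurrency by one (capped at 10), and any batch that saw retries lowers it by one (floored at 5, even though the starting value is 4). Pulled out as a pure function for clarity:

// The concurrency rule from the batch callback, isolated for clarity.
function nextConcurrency(current, retryCount) {
    return retryCount === 0 ? Math.min(current + 1, 10)  // clean batch: speed up
                            : Math.max(current - 1, 5);  // retries seen: back off
}

console.log(nextConcurrency(4, 0));  // 5
console.log(nextConcurrency(10, 2)); // 9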
@@ -403,7 +422,7 @@ function removeDir(apiConfig, pathPrefix) {
     var events = new EventEmitter();
     var total = 0;

-    listDir(apiConfig, pathPrefix, { batchSize: 1000, concurrency: 1 }, function deleteFiles(s3, contents, iteratorCallback) {
+    function deleteFiles(s3, contents, iteratorCallback) {
         var deleteParams = {
             Bucket: apiConfig.bucket,
             Delete: {
@@ -416,14 +435,17 @@ function removeDir(apiConfig, pathPrefix) {
         s3.deleteObjects(deleteParams, function (error /*, deleteData */) {
             if (error) {
                 debug('removeDir: Unable to remove %s. Not fatal.', deleteParams.Key, error);
+                events.emit('progress', `Unable to remove ${deleteParams.Key} ${error.message}`);
                 return iteratorCallback(error);
             }

             // debug('removeDir: Deleted: %j Errors: %j', deleteData.Deleted, deleteData.Errors);
             iteratorCallback();
         });
-    }, function (error) {
+    }
+
+    const batchSize = apiConfig.provider !== 'digitalocean-spaces' ? 1000 : 100; // throttle requests per second
+    listDir(apiConfig, pathPrefix, batchSize, deleteFiles, function (error) {
         events.emit('progress', `Removed ${total} files`);
         events.emit('done', error);
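
The batch sizes here line up with provider limits: S3's DeleteObjects call accepts at most 1000 keys per request, and the smaller 100-key batches keep DigitalOcean Spaces under its request rate limits. The payload built from one listed batch presumably looks like the following; the Objects mapping is cut off in the hunk, so this shape is an assumption:

// Assumed shape of deleteParams for one listed batch.
function buildDeleteParams(bucket, contents) {
    return {
        Bucket: bucket,
        Delete: {
            Objects: contents.map(function (c) { return { Key: c.Key }; })
        }
    };
}

console.log(buildDeleteParams('my-bucket', [{ Key: 'backup/a' }, { Key: 'backup/b' }]));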