Revert "merge the done callback into the main code"
This reverts commit c39bec8cc1.
This was committed with extra stuff by mistake
This commit is contained in:
@@ -231,32 +231,32 @@ function sync(backupConfig, backupId, dataDir, callback) {
|
||||
assert.strictEqual(typeof dataDir, 'string');
|
||||
assert.strictEqual(typeof callback, 'function');
|
||||
|
||||
function setBackupProgress(message) {
|
||||
debug(message);
|
||||
safe.fs.writeFileSync(paths.BACKUP_RESULT_FILE, message);
|
||||
}
|
||||
|
||||
syncer.sync(dataDir, function processTask(task, iteratorCallback) {
|
||||
debug('sync: processing task: %j', task);
|
||||
var backupFilePath = path.join(getBackupFilePath(backupConfig, backupId, backupConfig.format), task.path);
|
||||
|
||||
if (task.operation === 'removedir') {
|
||||
safe.fs.writeFileSync(paths.BACKUP_RESULT_FILE, `Removing directory ${task.path}`);
|
||||
return api(backupConfig.provider).removeDir(backupConfig, backupFilePath)
|
||||
.on('progress', function (detail) {
|
||||
debug(`sync: ${detail}`);
|
||||
safe.fs.writeFileSync(paths.BACKUP_RESULT_FILE, detail);
|
||||
})
|
||||
.on('done', iteratorCallback);
|
||||
} else if (task.operation === 'remove') {
|
||||
safe.fs.writeFileSync(paths.BACKUP_RESULT_FILE, `Removing ${task.path}`);
|
||||
return api(backupConfig.provider).remove(backupConfig, backupFilePath, iteratorCallback);
|
||||
}
|
||||
|
||||
var retryCount = 0;
|
||||
async.retry({ times: 5, interval: 20000 }, function (retryCallback) {
|
||||
++retryCount;
|
||||
debug(`${task.operation} ${task.path} try ${retryCount}`);
|
||||
if (task.operation === 'add') {
|
||||
setBackupProgress(`Adding ${task.path}`);
|
||||
safe.fs.writeFileSync(paths.BACKUP_RESULT_FILE, `Adding ${task.path}`);
|
||||
var stream = fs.createReadStream(path.join(dataDir, task.path));
|
||||
stream.on('error', function () { return retryCallback(); }); // ignore error if file disappears
|
||||
api(backupConfig.provider).upload(backupConfig, backupFilePath, stream, retryCallback);
|
||||
} else if (task.operation === 'removedir') {
|
||||
setBackupProgress(`Removing directory ${task.path}`);
|
||||
return api(backupConfig.provider).removeDir(backupConfig, backupFilePath)
|
||||
.on('progress', setBackupProgress)
|
||||
.on('done', retryCallback);
|
||||
} else if (task.operation === 'remove') {
|
||||
setBackupProgress(`Removing ${task.path}`);
|
||||
return api(backupConfig.provider).remove(backupConfig, backupFilePath, retryCallback);
|
||||
}
|
||||
}, iteratorCallback);
|
||||
}, 10 /* concurrency */, function (error) {
|
||||
|
||||
@@ -68,7 +68,7 @@ function remove(apiConfig, filename, callback) {
|
||||
assert.strictEqual(typeof filename, 'string');
|
||||
assert.strictEqual(typeof callback, 'function');
|
||||
|
||||
// Result: none. Should not error if file is not found
|
||||
// Result: none
|
||||
|
||||
callback(new Error('not implemented'));
|
||||
}
|
||||
@@ -77,7 +77,7 @@ function removeDir(apiConfig, pathPrefix) {
|
||||
assert.strictEqual(typeof apiConfig, 'object');
|
||||
assert.strictEqual(typeof pathPrefix, 'string');
|
||||
|
||||
// Result: none. Should not error if dir is not found
|
||||
// Result: none
|
||||
var events = new EventEmitter();
|
||||
process.nextTick(function () { events.emit('done', new Error('not implemented')); });
|
||||
return events;
|
||||
|
||||
@@ -122,6 +122,15 @@ function upload(apiConfig, backupFilePath, sourceStream, callback) {
|
||||
assert.strictEqual(typeof sourceStream, 'object');
|
||||
assert.strictEqual(typeof callback, 'function');
|
||||
|
||||
function done(error) {
|
||||
if (error) {
|
||||
debug('[%s] upload: s3 upload error.', backupFilePath, error);
|
||||
return callback(new BackupsError(BackupsError.EXTERNAL_ERROR, `Error uploading ${backupFilePath}. Message: ${error.message} HTTP Code: ${error.code}`));
|
||||
}
|
||||
|
||||
callback(null);
|
||||
}
|
||||
|
||||
getS3Config(apiConfig, function (error, credentials) {
|
||||
if (error) return callback(error);
|
||||
|
||||
@@ -135,14 +144,7 @@ function upload(apiConfig, backupFilePath, sourceStream, callback) {
|
||||
|
||||
// s3.upload automatically does a multi-part upload. we set queueSize to 1 to reduce memory usage
|
||||
// uploader will buffer at most queueSize * partSize bytes into memory at any given time.
|
||||
s3.upload(params, { partSize: 10 * 1024 * 1024, queueSize: 1 }, function (error) {
|
||||
if (error) {
|
||||
debug('[%s] upload: s3 upload error.', backupFilePath, error);
|
||||
return callback(new BackupsError(BackupsError.EXTERNAL_ERROR, `Error uploading ${backupFilePath}. Message: ${error.message} HTTP Code: ${error.code}`));
|
||||
}
|
||||
|
||||
callback(null);
|
||||
});
|
||||
return s3.upload(params, { partSize: 10 * 1024 * 1024, queueSize: 1 }, done);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -412,11 +414,10 @@ function remove(apiConfig, filename, callback) {
|
||||
}
|
||||
};
|
||||
|
||||
// deleteObjects does not return error if key is not found
|
||||
s3.deleteObjects(deleteParams, function (error) {
|
||||
if (error) debug(`remove: Unable to remove ${filename}. ${error.message}`);
|
||||
if (error) debug('remove: Unable to remove %s. Not fatal.', deleteParams.Key, error);
|
||||
|
||||
callback(error);
|
||||
callback(null);
|
||||
});
|
||||
});
|
||||
}
|
||||
@@ -438,10 +439,9 @@ function removeDir(apiConfig, pathPrefix) {
|
||||
|
||||
events.emit('progress', `Removing ${contents.length} files from ${contents[0].Key} to ${contents[contents.length-1].Key}`);
|
||||
|
||||
// deleteObjects does not return error if key is not found
|
||||
s3.deleteObjects(deleteParams, function (error /*, deleteData */) {
|
||||
if (error) {
|
||||
events.emit('progress', `Unable to remove ${contents.length} files from ${contents[0].Key} to ${contents[contents.length-1].Key}: ${error.message}`);
|
||||
events.emit('progress', `Unable to remove ${deleteParams.Key} ${error.message}`);
|
||||
return iteratorCallback(error);
|
||||
}
|
||||
|
||||
@@ -452,9 +452,8 @@ function removeDir(apiConfig, pathPrefix) {
|
||||
listDir(apiConfig, pathPrefix, function (s3, objects, done) {
|
||||
total += objects.length;
|
||||
|
||||
// digitalocean spaces takes too long to delete 1000 objects at a time
|
||||
const chunkSize = apiConfig.provider !== 'digitalocean-spaces' ? 1000 : 100;
|
||||
var chunks = chunk(objects, chunkSize);
|
||||
const batchSize = apiConfig.provider !== 'digitalocean-spaces' ? 1000 : 100; // throttle objects in each request
|
||||
var chunks = batchSize === 1 ? objects : chunk(objects, batchSize);
|
||||
|
||||
async.eachSeries(chunks, deleteFiles.bind(null, s3), done);
|
||||
}, function (error) {
|
||||
|
||||
Reference in New Issue
Block a user