Sprinkle retries in syncer logic

This commit is contained in:
Girish Ramakrishnan
2017-10-10 14:25:03 -07:00
parent fd3b73bea2
commit 56d794745b
+30 -21
View File
@@ -229,18 +229,20 @@ function sync(backupConfig, backupId, dataDir, callback) {
debug('sync: processing task: %j', task);
var backupFilePath = path.join(getBackupFilePath(backupConfig, backupId, backupConfig.format), task.path);
if (task.operation === 'add') {
safe.fs.writeFileSync(paths.BACKUP_RESULT_FILE, 'Adding ' + task.path);
var stream = fs.createReadStream(path.join(dataDir, task.path));
stream.on('error', function () { return iteratorCallback(); }); // ignore error if file disappears
api(backupConfig.provider).upload(backupConfig, backupFilePath, stream, iteratorCallback);
} else if (task.operation === 'remove') {
safe.fs.writeFileSync(paths.BACKUP_RESULT_FILE, 'Removing ' + task.path);
api(backupConfig.provider).remove(backupConfig, backupFilePath, iteratorCallback);
} else if (task.operation === 'removedir') {
safe.fs.writeFileSync(paths.BACKUP_RESULT_FILE, 'Removing directory ' + task.path);
api(backupConfig.provider).removeDir(backupConfig, backupFilePath, iteratorCallback);
}
async.retry({ times: 3, interval: 2000 }, function (retryCallback) {
if (task.operation === 'add') {
safe.fs.writeFileSync(paths.BACKUP_RESULT_FILE, 'Adding ' + task.path);
var stream = fs.createReadStream(path.join(dataDir, task.path));
stream.on('error', function () { return retryCallback(); }); // ignore error if file disappears
api(backupConfig.provider).upload(backupConfig, backupFilePath, stream, retryCallback);
} else if (task.operation === 'remove') {
safe.fs.writeFileSync(paths.BACKUP_RESULT_FILE, 'Removing ' + task.path);
api(backupConfig.provider).remove(backupConfig, backupFilePath, retryCallback);
} else if (task.operation === 'removedir') {
safe.fs.writeFileSync(paths.BACKUP_RESULT_FILE, 'Removing directory ' + task.path);
api(backupConfig.provider).removeDir(backupConfig, backupFilePath, retryCallback);
}
}, iteratorCallback);
}, 10 /* concurrency */, function (error) {
if (error) return callback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message));
@@ -275,9 +277,12 @@ function upload(backupId, format, dataDir, callback) {
if (error) return callback(new BackupsError(BackupsError.INTERNAL_ERROR, error));
if (format === 'tgz') {
var tarStream = createTarPackStream(dataDir, backupConfig.key || null);
tarStream.on('error', callback); // already returns BackupsError
api(backupConfig.provider).upload(backupConfig, getBackupFilePath(backupConfig, backupId, format), tarStream, callback);
async.retry({ times: 3, interval: 3000 }, function (retryCallback) {
var tarStream = createTarPackStream(dataDir, backupConfig.key || null);
tarStream.on('error', retryCallback); // already returns BackupsError
api(backupConfig.provider).upload(backupConfig, getBackupFilePath(backupConfig, backupId, format), tarStream, retryCallback);
}, callback);
} else {
async.series([
saveEmptyDirs.bind(null, dataDir),
@@ -491,10 +496,12 @@ function rotateBoxBackup(backupConfig, timestamp, appBackupIds, callback) {
progress.setDetail(progress.BACKUP, 'Rotating box snapshot');
var copy = api(backupConfig.provider).copy(backupConfig, getBackupFilePath(backupConfig, 'snapshot/box', format), getBackupFilePath(backupConfig, backupId, format));
copy.on('progress', function (detail) { progress.setDetail(progress.BACKUP, detail); });
async.retry({ times: 3, interval: 2000 }, function (retryCallback) {
var copy = api(backupConfig.provider).copy(backupConfig, getBackupFilePath(backupConfig, 'snapshot/box', format), getBackupFilePath(backupConfig, backupId, format));
copy.on('progress', function (detail) { progress.setDetail(progress.BACKUP, detail); });
copy.on('done', function (copyBackupError) {
copy.on('done', retryCallback);
}, function (copyBackupError) {
const state = copyBackupError ? backupdb.BACKUP_STATE_ERROR : backupdb.BACKUP_STATE_NORMAL;
backupdb.update(backupId, { state: state }, function (error) {
@@ -593,10 +600,12 @@ function rotateAppBackup(backupConfig, app, timestamp, callback) {
progress.setDetail(progress.BACKUP, 'Rotating app snapshot');
var copy = api(backupConfig.provider).copy(backupConfig, getBackupFilePath(backupConfig, `snapshot/app_${app.id}`, format), getBackupFilePath(backupConfig, backupId, format));
copy.on('progress', function (detail) { progress.setDetail(progress.BACKUP, detail); });
async.retry({ times: 3, interval: 2000 }, function (retryCallback) {
var copy = api(backupConfig.provider).copy(backupConfig, getBackupFilePath(backupConfig, `snapshot/app_${app.id}`, format), getBackupFilePath(backupConfig, backupId, format));
copy.on('progress', function (detail) { progress.setDetail(progress.BACKUP, detail); });
copy.on('done', function (copyBackupError) {
copy.on('done', retryCallback);
}, function (copyBackupError) {
const state = copyBackupError ? backupdb.BACKUP_STATE_ERROR : backupdb.BACKUP_STATE_NORMAL;
debugApp(app, 'rotateAppBackup: successful id:%s', backupId);