From 859311f9e5e5c95787d04539b2e07d6e1a67a32a Mon Sep 17 00:00:00 2001 From: Girish Ramakrishnan Date: Tue, 26 Sep 2017 15:28:04 -0700 Subject: [PATCH] Process delete commands before add commands This is required for cases where a dir becomes a file (or vice-versa) --- src/backups.js | 4 ++-- src/syncer.js | 37 ++++++++++++++++++++++++------------- 2 files changed, 26 insertions(+), 15 deletions(-) diff --git a/src/backups.js b/src/backups.js index aa7e9cf46..0265c361b 100644 --- a/src/backups.js +++ b/src/backups.js @@ -217,7 +217,7 @@ function createTarPackStream(sourceDir, key) { function sync(backupConfig, backupId, dataDir, callback) { syncer.sync(dataDir, function processTask(task, iteratorCallback) { - debug('syncer task: %j', task); + debug('processing task: %j', task); if (task.operation === 'add') { var stream = fs.createReadStream(path.join(dataDir, task.path)); stream.on('error', function () { return iteratorCallback(); }); // ignore error if file disappears @@ -225,7 +225,7 @@ function sync(backupConfig, backupId, dataDir, callback) { } else if (task.operation === 'remove') { api(backupConfig.provider).remove(backupConfig, getBackupFilePath(backupConfig, backupId, task.path), iteratorCallback); } - }, function (error) { + }, 10 /* concurrency */, function (error) { if (error) return callback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message)); callback(); diff --git a/src/syncer.js b/src/syncer.js index 9ab306db5..63d199b3e 100644 --- a/src/syncer.js +++ b/src/syncer.js @@ -1,6 +1,7 @@ 'use strict'; var assert = require('assert'), + async = require('async'), debug = require('debug')('box:syncer'), path = require('path'), paths = require('./paths.js'), @@ -28,14 +29,14 @@ function readTree(dir) { return list.map(function (e) { return { stat: safe.fs.lstatSync(path.join(dir, e)), name: e }; }); } -// TODO: concurrency -// TODO: if dir became a file, remove the dir first -function sync(dir, taskProcessor, callback) { +function sync(dir, taskProcessor, concurrency, callback) { assert.strictEqual(typeof dir, 'string'); assert.strictEqual(typeof taskProcessor, 'function'); + assert.strictEqual(typeof concurrency, 'number'); assert.strictEqual(typeof callback, 'function'); - var curCacheIndex = 0; + var curCacheIndex = 0, addQueue = [ ], delQueue = [ ]; + var cacheFile = path.join(paths.SNAPSHOT_DIR, path.basename(dir) + '.cache'), newCacheFile = path.join(paths.SNAPSHOT_DIR, path.basename(dir) + '.cache.new'); @@ -44,11 +45,9 @@ function sync(dir, taskProcessor, callback) { var newCacheFd = safe.fs.openSync(newCacheFile, 'w'); // truncates any existing file if (newCacheFd === -1) return callback(new Error('Error opening new cache file: ' + safe.error.message)); - var dummyCallback = function() { }; - function advanceCache(entryPath) { for (; curCacheIndex !== cache.length && (entryPath === '' || cache[curCacheIndex].path < entryPath); ++curCacheIndex) { - taskProcessor({ operation: 'remove', path: cache[curCacheIndex].path }, dummyCallback); + delQueue.push({ operation: 'remove', path: cache[curCacheIndex].path }); } } @@ -74,11 +73,11 @@ function sync(dir, taskProcessor, callback) { if (curCacheIndex !== cache.length && cache[curCacheIndex].path === entryPath) { if (stat.mtime.getTime() !== cache[curCacheIndex].mtime) { - taskProcessor({ operation: 'add', path: entryPath }, dummyCallback); + addQueue.push({ operation: 'add', path: entryPath }); } ++curCacheIndex; } else { - taskProcessor({ operation: 'add', path: entryPath }, dummyCallback); + addQueue.push({ operation: 'add', path: entryPath }); } } } @@ -86,11 +85,23 @@ function sync(dir, taskProcessor, callback) { traverse(''); advanceCache(''); // remove rest of the cache entries - // move the new cache file safe.fs.closeSync(newCacheFd); - safe.fs.unlinkSync(cacheFile); - if (!safe.fs.renameSync(newCacheFile, cacheFile)) debug('Unable to save new cache file'); + debug('Processing %s deletes and %s additions', delQueue.length, addQueue.length); - callback(); + async.eachLimit(delQueue, concurrency, taskProcessor, function (error) { + debug('Done processing deletes', error); + + async.eachLimit(addQueue, concurrency, taskProcessor, function (error) { + debug('Done processing adds', error); + + if (error) return callback(error); + + safe.fs.unlinkSync(cacheFile); + + if (!safe.fs.renameSync(newCacheFile, cacheFile)) debug('Unable to save new cache file'); + + callback(); + }); + }); } \ No newline at end of file