Process delete commands before add commands

This is required for cases where a dir becomes a file (or vice-versa)
This commit is contained in:
Girish Ramakrishnan
2017-09-26 15:28:04 -07:00
parent a9e89b57d9
commit 859311f9e5
2 changed files with 26 additions and 15 deletions
+24 -13
View File
@@ -1,6 +1,7 @@
'use strict';
var assert = require('assert'),
async = require('async'),
debug = require('debug')('box:syncer'),
path = require('path'),
paths = require('./paths.js'),
@@ -28,14 +29,14 @@ function readTree(dir) {
return list.map(function (e) { return { stat: safe.fs.lstatSync(path.join(dir, e)), name: e }; });
}
// TODO: concurrency
// TODO: if dir became a file, remove the dir first
function sync(dir, taskProcessor, callback) {
function sync(dir, taskProcessor, concurrency, callback) {
assert.strictEqual(typeof dir, 'string');
assert.strictEqual(typeof taskProcessor, 'function');
assert.strictEqual(typeof concurrency, 'number');
assert.strictEqual(typeof callback, 'function');
var curCacheIndex = 0;
var curCacheIndex = 0, addQueue = [ ], delQueue = [ ];
var cacheFile = path.join(paths.SNAPSHOT_DIR, path.basename(dir) + '.cache'),
newCacheFile = path.join(paths.SNAPSHOT_DIR, path.basename(dir) + '.cache.new');
@@ -44,11 +45,9 @@ function sync(dir, taskProcessor, callback) {
var newCacheFd = safe.fs.openSync(newCacheFile, 'w'); // truncates any existing file
if (newCacheFd === -1) return callback(new Error('Error opening new cache file: ' + safe.error.message));
var dummyCallback = function() { };
function advanceCache(entryPath) {
for (; curCacheIndex !== cache.length && (entryPath === '' || cache[curCacheIndex].path < entryPath); ++curCacheIndex) {
taskProcessor({ operation: 'remove', path: cache[curCacheIndex].path }, dummyCallback);
delQueue.push({ operation: 'remove', path: cache[curCacheIndex].path });
}
}
@@ -74,11 +73,11 @@ function sync(dir, taskProcessor, callback) {
if (curCacheIndex !== cache.length && cache[curCacheIndex].path === entryPath) {
if (stat.mtime.getTime() !== cache[curCacheIndex].mtime) {
taskProcessor({ operation: 'add', path: entryPath }, dummyCallback);
addQueue.push({ operation: 'add', path: entryPath });
}
++curCacheIndex;
} else {
taskProcessor({ operation: 'add', path: entryPath }, dummyCallback);
addQueue.push({ operation: 'add', path: entryPath });
}
}
}
@@ -86,11 +85,23 @@ function sync(dir, taskProcessor, callback) {
traverse('');
advanceCache(''); // remove rest of the cache entries
// move the new cache file
safe.fs.closeSync(newCacheFd);
safe.fs.unlinkSync(cacheFile);
if (!safe.fs.renameSync(newCacheFile, cacheFile)) debug('Unable to save new cache file');
debug('Processing %s deletes and %s additions', delQueue.length, addQueue.length);
callback();
async.eachLimit(delQueue, concurrency, taskProcessor, function (error) {
debug('Done processing deletes', error);
async.eachLimit(addQueue, concurrency, taskProcessor, function (error) {
debug('Done processing adds', error);
if (error) return callback(error);
safe.fs.unlinkSync(cacheFile);
if (!safe.fs.renameSync(newCacheFile, cacheFile)) debug('Unable to save new cache file');
callback();
});
});
}