diff --git a/src/backupformat/rsync.js b/src/backupformat/rsync.js index a7de143a2..24ff4f166 100644 --- a/src/backupformat/rsync.js +++ b/src/backupformat/rsync.js @@ -77,6 +77,29 @@ async function addFile(sourceFile, encryption, uploader, progressCallback) { await uploader.finish(); } +async function processChange(change, backupConfig, remotePath, dataLayout, progressCallback) { + debug('sync: processing task: %j', change); + // the empty task.path is special to signify the directory + const destPath = change.path && backupConfig.encryptedFilenames ? hush.encryptFilePath(change.path, backupConfig.encryption) : change.path; + const backupFilePath = path.join(getBackupFilePath(backupConfig, remotePath), destPath); + + if (change.operation === 'removedir') { + debug(`Removing directory ${backupFilePath}`); + await storage.api(backupConfig.provider).removeDir(backupConfig, backupFilePath, progressCallback); + } else if (change.operation === 'remove') { + debug(`Removing ${backupFilePath}`); + await storage.api(backupConfig.provider).remove(backupConfig, backupFilePath); + } else if (change.operation === 'add') { + await promiseRetry({ times: 5, interval: 20000, debug }, async (retryCount) => { + progressCallback({ message: `Adding ${change.path}` + (retryCount > 1 ? ` (Try ${retryCount})` : '') }); + debug(`Adding ${change.path} position ${change.position} try ${retryCount}`); + + const uploader = await storage.api(backupConfig.provider).upload(backupConfig, backupFilePath); + await addFile(dataLayout.toLocalPath('./' + change.path), backupConfig.encryption, uploader, progressCallback); + }); + } +} + async function sync(backupConfig, remotePath, dataLayout, progressCallback) { assert.strictEqual(typeof backupConfig, 'object'); assert.strictEqual(typeof remotePath, 'string'); @@ -86,28 +109,17 @@ async function sync(backupConfig, remotePath, dataLayout, progressCallback) { // the number here has to take into account the s3.upload partSize (which is 10MB). So 20=200MB const concurrency = backupConfig.limits?.syncConcurrency || (backupConfig.provider === 's3' ? 20 : 10); - await syncer.sync(dataLayout, async function processTask(task) { - debug('sync: processing task: %j', task); - // the empty task.path is special to signify the directory - const destPath = task.path && backupConfig.encryptedFilenames ? hush.encryptFilePath(task.path, backupConfig.encryption) : task.path; - const backupFilePath = path.join(getBackupFilePath(backupConfig, remotePath), destPath); + const changes = await syncer.sync(dataLayout); + debug(`sync: processing ${changes.delQueue.length} deletes and ${changes.addQueue.length} additions`); - if (task.operation === 'removedir') { - debug(`Removing directory ${backupFilePath}`); - await storage.api(backupConfig.provider).removeDir(backupConfig, backupFilePath, progressCallback); - } else if (task.operation === 'remove') { - debug(`Removing ${backupFilePath}`); - await storage.api(backupConfig.provider).remove(backupConfig, backupFilePath); - } else if (task.operation === 'add') { - await promiseRetry({ times: 5, interval: 20000, debug }, async (retryCount) => { - progressCallback({ message: `Adding ${task.path}` + (retryCount > 1 ? ` (Try ${retryCount})` : '') }); - debug(`Adding ${task.path} position ${task.position} try ${retryCount}`); + const [delError] = await safe(async.eachLimit(changes.delQueue, concurrency, async (task) => await processChange(task, backupConfig, remotePath, dataLayout, progressCallback))); + debug('sync: done processing deletes. error: %o', delError); - const uploader = await storage.api(backupConfig.provider).upload(backupConfig, backupFilePath); - await addFile(dataLayout.toLocalPath('./' + task.path), backupConfig.encryption, uploader, progressCallback); - }); - } - }, concurrency); + const [addError] = await safe(async.eachLimit(changes.addQueue, concurrency, async (task) => await processChange(task, backupConfig, remotePath, dataLayout, progressCallback))); + debug('sync: done processing adds. error: %o', addError); + if (addError) throw addError; + + await syncer.finalize(changes); } // this is not part of 'snapshotting' because we need root access to traverse diff --git a/src/syncer.js b/src/syncer.js index 40cac5ab9..374874092 100644 --- a/src/syncer.js +++ b/src/syncer.js @@ -1,18 +1,17 @@ 'use strict'; const assert = require('assert'), - async = require('async'), BoxError = require('./boxerror.js'), DataLayout = require('./datalayout.js'), debug = require('debug')('box:syncer'), fs = require('fs'), path = require('path'), paths = require('./paths.js'), - safe = require('safetydance'), - util = require('util'); + safe = require('safetydance'); exports = module.exports = { - sync: util.promisify(sync) + sync, + finalize }; function readCache(cacheFile) { @@ -64,19 +63,16 @@ function ISFILE(x) { return (x & fs.constants.S_IFREG) === fs.constants.S_IFREG; } -function sync(dataLayout, taskProcessor, concurrency, callback) { +async function sync(dataLayout) { assert(dataLayout instanceof DataLayout, 'Expecting dataLayout to be a DataLayout'); - assert.strictEqual(typeof taskProcessor, 'function'); - assert.strictEqual(typeof concurrency, 'number'); - assert.strictEqual(typeof callback, 'function'); - const addQueue = [], delQueue = []; + const addQueue = [], delQueue = []; // separate queues. we have to process the del first and then the add let curCacheIndex = 0; const cacheFile = path.join(paths.BACKUP_INFO_DIR, dataLayout.getBasename() + '.sync.cache'), newCacheFile = path.join(paths.BACKUP_INFO_DIR, dataLayout.getBasename() + '.sync.cache.new'); - let cache = [ ]; + let cache = []; // if cache is missing or if we crashed/errored in previous run, start out empty. TODO: do a remote listDir and rebuild if (!safe.fs.existsSync(cacheFile)) { @@ -88,7 +84,7 @@ function sync(dataLayout, taskProcessor, concurrency, callback) { } const newCacheFd = safe.fs.openSync(newCacheFile, 'w'); // truncates any existing file - if (newCacheFd === -1) return callback(new BoxError(BoxError.FS_ERROR, 'Error opening new cache file: ' + safe.error.message)); + if (newCacheFd === -1) throw new BoxError(BoxError.FS_ERROR, 'Error opening new cache file: ' + safe.error.message); function advanceCache(entryPath) { let lastRemovedDir = null; @@ -157,21 +153,18 @@ function sync(dataLayout, taskProcessor, concurrency, callback) { safe.fs.closeSync(newCacheFd); - debug(`sync: processing ${delQueue.length} deletes and ${addQueue.length} additions`); - - async.eachLimit(delQueue, concurrency, taskProcessor, function (error) { - debug('sync: done processing deletes. error: %o', error); - - async.eachLimit(addQueue, concurrency, taskProcessor, function (error) { - debug('sync: done processing adds. error: %o', error); - - if (error) return callback(error); - - safe.fs.unlinkSync(cacheFile); - - if (!safe.fs.renameSync(newCacheFile, cacheFile)) debug('Unable to save new cache file'); - - callback(); - }); - }); + return { + delQueue, + addQueue, + cacheFile, + newCacheFile + }; +} + +async function finalize(changes) { + assert.strictEqual(typeof changes, 'object'); + + safe.fs.unlinkSync(changes.cacheFile); + + if (!safe.fs.renameSync(changes.newCacheFile, changes.cacheFile)) debug('Unable to save new cache file'); } diff --git a/src/test/syncer-test.js b/src/test/syncer-test.js index b43bb13c3..965775a1a 100644 --- a/src/test/syncer-test.js +++ b/src/test/syncer-test.js @@ -15,285 +15,202 @@ const createTree = require('./common.js').createTree, safe = require('safetydance'), syncer = require('../syncer.js'); -let gTasks = [ ]; - const gTmpDir = fs.mkdtempSync(path.join(os.tmpdir(), 'syncer-test')), gCacheFile = path.join(paths.BACKUP_INFO_DIR, path.basename(gTmpDir) + '.sync.cache'); -function collectTasks(task, callback) { - gTasks.push(task); - callback(); -} - describe('Syncer', function () { before(function () { console.log('Tests are run in %s with cache file %s', gTmpDir, gCacheFile); }); - it('missing cache - removes remote dir', function (done) { - gTasks = [ ]; + async function getTasks(dataLayout) { + const changes = await syncer.sync(dataLayout); + syncer.finalize(changes); + return changes.delQueue.concat(changes.addQueue); + } + + it('missing cache - removes remote dir', async function () { safe.fs.unlinkSync(gCacheFile); createTree(gTmpDir, { }); - let dataLayout = new DataLayout(gTmpDir, []); - - syncer.sync(dataLayout, collectTasks, 10, function (error) { - expect(error).to.not.be.ok(); - - expect(gTasks).to.eql([ - { operation: 'removedir', path: '', reason: 'nocache' } - ]); - done(); - }); + const dataLayout = new DataLayout(gTmpDir, []); + const tasks = await getTasks(dataLayout); + expect(tasks).to.eql([{ operation: 'removedir', path: '', reason: 'nocache' }]); }); - it('empty cache - adds all', function (done) { - gTasks = [ ]; + it('empty cache - adds all', async function () { fs.writeFileSync(gCacheFile, '', 'utf8'); createTree(gTmpDir, { 'src': { 'index.js': 'some code' }, 'test': { 'test.js': 'This is a README' }, 'walrus': 'animal' }); - let dataLayout = new DataLayout(gTmpDir, []); + const dataLayout = new DataLayout(gTmpDir, []); + const tasks = await getTasks(dataLayout); - syncer.sync(dataLayout, collectTasks, 10, function (error) { - expect(error).to.not.be.ok(); - - expect(gTasks).to.eql([ - { operation: 'add', path: 'src/index.js', reason: 'new', position: 0 }, - { operation: 'add', path: 'test/test.js', reason: 'new', position: 1 }, - { operation: 'add', path: 'walrus', reason: 'new', position: 2 } - ]); - done(); - }); + expect(tasks).to.eql([ + { operation: 'add', path: 'src/index.js', reason: 'new', position: 0 }, + { operation: 'add', path: 'test/test.js', reason: 'new', position: 1 }, + { operation: 'add', path: 'walrus', reason: 'new', position: 2 } + ]); }); - it('empty cache - deep', function (done) { - gTasks = [ ]; + it('empty cache - deep', async function () { fs.writeFileSync(gCacheFile, '', 'utf8'); createTree(gTmpDir, { a: { b: { c: { d: { e: 'some code' } } } } }); - let dataLayout = new DataLayout(gTmpDir, []); + const dataLayout = new DataLayout(gTmpDir, []); + const tasks = await getTasks(dataLayout); - syncer.sync(dataLayout, collectTasks, 10, function (error) { - expect(error).to.not.be.ok(); - - expect(gTasks).to.eql([ - { operation: 'add', path: 'a/b/c/d/e', reason: 'new', position: 0 } - ]); - done(); - }); + expect(tasks).to.eql([ + { operation: 'add', path: 'a/b/c/d/e', reason: 'new', position: 0 } + ]); }); - it('ignores special files', function (done) { - gTasks = [ ]; + it('ignores special files', async function () { fs.writeFileSync(gCacheFile, '', 'utf8'); createTree(gTmpDir, { 'link:file': '/tmp', 'readme': 'this is readme' }); - let dataLayout = new DataLayout(gTmpDir, []); + const dataLayout = new DataLayout(gTmpDir, []); + const tasks = await getTasks(dataLayout); - syncer.sync(dataLayout, collectTasks, 10, function (error) { - expect(error).to.not.be.ok(); - - expect(gTasks).to.eql([ - { operation: 'add', path: 'readme', reason: 'new', position: 0 } - ]); - done(); - }); + expect(tasks).to.eql([ + { operation: 'add', path: 'readme', reason: 'new', position: 0 } + ]); }); - it('adds changed files', function (done) { - gTasks = [ ]; + it('adds changed files', async function () { fs.writeFileSync(gCacheFile, '', 'utf8'); createTree(gTmpDir, { 'src': { 'index.js': 'some code' }, 'test': { 'test.js': 'This is a README' }, 'walrus': 'animal' }); - let dataLayout = new DataLayout(gTmpDir, []); + const dataLayout = new DataLayout(gTmpDir, []); + let tasks = await getTasks(dataLayout); - syncer.sync(dataLayout, collectTasks, 10, function (error) { - expect(error).to.not.be.ok(); - expect(gTasks.length).to.be(3); + expect(tasks.length).to.be(3); - execSync('touch src/index.js test/test.js', { cwd: gTmpDir }); + execSync('touch src/index.js test/test.js', { cwd: gTmpDir }); - gTasks = [ ]; - let dataLayout = new DataLayout(gTmpDir, []); + tasks = await getTasks(dataLayout); - syncer.sync(dataLayout, collectTasks, 10, function (error) { - expect(error).to.not.be.ok(); - - expect(gTasks).to.eql([ - { operation: 'add', path: 'src/index.js', reason: 'changed', position: 0 }, - { operation: 'add', path: 'test/test.js', reason: 'changed', position: 1 } - ]); - - done(); - }); - }); + expect(tasks).to.eql([ + { operation: 'add', path: 'src/index.js', reason: 'changed', position: 0 }, + { operation: 'add', path: 'test/test.js', reason: 'changed', position: 1 } + ]); }); - it('removes missing files', function (done) { - gTasks = [ ]; + it('removes missing files', async function () { fs.writeFileSync(gCacheFile, '', 'utf8'); createTree(gTmpDir, { 'src': { 'index.js': 'some code' }, 'test': { 'test.js': 'This is a README' }, 'walrus': 'animal' }); - let dataLayout = new DataLayout(gTmpDir, []); + const dataLayout = new DataLayout(gTmpDir, []); + let tasks = await getTasks(dataLayout); - syncer.sync(dataLayout, collectTasks, 10, function (error) { - expect(error).to.not.be.ok(); - expect(gTasks.length).to.be(3); + expect(tasks.length).to.be(3); - execSync('rm src/index.js walrus', { cwd: gTmpDir }); + execSync('rm src/index.js walrus', { cwd: gTmpDir }); - gTasks = [ ]; - syncer.sync(dataLayout, collectTasks, 10, function (error) { - expect(error).to.not.be.ok(); + tasks = await getTasks(dataLayout); - expect(gTasks).to.eql([ - { operation: 'remove', path: 'src/index.js', reason: 'missing' }, - { operation: 'remove', path: 'walrus', reason: 'missing' } - ]); - - done(); - }); - }); + expect(tasks).to.eql([ + { operation: 'remove', path: 'src/index.js', reason: 'missing' }, + { operation: 'remove', path: 'walrus', reason: 'missing' } + ]); }); - it('removes missing dirs', function (done) { - gTasks = [ ]; + it('removes missing dirs', async function () { fs.writeFileSync(gCacheFile, '', 'utf8'); createTree(gTmpDir, { 'src': { 'index.js': 'some code' }, 'test': { 'test.js': 'This is a README' }, 'walrus': 'animal' }); - let dataLayout = new DataLayout(gTmpDir, []); + const dataLayout = new DataLayout(gTmpDir, []); + let tasks = await getTasks(dataLayout); - syncer.sync(dataLayout, collectTasks, 10, function (error) { - expect(error).to.not.be.ok(); - expect(gTasks.length).to.be(3); + expect(tasks.length).to.be(3); - execSync('rm -rf src test', { cwd: gTmpDir }); + execSync('rm -rf src test', { cwd: gTmpDir }); - gTasks = [ ]; - syncer.sync(dataLayout, collectTasks, 10, function (error) { - expect(error).to.not.be.ok(); + tasks = await getTasks(dataLayout); - expect(gTasks).to.eql([ - { operation: 'removedir', path: 'src', reason: 'missing' }, - { operation: 'removedir', path: 'test', reason: 'missing' } - ]); - - done(); - }); - }); + expect(tasks).to.eql([ + { operation: 'removedir', path: 'src', reason: 'missing' }, + { operation: 'removedir', path: 'test', reason: 'missing' } + ]); }); - it('all files disappeared', function (done) { - gTasks = [ ]; + it('all files disappeared', async function () { fs.writeFileSync(gCacheFile, '', 'utf8'); createTree(gTmpDir, { 'src': { 'index.js': 'some code' }, 'test': { 'test.js': 'This is a README' }, 'walrus': 'animal' }); - let dataLayout = new DataLayout(gTmpDir, []); + const dataLayout = new DataLayout(gTmpDir, []); + let tasks = await getTasks(dataLayout); - syncer.sync(dataLayout, collectTasks, 10, function (error) { - expect(error).to.not.be.ok(); - expect(gTasks.length).to.be(3); + expect(tasks.length).to.be(3); - execSync('find . -delete', { cwd: gTmpDir }); + execSync('find . -delete', { cwd: gTmpDir }); - gTasks = [ ]; - syncer.sync(dataLayout, collectTasks, 10, function (error) { - expect(error).to.not.be.ok(); + tasks = await getTasks(dataLayout); - expect(gTasks).to.eql([ - { operation: 'removedir', path: 'src', reason: 'missing' }, - { operation: 'removedir', path: 'test', reason: 'missing' }, - { operation: 'remove', path: 'walrus', reason: 'missing' } - ]); - - done(); - }); - }); + expect(tasks).to.eql([ + { operation: 'removedir', path: 'src', reason: 'missing' }, + { operation: 'removedir', path: 'test', reason: 'missing' }, + { operation: 'remove', path: 'walrus', reason: 'missing' } + ]); }); - it('no redundant deletes', function (done) { - gTasks = [ ]; + it('no redundant deletes', async function () { fs.writeFileSync(gCacheFile, '', 'utf8'); createTree(gTmpDir, { a: { b: { c: { d: { e: 'some code' } } } } }); - let dataLayout = new DataLayout(gTmpDir, []); + const dataLayout = new DataLayout(gTmpDir, []); + let tasks = await getTasks(dataLayout); - syncer.sync(dataLayout, collectTasks, 10, function (error) { - expect(error).to.not.be.ok(); - expect(gTasks.length).to.be(1); + expect(tasks.length).to.be(1); - execSync('rm -r a/b; touch a/f', { cwd: gTmpDir }); + execSync('rm -r a/b; touch a/f', { cwd: gTmpDir }); - gTasks = [ ]; - syncer.sync(dataLayout, collectTasks, 10, function (error) { - expect(error).to.not.be.ok(); + tasks = await getTasks(dataLayout); - expect(gTasks).to.eql([ - { operation: 'removedir', path: 'a/b', reason: 'missing' }, - { operation: 'add', path: 'a/f', reason: 'new', position: 0 } - ]); - - done(); - }); - }); + expect(tasks).to.eql([ + { operation: 'removedir', path: 'a/b', reason: 'missing' }, + { operation: 'add', path: 'a/f', reason: 'new', position: 0 } + ]); }); - it('file became dir', function (done) { - gTasks = [ ]; + it('file became dir', async function () { fs.writeFileSync(gCacheFile, '', 'utf8'); createTree(gTmpDir, { 'data': { 'src': { 'index.js': 'some code' }, 'test': { 'test.js': 'This is a README' }, 'walrus': 'animal' } }); - let dataLayout = new DataLayout(gTmpDir, []); + const dataLayout = new DataLayout(gTmpDir, []); + let tasks = await getTasks(dataLayout); - syncer.sync(dataLayout, collectTasks, 10, function (error) { - expect(error).to.not.be.ok(); - expect(gTasks.length).to.be(3); + expect(tasks.length).to.be(3); - execSync('rm data/test/test.js; mkdir data/test/test.js; touch data/test/test.js/trick', { cwd: gTmpDir }); + execSync('rm data/test/test.js; mkdir data/test/test.js; touch data/test/test.js/trick', { cwd: gTmpDir }); - gTasks = [ ]; - syncer.sync(dataLayout, collectTasks, 10, function (error) { - expect(error).to.not.be.ok(); + tasks = await getTasks(dataLayout); - expect(gTasks).to.eql([ - { operation: 'remove', path: 'data/test/test.js', reason: 'wasfile' }, - { operation: 'add', path: 'data/test/test.js/trick', reason: 'new', position: 0 } - ]); - - done(); - }); - }); + expect(tasks).to.eql([ + { operation: 'remove', path: 'data/test/test.js', reason: 'wasfile' }, + { operation: 'add', path: 'data/test/test.js/trick', reason: 'new', position: 0 } + ]); }); - it('dir became file', function (done) { - gTasks = [ ]; + it('dir became file', async function () { fs.writeFileSync(gCacheFile, '', 'utf8'); createTree(gTmpDir, { 'src': { 'index.js': 'some code' }, 'test': { 'test.js': 'this', 'test2.js': 'test' }, 'walrus': 'animal' }); - let dataLayout = new DataLayout(gTmpDir, []); + const dataLayout = new DataLayout(gTmpDir, []); + let tasks = await getTasks(dataLayout); - syncer.sync(dataLayout, collectTasks, 10, function (error) { - expect(error).to.not.be.ok(); - expect(gTasks.length).to.be(4); + expect(tasks.length).to.be(4); - execSync('rm -r test; touch test', { cwd: gTmpDir }); + execSync('rm -r test; touch test', { cwd: gTmpDir }); - gTasks = [ ]; - syncer.sync(dataLayout, collectTasks, 10, function (error) { - expect(error).to.not.be.ok(); + tasks = await getTasks(dataLayout); - expect(gTasks).to.eql([ - { operation: 'removedir', path: 'test', reason: 'wasdir' }, - { operation: 'add', path: 'test', reason: 'wasdir', position: 0 } - ]); - - done(); - }); - }); + expect(tasks).to.eql([ + { operation: 'removedir', path: 'test', reason: 'wasdir' }, + { operation: 'add', path: 'test', reason: 'wasdir', position: 0 } + ]); }); - it('is complicated', function (done) { - gTasks = [ ]; + it('is complicated', async function () { createTree(gTmpDir, { a: 'data', a2: 'data', @@ -311,49 +228,37 @@ describe('Syncer', function () { } }); - let dataLayout = new DataLayout(gTmpDir, []); + const dataLayout = new DataLayout(gTmpDir, []); + let tasks = await getTasks(dataLayout); - syncer.sync(dataLayout, collectTasks, 10, function (error) { - expect(error).to.not.be.ok(); + execSync(`rm a; \ + mkdir a; \ + touch a/file; \ + rm a2; \ + touch b; \ + rm file g/file; \ + ln -s /tmp h; \ + rm -r j/l; + touch j/k/file; \ + rmdir j/m;`, { cwd: gTmpDir }); - execSync(`rm a; \ - mkdir a; \ - touch a/file; \ - rm a2; \ - touch b; \ - rm file g/file; \ - ln -s /tmp h; \ - rm -r j/l; - touch j/k/file; \ - rmdir j/m;`, { cwd: gTmpDir }); + tasks = await getTasks(dataLayout); - gTasks = [ ]; - let dataLayout = new DataLayout(gTmpDir, []); + expect(tasks).to.eql([ + { operation: 'remove', path: 'a', reason: 'wasfile' }, + { operation: 'remove', path: 'a2', reason: 'missing' }, + { operation: 'remove', path: 'file', reason: 'missing' }, + { operation: 'remove', path: 'g/file', reason: 'missing' }, + { operation: 'removedir', path: 'j/l', reason: 'missing' }, + { operation: 'removedir', path: 'j/m', reason: 'missing' }, - syncer.sync(dataLayout, collectTasks, 10, function (error) { - expect(error).to.not.be.ok(); + { operation: 'add', path: 'a/file', reason: 'new', position: 0 }, + { operation: 'add', path: 'b', reason: 'changed', position: 1 }, + { operation: 'add', path: 'j/k/file', reason: 'new', position: 2 }, + ]); - expect(gTasks).to.eql([ - { operation: 'remove', path: 'a', reason: 'wasfile' }, - { operation: 'remove', path: 'a2', reason: 'missing' }, - { operation: 'remove', path: 'file', reason: 'missing' }, - { operation: 'remove', path: 'g/file', reason: 'missing' }, - { operation: 'removedir', path: 'j/l', reason: 'missing' }, - { operation: 'removedir', path: 'j/m', reason: 'missing' }, + tasks = await getTasks(dataLayout); - { operation: 'add', path: 'a/file', reason: 'new', position: 0 }, - { operation: 'add', path: 'b', reason: 'changed', position: 1 }, - { operation: 'add', path: 'j/k/file', reason: 'new', position: 2 }, - ]); - - gTasks = [ ]; - syncer.sync(dataLayout, collectTasks, 10, function (error) { - expect(error).to.not.be.ok(); - expect(gTasks.length).to.be(0); - - done(); - }); - }); - }); + expect(tasks.length).to.be(0); }); });