diff --git a/src/backupformat/rsync.js b/src/backupformat/rsync.js
index 5656ad349..d802eeb27 100644
--- a/src/backupformat/rsync.js
+++ b/src/backupformat/rsync.js
@@ -15,16 +15,16 @@ const assert = require('assert'),
     DataLayout = require('../datalayout.js'),
     { DecryptStream } = require('../hush.js'),
     debug = require('debug')('box:backupformat/rsync'),
+    { EncryptStream } = require('../hush.js'),
     fs = require('fs'),
     hush = require('../hush.js'),
-    once = require('../once.js'),
     path = require('path'),
     ProgressStream = require('../progress-stream.js'),
     promiseRetry = require('../promise-retry.js'),
     safe = require('safetydance'),
     shell = require('../shell.js'),
     storage = require('../storage.js'),
-    stream = require('stream'),
+    stream = require('stream/promises'),
     syncer = require('../syncer.js'),
     util = require('util');
 
@@ -38,6 +38,46 @@ function getBackupFilePath(backupConfig, remotePath) {
     return path.join(backupConfig.rootPath, remotePath);
 }
 
+async function addFile(sourceFile, encryption, uploader, progressCallback) {
+    assert.strictEqual(typeof sourceFile, 'string');
+    assert.strictEqual(typeof encryption, 'object');
+    assert.strictEqual(typeof uploader, 'object');
+    assert.strictEqual(typeof progressCallback, 'function');
+
+    // make sure the file can be opened for reading before we start the pipeline. otherwise, we end up with
+    // destination dirs/files which are owned by root (this process' uid) and cannot be copied (run as normal user)
+    const [openError, sourceHandle] = await safe(fs.promises.open(sourceFile, 'r'));
+    if (openError) {
+        debug(`addFile: ignoring disappeared file: ${sourceFile}`);
+        return; // NOTE(review): uploader.stream is never written in this case - confirm uploaders tolerate this
+    }
+
+    const sourceStream = sourceHandle.createReadStream({ autoClose: true }); // FileHandle.createReadStream() takes only options, no path
+    const ps = new ProgressStream({ interval: 10000 }); // display a progress every 10 seconds
+    ps.on('progress', function (progress) {
+        const transferred = Math.round(progress.transferred/1024/1024), speed = Math.round(progress.speed/1024/1024);
+        if (!transferred && !speed) return progressCallback({ message: `Uploading ${sourceFile}` }); // 0M@0MBps looks wrong
+        progressCallback({ message: `Uploading ${sourceFile}: ${transferred}M@${speed}MBps` });
+    });
+
+    let pipeline = null; // plain promise here; the single safe() at the await below captures any error
+    if (encryption) {
+        const encryptStream = new EncryptStream(encryption);
+        pipeline = stream.pipeline(sourceStream, encryptStream, ps, uploader.stream);
+    } else {
+        pipeline = stream.pipeline(sourceStream, ps, uploader.stream);
+    }
+
+    const [error] = await safe(pipeline);
+    if (error) {
+        if (error.message.includes('ENOENT')) return; // ignore error if file disappears mid-upload
+        throw new BoxError(BoxError.EXTERNAL_ERROR, `addFile pipeline error: ${error.message}`);
+    }
+    // debug(`addFile: pipeline finished: ${JSON.stringify(ps.stats())}`);
+
+    await uploader.finish();
+}
+
 function sync(backupConfig, remotePath, dataLayout, progressCallback, callback) {
     assert.strictEqual(typeof backupConfig, 'object');
     assert.strictEqual(typeof remotePath, 'string');
@@ -47,10 +47,8 @@ function sync(backupConfig, remotePath, dataLayout, progressCallback, callback)
 
     // the number here has to take into account the s3.upload partSize (which is 10MB). So 20=200MB
     const concurrency = backupConfig.limits?.syncConcurrency || (backupConfig.provider === 's3' ? 20 : 10);
-    const removeDir = util.callbackify(storage.api(backupConfig.provider).removeDir);
-    const remove = util.callbackify(storage.api(backupConfig.provider).remove);
 
-    syncer.sync(dataLayout, function processTask(task, iteratorCallback) {
+    syncer.sync(dataLayout, async function processTask(task) {
         debug('sync: processing task: %j', task);
         // the empty task.path is special to signify the directory
         const destPath = task.path && backupConfig.encryptedFilenames ? hush.encryptFilePath(task.path, backupConfig.encryption) : task.path;
@@ -58,37 +58,19 @@ function sync(backupConfig, remotePath, dataLayout, progressCallback, callback)
 
         if (task.operation === 'removedir') {
             debug(`Removing directory ${backupFilePath}`);
-            return removeDir(backupConfig, backupFilePath, progressCallback, iteratorCallback);
+            await storage.api(backupConfig.provider).removeDir(backupConfig, backupFilePath, progressCallback);
         } else if (task.operation === 'remove') {
             debug(`Removing ${backupFilePath}`);
-            return remove(backupConfig, backupFilePath, iteratorCallback);
-        }
-
-        let retryCount = 0;
-        async.retry({ times: 5, interval: 20000 }, function (retryCallback) {
-            retryCallback = once(retryCallback); // protect again upload() erroring much later after read stream error
-
-            ++retryCount;
-            if (task.operation === 'add') {
+            await storage.api(backupConfig.provider).remove(backupConfig, backupFilePath);
+        } else if (task.operation === 'add') {
+            await promiseRetry({ times: 5, interval: 20000, debug }, async (retryCount) => {
                 progressCallback({ message: `Adding ${task.path}` + (retryCount > 1 ? ` (Try ${retryCount})` : '') });
                 debug(`Adding ${task.path} position ${task.position} try ${retryCount}`);
-                const stream = hush.createReadStream(dataLayout.toLocalPath('./' + task.path), backupConfig.encryption);
-                stream.on('error', (error) => retryCallback(error.message.includes('ENOENT') ? null : error)); // ignore error if file disappears
-                stream.on('progress', function (progress) {
-                    const transferred = Math.round(progress.transferred/1024/1024), speed = Math.round(progress.speed/1024/1024);
-                    if (!transferred && !speed) return progressCallback({ message: `Uploading ${task.path}` }); // 0M@0MBps looks wrong
-                    progressCallback({ message: `Uploading ${task.path}: ${transferred}M@${speed}MBps` }); // 0M@0MBps looks wrong
-                });
-                // only create the destination path when we have confirmation that the source is available. otherwise, we end up with
-                // files owned as 'root' and the cp later will fail
-                stream.on('open', function () {
-                    storage.api(backupConfig.provider).upload(backupConfig, backupFilePath, stream, function (error) {
-                        debug(error ? `Error uploading ${task.path} try ${retryCount}: ${error.message}` : `Uploaded ${task.path}`);
-                        retryCallback(error);
-                    });
-                });
-            }
-        }, iteratorCallback);
+
+                const uploader = await storage.api(backupConfig.provider).upload(backupConfig, backupFilePath);
+                await addFile(dataLayout.toLocalPath('./' + task.path), backupConfig.encryption, uploader, progressCallback);
+            });
+        }
 
     }, concurrency, function (error) {
         if (error) return callback(new BoxError(BoxError.EXTERNAL_ERROR, error.message));
@@ -205,7 +205,7 @@ function downloadDir(backupConfig, backupFilePath, dataLayout, progressCallback,
 
             progressCallback({ message: `Downloading ${entry.fullPath} to ${destFilePath}` });
 
-            const [pipelineError] = await safe(stream.promises.pipeline(streams));
+            const [pipelineError] = await safe(stream.pipeline(streams));
             if (pipelineError) {
                 progressCallback({ message: `Download error ${entry.fullPath} to ${destFilePath}: ${pipelineError.message}` });
                 throw pipelineError;
diff --git a/src/hush.js b/src/hush.js
index 949974686..c86ff5046 100644
--- a/src/hush.js
+++ b/src/hush.js
@@ -154,40 +154,10 @@ function decryptFilePath(filePath, encryption) {
     return { result: decryptedParts.join('/') };
 }
 
-function createReadStream(sourceFile, encryption) {
-    assert.strictEqual(typeof sourceFile, 'string');
-    assert.strictEqual(typeof encryption, 'object');
-
-    const stream = fs.createReadStream(sourceFile);
-    const ps = new ProgressStream({ interval: 10000 }); // display a progress every 10 seconds
-
-    stream.on('error', function (error) {
-        debug(`createReadStream: read stream error at ${sourceFile}. %o`, error);
-        ps.emit('error', new BoxError(BoxError.FS_ERROR, `Error reading ${sourceFile}: ${error.message} ${error.code}`));
-    });
-
-    stream.on('open', () => ps.emit('open'));
-
-    if (encryption) {
-        const encryptStream = new EncryptStream(encryption);
-
-        encryptStream.on('error', function (error) {
-            debug(`createReadStream: encrypt stream error ${sourceFile}. %o`, error);
-            ps.emit('error', new BoxError(BoxError.CRYPTO_ERROR, `Encryption error at ${sourceFile}: ${error.message}`));
-        });
-
-        return stream.pipe(encryptStream).pipe(ps);
-    } else {
-        return stream.pipe(ps);
-    }
-}
-
 exports = module.exports = {
     EncryptStream,
     DecryptStream,
 
     encryptFilePath,
     decryptFilePath,
-
-    createReadStream,
 };