diff --git a/src/backupformat/rsync.js b/src/backupformat/rsync.js index bd7bbf897..0173ce75f 100644 --- a/src/backupformat/rsync.js +++ b/src/backupformat/rsync.js @@ -18,8 +18,10 @@ const assert = require('assert'), hush = require('../hush.js'), once = require('../once.js'), path = require('path'), + promiseRetry = require('../promise-retry.js'), safe = require('safetydance'), storage = require('../storage.js'), + stream = require('stream'), syncer = require('../syncer.js'), util = require('util'); @@ -162,49 +164,41 @@ function downloadDir(backupConfig, backupFilePath, dataLayout, progressCallback, debug(`downloadDir: ${backupFilePath} to ${dataLayout.toString()}`); - function downloadFile(entry, done) { + async function downloadFile(entry) { let relativePath = path.relative(backupFilePath, entry.fullPath); if (backupConfig.encryptedFilenames) { const { error, result } = hush.decryptFilePath(relativePath, backupConfig.encryption); - if (error) return done(new BoxError(BoxError.CRYPTO_ERROR, 'Unable to decrypt file')); + if (error) throw new BoxError(BoxError.CRYPTO_ERROR, 'Unable to decrypt file'); relativePath = result; } const destFilePath = dataLayout.toLocalPath('./' + relativePath); - fs.mkdir(path.dirname(destFilePath), { recursive: true }, function (error) { - if (error) return done(new BoxError(BoxError.FS_ERROR, error.message)); + const [mkdirError] = await safe(fs.promises.mkdir(path.dirname(destFilePath), { recursive: true })); + if (mkdirError) throw new BoxError(BoxError.FS_ERROR, mkdirError.message); - async.retry({ times: 5, interval: 20000 }, async function (retryCallback) { - const [downloadError, sourceStream] = await safe(storage.api(backupConfig.provider).download(backupConfig, entry.fullPath)); - if (downloadError) { - progressCallback({ message: `Download ${entry.fullPath} to ${destFilePath} errored: ${downloadError.message}` }); - return retryCallback(downloadError); - } + await promiseRetry({ times: 5, interval: 20000 }, async function () { + const [downloadError, sourceStream] = await safe(storage.api(backupConfig.provider).download(backupConfig, entry.fullPath)); + if (downloadError) { + progressCallback({ message: `Download ${entry.fullPath} to ${destFilePath} errored: ${downloadError.message}` }); + throw downloadError; + } - const destStream = hush.createWriteStream(destFilePath, backupConfig.encryption); + const destStream = hush.createWriteStream(destFilePath, backupConfig.encryption); - // protect against multiple errors. must destroy the write stream so that a previous retry does not write - const closeAndRetry = once((error) => { - if (error) progressCallback({ message: `Download ${entry.fullPath} to ${destFilePath} errored: ${error.message}` }); - else progressCallback({ message: `Download ${entry.fullPath} to ${destFilePath} finished` }); - sourceStream.destroy(); - destStream.destroy(); - retryCallback(error); - }); + destStream.on('progress', function (progress) { + const transferred = Math.round(progress.transferred/1024/1024), speed = Math.round(progress.speed/1024/1024); + if (!transferred && !speed) return progressCallback({ message: `Downloading ${entry.fullPath}` }); // 0M@0MBps looks wrong + progressCallback({ message: `Downloading ${entry.fullPath}: ${transferred}M@${speed}MBps` }); + }); - destStream.on('progress', function (progress) { - const transferred = Math.round(progress.transferred/1024/1024), speed = Math.round(progress.speed/1024/1024); - if (!transferred && !speed) return progressCallback({ message: `Downloading ${entry.fullPath}` }); // 0M@0MBps looks wrong - progressCallback({ message: `Downloading ${entry.fullPath}: ${transferred}M@${speed}MBps` }); - }); - destStream.on('error', closeAndRetry); + progressCallback({ message: `Downloading ${entry.fullPath} to ${destFilePath}` }); - sourceStream.on('error', closeAndRetry); - - progressCallback({ message: `Downloading ${entry.fullPath} to ${destFilePath}` }); - - sourceStream.pipe(destStream, { end: true }).on('done', closeAndRetry); - }, done); + const [pipelineError] = await safe(stream.promises.pipeline(sourceStream, destStream)); + if (pipelineError) { + progressCallback({ message: `Download error ${entry.fullPath} to ${destFilePath}: ${pipelineError.message}` }); + throw pipelineError; + } + progressCallback({ message: `Download finished ${entry.fullPath} to ${destFilePath}` }); }); }