remove pipeline() chain

it cannot be chained afaict
This commit is contained in:
Girish Ramakrishnan
2023-08-29 17:44:02 +05:30
parent 35828fe1c7
commit 7fe2de448e
2 changed files with 16 additions and 35 deletions
+16 -4
View File
@@ -13,11 +13,13 @@ const assert = require('assert'),
async = require('async'), async = require('async'),
BoxError = require('../boxerror.js'), BoxError = require('../boxerror.js'),
DataLayout = require('../datalayout.js'), DataLayout = require('../datalayout.js'),
{ DecryptStream } = require('../hush.js'),
debug = require('debug')('box:backupformat/rsync'), debug = require('debug')('box:backupformat/rsync'),
fs = require('fs'), fs = require('fs'),
hush = require('../hush.js'), hush = require('../hush.js'),
once = require('../once.js'), once = require('../once.js'),
path = require('path'), path = require('path'),
ProgressStream = require('../progress-stream.js'),
promiseRetry = require('../promise-retry.js'), promiseRetry = require('../promise-retry.js'),
safe = require('safetydance'), safe = require('safetydance'),
storage = require('../storage.js'), storage = require('../storage.js'),
@@ -182,17 +184,27 @@ function downloadDir(backupConfig, backupFilePath, dataLayout, progressCallback,
throw downloadError; throw downloadError;
} }
const destStream = hush.createWriteStream(destFilePath, backupConfig.encryption); const ps = new ProgressStream({ interval: 10000 }); // display a progress every 10 seconds
ps.on('progress', function (progress) {
destStream.on('progress', function (progress) {
const transferred = Math.round(progress.transferred/1024/1024), speed = Math.round(progress.speed/1024/1024); const transferred = Math.round(progress.transferred/1024/1024), speed = Math.round(progress.speed/1024/1024);
if (!transferred && !speed) return progressCallback({ message: `Downloading ${entry.fullPath}` }); // 0M@0MBps looks wrong if (!transferred && !speed) return progressCallback({ message: `Downloading ${entry.fullPath}` }); // 0M@0MBps looks wrong
progressCallback({ message: `Downloading ${entry.fullPath}: ${transferred}M@${speed}MBps` }); progressCallback({ message: `Downloading ${entry.fullPath}: ${transferred}M@${speed}MBps` });
}); });
const destStream = fs.createWriteStream(destFilePath);
const streams = [ sourceStream, ps ];
if (backupConfig.encryption) {
const decryptStream = new DecryptStream(backupConfig.encryption);
streams.push(decryptStream);
}
streams.push(destStream);
progressCallback({ message: `Downloading ${entry.fullPath} to ${destFilePath}` }); progressCallback({ message: `Downloading ${entry.fullPath} to ${destFilePath}` });
const [pipelineError] = await safe(stream.promises.pipeline(sourceStream, destStream)); const [pipelineError] = await safe(stream.promises.pipeline(streams));
if (pipelineError) { if (pipelineError) {
progressCallback({ message: `Download error ${entry.fullPath} to ${destFilePath}: ${pipelineError.message}` }); progressCallback({ message: `Download error ${entry.fullPath} to ${destFilePath}: ${pipelineError.message}` });
throw pipelineError; throw pipelineError;
-31
View File
@@ -6,7 +6,6 @@ const assert = require('assert'),
debug = require('debug')('box:hush'), debug = require('debug')('box:hush'),
fs = require('fs'), fs = require('fs'),
ProgressStream = require('./progress-stream.js'), ProgressStream = require('./progress-stream.js'),
stream = require('stream'),
TransformStream = require('stream').Transform; TransformStream = require('stream').Transform;
class EncryptStream extends TransformStream { class EncryptStream extends TransformStream {
@@ -181,35 +180,6 @@ function createReadStream(sourceFile, encryption) {
} }
} }
function createWriteStream(destFile, encryption) {
assert.strictEqual(typeof destFile, 'string');
assert.strictEqual(typeof encryption, 'object');
const destStream = fs.createWriteStream(destFile);
const ps = new ProgressStream({ interval: 10000 }); // display a progress every 10 seconds
const streams = [ ps ];
if (encryption) {
const decryptStream = new DecryptStream(encryption);
streams.push(decryptStream);
}
streams.push(destStream);
stream.pipeline(streams, function (error) {
if (error) {
debug(`createWriteStream: write stream error ${destFile}. %o`, error);
ps.emit('error', new BoxError(BoxError.FS_ERROR, `Write error ${destFile}: ${error.message}`));
} else {
debug(`createWriteStream: ${destFile} done`);
return ps.emit('done');
}
});
return ps;
}
exports = module.exports = { exports = module.exports = {
EncryptStream, EncryptStream,
DecryptStream, DecryptStream,
@@ -218,5 +188,4 @@ exports = module.exports = {
decryptFilePath, decryptFilePath,
createReadStream, createReadStream,
createWriteStream
}; };