diff --git a/src/backupformat/rsync.js b/src/backupformat/rsync.js index 7a9c8913e..335b52e4a 100644 --- a/src/backupformat/rsync.js +++ b/src/backupformat/rsync.js @@ -54,14 +54,16 @@ async function addFile(sourceFile, encryption, uploader, progressCallback) { progressCallback({ message: `Uploading ${sourceFile}: ${transferred}M@${speed}MBps` }); // 0M@0MBps looks wrong }); + // careful not to have async code between here and pipeline() for 'error' handling const hash = new HashStream(); + const destStream = uploader.createStream(); let pipeline = null; if (encryption) { const encryptStream = new EncryptStream(encryption); - pipeline = safe(stream.pipeline(sourceStream, encryptStream, ps, hash, uploader.stream)); + pipeline = safe(stream.pipeline(sourceStream, encryptStream, ps, hash, destStream)); } else { - pipeline = safe(stream.pipeline(sourceStream, ps, hash, uploader.stream)); + pipeline = safe(stream.pipeline(sourceStream, ps, hash, destStream)); } const [error] = await safe(pipeline); diff --git a/src/backupformat/tgz.js b/src/backupformat/tgz.js index 6c2ff9908..241936d33 100644 --- a/src/backupformat/tgz.js +++ b/src/backupformat/tgz.js @@ -146,16 +146,18 @@ async function tarPack(dataLayout, encryption, uploader, progressCallback) { progressCallback({ message: `Uploading backup ${transferred}M@${speed}MBps` }); }); + // careful not to have async code between here and pipeline() for 'error' handling const pack = tar.pack(); const hash = new HashStream(); + const destStream = uploader.createStream(); let pipeline = null; if (encryption) { const encryptStream = new EncryptStream(encryption); - pipeline = safe(stream.pipeline(pack, gzip, encryptStream, ps, hash, uploader.stream)); + pipeline = safe(stream.pipeline(pack, gzip, encryptStream, ps, hash, destStream)); } else { - pipeline = safe(stream.pipeline(pack, gzip, ps, hash, uploader.stream)); + pipeline = safe(stream.pipeline(pack, gzip, ps, hash, destStream)); } let fileCount = 0; diff --git 
a/src/storage/filesystem.js b/src/storage/filesystem.js index 2c2271fd9..c7604f4ab 100644 --- a/src/storage/filesystem.js +++ b/src/storage/filesystem.js @@ -117,7 +117,9 @@ async function upload(config, limits, remotePath) { await safe(fs.promises.unlink(fullRemotePath)); // remove any hardlink return { - stream: fs.createWriteStream(fullRemotePath, { autoClose: true }), + createStream() { + return fs.createWriteStream(fullRemotePath, { autoClose: true }); + }, async finish() { const backupUid = parseInt(process.env.SUDO_UID, 10) || process.getuid(); // in test, upload() may or may not be called via sudo script if (hasChownSupportSync(config)) { diff --git a/src/storage/gcs.js b/src/storage/gcs.js index fe735b327..a7dd43f44 100644 --- a/src/storage/gcs.js +++ b/src/storage/gcs.js @@ -76,12 +76,8 @@ async function upload(config, limits, remotePath) { debug(`Uploading to ${fullRemotePath}`); - const uploadStream = getBucket(config) - .file(fullRemotePath) - .createWriteStream({ resumable: false }); - return { - stream: uploadStream, + createStream() { return getBucket(config).file(fullRemotePath).createWriteStream({ resumable: false }); }, async finish() {} }; } diff --git a/src/storage/interface.js b/src/storage/interface.js index b47867cf1..942de67dc 100644 --- a/src/storage/interface.js +++ b/src/storage/interface.js @@ -67,7 +67,7 @@ async function upload(config, limits, backupFilePath) { assert.strictEqual(typeof limits, 'object'); assert.strictEqual(typeof backupFilePath, 'string'); - // Result: { stream, finish() callback } + // Result: { createStream(), finish() callback } throw new BoxError(BoxError.NOT_IMPLEMENTED, 'upload is not implemented'); } diff --git a/src/storage/s3.js b/src/storage/s3.js index 139e223a5..954d51219 100644 --- a/src/storage/s3.js +++ b/src/storage/s3.js @@ -182,27 +182,32 @@ async function upload(config, limits, remotePath) { // s3: https://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html (max 10k parts and no size limit on the 
last part!) const partSize = limits.uploadPartSize || (config._provider === 'scaleway-objectstorage' ? 100 * 1024 * 1024 : 10 * 1024 * 1024); - const passThrough = new PassThrough(); - - const options = { - client: s3, - params: { - Bucket: config.bucket, - Key: path.join(config.prefix, remotePath), - Body: passThrough - }, - partSize, - queueSize: 3, - leavePartsOnError: false - }; - - const managedUpload = constants.TEST ? new globalThis.S3MockUpload(options) : new Upload(options); - managedUpload.on('httpUploadProgress', (progress) => debug(`Upload progress: ${JSON.stringify(progress)}`)); - const uploadPromise = managedUpload.done(); + let uploadPromise = null; return { - stream: passThrough, + createStream() { + const passThrough = new PassThrough(); + + const options = { + client: s3, + params: { + Bucket: config.bucket, + Key: path.join(config.prefix, remotePath), + Body: passThrough + }, + partSize, + queueSize: 3, + leavePartsOnError: false + }; + + const managedUpload = constants.TEST ? new globalThis.S3MockUpload(options) : new Upload(options); + managedUpload.on('httpUploadProgress', (progress) => debug(`Upload progress: ${JSON.stringify(progress)}`)); + uploadPromise = managedUpload.done(); + + return passThrough; + }, async finish() { + if (!uploadPromise) return; // stream was never created const [error, data] = await safe(uploadPromise); if (error) throw new BoxError(BoxError.EXTERNAL_ERROR, `Upload error: code: ${error.Code} message: ${error.message}`); // sometimes message is null debug(`Upload finished. ${JSON.stringify(data)}`);