Fix uploader API to handle write stream errors

When the upload is aborted/abandoned because the source file is missing,
the write stream is never destroyed. This dangling write stream can
later error and cause a crash.

Instead, create the write stream on demand, immediately before the
pipeline() call, so that the stream is always owned by pipeline() and its
'error' events are handled (and the stream destroyed) on failure.
This commit is contained in:
Girish Ramakrishnan
2026-01-20 22:22:02 +01:00
parent 944f163882
commit 42cefd56eb
6 changed files with 36 additions and 30 deletions

View File

@@ -54,14 +54,16 @@ async function addFile(sourceFile, encryption, uploader, progressCallback) {
progressCallback({ message: `Uploading ${sourceFile}: ${transferred}M@${speed}MBps` }); // 0M@0MBps looks wrong
});
// careful not to have async code between here and pipeline() for 'error' handling
const hash = new HashStream();
const destStream = uploader.createStream();
let pipeline = null;
if (encryption) {
const encryptStream = new EncryptStream(encryption);
pipeline = safe(stream.pipeline(sourceStream, encryptStream, ps, hash, uploader.stream));
pipeline = safe(stream.pipeline(sourceStream, encryptStream, ps, hash, destStream));
} else {
pipeline = safe(stream.pipeline(sourceStream, ps, hash, uploader.stream));
pipeline = safe(stream.pipeline(sourceStream, ps, hash, destStream));
}
const [error] = await safe(pipeline);

View File

@@ -146,16 +146,17 @@ async function tarPack(dataLayout, encryption, uploader, progressCallback) {
progressCallback({ message: `Uploading backup ${transferred}M@${speed}MBps` });
});
// careful not to have async code between here and pipeline() for 'error' handling
const pack = tar.pack();
const hash = new HashStream();
const destStream = uploader.createStream();
let pipeline = null;
if (encryption) {
const encryptStream = new EncryptStream(encryption);
pipeline = safe(stream.pipeline(pack, gzip, encryptStream, ps, hash, uploader.stream));
pipeline = safe(stream.pipeline(pack, gzip, encryptStream, ps, hash, destStream));
} else {
pipeline = safe(stream.pipeline(pack, gzip, ps, hash, uploader.stream));
pipeline = safe(stream.pipeline(pack, gzip, ps, hash, destStream));
}
let fileCount = 0;

View File

@@ -117,7 +117,9 @@ async function upload(config, limits, remotePath) {
await safe(fs.promises.unlink(fullRemotePath)); // remove any hardlink
return {
stream: fs.createWriteStream(fullRemotePath, { autoClose: true }),
createStream() {
return fs.createWriteStream(fullRemotePath, { autoClose: true });
},
async finish() {
const backupUid = parseInt(process.env.SUDO_UID, 10) || process.getuid(); // in test, upload() may or may not be called via sudo script
if (hasChownSupportSync(config)) {

View File

@@ -76,12 +76,8 @@ async function upload(config, limits, remotePath) {
debug(`Uploading to ${fullRemotePath}`);
const uploadStream = getBucket(config)
.file(fullRemotePath)
.createWriteStream({ resumable: false });
return {
stream: uploadStream,
createStream() { return getBucket(config).file(fullRemotePath).createWriteStream({ resumable: false }); },
async finish() {}
};
}

View File

@@ -67,7 +67,7 @@ async function upload(config, limits, backupFilePath) {
assert.strictEqual(typeof limits, 'object');
assert.strictEqual(typeof backupFilePath, 'string');
// Result: { stream, finish() callback }
// Result: { createStream(), finish() callback }
throw new BoxError(BoxError.NOT_IMPLEMENTED, 'upload is not implemented');
}

View File

@@ -182,27 +182,32 @@ async function upload(config, limits, remotePath) {
// s3: https://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html (max 10k parts and no size limit on the last part!)
const partSize = limits.uploadPartSize || (config._provider === 'scaleway-objectstorage' ? 100 * 1024 * 1024 : 10 * 1024 * 1024);
const passThrough = new PassThrough();
const options = {
client: s3,
params: {
Bucket: config.bucket,
Key: path.join(config.prefix, remotePath),
Body: passThrough
},
partSize,
queueSize: 3,
leavePartsOnError: false
};
const managedUpload = constants.TEST ? new globalThis.S3MockUpload(options) : new Upload(options);
managedUpload.on('httpUploadProgress', (progress) => debug(`Upload progress: ${JSON.stringify(progress)}`));
const uploadPromise = managedUpload.done();
let uploadPromise = null;
return {
stream: passThrough,
createStream() {
const passThrough = new PassThrough();
const options = {
client: s3,
params: {
Bucket: config.bucket,
Key: path.join(config.prefix, remotePath),
Body: passThrough
},
partSize,
queueSize: 3,
leavePartsOnError: false
};
const managedUpload = constants.TEST ? new globalThis.S3MockUpload(options) : new Upload(options);
managedUpload.on('httpUploadProgress', (progress) => debug(`Upload progress: ${JSON.stringify(progress)}`));
uploadPromise = managedUpload.done();
return passThrough;
},
async finish() {
if (!uploadPromise) return; // stream was never created
const [error, data] = await safe(uploadPromise);
if (error) throw new BoxError(BoxError.EXTERNAL_ERROR, `Upload error: code: ${error.Code} message: ${error.message}`); // sometimes message is null
debug(`Upload finished. ${JSON.stringify(data)}`);