diff --git a/CHANGES b/CHANGES index 27b38170d..1bb4e0e31 100644 --- a/CHANGES +++ b/CHANGES @@ -2801,4 +2801,5 @@ * ami: disable route53 * mailer: add html version of test mail * sshfs: server side copying +* backups: rewrite tgz backups using tar-stream diff --git a/package-lock.json b/package-lock.json index 790a00121..704235f7e 100644 --- a/package-lock.json +++ b/package-lock.json @@ -47,7 +47,7 @@ "semver": "^7.6.2", "speakeasy": "^2.0.0", "superagent": "9.0.1", - "tar-fs": "github:cloudron-io/tar-fs#ignore_stat_error", + "tar-stream": "^3.1.7", "tldjs": "^2.3.1", "ua-parser-js": "^1.0.38", "underscore": "^1.13.6", @@ -708,6 +708,11 @@ "uuid": "dist/bin/uuid" } }, + "node_modules/b4a": { + "version": "1.6.6", + "resolved": "https://registry.npmjs.org/b4a/-/b4a-1.6.6.tgz", + "integrity": "sha512-5Tk1HLk6b6ctmjIkAcU/Ujv/1WqiDl0F0JdRCR80VsOcUlHcu7pWeWRlOqQLHfDEsVx9YH/aif5AG4ehoCtTmg==" + }, "node_modules/backoff": { "version": "2.5.0", "license": "MIT", @@ -722,6 +727,12 @@ "version": "1.0.0", "license": "MIT" }, + "node_modules/bare-events": { + "version": "2.4.2", + "resolved": "https://registry.npmjs.org/bare-events/-/bare-events-2.4.2.tgz", + "integrity": "sha512-qMKFd2qG/36aA4GwvKq8MxnPgCQAmBWmSyLWsJcbn8v03wvIPQ/hG1Ms8bPzndZxMDoHpxez5VOS+gC9Yi24/Q==", + "optional": true + }, "node_modules/base32.js": { "version": "0.0.1", "license": "MIT" @@ -763,6 +774,52 @@ "node": ">=8" } }, + "node_modules/bl": { + "version": "4.1.0", + "resolved": "https://registry.npmjs.org/bl/-/bl-4.1.0.tgz", + "integrity": "sha512-1W07cM9gS6DcLperZfFSj+bWLtaPGSOHWhPiGzXmvVJbRLdG82sH/Kn8EtW1VqWVA54AKf2h5k5BbnIbwF3h6w==", + "dependencies": { + "buffer": "^5.5.0", + "inherits": "^2.0.4", + "readable-stream": "^3.4.0" + } + }, + "node_modules/bl/node_modules/buffer": { + "version": "5.7.1", + "resolved": "https://registry.npmjs.org/buffer/-/buffer-5.7.1.tgz", + "integrity": "sha512-EHcyIPBQ4BSGlvjB16k5KgAJ27CIsHY/2JBmCRReo48y9rQ3MaUzWX3KVlBa4U7MyX02HdVj0K7C3WaB3ju7FQ==", + "funding": [ + { + "type": "github", + "url": "https://github.com/sponsors/feross" + }, + { + "type": "patreon", + "url": "https://www.patreon.com/feross" + }, + { + "type": "consulting", + "url": "https://feross.org/support" + } + ], + "dependencies": { + "base64-js": "^1.3.1", + "ieee754": "^1.1.13" + } + }, + "node_modules/bl/node_modules/readable-stream": { + "version": "3.6.2", + "resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-3.6.2.tgz", + "integrity": "sha512-9u/sniCrY3D5WdsERHzHE4G2YCXqoG5FTHUiCC4SIbr6XcLZBY05ya9EKjYek9O5xOAwjGq+1JdGBAS7Q9ScoA==", + "dependencies": { + "inherits": "^2.0.3", + "string_decoder": "^1.1.1", + "util-deprecate": "^1.0.1" + }, + "engines": { + "node": ">= 6" + } + }, "node_modules/bluebird": { "version": "3.7.2", "license": "MIT" @@ -1668,6 +1725,19 @@ "node": ">= 8.0" } }, + "node_modules/dockerode/node_modules/readable-stream": { + "version": "3.6.2", + "resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-3.6.2.tgz", + "integrity": "sha512-9u/sniCrY3D5WdsERHzHE4G2YCXqoG5FTHUiCC4SIbr6XcLZBY05ya9EKjYek9O5xOAwjGq+1JdGBAS7Q9ScoA==", + "dependencies": { + "inherits": "^2.0.3", + "string_decoder": "^1.1.1", + "util-deprecate": "^1.0.1" + }, + "engines": { + "node": ">= 6" + } + }, "node_modules/dockerode/node_modules/tar-fs": { "version": "2.0.1", "license": "MIT", @@ -1678,6 +1748,21 @@ "tar-stream": "^2.0.0" } }, + "node_modules/dockerode/node_modules/tar-stream": { + "version": "2.2.0", + "resolved": "https://registry.npmjs.org/tar-stream/-/tar-stream-2.2.0.tgz", + "integrity": "sha512-ujeqbceABgwMZxEJnk2HDY2DlnUZ+9oEcb1KzTVfYHio0UE6dG71n60d8D2I4qNvleWrrXpmjpt7vZeF1LnMZQ==", + "dependencies": { + "bl": "^4.0.3", + "end-of-stream": "^1.4.1", + "fs-constants": "^1.0.0", + "inherits": "^2.0.3", + "readable-stream": "^3.1.1" + }, + "engines": { + "node": ">=6" + } + }, "node_modules/dotenv": { "version": "5.0.1", "license": "BSD-2-Clause", @@ -2236,6 +2321,11 @@ "integrity": "sha512-f3qQ9oQy9j2AhBe/H9VC91wLmKBCCU/gDOnKNAYG5hswO7BLKj09Hc5HYNz9cGI++xlpDCIgDaitVs03ATR84Q==", "dev": true }, + "node_modules/fast-fifo": { + "version": "1.3.2", + "resolved": "https://registry.npmjs.org/fast-fifo/-/fast-fifo-1.3.2.tgz", + "integrity": "sha512-/d9sfos4yxzpwkDkuN7k2SqFKtYNmCTzgfEpz82x34IM9/zc8KGxQoXg1liNC/izpRM/MBdt44Nmx41ZWqk+FQ==" + }, "node_modules/fast-json-stable-stringify": { "version": "2.1.0", "resolved": "https://registry.npmjs.org/fast-json-stable-stringify/-/fast-json-stable-stringify-2.1.0.tgz", @@ -2458,7 +2548,8 @@ }, "node_modules/fs-constants": { "version": "1.0.0", - "license": "MIT" + "resolved": "https://registry.npmjs.org/fs-constants/-/fs-constants-1.0.0.tgz", + "integrity": "sha512-y6OAwoSIf7FyjMIv94u+b5rdheZEjzR63GTyZJm5qh4Bi+2YgwLCcI/fPFZkL5PSixOt6ZNKm+w+Hfp/Bciwow==" }, "node_modules/fs-extra": { "version": "0.6.4", @@ -4558,6 +4649,11 @@ } ] }, + "node_modules/queue-tick": { + "version": "1.0.1", + "resolved": "https://registry.npmjs.org/queue-tick/-/queue-tick-1.0.1.tgz", + "integrity": "sha512-kJt5qhMxoszgU/62PLP1CJytzd2NKetjSRnyuj31fDd3Rlcz3fzlFdFLD1SItunPwyqEOkca6GbV612BWfaBag==" + }, "node_modules/quick-lru": { "version": "7.0.0", "resolved": "https://registry.npmjs.org/quick-lru/-/quick-lru-7.0.0.tgz", @@ -5082,6 +5178,19 @@ "node": ">=0.8.0" } }, + "node_modules/streamx": { + "version": "2.18.0", + "resolved": "https://registry.npmjs.org/streamx/-/streamx-2.18.0.tgz", + "integrity": "sha512-LLUC1TWdjVdn1weXGcSxyTR3T4+acB6tVGXT95y0nGbca4t4o/ng1wKAGTljm9VicuCVLvRlqFYXYy5GwgM7sQ==", + "dependencies": { + "fast-fifo": "^1.3.2", + "queue-tick": "^1.0.1", + "text-decoder": "^1.1.0" + }, + "optionalDependencies": { + "bare-events": "^2.2.0" + } + }, "node_modules/string_decoder": { "version": "1.1.1", "license": "MIT", @@ -5170,72 +5279,14 @@ "version": "3.2.4", "license": "MIT" }, - "node_modules/tar-fs": { - "version": "2.0.0", - "resolved": "git+ssh://git@github.com/cloudron-io/tar-fs.git#08e18e67201e352697251fe98c816c9d2afddd38", - "license": "MIT", - "dependencies": { - "chownr": "^1.1.1", - "mkdirp": "^0.5.1", - "pump": "^3.0.0", - "tar-stream": "^2.0.0" - } - }, "node_modules/tar-stream": { - "version": "2.2.0", - "license": "MIT", + "version": "3.1.7", + "resolved": "https://registry.npmjs.org/tar-stream/-/tar-stream-3.1.7.tgz", + "integrity": "sha512-qJj60CXt7IU1Ffyc3NJMjh6EkuCFej46zUqJ4J7pqYlThyd9bO0XBTmcOIhSzZJVWfsLks0+nle/j538YAW9RQ==", "dependencies": { - "bl": "^4.0.3", - "end-of-stream": "^1.4.1", - "fs-constants": "^1.0.0", - "inherits": "^2.0.3", - "readable-stream": "^3.1.1" - }, - "engines": { - "node": ">=6" - } - }, - "node_modules/tar-stream/node_modules/bl": { - "version": "4.0.3", - "license": "MIT", - "dependencies": { - "buffer": "^5.5.0", - "inherits": "^2.0.4", - "readable-stream": "^3.4.0" - } - }, - "node_modules/tar-stream/node_modules/buffer": { - "version": "5.7.1", - "funding": [ - { - "type": "github", - "url": "https://github.com/sponsors/feross" - }, - { - "type": "patreon", - "url": "https://www.patreon.com/feross" - }, - { - "type": "consulting", - "url": "https://feross.org/support" - } - ], - "license": "MIT", - "dependencies": { - "base64-js": "^1.3.1", - "ieee754": "^1.1.13" - } - }, - "node_modules/tar-stream/node_modules/readable-stream": { - "version": "3.6.0", - "license": "MIT", - "dependencies": { - "inherits": "^2.0.3", - "string_decoder": "^1.1.1", - "util-deprecate": "^1.0.1" - }, - "engines": { - "node": ">= 6" + "b4a": "^1.6.4", + "fast-fifo": "^1.2.0", + "streamx": "^2.15.0" } }, "node_modules/teeny-request": { @@ -5276,6 +5327,14 @@ "node": ">= 6" } }, + "node_modules/text-decoder": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/text-decoder/-/text-decoder-1.1.0.tgz", + "integrity": "sha512-TmLJNj6UgX8xcUZo4UDStGQtDiTzF7BzWlzn9g7UWrjkpHr5uJTK1ld16wZ3LXb2vb6jH8qU89dW5whuMdXYdw==", + "dependencies": { + "b4a": "^1.6.4" + } + }, "node_modules/text-table": { "version": "0.2.0", "resolved": "https://registry.npmjs.org/text-table/-/text-table-0.2.0.tgz", diff --git a/package.json b/package.json index d6c574b69..650f4a2e6 100644 --- a/package.json +++ b/package.json @@ -55,7 +55,7 @@ "semver": "^7.6.2", "speakeasy": "^2.0.0", "superagent": "9.0.1", - "tar-fs": "github:cloudron-io/tar-fs#ignore_stat_error", + "tar-stream": "^3.1.7", "tldjs": "^2.3.1", "ua-parser-js": "^1.0.38", "underscore": "^1.13.6", diff --git a/src/backupformat/tgz.js b/src/backupformat/tgz.js index e67e05bf0..7489009ae 100644 --- a/src/backupformat/tgz.js +++ b/src/backupformat/tgz.js @@ -7,17 +7,18 @@ exports = module.exports = { }; const assert = require('assert'), - async = require('async'), BoxError = require('../boxerror.js'), DataLayout = require('../datalayout.js'), debug = require('debug')('box:backupformat/tgz'), { DecryptStream, EncryptStream } = require('../hush.js'), - once = require('../once.js'), + fs = require('fs'), path = require('path'), ProgressStream = require('../progress-stream.js'), promiseRetry = require('../promise-retry.js'), + safe = require('safetydance'), storage = require('../storage.js'), - tar = require('tar-fs'), + stream = require('stream/promises'), + tar = require('tar-stream'), zlib = require('zlib'); function getBackupFilePath(backupConfig, remotePath) { @@ -33,108 +34,157 @@ function getBackupFilePath(backupConfig, remotePath) { return path.join(rootPath, remotePath + fileType); } -function tarPack(dataLayout, encryption) { +function addToPack(pack, header, options) { + return new Promise((resolve, reject) => { + const packEntry = safe(() => pack.entry(header, function (error) { + if (error) { + debug(`addToPack: error adding ${header.name} ${header.type} ${error.message}`); + reject(error); + } else { + debug(`addToPack: added ${header.name} ${header.type}`); + resolve(); + } + })); + + if (packEntry && options?.input) { + safe(stream.pipeline(options.input, packEntry)); // background + } + + if (!packEntry) { + reject(new Error(`Failed to add ${header.name}: ${safe.error.message}`)); + } + }); +} + +async function tarPack(dataLayout, encryption, uploader, progressCallback) { assert(dataLayout instanceof DataLayout, 'dataLayout must be a DataLayout'); assert.strictEqual(typeof encryption, 'object'); - - const pack = tar.pack('/', { - dereference: false, // pack the symlink and not what it points to - entries: dataLayout.localPaths(), - ignoreStatError: (path, err) => { - debug(`tarPack: error stat'ing ${path} - ${err.code}`); - return err.code === 'ENOENT'; // ignore if file or dir got removed (probably some temporary file) - }, - map: function(header) { - header.name = dataLayout.toRemotePath(header.name); - // the tar pax format allows us to encode filenames > 100 and size > 8GB (see #640) - // https://www.systutorials.com/docs/linux/man/5-star/ - if (header.size > 8589934590 || header.name > 99) header.pax = { size: header.size }; - return header; - }, - strict: false // do not error for unknown types (skip fifo, char/block devices) - }); + assert.strictEqual(typeof uploader, 'object'); + assert.strictEqual(typeof progressCallback, 'function'); const gzip = zlib.createGzip({}); const ps = new ProgressStream({ interval: 10000 }); // emit 'progress' every 10 seconds - - pack.on('error', function (error) { - debug('tarPack: tar stream error. %o', error); - ps.emit('error', new BoxError(BoxError.EXTERNAL_ERROR, error.message)); + ps.on('progress', function (progress) { + const transferred = Math.round(progress.transferred/1024/1024), speed = Math.round(progress.speed/1024/1024); + if (!transferred && !speed) return progressCallback({ message: 'Uploading backup' }); // 0M@0MBps looks wrong + progressCallback({ message: `Uploading backup ${transferred}M@${speed}MBps` }); }); - gzip.on('error', function (error) { - debug('tarPack: gzip stream error. %o', error); - ps.emit('error', new BoxError(BoxError.EXTERNAL_ERROR, error.message)); - }); + const pack = tar.pack(); + let pipeline = null; if (encryption) { const encryptStream = new EncryptStream(encryption); - encryptStream.on('error', function (error) { - debug('tarPack: encrypt stream error. %o', error); - ps.emit('error', new BoxError(BoxError.EXTERNAL_ERROR, error.message)); - }); - - pack.pipe(gzip).pipe(encryptStream).pipe(ps); + pipeline = safe(stream.pipeline(pack, gzip, encryptStream, ps, uploader.stream)); } else { - pack.pipe(gzip).pipe(ps); + pipeline = safe(stream.pipeline(pack, gzip, ps, uploader.stream)); } - return ps; + for (const localPath of dataLayout.localPaths()) { + const queue = [ localPath ]; + while (queue.length) { + // if (pack.destroyed || outStream.destroyed) break; + const dir = queue.shift(); + debug(`tarPack: processing ${dir}`); + const [readdirError, entries] = await safe(fs.promises.readdir(dir, { withFileTypes: true })); + if (!entries) { + debug(`tarPack: skipping directory ${dir}: ${readdirError.message}`); + continue; + } + const subdirs = []; + for (const entry of entries) { + const abspath = path.join(dir, entry.name); + const headerName = dataLayout.toRemotePath(abspath); + if (entry.isFile()) { + const [openError, handle] = await safe(fs.promises.open(abspath, 'r')); + if (!handle) { debug(`tarPack: skipping file, could not open ${abspath}: ${openError.message}`); continue; } + const [statError, stat] = await safe(handle.stat()); + if (!stat) { debug(`tarPack: skipping file, could not stat ${abspath}: ${statError.message}`); continue; } + const header = { name: headerName, type: 'file', size: stat.size, uid: process.getuid(), gid: process.getgid() }; + if (stat.size > 8589934590 || entry.name.length > 99) header.pax = { size: stat.size }; + const input = handle.createReadStream({ autoClose: true }); + await addToPack(pack, header, { input }); + } else if (entry.isDirectory()) { + const header = { name: headerName, type: 'directory', uid: process.getuid(), gid: process.getgid() }; + await addToPack(pack, header); + subdirs.push(abspath); + } else if (entry.isSymbolicLink()) { + const [readlinkError, target] = await safe(fs.promises.readlink(abspath)); + if (!target) { debug(`tarPack: skipping link, could not readlink ${abspath}: ${readlinkError.message}`); continue; } + const header = { name: headerName, type: 'symlink', linkname: target, uid: process.getuid(), gid: process.getgid() }; + await addToPack(pack, header); + } else { + debug(`tarPack: ignoring unknown type ${entry.name} ${entry.type}`); + } + } + + queue.unshift(...subdirs); // add to front of queue and in order of readdir listing + } + } + + pack.finalize(); + + const [error] = await safe(pipeline); + if (error) throw new BoxError(BoxError.EXTERNAL_ERROR, `tarPack pipeline error: ${error.message}`); + debug(`tarPack: pipeline finished: ${JSON.stringify(ps.stats())}`); + + await uploader.finish(); } -function tarExtract(inStream, dataLayout, encryption) { +async function tarExtract(inStream, dataLayout, encryption, progressCallback) { assert.strictEqual(typeof inStream, 'object'); assert(dataLayout instanceof DataLayout, 'dataLayout must be a DataLayout'); assert.strictEqual(typeof encryption, 'object'); + assert.strictEqual(typeof progressCallback, 'function'); + + const extract = tar.extract(); + const now = new Date(); + extract.on('entry', async function (header, entryStream, next) { + if (path.isAbsolute(header.name)) { + debug(`tarExtract: ignoring absolute path ${header.name}`); + return next(); + } + const abspath = dataLayout.toLocalPath(header.name); + debug(`tarExtract: ${header.name} ${header.size} ${header.type} to ${abspath}`); + let error = null; + if (header.type === 'directory') { + [error] = await safe(fs.promises.mkdir(abspath, { recursive: true, mode: 0o755 })); + } else if (header.type === 'file') { + const output = fs.createWriteStream(abspath); + [error] = await safe(stream.pipeline(entryStream, output)); + } else if (header.type === 'symlink') { + await safe(fs.promises.unlink(abspath)); // remove any link created from previous failed extract + [error] = await safe(fs.promises.symlink(header.linkname, abspath)); + } else { + debug(`tarExtract: ignoring unknown entry: ${header.name} ${header.type}`); + entryStream.resume(); // drain + } + + if (error) return next(error); + + [error] = await safe(fs.promises.lutimes(abspath, now /* atime */, header.mtime)); // for dirs, mtime will get overwritten + next(error); + }); + extract.on('finish', () => debug('tarExtract: extract finished')); const gunzip = zlib.createGunzip({}); - const ps = new ProgressStream({ interval: 10000 }); // display a progress every 10 seconds - const extract = tar.extract('/', { - map: function (header) { - header.name = dataLayout.toLocalPath(header.name); - return header; - }, - dmode: 500 // ensure directory is writable - }); - - const emitError = once((error) => { - inStream.destroy(); - ps.emit('error', error); - }); - - inStream.on('error', function (error) { - debug('tarExtract: input stream error. %o', error); - emitError(new BoxError(BoxError.EXTERNAL_ERROR, error.message)); - }); - - gunzip.on('error', function (error) { - debug('tarExtract: gunzip stream error. %o', error); - emitError(new BoxError(BoxError.EXTERNAL_ERROR, error.message)); - }); - - extract.on('error', function (error) { - debug('tarExtract: extract stream error. %o', error); - emitError(new BoxError(BoxError.EXTERNAL_ERROR, error.message)); - }); - - extract.on('finish', function () { - debug('tarExtract: done.'); - // we use a separate event because ps is a through2 stream which emits 'finish' event indicating end of inStream and not extract - ps.emit('done'); + const ps = new ProgressStream({ interval: 10000 }); + ps.on('progress', function (progress) { + const transferred = Math.round(progress.transferred/1024/1024), speed = Math.round(progress.speed/1024/1024); + if (!transferred && !speed) return progressCallback({ message: 'Downloading backup' }); // 0M@0MBps looks wrong + progressCallback({ message: `Downloading ${transferred}M@${speed}MBps` }); }); if (encryption) { const decrypt = new DecryptStream(encryption); - decrypt.on('error', function (error) { - debug('tarExtract: decrypt stream error.', error); - emitError(new BoxError(BoxError.EXTERNAL_ERROR, `Failed to decrypt: ${error.message}`)); - }); - inStream.pipe(ps).pipe(decrypt).pipe(gunzip).pipe(extract); + const [error] = await safe(stream.pipeline(inStream, ps, decrypt, gunzip, extract)); + if (error) throw new BoxError(BoxError.EXTERNAL_ERROR, `tarExtract pipeline error: ${error.message}`); } else { - inStream.pipe(ps).pipe(gunzip).pipe(extract); + const [error] = await safe(stream.pipeline(inStream, ps, gunzip, extract)); + if (error) throw new BoxError(BoxError.EXTERNAL_ERROR, `tarExtract pipeline error: ${error.message}`); } - return ps; + debug(`tarExtract: pipeline finished: ${ps.stats()}`); } async function download(backupConfig, remotePath, dataLayout, progressCallback) { @@ -151,17 +201,7 @@ async function download(backupConfig, remotePath, dataLayout, progressCallback) progressCallback({ message: `Downloading backup ${backupFilePath}` }); const sourceStream = await storage.api(backupConfig.provider).download(backupConfig, backupFilePath); - const ps = tarExtract(sourceStream, dataLayout, backupConfig.encryption); - - return await new Promise((resolve, reject) => { - ps.on('progress', function (progress) { - const transferred = Math.round(progress.transferred/1024/1024), speed = Math.round(progress.speed/1024/1024); - if (!transferred && !speed) return progressCallback({ message: 'Downloading backup' }); // 0M@0MBps looks wrong - progressCallback({ message: `Downloading ${transferred}M@${speed}MBps` }); - }); - ps.on('error', reject); - ps.on('done', resolve); - }); + await tarExtract(sourceStream, dataLayout, backupConfig.encryption, progressCallback); }); } @@ -173,23 +213,12 @@ async function upload(backupConfig, remotePath, dataLayout, progressCallback) { debug(`upload: Uploading ${dataLayout.toString()} to ${remotePath}`); - return new Promise((resolve, reject) => { - async.retry({ times: 5, interval: 20000 }, function (retryCallback) { - retryCallback = once(retryCallback); // protect again upload() erroring much later after tar stream error + const backupFilePath = getBackupFilePath(backupConfig, remotePath); - const tarStream = tarPack(dataLayout, backupConfig.encryption); + await promiseRetry({ times: 5, interval: 20000, debug }, async () => { + progressCallback({ message: `Uploading backup ${backupFilePath}` }); - tarStream.on('progress', function (progress) { - const transferred = Math.round(progress.transferred/1024/1024), speed = Math.round(progress.speed/1024/1024); - if (!transferred && !speed) return progressCallback({ message: 'Uploading backup' }); // 0M@0MBps looks wrong - progressCallback({ message: `Uploading backup ${transferred}M@${speed}MBps` }); - }); - tarStream.on('error', retryCallback); // already returns BoxError - - storage.api(backupConfig.provider).upload(backupConfig, getBackupFilePath(backupConfig, remotePath), tarStream, retryCallback); - }, (error) => { - if (error) return reject(error); - resolve(); - }); + const uploader = await storage.api(backupConfig.provider).upload(backupConfig, backupFilePath); + await tarPack(dataLayout, backupConfig.encryption, uploader, progressCallback); }); } diff --git a/src/storage/filesystem.js b/src/storage/filesystem.js index a305124eb..6a04bf85f 100644 --- a/src/storage/filesystem.js +++ b/src/storage/filesystem.js @@ -81,42 +81,25 @@ function hasChownSupportSync(apiConfig) { } } -function upload(apiConfig, backupFilePath, sourceStream, callback) { +async function upload(apiConfig, backupFilePath) { assert.strictEqual(typeof apiConfig, 'object'); assert.strictEqual(typeof backupFilePath, 'string'); - assert.strictEqual(typeof sourceStream, 'object'); - assert.strictEqual(typeof callback, 'function'); - fs.mkdir(path.dirname(backupFilePath), { recursive: true }, function (error) { - if (error) return callback(new BoxError(BoxError.EXTERNAL_ERROR, error.message)); + const [mkdirError] = await safe(fs.promises.mkdir(path.dirname(backupFilePath), { recursive: true })); + if (mkdirError) throw new BoxError(BoxError.FS_ERROR, `Error creating directory ${backupFilePath}: ${mkdirError.message}`); - safe.fs.unlinkSync(backupFilePath); // remove any hardlink + await safe(fs.promises.unlink(backupFilePath), { debug }); // remove any hardlink - var fileStream = fs.createWriteStream(backupFilePath); - - // this pattern is required to ensure that the file got created before 'finish' - fileStream.on('open', function () { - sourceStream.pipe(fileStream); - }); - - fileStream.on('error', function (error) { - debug(`upload: [${backupFilePath}] out stream error. %o`, error); - callback(new BoxError(BoxError.EXTERNAL_ERROR, error.message)); - }); - - fileStream.on('finish', function () { + return { + stream: fs.createWriteStream(backupFilePath, { autoClose: true }), + async finish() { const backupUid = parseInt(process.env.SUDO_UID, 10) || process.getuid(); // in test, upload() may or may not be called via sudo script - if (hasChownSupportSync(apiConfig)) { - if (!safe.fs.chownSync(backupFilePath, backupUid, backupUid)) return callback(new BoxError(BoxError.EXTERNAL_ERROR, 'Unable to chown:' + safe.error.message)); - if (!safe.fs.chownSync(path.dirname(backupFilePath), backupUid, backupUid)) return callback(new BoxError(BoxError.EXTERNAL_ERROR, 'Unable to chown:' + safe.error.message)); + if (!safe.fs.chownSync(backupFilePath, backupUid, backupUid)) throw new BoxError(BoxError.EXTERNAL_ERROR, `Unable to chown ${backupFilePath}: ${safe.error.message}`); + if (!safe.fs.chownSync(path.dirname(backupFilePath), backupUid, backupUid)) throw new BoxError(BoxError.EXTERNAL_ERROR, `Unable to chown parentdir ${backupFilePath}: ${safe.error.message}`); } - - debug(`upload ${backupFilePath}: done`); - - callback(null); - }); - }); + } + }; } async function download(apiConfig, sourceFilePath) { diff --git a/src/storage/gcs.js b/src/storage/gcs.js index 655499541..9569fa8f5 100644 --- a/src/storage/gcs.js +++ b/src/storage/gcs.js @@ -73,29 +73,20 @@ async function getAvailableSize(apiConfig) { return Number.POSITIVE_INFINITY; } -function upload(apiConfig, backupFilePath, sourceStream, callback) { +async function upload(apiConfig, backupFilePath) { assert.strictEqual(typeof apiConfig, 'object'); assert.strictEqual(typeof backupFilePath, 'string'); - assert.strictEqual(typeof sourceStream, 'object'); - assert.strictEqual(typeof callback, 'function'); debug(`Uploading to ${backupFilePath}`); - function done(error) { - if (error) { - debug(`upload: [${backupFilePath}] gcp upload error. %o`, error); - return callback(new BoxError(BoxError.EXTERNAL_ERROR, `Error uploading ${backupFilePath}. Message: ${error.message} HTTP Code: ${error.code}`)); - } + const uploadStream = getBucket(apiConfig) + .file(backupFilePath) + .createWriteStream({ resumable: false }); - callback(null); - } - - const uploadStream = getBucket(apiConfig).file(backupFilePath) - .createWriteStream({resumable: false}) - .on('finish', done) - .on('error', done); - - sourceStream.pipe(uploadStream); + return { + uploadStream, + async finish() {} + }; } async function exists(apiConfig, backupFilePath) { diff --git a/src/storage/interface.js b/src/storage/interface.js index c69fb95e6..af5f7963a 100644 --- a/src/storage/interface.js +++ b/src/storage/interface.js @@ -56,22 +56,19 @@ async function getAvailableSize(apiConfig) { return Number.POSITIVE_INFINITY; } -function upload(apiConfig, backupFilePath, sourceStream, callback) { +async function upload(apiConfig, backupFilePath) { assert.strictEqual(typeof apiConfig, 'object'); assert.strictEqual(typeof backupFilePath, 'string'); - assert.strictEqual(typeof sourceStream, 'object'); - assert.strictEqual(typeof callback, 'function'); - // Result: none - // sourceStream errors are handled upstream - - callback(new BoxError(BoxError.NOT_IMPLEMENTED, 'upload is not implemented')); + // Result: { stream, finish() callback } + throw new BoxError(BoxError.NOT_IMPLEMENTED, 'upload is not implemented'); } async function exists(apiConfig, backupFilePath) { assert.strictEqual(typeof apiConfig, 'object'); assert.strictEqual(typeof backupFilePath, 'string'); + // Result: boolean if exists or not throw new BoxError(BoxError.NOT_IMPLEMENTED, 'exists is not implemented'); } diff --git a/src/storage/s3.js b/src/storage/s3.js index 7c682a077..e3d513441 100644 --- a/src/storage/s3.js +++ b/src/storage/s3.js @@ -34,6 +34,7 @@ const assert = require('assert'), constants = require('../constants.js'), debug = require('debug')('box:storage/s3'), https = require('https'), + { PassThrough } = require('node:stream'), path = require('path'), Readable = require('stream').Readable, safe = require('safetydance'), @@ -103,20 +104,11 @@ async function getAvailableSize(apiConfig) { return Number.POSITIVE_INFINITY; } -function upload(apiConfig, backupFilePath, sourceStream, callback) { +async function upload(apiConfig, backupFilePath) { assert.strictEqual(typeof apiConfig, 'object'); assert.strictEqual(typeof backupFilePath, 'string'); - assert.strictEqual(typeof sourceStream, 'object'); - assert.strictEqual(typeof callback, 'function'); const credentials = getS3Config(apiConfig); - - const params = { - Bucket: apiConfig.bucket, - Key: backupFilePath, - Body: sourceStream - }; - const s3 = new aws.S3(credentials); // s3.upload automatically does a multi-part upload. we set queueSize to 3 to reduce memory usage @@ -125,16 +117,26 @@ function upload(apiConfig, backupFilePath, sourceStream, callback) { // s3: https://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html (max 10k parts and no size limit on the last part!) const partSize = apiConfig.limits?.uploadPartSize || (apiConfig.provider === 'scaleway-objectstorage' ? 100 * 1024 * 1024 : 10 * 1024 * 1024); - s3.upload(params, { partSize, queueSize: 3 }, function (error, data) { - if (error) { - debug(`upload: [${backupFilePath}] s3 upload error. %o`, error); - return callback(new BoxError(BoxError.EXTERNAL_ERROR, `Error uploading ${backupFilePath}. Message: ${error.message} HTTP Code: ${error.code}`)); + const passThrough = new PassThrough(); + + const params = { + Bucket: apiConfig.bucket, + Key: backupFilePath, + Body: passThrough + }; + + const managedUpload = s3.upload(params, { partSize, queueSize: 3 }); + managedUpload.on('httpUploadProgress', (progress) => debug(`Upload progress: ${JSON.stringify(progress)}`)); + const uploadPromise = managedUpload.promise(); + + return { + stream: passThrough, + async finish() { + const [error, data] = await safe(uploadPromise); + if (error) throw new BoxError(BoxError.EXTERNAL_ERROR, `Upload error: ${error.message}`); + debug(`Upload finished. ${JSON.stringify(data)}`); } - - debug(`Uploaded ${backupFilePath} with partSize ${partSize}: ${JSON.stringify(data)}`); - - callback(null); - }); + }; } async function exists(apiConfig, backupFilePath) { @@ -487,7 +489,6 @@ async function removeDir(apiConfig, pathPrefix, progressCallback) { assert.strictEqual(typeof pathPrefix, 'string'); assert.strictEqual(typeof progressCallback, 'function'); - const credentials = getS3Config(apiConfig); const s3 = new aws.S3(credentials); const listDirAsync = util.promisify(listDir);