'use strict';

const assert = require('node:assert'),
    backupTargets = require('../backuptargets.js'),
    BoxError = require('../boxerror.js'),
    DataLayout = require('../datalayout.js'),
    debug = require('debug')('box:backupformat/tgz'),
    { DecryptStream, EncryptStream } = require('../hush.js'),
    fs = require('node:fs'),
    HashStream = require('../hash-stream.js'),
    path = require('node:path'),
    ProgressStream = require('../progress-stream.js'),
    promiseRetry = require('../promise-retry.js'),
    safe = require('safetydance'),
    stream = require('stream/promises'),
    { Transform } = require('node:stream'),
    tar = require('tar-stream'),
    zlib = require('node:zlib');

// In tar, the entry header contains the file size. If we don't provide it those many bytes, the tar will become corrupt.
// Linux provides no guarantee of how many bytes can be read from a file. This is the case with sqlite and log files
// which are accessed by other processes when tar is in action. This class handles overflow and underflow:
// bytes beyond the advertised size are dropped, missing bytes are padded with zeroes.
class EnsureFileSizeStream extends Transform {
    constructor(options) {
        super(options);
        this._remaining = options.size; // bytes the tar entry header promised
        this._name = options.name;      // entry name, only used for debug logging
    }

    _transform(chunk, encoding, callback) {
        if (this._remaining <= 0) { // already emitted the promised byte count; drop the overflow
            debug(`EnsureFileSizeStream: ${this._name} dropping ${chunk.length} bytes`);
            return callback(null);
        }

        if (this._remaining - chunk.length < 0) { // chunk straddles the limit; truncate it
            debug(`EnsureFileSizeStream: ${this._name} dropping extra ${chunk.length - this._remaining} bytes`);
            chunk = chunk.subarray(0, this._remaining);
            this._remaining = 0;
        } else {
            this._remaining -= chunk.length;
        }

        callback(null, chunk);
    }

    _flush(callback) {
        if (this._remaining > 0) { // underflow: the source ended early, pad with zeroes up to the header size
            debug(`EnsureFileSizeStream: ${this._name} injecting ${this._remaining} bytes`);
            this.push(Buffer.alloc(this._remaining, 0));
        }
        callback();
    }
}

// Adds one entry to the tar pack stream. The returned promise settles via the pack.entry() completion
// callback: resolve on success, reject with BoxError.FS_ERROR on failure.
// options.input, when present, is a readable stream with the file contents; it is piped through
// EnsureFileSizeStream so exactly header.size bytes reach the entry even if the file changes underneath us.
function addEntryToPack(pack, header, options) {
    assert.strictEqual(typeof pack, 'object');
    assert.strictEqual(typeof header, 'object');
    assert.strictEqual(typeof options, 'object'); // { input }

    return new Promise((resolve, reject) => {
        const packEntry = safe(() => pack.entry(header, function (error) {
            if (error) {
                debug(`addToPack: error adding ${header.name} ${header.type} ${error.message}`);
                reject(new BoxError(BoxError.FS_ERROR, error.message));
            } else {
                debug(`addToPack: added ${header.name} ${header.type}`);
                resolve();
            }
        }));
        if (!packEntry) return reject(new BoxError(BoxError.FS_ERROR, `Failed to add ${header.name}: ${safe.error.message}`));

        if (options?.input) {
            const ensureFileSizeStream = new EnsureFileSizeStream({ name: header.name, size: header.size });
            safe(stream.pipeline(options.input, ensureFileSizeStream, packEntry), { debug }); // background. rely on pack.entry callback for promise completion
        }
    });
}

// Walks localPath (breadth-within-depth: subdirs are re-queued at the front, preserving readdir order)
// and adds every file, directory and symlink to the pack. Entries that cannot be opened/stat'ed/read
// are skipped with a debug log (best effort — backups must not fail because another process holds a file).
// Returns { fileCount, linkCount, dirCount }.
async function addPathToPack(pack, localPath, dataLayout) {
    assert.strictEqual(typeof pack, 'object');
    assert(dataLayout instanceof DataLayout, 'dataLayout must be a DataLayout');
    assert.strictEqual(typeof localPath, 'string');

    const stats = { fileCount: 0, linkCount: 0, dirCount: 0 };

    const queue = [ localPath ];
    while (queue.length) {
        // if (pack.destroyed || outStream.destroyed) break;
        const dir = queue.shift();
        debug(`tarPack: processing ${dir}`);

        const [readdirError, entries] = await safe(fs.promises.readdir(dir, { withFileTypes: true }));
        if (!entries) {
            debug(`tarPack: skipping directory ${dir}: ${readdirError.message}`);
            continue;
        }

        const subdirs = [];
        for (const entry of entries) {
            const abspath = path.join(dir, entry.name);
            const headerName = dataLayout.toRemotePath(abspath); // path as it appears inside the tar

            if (entry.isFile()) {
                const [openError, handle] = await safe(fs.promises.open(abspath, 'r'));
                if (!handle) {
                    debug(`tarPack: skipping file, could not open ${abspath}: ${openError.message}`);
                    continue;
                }
                const [statError, stat] = await safe(handle.stat());
                if (!stat) {
                    debug(`tarPack: skipping file, could not stat ${abspath}: ${statError.message}`);
                    continue;
                }
                const header = { name: headerName, type: 'file', mode: stat.mode, size: stat.size, uid: process.getuid(), gid: process.getgid() };
                // ustar caps size at ~8GiB (11 octal digits) and name at 100 bytes; use a pax extended header beyond that.
                // NOTE(review): pax here only carries size — presumably tar-stream emits the long path itself; confirm against tar-stream docs.
                if (stat.size > 8589934590 || entry.name.length > 99) header.pax = { size: stat.size };
                const input = handle.createReadStream({ autoClose: true });
                await addEntryToPack(pack, header, { input });
                ++stats.fileCount;
            } else if (entry.isDirectory()) {
                const header = { name: headerName, type: 'directory', uid: process.getuid(), gid: process.getgid() };
                subdirs.push(abspath);
                await addEntryToPack(pack, header, { /* options */ });
                ++stats.dirCount;
            } else if (entry.isSymbolicLink()) {
                const [readlinkError, target] = await safe(fs.promises.readlink(abspath));
                if (!target) {
                    debug(`tarPack: skipping link, could not readlink ${abspath}: ${readlinkError.message}`);
                    continue;
                }
                const header = { name: headerName, type: 'symlink', linkname: target, uid: process.getuid(), gid: process.getgid() };
                await addEntryToPack(pack, header, { /* options */ });
                ++stats.linkCount;
            } else {
                debug(`tarPack: ignoring unknown type ${entry.name} ${entry.type}`);
            }
        }

        queue.unshift(...subdirs); // add to front of queue and in order of readdir listing
    }

    return stats;
}

// Packs every local path of dataLayout into a (optionally encrypted) tar.gz streamed into uploader.stream.
// encryption may be null (typeof null === 'object', so the assert accepts "no encryption").
// progressCallback({ message }) is invoked periodically during upload.
// Returns { stats: { fileCount, ...progress stats }, integrity: { size, fileCount, sha256 } }.
// Throws BoxError.EXTERNAL_ERROR if the stream pipeline fails.
async function tarPack(dataLayout, encryption, uploader, progressCallback) {
    assert(dataLayout instanceof DataLayout, 'dataLayout must be a DataLayout');
    assert.strictEqual(typeof encryption, 'object');
    assert.strictEqual(typeof uploader, 'object');
    assert.strictEqual(typeof progressCallback, 'function');

    const gzip = zlib.createGzip({});
    const ps = new ProgressStream({ interval: 10000 }); // emit 'progress' every 10 seconds
    ps.on('progress', function (progress) {
        const transferred = Math.round(progress.transferred/1024/1024), speed = Math.round(progress.speed/1024/1024);
        if (!transferred && !speed) return progressCallback({ message: 'Uploading backup' }); // 0M@0MBps looks wrong
        progressCallback({ message: `Uploading backup ${transferred}M@${speed}MBps` });
    });
    const pack = tar.pack();
    const hash = new HashStream(); // sha256 of the final (compressed/encrypted) byte stream, for integrity records

    // start the pipeline in the background; entries are written into `pack` below
    let pipeline = null;
    if (encryption) {
        const encryptStream = new EncryptStream(encryption);
        pipeline = safe(stream.pipeline(pack, gzip, encryptStream, ps, hash, uploader.stream));
    } else {
        pipeline = safe(stream.pipeline(pack, gzip, ps, hash, uploader.stream));
    }

    let fileCount = 0;
    for (const localPath of dataLayout.localPaths()) {
        const [error, stats] = await safe(addPathToPack(pack, localPath, dataLayout), { debug });
        if (error) break; // the pipeline will error and we will retry the whole packing all over
        fileCount += stats.fileCount;
    }

    pack.finalize(); // harmless to call if already in error state

    const [error] = await pipeline; // already wrapped in safe()
    if (error) throw new BoxError(BoxError.EXTERNAL_ERROR, `tarPack pipeline error: ${error.message}`);

    debug(`tarPack: pipeline finished: ${JSON.stringify(ps.stats())}`);

    await uploader.finish();

    return { stats: { fileCount, ...ps.stats() }, integrity: { size: ps.stats().transferred, fileCount, sha256: hash.digest('hex') } };
}

// Extracts a (optionally encrypted) tar.gz from inStream into the local paths mapped by dataLayout.
// encryption may be null (no decryption). Absolute entry names are ignored as a safety measure.
// Entry mtimes are restored via lutimes; directory mtimes get overwritten as children are written afterwards.
// Throws BoxError.EXTERNAL_ERROR if the stream pipeline fails.
async function tarExtract(inStream, dataLayout, encryption, progressCallback) {
    assert.strictEqual(typeof inStream, 'object');
    assert(dataLayout instanceof DataLayout, 'dataLayout must be a DataLayout');
    assert.strictEqual(typeof encryption, 'object');
    assert.strictEqual(typeof progressCallback, 'function');

    const extract = tar.extract();
    const now = new Date();

    extract.on('entry', async function (header, entryStream, next) {
        if (path.isAbsolute(header.name)) {
            debug(`tarExtract: ignoring absolute path ${header.name}`);
            return next();
        }

        const abspath = dataLayout.toLocalPath(header.name);
        debug(`tarExtract: ${header.name} ${header.size} ${header.type} to ${abspath}`);

        let error = null;
        if (header.type === 'directory') {
            [error] = await safe(fs.promises.mkdir(abspath, { recursive: true, mode: 0o755 }));
        } else if (header.type === 'file') {
            const output = fs.createWriteStream(abspath);
            [error] = await safe(stream.pipeline(entryStream, output));
            if (!error) [error] = await safe(fs.promises.chmod(abspath, header.mode)); // restore original mode after write
        } else if (header.type === 'symlink') {
            await safe(fs.promises.unlink(abspath)); // remove any link created from previous failed extract
            [error] = await safe(fs.promises.symlink(header.linkname, abspath));
        } else {
            debug(`tarExtract: ignoring unknown entry: ${header.name} ${header.type}`);
            entryStream.resume(); // drain
        }

        if (error) return next(error);

        [error] = await safe(fs.promises.lutimes(abspath, now /* atime */, header.mtime)); // for dirs, mtime will get overwritten
        next(error);
    });

    extract.on('finish', () => debug('tarExtract: extract finished'));

    const gunzip = zlib.createGunzip({});
    const ps = new ProgressStream({ interval: 10000 }); // emit 'progress' every 10 seconds
    ps.on('progress', function (progress) {
        const transferred = Math.round(progress.transferred/1024/1024), speed = Math.round(progress.speed/1024/1024);
        if (!transferred && !speed) return progressCallback({ message: 'Downloading backup' }); // 0M@0MBps looks wrong
        progressCallback({ message: `Downloading ${transferred}M@${speed}MBps` });
    });

    if (encryption) {
        const decrypt = new DecryptStream(encryption);
        const [error] = await safe(stream.pipeline(inStream, ps, decrypt, gunzip, extract));
        if (error) throw new BoxError(BoxError.EXTERNAL_ERROR, `tarExtract pipeline error: ${error.message}`);
    } else {
        const [error] = await safe(stream.pipeline(inStream, ps, gunzip, extract));
        if (error) throw new BoxError(BoxError.EXTERNAL_ERROR, `tarExtract pipeline error: ${error.message}`);
    }

    debug(`tarExtract: pipeline finished: ${JSON.stringify(ps.stats())}`);
}

// Downloads remotePath from backupTarget's storage and extracts it into dataLayout.
// Retries the whole download+extract up to 5 times, 20s apart.
async function download(backupTarget, remotePath, dataLayout, progressCallback) {
    assert.strictEqual(typeof backupTarget, 'object');
    assert.strictEqual(typeof remotePath, 'string');
    assert(dataLayout instanceof DataLayout, 'dataLayout must be a DataLayout');
    assert.strictEqual(typeof progressCallback, 'function');

    debug(`download: Downloading ${remotePath} to ${dataLayout.toString()}`);

    await promiseRetry({ times: 5, interval: 20000, debug }, async () => {
        progressCallback({ message: `Downloading backup ${remotePath}` });
        const sourceStream = await backupTargets.storageApi(backupTarget).download(backupTarget.config, remotePath);
        await tarExtract(sourceStream, dataLayout, backupTarget.encryption, progressCallback);
    });
}

// Packs dataLayout and uploads it to remotePath on backupTarget's storage.
// Retries the whole pack+upload up to 5 times, 20s apart.
// Returns { stats, integrityMap } where integrityMap maps the remote file name to its integrity record.
async function upload(backupTarget, remotePath, dataLayout, progressCallback) {
    assert.strictEqual(typeof backupTarget, 'object');
    assert.strictEqual(typeof remotePath, 'string');
    assert(dataLayout instanceof DataLayout, 'dataLayout must be a DataLayout'); // consistent with download() and tarPack(), which already require a DataLayout
    assert.strictEqual(typeof progressCallback, 'function');

    debug(`upload: Uploading ${dataLayout.toString()} to ${remotePath}`);

    return await promiseRetry({ times: 5, interval: 20000, debug }, async () => {
        progressCallback({ message: `Uploading backup ${remotePath}` });
        const uploader = await backupTargets.storageApi(backupTarget).upload(backupTarget.config, remotePath);
        const { stats, integrity } = await tarPack(dataLayout, backupTarget.encryption, uploader, progressCallback);
        const integrityMap = new Map([ [path.basename(remotePath), integrity] ]);
        return { stats, integrityMap };
    });
}

// Returns the file extension used for backups in this format, depending on whether encryption is enabled.
function getFileExtension(encryption) {
    assert.strictEqual(typeof encryption, 'boolean');

    return encryption ? '.tar.gz.enc' : '.tar.gz';
}

exports = module.exports = {
    download,
    upload,
    getFileExtension,

    // exported for testing
    _EnsureFileSizeStream: EnsureFileSizeStream
};