import assert from 'node:assert';
import backupSites from '../backupsites.js';
import BoxError from '../boxerror.js';
import DataLayout from '../datalayout.js';
import logger from '../logger.js';
import hush from '../hush.js';
import fs from 'node:fs';
import HashStream from '../hash-stream.js';
import path from 'node:path';
import ProgressStream from '../progress-stream.js';
import promiseRetry from '../promise-retry.js';
import safe from 'safetydance';
import stream from 'stream/promises';
import { Transform } from 'node:stream';
import tar from 'tar-stream';
import util from 'node:util';
import zlib from 'node:zlib';

const { DecryptStream, EncryptStream } = hush;
const { log, trace } = logger('backupformat/tgz');

// In tar, the entry header contains the file size. If we don't provide it those many bytes,
// the tar will become corrupt. Linux provides no guarantee of how many bytes can be read from
// a file. This is the case with sqlite and log files which are accessed by other processes
// when tar is in action. This class handles overflow and underflow.
class EnsureFileSizeStream extends Transform {
    constructor(options) {
        super(options);
        this._remaining = options.size; // bytes still owed to the tar entry
        this._name = options.name;      // entry name, used only in log messages
    }

    _transform(chunk, encoding, callback) {
        if (this._remaining <= 0) { // overflow: the promised size was already written, drop the rest
            log(`EnsureFileSizeStream: ${this._name} dropping ${chunk.length} bytes`);
            return callback(null);
        }

        if (this._remaining - chunk.length < 0) { // chunk crosses the promised size, truncate it
            log(`EnsureFileSizeStream: ${this._name} dropping extra ${chunk.length - this._remaining} bytes`);
            chunk = chunk.subarray(0, this._remaining);
            this._remaining = 0;
        } else {
            this._remaining -= chunk.length;
        }

        callback(null, chunk);
    }

    _flush(callback) {
        if (this._remaining > 0) { // underflow: pad with zeroes up to the promised size
            log(`EnsureFileSizeStream: ${this._name} injecting ${this._remaining} bytes`);
            this.push(Buffer.alloc(this._remaining, 0));
        }

        callback();
    }
}

// Adds a single entry (file/directory/symlink) to the tar pack stream.
// Resolves when tar-stream reports the entry as fully written; rejects with FS_ERROR otherwise.
function addEntryToPack(pack, header, options) {
    assert.strictEqual(typeof pack, 'object');
    assert.strictEqual(typeof header, 'object');
    assert.strictEqual(typeof options, 'object'); // { input }

    return new Promise((resolve, reject) => {
        const packEntry = safe(() => pack.entry(header, function (error) {
            if (error) {
                log(`addToPack: error adding ${header.name} ${header.type} ${error.message}`);
                reject(new BoxError(BoxError.FS_ERROR, error.message));
            } else {
                resolve();
            }
        }));

        if (!packEntry) return reject(new BoxError(BoxError.FS_ERROR, `Failed to add ${header.name}: ${safe.error.message}`));

        if (options?.input) {
            const ensureFileSizeStream = new EnsureFileSizeStream({ name: header.name, size: header.size });
            // background. rely on pack.entry callback for promise completion
            safe(stream.pipeline(options.input, ensureFileSizeStream, packEntry), { debug: log });
        }
    });
}

// Walks localPath breadth-first (subdirs are pushed to the front, preserving readdir order)
// and adds every file, directory and symlink to the pack. Unreadable entries are skipped
// with a log line rather than aborting the whole backup.
// Returns { fileCount, linkCount, dirCount }.
async function addPathToPack(pack, localPath, dataLayout) {
    assert.strictEqual(typeof pack, 'object');
    assert(dataLayout instanceof DataLayout, 'dataLayout must be a DataLayout');
    assert.strictEqual(typeof localPath, 'string');

    const stats = { fileCount: 0, linkCount: 0, dirCount: 0 };

    const queue = [ localPath ];
    while (queue.length) {
        // if (pack.destroyed || outStream.destroyed) break;
        const dir = queue.shift();
        const [readdirError, entries] = await safe(fs.promises.readdir(dir, { withFileTypes: true }));
        if (!entries) {
            log(`tarPack: skipping directory ${dir}: ${readdirError.message}`);
            continue;
        }

        const subdirs = [];
        for (const entry of entries) {
            const abspath = path.join(dir, entry.name);
            const headerName = dataLayout.toRemotePath(abspath);

            if (entry.isFile()) {
                const [openError, handle] = await safe(fs.promises.open(abspath, 'r'));
                if (!handle) {
                    log(`tarPack: skipping file, could not open ${abspath}: ${openError.message}`);
                    continue;
                }
                const [statError, stat] = await safe(handle.stat());
                if (!stat) {
                    log(`tarPack: skipping file, could not stat ${abspath}: ${statError.message}`);
                    continue;
                }

                const header = { name: headerName, type: 'file', mode: stat.mode, size: stat.size, uid: process.getuid(), gid: process.getgid() };
                // ustar cannot represent sizes >= 8GiB or names > 99 chars; use a pax extended header then
                if (stat.size > 8589934590 || entry.name.length > 99) header.pax = { size: stat.size };

                const input = handle.createReadStream({ autoClose: true });
                await addEntryToPack(pack, header, { input });
                ++stats.fileCount;
            } else if (entry.isDirectory()) {
                const header = { name: headerName, type: 'directory', uid: process.getuid(), gid: process.getgid() };
                subdirs.push(abspath);
                await addEntryToPack(pack, header, { /* options */ });
                ++stats.dirCount;
            } else if (entry.isSymbolicLink()) {
                const [readlinkError, site] = await safe(fs.promises.readlink(abspath));
                if (!site) {
                    log(`tarPack: skipping link, could not readlink ${abspath}: ${readlinkError.message}`);
                    continue;
                }
                const header = { name: headerName, type: 'symlink', linkname: site, uid: process.getuid(), gid: process.getgid() };
                await addEntryToPack(pack, header, { /* options */ });
                ++stats.linkCount;
            } else {
                log(`tarPack: ignoring unknown type ${entry.name} ${entry.type}`);
            }
        }

        queue.unshift(...subdirs); // add to front of queue and in order of readdir listing
    }

    return stats;
}

// Packs all local paths of dataLayout into a (optionally encrypted) .tar.gz and streams it
// through the uploader. Reports progress via progressCallback.
// Returns { stats, integrity } where integrity carries size/fileCount/sha256 for later verify().
async function tarPack(dataLayout, encryption, uploader, progressCallback) {
    assert(dataLayout instanceof DataLayout, 'dataLayout must be a DataLayout');
    assert.strictEqual(typeof encryption, 'object'); // may be null; typeof null === 'object'
    assert.strictEqual(typeof uploader, 'object');
    assert.strictEqual(typeof progressCallback, 'function');

    const gzip = zlib.createGzip({});
    const ps = new ProgressStream({ interval: 10000 }); // emit 'progress' every 10 seconds
    ps.on('progress', function (progress) {
        const transferred = Math.round(progress.transferred/1024/1024), speed = Math.round(progress.speed/1024/1024);
        if (!transferred && !speed) return progressCallback({ message: 'Uploading backup' }); // 0M@0MBps looks wrong
        progressCallback({ message: `Uploading backup ${transferred}M@${speed}MBps` });
    });
    ps.on('heartbeat', function ({ elapsed, transferred }) {
        progressCallback({ message: `Still uploading backup (${elapsed}s, ${Math.round(transferred/1024/1024)}M)` });
    });

    // careful not to have async code between here and pipeline() for 'error' handling
    const pack = tar.pack();
    const hash = new HashStream();
    const destStream = uploader.createStream();

    let pipeline;
    if (encryption) {
        const encryptStream = new EncryptStream(encryption);
        pipeline = safe(stream.pipeline(pack, gzip, encryptStream, ps, hash, destStream));
    } else {
        pipeline = safe(stream.pipeline(pack, gzip, ps, hash, destStream));
    }

    let fileCount = 0;
    for (const localPath of dataLayout.localPaths()) {
        const [error, stats] = await safe(addPathToPack(pack, localPath, dataLayout), { debug: log });
        if (error) break; // the pipeline will error and we will retry the whole packing all over
        fileCount += stats.fileCount;
    }
    log(`tarPack: packed ${fileCount} files`);

    pack.finalize(); // harmless to call if already in error state

    const [error] = await pipeline; // already wrapped in safe()
    if (error) throw new BoxError(BoxError.EXTERNAL_ERROR, `tarPack pipeline error: ${error.message}`);

    const stats = ps.stats(); // { startTime, totalMsecs, transferred }
    log(`tarPack: pipeline finished: ${JSON.stringify(stats)}`);

    await uploader.finish();

    return {
        stats: { fileCount, size: stats.transferred, transferred: stats.transferred },
        integrity: { size: stats.transferred, fileCount, sha256: hash.digest('hex') },
    };
}

// Extracts a (optionally encrypted) .tar.gz stream into dataLayout's local paths,
// restoring modes and mtimes. Reports progress via progressCallback.
async function tarExtract(inStream, dataLayout, encryption, progressCallback) {
    assert.strictEqual(typeof inStream, 'object');
    assert(dataLayout instanceof DataLayout, 'dataLayout must be a DataLayout');
    assert.strictEqual(typeof encryption, 'object'); // may be null; typeof null === 'object'
    assert.strictEqual(typeof progressCallback, 'function');

    const extract = tar.extract();
    const now = new Date();
    let entryCount = 0;

    extract.on('entry', async function (header, entryStream, next) {
        if (path.isAbsolute(header.name)) { // defensive: never write outside the layout
            log(`tarExtract: ignoring absolute path ${header.name}`);
            return next();
        }

        ++entryCount;
        const abspath = dataLayout.toLocalPath(header.name);

        let error = null;
        if (header.type === 'directory') {
            [error] = await safe(fs.promises.mkdir(abspath, { recursive: true, mode: 0o755 }));
        } else if (header.type === 'file') {
            const output = fs.createWriteStream(abspath);
            [error] = await safe(stream.pipeline(entryStream, output));
            if (!error) [error] = await safe(fs.promises.chmod(abspath, header.mode));
        } else if (header.type === 'symlink') {
            await safe(fs.promises.unlink(abspath)); // remove any link created from previous failed extract
            [error] = await safe(fs.promises.symlink(header.linkname, abspath));
        } else {
            log(`tarExtract: ignoring unknown entry: ${header.name} ${header.type}`);
            entryStream.resume(); // drain
        }
        if (error) return next(error);

        [error] = await safe(fs.promises.lutimes(abspath, now /* atime */, header.mtime)); // for dirs, mtime will get overwritten
        next(error);
    });
    extract.on('finish', () => log(`tarExtract: extracted ${entryCount} entries`));

    const gunzip = zlib.createGunzip({});
    const ps = new ProgressStream({ interval: 10000 });
    ps.on('progress', function (progress) {
        const transferred = Math.round(progress.transferred/1024/1024), speed = Math.round(progress.speed/1024/1024);
        if (!transferred && !speed) return progressCallback({ message: 'Downloading backup' }); // 0M@0MBps looks wrong
        progressCallback({ message: `Downloading ${transferred}M@${speed}MBps` });
    });
    ps.on('heartbeat', function ({ elapsed, transferred }) {
        progressCallback({ message: `Still downloading backup (${elapsed}s, ${Math.round(transferred/1024/1024)}M)` });
    });

    if (encryption) {
        const decrypt = new DecryptStream(encryption);
        const [error] = await safe(stream.pipeline(inStream, ps, decrypt, gunzip, extract));
        if (error) throw new BoxError(BoxError.EXTERNAL_ERROR, `tarExtract pipeline error: ${error.message}`);
    } else {
        const [error] = await safe(stream.pipeline(inStream, ps, gunzip, extract));
        if (error) throw new BoxError(BoxError.EXTERNAL_ERROR, `tarExtract pipeline error: ${error.message}`);
    }

    log(`tarExtract: pipeline finished: ${JSON.stringify(ps.stats())}`);
}

// Downloads remotePath from the backup site and extracts it into dataLayout.
// Retries the whole download+extract up to 3 times.
async function download(backupSite, remotePath, dataLayout, progressCallback) {
    assert.strictEqual(typeof backupSite, 'object');
    assert.strictEqual(typeof remotePath, 'string');
    assert(dataLayout instanceof DataLayout, 'dataLayout must be a DataLayout');
    assert.strictEqual(typeof progressCallback, 'function');

    log(`download: Downloading ${remotePath} to ${dataLayout.toString()}`);

    await promiseRetry({ times: 3, interval: 20000, debug: log }, async () => {
        progressCallback({ message: `Downloading backup ${remotePath}` });
        const sourceStream = await backupSites.storageApi(backupSite).download(backupSite.config, remotePath);
        await tarExtract(sourceStream, dataLayout, backupSite.encryption, progressCallback);
    });
}

// Packs dataLayout and uploads it to remotePath on the backup site.
// Retries the whole pack+upload up to 5 times.
// Returns { stats, integrityMap } where integrityMap is keyed by '.' so the
// backup can later be moved to another remote path without invalidating it.
async function upload(backupSite, remotePath, dataLayout, progressCallback) {
    assert.strictEqual(typeof backupSite, 'object');
    assert.strictEqual(typeof remotePath, 'string');
    assert(dataLayout instanceof DataLayout, 'dataLayout must be a DataLayout'); // consistent with download()/tarPack()
    assert.strictEqual(typeof progressCallback, 'function');

    log(`upload: uploading to site ${backupSite.id} path ${remotePath} (encrypted: ${!!backupSite.encryption}) dataLayout ${dataLayout.toString()}`);

    return await promiseRetry({ times: 5, interval: 20000, debug: log }, async () => {
        progressCallback({ message: `Uploading backup ${remotePath}` });
        const uploader = await backupSites.storageApi(backupSite).upload(backupSite.config, backupSite.limits, remotePath);
        const { stats, integrity } = await tarPack(dataLayout, backupSite.encryption, uploader, progressCallback);
        // use '.' instead of remote path since the backup can be moved to another path
        const integrityMap = new Map([ ['.', integrity] ]);
        return { stats, integrityMap };
    });
}

// Copies a backup within the same backup site (delegates entirely to the storage API).
async function copy(backupSite, fromPath, toPath, progressCallback) {
    assert.strictEqual(typeof backupSite, 'object');
    assert.strictEqual(typeof fromPath, 'string');
    assert.strictEqual(typeof toPath, 'string');
    assert.strictEqual(typeof progressCallback, 'function');

    await backupSites.storageApi(backupSite).copy(backupSite.config, fromPath, toPath, progressCallback);
}

// Downloads remotePath and checks it against the stored integrity record (size,
// file count, sha256). Returns an array of human-readable mismatch messages;
// an empty array means the backup verified clean.
async function verify(backupSite, remotePath, integrityMap, progressCallback) {
    assert.strictEqual(typeof backupSite, 'object');
    assert.strictEqual(typeof remotePath, 'string');
    assert(util.types.isMap(integrityMap), 'integrityMap should be a Map');
    assert.strictEqual(typeof progressCallback, 'function');

    log(`verify: Verifying ${remotePath}`);

    const inStream = await backupSites.storageApi(backupSite).download(backupSite.config, remotePath);

    let fileCount = 0;
    const extract = tar.extract();
    extract.on('entry', async function (header, entryStream, next) {
        if (path.isAbsolute(header.name)) {
            log(`verify: ignoring absolute path ${header.name}`);
            return next();
        }
        log(`verify: ${header.name} ${header.size} ${header.type}`);
        if (header.type === 'file') {
            ++fileCount;
        }
        entryStream.resume(); // drain
        next();
    });
    extract.on('finish', () => log('verify: extract finished'));

    const hash = new HashStream();
    const gunzip = zlib.createGunzip({});
    const ps = new ProgressStream({ interval: 10000 });
    ps.on('progress', function (progress) {
        const transferred = Math.round(progress.transferred/1024/1024), speed = Math.round(progress.speed/1024/1024);
        if (!transferred && !speed) return progressCallback({ message: 'Downloading backup' }); // 0M@0MBps looks wrong
        progressCallback({ message: `Downloading ${transferred}M@${speed}MBps` });
    });

    if (backupSite.encryption) {
        const decrypt = new DecryptStream(backupSite.encryption);
        const [error] = await safe(stream.pipeline(inStream, ps, hash, decrypt, gunzip, extract));
        if (error) throw new BoxError(BoxError.EXTERNAL_ERROR, `tarExtract pipeline error: ${error.message}`);
    } else {
        const [error] = await safe(stream.pipeline(inStream, ps, hash, gunzip, extract));
        if (error) throw new BoxError(BoxError.EXTERNAL_ERROR, `tarExtract pipeline error: ${error.message}`);
    }

    const integrity = integrityMap.get('.');
    const transferred = ps.stats().transferred;
    const digest = hash.digest('hex'); // compute once; 'hex' matches what tarPack stored in the integrity record
    log(`verify: Expecting: ${JSON.stringify(integrity)} Actual: size:${transferred} filecount:${fileCount} digest:${digest}`);

    const messages = [];
    if (integrity.size !== transferred) messages.push(`Size mismatch. Expected: ${integrity.size} Actual: ${transferred}`);
    if (integrity.fileCount !== fileCount) messages.push(`File count mismatch. Expected: ${integrity.fileCount} Actual: ${fileCount}`);
    if (integrity.sha256 !== digest) messages.push(`Checksum mismatch. Expected: ${integrity.sha256} Actual: ${digest}`); // was mislabeled "File count mismatch"
    return messages;
}

// Returns the remote file extension for this backup format.
function getFileExtension(encryption) {
    assert.strictEqual(typeof encryption, 'boolean');

    return encryption ? '.tar.gz.enc' : '.tar.gz';
}

const _EnsureFileSizeStream = EnsureFileSizeStream; // exported for tests

export default {
    download,
    upload,
    verify,
    getFileExtension,
    copy,

    _EnsureFileSizeStream,
};