12e073e8cf
mostly because code is being autogenerated by all the AI stuff using this prefix. it's also used in the stack trace.
296 lines
13 KiB
JavaScript
296 lines
13 KiB
JavaScript
'use strict';
|
|
|
|
const assert = require('node:assert'),
|
|
backupTargets = require('../backuptargets.js'),
|
|
BoxError = require('../boxerror.js'),
|
|
crypto = require('node:crypto'),
|
|
DataLayout = require('../datalayout.js'),
|
|
debug = require('debug')('box:backupformat/tgz'),
|
|
{ DecryptStream, EncryptStream } = require('../hush.js'),
|
|
fs = require('node:fs'),
|
|
HashStream = require('../hash-stream.js'),
|
|
path = require('node:path'),
|
|
ProgressStream = require('../progress-stream.js'),
|
|
promiseRetry = require('../promise-retry.js'),
|
|
{ Readable } = require('node:stream'),
|
|
safe = require('safetydance'),
|
|
stream = require('stream/promises'),
|
|
{ Transform } = require('node:stream'),
|
|
tar = require('tar-stream'),
|
|
zlib = require('node:zlib');
|
|
|
|
// In tar, each entry header declares the file size up front. If we then feed the pack a different number of
// bytes, the archive becomes corrupt. Linux gives no guarantee that a file yields exactly its stat'd size -
// sqlite databases and log files may be written by other processes while tar is running. This class trims
// overflow and zero-pads underflow so each entry always matches its declared size.
|
|
class EnsureFileSizeStream extends Transform {
    constructor(options) {
        super(options);
        this._remaining = options.size; // bytes still owed to match the declared entry size
        this._name = options.name;      // entry name, used only for debug logging
    }

    _transform(chunk, encoding, callback) {
        const remaining = this._remaining;

        // already delivered the declared size: swallow everything that follows (file grew)
        if (remaining <= 0) {
            debug(`EnsureFileSizeStream: ${this._name} dropping ${chunk.length} bytes`);
            return callback(null);
        }

        // chunk fits entirely within what is still owed: pass it through unchanged
        if (chunk.length <= remaining) {
            this._remaining = remaining - chunk.length;
            return callback(null, chunk);
        }

        // chunk overshoots the declared size: forward only the owed prefix
        debug(`EnsureFileSizeStream: ${this._name} dropping extra ${chunk.length - remaining} bytes`);
        this._remaining = 0;
        callback(null, chunk.subarray(0, remaining));
    }

    _flush(callback) {
        // file shrank while being read: zero-pad up to the declared size
        if (this._remaining > 0) {
            debug(`EnsureFileSizeStream: ${this._name} injecting ${this._remaining} bytes`);
            this.push(Buffer.alloc(this._remaining, 0));
        }
        callback();
    }
}
|
|
|
|
// Adds a single entry (header plus optional content stream) to the tar pack.
// Resolves once tar-stream confirms the entry was written; rejects with BoxError.FS_ERROR otherwise.
function addEntryToPack(pack, header, options) {
    assert.strictEqual(typeof pack, 'object');
    assert.strictEqual(typeof header, 'object');
    assert.strictEqual(typeof options, 'object'); // { input }

    return new Promise(function (resolve, reject) {
        const onEntryDone = function (error) {
            if (!error) {
                debug(`addToPack: added ${header.name} ${header.type}`);
                return resolve();
            }
            debug(`addToPack: error adding ${header.name} ${header.type} ${error.message}`);
            reject(new BoxError(BoxError.FS_ERROR, error.message));
        };

        const packEntry = safe(() => pack.entry(header, onEntryDone));
        if (!packEntry) return reject(new BoxError(BoxError.FS_ERROR, `Failed to add ${header.name}: ${safe.error.message}`));

        if (!options?.input) return; // headers-only entry (directory, symlink)

        // normalize the content to exactly header.size bytes so the archive stays valid
        const sizer = new EnsureFileSizeStream({ name: header.name, size: header.size });
        safe(stream.pipeline(options.input, sizer, packEntry), { debug }); // background. rely on pack.entry callback for promise completion
    });
}
|
|
|
|
// Walks localPath (breadth-first, subdirs re-queued in readdir order) and adds every
// file, directory and symlink under it to the tar pack. Unreadable entries are skipped
// with a debug log (backup is best-effort for volatile files).
// Returns { fileCount, linkCount, dirCount }; throws if an entry cannot be packed.
async function addPathToPack(pack, localPath, dataLayout) {
    assert.strictEqual(typeof pack, 'object');
    assert(dataLayout instanceof DataLayout, 'dataLayout must be a DataLayout');
    assert.strictEqual(typeof localPath, 'string');

    const stats = { fileCount: 0, linkCount: 0, dirCount: 0 };

    const queue = [ localPath ];
    while (queue.length) {
        // if (pack.destroyed || outStream.destroyed) break;
        const dir = queue.shift();
        debug(`tarPack: processing ${dir}`);
        const [readdirError, entries] = await safe(fs.promises.readdir(dir, { withFileTypes: true }));
        if (!entries) {
            debug(`tarPack: skipping directory ${dir}: ${readdirError.message}`);
            continue;
        }
        const subdirs = [];
        for (const entry of entries) {
            const abspath = path.join(dir, entry.name);
            const headerName = dataLayout.toRemotePath(abspath); // path inside the tarball
            if (entry.isFile()) {
                const [openError, handle] = await safe(fs.promises.open(abspath, 'r'));
                if (!handle) { debug(`tarPack: skipping file, could not open ${abspath}: ${openError.message}`); continue; }
                const [statError, stat] = await safe(handle.stat());
                if (!stat) {
                    debug(`tarPack: skipping file, could not stat ${abspath}: ${statError.message}`);
                    await safe(handle.close()); // BUGFIX: previously the handle leaked an fd on this skip path
                    continue;
                }
                const header = { name: headerName, type: 'file', mode: stat.mode, size: stat.size, uid: process.getuid(), gid: process.getgid() };
                // ustar caps the size field at ~8GiB and names at 100 chars; beyond that use a pax extended header
                // NOTE(review): the length check uses entry.name, but the header carries headerName which may be longer - verify
                if (stat.size > 8589934590 || entry.name.length > 99) header.pax = { size: stat.size };
                const input = handle.createReadStream({ autoClose: true }); // autoClose releases the fd on end/error
                const [packError] = await safe(addEntryToPack(pack, header, { input }));
                if (packError) {
                    input.destroy(); // BUGFIX: release the fd when the entry cannot be added (stream was never consumed)
                    throw packError; // same propagation as before: caller aborts and retries the whole pack
                }
                ++stats.fileCount;
            } else if (entry.isDirectory()) {
                const header = { name: headerName, type: 'directory', uid: process.getuid(), gid: process.getgid() };
                subdirs.push(abspath);
                await addEntryToPack(pack, header, { /* options */ });
                ++stats.dirCount;
            } else if (entry.isSymbolicLink()) {
                const [readlinkError, target] = await safe(fs.promises.readlink(abspath));
                if (!target) { debug(`tarPack: skipping link, could not readlink ${abspath}: ${readlinkError.message}`); continue; }
                const header = { name: headerName, type: 'symlink', linkname: target, uid: process.getuid(), gid: process.getgid() };
                await addEntryToPack(pack, header, { /* options */ });
                ++stats.linkCount;
            } else {
                debug(`tarPack: ignoring unknown type ${entry.name} ${entry.type}`);
            }
        }

        queue.unshift(...subdirs); // add to front of queue and in order of readdir listing
    }

    return stats;
}
|
|
|
|
// Packs every local path of dataLayout into a gzipped (and optionally encrypted) tarball
// and streams it into uploader.stream. Returns { stats, integrity } where integrity
// carries the transferred size, file count and sha256 of the uploaded bytes.
// Throws BoxError.EXTERNAL_ERROR if the streaming pipeline fails.
async function tarPack(dataLayout, encryption, uploader, progressCallback) {
    assert(dataLayout instanceof DataLayout, 'dataLayout must be a DataLayout');
    assert.strictEqual(typeof encryption, 'object'); // may be null (typeof null === 'object')
    assert.strictEqual(typeof uploader, 'object');
    assert.strictEqual(typeof progressCallback, 'function');

    const gzip = zlib.createGzip({});

    const ps = new ProgressStream({ interval: 10000 }); // emit 'progress' every 10 seconds
    ps.on('progress', (progress) => {
        const transferred = Math.round(progress.transferred / 1024 / 1024);
        const speed = Math.round(progress.speed / 1024 / 1024);
        if (!transferred && !speed) return progressCallback({ message: 'Uploading backup' }); // 0M@0MBps looks wrong
        progressCallback({ message: `Uploading backup ${transferred}M@${speed}MBps` });
    });

    const pack = tar.pack();
    const hash = new HashStream();

    // assemble the stage list once; encryption inserts an EncryptStream between gzip and progress
    const stages = encryption
        ? [pack, gzip, new EncryptStream(encryption), ps, hash, uploader.stream]
        : [pack, gzip, ps, hash, uploader.stream];
    const pipeline = safe(stream.pipeline(...stages));

    let fileCount = 0;
    for (const localPath of dataLayout.localPaths()) {
        const [error, stats] = await safe(addPathToPack(pack, localPath, dataLayout), { debug });
        if (error) break; // the pipeline will error and we will retry the whole packing all over
        fileCount += stats.fileCount;
    }

    pack.finalize(); // harmless to call if already in error state

    const [error] = await pipeline; // already wrapped in safe()
    if (error) throw new BoxError(BoxError.EXTERNAL_ERROR, `tarPack pipeline error: ${error.message}`);
    debug(`tarPack: pipeline finished: ${JSON.stringify(ps.stats())}`);

    await uploader.finish();

    return {
        stats: { fileCount, ...ps.stats() },
        integrity: { size: ps.stats().transferred, fileCount, sha256: hash.digest('hex') }
    };
}
|
|
|
|
// Extracts a gzipped (and optionally encrypted) tarball from inStream into the local
// paths described by dataLayout, restoring mode and mtime per entry.
// encryption may be null for plaintext backups. progressCallback receives { message }.
// Throws BoxError.EXTERNAL_ERROR if the streaming pipeline fails.
async function tarExtract(inStream, dataLayout, encryption, progressCallback) {
    assert.strictEqual(typeof inStream, 'object');
    assert(dataLayout instanceof DataLayout, 'dataLayout must be a DataLayout');
    assert.strictEqual(typeof encryption, 'object'); // may be null (typeof null === 'object')
    assert.strictEqual(typeof progressCallback, 'function');

    const extract = tar.extract();
    const now = new Date();
    extract.on('entry', async function (header, entryStream, next) {
        // tar-stream requires every entry stream to be fully consumed before the next
        // entry can be processed; drain it, then advance
        const skipEntry = () => {
            entryStream.once('end', () => next());
            entryStream.resume(); // drain
        };

        if (path.isAbsolute(header.name)) {
            debug(`tarExtract: ignoring absolute path ${header.name}`);
            return skipEntry(); // BUGFIX: previously called next() without draining, which can stall extraction
        }
        // NOTE(review): header.name with '..' components is handed to toLocalPath as-is;
        // verify DataLayout.toLocalPath cannot escape the layout roots
        const abspath = dataLayout.toLocalPath(header.name);
        debug(`tarExtract: ${header.name} ${header.size} ${header.type} to ${abspath}`);
        let error = null;
        if (header.type === 'directory') {
            [error] = await safe(fs.promises.mkdir(abspath, { recursive: true, mode: 0o755 }));
        } else if (header.type === 'file') {
            const output = fs.createWriteStream(abspath);
            [error] = await safe(stream.pipeline(entryStream, output));
            if (!error) [error] = await safe(fs.promises.chmod(abspath, header.mode));
        } else if (header.type === 'symlink') {
            await safe(fs.promises.unlink(abspath)); // remove any link created from previous failed extract
            [error] = await safe(fs.promises.symlink(header.linkname, abspath));
        } else {
            debug(`tarExtract: ignoring unknown entry: ${header.name} ${header.type}`);
            // BUGFIX: previously fell through to lutimes() on a path that was never created,
            // failing the whole extract with ENOENT on any unknown entry type
            return skipEntry();
        }

        if (error) return next(error);

        [error] = await safe(fs.promises.lutimes(abspath, now /* atime */, header.mtime)); // for dirs, mtime will get overwritten
        next(error);
    });
    extract.on('finish', () => debug('tarExtract: extract finished'));

    const gunzip = zlib.createGunzip({});
    const ps = new ProgressStream({ interval: 10000 });
    ps.on('progress', function (progress) {
        const transferred = Math.round(progress.transferred/1024/1024), speed = Math.round(progress.speed/1024/1024);
        if (!transferred && !speed) return progressCallback({ message: 'Downloading backup' }); // 0M@0MBps looks wrong
        progressCallback({ message: `Downloading ${transferred}M@${speed}MBps` });
    });

    if (encryption) {
        const decrypt = new DecryptStream(encryption);
        const [error] = await safe(stream.pipeline(inStream, ps, decrypt, gunzip, extract));
        if (error) throw new BoxError(BoxError.EXTERNAL_ERROR, `tarExtract pipeline error: ${error.message}`);
    } else {
        const [error] = await safe(stream.pipeline(inStream, ps, gunzip, extract));
        if (error) throw new BoxError(BoxError.EXTERNAL_ERROR, `tarExtract pipeline error: ${error.message}`);
    }

    debug(`tarExtract: pipeline finished: ${JSON.stringify(ps.stats())}`);
}
|
|
|
|
// Downloads the backup at remotePath from the target's storage and extracts it into
// dataLayout, retrying transient failures up to 5 times with a 20s interval.
async function download(backupTarget, remotePath, dataLayout, progressCallback) {
    assert.strictEqual(typeof backupTarget, 'object');
    assert.strictEqual(typeof remotePath, 'string');
    assert(dataLayout instanceof DataLayout, 'dataLayout must be a DataLayout');
    assert.strictEqual(typeof progressCallback, 'function');

    debug(`download: Downloading ${remotePath} to ${dataLayout.toString()}`);

    const attempt = async () => {
        progressCallback({ message: `Downloading backup ${remotePath}` });

        const sourceStream = await backupTargets.storageApi(backupTarget).download(backupTarget.config, remotePath);
        await tarExtract(sourceStream, dataLayout, backupTarget.encryption, progressCallback);
    };

    await promiseRetry({ times: 5, interval: 20000, debug }, attempt);
}
|
|
|
|
// Packs dataLayout into a tarball, uploads it to remotePath on the target's storage,
// then uploads a signed "<remotePath>.backupinfo" integrity document (size, fileCount, sha256).
// Retries the whole sequence up to 5 times with a 20s interval.
// Returns { stats, integrity: { signature } }.
async function upload(backupTarget, remotePath, dataLayout, progressCallback) {
    assert.strictEqual(typeof backupTarget, 'object');
    assert.strictEqual(typeof remotePath, 'string');
    assert(dataLayout instanceof DataLayout, 'dataLayout must be a DataLayout'); // CONSISTENCY: match download/tarPack/tarExtract, which all assert the concrete type
    assert.strictEqual(typeof progressCallback, 'function');

    debug(`upload: Uploading ${dataLayout.toString()} to ${remotePath}`);

    return await promiseRetry({ times: 5, interval: 20000, debug }, async () => {
        progressCallback({ message: `Uploading backup ${remotePath}` });

        const uploader = await backupTargets.storageApi(backupTarget).upload(backupTarget.config, remotePath);
        const { stats, integrity } = await tarPack(dataLayout, backupTarget.encryption, uploader, progressCallback);

        // integrity document: { "<basename>": { size, fileCount, sha256 } }
        const integrityMap = new Map();
        integrityMap.set(path.basename(remotePath), integrity);
        const integrityDataJsonString = JSON.stringify(Object.fromEntries(integrityMap), null, 2);
        const integrityDataStream = Readable.from(integrityDataJsonString);
        const integrityUploader = await backupTargets.storageApi(backupTarget).upload(backupTarget.config, `${remotePath}.backupinfo`);
        await stream.pipeline(integrityDataStream, integrityUploader.stream);
        await integrityUploader.finish();

        // algorithm null lets the key type pick the default (e.g. Ed25519); sync form returns a Buffer, await is a no-op
        const signature = await crypto.sign(null /* algorithm */, integrityDataJsonString, backupTarget.integrityKeyPair.privateKey);
        return { stats, integrity: { signature } };
    });
}
|
|
|
|
// Returns the backup artifact's file extension for the given encryption flag.
function getFileExtension(encryption) {
    assert.strictEqual(typeof encryption, 'boolean');

    if (encryption) return '.tar.gz.enc';
    return '.tar.gz';
}
|
|
|
|
exports = module.exports = {
|
|
download,
|
|
upload,
|
|
getFileExtension,
|
|
|
|
// exported for testing
|
|
_EnsureFileSizeStream: EnsureFileSizeStream
|
|
};
|