Files
cloudron-box/src/backupformat/tgz.js
Girish Ramakrishnan 3a21191fba tgz: fix error handling
2024-07-10 19:10:24 +02:00

225 lines
10 KiB
JavaScript

'use strict';
exports = module.exports = {
getBackupFilePath,
download,
upload
};
const assert = require('assert'),
BoxError = require('../boxerror.js'),
DataLayout = require('../datalayout.js'),
debug = require('debug')('box:backupformat/tgz'),
{ DecryptStream, EncryptStream } = require('../hush.js'),
fs = require('fs'),
path = require('path'),
ProgressStream = require('../progress-stream.js'),
promiseRetry = require('../promise-retry.js'),
safe = require('safetydance'),
storage = require('../storage.js'),
stream = require('stream/promises'),
tar = require('tar-stream'),
zlib = require('zlib');
// Maps a logical remote path to the full backup file path for this provider.
// Encrypted backups carry an extra .enc suffix.
function getBackupFilePath(backupConfig, remotePath) {
    assert.strictEqual(typeof backupConfig, 'object');
    assert.strictEqual(typeof remotePath, 'string');

    const extension = backupConfig.encryption ? '.tar.gz.enc' : '.tar.gz';
    const filename = remotePath + extension;

    // we don't have a rootPath for noop
    if (backupConfig.provider === 'noop') return filename;

    return path.join(backupConfig.rootPath, filename);
}
// Adds a single entry (file, directory or symlink header) to a tar-stream pack.
// Resolves when tar-stream reports the entry fully written, rejects on error.
// options.input, when set, is a readable stream with the entry's contents.
function addToPack(pack, header, options) {
    return new Promise(function (resolve, reject) {
        const onEntryDone = function (error) {
            if (error) {
                debug(`addToPack: error adding ${header.name} ${header.type} ${error.message}`);
                return reject(new BoxError(BoxError.FS_ERROR, error.message));
            }

            debug(`addToPack: added ${header.name} ${header.type}`);
            resolve();
        };

        const packEntry = safe(() => pack.entry(header, onEntryDone));
        if (!packEntry) return reject(new BoxError(BoxError.FS_ERROR, `Failed to add ${header.name}: ${safe.error.message}`));

        // background. rely on pack.entry callback for promise completion
        if (options?.input) safe(stream.pipeline(options.input, packEntry), { debug });
    });
}
// Walks every local path of dataLayout and streams its contents as a gzipped
// (and optionally encrypted) tarball into uploader.stream, reporting progress
// via progressCallback. Unreadable entries are skipped with a debug log rather
// than failing the whole backup. Throws BoxError.EXTERNAL_ERROR if the stream
// pipeline fails.
async function tarPack(dataLayout, encryption, uploader, progressCallback) {
    assert(dataLayout instanceof DataLayout, 'dataLayout must be a DataLayout');
    assert.strictEqual(typeof encryption, 'object'); // may be null (typeof null === 'object') for unencrypted backups
    assert.strictEqual(typeof uploader, 'object');
    assert.strictEqual(typeof progressCallback, 'function');

    const gzip = zlib.createGzip({});
    const ps = new ProgressStream({ interval: 10000 }); // emit 'progress' every 10 seconds
    ps.on('progress', function (progress) {
        const transferred = Math.round(progress.transferred/1024/1024), speed = Math.round(progress.speed/1024/1024);
        if (!transferred && !speed) return progressCallback({ message: 'Uploading backup' }); // 0M@0MBps looks wrong
        progressCallback({ message: `Uploading backup ${transferred}M@${speed}MBps` });
    });

    const pack = tar.pack();

    // start the pipeline before adding entries; it is awaited (as a safe() tuple) after finalize()
    let pipeline = null;
    if (encryption) {
        const encryptStream = new EncryptStream(encryption);
        pipeline = safe(stream.pipeline(pack, gzip, encryptStream, ps, uploader.stream));
    } else {
        pipeline = safe(stream.pipeline(pack, gzip, ps, uploader.stream));
    }

    for (const localPath of dataLayout.localPaths()) {
        const queue = [ localPath ]; // iterative depth-first walk; subdirs are re-queued below
        while (queue.length) {
            // if (pack.destroyed || outStream.destroyed) break;
            const dir = queue.shift();
            debug(`tarPack: processing ${dir}`);

            const [readdirError, entries] = await safe(fs.promises.readdir(dir, { withFileTypes: true }));
            if (!entries) { // unreadable directory: skip, best-effort backup
                debug(`tarPack: skipping directory ${dir}: ${readdirError.message}`);
                continue;
            }

            const subdirs = [];
            for (const entry of entries) {
                const abspath = path.join(dir, entry.name);
                const headerName = dataLayout.toRemotePath(abspath); // path as stored inside the archive
                if (entry.isFile()) {
                    // open a handle first so the stat'd size and the streamed contents refer to the same file
                    const [openError, handle] = await safe(fs.promises.open(abspath, 'r'));
                    if (!handle) { debug(`tarPack: skipping file, could not open ${abspath}: ${openError.message}`); continue; }
                    const [statError, stat] = await safe(handle.stat());
                    if (!stat) { debug(`tarPack: skipping file, could not stat ${abspath}: ${statError.message}`); continue; }
                    const header = { name: headerName, type: 'file', size: stat.size, uid: process.getuid(), gid: process.getgid() };
                    // classic ustar headers cannot represent sizes near/above 8GB or names longer
                    // than ~100 chars; setting a pax object makes tar-stream emit a PAX extended
                    // header for the entry instead — TODO confirm tar-stream also pax-encodes the name
                    if (stat.size > 8589934590 || entry.name.length > 99) header.pax = { size: stat.size };
                    const input = handle.createReadStream({ autoClose: true }); // autoClose releases the fd on end/error
                    await addToPack(pack, header, { input });
                } else if (entry.isDirectory()) {
                    const header = { name: headerName, type: 'directory', uid: process.getuid(), gid: process.getgid() };
                    await addToPack(pack, header);
                    subdirs.push(abspath);
                } else if (entry.isSymbolicLink()) {
                    const [readlinkError, target] = await safe(fs.promises.readlink(abspath));
                    if (!target) { debug(`tarPack: skipping link, could not readlink ${abspath}: ${readlinkError.message}`); continue; }
                    const header = { name: headerName, type: 'symlink', linkname: target, uid: process.getuid(), gid: process.getgid() };
                    await addToPack(pack, header);
                } else { // sockets, fifos, devices etc are not backed up
                    debug(`tarPack: ignoring unknown type ${entry.name} ${entry.type}`);
                }
            }

            queue.unshift(...subdirs); // add to front of queue and in order of readdir listing
        }
    }

    pack.finalize(); // no more entries; lets the pipeline drain and end

    const [error] = await pipeline; // already wrapped in safe()
    if (error) throw new BoxError(BoxError.EXTERNAL_ERROR, `tarPack pipeline error: ${error.message}`);
    debug(`tarPack: pipeline finished: ${JSON.stringify(ps.stats())}`);

    await uploader.finish();
}
// Extracts a gzipped (and optionally encrypted) tarball stream into the local
// paths of dataLayout, reporting progress via progressCallback. Entries with
// absolute paths are ignored so the archive cannot write outside the layout.
// Throws BoxError.EXTERNAL_ERROR if the stream pipeline fails.
async function tarExtract(inStream, dataLayout, encryption, progressCallback) {
    assert.strictEqual(typeof inStream, 'object');
    assert(dataLayout instanceof DataLayout, 'dataLayout must be a DataLayout');
    assert.strictEqual(typeof encryption, 'object'); // may be null (typeof null === 'object') for unencrypted backups
    assert.strictEqual(typeof progressCallback, 'function');

    const extract = tar.extract();
    const now = new Date();

    extract.on('entry', async function (header, entryStream, next) {
        // refuse paths that would escape the data layout
        if (path.isAbsolute(header.name)) {
            debug(`tarExtract: ignoring absolute path ${header.name}`);
            return next();
        }

        const abspath = dataLayout.toLocalPath(header.name);
        debug(`tarExtract: ${header.name} ${header.size} ${header.type} to ${abspath}`);

        let error = null;
        if (header.type === 'directory') {
            [error] = await safe(fs.promises.mkdir(abspath, { recursive: true, mode: 0o755 }));
        } else if (header.type === 'file') {
            const output = fs.createWriteStream(abspath);
            [error] = await safe(stream.pipeline(entryStream, output));
        } else if (header.type === 'symlink') {
            await safe(fs.promises.unlink(abspath)); // remove any link created from previous failed extract
            [error] = await safe(fs.promises.symlink(header.linkname, abspath));
        } else {
            debug(`tarExtract: ignoring unknown entry: ${header.name} ${header.type}`);
            entryStream.resume(); // drain
        }

        if (error) return next(error);

        // restore the archived mtime. for dirs, mtime will get overwritten as children are created
        [error] = await safe(fs.promises.lutimes(abspath, now /* atime */, header.mtime));
        next(error);
    });
    extract.on('finish', () => debug('tarExtract: extract finished'));

    const gunzip = zlib.createGunzip({});
    const ps = new ProgressStream({ interval: 10000 }); // emit 'progress' every 10 seconds
    ps.on('progress', function (progress) {
        const transferred = Math.round(progress.transferred/1024/1024), speed = Math.round(progress.speed/1024/1024);
        if (!transferred && !speed) return progressCallback({ message: 'Downloading backup' }); // 0M@0MBps looks wrong
        progressCallback({ message: `Downloading ${transferred}M@${speed}MBps` });
    });

    // decrypt sits after the progress stream so progress reflects bytes downloaded
    const pipes = encryption ? [inStream, ps, new DecryptStream(encryption), gunzip, extract] : [inStream, ps, gunzip, extract];
    const [error] = await safe(stream.pipeline(...pipes));
    if (error) throw new BoxError(BoxError.EXTERNAL_ERROR, `tarExtract pipeline error: ${error.message}`);

    // stringify for a readable log line (consistent with tarPack); plain interpolation logged '[object Object]'
    debug(`tarExtract: pipeline finished: ${JSON.stringify(ps.stats())}`);
}
// Downloads the backup at remotePath and extracts it into dataLayout,
// retrying the whole download+extract up to 5 times on failure.
async function download(backupConfig, remotePath, dataLayout, progressCallback) {
    assert.strictEqual(typeof backupConfig, 'object');
    assert.strictEqual(typeof remotePath, 'string');
    assert(dataLayout instanceof DataLayout, 'dataLayout must be a DataLayout');
    assert.strictEqual(typeof progressCallback, 'function');

    debug(`download: Downloading ${remotePath} to ${dataLayout.toString()}`);

    const backupFilePath = getBackupFilePath(backupConfig, remotePath);

    const attempt = async () => {
        progressCallback({ message: `Downloading backup ${backupFilePath}` });
        const sourceStream = await storage.api(backupConfig.provider).download(backupConfig, backupFilePath);
        await tarExtract(sourceStream, dataLayout, backupConfig.encryption, progressCallback);
    };

    await promiseRetry({ times: 5, interval: 20000, debug }, attempt);
}
// Packs dataLayout into a tarball and uploads it to remotePath,
// retrying the whole pack+upload up to 5 times on failure.
async function upload(backupConfig, remotePath, dataLayout, progressCallback) {
    assert.strictEqual(typeof backupConfig, 'object');
    assert.strictEqual(typeof remotePath, 'string');
    assert(dataLayout instanceof DataLayout, 'dataLayout must be a DataLayout'); // consistent with download() and tarPack()
    assert.strictEqual(typeof progressCallback, 'function');

    debug(`upload: Uploading ${dataLayout.toString()} to ${remotePath}`);

    const backupFilePath = getBackupFilePath(backupConfig, remotePath);

    await promiseRetry({ times: 5, interval: 20000, debug }, async () => {
        progressCallback({ message: `Uploading backup ${backupFilePath}` });
        const uploader = await storage.api(backupConfig.provider).upload(backupConfig, backupFilePath);
        await tarPack(dataLayout, backupConfig.encryption, uploader, progressCallback);
    });
}