tgz: extract using tar-stream directly
we used to have a fork of tar-fs. using tar-stream directly gives us more control
This commit is contained in:
@@ -7,17 +7,18 @@ exports = module.exports = {
|
||||
};
|
||||
|
||||
const assert = require('assert'),
|
||||
async = require('async'),
|
||||
BoxError = require('../boxerror.js'),
|
||||
DataLayout = require('../datalayout.js'),
|
||||
debug = require('debug')('box:backupformat/tgz'),
|
||||
{ DecryptStream, EncryptStream } = require('../hush.js'),
|
||||
once = require('../once.js'),
|
||||
fs = require('fs'),
|
||||
path = require('path'),
|
||||
ProgressStream = require('../progress-stream.js'),
|
||||
promiseRetry = require('../promise-retry.js'),
|
||||
safe = require('safetydance'),
|
||||
storage = require('../storage.js'),
|
||||
tar = require('tar-fs'),
|
||||
stream = require('stream/promises'),
|
||||
tar = require('tar-stream'),
|
||||
zlib = require('zlib');
|
||||
|
||||
function getBackupFilePath(backupConfig, remotePath) {
|
||||
@@ -33,108 +34,157 @@ function getBackupFilePath(backupConfig, remotePath) {
|
||||
return path.join(rootPath, remotePath + fileType);
|
||||
}
|
||||
|
||||
function tarPack(dataLayout, encryption) {
|
||||
function addToPack(pack, header, options) {
|
||||
return new Promise((resolve, reject) => {
|
||||
const packEntry = safe(() => pack.entry(header, function (error) {
|
||||
if (error) {
|
||||
debug(`addToPack: error adding ${header.name} ${header.type} ${error.message}`);
|
||||
reject(error);
|
||||
} else {
|
||||
debug(`addToPack: added ${header.name} ${header.type}`);
|
||||
resolve();
|
||||
}
|
||||
}));
|
||||
|
||||
if (packEntry && options?.input) {
|
||||
safe(stream.pipeline(options.input, packEntry)); // background
|
||||
}
|
||||
|
||||
if (!packEntry) {
|
||||
reject(new Error(`Failed to add ${header.name}: ${safe.error.message}`));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
async function tarPack(dataLayout, encryption, uploader, progressCallback) {
|
||||
assert(dataLayout instanceof DataLayout, 'dataLayout must be a DataLayout');
|
||||
assert.strictEqual(typeof encryption, 'object');
|
||||
|
||||
const pack = tar.pack('/', {
|
||||
dereference: false, // pack the symlink and not what it points to
|
||||
entries: dataLayout.localPaths(),
|
||||
ignoreStatError: (path, err) => {
|
||||
debug(`tarPack: error stat'ing ${path} - ${err.code}`);
|
||||
return err.code === 'ENOENT'; // ignore if file or dir got removed (probably some temporary file)
|
||||
},
|
||||
map: function(header) {
|
||||
header.name = dataLayout.toRemotePath(header.name);
|
||||
// the tar pax format allows us to encode filenames > 100 and size > 8GB (see #640)
|
||||
// https://www.systutorials.com/docs/linux/man/5-star/
|
||||
if (header.size > 8589934590 || header.name > 99) header.pax = { size: header.size };
|
||||
return header;
|
||||
},
|
||||
strict: false // do not error for unknown types (skip fifo, char/block devices)
|
||||
});
|
||||
assert.strictEqual(typeof uploader, 'object');
|
||||
assert.strictEqual(typeof progressCallback, 'function');
|
||||
|
||||
const gzip = zlib.createGzip({});
|
||||
const ps = new ProgressStream({ interval: 10000 }); // emit 'progress' every 10 seconds
|
||||
|
||||
pack.on('error', function (error) {
|
||||
debug('tarPack: tar stream error. %o', error);
|
||||
ps.emit('error', new BoxError(BoxError.EXTERNAL_ERROR, error.message));
|
||||
ps.on('progress', function (progress) {
|
||||
const transferred = Math.round(progress.transferred/1024/1024), speed = Math.round(progress.speed/1024/1024);
|
||||
if (!transferred && !speed) return progressCallback({ message: 'Uploading backup' }); // 0M@0MBps looks wrong
|
||||
progressCallback({ message: `Uploading backup ${transferred}M@${speed}MBps` });
|
||||
});
|
||||
|
||||
gzip.on('error', function (error) {
|
||||
debug('tarPack: gzip stream error. %o', error);
|
||||
ps.emit('error', new BoxError(BoxError.EXTERNAL_ERROR, error.message));
|
||||
});
|
||||
const pack = tar.pack();
|
||||
|
||||
let pipeline = null;
|
||||
if (encryption) {
|
||||
const encryptStream = new EncryptStream(encryption);
|
||||
encryptStream.on('error', function (error) {
|
||||
debug('tarPack: encrypt stream error. %o', error);
|
||||
ps.emit('error', new BoxError(BoxError.EXTERNAL_ERROR, error.message));
|
||||
});
|
||||
|
||||
pack.pipe(gzip).pipe(encryptStream).pipe(ps);
|
||||
pipeline = safe(stream.pipeline(pack, gzip, encryptStream, ps, uploader.stream));
|
||||
} else {
|
||||
pack.pipe(gzip).pipe(ps);
|
||||
pipeline = safe(stream.pipeline(pack, gzip, ps, uploader.stream));
|
||||
}
|
||||
|
||||
return ps;
|
||||
for (const localPath of dataLayout.localPaths()) {
|
||||
const queue = [ localPath ];
|
||||
while (queue.length) {
|
||||
// if (pack.destroyed || outStream.destroyed) break;
|
||||
const dir = queue.shift();
|
||||
debug(`tarPack: processing ${dir}`);
|
||||
const [readdirError, entries] = await safe(fs.promises.readdir(dir, { withFileTypes: true }));
|
||||
if (!entries) {
|
||||
debug(`tarPack: skipping directory ${dir}: ${readdirError.message}`);
|
||||
continue;
|
||||
}
|
||||
const subdirs = [];
|
||||
for (const entry of entries) {
|
||||
const abspath = path.join(dir, entry.name);
|
||||
const headerName = dataLayout.toRemotePath(abspath);
|
||||
if (entry.isFile()) {
|
||||
const [openError, handle] = await safe(fs.promises.open(abspath, 'r'));
|
||||
if (!handle) { debug(`tarPack: skipping file, could not open ${abspath}: ${openError.message}`); continue; }
|
||||
const [statError, stat] = await safe(handle.stat());
|
||||
if (!stat) { debug(`tarPack: skipping file, could not stat ${abspath}: ${statError.message}`); continue; }
|
||||
const header = { name: headerName, type: 'file', size: stat.size, uid: process.getuid(), gid: process.getgid() };
|
||||
if (stat.size > 8589934590 || entry.name.length > 99) header.pax = { size: stat.size };
|
||||
const input = handle.createReadStream({ autoClose: true });
|
||||
await addToPack(pack, header, { input });
|
||||
} else if (entry.isDirectory()) {
|
||||
const header = { name: headerName, type: 'directory', uid: process.getuid(), gid: process.getgid() };
|
||||
await addToPack(pack, header);
|
||||
subdirs.push(abspath);
|
||||
} else if (entry.isSymbolicLink()) {
|
||||
const [readlinkError, target] = await safe(fs.promises.readlink(abspath));
|
||||
if (!target) { debug(`tarPack: skipping link, could not readlink ${abspath}: ${readlinkError.message}`); continue; }
|
||||
const header = { name: headerName, type: 'symlink', linkname: target, uid: process.getuid(), gid: process.getgid() };
|
||||
await addToPack(pack, header);
|
||||
} else {
|
||||
debug(`tarPack: ignoring unknown type ${entry.name} ${entry.type}`);
|
||||
}
|
||||
}
|
||||
|
||||
queue.unshift(...subdirs); // add to front of queue and in order of readdir listing
|
||||
}
|
||||
}
|
||||
|
||||
pack.finalize();
|
||||
|
||||
const [error] = await safe(pipeline);
|
||||
if (error) throw new BoxError(BoxError.EXTERNAL_ERROR, `tarPack pipeline error: ${error.message}`);
|
||||
debug(`tarPack: pipeline finished: ${JSON.stringify(ps.stats())}`);
|
||||
|
||||
await uploader.finish();
|
||||
}
|
||||
|
||||
function tarExtract(inStream, dataLayout, encryption) {
|
||||
async function tarExtract(inStream, dataLayout, encryption, progressCallback) {
|
||||
assert.strictEqual(typeof inStream, 'object');
|
||||
assert(dataLayout instanceof DataLayout, 'dataLayout must be a DataLayout');
|
||||
assert.strictEqual(typeof encryption, 'object');
|
||||
assert.strictEqual(typeof progressCallback, 'function');
|
||||
|
||||
const extract = tar.extract();
|
||||
const now = new Date();
|
||||
extract.on('entry', async function (header, entryStream, next) {
|
||||
if (path.isAbsolute(header.name)) {
|
||||
debug(`tarExtract: ignoring absolute path ${header.name}`);
|
||||
return next();
|
||||
}
|
||||
const abspath = dataLayout.toLocalPath(header.name);
|
||||
debug(`tarExtract: ${header.name} ${header.size} ${header.type} to ${abspath}`);
|
||||
let error = null;
|
||||
if (header.type === 'directory') {
|
||||
[error] = await safe(fs.promises.mkdir(abspath, { recursive: true, mode: 0o755 }));
|
||||
} else if (header.type === 'file') {
|
||||
const output = fs.createWriteStream(abspath);
|
||||
[error] = await safe(stream.pipeline(entryStream, output));
|
||||
} else if (header.type === 'symlink') {
|
||||
await safe(fs.promises.unlink(abspath)); // remove any link created from previous failed extract
|
||||
[error] = await safe(fs.promises.symlink(header.linkname, abspath));
|
||||
} else {
|
||||
debug(`tarExtract: ignoring unknown entry: ${header.name} ${header.type}`);
|
||||
entryStream.resume(); // drain
|
||||
}
|
||||
|
||||
if (error) return next(error);
|
||||
|
||||
[error] = await safe(fs.promises.lutimes(abspath, now /* atime */, header.mtime)); // for dirs, mtime will get overwritten
|
||||
next(error);
|
||||
});
|
||||
extract.on('finish', () => debug('tarExtract: extract finished'));
|
||||
|
||||
const gunzip = zlib.createGunzip({});
|
||||
const ps = new ProgressStream({ interval: 10000 }); // display a progress every 10 seconds
|
||||
const extract = tar.extract('/', {
|
||||
map: function (header) {
|
||||
header.name = dataLayout.toLocalPath(header.name);
|
||||
return header;
|
||||
},
|
||||
dmode: 500 // ensure directory is writable
|
||||
});
|
||||
|
||||
const emitError = once((error) => {
|
||||
inStream.destroy();
|
||||
ps.emit('error', error);
|
||||
});
|
||||
|
||||
inStream.on('error', function (error) {
|
||||
debug('tarExtract: input stream error. %o', error);
|
||||
emitError(new BoxError(BoxError.EXTERNAL_ERROR, error.message));
|
||||
});
|
||||
|
||||
gunzip.on('error', function (error) {
|
||||
debug('tarExtract: gunzip stream error. %o', error);
|
||||
emitError(new BoxError(BoxError.EXTERNAL_ERROR, error.message));
|
||||
});
|
||||
|
||||
extract.on('error', function (error) {
|
||||
debug('tarExtract: extract stream error. %o', error);
|
||||
emitError(new BoxError(BoxError.EXTERNAL_ERROR, error.message));
|
||||
});
|
||||
|
||||
extract.on('finish', function () {
|
||||
debug('tarExtract: done.');
|
||||
// we use a separate event because ps is a through2 stream which emits 'finish' event indicating end of inStream and not extract
|
||||
ps.emit('done');
|
||||
const ps = new ProgressStream({ interval: 10000 });
|
||||
ps.on('progress', function (progress) {
|
||||
const transferred = Math.round(progress.transferred/1024/1024), speed = Math.round(progress.speed/1024/1024);
|
||||
if (!transferred && !speed) return progressCallback({ message: 'Downloading backup' }); // 0M@0MBps looks wrong
|
||||
progressCallback({ message: `Downloading ${transferred}M@${speed}MBps` });
|
||||
});
|
||||
|
||||
if (encryption) {
|
||||
const decrypt = new DecryptStream(encryption);
|
||||
decrypt.on('error', function (error) {
|
||||
debug('tarExtract: decrypt stream error.', error);
|
||||
emitError(new BoxError(BoxError.EXTERNAL_ERROR, `Failed to decrypt: ${error.message}`));
|
||||
});
|
||||
inStream.pipe(ps).pipe(decrypt).pipe(gunzip).pipe(extract);
|
||||
const [error] = await safe(stream.pipeline(inStream, ps, decrypt, gunzip, extract));
|
||||
if (error) throw new BoxError(BoxError.EXTERNAL_ERROR, `tarExtract pipeline error: ${error.message}`);
|
||||
} else {
|
||||
inStream.pipe(ps).pipe(gunzip).pipe(extract);
|
||||
const [error] = await safe(stream.pipeline(inStream, ps, gunzip, extract));
|
||||
if (error) throw new BoxError(BoxError.EXTERNAL_ERROR, `tarExtract pipeline error: ${error.message}`);
|
||||
}
|
||||
|
||||
return ps;
|
||||
debug(`tarExtract: pipeline finished: ${ps.stats()}`);
|
||||
}
|
||||
|
||||
async function download(backupConfig, remotePath, dataLayout, progressCallback) {
|
||||
@@ -151,17 +201,7 @@ async function download(backupConfig, remotePath, dataLayout, progressCallback)
|
||||
progressCallback({ message: `Downloading backup ${backupFilePath}` });
|
||||
|
||||
const sourceStream = await storage.api(backupConfig.provider).download(backupConfig, backupFilePath);
|
||||
const ps = tarExtract(sourceStream, dataLayout, backupConfig.encryption);
|
||||
|
||||
return await new Promise((resolve, reject) => {
|
||||
ps.on('progress', function (progress) {
|
||||
const transferred = Math.round(progress.transferred/1024/1024), speed = Math.round(progress.speed/1024/1024);
|
||||
if (!transferred && !speed) return progressCallback({ message: 'Downloading backup' }); // 0M@0MBps looks wrong
|
||||
progressCallback({ message: `Downloading ${transferred}M@${speed}MBps` });
|
||||
});
|
||||
ps.on('error', reject);
|
||||
ps.on('done', resolve);
|
||||
});
|
||||
await tarExtract(sourceStream, dataLayout, backupConfig.encryption, progressCallback);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -173,23 +213,12 @@ async function upload(backupConfig, remotePath, dataLayout, progressCallback) {
|
||||
|
||||
debug(`upload: Uploading ${dataLayout.toString()} to ${remotePath}`);
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
async.retry({ times: 5, interval: 20000 }, function (retryCallback) {
|
||||
retryCallback = once(retryCallback); // protect again upload() erroring much later after tar stream error
|
||||
const backupFilePath = getBackupFilePath(backupConfig, remotePath);
|
||||
|
||||
const tarStream = tarPack(dataLayout, backupConfig.encryption);
|
||||
await promiseRetry({ times: 5, interval: 20000, debug }, async () => {
|
||||
progressCallback({ message: `Uploading backup ${backupFilePath}` });
|
||||
|
||||
tarStream.on('progress', function (progress) {
|
||||
const transferred = Math.round(progress.transferred/1024/1024), speed = Math.round(progress.speed/1024/1024);
|
||||
if (!transferred && !speed) return progressCallback({ message: 'Uploading backup' }); // 0M@0MBps looks wrong
|
||||
progressCallback({ message: `Uploading backup ${transferred}M@${speed}MBps` });
|
||||
});
|
||||
tarStream.on('error', retryCallback); // already returns BoxError
|
||||
|
||||
storage.api(backupConfig.provider).upload(backupConfig, getBackupFilePath(backupConfig, remotePath), tarStream, retryCallback);
|
||||
}, (error) => {
|
||||
if (error) return reject(error);
|
||||
resolve();
|
||||
});
|
||||
const uploader = await storage.api(backupConfig.provider).upload(backupConfig, backupFilePath);
|
||||
await tarPack(dataLayout, backupConfig.encryption, uploader, progressCallback);
|
||||
});
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user