backups: download is now async

This commit is contained in:
Girish Ramakrishnan
2023-07-25 09:56:58 +05:30
parent 2bec56145e
commit 9a359a27f5
2 changed files with 39 additions and 45 deletions

View File

@@ -174,37 +174,36 @@ function downloadDir(backupConfig, backupFilePath, dataLayout, progressCallback,
fs.mkdir(path.dirname(destFilePath), { recursive: true }, function (error) {
if (error) return done(new BoxError(BoxError.FS_ERROR, error.message));
async.retry({ times: 5, interval: 20000 }, function (retryCallback) {
storage.api(backupConfig.provider).download(backupConfig, entry.fullPath, function (error, sourceStream) {
if (error) {
progressCallback({ message: `Download ${entry.fullPath} to ${destFilePath} errored: ${error.message}` });
return retryCallback(error);
}
async.retry({ times: 5, interval: 20000 }, async function (retryCallback) {
const [downloadError, sourceStream] = await safe(storage.api(backupConfig.provider).download(backupConfig, entry.fullPath));
if (downloadError) {
progressCallback({ message: `Download ${entry.fullPath} to ${destFilePath} errored: ${downloadError.message}` });
return retryCallback(downloadError);
}
let destStream = hush.createWriteStream(destFilePath, backupConfig.encryption);
const destStream = hush.createWriteStream(destFilePath, backupConfig.encryption);
// protect against multiple errors. must destroy the write stream so that a previous retry does not write
let closeAndRetry = once((error) => {
if (error) progressCallback({ message: `Download ${entry.fullPath} to ${destFilePath} errored: ${error.message}` });
else progressCallback({ message: `Download ${entry.fullPath} to ${destFilePath} finished` });
sourceStream.destroy();
destStream.destroy();
retryCallback(error);
});
destStream.on('progress', function (progress) {
const transferred = Math.round(progress.transferred/1024/1024), speed = Math.round(progress.speed/1024/1024);
if (!transferred && !speed) return progressCallback({ message: `Downloading ${entry.fullPath}` }); // 0M@0MBps looks wrong
progressCallback({ message: `Downloading ${entry.fullPath}: ${transferred}M@${speed}MBps` });
});
destStream.on('error', closeAndRetry);
sourceStream.on('error', closeAndRetry);
progressCallback({ message: `Downloading ${entry.fullPath} to ${destFilePath}` });
sourceStream.pipe(destStream, { end: true }).on('done', closeAndRetry);
// protect against multiple errors. must destroy the write stream so that a previous retry does not write
const closeAndRetry = once((error) => {
if (error) progressCallback({ message: `Download ${entry.fullPath} to ${destFilePath} errored: ${error.message}` });
else progressCallback({ message: `Download ${entry.fullPath} to ${destFilePath} finished` });
sourceStream.destroy();
destStream.destroy();
retryCallback(error);
});
destStream.on('progress', function (progress) {
const transferred = Math.round(progress.transferred/1024/1024), speed = Math.round(progress.speed/1024/1024);
if (!transferred && !speed) return progressCallback({ message: `Downloading ${entry.fullPath}` }); // 0M@0MBps looks wrong
progressCallback({ message: `Downloading ${entry.fullPath}: ${transferred}M@${speed}MBps` });
});
destStream.on('error', closeAndRetry);
sourceStream.on('error', closeAndRetry);
progressCallback({ message: `Downloading ${entry.fullPath} to ${destFilePath}` });
sourceStream.pipe(destStream, { end: true }).on('done', closeAndRetry);
}, done);
});
}

View File

@@ -15,6 +15,7 @@ const assert = require('assert'),
once = require('../once.js'),
path = require('path'),
ProgressStream = require('../progress-stream.js'),
promiseRetry = require('../promise-retry.js'),
storage = require('../storage.js'),
tar = require('tar-fs'),
zlib = require('zlib');
@@ -143,26 +144,20 @@ async function download(backupConfig, remotePath, dataLayout, progressCallback)
const backupFilePath = getBackupFilePath(backupConfig, remotePath);
return new Promise((resolve, reject) => {
async.retry({ times: 5, interval: 20000 }, function (retryCallback) {
progressCallback({ message: `Downloading backup ${remotePath}` });
await promiseRetry({ times: 5, interval: 20000, debug }, async () => {
progressCallback({ message: `Downloading backup ${remotePath}` });
storage.api(backupConfig.provider).download(backupConfig, backupFilePath, function (error, sourceStream) {
if (error) return retryCallback(error);
const sourceStream = await storage.api(backupConfig.provider).download(backupConfig, backupFilePath);
const ps = tarExtract(sourceStream, dataLayout, backupConfig.encryption);
const ps = tarExtract(sourceStream, dataLayout, backupConfig.encryption);
ps.on('progress', function (progress) {
const transferred = Math.round(progress.transferred/1024/1024), speed = Math.round(progress.speed/1024/1024);
if (!transferred && !speed) return progressCallback({ message: 'Downloading backup' }); // 0M@0MBps looks wrong
progressCallback({ message: `Downloading ${transferred}M@${speed}MBps` });
});
ps.on('error', retryCallback);
ps.on('done', retryCallback);
return await new Promise((resolve, reject) => {
ps.on('progress', function (progress) {
const transferred = Math.round(progress.transferred/1024/1024), speed = Math.round(progress.speed/1024/1024);
if (!transferred && !speed) return progressCallback({ message: 'Downloading backup' }); // 0M@0MBps looks wrong
progressCallback({ message: `Downloading ${transferred}M@${speed}MBps` });
});
}, (error) => {
if (error) return reject(error);
resolve();
ps.on('error', reject);
ps.on('done', resolve);
});
});
}