rsync: fix upload logic to match new upload api

This commit is contained in:
Girish Ramakrishnan
2024-07-08 14:56:55 +02:00
parent 7d017d83d6
commit baf598099f
2 changed files with 53 additions and 63 deletions

View File

@@ -15,16 +15,16 @@ const assert = require('assert'),
DataLayout = require('../datalayout.js'),
{ DecryptStream } = require('../hush.js'),
debug = require('debug')('box:backupformat/rsync'),
{ EncryptStream } = require('../hush.js'),
fs = require('fs'),
hush = require('../hush.js'),
once = require('../once.js'),
path = require('path'),
ProgressStream = require('../progress-stream.js'),
promiseRetry = require('../promise-retry.js'),
safe = require('safetydance'),
shell = require('../shell.js'),
storage = require('../storage.js'),
stream = require('stream'),
stream = require('stream/promises'),
syncer = require('../syncer.js'),
util = require('util');
@@ -38,6 +38,46 @@ function getBackupFilePath(backupConfig, remotePath) {
return path.join(backupConfig.rootPath, remotePath);
}
async function addFile(sourceFile, encryption, uploader, progressCallback) {
assert.strictEqual(typeof sourceFile, 'string');
assert.strictEqual(typeof encryption, 'object');
assert.strictEqual(typeof uploader, 'object');
assert.strictEqual(typeof progressCallback, 'function');
// make sure file can be opened for reading before we start the pipeline. otherwise, we end up with
// destinations dirs/file which are owned by root (this process id) and cannot be copied (run as normal user)
const [openError, sourceHandle] = await safe(fs.promises.open(sourceFile, 'r'));
if (openError) {
debug(`addFile: ignoring disappeared file: ${sourceFile}`);
return;
}
const sourceStream = sourceHandle.createReadStream(sourceFile, { autoClose: true });
const ps = new ProgressStream({ interval: 10000 }); // display a progress every 10 seconds
ps.on('progress', function (progress) {
const transferred = Math.round(progress.transferred/1024/1024), speed = Math.round(progress.speed/1024/1024);
if (!transferred && !speed) return progressCallback({ message: `Uploading ${sourceFile}` }); // 0M@0MBps looks wrong
progressCallback({ message: `Uploading ${sourceFile}: ${transferred}M@${speed}MBps` }); // 0M@0MBps looks wrong
});
let pipeline = null;
if (encryption) {
const encryptStream = new EncryptStream(encryption);
pipeline = safe(stream.pipeline(sourceStream, encryptStream, ps, uploader.stream));
} else {
pipeline = safe(stream.pipeline(sourceStream, ps, uploader.stream));
}
const [error] = await safe(pipeline);
if (error && error.message.includes('ENOENT')) { // ignore error if file disappears
}
if (error) throw new BoxError(BoxError.EXTERNAL_ERROR, `tarPack pipeline error: ${error.message}`);
// debug(`addFile: pipeline finished: ${JSON.stringify(ps.stats())}`);
await uploader.finish();
}
function sync(backupConfig, remotePath, dataLayout, progressCallback, callback) {
assert.strictEqual(typeof backupConfig, 'object');
assert.strictEqual(typeof remotePath, 'string');
@@ -47,10 +87,8 @@ function sync(backupConfig, remotePath, dataLayout, progressCallback, callback)
// the number here has to take into account the s3.upload partSize (which is 10MB). So 20=200MB
const concurrency = backupConfig.limits?.syncConcurrency || (backupConfig.provider === 's3' ? 20 : 10);
const removeDir = util.callbackify(storage.api(backupConfig.provider).removeDir);
const remove = util.callbackify(storage.api(backupConfig.provider).remove);
syncer.sync(dataLayout, function processTask(task, iteratorCallback) {
syncer.sync(dataLayout, async function processTask(task) {
debug('sync: processing task: %j', task);
// the empty task.path is special to signify the directory
const destPath = task.path && backupConfig.encryptedFilenames ? hush.encryptFilePath(task.path, backupConfig.encryption) : task.path;
@@ -58,37 +96,19 @@ function sync(backupConfig, remotePath, dataLayout, progressCallback, callback)
if (task.operation === 'removedir') {
debug(`Removing directory ${backupFilePath}`);
return removeDir(backupConfig, backupFilePath, progressCallback, iteratorCallback);
await storage.api(backupConfig.provider).removeDir(backupConfig, backupFilePath, progressCallback);
} else if (task.operation === 'remove') {
debug(`Removing ${backupFilePath}`);
return remove(backupConfig, backupFilePath, iteratorCallback);
}
let retryCount = 0;
async.retry({ times: 5, interval: 20000 }, function (retryCallback) {
retryCallback = once(retryCallback); // protect again upload() erroring much later after read stream error
++retryCount;
if (task.operation === 'add') {
await storage.api(backupConfig.provider).remove(backupConfig, backupFilePath);
} else if (task.operation === 'add') {
await promiseRetry({ times: 5, interval: 20000, debug }, async (retryCount) => {
progressCallback({ message: `Adding ${task.path}` + (retryCount > 1 ? ` (Try ${retryCount})` : '') });
debug(`Adding ${task.path} position ${task.position} try ${retryCount}`);
const stream = hush.createReadStream(dataLayout.toLocalPath('./' + task.path), backupConfig.encryption);
stream.on('error', (error) => retryCallback(error.message.includes('ENOENT') ? null : error)); // ignore error if file disappears
stream.on('progress', function (progress) {
const transferred = Math.round(progress.transferred/1024/1024), speed = Math.round(progress.speed/1024/1024);
if (!transferred && !speed) return progressCallback({ message: `Uploading ${task.path}` }); // 0M@0MBps looks wrong
progressCallback({ message: `Uploading ${task.path}: ${transferred}M@${speed}MBps` }); // 0M@0MBps looks wrong
});
// only create the destination path when we have confirmation that the source is available. otherwise, we end up with
// files owned as 'root' and the cp later will fail
stream.on('open', function () {
storage.api(backupConfig.provider).upload(backupConfig, backupFilePath, stream, function (error) {
debug(error ? `Error uploading ${task.path} try ${retryCount}: ${error.message}` : `Uploaded ${task.path}`);
retryCallback(error);
});
});
}
}, iteratorCallback);
const uploader = await storage.api(backupConfig.provider).upload(backupConfig, backupFilePath);
await addFile(dataLayout.toLocalPath('./' + task.path), backupConfig.encryption, uploader, progressCallback);
});
}
}, concurrency, function (error) {
if (error) return callback(new BoxError(BoxError.EXTERNAL_ERROR, error.message));
@@ -205,7 +225,7 @@ function downloadDir(backupConfig, backupFilePath, dataLayout, progressCallback,
progressCallback({ message: `Downloading ${entry.fullPath} to ${destFilePath}` });
const [pipelineError] = await safe(stream.promises.pipeline(streams));
const [pipelineError] = await safe(stream.pipeline(streams));
if (pipelineError) {
progressCallback({ message: `Download error ${entry.fullPath} to ${destFilePath}: ${pipelineError.message}` });
throw pipelineError;