diff --git a/src/backupformat/rsync.js b/src/backupformat/rsync.js
index dab38c030..443df3bd3 100644
--- a/src/backupformat/rsync.js
+++ b/src/backupformat/rsync.js
@@ -41,7 +41,10 @@ async function addFile(sourceFile, encryption, uploader, progressCallback) {
     ps.on('progress', function (progress) {
         const transferred = Math.round(progress.transferred/1024/1024), speed = Math.round(progress.speed/1024/1024);
         if (!transferred && !speed) return progressCallback({ message: `Uploading ${sourceFile}` }); // 0M@0MBps looks wrong
-        progressCallback({ message: `Uploading ${sourceFile}: ${transferred}M@${speed}MBps` }); // 0M@0MBps looks wrong
+        progressCallback({ message: `Uploading ${sourceFile}: ${transferred}M@${speed}MBps` });
+    });
+    ps.on('heartbeat', function ({ elapsed, transferred }) {
+        progressCallback({ message: `Still uploading ${sourceFile} (${elapsed}s, ${Math.round(transferred/1024/1024)}M)` });
     });
 
     // careful not to have async code between here and pipeline() for 'error' handling
@@ -57,7 +60,7 @@ async function addFile(sourceFile, encryption, uploader, progressCallback) {
     }
 
     const [error] = await safe(pipelinePromise);
-    if (error && !error.message.includes('ENOENT')) throw new BoxError(BoxError.EXTERNAL_ERROR, `tarPack pipeline error: ${error.message}`); // ignore error if file disappears
+    if (error && !error.message.includes('ENOENT')) throw new BoxError(BoxError.EXTERNAL_ERROR, `tarPack pipeline error for ${sourceFile}: ${error.message}`); // ignore error if file disappears
     // debug(`addFile: pipeline finished: ${JSON.stringify(ps.stats())}`);
 
     await uploader.finish();
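
Note on the "careful not to have async code between here and pipeline()" comment this hunk preserves: until pipeline() attaches its handlers, an 'error' event on any of the freshly created streams has no listener, and Node.js turns an unhandled 'error' event into a process crash. A minimal sketch of the hazard (names here are illustrative, not from this codebase):

const fs = require('node:fs');
const { pipeline } = require('node:stream/promises');

async function copyOut(sourcePath, destination) {
    const source = fs.createReadStream(sourcePath);
    // BAD: awaiting anything here opens a window in which a read error on
    // `source` fires 'error' with no listener attached and crashes the process
    // await someAsyncSetup();
    await pipeline(source, destination); // pipeline() wires up 'error' handling
}
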
@@ -174,28 +177,35 @@ async function sync(backupSite, remotePath, dataLayout, progressCallback) {
         destPathIntegrityMap.set(destPath, integrity);
     }
 
+    let completedAdds = 0;
+    function reportUploadProgress(detail) {
+        const summary = `Uploaded ${completedAdds}/${addQueue.length} files`;
+        progressCallback({ message: detail ? `${summary}. ${detail}` : summary });
+    }
+
     async function processSyncerChange(change) {
         // the empty task.path is special to signify the directory
         const destPath = change.path && backupSite.encryption?.encryptedFilenames ? hush.encryptFilePath(change.path, backupSite.encryption) : change.path;
         const fullPath = path.join(remotePath, destPath);
 
         if (change.operation === 'removedir') {
-            debug(`sync: removing directory ${fullPath}`);
             await backupSites.storageApi(backupSite).removeDir(backupSite.config, backupSite.limits, fullPath, progressCallback);
         } else if (change.operation === 'remove') {
-            debug(`sync: removing ${fullPath}`);
             await backupSites.storageApi(backupSite).remove(backupSite.config, fullPath);
         } else if (change.operation === 'add') {
             await promiseRetry({ times: 5, interval: 20000, debug }, async (retryCount) => {
-                progressCallback({ message: `Adding ${change.path}` + (retryCount > 1 ? ` (Try ${retryCount})` : '') });
-                debug(`sync: adding ${change.path} position ${change.position} try ${retryCount}`);
+                reportUploadProgress(`Current: ${change.path}` + (retryCount > 1 ? ` (Try ${retryCount})` : ''));
+                if (retryCount > 1) debug(`sync: retrying ${change.path} position ${change.position} try ${retryCount}`);
                 const uploader = await backupSites.storageApi(backupSite).upload(backupSite.config, backupSite.limits, fullPath);
-                const result = await addFile(dataLayout.toLocalPath('./' + change.path), backupSite.encryption, uploader, progressCallback);
+                const result = await addFile(dataLayout.toLocalPath('./' + change.path), backupSite.encryption, uploader, (progress) => {
+                    reportUploadProgress(progress.message);
+                });
                 integrityMap.set(change.path, result.integrity); // .integrity can be null when file disappeared on us
                 destPathIntegrityMap.set(destPath, result.integrity);
                 aggregatedStats.transferred += result.stats.transferred;
                 aggregatedStats.size += result.stats.transferred;
             });
+            ++completedAdds;
         }
     }
 
@@ -207,6 +217,8 @@ async function sync(backupSite, remotePath, dataLayout, progressCallback) {
     debug('sync: done processing adds. error: %o', addError);
     if (addError) throw addError;
 
+    progressCallback({ message: `Uploaded ${completedAdds} files` });
+
     await syncer.finalize(integrityMap, cacheFile);
 
     return {
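
The retry wrapper above is invoked as promiseRetry({ times: 5, interval: 20000, debug }, fn) with an apparently 1-based retryCount (hence the retryCount > 1 checks). The helper itself is not part of this diff; a minimal sketch under those assumed semantics:

async function promiseRetry({ times, interval, debug = () => {} }, fn) {
    for (let retryCount = 1; ; retryCount++) {
        try {
            return await fn(retryCount);
        } catch (error) {
            if (retryCount >= times) throw error; // attempts exhausted
            debug(`attempt ${retryCount} failed: ${error.message}`);
            await new Promise((resolve) => setTimeout(resolve, interval));
        }
    }
}
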
@@ -225,19 +237,21 @@ async function downloadDir(backupSite, remotePath, dataLayout, progressCallback)
     debug(`downloadDir: ${remotePath} to ${dataLayout.toString()}. encryption filenames: ${encryptedFilenames}. encrypted files: ${!!backupSite.encryption}`);
 
+    let completedFiles = 0, totalFiles = 0;
     let lastProgressTime = 0;
-    function throttledProgressCallback(progress) {
+    function reportProgress(detail) {
         const now = Date.now();
         if (now - lastProgressTime < 5000) return;
         lastProgressTime = now;
-        progressCallback(progress);
+        const summary = `Downloaded ${completedFiles}/${totalFiles} files`;
+        progressCallback({ message: detail ? `${summary}. ${detail}` : summary });
     }
 
     async function downloadFile(entry) {
         let relativePath = path.relative(remotePath, entry.path);
         if (encryptedFilenames) {
             const { error, result } = hush.decryptFilePath(relativePath, backupSite.encryption);
-            if (error) throw new BoxError(BoxError.CRYPTO_ERROR, 'Unable to decrypt file');
+            if (error) throw new BoxError(BoxError.CRYPTO_ERROR, `Unable to decrypt file ${entry.path}`);
             relativePath = result;
         }
 
         const destFilePath = dataLayout.toLocalPath('./' + relativePath);
@@ -246,7 +260,7 @@ async function downloadDir(backupSite, remotePath, dataLayout, progressCallback)
         if (mkdirError) throw new BoxError(BoxError.FS_ERROR, mkdirError.message);
 
         await promiseRetry({ times: 3, interval: 20000 }, async function () {
-            throttledProgressCallback({ message: `Downloading ${entry.path} to ${destFilePath}` });
+            reportProgress(`Current: ${relativePath}`);
 
             const [downloadError, sourceStream] = await safe(backupSites.storageApi(backupSite).download(backupSite.config, entry.path));
             if (downloadError) {
@@ -257,8 +271,10 @@ async function downloadDir(backupSite, remotePath, dataLayout, progressCallback)
             const ps = new ProgressStream({ interval: 10000 }); // display a progress every 10 seconds
             ps.on('progress', function (progress) {
                 const transferred = Math.round(progress.transferred/1024/1024), speed = Math.round(progress.speed/1024/1024);
-                if (!transferred && !speed) return throttledProgressCallback({ message: `Downloading ${entry.path}` }); // 0M@0MBps looks wrong
-                throttledProgressCallback({ message: `Downloading ${entry.path}: ${transferred}M@${speed}MBps` });
+                if (transferred || speed) reportProgress(`Current: ${relativePath} ${transferred}M@${speed}MBps`);
+            });
+            ps.on('heartbeat', function ({ elapsed, transferred }) {
+                progressCallback({ message: `Still downloading ${relativePath} (${elapsed}s, ${Math.round(transferred/1024/1024)}M)` });
             });
 
             const destStream = fs.createWriteStream(destFilePath);
@@ -278,6 +294,8 @@ async function downloadDir(backupSite, remotePath, dataLayout, progressCallback)
                 throw pipelineError;
             }
         });
+
+        ++completedFiles;
     }
 
     // https://www.digitalocean.com/community/questions/rate-limiting-on-spaces?answer=40441
@@ -285,10 +303,13 @@ async function downloadDir(backupSite, remotePath, dataLayout, progressCallback)
     let marker = null;
     while (true) {
         const batch = await backupSites.storageApi(backupSite).listDir(backupSite.config, remotePath, marker === null ? 1 : 1000, marker); // try with one file first. if that works out, we continue faster
+        totalFiles += batch.entries.length;
         await async.eachLimit(batch.entries, concurrency, downloadFile);
         if (!batch.marker) break;
         marker = batch.marker;
     }
+
+    progressCallback({ message: `Downloaded ${completedFiles} files` });
 }
 
 async function download(backupSite, remotePath, dataLayout, progressCallback) {
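
reportProgress above is a plain time-gate: any call arriving within five seconds of the last accepted one is dropped, while the new 'heartbeat' handler deliberately bypasses it by calling progressCallback directly, so liveness messages always get through. The same gating pattern in isolation (a sketch, not shared code):

function makeThrottled(report, minIntervalMs = 5000) {
    let last = 0;
    return function (message) {
        const now = Date.now();
        if (now - last < minIntervalMs) return; // drop calls that arrive too soon
        last = now;
        report({ message });
    };
}
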
diff --git a/src/backupformat/tgz.js b/src/backupformat/tgz.js
index 78ca7efdf..3b5fcaa3e 100644
--- a/src/backupformat/tgz.js
+++ b/src/backupformat/tgz.js
@@ -66,7 +66,6 @@ function addEntryToPack(pack, header, options) {
             debug(`addToPack: error adding ${header.name} ${header.type} ${error.message}`);
             reject(new BoxError(BoxError.FS_ERROR, error.message));
         } else {
-            debug(`addToPack: added ${header.name} ${header.type}`);
             resolve();
         }
     }));
@@ -91,7 +90,6 @@ async function addPathToPack(pack, localPath, dataLayout) {
     while (queue.length) {
         // if (pack.destroyed || outStream.destroyed) break;
         const dir = queue.shift();
-        debug(`tarPack: processing ${dir}`);
         const [readdirError, entries] = await safe(fs.promises.readdir(dir, { withFileTypes: true }));
         if (!entries) {
             debug(`tarPack: skipping directory ${dir}: ${readdirError.message}`);
@@ -146,6 +144,9 @@ async function tarPack(dataLayout, encryption, uploader, progressCallback) {
         if (!transferred && !speed) return progressCallback({ message: 'Uploading backup' }); // 0M@0MBps looks wrong
         progressCallback({ message: `Uploading backup ${transferred}M@${speed}MBps` });
     });
+    ps.on('heartbeat', function ({ elapsed, transferred }) {
+        progressCallback({ message: `Still uploading backup (${elapsed}s, ${Math.round(transferred/1024/1024)}M)` });
+    });
 
     // careful not to have async code between here and pipeline() for 'error' handling
     const pack = tar.pack();
@@ -166,6 +167,7 @@ async function tarPack(dataLayout, encryption, uploader, progressCallback) {
         if (error) break; // the pipeline will error and we will retry the whole packing all over
         fileCount += stats.fileCount;
     }
+    debug(`tarPack: packed ${fileCount} files`);
     pack.finalize(); // harmless to call if already in error state
 
@@ -190,13 +192,14 @@ async function tarExtract(inStream, dataLayout, encryption, progressCallback) {
     const extract = tar.extract();
     const now = new Date();
 
+    let entryCount = 0;
     extract.on('entry', async function (header, entryStream, next) {
         if (path.isAbsolute(header.name)) {
             debug(`tarExtract: ignoring absolute path ${header.name}`);
             return next();
         }
+        ++entryCount;
         const abspath = dataLayout.toLocalPath(header.name);
-        debug(`tarExtract: ${header.name} ${header.size} ${header.type} to ${abspath}`);
         let error = null;
         if (header.type === 'directory') {
             [error] = await safe(fs.promises.mkdir(abspath, { recursive: true, mode: 0o755 }));
@@ -217,7 +220,7 @@ async function tarExtract(inStream, dataLayout, encryption, progressCallback) {
         [error] = await safe(fs.promises.lutimes(abspath, now /* atime */, header.mtime)); // for dirs, mtime will get overwritten
         next(error);
     });
-    extract.on('finish', () => debug('tarExtract: extract finished'));
+    extract.on('finish', () => debug(`tarExtract: extracted ${entryCount} entries`));
 
     const gunzip = zlib.createGunzip({});
     const ps = new ProgressStream({ interval: 10000 });
@@ -226,6 +229,9 @@ async function tarExtract(inStream, dataLayout, encryption, progressCallback) {
         if (!transferred && !speed) return progressCallback({ message: 'Downloading backup' }); // 0M@0MBps looks wrong
         progressCallback({ message: `Downloading ${transferred}M@${speed}MBps` });
     });
+    ps.on('heartbeat', function ({ elapsed, transferred }) {
+        progressCallback({ message: `Still downloading backup (${elapsed}s, ${Math.round(transferred/1024/1024)}M)` });
+    });
 
     if (encryption) {
         const decrypt = new DecryptStream(encryption);
diff --git a/src/progress-stream.js b/src/progress-stream.js
index 824782efc..6e2de9870 100644
--- a/src/progress-stream.js
+++ b/src/progress-stream.js
@@ -9,15 +9,17 @@ class ProgressStream extends TransformStream {
     #started;
     #startTime;
     #interval;
+    #heartbeatInterval;
 
     constructor(options) {
         super();
-        this.#options = Object.assign({ interval: 10 * 1000 }, options);
+        this.#options = Object.assign({ interval: 10 * 1000, heartbeatInterval: 60 * 1000 }, options);
         this.#transferred = 0;
         this.#delta = 0;
         this.#started = false;
         this.#startTime = null;
         this.#interval = null;
+        this.#heartbeatInterval = null;
     }
 
     stats() {
@@ -33,10 +35,15 @@
             this.#delta = 0;
             this.emit('progress', { speed, transferred: this.#transferred });
         }, this.#options.interval);
+        this.#heartbeatInterval = setInterval(() => {
+            const elapsed = Math.round((Date.now() - this.#startTime) / 1000);
+            this.emit('heartbeat', { elapsed, transferred: this.#transferred });
+        }, this.#options.heartbeatInterval);
     }
 
     _stop() {
         clearInterval(this.#interval);
+        clearInterval(this.#heartbeatInterval);
     }
 
     _transform(chunk, encoding, callback) {
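
With this change ProgressStream runs two timers: 'progress' fires every interval (default 10s) with { speed, transferred } for rate display, and 'heartbeat' fires every heartbeatInterval (default 60s) with { elapsed, transferred }, so long-running transfers keep proving liveness even when callers throttle the progress messages. Hypothetical consumer wiring:

const ps = new ProgressStream({ interval: 10 * 1000, heartbeatInterval: 60 * 1000 });
ps.on('progress', ({ speed, transferred }) => {
    console.log(`progress: ${transferred} bytes so far, speed ${speed}`);
});
ps.on('heartbeat', ({ elapsed, transferred }) => {
    console.log(`still running after ${elapsed}s (${transferred} bytes)`);
});
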
diff --git a/src/storage/s3.js b/src/storage/s3.js
index 86bf7d6b4..64a2a1877 100644
--- a/src/storage/s3.js
+++ b/src/storage/s3.js
@@ -174,16 +174,16 @@ async function upload(config, limits, remotePath) {
             };
 
             const managedUpload = constants.TEST ? new globalThis.S3MockUpload(options) : new Upload(options);
-            managedUpload.on('httpUploadProgress', (progress) => debug(`Upload progress: ${JSON.stringify(progress)}`));
+            // managedUpload.on('httpUploadProgress', (progress) => debug(`Upload progress: ${JSON.stringify(progress)}`));
             uploadPromise = managedUpload.done();
 
             return passThrough;
         },
         async finish() {
             if (!uploadPromise) return; // stream was never created
-            const [error, data] = await safe(uploadPromise);
+            const [error/*,data*/] = await safe(uploadPromise);
             if (error) throw new BoxError(BoxError.EXTERNAL_ERROR, `Upload error: code: ${error.Code} message: ${error.message}`); // sometimes message is null
-            debug(`Upload finished. ${JSON.stringify(data)}`);
+            // debug(`Upload finished. ${JSON.stringify(data)}`); // { ETag, $metadata:{httpStatusCode,requestId,attempts,totalRetryDelay},Bucket,Key}
         }
     };
 }
@@ -660,7 +660,7 @@ async function copyDir(config, limits, fromPath, toPath, progressCallback) {
         await async.eachLimit(batch.entries, concurrency, async (entry) => {
             const fullFromPath = path.join(config.prefix, entry.path);
             const fullToPath = path.join(config.prefix, toPath, path.relative(fromPath, entry.path));
-            await copyInternal(config, fullFromPath, fullToPath, entry.size, progressCallback);
+            await copyInternal(config, fullFromPath, fullToPath, entry.size, () => {}); // swallow the progress
         });
         if (!batch.marker) break;
         marker = batch.marker;
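
For context on the upload()/finish() pair in this file: with @aws-sdk/lib-storage, the managed Upload does not start transferring until done() is called, which is why the code stashes uploadPromise immediately, hands the PassThrough back to the caller, and only awaits the promise later in finish(). A stripped-down sketch of that shape (client construction and BoxError mapping omitted; startStreamingUpload is a hypothetical name, not from this codebase):

const { Upload } = require('@aws-sdk/lib-storage');
const { PassThrough } = require('node:stream');

function startStreamingUpload(client, Bucket, Key) {
    const body = new PassThrough();
    const managedUpload = new Upload({ client, params: { Bucket, Key, Body: body } });
    const uploadPromise = managedUpload.done(); // kicks off the (multipart) upload now
    return {
        stream: body,                // pipe the data to upload into this
        finish: () => uploadPromise, // await once the stream has ended
    };
}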