'use strict';

exports = module.exports = {
    download,
    upload,
    getFileExtension,

    // _-prefixed exports are internal (presumably exposed for tests — TODO confirm)
    _saveFsMetadata: saveFsMetadata,
    _restoreFsMetadata: restoreFsMetadata
};

const assert = require('assert'),
    async = require('async'),
    backupTargets = require('../backuptargets.js'),
    BoxError = require('../boxerror.js'),
    DataLayout = require('../datalayout.js'),
    { DecryptStream } = require('../hush.js'),
    debug = require('debug')('box:backupformat/rsync'),
    { EncryptStream } = require('../hush.js'),
    fs = require('fs'),
    hush = require('../hush.js'),
    path = require('path'),
    paths = require('../paths.js'),
    ProgressStream = require('../progress-stream.js'),
    promiseRetry = require('../promise-retry.js'),
    safe = require('safetydance'),
    shell = require('../shell.js')('backupformat/rsync'),
    stream = require('stream/promises'),
    syncer = require('../syncer.js');

// Streams sourceFile through optional encryption and the target uploader, reporting
// progress every 10 seconds. Files that disappear before or during the upload are
// skipped silently (app data can change while a backup is running).
async function addFile(sourceFile, encryption, uploader, progressCallback) {
    assert.strictEqual(typeof sourceFile, 'string');
    assert.strictEqual(typeof encryption, 'object'); // may be null (typeof null === 'object')
    assert.strictEqual(typeof uploader, 'object');
    assert.strictEqual(typeof progressCallback, 'function');

    // make sure file can be opened for reading before we start the pipeline. otherwise, we end up with
    // destination dirs/files which are owned by root (this process id) and cannot be copied (run as normal user)
    const [openError, sourceHandle] = await safe(fs.promises.open(sourceFile, 'r'));
    if (openError) {
        debug(`addFile: ignoring disappeared file: ${sourceFile}`);
        return;
    }

    // BUGFIX: FileHandle.createReadStream() takes only an options object; the path was
    // previously passed as a (meaningless) first argument
    const sourceStream = sourceHandle.createReadStream({ autoClose: true });

    const ps = new ProgressStream({ interval: 10000 }); // display a progress every 10 seconds
    ps.on('progress', function (progress) {
        const transferred = Math.round(progress.transferred/1024/1024), speed = Math.round(progress.speed/1024/1024);
        if (!transferred && !speed) return progressCallback({ message: `Uploading ${sourceFile}` }); // 0M@0MBps looks wrong
        progressCallback({ message: `Uploading ${sourceFile}: ${transferred}M@${speed}MBps` });
    });

    const transforms = encryption ? [ new EncryptStream(encryption) ] : [];

    // BUGFIX: the pipeline promise was wrapped in safe() twice, so 'error' was always null,
    // pipeline failures were silently swallowed and the ENOENT branch below was dead code
    const [error] = await safe(stream.pipeline(sourceStream, ...transforms, ps, uploader.stream));
    if (error) {
        if (error.message.includes('ENOENT')) { // ignore error if file disappears mid-upload
            debug(`addFile: ignoring file that disappeared mid-upload: ${sourceFile}`);
            return; // NOTE(review): upload is abandoned without uploader.finish() — confirm the uploader cleans up on its own
        }
        throw new BoxError(BoxError.EXTERNAL_ERROR, `addFile pipeline error: ${error.message}`); // was mislabeled 'tarPack' (copy-paste from the tar format)
    }

    // debug(`addFile: pipeline finished: ${JSON.stringify(ps.stats())}`);

    await uploader.finish();
}

// Applies one syncer change (add/remove/removedir) to the backup target.
// Additions are retried up to 5 times since uploads are the flaky part.
async function processSyncerChange(change, backupTarget, remotePath, dataLayout, progressCallback) {
    debug('sync: processing task: %j', change);

    // the empty task.path is special to signify the directory
    const destPath = change.path && backupTarget.encryption?.encryptedFilenames ? hush.encryptFilePath(change.path, backupTarget.encryption) : change.path;
    const backupFilePath = path.join(backupTargets.getBackupFilePath(backupTarget, remotePath), destPath);

    if (change.operation === 'removedir') {
        debug(`Removing directory ${backupFilePath}`);
        await backupTargets.storageApi(backupTarget).removeDir(backupTarget.config, backupFilePath, progressCallback);
    } else if (change.operation === 'remove') {
        debug(`Removing ${backupFilePath}`);
        await backupTargets.storageApi(backupTarget).remove(backupTarget.config, backupFilePath);
    } else if (change.operation === 'add') {
        await promiseRetry({ times: 5, interval: 20000, debug }, async (retryCount) => {
            progressCallback({ message: `Adding ${change.path}` + (retryCount > 1 ? ` (Try ${retryCount})` : '') });
            debug(`Adding ${change.path} position ${change.position} try ${retryCount}`);
            const uploader = await backupTargets.storageApi(backupTarget).upload(backupTarget.config, backupFilePath);
            await addFile(dataLayout.toLocalPath('./' + change.path), backupTarget.encryption, uploader, progressCallback);
        });
    }
}

// Diffs the data layout against the last sync cache and pushes deletes first, then
// additions, each with bounded concurrency.
async function sync(backupTarget, remotePath, dataLayout, progressCallback) {
    assert.strictEqual(typeof backupTarget, 'object');
    assert.strictEqual(typeof remotePath, 'string');
    assert(dataLayout instanceof DataLayout, 'dataLayout must be a DataLayout');
    assert.strictEqual(typeof progressCallback, 'function');

    // the number here has to take into account the s3.upload partSize (which is 10MB). So 20=200MB
    const concurrency = backupTarget.limits?.syncConcurrency || (backupTarget.provider === 's3' ? 20 : 10);

    const cacheFile = path.join(paths.BACKUP_INFO_DIR, backupTarget.id, `${dataLayout.getBasename()}.sync.cache`);
    const changes = await syncer.sync(dataLayout, { cacheFile });
    debug(`sync: processing ${changes.delQueue.length} deletes and ${changes.addQueue.length} additions`);

    const [delError] = await safe(async.eachLimit(changes.delQueue, concurrency, async (change) => await processSyncerChange(change, backupTarget, remotePath, dataLayout, progressCallback)));
    debug('sync: done processing deletes. error: %o', delError);
    if (delError) throw delError;

    const [addError] = await safe(async.eachLimit(changes.addQueue, concurrency, async (change) => await processSyncerChange(change, backupTarget, remotePath, dataLayout, progressCallback)));
    debug('sync: done processing adds. error: %o', addError);
    if (addError) throw addError;

    await syncer.finalize(changes);
}

// this is not part of 'snapshotting' because we need root access to traverse
// Records empty dirs, executable bits and symlinks — attributes the plain-file rsync
// format cannot represent on the target — as JSON in metadataFile.
async function saveFsMetadata(dataLayout, metadataFile) {
    assert(dataLayout instanceof DataLayout, 'dataLayout must be a DataLayout');
    assert.strictEqual(typeof metadataFile, 'string');

    // contains paths prefixed with './'
    const metadata = {
        emptyDirs: [],
        execFiles: [],
        symlinks: []
    };

    // we assume small number of files. spawnSync will raise a ENOBUFS error after maxBuffer
    for (const lp of dataLayout.localPaths()) {
        const [emptyDirsError, emptyDirs] = await safe(shell.spawn('find', [lp, '-type', 'd', '-empty'], { encoding: 'utf8', maxLines: 50000 }));
        if (emptyDirsError && emptyDirsError.stdoutLineCount >= 50000) throw new BoxError(BoxError.FS_ERROR, `Too many empty directories. Run "find ${lp} -type d -empty" to investigate`);
        if (emptyDirsError) throw emptyDirsError;
        if (emptyDirs.length) metadata.emptyDirs = metadata.emptyDirs.concat(emptyDirs.trim().split('\n').map((ed) => dataLayout.toRemotePath(ed)));

        const [execFilesError, execFiles] = await safe(shell.spawn('find', [lp, '-type', 'f', '-executable'], { encoding: 'utf8', maxLines: 20000 }));
        if (execFilesError && execFilesError.stdoutLineCount >= 20000) throw new BoxError(BoxError.FS_ERROR, `Too many executable files. Run "find ${lp} -type f -executable" to investigate`);
        if (execFilesError) throw execFilesError;
        if (execFiles.length) metadata.execFiles = metadata.execFiles.concat(execFiles.trim().split('\n').map((ef) => dataLayout.toRemotePath(ef)));

        const [symlinkFilesError, symlinkFiles] = await safe(shell.spawn('find', [lp, '-type', 'l'], { encoding: 'utf8', maxLines: 20000 }));
        if (symlinkFilesError && symlinkFilesError.stdoutLineCount >= 20000) throw new BoxError(BoxError.FS_ERROR, `Too many symlinks. Run "find ${lp} -type l" to investigate`);
        if (symlinkFilesError) throw symlinkFilesError;
        if (symlinkFiles.length) {
            metadata.symlinks = metadata.symlinks.concat(symlinkFiles.trim().split('\n').map((sl) => {
                const target = safe.fs.readlinkSync(sl); // null target (readlink failure) is skipped on restore
                return { path: dataLayout.toRemotePath(sl), target };
            }));
        }
    }

    if (!safe.fs.writeFileSync(metadataFile, JSON.stringify(metadata, null, 4))) throw new BoxError(BoxError.FS_ERROR, `Error writing fs metadata: ${safe.error.message}`);
}

// Re-creates empty dirs, executable bits and symlinks recorded by saveFsMetadata().
async function restoreFsMetadata(dataLayout, metadataFile) {
    assert(dataLayout instanceof DataLayout, 'dataLayout must be a DataLayout');
    assert.strictEqual(typeof metadataFile, 'string');

    debug(`Recreating empty directories in ${dataLayout.toString()}`);

    const metadataJson = safe.fs.readFileSync(metadataFile, 'utf8');
    if (metadataJson === null) throw new BoxError(BoxError.EXTERNAL_ERROR, 'Error loading fsmetadata.json:' + safe.error.message);
    const metadata = safe.JSON.parse(metadataJson);
    if (metadata === null) throw new BoxError(BoxError.EXTERNAL_ERROR, 'Error parsing fsmetadata.json:' + safe.error.message);

    for (const emptyDir of metadata.emptyDirs) {
        const [mkdirError] = await safe(fs.promises.mkdir(dataLayout.toLocalPath(emptyDir), { recursive: true }));
        if (mkdirError) throw new BoxError(BoxError.FS_ERROR, `unable to create path: ${mkdirError.message}`);
    }

    for (const execFile of metadata.execFiles) {
        const [chmodError] = await safe(fs.promises.chmod(dataLayout.toLocalPath(execFile), parseInt('0755', 8)));
        if (chmodError) throw new BoxError(BoxError.FS_ERROR, `unable to chmod: ${chmodError.message}`);
    }

    for (const symlink of (metadata.symlinks || [])) { // older backups may predate symlink metadata
        if (!symlink.target) continue;

        // the path may not exist if we had a directory full of symlinks
        const [mkdirError] = await safe(fs.promises.mkdir(path.dirname(dataLayout.toLocalPath(symlink.path)), { recursive: true }));
        if (mkdirError) throw new BoxError(BoxError.FS_ERROR, `unable to symlink (mkdir): ${mkdirError.message}`);

        const [symlinkError] = await safe(fs.promises.symlink(symlink.target, dataLayout.toLocalPath(symlink.path), 'file'));
        if (symlinkError) throw new BoxError(BoxError.FS_ERROR, `unable to symlink: ${symlinkError.message}`);
    }
}

// Downloads every remote entry under backupFilePath into the data layout, decrypting
// names/contents as configured, with per-file retries and bounded concurrency.
async function downloadDir(backupTarget, backupFilePath, dataLayout, progressCallback) {
    assert.strictEqual(typeof backupTarget, 'object');
    assert.strictEqual(typeof backupFilePath, 'string');
    assert(dataLayout instanceof DataLayout, 'dataLayout must be a DataLayout');
    assert.strictEqual(typeof progressCallback, 'function');

    const encryptedFilenames = backupTarget.encryption?.encryptedFilenames || false;

    debug(`downloadDir: ${backupFilePath} to ${dataLayout.toString()}. encryption filenames: ${encryptedFilenames} content: ${!!backupTarget.encryption}`);

    async function downloadFile(entry) {
        let relativePath = path.relative(backupFilePath, entry.fullPath);
        if (encryptedFilenames) {
            const { error, result } = hush.decryptFilePath(relativePath, backupTarget.encryption);
            if (error) throw new BoxError(BoxError.CRYPTO_ERROR, 'Unable to decrypt file');
            relativePath = result;
        }
        const destFilePath = dataLayout.toLocalPath('./' + relativePath);

        const [mkdirError] = await safe(fs.promises.mkdir(path.dirname(destFilePath), { recursive: true }));
        if (mkdirError) throw new BoxError(BoxError.FS_ERROR, mkdirError.message);

        await promiseRetry({ times: 3, interval: 20000 }, async function () {
            const [downloadError, sourceStream] = await safe(backupTargets.storageApi(backupTarget).download(backupTarget.config, entry.fullPath));
            if (downloadError) {
                progressCallback({ message: `Download ${entry.fullPath} to ${destFilePath} errored: ${downloadError.message}` });
                throw downloadError;
            }

            const ps = new ProgressStream({ interval: 10000 }); // display a progress every 10 seconds
            ps.on('progress', function (progress) {
                const transferred = Math.round(progress.transferred/1024/1024), speed = Math.round(progress.speed/1024/1024);
                if (!transferred && !speed) return progressCallback({ message: `Downloading ${entry.fullPath}` }); // 0M@0MBps looks wrong
                progressCallback({ message: `Downloading ${entry.fullPath}: ${transferred}M@${speed}MBps` });
            });

            const destStream = fs.createWriteStream(destFilePath);
            const streams = [ sourceStream, ps ];
            if (backupTarget.encryption) streams.push(new DecryptStream(backupTarget.encryption));
            streams.push(destStream);

            progressCallback({ message: `Downloading ${entry.fullPath} to ${destFilePath}` });
            const [pipelineError] = await safe(stream.pipeline(streams));
            if (pipelineError) {
                progressCallback({ message: `Download error ${entry.fullPath} to ${destFilePath}: ${pipelineError.message}` });
                throw pipelineError;
            }
            progressCallback({ message: `Download finished ${entry.fullPath} to ${destFilePath}` });
        });
    }

    // https://www.digitalocean.com/community/questions/rate-limiting-on-spaces?answer=40441
    const concurrency = backupTarget.limits?.downloadConcurrency || (backupTarget.provider === 's3' ? 30 : 10);

    let marker = null;
    while (true) {
        // try with one file first. if that works out, we continue faster
        const batch = await backupTargets.storageApi(backupTarget).listDir(backupTarget.config, backupFilePath, marker === null ? 1 : 1000, marker);
        await async.eachLimit(batch.entries, concurrency, downloadFile);
        if (!batch.marker) break;
        marker = batch.marker;
    }
}

// Restores a backup: downloads all files, then replays the fs metadata (empty dirs,
// exec bits, symlinks) that the plain-file format cannot carry.
async function download(backupTarget, remotePath, dataLayout, progressCallback) {
    assert.strictEqual(typeof backupTarget, 'object');
    assert.strictEqual(typeof remotePath, 'string');
    assert(dataLayout instanceof DataLayout, 'dataLayout must be a DataLayout');
    assert.strictEqual(typeof progressCallback, 'function');

    const backupFilePath = backupTargets.getBackupFilePath(backupTarget, remotePath);

    debug(`download: Downloading ${backupFilePath} to ${dataLayout.toString()}`);

    await downloadDir(backupTarget, backupFilePath, dataLayout, progressCallback);
    await restoreFsMetadata(dataLayout, `${dataLayout.localRoot()}/fsmetadata.json`);
}

// Creates/updates a backup: snapshots fs metadata, then syncs changed files to the target.
async function upload(backupTarget, remotePath, dataLayout, progressCallback) {
    assert.strictEqual(typeof backupTarget, 'object');
    assert.strictEqual(typeof remotePath, 'string');
    assert(dataLayout instanceof DataLayout, 'dataLayout must be a DataLayout'); // consistent with the other entry points (was a loose typeof check)
    assert.strictEqual(typeof progressCallback, 'function');

    await saveFsMetadata(dataLayout, `${dataLayout.localRoot()}/fsmetadata.json`);
    await sync(backupTarget, remotePath, dataLayout, progressCallback);
}

// The rsync format stores plain files (encrypted or not); there is no archive extension.
function getFileExtension(encryption) {
    assert.strictEqual(typeof encryption, 'boolean');

    return '';
}