diff --git a/package.json b/package.json index 61751c4a5..db210beea 100644 --- a/package.json +++ b/package.json @@ -60,6 +60,7 @@ "password-generator": "^2.0.2", "progress-stream": "^2.0.0", "proxy-middleware": "^0.13.0", + "recursive-readdir": "^2.2.1", "s3-block-read-stream": "^0.2.0", "safetydance": "^0.7.1", "semver": "^4.3.6", diff --git a/src/storage/gcs.js b/src/storage/gcs.js index 7bb56d125..2af6eb9ab 100644 --- a/src/storage/gcs.js +++ b/src/storage/gcs.js @@ -1,10 +1,13 @@ 'use strict'; exports = module.exports = { - backup: backup, - restore: restore, - copyBackup: copyBackup, - removeBackups: removeBackups, + upload: upload, + download: download, + downloadDir: downloadDir, + copy: copy, + + remove: remove, + removeDir: removeDir, backupDone: backupDone, @@ -18,12 +21,15 @@ exports = module.exports = { var assert = require('assert'), GCS = require('@google-cloud/storage'), BackupsError = require('../backups.js').BackupsError, + chunk = require('lodash.chunk'), debug = require('debug')('box:storage/gcs'), once = require('once'), PassThrough = require('stream').PassThrough, + EventEmitter = require('events'), + mkdirp = require('mkdirp'), + fs = require('fs'), path = require('path'), - async = require('async'), - targz = require('./targz.js'); + async = require('async'); // test only var originalGCS; @@ -54,111 +60,224 @@ function getBackupCredentials(backupConfig) { } return config; } - function getBucket(apiConfig) { var credentials = getBackupCredentials(apiConfig); return GCS(credentials).bucket(apiConfig.bucket); } -function getBackupFilePath(apiConfig, backupId) { - assert.strictEqual(typeof apiConfig, 'object'); - assert.strictEqual(typeof backupId, 'string'); - - const FILE_TYPE = apiConfig.key ? '.tar.gz.enc' : '.tar.gz'; - - return path.join(apiConfig.prefix, backupId.endsWith(FILE_TYPE) ? backupId : backupId+FILE_TYPE); -} - // storage api -function backup(apiConfig, backupId, sourceDir, callback) { + + +function upload(apiConfig, backupFilePath, sourceStream, callback) { assert.strictEqual(typeof apiConfig, 'object'); - assert.strictEqual(typeof backupId, 'string'); - assert.strictEqual(typeof sourceDir, 'string'); + assert.strictEqual(typeof backupFilePath, 'string'); + assert.strictEqual(typeof sourceStream, 'object'); assert.strictEqual(typeof callback, 'function'); - callback = once(callback); + function done(error) { + if (error) { + debug('[%s] upload: gcp upload error.', backupFilePath, error); + return callback(new BackupsError(BackupsError.EXTERNAL_ERROR, 'Error uploading ${backupFilePath}. Message: ${error.message} HTTP Code: ${error.code}')); + } - var backupFilePath = getBackupFilePath(apiConfig, backupId); + callback(null); + } - debug('[%s] backup: %s -> %s', backupId, sourceDir, backupFilePath); - - var bucket = getBucket(apiConfig); - var uploadingFile = bucket.file(backupFilePath); - - var uploadStream = uploadingFile.createWriteStream({resumable: false}) - .on('finish', callback.bind(null, null)) - .on('error', function(e){ - if (e) callback(new BackupsError(BackupsError.EXTERNAL_ERROR, e.message)); - }) - ; - targz.create([{ source: sourceDir, destination: '.' }], apiConfig.key || null, uploadStream, callback); - return uploadStream; + return sourceStream.pipe( + getBucket(apiConfig) + .file(backupFilePath) + .createWriteStream({resumable: false}) + .on('finish', done) + .on('error', function(e){ + if (e) done(e); + }) + ); } -function restore(apiConfig, backupId, destination, callback) { +function download(apiConfig, backupFilePath, callback) { assert.strictEqual(typeof apiConfig, 'object'); - assert.strictEqual(typeof backupId, 'string'); - assert.strictEqual(typeof destination, 'string'); + assert.strictEqual(typeof backupFilePath, 'string'); assert.strictEqual(typeof callback, 'function'); - callback = once(callback); - - var backupFilePath = getBackupFilePath(apiConfig, backupId); - - debug('[%s] restore: %s -> %s', backupId, backupFilePath, destination); - var file = getBucket(apiConfig).file(backupFilePath); + var ps = new PassThrough(); var readStream = file.createReadStream() - .on('error', function(e){ - if (e && e.code == 404) return callback(new BackupsError(BackupsError.NOT_FOUND, e)); - callback(new BackupsError(BackupsError.EXTERNAL_ERROR, e)); + .on('error', function(error){ + if (error && error.code == 404){ + ps.emit('error', new BackupsError(BackupsError.NOT_FOUND)); + } else { + debug('[%s] download: gcp stream error.', backupFilePath, error); + ps.emit('error', new BackupsError(BackupsError.EXTERNAL_ERROR, error)); + } }) ; - targz.extract(readStream, destination, apiConfig.key || null, callback); + readStream.pipe(ps); + + callback(null, ps); } -function copyBackup(apiConfig, oldBackupId, newBackupId, callback) { - assert.strictEqual(typeof apiConfig, 'object'); - assert.strictEqual(typeof oldBackupId, 'string'); - assert.strictEqual(typeof newBackupId, 'string'); - assert.strictEqual(typeof callback, 'function'); - +function listDir(apiConfig, backupFilePath, batchSize, iteratorCallback, callback){ var bucket = getBucket(apiConfig); - bucket - .file(getBackupFilePath(apiConfig, oldBackupId)) - .copy(getBackupFilePath(apiConfig, newBackupId), function(error, newFile, apiResponse){ - if (error && error.code == 404) return callback(new BackupsError(BackupsError.NOT_FOUND, 'Old backup not found')); + var query = {prefix: backupFilePath, maxResults: batchSize, autoPaginate: false}; + + async.forever(function listAndDownload(foreverCallback) { + bucket.getFiles(query, function (error, files, nextQuery) { if (error) { - debug('copyBackup: gcs copy error.', e); - return callback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message)); + debug('remove: Failed to list %s. Not fatal.', error); + return foreverCallback(error); } + var arr = batchSize === 1 ? files : chunk(files, batchSize); + if (arr.length === 0) return foreverCallback(new Error('Done')); + + debug('emitting '+arr.length+' files found'); + iteratorCallback(arr, function (error) { + if (error) return foreverCallback(error); + + if (arr.length < batchSize) return foreverCallback(new Error('Done')); + + query = nextQuery; + foreverCallback(); + }); + }); + }, function (error) { + if (error.message === 'Done') return callback(null); + + callback(error); + }); +} + +function downloadDir(apiConfig, backupFilePath, destDir) { + assert.strictEqual(typeof apiConfig, 'object'); + assert.strictEqual(typeof backupFilePath, 'string'); + assert.strictEqual(typeof destDir, 'string'); + + var events = new EventEmitter(); + var total = 0; + + function downloadFile(file, iteratorCallback) { + var relativePath = path.relative(backupFilePath, file.name); + + events.emit('progress', 'Downloading ${relativePath}'); + + mkdirp(path.dirname(path.join(destDir, relativePath)), function (error) { + if (error) return iteratorCallback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message)); + + download(apiConfig, file.name, function (error, sourceStream) { + if (error) return iteratorCallback(error); + + var destStream = fs.createWriteStream(path.join(destDir, relativePath)); + + destStream.on('open', function () { + sourceStream.pipe(destStream); + }); + + destStream.on('error', function (error) { + return iteratorCallback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message)); + }); + + destStream.on('finish', iteratorCallback); + }); + }); + } + + const concurrency = 10, batchSize = 1; + + listDir(apiConfig, backupFilePath, batchSize, function (objects, done) { + total += objects.length; + async.eachLimit(objects, concurrency, downloadFile, done); + }, function (error) { + events.emit('progress', 'Downloaded ${total} files'); + + events.emit('done', error); + }); + + return events; +} + +function copy(apiConfig, oldFilePath, newFilePath) { + assert.strictEqual(typeof apiConfig, 'object'); + assert.strictEqual(typeof oldFilePath, 'string'); + assert.strictEqual(typeof newFilePath, 'string'); + + var events = new EventEmitter(), retryCount = 0; + + function copyFile(file, iteratorCallback){ + + var relativePath = path.relative(oldFilePath, file.name); + + file.copy(path.join(newFilePath, relativePath), function(error, newFile, apiResponse){ + if (error && error.code == 404) return iteratorCallback(new BackupsError(BackupsError.NOT_FOUND, 'Old backup not found')); + if (error) { + debug('copyBackup: gcs copy error', error); + return iteratorCallback(new BackupsError(BackupsError.EXTERNAL_ERROR, error.message)); + } + iteratorCallback(null); + }); + + events.emit('progress', 'Copying (multipart) ${relativePath}'); + } + + const batchSize = 1; + var total = 0, concurrency = 4; + + listDir(apiConfig, oldFilePath, batchSize, function (files, done) { + total += files.length; + + if (retryCount === 0) concurrency = Math.min(concurrency + 1, 10); else concurrency = Math.max(concurrency - 1, 5); + events.emit('progress', '${retryCount} errors. concurrency set to ${concurrency}'); + retryCount = 0; + + async.eachLimit(files, concurrency, copyFile, done); + }, function (error) { + events.emit('progress', 'Copied ${total} files'); + + events.emit('done', error); + }); + + return events; +} + +function remove(apiConfig, filename, callback) { + assert.strictEqual(typeof apiConfig, 'object'); + assert.strictEqual(typeof filename, 'string'); + assert.strictEqual(typeof callback, 'function'); + + getBucket(apiConfig) + .file(filename) + .delete(function(e){ + if (e) debug('removeBackups: Unable to remove %s (%s). Not fatal.', filename, e.message); + else debug('removeBackups: Deleted: %s', filename); callback(null); }); } -function removeBackups(apiConfig, backupIds, callback) { +function removeDir(apiConfig, pathPrefix) { assert.strictEqual(typeof apiConfig, 'object'); - assert(Array.isArray(backupIds)); - assert.strictEqual(typeof callback, 'function'); + assert.strictEqual(typeof pathPrefix, 'string'); - var bucket = getBucket(apiConfig); + var events = new EventEmitter(), retryCount = 0; - var removeQueue = []; - backupIds.forEach(function (backupId) { - removeQueue.push(function(cb){ - var filePath = getBackupFilePath(apiConfig, backupId); - bucket.file(filePath).delete(function(e){ - if (e) debug('removeBackups: Unable to remove %s (%s). Not fatal.', filePath, e.message); - else debug('removeBackups: Deleted: %s', filePath); - cb(typeof e == 'undefined'); - }); - }); + const batchSize = 1; + var total = 0, concurrency = 4; + + listDir(apiConfig, oldFilePath, batchSize, function (files, done) { + total += files.length; + + if (retryCount === 0) concurrency = Math.min(concurrency + 1, 10); else concurrency = Math.max(concurrency - 1, 5); + events.emit('progress', '${retryCount} errors. concurrency set to ${concurrency}'); + retryCount = 0; + + async.eachLimit(files, concurrency, remove.bind(null, apiConfig), done); + }, function (error) { + events.emit('progress', 'Deleted ${total} files'); + + events.emit('done', error); }); - async.series(removeQueue, callback); + return events; } function testConfig(apiConfig, callback) { diff --git a/src/test/storage-test.js b/src/test/storage-test.js index 36cb82344..61ceb7221 100644 --- a/src/test/storage-test.js +++ b/src/test/storage-test.js @@ -14,8 +14,12 @@ var BackupsError = require('../backups.js').BackupsError, os = require('os'), path = require('path'), rimraf = require('rimraf'), + mkdirp = require('mkdirp'), + recursive_readdir = require("recursive-readdir"), s3 = require('../storage/s3.js'), - gcs = require('../storage/gcs.js'); + debug = require('debug')('box:storage-test'), + gcs = require('../storage/gcs.js'), + chunk = require('lodash.chunk'); describe('Storage', function () { describe('filesystem', function () { @@ -293,11 +297,7 @@ describe('Storage', function () { describe('gcs', function () { this.timeout(10000); - var gBackupId_1 = 'someprefix/one'; - var gBackupId_2 = 'someprefix/two'; var gTmpFolder; - var gSourceFolder; - var gDestinationFolder; var gBackupConfig = { provider: 'gcs', key: '', @@ -311,10 +311,11 @@ describe('Storage', function () { }; var GCSMockBasePath = path.join(os.tmpdir(), 'gcs-backup-test-buckets/'); - before(function (done) { + before(function () { var mockGCS = function(cfg){ return {bucket: function(b){ - return {file: function(filename){ + var file = function(filename){ + var ensurePathWritable = function (filename) { filename = GCSMockBasePath + filename; mkdirp.sync(path.dirname(filename)); @@ -322,15 +323,17 @@ describe('Storage', function () { }; return { + name: filename, createReadStream: function(cfg){ return fs.createReadStream(ensurePathWritable(filename)) .on('error', function(e){ + console.log('error createReadStream: '+filename); if (e.code != 404) { e.code = 404; this.emit('error', e); } }) - ; + ; }, createWriteStream: function(cfg){ return fs.createWriteStream(ensurePathWritable(filename)); @@ -349,100 +352,107 @@ describe('Storage', function () { .pipe(fs.createWriteStream(ensurePathWritable(dst))) .on('end', cb) .on('error', notFoundHandler) - ; + ; } }; - }} + }; + + return { + file: file, + getFiles: function(q, cb){ + var target = GCSMockBasePath + q.prefix; + recursive_readdir(target, function(e, files){ + + var pageToken = q.pageToken || 0; + + var chunkedFiles = chunk(files, q.maxResults); + if (q.pageToken >= chunkedFiles.length) return cb(null, []); + + var gFiles = chunkedFiles[pageToken].map(function(f){ + return file(path.relative(GCSMockBasePath, f)); //convert to google + }); + + q.pageToken = pageToken + 1; + cb(null, gFiles, q); + }); + } + } }}; }; gcs._mockInject(mockGCS); - - setup(function (error) { - expect(error).to.be(null); - - gTmpFolder = fs.mkdtempSync(path.join(os.tmpdir(), 'gcs-backup-test_')); - gSourceFolder = path.join(__dirname, 'storage'); - gDestinationFolder = path.join(gTmpFolder, 'destination/'); - - settings.setBackupConfig(gBackupConfig, function (error) { - expect(error).to.be(null); - - done(); - }); - }); }); after(function (done) { gcs._mockRestore(); rimraf.sync(GCSMockBasePath); - - cleanup(function (error) { - expect(error).to.be(null); - rimraf(gTmpFolder, done); - }); + done(); }); it('can backup', function (done) { - gcs.backup(gBackupConfig, gBackupId_1, gSourceFolder, function (error) { + var sourceFile = path.join(__dirname, 'storage/data/test.txt'); + var sourceStream = fs.createReadStream(sourceFile); + var destKey = 'uploadtest/test.txt'; + gcs.upload(gBackupConfig, destKey, sourceStream, function (error) { expect(error).to.be(null); done(); }); }); - it('can restore', function (done) { - gcs.restore(gBackupConfig, gBackupId_1, gDestinationFolder, function (error) { + it('can download file', function (done) { + var sourceKey = 'uploadtest/test.txt'; + gcs.download(gBackupConfig, sourceKey, function (error, stream) { expect(error).to.be(null); - - compareDirectories(path.join(gSourceFolder, 'data'), path.join(gDestinationFolder, 'data'), function (error) { - expect(error).to.equal(null); - - compareDirectories(path.join(gSourceFolder, 'addon'), path.join(gDestinationFolder, 'addon'), function (error) { - expect(error).to.equal(null); - - rimraf(gDestinationFolder, done); - }); - }); + expect(stream).to.be.an('object'); + done(); }); }); - it('can copy backup', function (done) { - // will be verified after removing the first and restoring from the copy - gcs.copyBackup(gBackupConfig, gBackupId_1, gBackupId_2, done); + it('download dir copies contents of source dir', function (done) { + var sourceFile = path.join(__dirname, 'storage/data/test.txt'); + var sourceKey = ''; + var destDir = path.join(os.tmpdir(), 'gcs-destdir'); + rimraf.sync(destDir+'/*'); + + var events = gcs.downloadDir(gBackupConfig, sourceKey, destDir); + events.on('done', function (error) { + expect(error).to.be(null); + expect(fs.statSync(path.join(destDir, 'uploadtest/test.txt')).size).to.be(fs.statSync(sourceFile).size); + done(); + }); }); - it('can remove backup', function (done) { - // will be verified with next test trying to restore the removed one - gcs.removeBackups(gBackupConfig, [ gBackupId_1 ], done); - }); + it('can copy', function (done) { + fs.writeFileSync(path.join(GCSMockBasePath, 'uploadtest/C++.gitignore'), 'special', 'utf8'); - it('cannot restore deleted backup', function (done) { - gcs.restore(gBackupConfig, gBackupId_1, gDestinationFolder, function (error) { - expect(error).to.be.an('object'); - expect(error.reason).to.equal(BackupsError.NOT_FOUND); + var sourceKey = 'uploadtest'; + + var events = gcs.copy(gBackupConfig, sourceKey, 'uploadtest-copy'); + events.on('done', function (error) { + var sourceFile = path.join(__dirname, 'storage/data/test.txt'); + expect(error).to.be(null); + expect(fs.statSync(path.join(GCSMockBasePath, 'uploadtest-copy/test.txt')).size).to.be(fs.statSync(sourceFile).size); + + expect(fs.statSync(path.join(GCSMockBasePath, 'uploadtest-copy/C++.gitignore')).size).to.be(7); done(); }); }); - it('can restore backup copy', function (done) { - gcs.restore(gBackupConfig, gBackupId_2, gDestinationFolder, function (error) { + it('can remove file', function (done) { + gcs.remove(gBackupConfig, 'uploadtest-copy/test.txt', function (error) { expect(error).to.be(null); - - compareDirectories(path.join(gSourceFolder, 'data'), path.join(gDestinationFolder, 'data'), function (error) { - expect(error).to.equal(null); - - compareDirectories(path.join(gSourceFolder, 'addon'), path.join(gDestinationFolder, 'addon'), function (error) { - expect(error).to.equal(null); - - rimraf(gDestinationFolder, done); - }); - }); + expect(fs.existsSync(path.join(GCSMockBasePath, 'uploadtest-copy/test.txt'))).to.be(false); + done(); }); }); - it('can remove backup copy', function (done) { - gcs.removeBackups(gBackupConfig, [ gBackupId_2 ], done); + it('can remove non-existent dir', function (done) { + gcs.remove(gBackupConfig, 'blah', function (error) { + expect(error).to.be(null); + done(); + }); }); + }); });