diff --git a/src/storage/s3-block-read-stream.js b/src/storage/s3-block-read-stream.js deleted file mode 100644 index f3a0f01a7..000000000 --- a/src/storage/s3-block-read-stream.js +++ /dev/null @@ -1,110 +0,0 @@ -'use strict'; - -// initial code from https://github.com/tilfin/s3-block-read-stream/blob/master/LICENSE (MIT) - -const Readable = require('stream').Readable; -const util = require('util'); - -function S3ReadStream(s3, params, options) { - if (!(this instanceof S3ReadStream)) - return new S3ReadStream(s3, params, options); - - const opts = options || {}; - this._s3 = s3; - this._params = params; - this._readSize = 0; - this._fileSize = -1; - this._path = params.Bucket + '/' + params.Key; - - this._interval = opts.interval || 0; // msec - delete opts.interval; - this._blockSize = opts.blockSize || 64 * 1048576; //MB - delete opts.blockSize; - this._log = opts.logCallback || (opts.debug ? function(msg) { console.warn(msg); } : function(){}); - delete opts.logCallback; - - Readable.call(this, opts); -} -util.inherits(S3ReadStream, Readable); -S3ReadStream.prototype._read = function() { - if (this._readSize === this._fileSize) { - this._done(); - } else if (this._readSize) { - setTimeout(() => { - this._nextDownload(); - }, this._interval); - } else { - this._log(`${this._path} - Start`); - this._fetchSize(); - } -}; - -S3ReadStream.prototype._fetchSize = function() { - const params = {}; - for (var key in this._params) { - if (!key.match(/^Response/)) { - params[key] = this._params[key]; - } - } - - this._s3.headObject(params, (err, data) => { - if (err) { - process.nextTick(() => this.emit('error', err)); - return; - } - - const reslen = parseInt(data.ContentLength, 10); - this._log(`${this._path} - File Size: ${reslen}`); - - if (reslen > 0) { - this._fileSize = reslen; - this._nextDownload(); - } else { - this._done(); - } - }); -}; - -S3ReadStream.prototype._downloadRange = function(offset, length) { - const params = Object.assign({}, this._params); - 
const lastPos = offset + length - 1; - const range = 'bytes=' + offset + '-' + lastPos; - params['Range'] = range; - - this._log(`${this._path} - Download Range: ${range}`); - - this._s3.getObject(params, (err, data) => { - if (err) { - process.nextTick(() => this.emit('error', err)); - return; - } - - const reslen = parseInt(data.ContentLength, 10); - this._log(`${this._path} - Received Size: ${reslen}`); - - if (reslen > 0) { - this._readSize += reslen; - this.push(data.Body); - } else { - this._done(); - } - }); -}; - -S3ReadStream.prototype._nextDownload = function() { - let len = 0; - if (this._readSize + this._blockSize < this._fileSize) { - len = this._blockSize; - } else { - len = this._fileSize - this._readSize; - } - this._downloadRange(this._readSize, len); -}; - -S3ReadStream.prototype._done = function() { - this._readSize = 0; - this.push(null); - this._log(`${this._path} - Done`); -}; - -module.exports = S3ReadStream; diff --git a/src/storage/s3.js b/src/storage/s3.js index 07eb822b8..a9ecb0705 100644 --- a/src/storage/s3.js +++ b/src/storage/s3.js @@ -35,9 +35,8 @@ const assert = require('assert'), debug = require('debug')('box:storage/s3'), EventEmitter = require('events'), https = require('https'), - PassThrough = require('stream').PassThrough, path = require('path'), - S3BlockReadStream = require('./s3-block-read-stream.js'), + Readable = require('stream').Readable, safe = require('safetydance'), util = require('util'), _ = require('underscore'); @@ -173,6 +172,87 @@ async function exists(apiConfig, backupFilePath) { } } +// Download the object in small parts. 
By downloading small parts, we reduce the chance of sporadic network errors when downloading large objects +// We can retry each part individually, but we haven't had the need for this yet +class S3MultipartDownloadStream extends Readable { + constructor (s3, params, options) { + super(options); + this._s3 = s3; + this._params = params; + this._readSize = 0; + this._fileSize = -1; + this._path = params.Bucket + '/' + params.Key; + + this._blockSize = options.blockSize || 64 * 1048576; // bytes (default 64 MiB) + } + + _done() { + this._readSize = 0; + this.push(null); // EOF + } + + _handleError(error) { + if (S3_NOT_FOUND(error)) { + this.destroy(new BoxError(BoxError.NOT_FOUND, `Backup not found: ${this._path}`)); + } else { + debug(`download: ${this._path} s3 stream error.`, error); + this.destroy(new BoxError(BoxError.EXTERNAL_ERROR, `Error multipartDownload ${this._path}. Message: ${error.message} HTTP Code: ${error.code}`)); + } + } + + _downloadRange(offset, length) { + const params = Object.assign({}, this._params); + const lastPos = offset + length - 1; + const range = `bytes=${offset}-${lastPos}`; + params['Range'] = range; + + this._s3.getObject(params, (error, data) => { + if (error) return this._handleError(error); + + const length = parseInt(data.ContentLength, 10); + + if (length > 0) { + this._readSize += length; + this.push(data.Body); + } else { + this._done(); + } + }); + } + + _nextDownload() { + let len = 0; + if (this._readSize + this._blockSize < this._fileSize) { + len = this._blockSize; + } else { + len = this._fileSize - this._readSize; + } + this._downloadRange(this._readSize, len); + } + + _fetchSize() { + this._s3.headObject(this._params, (error, data) => { + if (error) return this._handleError(error); + + const length = parseInt(data.ContentLength, 10); + + if (length > 0) { + this._fileSize = length; + this._nextDownload(); + } else { + this._done(); + } + }); + } + + _read() { + if (this._readSize === this._fileSize) return this._done(); + if 
(this._readSize === 0) return this._fetchSize(); + + this._nextDownload(); + } +} + function download(apiConfig, backupFilePath, callback) { assert.strictEqual(typeof apiConfig, 'object'); assert.strictEqual(typeof backupFilePath, 'string'); @@ -187,21 +267,8 @@ function download(apiConfig, backupFilePath, callback) { const s3 = new aws.S3(credentials); - const ps = new PassThrough(); - const multipartDownload = new S3BlockReadStream(s3, params, { blockSize: 64 * 1024 * 1024 /*, logCallback: debug */ }); - - multipartDownload.on('error', function (error) { - if (S3_NOT_FOUND(error)) { - ps.emit('error', new BoxError(BoxError.NOT_FOUND, `Backup not found: ${backupFilePath}`)); - } else { - debug(`download: ${apiConfig.bucket}:${backupFilePath} s3 stream error.`, error); - ps.emit('error', new BoxError(BoxError.EXTERNAL_ERROR, `Error multipartDownload ${backupFilePath}. Message: ${error.message} HTTP Code: ${error.code}`)); - } - }); - - multipartDownload.pipe(ps); - - callback(null, ps); + const multipartDownloadStream = new S3MultipartDownloadStream(s3, params, { blockSize: 64 * 1024 * 1024 }); + return callback(null, multipartDownloadStream); } function listDir(apiConfig, dir, batchSize, iteratorCallback, callback) {