diff --git a/package-lock.json b/package-lock.json index 14ea00b9c..95a439e2b 100644 --- a/package-lock.json +++ b/package-lock.json @@ -46,7 +46,6 @@ "progress-stream": "^2.0.0", "qrcode": "^1.5.0", "readdirp": "^3.6.0", - "s3-block-read-stream": "^0.5.0", "safetydance": "^2.2.0", "semver": "^7.3.7", "speakeasy": "^2.0.0", @@ -6517,11 +6516,6 @@ "url": "https://github.com/sponsors/isaacs" } }, - "node_modules/s3-block-read-stream": { - "version": "0.5.0", - "resolved": "https://registry.npmjs.org/s3-block-read-stream/-/s3-block-read-stream-0.5.0.tgz", - "integrity": "sha512-gvreSgWTZJvv2yQe53seTdiv6LYoa0mJYkwYWzuOtXRNhWCcXyRJQPAJjeyDaxme2NYZ00R1jQr76KrvTZCn3w==" - }, "node_modules/safe-buffer": { "version": "5.1.2", "resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.1.2.tgz", @@ -12999,11 +12993,6 @@ "glob": "^7.1.3" } }, - "s3-block-read-stream": { - "version": "0.5.0", - "resolved": "https://registry.npmjs.org/s3-block-read-stream/-/s3-block-read-stream-0.5.0.tgz", - "integrity": "sha512-gvreSgWTZJvv2yQe53seTdiv6LYoa0mJYkwYWzuOtXRNhWCcXyRJQPAJjeyDaxme2NYZ00R1jQr76KrvTZCn3w==" - }, "safe-buffer": { "version": "5.1.2", "resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.1.2.tgz", diff --git a/package.json b/package.json index 9f87ecb8d..1d600793e 100644 --- a/package.json +++ b/package.json @@ -49,7 +49,6 @@ "progress-stream": "^2.0.0", "qrcode": "^1.5.0", "readdirp": "^3.6.0", - "s3-block-read-stream": "^0.5.0", "safetydance": "^2.2.0", "semver": "^7.3.7", "speakeasy": "^2.0.0", diff --git a/src/storage/s3-block-read-stream.js b/src/storage/s3-block-read-stream.js new file mode 100644 index 000000000..f3a0f01a7 --- /dev/null +++ b/src/storage/s3-block-read-stream.js @@ -0,0 +1,110 @@ +'use strict'; + +// initial code from https://github.com/tilfin/s3-block-read-stream/blob/master/LICENSE (MIT) + +const Readable = require('stream').Readable; +const util = require('util'); + +function S3ReadStream(s3, params, options) { + if (!(this instanceof S3ReadStream)) + return new S3ReadStream(s3, params, options); + + const opts = options || {}; + this._s3 = s3; + this._params = params; + this._readSize = 0; + this._fileSize = -1; + this._path = params.Bucket + '/' + params.Key; + + this._interval = opts.interval || 0; // msec + delete opts.interval; + this._blockSize = opts.blockSize || 64 * 1048576; //MB + delete opts.blockSize; + this._log = opts.logCallback || (opts.debug ? function(msg) { console.warn(msg); } : function(){}); + delete opts.logCallback; + + Readable.call(this, opts); +} +util.inherits(S3ReadStream, Readable); +S3ReadStream.prototype._read = function() { + if (this._readSize === this._fileSize) { + this._done(); + } else if (this._readSize) { + setTimeout(() => { + this._nextDownload(); + }, this._interval); + } else { + this._log(`${this._path} - Start`); + this._fetchSize(); + } +}; + +S3ReadStream.prototype._fetchSize = function() { + const params = {}; + for (var key in this._params) { + if (!key.match(/^Response/)) { + params[key] = this._params[key]; + } + } + + this._s3.headObject(params, (err, data) => { + if (err) { + process.nextTick(() => this.emit('error', err)); + return; + } + + const reslen = parseInt(data.ContentLength, 10); + this._log(`${this._path} - File Size: ${reslen}`); + + if (reslen > 0) { + this._fileSize = reslen; + this._nextDownload(); + } else { + this._done(); + } + }); +}; + +S3ReadStream.prototype._downloadRange = function(offset, length) { + const params = Object.assign({}, this._params); + const lastPos = offset + length - 1; + const range = 'bytes=' + offset + '-' + lastPos; + params['Range'] = range; + + this._log(`${this._path} - Download Range: ${range}`); + + this._s3.getObject(params, (err, data) => { + if (err) { + process.nextTick(() => this.emit('error', err)); + return; + } + + const reslen = parseInt(data.ContentLength, 10); + this._log(`${this._path} - Received Size: ${reslen}`); + + if (reslen > 0) { + this._readSize += reslen; + this.push(data.Body); + } else { + this._done(); + } + }); +}; + +S3ReadStream.prototype._nextDownload = function() { + let len = 0; + if (this._readSize + this._blockSize < this._fileSize) { + len = this._blockSize; + } else { + len = this._fileSize - this._readSize; + } + this._downloadRange(this._readSize, len); +}; + +S3ReadStream.prototype._done = function() { + this._readSize = 0; + this.push(null); + this._log(`${this._path} - Done`); +}; + +module.exports = S3ReadStream; diff --git a/src/storage/s3.js b/src/storage/s3.js index 46466eace..07eb822b8 100644 --- a/src/storage/s3.js +++ b/src/storage/s3.js @@ -37,7 +37,7 @@ const assert = require('assert'), https = require('https'), PassThrough = require('stream').PassThrough, path = require('path'), - S3BlockReadStream = require('s3-block-read-stream'), + S3BlockReadStream = require('./s3-block-read-stream.js'), safe = require('safetydance'), util = require('util'), _ = require('underscore');