Add S3MultipartDownloadStream

This extends the modern class-based `stream.Readable` API.
This commit is contained in:
Girish Ramakrishnan
2022-04-29 18:23:56 -07:00
parent 1c1d247a24
commit 2280b7eaf5
2 changed files with 84 additions and 127 deletions

View File

@@ -35,9 +35,8 @@ const assert = require('assert'),
debug = require('debug')('box:storage/s3'),
EventEmitter = require('events'),
https = require('https'),
PassThrough = require('stream').PassThrough,
path = require('path'),
S3BlockReadStream = require('./s3-block-read-stream.js'),
Readable = require('stream').Readable,
safe = require('safetydance'),
util = require('util'),
_ = require('underscore');
@@ -173,6 +172,87 @@ async function exists(apiConfig, backupFilePath) {
}
}
// Download the object in small parts. By downloading small parts, we reduce the chance of sporadic network errors when downloading large objects
// We can retry each part individually, but we haven't had the need for this yet
class S3MultipartDownloadStream extends Readable {
constructor (s3, params, options) {
super(options);
this._s3 = s3;
this._params = params;
this._readSize = 0;
this._fileSize = -1;
this._path = params.Bucket + '/' + params.Key;
this._blockSize = options.blockSize || 64 * 1048576; // MB
}
_done() {
this._readSize = 0;
this.push(null); // EOF
}
_handleError(error) {
if (S3_NOT_FOUND(error)) {
this.destroy(new BoxError(BoxError.NOT_FOUND, `Backup not found: ${this._path}`));
} else {
debug(`download: ${this._path} s3 stream error.`, error);
this.destroy(new BoxError(BoxError.EXTERNAL_ERROR, `Error multipartDownload ${this._path}. Message: ${error.message} HTTP Code: ${error.code}`));
}
}
_downloadRange(offset, length) {
const params = Object.assign({}, this._params);
const lastPos = offset + length - 1;
const range = `bytes=${offset}-${lastPos}`;
params['Range'] = range;
this._s3.getObject(params, (error, data) => {
if (error) return this._handleError(error);
const length = parseInt(data.ContentLength, 10);
if (length > 0) {
this._readSize += length;
this.push(data.Body);
} else {
this._done();
}
});
}
_nextDownload() {
let len = 0;
if (this._readSize + this._blockSize < this._fileSize) {
len = this._blockSize;
} else {
len = this._fileSize - this._readSize;
}
this._downloadRange(this._readSize, len);
}
_fetchSize() {
this._s3.headObject(this._params, (error, data) => {
if (error) return this._handleError(error);
const length = parseInt(data.ContentLength, 10);
if (length > 0) {
this._fileSize = length;
this._nextDownload();
} else {
this._done();
}
});
}
_read() {
if (this._readSize === this._fileSize) return this._done();
if (this._readSize === 0) return this._fetchSize();
this._nextDownload();
}
}
function download(apiConfig, backupFilePath, callback) {
assert.strictEqual(typeof apiConfig, 'object');
assert.strictEqual(typeof backupFilePath, 'string');
@@ -187,21 +267,8 @@ function download(apiConfig, backupFilePath, callback) {
const s3 = new aws.S3(credentials);
const ps = new PassThrough();
const multipartDownload = new S3BlockReadStream(s3, params, { blockSize: 64 * 1024 * 1024 /*, logCallback: debug */ });
multipartDownload.on('error', function (error) {
if (S3_NOT_FOUND(error)) {
ps.emit('error', new BoxError(BoxError.NOT_FOUND, `Backup not found: ${backupFilePath}`));
} else {
debug(`download: ${apiConfig.bucket}:${backupFilePath} s3 stream error.`, error);
ps.emit('error', new BoxError(BoxError.EXTERNAL_ERROR, `Error multipartDownload ${backupFilePath}. Message: ${error.message} HTTP Code: ${error.code}`));
}
});
multipartDownload.pipe(ps);
callback(null, ps);
const multipartDownloadStream = new S3MultipartDownloadStream(s3, params, { blockSize: 64 * 1024 * 1024 });
return callback(null, multipartDownloadStream);
}
function listDir(apiConfig, dir, batchSize, iteratorCallback, callback) {