storage: rework listDir api to be a generator (like) function

This commit is contained in:
Girish Ramakrishnan
2025-02-12 18:46:54 +01:00
parent da0dcf65b3
commit 9888aa8c08
9 changed files with 143 additions and 180 deletions
+28 -41
View File
@@ -37,7 +37,6 @@ const assert = require('assert'),
path = require('path'),
Readable = require('stream').Readable,
safe = require('safetydance'),
util = require('util'),
_ = require('underscore');
let aws = AwsSdk;
@@ -262,12 +261,11 @@ async function download(apiConfig, backupFilePath) {
return new S3MultipartDownloadStream(s3, params, { blockSize: 64 * 1024 * 1024 });
}
function listDir(apiConfig, dir, batchSize, iteratorCallback, callback) {
async function listDir(apiConfig, dir, batchSize, marker) {
assert.strictEqual(typeof apiConfig, 'object');
assert.strictEqual(typeof dir, 'string');
assert.strictEqual(typeof batchSize, 'number');
assert.strictEqual(typeof iteratorCallback, 'function');
assert.strictEqual(typeof callback, 'function');
assert(typeof marker !== 'undefined');
const credentials = getS3Config(apiConfig);
@@ -277,28 +275,13 @@ function listDir(apiConfig, dir, batchSize, iteratorCallback, callback) {
Prefix: dir,
MaxKeys: batchSize
};
if (marker) listParams.Marker = marker;
let done = false;
async.whilst((testDone) => testDone(null, !done), function listAndDownload(whilstCallback) {
s3.listObjects(listParams, function (error, listData) {
if (error) return whilstCallback(new BoxError(BoxError.EXTERNAL_ERROR, `Error listing objects in ${dir}. Message: ${error.message} HTTP Code: ${error.code}`));
if (listData.Contents.length === 0) { done = true; return whilstCallback(); }
const entries = listData.Contents.map(function (c) { return { fullPath: c.Key, size: c.Size }; });
iteratorCallback(entries, function (error) {
if (error) return whilstCallback(error);
if (!listData.IsTruncated) { done = true; return whilstCallback(); }
listParams.Marker = listData.Contents[listData.Contents.length - 1].Key; // NextMarker is returned only with delimiter
whilstCallback();
});
});
}, callback);
const [error, listData] = await safe(s3.listObjects(listParams).promise());
if (error) throw new BoxError(BoxError.EXTERNAL_ERROR, `Error listing objects in ${dir}. Message: ${error.message} HTTP Code: ${error.code}`);
if (listData.Contents.length === 0) return { entries: [], marker: null }; // no more
const entries = listData.Contents.map(function (c) { return { fullPath: c.Key, size: c.Size }; });
return { entries, marker: !listData.IsTruncated ? null : listData.Contents[listData.Contents.length - 1].Key };
}
// https://github.com/aws/aws-sdk-js/blob/2b6bcbdec1f274fe931640c1b61ece999aae7a19/lib/util.js#L41
@@ -427,18 +410,17 @@ async function copy(apiConfig, oldFilePath, newFilePath, progressCallback) {
const concurrency = apiConfig.limits?.copyConcurrency || (apiConfig.provider === 's3' ? 500 : 10);
progressCallback({ message: `Copying with concurrency of ${concurrency}` });
const listDirAsync = util.promisify(listDir);
let marker = null;
while (true) {
const batch = await listDir(apiConfig, oldFilePath, 1000, marker);
total += batch.entries.length;
progressCallback({ message: `Copying files from ${total-batch.entries.length}-${total}` });
await async.eachLimit(batch.entries, concurrency, copyFile);
if (!batch.marker) break;
marker = batch.marker;
}
const [copyError] = await safe(listDirAsync(apiConfig, oldFilePath, 1000, function listDirIterator(entries, done) {
total += entries.length;
progressCallback({ message: `Copying files from ${total-entries.length}-${total}` });
async.eachLimit(entries, concurrency, copyFile, done);
}));
progressCallback({ message: `Copied ${total} files with error: ${copyError}` });
if (copyError) throw copyError;
progressCallback({ message: `Copied ${total} files` });
}
async function remove(apiConfig, filename) {
@@ -484,17 +466,19 @@ async function removeDir(apiConfig, pathPrefix, progressCallback) {
const credentials = getS3Config(apiConfig);
const s3 = new aws.S3(credentials);
const listDirAsync = util.promisify(listDir);
let total = 0;
let marker = null;
while (true) {
const batch = await listDir(apiConfig, pathPrefix, 1000, marker);
await listDirAsync(apiConfig, pathPrefix, 1000, function listDirIterator(entries, done) {
const entries = batch.entries;
total += entries.length;
const chunkSize = apiConfig.limits?.deleteConcurrency || (apiConfig.provider !== 'digitalocean-spaces' ? 1000 : 100); // throttle objects in each request
const chunks = chunk(entries, chunkSize);
async.eachSeries(chunks, async function deleteFiles(objects) {
await async.eachSeries(chunks, async function deleteFiles(objects) {
const deleteParams = {
Bucket: apiConfig.bucket,
Delete: {
@@ -510,8 +494,11 @@ async function removeDir(apiConfig, pathPrefix, progressCallback) {
progressCallback({ message: `Unable to remove ${deleteParams.Key} ${error.message || error.code}` });
throw new BoxError(BoxError.EXTERNAL_ERROR, `Unable to remove ${deleteParams.Key}. error: ${error.message}`);
}
}, done);
});
});
if (!batch.marker) break;
marker = batch.marker;
}
progressCallback({ message: `Removed ${total} files` });
}