storage: make remove and removeDir async
This commit is contained in:
+10
-19
@@ -227,38 +227,29 @@ function copy(apiConfig, oldFilePath, newFilePath) {
|
||||
return events;
|
||||
}
|
||||
|
||||
function remove(apiConfig, filename, callback) {
|
||||
async function remove(apiConfig, filename) {
|
||||
assert.strictEqual(typeof apiConfig, 'object');
|
||||
assert.strictEqual(typeof filename, 'string');
|
||||
assert.strictEqual(typeof callback, 'function');
|
||||
|
||||
var stat = safe.fs.statSync(filename);
|
||||
if (!stat) return callback();
|
||||
const stat = safe.fs.statSync(filename);
|
||||
if (!stat) return;
|
||||
|
||||
if (stat.isFile()) {
|
||||
if (!safe.fs.unlinkSync(filename)) return callback(new BoxError(BoxError.EXTERNAL_ERROR, safe.error.message));
|
||||
if (!safe.fs.unlinkSync(filename)) throw new BoxError(BoxError.EXTERNAL_ERROR, safe.error.message);
|
||||
} else if (stat.isDirectory()) {
|
||||
if (!safe.fs.rmSync(filename, { recursive: true })) return callback(new BoxError(BoxError.EXTERNAL_ERROR, safe.error.message));
|
||||
if (!safe.fs.rmSync(filename, { recursive: true })) throw new BoxError(BoxError.EXTERNAL_ERROR, safe.error.message);
|
||||
}
|
||||
|
||||
callback(null);
|
||||
}
|
||||
|
||||
function removeDir(apiConfig, pathPrefix) {
|
||||
async function removeDir(apiConfig, pathPrefix, progressCallback) {
|
||||
assert.strictEqual(typeof apiConfig, 'object');
|
||||
assert.strictEqual(typeof pathPrefix, 'string');
|
||||
assert.strictEqual(typeof progressCallback, 'function');
|
||||
|
||||
var events = new EventEmitter();
|
||||
progressCallback({ message: `Removing directory ${pathPrefix}` });
|
||||
|
||||
process.nextTick(() => events.emit('progress', `Removing directory ${pathPrefix}`));
|
||||
|
||||
shell.spawn('removeDir', '/bin/rm', [ '-rf', pathPrefix ], { }, function (error) {
|
||||
if (error) return events.emit('done', new BoxError(BoxError.EXTERNAL_ERROR, error.message));
|
||||
|
||||
events.emit('done', null);
|
||||
});
|
||||
|
||||
return events;
|
||||
const [error] = await safe(shell.promises.spawn('removeDir', '/bin/rm', [ '-rf', pathPrefix ], { }));
|
||||
if (error) throw new BoxError(BoxError.EXTERNAL_ERROR, error.message);
|
||||
}
|
||||
|
||||
function validateBackupTarget(folder) {
|
||||
|
||||
+14
-25
@@ -34,7 +34,8 @@ const assert = require('assert'),
|
||||
EventEmitter = require('events'),
|
||||
PassThrough = require('stream').PassThrough,
|
||||
path = require('path'),
|
||||
safe = require('safetydance');
|
||||
safe = require('safetydance'),
|
||||
util = require('util');
|
||||
|
||||
let GCS = require('@google-cloud/storage').Storage;
|
||||
|
||||
@@ -221,44 +222,32 @@ function copy(apiConfig, oldFilePath, newFilePath) {
|
||||
return events;
|
||||
}
|
||||
|
||||
function remove(apiConfig, filename, callback) {
|
||||
async function remove(apiConfig, filename) {
|
||||
assert.strictEqual(typeof apiConfig, 'object');
|
||||
assert.strictEqual(typeof filename, 'string');
|
||||
assert.strictEqual(typeof callback, 'function');
|
||||
|
||||
getBucket(apiConfig)
|
||||
.file(filename)
|
||||
.delete(function (error) {
|
||||
if (error) debug('removeBackups: Unable to remove %s (%s). Not fatal.', filename, error.message);
|
||||
|
||||
callback(null);
|
||||
});
|
||||
const [error] = await safe(getBucket(apiConfig).file(filename).delete());
|
||||
if (error) debug('removeBackups: Unable to remove %s (%s). Not fatal.', filename, error.message);
|
||||
}
|
||||
|
||||
function removeDir(apiConfig, pathPrefix) {
|
||||
async function removeDir(apiConfig, pathPrefix, progressCallback) {
|
||||
assert.strictEqual(typeof apiConfig, 'object');
|
||||
assert.strictEqual(typeof pathPrefix, 'string');
|
||||
|
||||
var events = new EventEmitter();
|
||||
assert.strictEqual(typeof progressCallback, 'function');
|
||||
|
||||
const batchSize = 1000, concurrency = apiConfig.deleteConcurrency || 10; // https://googleapis.dev/nodejs/storage/latest/Bucket.html#deleteFiles
|
||||
var total = 0;
|
||||
let total = 0;
|
||||
|
||||
listDir(apiConfig, pathPrefix, batchSize, function (entries, done) {
|
||||
const listDirAsync = util.promisify(listDir);
|
||||
|
||||
await listDirAsync(apiConfig, pathPrefix, batchSize, function (entries, done) {
|
||||
total += entries.length;
|
||||
|
||||
events.emit('progress', `Removing ${entries.length} files from ${entries[0].fullPath} to ${entries[entries.length-1].fullPath}. total: ${total}`);
|
||||
progressCallback({ message: `Removing ${entries.length} files from ${entries[0].fullPath} to ${entries[entries.length-1].fullPath}. total: ${total}` });
|
||||
|
||||
async.eachLimit(entries, concurrency, function (entry, iteratorCallback) {
|
||||
remove(apiConfig, entry.fullPath, iteratorCallback);
|
||||
}, done);
|
||||
}, function (error) {
|
||||
events.emit('progress', `Deleted ${total} files`);
|
||||
|
||||
process.nextTick(() => events.emit('done', error));
|
||||
async.eachLimit(entries, concurrency, async (entry) => await remove(apiConfig, entry.fullPath), done);
|
||||
});
|
||||
|
||||
return events;
|
||||
progressCallback({ progress: `Deleted ${total} files` });
|
||||
}
|
||||
|
||||
async function remount(apiConfig) {
|
||||
|
||||
@@ -119,24 +119,21 @@ function listDir(apiConfig, dir, batchSize, iteratorCallback, callback) {
|
||||
callback(new BoxError(BoxError.NOT_IMPLEMENTED, 'listDir is not implemented'));
|
||||
}
|
||||
|
||||
function remove(apiConfig, filename, callback) {
|
||||
async function remove(apiConfig, filename) {
|
||||
assert.strictEqual(typeof apiConfig, 'object');
|
||||
assert.strictEqual(typeof filename, 'string');
|
||||
assert.strictEqual(typeof callback, 'function');
|
||||
|
||||
// Result: none
|
||||
|
||||
callback(new BoxError(BoxError.NOT_IMPLEMENTED, 'remove is not implemented'));
|
||||
throw new BoxError(BoxError.NOT_IMPLEMENTED, 'remove is not implemented');
|
||||
}
|
||||
|
||||
function removeDir(apiConfig, pathPrefix) {
|
||||
async function removeDir(apiConfig, pathPrefix, progressCallback) {
|
||||
assert.strictEqual(typeof apiConfig, 'object');
|
||||
assert.strictEqual(typeof pathPrefix, 'string');
|
||||
assert.strictEqual(typeof progressCallback, 'function');
|
||||
|
||||
// Result: none
|
||||
var events = new EventEmitter();
|
||||
process.nextTick(function () { events.emit('done', new BoxError(BoxError.NOT_IMPLEMENTED, 'removeDir is not implemented')); });
|
||||
return events;
|
||||
throw new BoxError(BoxError.NOT_IMPLEMENTED, 'removeDir is not implemented');
|
||||
}
|
||||
|
||||
async function remount(apiConfig) {
|
||||
|
||||
+5
-11
@@ -104,25 +104,19 @@ function copy(apiConfig, oldFilePath, newFilePath) {
|
||||
return events;
|
||||
}
|
||||
|
||||
function remove(apiConfig, filename, callback) {
|
||||
async function remove(apiConfig, filename) {
|
||||
assert.strictEqual(typeof apiConfig, 'object');
|
||||
assert.strictEqual(typeof filename, 'string');
|
||||
assert.strictEqual(typeof callback, 'function');
|
||||
|
||||
debug('remove: %s', filename);
|
||||
|
||||
callback(null);
|
||||
debug(`remove: ${filename}`);
|
||||
}
|
||||
|
||||
function removeDir(apiConfig, pathPrefix) {
|
||||
function removeDir(apiConfig, pathPrefix, progressCallback) {
|
||||
assert.strictEqual(typeof apiConfig, 'object');
|
||||
assert.strictEqual(typeof pathPrefix, 'string');
|
||||
assert.strictEqual(typeof progressCallback, 'function');
|
||||
|
||||
debug('removeDir: %s', pathPrefix);
|
||||
|
||||
var events = new EventEmitter();
|
||||
process.nextTick(function () { events.emit('done', null); });
|
||||
return events;
|
||||
debug(`removeDir: ${pathPrefix}`);
|
||||
}
|
||||
|
||||
async function remount(apiConfig) {
|
||||
|
||||
+20
-29
@@ -39,6 +39,7 @@ const assert = require('assert'),
|
||||
path = require('path'),
|
||||
S3BlockReadStream = require('s3-block-read-stream'),
|
||||
safe = require('safetydance'),
|
||||
util = require('util'),
|
||||
_ = require('underscore');
|
||||
|
||||
let aws = AwsSdk;
|
||||
@@ -385,10 +386,9 @@ function copy(apiConfig, oldFilePath, newFilePath) {
|
||||
return events;
|
||||
}
|
||||
|
||||
function remove(apiConfig, filename, callback) {
|
||||
async function remove(apiConfig, filename) {
|
||||
assert.strictEqual(typeof apiConfig, 'object');
|
||||
assert.strictEqual(typeof filename, 'string');
|
||||
assert.strictEqual(typeof callback, 'function');
|
||||
|
||||
const credentials = getS3Config(apiConfig);
|
||||
|
||||
@@ -402,57 +402,48 @@ function remove(apiConfig, filename, callback) {
|
||||
};
|
||||
|
||||
// deleteObjects does not return error if key is not found
|
||||
s3.deleteObjects(deleteParams, function (error) {
|
||||
if (error) return callback(new BoxError(BoxError.EXTERNAL_ERROR, `Unable to remove ${deleteParams.Key}. error: ${error.message || error.code}`)); // DO sets 'code'
|
||||
|
||||
callback(null);
|
||||
});
|
||||
const [error] = await safe(s3.deleteObjects(deleteParams).promise());
|
||||
if (error) throw new BoxError(BoxError.EXTERNAL_ERROR, `Unable to remove ${deleteParams.Key}. error: ${error.message}`);
|
||||
}
|
||||
|
||||
function removeDir(apiConfig, pathPrefix) {
|
||||
async function removeDir(apiConfig, pathPrefix, progressCallback) {
|
||||
assert.strictEqual(typeof apiConfig, 'object');
|
||||
assert.strictEqual(typeof pathPrefix, 'string');
|
||||
assert.strictEqual(typeof progressCallback, 'function');
|
||||
|
||||
const events = new EventEmitter();
|
||||
let total = 0;
|
||||
|
||||
const credentials = getS3Config(apiConfig);
|
||||
|
||||
const s3 = new aws.S3(credentials);
|
||||
const listDirAsync = util.promisify(listDir);
|
||||
|
||||
listDir(apiConfig, pathPrefix, 1000, function listDirIterator(entries, done) {
|
||||
let total = 0;
|
||||
|
||||
await listDirAsync(apiConfig, pathPrefix, 1000, function listDirIterator(entries, done) {
|
||||
total += entries.length;
|
||||
|
||||
const chunkSize = apiConfig.deleteConcurrency || (apiConfig.provider !== 'digitalocean-spaces' ? 1000 : 100); // throttle objects in each request
|
||||
var chunks = chunk(entries, chunkSize);
|
||||
const chunks = chunk(entries, chunkSize);
|
||||
|
||||
async.eachSeries(chunks, function deleteFiles(objects, iteratorCallback) {
|
||||
var deleteParams = {
|
||||
async.eachSeries(chunks, async function deleteFiles(objects) {
|
||||
const deleteParams = {
|
||||
Bucket: apiConfig.bucket,
|
||||
Delete: {
|
||||
Objects: objects.map(function (o) { return { Key: o.fullPath }; })
|
||||
}
|
||||
};
|
||||
|
||||
events.emit('progress', `Removing ${objects.length} files from ${objects[0].fullPath} to ${objects[objects.length-1].fullPath}`);
|
||||
progressCallback({ message: `Removing ${objects.length} files from ${objects[0].fullPath} to ${objects[objects.length-1].fullPath}` });
|
||||
|
||||
// deleteObjects does not return error if key is not found
|
||||
s3.deleteObjects(deleteParams, function (error /*, deleteData */) {
|
||||
if (error) {
|
||||
events.emit('progress', `Unable to remove ${deleteParams.Key} ${error.message || error.code}`);
|
||||
return iteratorCallback(new BoxError(BoxError.EXTERNAL_ERROR, `Unable to remove ${deleteParams.Key}. error: ${error.message || error.code}`)); // DO sets 'code'
|
||||
}
|
||||
|
||||
iteratorCallback(null);
|
||||
});
|
||||
const [error] = await safe(s3.deleteObjects(deleteParams).promise());
|
||||
if (error) {
|
||||
progressCallback({ message: `Unable to remove ${deleteParams.Key} ${error.message || error.code}` });
|
||||
throw new BoxError(BoxError.EXTERNAL_ERROR, `Unable to remove ${deleteParams.Key}. error: ${error.message}`);
|
||||
}
|
||||
}, done);
|
||||
}, function (error) {
|
||||
events.emit('progress', `Removed ${total} files`);
|
||||
|
||||
process.nextTick(() => events.emit('done', error));
|
||||
});
|
||||
|
||||
return events;
|
||||
progressCallback({ message: `Removed ${total} files` });
|
||||
}
|
||||
|
||||
async function remount(apiConfig) {
|
||||
|
||||
Reference in New Issue
Block a user