storage: pass limits object to backend
This commit is contained in:
@@ -91,7 +91,7 @@ async function removeBackup(site, backup, progressCallback) {
|
||||
[removeError] = await safe(backupSites.storageApi(site).remove(site.config, remotePath));
|
||||
} else {
|
||||
progressCallback({ message: `${backup.remotePath}: Removing directory ${remotePath}`});
|
||||
[removeError] = await safe(backupSites.storageApi(site).removeDir(site.config, remotePath, progressCallback));
|
||||
[removeError] = await safe(backupSites.storageApi(site).removeDir(site.config, site.limits, remotePath, progressCallback));
|
||||
}
|
||||
|
||||
if (removeError) {
|
||||
@@ -258,7 +258,7 @@ async function removeOldAppSnapshots(site) {
|
||||
if (ext) {
|
||||
await safe(backupSites.storageApi(site).remove(site.config, remotePath), { debug });
|
||||
} else {
|
||||
await safe(backupSites.storageApi(site).removeDir(site.config, remotePath, progressCallback), { debug });
|
||||
await safe(backupSites.storageApi(site).removeDir(site.config, site.limits, remotePath, progressCallback), { debug });
|
||||
}
|
||||
|
||||
await backupSites.setSnapshotInfo(site, appId, null /* info */);
|
||||
|
||||
@@ -107,7 +107,7 @@ async function sync(backupSite, remotePath, dataLayout, progressCallback) {
|
||||
|
||||
if (change.operation === 'removedir') {
|
||||
debug(`sync: removing directory ${fullPath}`);
|
||||
await backupSites.storageApi(backupSite).removeDir(backupSite.config, fullPath, progressCallback);
|
||||
await backupSites.storageApi(backupSite).removeDir(backupSite.config, backupSite.limits, fullPath, progressCallback);
|
||||
} else if (change.operation === 'remove') {
|
||||
debug(`sync: removing ${fullPath}`);
|
||||
await backupSites.storageApi(backupSite).remove(backupSite.config, fullPath);
|
||||
@@ -115,7 +115,7 @@ async function sync(backupSite, remotePath, dataLayout, progressCallback) {
|
||||
await promiseRetry({ times: 5, interval: 20000, debug }, async (retryCount) => {
|
||||
progressCallback({ message: `Adding ${change.path}` + (retryCount > 1 ? ` (Try ${retryCount})` : '') });
|
||||
debug(`sync: adding ${change.path} position ${change.position} try ${retryCount}`);
|
||||
const uploader = await backupSites.storageApi(backupSite).upload(backupSite.config, fullPath);
|
||||
const uploader = await backupSites.storageApi(backupSite).upload(backupSite.config, backupSite.limits, fullPath);
|
||||
const result = await addFile(dataLayout.toLocalPath('./' + change.path), backupSite.encryption, uploader, progressCallback);
|
||||
integrityMap.set(change.path, result.integrity); // .integrity can be null when file disappeared on us
|
||||
destPathIntegrityMap.set(destPath, result.integrity);
|
||||
@@ -308,7 +308,7 @@ async function copy(backupSite, fromPath, toPath, progressCallback) {
|
||||
assert.strictEqual(typeof toPath, 'string');
|
||||
assert.strictEqual(typeof progressCallback, 'function');
|
||||
|
||||
await backupSites.storageApi(backupSite).copyDir(backupSite.config, fromPath, toPath, progressCallback);
|
||||
await backupSites.storageApi(backupSite).copyDir(backupSite.config, backupSite.limits, fromPath, toPath, progressCallback);
|
||||
}
|
||||
|
||||
function getFileExtension(encryption) {
|
||||
|
||||
@@ -264,7 +264,7 @@ async function upload(backupSite, remotePath, dataLayout, progressCallback) {
|
||||
return await promiseRetry({ times: 5, interval: 20000, debug }, async () => {
|
||||
progressCallback({ message: `Uploading backup ${remotePath}` });
|
||||
|
||||
const uploader = await backupSites.storageApi(backupSite).upload(backupSite.config, remotePath);
|
||||
const uploader = await backupSites.storageApi(backupSite).upload(backupSite.config, backupSite.limits, remotePath);
|
||||
const { stats, integrity } = await tarPack(dataLayout, backupSite.encryption, uploader, progressCallback);
|
||||
|
||||
// use '.' instead of remote path since the backup can be moved to another path
|
||||
|
||||
+1
-1
@@ -78,7 +78,7 @@ async function uploadBackupInfo(backupSite, remotePath, integrityMap) {
|
||||
const integrityDataJsonString = JSON.stringify(Object.fromEntries(sortedIntegrityMap), null, 2);
|
||||
const integrityDataStream = Readable.from(integrityDataJsonString);
|
||||
// unencrypted for easy verification without having to decrypt anything
|
||||
const integrityUploader = await backupSites.storageApi(backupSite).upload(backupSite.config, `${remotePath}.backupinfo`);
|
||||
const integrityUploader = await backupSites.storageApi(backupSite).upload(backupSite.config, backupSite.limits, `${remotePath}.backupinfo`);
|
||||
await stream.pipeline(integrityDataStream, integrityUploader.stream);
|
||||
await integrityUploader.finish();
|
||||
|
||||
|
||||
@@ -103,8 +103,9 @@ function hasChownSupportSync(config) {
|
||||
}
|
||||
}
|
||||
|
||||
async function upload(config, remotePath) {
|
||||
async function upload(config, limits, remotePath) {
|
||||
assert.strictEqual(typeof config, 'object');
|
||||
assert.strictEqual(typeof limits, 'object');
|
||||
assert.strictEqual(typeof remotePath, 'string');
|
||||
|
||||
const fullRemotePath = path.join(getRootPath(config), remotePath);
|
||||
@@ -234,8 +235,9 @@ async function copy(config, fromPath, toPath, progressCallback) {
|
||||
return await copyInternal(config, fromPath, toPath, { recursive: false }, progressCallback);
|
||||
}
|
||||
|
||||
async function copyDir(config, fromPath, toPath, progressCallback) {
|
||||
async function copyDir(config, limits, fromPath, toPath, progressCallback) {
|
||||
assert.strictEqual(typeof config, 'object');
|
||||
assert.strictEqual(typeof limits, 'object');
|
||||
assert.strictEqual(typeof fromPath, 'string');
|
||||
assert.strictEqual(typeof toPath, 'string');
|
||||
assert.strictEqual(typeof progressCallback, 'function');
|
||||
@@ -258,8 +260,9 @@ async function remove(config, remotePath) {
|
||||
}
|
||||
}
|
||||
|
||||
async function removeDir(config, remotePathPrefix, progressCallback) {
|
||||
async function removeDir(config, limits, remotePathPrefix, progressCallback) {
|
||||
assert.strictEqual(typeof config, 'object');
|
||||
assert.strictEqual(typeof limits, 'object');
|
||||
assert.strictEqual(typeof remotePathPrefix, 'string');
|
||||
assert.strictEqual(typeof progressCallback, 'function');
|
||||
|
||||
|
||||
+9
-5
@@ -67,8 +67,9 @@ async function getStatus(config) {
|
||||
return { state: 'active', message: '' };
|
||||
}
|
||||
|
||||
async function upload(config, remotePath) {
|
||||
async function upload(config, limits, remotePath) {
|
||||
assert.strictEqual(typeof config, 'object');
|
||||
assert.strictEqual(typeof limits, 'object');
|
||||
assert.strictEqual(typeof remotePath, 'string');
|
||||
|
||||
const fullRemotePath = path.join(config.prefix, remotePath);
|
||||
@@ -156,14 +157,15 @@ async function copy(config, fullFromPath, fullToPath, progressCallback) {
|
||||
if (copyError) throw new BoxError(BoxError.EXTERNAL_ERROR, copyError.message);
|
||||
}
|
||||
|
||||
async function copyDir(config, fromPath, toPath, progressCallback) {
|
||||
async function copyDir(config, limits, fromPath, toPath, progressCallback) {
|
||||
assert.strictEqual(typeof config, 'object');
|
||||
assert.strictEqual(typeof limits, 'object');
|
||||
assert.strictEqual(typeof fromPath, 'string');
|
||||
assert.strictEqual(typeof toPath, 'string');
|
||||
assert.strictEqual(typeof progressCallback, 'function');
|
||||
|
||||
const batchSize = 1000;
|
||||
const concurrency = config.limits?.copyConcurrency || 10;
|
||||
const concurrency = limits.copyConcurrency || 10;
|
||||
let total = 0;
|
||||
|
||||
let marker = null;
|
||||
@@ -192,12 +194,14 @@ async function remove(config, remotePath) {
|
||||
if (error) debug('removeBackups: Unable to remove %s (%s). Not fatal.', fullRemotePath, error.message);
|
||||
}
|
||||
|
||||
async function removeDir(config, remotePathPrefix, progressCallback) {
|
||||
async function removeDir(config, limits, remotePathPrefix, progressCallback) {
|
||||
assert.strictEqual(typeof config, 'object');
|
||||
assert.strictEqual(typeof limits, 'object');
|
||||
assert.strictEqual(typeof remotePathPrefix, 'string');
|
||||
assert.strictEqual(typeof progressCallback, 'function');
|
||||
|
||||
const batchSize = 1000, concurrency = config.limits?.deleteConcurrency || 10; // https://googleapis.dev/nodejs/storage/latest/Bucket.html#deleteFiles
|
||||
const batchSize = 1000;
|
||||
const concurrency = limits.deleteConcurrency || 10; // https://googleapis.dev/nodejs/storage/latest/Bucket.html#deleteFiles
|
||||
let total = 0;
|
||||
|
||||
let marker = null;
|
||||
|
||||
@@ -62,8 +62,9 @@ async function getStatus(config) {
|
||||
throw new BoxError(BoxError.NOT_IMPLEMENTED, 'getStatus is not implemented');
|
||||
}
|
||||
|
||||
async function upload(config, backupFilePath) {
|
||||
async function upload(config, limits, backupFilePath) {
|
||||
assert.strictEqual(typeof config, 'object');
|
||||
assert.strictEqual(typeof limits, 'object');
|
||||
assert.strictEqual(typeof backupFilePath, 'string');
|
||||
|
||||
// Result: { stream, finish() callback }
|
||||
@@ -95,8 +96,9 @@ async function copy(config, oldFilePath, newFilePath, progressCallback) {
|
||||
throw new BoxError(BoxError.NOT_IMPLEMENTED, 'copy is not implemented');
|
||||
}
|
||||
|
||||
async function copyDir(config, oldFilePath, newFilePath, progressCallback) {
|
||||
async function copyDir(config, limits, oldFilePath, newFilePath, progressCallback) {
|
||||
assert.strictEqual(typeof config, 'object');
|
||||
assert.strictEqual(typeof limits, 'object');
|
||||
assert.strictEqual(typeof oldFilePath, 'string');
|
||||
assert.strictEqual(typeof newFilePath, 'string');
|
||||
assert.strictEqual(typeof progressCallback, 'function');
|
||||
@@ -123,8 +125,9 @@ async function remove(config, filename) {
|
||||
throw new BoxError(BoxError.NOT_IMPLEMENTED, 'remove is not implemented');
|
||||
}
|
||||
|
||||
async function removeDir(config, pathPrefix, progressCallback) {
|
||||
async function removeDir(config, limits, pathPrefix, progressCallback) {
|
||||
assert.strictEqual(typeof config, 'object');
|
||||
assert.strictEqual(typeof limits, 'object');
|
||||
assert.strictEqual(typeof pathPrefix, 'string');
|
||||
assert.strictEqual(typeof progressCallback, 'function');
|
||||
|
||||
|
||||
+9
-6
@@ -169,8 +169,9 @@ async function getStatus(config) {
|
||||
return { state: 'active', message: '' };
|
||||
}
|
||||
|
||||
async function upload(config, remotePath) {
|
||||
async function upload(config, limits, remotePath) {
|
||||
assert.strictEqual(typeof config, 'object');
|
||||
assert.strictEqual(typeof limits, 'object');
|
||||
assert.strictEqual(typeof remotePath, 'string');
|
||||
|
||||
const s3 = createS3Client(config, { retryStrategy: RETRY_STRATEGY });
|
||||
@@ -179,7 +180,7 @@ async function upload(config, remotePath) {
|
||||
// uploader will buffer at most queueSize * partSize bytes into memory at any given time.
|
||||
// scaleway only supports 1000 parts per object (https://www.scaleway.com/en/docs/s3-multipart-upload/)
|
||||
// s3: https://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html (max 10k parts and no size limit on the last part!)
|
||||
const partSize = config.limits?.uploadPartSize || (config._provider === 'scaleway-objectstorage' ? 100 * 1024 * 1024 : 10 * 1024 * 1024);
|
||||
const partSize = limits.uploadPartSize || (config._provider === 'scaleway-objectstorage' ? 100 * 1024 * 1024 : 10 * 1024 * 1024);
|
||||
|
||||
const passThrough = new PassThrough();
|
||||
|
||||
@@ -487,14 +488,15 @@ async function copy(config, fromPath, toPath, progressCallback) {
|
||||
return await copyInternal(config, fullFromPath, fullToPath, data.ContentLength, progressCallback);
|
||||
}
|
||||
|
||||
async function copyDir(config, fromPath, toPath, progressCallback) {
|
||||
async function copyDir(config, limits, fromPath, toPath, progressCallback) {
|
||||
assert.strictEqual(typeof config, 'object');
|
||||
assert.strictEqual(typeof limits, 'object');
|
||||
assert.strictEqual(typeof fromPath, 'string');
|
||||
assert.strictEqual(typeof toPath, 'string');
|
||||
assert.strictEqual(typeof progressCallback, 'function');
|
||||
|
||||
let total = 0;
|
||||
const concurrency = config.limits?.copyConcurrency || (config._provider === 's3' ? 500 : 10);
|
||||
const concurrency = limits.copyConcurrency || (config._provider === 's3' ? 500 : 10);
|
||||
progressCallback({ message: `Copying ${fromPath} to ${toPath} with concurrency of ${concurrency}` });
|
||||
|
||||
let marker = null;
|
||||
@@ -548,8 +550,9 @@ function chunk(array, size) {
|
||||
return result;
|
||||
}
|
||||
|
||||
async function removeDir(config, remotePathPrefix, progressCallback) {
|
||||
async function removeDir(config, limits, remotePathPrefix, progressCallback) {
|
||||
assert.strictEqual(typeof config, 'object');
|
||||
assert.strictEqual(typeof limits, 'object');
|
||||
assert.strictEqual(typeof remotePathPrefix, 'string');
|
||||
assert.strictEqual(typeof progressCallback, 'function');
|
||||
|
||||
@@ -564,7 +567,7 @@ async function removeDir(config, remotePathPrefix, progressCallback) {
|
||||
const entries = batch.entries;
|
||||
total += entries.length;
|
||||
|
||||
const chunkSize = config.limits?.deleteConcurrency || (config._provider !== 'digitalocean-spaces' ? 1000 : 100); // throttle objects in each request
|
||||
const chunkSize = limits.deleteConcurrency || (config._provider !== 'digitalocean-spaces' ? 1000 : 100); // throttle objects in each request
|
||||
const chunks = chunk(entries, chunkSize);
|
||||
|
||||
await async.eachSeries(chunks, async function deleteFiles(objects) {
|
||||
|
||||
Reference in New Issue
Block a user