backups: root ~~canal~~ path surgery
remove rootPath and getBackupFilePath from the backup target and make this backend specific.
This commit is contained in:
@@ -112,7 +112,7 @@ function createS3Client(apiConfig, options) {
|
||||
if (apiConfig.endpoint) clientConfig.endpoint = apiConfig.endpoint;
|
||||
|
||||
// s3 endpoint names come from the SDK
|
||||
const isHttps = clientConfig.endpoint?.startsWith('https://') || apiConfig.provider === 's3';
|
||||
const isHttps = clientConfig.endpoint?.startsWith('https://') || apiConfig._provider === 's3';
|
||||
if (isHttps) {
|
||||
if (apiConfig.acceptSelfSignedCerts || apiConfig.bucket.includes('.')) {
|
||||
requestHandler.agent = new https.Agent({ rejectUnauthorized: false });
|
||||
@@ -137,7 +137,7 @@ function createS3Client(apiConfig, options) {
|
||||
// });
|
||||
|
||||
// This ensures it runs after default checksums might be added, but before signing
|
||||
if (options.deleteObjects && apiConfig.provider !== 's3') {
|
||||
if (options.deleteObjects && apiConfig._provider !== 's3') {
|
||||
// flexibleChecksumsMiddleware is only present when the request has a body. Only use this for DeleteObjects call. Other requests without a body will crash
|
||||
client.middlewareStack.addRelativeTo(md5Middleware, {
|
||||
relation: 'after',
|
||||
@@ -156,9 +156,9 @@ async function getAvailableSize(apiConfig) {
|
||||
return Number.POSITIVE_INFINITY;
|
||||
}
|
||||
|
||||
async function upload(apiConfig, backupFilePath) {
|
||||
async function upload(apiConfig, remotePath) {
|
||||
assert.strictEqual(typeof apiConfig, 'object');
|
||||
assert.strictEqual(typeof backupFilePath, 'string');
|
||||
assert.strictEqual(typeof remotePath, 'string');
|
||||
|
||||
const s3 = createS3Client(apiConfig, { retryStrategy: RETRY_STRATEGY });
|
||||
|
||||
@@ -166,7 +166,7 @@ async function upload(apiConfig, backupFilePath) {
|
||||
// uploader will buffer at most queueSize * partSize bytes into memory at any given time.
|
||||
// scaleway only supports 1000 parts per object (https://www.scaleway.com/en/docs/s3-multipart-upload/)
|
||||
// s3: https://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html (max 10k parts and no size limit on the last part!)
|
||||
const partSize = apiConfig.limits?.uploadPartSize || (apiConfig.provider === 'scaleway-objectstorage' ? 100 * 1024 * 1024 : 10 * 1024 * 1024);
|
||||
const partSize = apiConfig.limits?.uploadPartSize || (apiConfig._provider === 'scaleway-objectstorage' ? 100 * 1024 * 1024 : 10 * 1024 * 1024);
|
||||
|
||||
const passThrough = new PassThrough();
|
||||
|
||||
@@ -174,7 +174,7 @@ async function upload(apiConfig, backupFilePath) {
|
||||
client: s3,
|
||||
params: {
|
||||
Bucket: apiConfig.bucket,
|
||||
Key: backupFilePath,
|
||||
Key: path.join(apiConfig.prefix, remotePath),
|
||||
Body: passThrough
|
||||
},
|
||||
partSize,
|
||||
@@ -196,33 +196,35 @@ async function upload(apiConfig, backupFilePath) {
|
||||
};
|
||||
}
|
||||
|
||||
async function exists(apiConfig, backupFilePath) {
|
||||
async function exists(apiConfig, remotePath) {
|
||||
assert.strictEqual(typeof apiConfig, 'object');
|
||||
assert.strictEqual(typeof backupFilePath, 'string');
|
||||
assert.strictEqual(typeof remotePath, 'string');
|
||||
|
||||
const s3 = createS3Client(apiConfig, { retryStrategy: null });
|
||||
|
||||
if (!backupFilePath.endsWith('/')) { // check for file
|
||||
const fullRemotePath = path.join(apiConfig.prefix, remotePath);
|
||||
|
||||
if (!fullRemotePath.endsWith('/')) { // check for file
|
||||
const params = {
|
||||
Bucket: apiConfig.bucket,
|
||||
Key: backupFilePath
|
||||
Key: fullRemotePath
|
||||
};
|
||||
|
||||
const [error, response] = await safe(s3.headObject(params));
|
||||
if (error && S3_NOT_FOUND(error)) return false;
|
||||
if (error) throw new BoxError(BoxError.EXTERNAL_ERROR, `Error headObject ${backupFilePath}. ${formatError(error)}`);
|
||||
if (error) throw new BoxError(BoxError.EXTERNAL_ERROR, `Error headObject ${fullRemotePath}. ${formatError(error)}`);
|
||||
if (!response || typeof response.Metadata !== 'object') throw new BoxError(BoxError.EXTERNAL_ERROR, 'not a s3 endpoint');
|
||||
|
||||
return true;
|
||||
} else { // list dir contents
|
||||
const listParams = {
|
||||
Bucket: apiConfig.bucket,
|
||||
Prefix: backupFilePath,
|
||||
Prefix: fullRemotePath,
|
||||
MaxKeys: 1
|
||||
};
|
||||
|
||||
const [error, listData] = await safe(s3.listObjectsV2(listParams));
|
||||
if (error) throw new BoxError(BoxError.EXTERNAL_ERROR, `Error listing objects ${backupFilePath}. ${formatError(error)}`);
|
||||
if (error) throw new BoxError(BoxError.EXTERNAL_ERROR, `Error listing objects ${fullRemotePath}. ${formatError(error)}`);
|
||||
|
||||
return listData.KeyCount !== 0 || listData.Contents.length !== 0;
|
||||
}
|
||||
@@ -308,37 +310,38 @@ class S3MultipartDownloadStream extends Readable {
|
||||
}
|
||||
}
|
||||
|
||||
async function download(apiConfig, backupFilePath) {
|
||||
async function download(apiConfig, remotePath) {
|
||||
assert.strictEqual(typeof apiConfig, 'object');
|
||||
assert.strictEqual(typeof backupFilePath, 'string');
|
||||
assert.strictEqual(typeof remotePath, 'string');
|
||||
|
||||
const params = {
|
||||
Bucket: apiConfig.bucket,
|
||||
Key: backupFilePath
|
||||
Key: path.join(apiConfig.prefix, remotePath)
|
||||
};
|
||||
|
||||
const s3 = createS3Client(apiConfig, { retryStrategy: RETRY_STRATEGY });
|
||||
return new S3MultipartDownloadStream(s3, params, { blockSize: 64 * 1024 * 1024 });
|
||||
}
|
||||
|
||||
async function listDir(apiConfig, dir, batchSize, marker) {
|
||||
async function listDir(apiConfig, remotePath, batchSize, marker) {
|
||||
assert.strictEqual(typeof apiConfig, 'object');
|
||||
assert.strictEqual(typeof dir, 'string');
|
||||
assert.strictEqual(typeof remotePath, 'string');
|
||||
assert.strictEqual(typeof batchSize, 'number');
|
||||
assert(typeof marker !== 'undefined');
|
||||
|
||||
const s3 = createS3Client(apiConfig, { retryStrategy: RETRY_STRATEGY });
|
||||
const fullRemotePath = path.join(apiConfig.prefix, remotePath);
|
||||
const listParams = {
|
||||
Bucket: apiConfig.bucket,
|
||||
Prefix: dir,
|
||||
Prefix: fullRemotePath,
|
||||
MaxKeys: batchSize
|
||||
};
|
||||
if (marker) listParams.ContinuationToken = marker;
|
||||
|
||||
const [error, listData] = await safe(s3.listObjectsV2(listParams));
|
||||
if (error) throw new BoxError(BoxError.EXTERNAL_ERROR, `Error listing objects in ${dir}. ${formatError(error)}`);
|
||||
if (error) throw new BoxError(BoxError.EXTERNAL_ERROR, `Error listing objects in ${fullRemotePath}. ${formatError(error)}`);
|
||||
if (listData.KeyCount === 0 || listData.Contents.length === 0) return { entries: [], marker: null }; // no more
|
||||
const entries = listData.Contents.map(function (c) { return { fullPath: c.Key, size: c.Size }; });
|
||||
const entries = listData.Contents.map(function (c) { return { fullPath: path.relative(fullRemotePath, c.Key), size: c.Size }; });
|
||||
return { entries, marker: !listData.IsTruncated ? null : listData.NextContinuationToken };
|
||||
}
|
||||
|
||||
@@ -355,62 +358,62 @@ function encodeCopySource(bucket, path) {
|
||||
return `/${bucket}/${output}`;
|
||||
}
|
||||
|
||||
async function copyFile(apiConfig, oldFilePath, newFilePath, entry, progressCallback) {
|
||||
async function copyFile(apiConfig, fullFromPath, fullToPath, fileSize, progressCallback) {
|
||||
assert.strictEqual(typeof apiConfig, 'object');
|
||||
assert.strictEqual(typeof oldFilePath, 'string');
|
||||
assert.strictEqual(typeof newFilePath, 'string');
|
||||
assert.strictEqual(typeof entry, 'object');
|
||||
assert.strictEqual(typeof fullFromPath, 'string');
|
||||
assert.strictEqual(typeof fullToPath, 'string');
|
||||
assert.strictEqual(typeof fileSize, 'number');
|
||||
assert.strictEqual(typeof progressCallback, 'function');
|
||||
|
||||
const s3 = createS3Client(apiConfig, { retryStrategy: RETRY_STRATEGY }); // https://docs.aws.amazon.com/sdkref/latest/guide/feature-retry-behavior.html
|
||||
const relativePath = path.relative(oldFilePath, entry.fullPath);
|
||||
|
||||
function throwError(error) {
|
||||
if (error) debug(`copy: s3 copy error when copying ${entry.fullPath}: ${error}`);
|
||||
if (error) debug(`copy: s3 copy error when copying ${fullFromPath}: ${error}`);
|
||||
|
||||
if (error && S3_NOT_FOUND(error)) throw new BoxError(BoxError.NOT_FOUND, `Old backup not found: ${entry.fullPath}`);
|
||||
if (error) throw new BoxError(BoxError.EXTERNAL_ERROR, `Error copying ${entry.fullPath} (${entry.size} bytes): ${error.Code || ''} ${error}`);
|
||||
if (error && S3_NOT_FOUND(error)) throw new BoxError(BoxError.NOT_FOUND, `Old backup not found: ${fullFromPath}`);
|
||||
if (error) throw new BoxError(BoxError.EXTERNAL_ERROR, `Error copying ${fullFromPath} (${fileSize} bytes): ${error.Code || ''} ${error}`);
|
||||
}
|
||||
|
||||
const copyParams = {
|
||||
Bucket: apiConfig.bucket,
|
||||
Key: path.join(newFilePath, relativePath)
|
||||
Key: fullToPath
|
||||
};
|
||||
|
||||
// S3 copyObject has a file size limit of 5GB so if we have larger files, we do a multipart copy
|
||||
const largeFileLimit = (apiConfig.provider === 'vultr-objectstorage' || apiConfig.provider === 'exoscale-sos' || apiConfig.provider === 'backblaze-b2' || apiConfig.provider === 'digitalocean-spaces') ? 1024 * 1024 * 1024 : 3 * 1024 * 1024 * 1024;
|
||||
const provider = apiConfig._provider;
|
||||
const largeFileLimit = (provider === 'vultr-objectstorage' || provider === 'exoscale-sos' || provider === 'backblaze-b2' || provider === 'digitalocean-spaces') ? 1024 * 1024 * 1024 : 3 * 1024 * 1024 * 1024;
|
||||
|
||||
if (entry.size < largeFileLimit) {
|
||||
progressCallback({ message: `Copying ${relativePath || oldFilePath}` });
|
||||
if (fileSize < largeFileLimit) {
|
||||
progressCallback({ message: `Copying ${fullFromPath}` });
|
||||
|
||||
copyParams.CopySource = encodeCopySource(apiConfig.bucket, entry.fullPath);
|
||||
copyParams.CopySource = encodeCopySource(apiConfig.bucket, fullFromPath);
|
||||
const [copyError] = await safe(s3.copyObject(copyParams));
|
||||
if (copyError) return throwError(copyError);
|
||||
return;
|
||||
}
|
||||
|
||||
progressCallback({ message: `Copying (multipart) ${relativePath || oldFilePath}` });
|
||||
progressCallback({ message: `Copying (multipart) ${fullFromPath}` });
|
||||
|
||||
const [createMultipartError, multipart] = await safe(s3.createMultipartUpload(copyParams));
|
||||
if (createMultipartError) return throwError(createMultipartError);
|
||||
|
||||
// Exoscale (96M) was suggested by exoscale. 1GB for others is arbitrary size
|
||||
const chunkSize = apiConfig.provider === 'exoscale-sos' ? 96 * 1024 * 1024 : 1024 * 1024 * 1024;
|
||||
const chunkSize = provider === 'exoscale-sos' ? 96 * 1024 * 1024 : 1024 * 1024 * 1024;
|
||||
const uploadId = multipart.UploadId;
|
||||
const uploadedParts = [], ranges = [];
|
||||
|
||||
let cur = 0;
|
||||
while (cur + chunkSize < entry.size) {
|
||||
while (cur + chunkSize < fileSize) {
|
||||
ranges.push({ startBytes: cur, endBytes: cur + chunkSize - 1 });
|
||||
cur += chunkSize;
|
||||
}
|
||||
ranges.push({ startBytes: cur, endBytes: entry.size-1 });
|
||||
ranges.push({ startBytes: cur, endBytes: fileSize-1 });
|
||||
|
||||
const [copyError] = await safe(async.eachOfLimit(ranges, 3, async function copyChunk(range, index) {
|
||||
const partCopyParams = {
|
||||
Bucket: apiConfig.bucket,
|
||||
Key: path.join(newFilePath, relativePath),
|
||||
CopySource: encodeCopySource(apiConfig.bucket, entry.fullPath), // See aws-sdk-js/issues/1302
|
||||
Key: fullToPath,
|
||||
CopySource: encodeCopySource(apiConfig.bucket, fullFromPath), // See aws-sdk-js/issues/1302
|
||||
CopySourceRange: 'bytes=' + range.startBytes + '-' + range.endBytes,
|
||||
PartNumber: index+1,
|
||||
UploadId: uploadId
|
||||
@@ -429,17 +432,17 @@ async function copyFile(apiConfig, oldFilePath, newFilePath, entry, progressCall
|
||||
if (copyError) {
|
||||
const abortParams = {
|
||||
Bucket: apiConfig.bucket,
|
||||
Key: path.join(newFilePath, relativePath),
|
||||
Key: fullToPath,
|
||||
UploadId: uploadId
|
||||
};
|
||||
progressCallback({ message: `Aborting multipart copy of ${relativePath || oldFilePath}` });
|
||||
progressCallback({ message: `Aborting multipart copy of ${fullFromPath}` });
|
||||
await safe(s3.abortMultipartUpload(abortParams), { debug }); // ignore any abort errors
|
||||
return throwError(copyError);
|
||||
}
|
||||
|
||||
const completeMultipartParams = {
|
||||
Bucket: apiConfig.bucket,
|
||||
Key: path.join(newFilePath, relativePath),
|
||||
Key: fullToPath,
|
||||
MultipartUpload: { Parts: uploadedParts },
|
||||
UploadId: uploadId
|
||||
};
|
||||
@@ -450,22 +453,26 @@ async function copyFile(apiConfig, oldFilePath, newFilePath, entry, progressCall
|
||||
if (completeMultipartError) return throwError(completeMultipartError);
|
||||
}
|
||||
|
||||
async function copy(apiConfig, oldFilePath, newFilePath, progressCallback) {
|
||||
async function copy(apiConfig, fromPath, toPath, progressCallback) {
|
||||
assert.strictEqual(typeof apiConfig, 'object');
|
||||
assert.strictEqual(typeof oldFilePath, 'string');
|
||||
assert.strictEqual(typeof newFilePath, 'string');
|
||||
assert.strictEqual(typeof fromPath, 'string');
|
||||
assert.strictEqual(typeof toPath, 'string');
|
||||
assert.strictEqual(typeof progressCallback, 'function');
|
||||
|
||||
let total = 0;
|
||||
const concurrency = apiConfig.limits?.copyConcurrency || (apiConfig.provider === 's3' ? 500 : 10);
|
||||
const concurrency = apiConfig.limits?.copyConcurrency || (apiConfig._provider === 's3' ? 500 : 10);
|
||||
progressCallback({ message: `Copying with concurrency of ${concurrency}` });
|
||||
|
||||
let marker = null;
|
||||
while (true) {
|
||||
const batch = await listDir(apiConfig, oldFilePath, 1000, marker);
|
||||
const batch = await listDir(apiConfig, fromPath, 1000, marker); // returned entries are relative to fromPath
|
||||
total += batch.entries.length;
|
||||
progressCallback({ message: `Copying files from ${total-batch.entries.length}-${total}` });
|
||||
await async.eachLimit(batch.entries, concurrency, async (entry) => await copyFile(apiConfig, oldFilePath, newFilePath, entry, progressCallback));
|
||||
await async.eachLimit(batch.entries, concurrency, async (entry) => {
|
||||
const fullFromPath = path.join(apiConfig.prefix, fromPath, entry.fullPath);
|
||||
const fullToPath = path.join(apiConfig.prefix, toPath, entry.fullPath);
|
||||
await copyFile(apiConfig, fullFromPath, fullToPath, entry.size, progressCallback);
|
||||
});
|
||||
if (!batch.marker) break;
|
||||
marker = batch.marker;
|
||||
}
|
||||
@@ -473,20 +480,22 @@ async function copy(apiConfig, oldFilePath, newFilePath, progressCallback) {
|
||||
progressCallback({ message: `Copied ${total} files` });
|
||||
}
|
||||
|
||||
async function remove(apiConfig, filename) {
|
||||
async function remove(apiConfig, remotePath) {
|
||||
assert.strictEqual(typeof apiConfig, 'object');
|
||||
assert.strictEqual(typeof filename, 'string');
|
||||
assert.strictEqual(typeof remotePath, 'string');
|
||||
|
||||
const s3 = createS3Client(apiConfig, { retryStrategy: RETRY_STRATEGY });
|
||||
|
||||
const fullRemotePath = path.join(apiConfig.prefix, remotePath);
|
||||
|
||||
const deleteParams = {
|
||||
Bucket: apiConfig.bucket,
|
||||
Key: filename
|
||||
Key: fullRemotePath
|
||||
};
|
||||
|
||||
// deleteObject does not return error if key is not found
|
||||
const [error] = await safe(s3.deleteObject(deleteParams));
|
||||
if (error) throw new BoxError(BoxError.EXTERNAL_ERROR, `Unable to remove ${filename}. ${formatError(error)}`);
|
||||
if (error) throw new BoxError(BoxError.EXTERNAL_ERROR, `Unable to remove ${fullRemotePath}. ${formatError(error)}`);
|
||||
}
|
||||
|
||||
function chunk(array, size) {
|
||||
@@ -505,9 +514,9 @@ function chunk(array, size) {
|
||||
return result;
|
||||
}
|
||||
|
||||
async function removeDir(apiConfig, pathPrefix, progressCallback) {
|
||||
async function removeDir(apiConfig, remotePathPrefix, progressCallback) {
|
||||
assert.strictEqual(typeof apiConfig, 'object');
|
||||
assert.strictEqual(typeof pathPrefix, 'string');
|
||||
assert.strictEqual(typeof remotePathPrefix, 'string');
|
||||
assert.strictEqual(typeof progressCallback, 'function');
|
||||
|
||||
// only use this client for DeleteObjects call. It forces md5 checksum and for anything else, it might crash
|
||||
@@ -516,30 +525,30 @@ async function removeDir(apiConfig, pathPrefix, progressCallback) {
|
||||
let total = 0;
|
||||
let marker = null;
|
||||
while (true) {
|
||||
const batch = await listDir(apiConfig, pathPrefix, 1000, marker);
|
||||
const batch = await listDir(apiConfig, remotePathPrefix, 1000, marker); // returns entries relative to remotePathPrefix
|
||||
|
||||
const entries = batch.entries;
|
||||
total += entries.length;
|
||||
|
||||
const chunkSize = apiConfig.limits?.deleteConcurrency || (apiConfig.provider !== 'digitalocean-spaces' ? 1000 : 100); // throttle objects in each request
|
||||
const chunkSize = apiConfig.limits?.deleteConcurrency || (apiConfig._provider !== 'digitalocean-spaces' ? 1000 : 100); // throttle objects in each request
|
||||
const chunks = chunk(entries, chunkSize);
|
||||
|
||||
await async.eachSeries(chunks, async function deleteFiles(objects) {
|
||||
const deleteParams = {
|
||||
Bucket: apiConfig.bucket,
|
||||
Delete: {
|
||||
Objects: objects.map(function (o) { return { Key: o.fullPath }; })
|
||||
Objects: objects.map(function (o) { return { Key: path.join(apiConfig.prefix, o.fullPath) }; })
|
||||
}
|
||||
};
|
||||
|
||||
const firstPath = objects[0].fullPath, lastPath = objects[objects.length-1].fullPath;
|
||||
progressCallback({ message: `Removing ${objects.length} files from ${firstPath} to ${lastPath}` });
|
||||
const fullFirstPath = path.join(apiConfig.prefix, objects[0].fullPath), fullLastPath = path.join(apiConfig.prefix, objects[objects.length-1].fullPath);
|
||||
progressCallback({ message: `Removing ${objects.length} files from ${fullFirstPath} to ${fullLastPath}` });
|
||||
|
||||
// deleteObjects does not return error if key is not found
|
||||
const [error] = await safe(deleteObjectsS3Client.deleteObjects(deleteParams));
|
||||
if (error) {
|
||||
progressCallback({ message: `Unable to remove from ${firstPath} to ${lastPath} ${error.message || error.Code}` });
|
||||
throw new BoxError(BoxError.EXTERNAL_ERROR, `Unable to remove from ${firstPath} to ${lastPath}. error: ${error.message}`);
|
||||
progressCallback({ message: `Unable to remove from ${fullFirstPath} to ${fullLastPath} ${error.message || error.Code}` });
|
||||
throw new BoxError(BoxError.EXTERNAL_ERROR, `Unable to remove from ${fullFirstPath} to ${fullLastPath}. error: ${error.message}`);
|
||||
}
|
||||
});
|
||||
|
||||
@@ -618,7 +627,7 @@ async function verifyConfig({ id, provider, config }) {
|
||||
if (delError) throw new BoxError(BoxError.EXTERNAL_ERROR, `Error del object cloudron-testfile. ${formatError(delError)}`);
|
||||
|
||||
const newConfig = _.pick(config, ['accessKeyId', 'secretAccessKey', 'bucket', 'prefix', 'signatureVersion', 'acceptSelfSignedCerts', 'endpoint', 's3ForcePathStyle' ]);
|
||||
return { provider, ...newConfig };
|
||||
return { _provider: provider, ...newConfig };
|
||||
}
|
||||
|
||||
async function setup(apiConfig) {
|
||||
@@ -631,11 +640,11 @@ async function teardown(apiConfig) {
|
||||
|
||||
function removePrivateFields(apiConfig) {
|
||||
apiConfig.secretAccessKey = constants.SECRET_PLACEHOLDER;
|
||||
delete apiConfig.provider;
|
||||
delete apiConfig._provider;
|
||||
return apiConfig;
|
||||
}
|
||||
|
||||
function injectPrivateFields(newConfig, currentConfig) {
|
||||
if (newConfig.secretAccessKey === constants.SECRET_PLACEHOLDER) newConfig.secretAccessKey = currentConfig.secretAccessKey;
|
||||
newConfig.provider = currentConfig.provider;
|
||||
newConfig._provider = currentConfig._provider;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user