diff --git a/CHANGES b/CHANGES index 63314b1f4..dec8820de 100644 --- a/CHANGES +++ b/CHANGES @@ -2875,4 +2875,5 @@ * oidc: use cloudron name as provider name * groups: add eventlog * resources: allow mounting devices into apps +* remove global lock diff --git a/migrations/20241206163257-locks-create-table.js b/migrations/20241206163257-locks-create-table.js new file mode 100644 index 000000000..36ca6cc7d --- /dev/null +++ b/migrations/20241206163257-locks-create-table.js @@ -0,0 +1,18 @@ +'use strict'; + +'use strict'; + +exports.up = async function (db) { + const cmd = 'CREATE TABLE IF NOT EXISTS locks(' + + 'id VARCHAR(128) NOT NULL UNIQUE,' + + 'dataJson TEXT,' + + 'version INT DEFAULT 1,' + + 'ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP' + + ') CHARACTER SET utf8 COLLATE utf8_bin'; + + await db.runSql(cmd); +}; + +exports.down = async function (db) { + await db.runSql('DROP TABLE locks'); +}; diff --git a/migrations/schema.sql b/migrations/schema.sql index 137a2b792..efa018aca 100644 --- a/migrations/schema.sql +++ b/migrations/schema.sql @@ -323,3 +323,10 @@ CREATE TABLE IF NOT EXISTS oidcClients( loginRedirectUri VARCHAR(256) DEFAULT "", tokenSignatureAlgorithm VARCHAR(128) DEFAULT "", PRIMARY KEY(id)); + +CREATE TABLE IF NOT EXISTS locks( + id VARCHAR(128) NOT NULL UNIQUE, + dataJson TEXT, + version INT DEFAULT 1 + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP); + diff --git a/src/apptaskmanager.js b/src/apptaskmanager.js index 26d62e3a5..d9beabd15 100644 --- a/src/apptaskmanager.js +++ b/src/apptaskmanager.js @@ -8,84 +8,70 @@ const assert = require('assert'), BoxError = require('./boxerror.js'), debug = require('debug')('box:apptaskmanager'), fs = require('fs'), - locker = require('./locker.js'), - safe = require('safetydance'), + locks = require('./locks.js'), path = require('path'), paths = require('./paths.js'), + safe = require('safetydance'), scheduler = require('./scheduler.js'), tasks = require('./tasks.js'); const gActiveTasks = {}; // indexed by app id const gPendingTasks = []; -let gInitialized = false; const TASK_CONCURRENCY = 3; -function waitText(lockOperation) { - if (lockOperation === locker.OP_BOX_UPDATE) return 'Waiting for Cloudron to finish updating. See the Settings view'; - if (lockOperation === locker.OP_INFRA_START) return 'Waiting for Platform Services to start. See the Services view'; - if (lockOperation === locker.OP_FULL_BACKUP) return 'Waiting for Cloudron to finish backup. See the Backups view'; +let gTimerId = null; - return ''; // cannot happen -} - -function initializeSync() { - gInitialized = true; - locker.on('unlocked', startNextTask); -} - -// callback is called when task is finished function scheduleTask(appId, taskId, options, onFinished) { assert.strictEqual(typeof appId, 'string'); assert.strictEqual(typeof taskId, 'string'); assert.strictEqual(typeof options, 'object'); assert.strictEqual(typeof onFinished, 'function'); - if (!gInitialized) initializeSync(); - if (appId in gActiveTasks) { - return onFinished(new BoxError(BoxError.CONFLICT, `Task for ${appId} is already active`)); - } - - if (Object.keys(gActiveTasks).length >= TASK_CONCURRENCY) { - debug(`Reached concurrency limit, queueing task id ${taskId}`); - tasks.update(taskId, { percent: 1, message: 'Waiting for other app tasks to complete' }); - gPendingTasks.push({ appId, taskId, options, onFinished }); + onFinished(new BoxError(BoxError.CONFLICT, `Task for ${appId} is already active`)); return; } - const lockError = locker.recursiveLock(locker.OP_APPTASK); + tasks.update(taskId, { percent: 1, message: 'Queued' }); + gPendingTasks.push({ appId, taskId, options, onFinished }); - if (lockError) { - debug(`Could not get lock. ${lockError.message}, queueing task id ${taskId}`); - tasks.update(taskId, { percent: 1, message: waitText(lockError.operation) }); - gPendingTasks.push({ appId, taskId, options, onFinished }); - return; + if (!gTimerId) gTimerId = setTimeout(drain, 1000); +} + +async function drain() { + debug(`drain: ${gPendingTasks.length} apptasks pending`); + + for (let i = 0; i < gPendingTasks.length; i++) { + const space = Object.keys(gActiveTasks).length - TASK_CONCURRENCY; + if (space == 0) { + debug('At concurrency limit, cannot drain anymore'); + break; + } + + const { appId, taskId, options, onFinished } = gPendingTasks[i]; + + const [lockError] = await safe(locks.acquire(`${locks.TYPE_APP_PREFIX}${appId}`)); + if (lockError) continue; + + gPendingTasks.splice(i, 1); + gActiveTasks[appId] = {}; + + const logFile = path.join(paths.LOG_DIR, appId, 'apptask.log'); + + if (!fs.existsSync(path.dirname(logFile))) safe.fs.mkdirSync(path.dirname(logFile)); // ensure directory + + scheduler.suspendJobs(appId); + + tasks.startTask(taskId, Object.assign(options, { logFile }), async function (error, result) { + onFinished(error, result); + + delete gActiveTasks[appId]; + await locks.release(`${locks.TYPE_APP_PREFIX}${appId}`); + scheduler.resumeJobs(appId); + }); } - gActiveTasks[appId] = {}; - - const logFile = path.join(paths.LOG_DIR, appId, 'apptask.log'); - - if (!fs.existsSync(path.dirname(logFile))) safe.fs.mkdirSync(path.dirname(logFile)); // ensure directory - - scheduler.suspendJobs(appId); - - tasks.startTask(taskId, Object.assign(options, { logFile }), function (error, result) { - onFinished(error, result); - - delete gActiveTasks[appId]; - locker.unlock(locker.OP_APPTASK); // unlock event will trigger next task - - scheduler.resumeJobs(appId); - }); -} - -function startNextTask() { - if (gPendingTasks.length === 0) return; - - assert(Object.keys(gActiveTasks).length < TASK_CONCURRENCY); - - const t = gPendingTasks.shift(); - scheduleTask(t.appId, t.taskId, t.options, t.onFinished); + gTimerId = null; + if (gPendingTasks.length) gTimerId = setTimeout(drain, 1000); // check for released locks } diff --git a/src/backups.js b/src/backups.js index 5bb964eb5..125dcc806 100644 --- a/src/backups.js +++ b/src/backups.js @@ -63,7 +63,7 @@ const assert = require('assert'), debug = require('debug')('box:backups'), eventlog = require('./eventlog.js'), hat = require('./hat.js'), - locker = require('./locker.js'), + locks = require('./locks.js'), mounts = require('./mounts.js'), path = require('path'), paths = require('./paths.js'), @@ -239,8 +239,8 @@ async function setState(id, state) { } async function startBackupTask(auditSource) { - const error = locker.lock(locker.OP_FULL_BACKUP); - if (error) throw new BoxError(BoxError.BAD_STATE, `Cannot backup now: ${error.message}`); + const [error] = await safe(locks.acquire(locks.TYPE_BACKUP_TASK)); + if (error) throw new BoxError(BoxError.BAD_STATE, `Another backup task is in progress: ${error.message}`); const backupConfig = await getConfig(); @@ -251,7 +251,8 @@ async function startBackupTask(auditSource) { await eventlog.add(eventlog.ACTION_BACKUP_START, auditSource, { taskId }); tasks.startTask(taskId, { timeout: 24 * 60 * 60 * 1000 /* 24 hours */, nice: 15, memoryLimit, oomScoreAdjust: -999 }, async function (error, backupId) { - locker.unlock(locker.OP_FULL_BACKUP); + await locks.release(locks.TYPE_BACKUP_TASK); + await locks.releaseByTaskId(taskId); const errorMessage = error ? error.message : ''; const timedOut = error ? error.code === tasks.ETIMEOUT : false; diff --git a/src/backuptask.js b/src/backuptask.js index 7ce5bd9a1..248a668e2 100644 --- a/src/backuptask.js +++ b/src/backuptask.js @@ -24,6 +24,7 @@ const apps = require('./apps.js'), database = require('./database.js'), debug = require('debug')('box:backuptask'), df = require('./df.js'), + locks = require('./locks.js'), path = require('path'), paths = require('./paths.js'), safe = require('safetydance'), @@ -117,8 +118,9 @@ async function restore(backupConfig, remotePath, progressCallback) { debug('restore: download completed, importing database'); await database.importFromFile(`${dataLayout.localRoot()}/box.mysqldump`); - debug('restore: database imported'); + + await locks.releaseAll(); } async function downloadApp(app, restoreConfig, progressCallback) { @@ -504,9 +506,11 @@ async function fullBackup(options, progressCallback) { continue; // nothing to backup } + await locks.wait(`${locks.TYPE_APP_PREFIX}${app.id}`); const startTime = new Date(); const appBackupId = await backupAppWithTag(app, tag, options, (progress) => progressCallback({ percent, message: progress.message })); debug(`fullBackup: app ${app.fqdn} backup finished. Took ${(new Date() - startTime)/1000} seconds`); + await locks.release(`${locks.TYPE_APP_PREFIX}${app.id}`); if (appBackupId) appBackupIds.push(appBackupId); // backupId can be null if in BAD_STATE and never backed up } diff --git a/src/locker.js b/src/locker.js deleted file mode 100644 index 6d6f511b0..000000000 --- a/src/locker.js +++ /dev/null @@ -1,75 +0,0 @@ -'use strict'; - -const assert = require('assert'), - BoxError = require('./boxerror.js'), - debug = require('debug')('box:locker'), - EventEmitter = require('events').EventEmitter, - util = require('util'); - -function Locker() { - this._operation = null; - this._timestamp = null; - this._watcherId = -1; - this._lockDepth = 0; // recursive locks -} -util.inherits(Locker, EventEmitter); - -// these are mutually exclusive operations -Locker.prototype.OP_BOX_UPDATE = 'box_update'; -Locker.prototype.OP_INFRA_START = 'infra_start'; -Locker.prototype.OP_FULL_BACKUP = 'full_backup'; -Locker.prototype.OP_APPTASK = 'apptask'; - -Locker.prototype.lock = function (operation) { - assert.strictEqual(typeof operation, 'string'); - - if (this._operation !== null) { - const error = new BoxError(BoxError.CONFLICT, `Locked for ${this._operation}`); - error.operation = this._operation; - return error; - } - - this._operation = operation; - ++this._lockDepth; - this._timestamp = new Date(); - this._watcherId = setInterval(() => { debug('Lock unreleased %s', this._operation); }, 1000 * 60 * 5); - - debug('Acquired : %s', this._operation); - - this.emit('locked', this._operation); - - return null; -}; - -Locker.prototype.recursiveLock = function (operation) { - if (this._operation === operation) { - ++this._lockDepth; - debug('Re-acquired : %s Depth : %s', this._operation, this._lockDepth); - return null; - } - - return this.lock(operation); -}; - -Locker.prototype.unlock = function (operation) { - assert.strictEqual(typeof operation, 'string'); - - if (this._operation !== operation) throw new BoxError(BoxError.BAD_STATE, 'Mismatched unlock. Current lock is for ' + this._operation); // throw because this is a programming error - - if (--this._lockDepth === 0) { - debug('Released : %s', this._operation); - - this._operation = null; - this._timestamp = null; - clearInterval(this._watcherId); - this._watcherId = -1; - } else { - debug('Recursive lock released : %s. Depth : %s', this._operation, this._lockDepth); - } - - this.emit('unlocked', operation); - - return null; -}; - -exports = module.exports = new Locker(); diff --git a/src/locks.js b/src/locks.js new file mode 100644 index 000000000..2f1287fc8 --- /dev/null +++ b/src/locks.js @@ -0,0 +1,119 @@ +'use strict'; + +exports = module.exports = { + setTaskId, + + acquire, + wait, + + release, + releaseAll, + releaseByTaskId, + + TYPE_APP_PREFIX: 'app_', + TYPE_UPDATE: 'update', + TYPE_UPDATE_TASK: 'update_task', + TYPE_BACKUP_TASK: 'backup_task' +}; + +const assert = require('assert'), + BoxError = require('./boxerror.js'), + database = require('./database.js'), + debug = require('debug')('box:locks'), + promiseRetry = require('./promise-retry.js'); + +let gTaskId = null; + +function setTaskId(taskId) { + assert.strictEqual(typeof taskId, 'string'); + gTaskId = taskId; +} + +async function read() { + const result = await database.query('SELECT version, dataJson FROM locks'); + return { version: result[0].version, data: JSON.parse(result[0].dataJson) }; +} + +async function write(value) { + assert.strictEqual(typeof value.version, 'number'); + assert.strictEqual(typeof value.data, 'object'); + + const result = await database.query('UPDATE locks SET dataJson=?, version=version+1 WHERE id=? AND version=?', [ JSON.stringify(value.data), 'platform', value.version ]); + if (result.affectedRows !== 1) throw new BoxError(BoxError.CONFLICT, 'Someone updated before we did'); + debug(`write: current locks: ${JSON.stringify(value.data)}`); +} + +function canAcquire(data, type) { + assert.strictEqual(typeof data, 'object'); + assert.strictEqual(typeof type, 'string'); + + if (type === exports.TYPE_UPDATE) { + if (Object.keys(data).some(k => k.startsWith('app-'))) return new BoxError(BoxError.BAD_STATE, 'One or more apptasks are active'); + } else if (type.startsWith(exports.TYPE_APP_PREFIX)) { + if (exports.TYPE_UPDATE in data) return new BoxError(BoxError.BAD_STATE, 'Update is active'); + } else if (type === exports.TYPE_BACKUP_TASK) { + if (exports.TYPE_UPDATE_TASK in data) return new BoxError(BoxError.BAD_STATE, 'Update task is active'); + } else if (type === exports.TYPE_UPDATE_TASK) { + if (exports.TYPE_BACKUP_TASK in data) return new BoxError(BoxError.BAD_STATE, 'Backup task is active'); + } + + return null; +} + +async function acquire(type) { + assert.strictEqual(typeof type, 'string'); + + await promiseRetry({ times: Number.MAX_SAFE_INTEGER, interval: 100, debug, retry: (error) => error.reason === BoxError.CONFLICT }, async () => { + const { version, data } = await read(); + if (type in data) throw new BoxError(BoxError.BAD_STATE, `Locked by ${data[type]}`); + const error = canAcquire(data, type); + if (error) throw error; + data[type] = gTaskId; + await write({ version, data }); + debug(`acquire: ${type}`); + }); +} + +async function wait(type) { + assert.strictEqual(typeof type, 'string'); + + await promiseRetry({ times: Number.MAX_SAFE_INTEGER, interval: 10000, debug }, async () => await acquire(type)); +} + +async function release(type) { + assert.strictEqual(typeof type, 'string'); + + await promiseRetry({ times: Number.MAX_SAFE_INTEGER, interval: 100, debug, retry: (error) => error.reason === BoxError.CONFLICT }, async () => { + const { version, data } = await read(); + if (!(type in data)) throw new BoxError(BoxError.BAD_STATE, `Lock ${type} was never acquired`); + if (data[type] !== gTaskId) throw new BoxError(BoxError.BAD_STATE, `Task ${gTaskId} attempted to release lock ${type} acquired by ${data[type]}`); + delete data[type]; + await write({ version, data }); + debug(`release: ${type}`); + }); +} + +async function releaseAll() { + await database.query('DELETE FROM locks'); + await database.query('INSERT INTO locks (id, dataJson) VALUES (?, ?)', [ 'platform', JSON.stringify({}) ]); + debug('releaseAll: all locks released'); +} + +async function releaseByTaskId(taskId) { + assert.strictEqual(typeof taskId, 'string'); + + await promiseRetry({ times: Number.MAX_SAFE_INTEGER, interval: 100, debug, retry: (error) => error.reason === BoxError.CONFLICT }, async () => { + const { version, data } = await read(); + + for (const type of Object.keys(data)) { + if (data[type] === taskId) { + debug(`releaseByTaskId: task ${taskId} forgot to unlock ${type}`); + delete data[type]; + } + } + + await write({ version, data }); + + debug(`releaseByTaskId: ${taskId}`); + }); +} diff --git a/src/platform.js b/src/platform.js index b2bdb22bc..c1a567d01 100644 --- a/src/platform.js +++ b/src/platform.js @@ -25,7 +25,7 @@ const apps = require('./apps.js'), dockerProxy = require('./dockerproxy.js'), fs = require('fs'), infra = require('./infra_version.js'), - locker = require('./locker.js'), + locks = require('./locks.js'), oidc = require('./oidc.js'), paths = require('./paths.js'), reverseProxy = require('./reverseproxy.js'), @@ -167,9 +167,6 @@ async function startInfra(restoreOptions) { debug(`startInfra: updating infrastructure from ${existingInfra.version} to ${infra.version}`); - const error = locker.lock(locker.OP_INFRA_START); - if (error) throw error; - for (let attempt = 0; attempt < 5; attempt++) { try { if (existingInfra.version !== infra.version) { @@ -193,8 +190,6 @@ async function startInfra(restoreOptions) { } } - locker.unlock(locker.OP_INFRA_START); - await onInfraReady(true /* infraChanged */); } @@ -203,6 +198,7 @@ async function initialize() { await database.initialize(); await tasks.stopAllTasks(); + await locks.releaseAll(); // always generate webadmin config since we have no versioning mechanism for the ejs const dashboardLocation = await dashboard.getLocation(); diff --git a/src/taskworker.js b/src/taskworker.js index e69fbd2b0..dc8cb002b 100755 --- a/src/taskworker.js +++ b/src/taskworker.js @@ -11,6 +11,7 @@ const apptask = require('./apptask.js'), dyndns = require('./dyndns.js'), externalLdap = require('./externalldap.js'), fs = require('fs'), + locks = require('./locks.js'), mailServer = require('./mailserver.js'), net = require('net'), reverseProxy = require('./reverseproxy.js'), @@ -78,6 +79,7 @@ async function main() { await setupLogging(); await setupNetworking(); await database.initialize(); + locks.setTaskId(taskId); } catch (initError) { console.error(initError); return process.exit(50); diff --git a/src/test/common.js b/src/test/common.js index 53bf0fe39..a941c0518 100644 --- a/src/test/common.js +++ b/src/test/common.js @@ -9,6 +9,7 @@ const apps = require('../apps.js'), domains = require('../domains.js'), expect = require('expect.js'), fs = require('fs'), + locks = require('../locks.js'), mailer = require('../mailer.js'), mailServer = require('../mailserver.js'), nock = require('nock'), @@ -237,6 +238,7 @@ async function setup() { const userId = await users.add(user.email, user, auditSource); user.id = userId; await tasks.stopAllTasks(); + await locks.releaseAll(); } async function cleanup() { diff --git a/src/test/locks-test.js b/src/test/locks-test.js new file mode 100644 index 000000000..54026bc3d --- /dev/null +++ b/src/test/locks-test.js @@ -0,0 +1,51 @@ +'use strict'; + +/* global it, describe, before, after */ + +const BoxError = require('../boxerror.js'), + common = require('./common.js'), + expect = require('expect.js'), + safe = require('safetydance'), + locks = require('../locks.js'); + +describe('Locks', function () { + this.timeout(20000); + + const { setup, cleanup } = common; + + before(setup); + after(cleanup); + + it('can release all locks', async function () { + await locks.releaseAll(); + }); + + it('acquire lock foo', async function () { + await locks.acquire('foo'); + }); + + it('cannot reacquire lock foo', async function () { + const [error] = await safe(locks.acquire('foo')); + expect(error.reason).to.be(BoxError.BAD_STATE); + }); + + it('cannot reacquire after release', async function () { + await locks.release('foo'); + await locks.acquire('foo'); + }); + + it('task gets a lock', async function () { + locks.setTaskId('42'); + await locks.acquire('tasklock'); + }); + + it('can wait for a lock', async function () { + setTimeout(() => locks.releaseByTaskId('42'), 3000); // release in 3 seconds + + const startTime = Date.now(); + await locks.wait('tasklock'); // retries only in 10s + const endTime = Date.now(); + expect(endTime - startTime).to.be.greaterThan(9900); + expect(endTime - startTime).to.be.lessThan(10100); + }); +}); diff --git a/src/updater.js b/src/updater.js index ada180db2..abef21011 100644 --- a/src/updater.js +++ b/src/updater.js @@ -24,7 +24,7 @@ const apps = require('./apps.js'), df = require('./df.js'), eventlog = require('./eventlog.js'), fs = require('fs'), - locker = require('./locker.js'), + locks = require('./locks.js'), os = require('os'), path = require('path'), paths = require('./paths.js'), @@ -172,11 +172,12 @@ async function update(boxUpdateInfo, options, progressCallback) { await checkFreeDiskSpace(2*1024*1024*1024); // check again in case backup is in same disk } + await locks.wait(locks.TYPE_UPDATE); + debug(`Updating box with ${boxUpdateInfo.sourceTarballUrl}`); - progressCallback({ percent: 70, message: 'Installing update' }); - - await shell.promises.sudo([ UPDATE_CMD, packageInfo.file, process.stdout.logFile ], {}); // run installer.sh from new box code as a separate service + const [error] = await safe(shell.promises.sudo([ UPDATE_CMD, packageInfo.file, process.stdout.logFile ], {})); // run installer.sh from new box code as a separate service + if (error) await locks.release(locks.TYPE_UPDATE); // Do not add any code here. The installer script will stop the box code any instant } @@ -205,8 +206,8 @@ async function updateToLatest(options, auditSource) { await checkUpdateRequirements(boxUpdateInfo); - const error = locker.lock(locker.OP_BOX_UPDATE); - if (error) throw new BoxError(BoxError.BAD_STATE, `Cannot update now: ${error.message}`); + const [error] = await safe(locks.acquire(locks.TYPE_UPDATE_TASK)); + if (error) throw new BoxError(BoxError.BAD_STATE, `Another update task is in progress: ${error.message}`); const backupConfig = await backups.getConfig(); const memoryLimit = backupConfig.limits?.memoryLimit ? Math.max(backupConfig.limits.memoryLimit/1024/1024, 400) : 400; @@ -215,7 +216,8 @@ async function updateToLatest(options, auditSource) { await eventlog.add(eventlog.ACTION_UPDATE, auditSource, { taskId, boxUpdateInfo }); tasks.startTask(taskId, { timeout: 20 * 60 * 60 * 1000 /* 20 hours */, nice: 15, memoryLimit }, async (error) => { - locker.unlock(locker.OP_BOX_UPDATE); + await locks.release(locks.TYPE_UPDATE_TASK); + await locks.releaseByTaskId(taskId); debug('Update failed with error. %o', error);