Files
cloudron-box/src/apptaskmanager.js
T

95 lines
3.1 KiB
JavaScript
Raw Normal View History

2019-08-28 15:00:55 -07:00
'use strict';
exports = module.exports = {
start,
scheduleTask
2019-08-28 15:00:55 -07:00
};
2025-08-14 11:17:38 +05:30
const assert = require('node:assert'),
2019-12-04 10:29:06 -08:00
BoxError = require('./boxerror.js'),
2019-09-19 16:40:31 -07:00
debug = require('debug')('box:apptaskmanager'),
2025-08-14 11:17:38 +05:30
fs = require('node:fs'),
2024-12-07 14:35:45 +01:00
locks = require('./locks.js'),
2025-08-14 11:17:38 +05:30
path = require('node:path'),
2019-08-28 15:00:55 -07:00
paths = require('./paths.js'),
2024-12-07 14:35:45 +01:00
safe = require('safetydance'),
scheduler = require('./scheduler.js'),
2019-08-28 15:00:55 -07:00
tasks = require('./tasks.js');
2024-06-27 14:34:29 +02:00
const gActiveTasks = {}; // indexed by app id
const gPendingTasks = [];
let gStarted = false;
2019-08-28 15:00:55 -07:00
const TASK_CONCURRENCY = 3;
const DRAIN_TIMER_SECS = 1000;
2019-08-28 15:00:55 -07:00
let gDrainTimerId = null;
2019-08-28 15:00:55 -07:00
2024-12-07 14:35:45 +01:00
async function drain() {
debug(`drain: ${gPendingTasks.length} apptasks pending`);
2019-08-28 15:00:55 -07:00
2024-12-07 14:35:45 +01:00
for (let i = 0; i < gPendingTasks.length; i++) {
const space = Object.keys(gActiveTasks).length - TASK_CONCURRENCY;
if (space == 0) {
debug('At concurrency limit, cannot drain anymore');
break;
}
2019-08-28 15:00:55 -07:00
2024-12-07 14:35:45 +01:00
const { appId, taskId, options, onFinished } = gPendingTasks[i];
2019-08-28 15:00:55 -07:00
2025-07-18 13:22:33 +02:00
// acquire lock _before_ the task. this prevents the task failing if it can't get a lock
const [lockError] = await safe(locks.acquire(`${locks.TYPE_APP_TASK_PREFIX}${appId}`));
2024-12-07 14:35:45 +01:00
if (lockError) continue;
2024-12-07 14:35:45 +01:00
gPendingTasks.splice(i, 1);
gActiveTasks[appId] = {};
2019-08-28 15:00:55 -07:00
2024-12-07 14:35:45 +01:00
const logFile = path.join(paths.LOG_DIR, appId, 'apptask.log');
2020-08-19 18:23:44 +02:00
2024-12-07 14:35:45 +01:00
if (!fs.existsSync(path.dirname(logFile))) safe.fs.mkdirSync(path.dirname(logFile)); // ensure directory
scheduler.suspendAppJobs(appId);
2019-08-28 15:00:55 -07:00
2025-06-17 18:54:12 +02:00
// background
tasks.startTask(taskId, Object.assign(options, { logFile }))
.then(async (result) => await safe(onFinished(null, result), { debug }))
.catch(async (error) => await safe(onFinished(error), { debug }))
.finally(async () => {
2025-06-17 18:54:12 +02:00
delete gActiveTasks[appId];
2025-07-18 13:22:33 +02:00
await locks.release(`${locks.TYPE_APP_TASK_PREFIX}${appId}`);
2025-07-18 18:11:56 +02:00
await locks.releaseByTaskId(taskId);
2025-06-17 18:54:12 +02:00
scheduler.resumeAppJobs(appId);
});
2024-12-07 14:35:45 +01:00
}
2019-08-28 15:00:55 -07:00
gDrainTimerId = null;
if (gPendingTasks.length) gDrainTimerId = setTimeout(drain, DRAIN_TIMER_SECS); // check for released locks
}
async function start() {
assert.strictEqual(gDrainTimerId, null);
assert.strictEqual(gStarted, false);
debug('started');
gStarted = true;
if (gPendingTasks.length) gDrainTimerId = setTimeout(drain, DRAIN_TIMER_SECS);
}
function scheduleTask(appId, taskId, options, onFinished) {
assert.strictEqual(typeof appId, 'string');
assert.strictEqual(typeof taskId, 'string');
assert.strictEqual(typeof options, 'object');
assert.strictEqual(typeof onFinished, 'function');
if (appId in gActiveTasks) {
onFinished(new BoxError(BoxError.CONFLICT, `Task for ${appId} is already active`));
return;
}
tasks.update(taskId, { percent: 1, message: gStarted ? 'Queued' : 'Waiting for platform to initialize' });
gPendingTasks.push({ appId, taskId, options, onFinished });
if (gStarted && !gDrainTimerId) gDrainTimerId = setTimeout(drain, DRAIN_TIMER_SECS);
2019-08-28 15:00:55 -07:00
}