Files
cloudron-box/src/apptaskmanager.js
Girish Ramakrishnan f12b4faf34 lint
2026-03-12 23:23:23 +05:30

103 lines
3.3 KiB
JavaScript

import assert from 'node:assert';
import BoxError from './boxerror.js';
import logger from './logger.js';
import fs from 'node:fs';
import locks from './locks.js';
import path from 'node:path';
import paths from './paths.js';
import safe from 'safetydance';
import scheduler from './scheduler.js';
import tasks from './tasks.js';
const { log } = logger('apptaskmanager');
const gActiveTasks = {}; // indexed by app id
const gPendingTasks = [];
let gStarted = false;
const TASK_CONCURRENCY = 3;
const DRAIN_TIMER_SECS = 1000;
let gDrainTimerId = null;
async function drain() {
log(`drain: ${gPendingTasks.length} apptasks pending`);
for (let i = 0; i < gPendingTasks.length; i++) {
const space = Object.keys(gActiveTasks).length - TASK_CONCURRENCY;
if (space == 0) {
log('At concurrency limit, cannot drain anymore');
break;
}
const { appId, taskId, options, onFinished } = gPendingTasks[i];
// acquire lock _before_ the task. this prevents the task failing if it can't get a lock
const [lockError] = await safe(locks.acquire(`${locks.TYPE_APP_TASK_PREFIX}${appId}`));
if (lockError) continue;
gPendingTasks.splice(i, 1);
gActiveTasks[appId] = {};
const logFile = path.join(paths.LOG_DIR, appId, 'apptask.log');
if (!fs.existsSync(path.dirname(logFile))) safe.fs.mkdirSync(path.dirname(logFile)); // ensure directory
scheduler.suspendAppJobs(appId);
// background
let taskError = null, taskResult = null;
tasks.startTask(taskId, Object.assign(options, { logFile }))
.then((result) => { taskResult = result; })
.catch((error) => { taskError = error; })
.finally(async () => {
delete gActiveTasks[appId];
await safe(onFinished(taskError, taskResult), { debug: log }); // hasPendingTasks() can now return false
await locks.release(`${locks.TYPE_APP_TASK_PREFIX}${appId}`);
await locks.releaseByTaskId(taskId);
scheduler.resumeAppJobs(appId);
});
}
gDrainTimerId = null;
if (gPendingTasks.length) gDrainTimerId = setTimeout(drain, DRAIN_TIMER_SECS); // check for released locks
}
async function start() {
assert.strictEqual(gDrainTimerId, null);
assert.strictEqual(gStarted, false);
log('started');
gStarted = true;
if (gPendingTasks.length) gDrainTimerId = setTimeout(drain, DRAIN_TIMER_SECS);
}
function scheduleTask(appId, taskId, options, onFinished) {
assert.strictEqual(typeof appId, 'string');
assert.strictEqual(typeof taskId, 'string');
assert.strictEqual(typeof options, 'object');
assert.strictEqual(typeof onFinished, 'function');
if (appId in gActiveTasks) {
onFinished(new BoxError(BoxError.CONFLICT, `Task for ${appId} is already active`));
return;
}
tasks.update(taskId, { percent: 1, message: gStarted ? 'Queued' : 'Waiting for platform to initialize' });
gPendingTasks.push({ appId, taskId, options, onFinished });
if (gStarted && !gDrainTimerId) gDrainTimerId = setTimeout(drain, DRAIN_TIMER_SECS);
}
function hasPendingTasks() {
return Object.keys(gActiveTasks).length > 0 || gPendingTasks.length > 0;
}
export default {
start,
scheduleTask,
hasPendingTasks
};