import assert from 'node:assert'; import BoxError from './boxerror.js'; import debugModule from 'debug'; import fs from 'node:fs'; import locks from './locks.js'; import path from 'node:path'; import paths from './paths.js'; import safe from 'safetydance'; import scheduler from './scheduler.js'; import tasks from './tasks.js'; const debug = debugModule('box:apptaskmanager'); const gActiveTasks = {}; // indexed by app id const gPendingTasks = []; let gStarted = false; const TASK_CONCURRENCY = 3; const DRAIN_TIMER_SECS = 1000; let gDrainTimerId = null; async function drain() { debug(`drain: ${gPendingTasks.length} apptasks pending`); for (let i = 0; i < gPendingTasks.length; i++) { const space = Object.keys(gActiveTasks).length - TASK_CONCURRENCY; if (space == 0) { debug('At concurrency limit, cannot drain anymore'); break; } const { appId, taskId, options, onFinished } = gPendingTasks[i]; // acquire lock _before_ the task. this prevents the task failing if it can't get a lock const [lockError] = await safe(locks.acquire(`${locks.TYPE_APP_TASK_PREFIX}${appId}`)); if (lockError) continue; gPendingTasks.splice(i, 1); gActiveTasks[appId] = {}; const logFile = path.join(paths.LOG_DIR, appId, 'apptask.log'); if (!fs.existsSync(path.dirname(logFile))) safe.fs.mkdirSync(path.dirname(logFile)); // ensure directory scheduler.suspendAppJobs(appId); // background let taskError = null, taskResult = null; tasks.startTask(taskId, Object.assign(options, { logFile })) .then((result) => { taskResult = result; }) .catch((error) => { taskError = error; }) .finally(async () => { delete gActiveTasks[appId]; await safe(onFinished(taskError, taskResult), { debug }); // hasPendingTasks() can now return false await locks.release(`${locks.TYPE_APP_TASK_PREFIX}${appId}`); await locks.releaseByTaskId(taskId); scheduler.resumeAppJobs(appId); }); } gDrainTimerId = null; if (gPendingTasks.length) gDrainTimerId = setTimeout(drain, DRAIN_TIMER_SECS); // check for released locks } async function start() { assert.strictEqual(gDrainTimerId, null); assert.strictEqual(gStarted, false); debug('started'); gStarted = true; if (gPendingTasks.length) gDrainTimerId = setTimeout(drain, DRAIN_TIMER_SECS); } function scheduleTask(appId, taskId, options, onFinished) { assert.strictEqual(typeof appId, 'string'); assert.strictEqual(typeof taskId, 'string'); assert.strictEqual(typeof options, 'object'); assert.strictEqual(typeof onFinished, 'function'); if (appId in gActiveTasks) { onFinished(new BoxError(BoxError.CONFLICT, `Task for ${appId} is already active`)); return; } tasks.update(taskId, { percent: 1, message: gStarted ? 'Queued' : 'Waiting for platform to initialize' }); gPendingTasks.push({ appId, taskId, options, onFinished }); if (gStarted && !gDrainTimerId) gDrainTimerId = setTimeout(drain, DRAIN_TIMER_SECS); } function hasPendingTasks() { return Object.keys(gActiveTasks).length > 0 || gPendingTasks.length > 0; } export default { start, scheduleTask, hasPendingTasks };