'use strict';

exports = module.exports = {
    start,
    scheduleTask
};

const assert = require('node:assert'),
    BoxError = require('./boxerror.js'),
    debug = require('debug')('box:apptaskmanager'),
    fs = require('node:fs'),
    locks = require('./locks.js'),
    path = require('node:path'),
    paths = require('./paths.js'),
    safe = require('safetydance'),
    scheduler = require('./scheduler.js'),
    tasks = require('./tasks.js');

const gActiveTasks = {}; // indexed by app id
const gPendingTasks = []; // queue of { appId, taskId, options, onFinished }, in scheduling order
let gStarted = false; // set by start(); until then tasks are only queued

const TASK_CONCURRENCY = 3; // maximum number of entries allowed in gActiveTasks
const DRAIN_TIMER_SECS = 1000; // passed directly to setTimeout, so this is in milliseconds despite the name

let gDrainTimerId = null; // non-null while a drain is scheduled or still running

/**
 * Starts as many pending app tasks as the concurrency limit and the per-app locks allow.
 *
 * Entries whose app lock cannot be acquired stay in the queue. If anything is still
 * pending at the end of the pass, another drain is scheduled via the drain timer.
 *
 * @returns {Promise<void>}
 */
async function drain() {
    debug(`drain: ${gPendingTasks.length} apptasks pending`);

    // The index only advances when an entry is left in the queue. After a splice, the next
    // entry has shifted into slot `i`, so incrementing would skip it.
    let i = 0;
    while (i < gPendingTasks.length) {
        // >= (not ==) so that we also stop if the active count is ever above the limit
        if (Object.keys(gActiveTasks).length >= TASK_CONCURRENCY) {
            debug('At concurrency limit, cannot drain anymore');
            break;
        }

        const { appId, taskId, options, onFinished } = gPendingTasks[i];

        // acquire lock _before_ the task. this prevents the task failing if it can't get a lock
        const [lockError] = await safe(locks.acquire(`${locks.TYPE_APP_TASK_PREFIX}${appId}`));
        if (lockError) {
            i++; // leave this entry queued and look at the next one
            continue;
        }

        gPendingTasks.splice(i, 1); // note: `i` now points at the next pending entry
        gActiveTasks[appId] = {};

        const logFile = path.join(paths.LOG_DIR, appId, 'apptask.log');

        if (!fs.existsSync(path.dirname(logFile))) safe.fs.mkdirSync(path.dirname(logFile)); // ensure directory

        scheduler.suspendAppJobs(appId);

        // background: intentionally not awaited so the loop can go on to start further tasks.
        // NOTE(review): Object.assign adds logFile to the caller's options object in place.
        tasks.startTask(taskId, Object.assign(options, { logFile }))
            .then(async (result) => await safe(onFinished(null, result), { debug }))
            .catch(async (error) => await safe(onFinished(error), { debug }))
            .finally(async () => {
                // undo everything that was set up above for this app
                delete gActiveTasks[appId];
                await locks.release(`${locks.TYPE_APP_TASK_PREFIX}${appId}`);
                await locks.releaseByTaskId(taskId);
                scheduler.resumeAppJobs(appId);
            });
    }

    gDrainTimerId = null;
    if (gPendingTasks.length) gDrainTimerId = setTimeout(drain, DRAIN_TIMER_SECS); // check for released locks
}

/**
 * Marks the manager as started and schedules a drain if tasks were queued beforehand.
 *
 * Asserts that it has not been started already and that no drain timer is set.
 *
 * @returns {Promise<void>}
 */
async function start() {
    assert.strictEqual(gDrainTimerId, null);
    assert.strictEqual(gStarted, false);

    debug('started');
    gStarted = true;

    if (gPendingTasks.length) gDrainTimerId = setTimeout(drain, DRAIN_TIMER_SECS);
}

/**
 * Queues a task for an app. The task is started later by drain().
 *
 * If a task for the app is already active, onFinished is called immediately with a
 * BoxError.CONFLICT error and nothing is queued.
 *
 * @param {string} appId - id of the app the task belongs to
 * @param {string} taskId - id of the task to start
 * @param {object} options - options handed to tasks.startTask (a logFile property is added to it)
 * @param {function} onFinished - called as onFinished(error) or onFinished(null, result)
 */
function scheduleTask(appId, taskId, options, onFinished) {
    assert.strictEqual(typeof appId, 'string');
    assert.strictEqual(typeof taskId, 'string');
    assert.strictEqual(typeof options, 'object');
    assert.strictEqual(typeof onFinished, 'function');

    if (appId in gActiveTasks) {
        onFinished(new BoxError(BoxError.CONFLICT, `Task for ${appId} is already active`));
        return;
    }

    // NOTE(review): the return value is not awaited here — confirm tasks.update handles its own errors
    tasks.update(taskId, { percent: 1, message: gStarted ? 'Queued' : 'Waiting for platform to initialize' });
    gPendingTasks.push({ appId, taskId, options, onFinished });

    // before start() nothing is drained; start() schedules the first drain itself
    if (gStarted && !gDrainTimerId) gDrainTimerId = setTimeout(drain, DRAIN_TIMER_SECS);
}