'use strict'; exports = module.exports = { start, scheduleTask }; const assert = require('assert'), BoxError = require('./boxerror.js'), debug = require('debug')('box:apptaskmanager'), fs = require('fs'), locks = require('./locks.js'), path = require('path'), paths = require('./paths.js'), safe = require('safetydance'), scheduler = require('./scheduler.js'), tasks = require('./tasks.js'); const gActiveTasks = {}; // indexed by app id const gPendingTasks = []; let gStarted = false; const TASK_CONCURRENCY = 3; const DRAIN_TIMER_SECS = 1000; let gDrainTimerId = null; async function drain() { debug(`drain: ${gPendingTasks.length} apptasks pending`); for (let i = 0; i < gPendingTasks.length; i++) { const space = Object.keys(gActiveTasks).length - TASK_CONCURRENCY; if (space == 0) { debug('At concurrency limit, cannot drain anymore'); break; } const { appId, taskId, options, onFinished } = gPendingTasks[i]; const [lockError] = await safe(locks.acquire(`${locks.TYPE_APP_PREFIX}${appId}`)); if (lockError) continue; gPendingTasks.splice(i, 1); gActiveTasks[appId] = {}; const logFile = path.join(paths.LOG_DIR, appId, 'apptask.log'); if (!fs.existsSync(path.dirname(logFile))) safe.fs.mkdirSync(path.dirname(logFile)); // ensure directory scheduler.suspendAppJobs(appId); tasks.startTask(taskId, Object.assign(options, { logFile }), async function (error, result) { onFinished(error, result); delete gActiveTasks[appId]; await locks.release(`${locks.TYPE_APP_PREFIX}${appId}`); scheduler.resumeAppJobs(appId); }); } gDrainTimerId = null; if (gPendingTasks.length) gDrainTimerId = setTimeout(drain, DRAIN_TIMER_SECS); // check for released locks } async function start() { assert.strictEqual(gDrainTimerId, null); assert.strictEqual(gStarted, false); debug('started'); gStarted = true; if (gPendingTasks.length) gDrainTimerId = setTimeout(drain, DRAIN_TIMER_SECS); } function scheduleTask(appId, taskId, options, onFinished) { assert.strictEqual(typeof appId, 'string'); assert.strictEqual(typeof taskId, 'string'); assert.strictEqual(typeof options, 'object'); assert.strictEqual(typeof onFinished, 'function'); if (appId in gActiveTasks) { onFinished(new BoxError(BoxError.CONFLICT, `Task for ${appId} is already active`)); return; } // percent 1 is relies on the tasks "active" flag to indicate task is queued but not started yet tasks.update(taskId, { percent: 1, message: gStarted ? 'Queued' : 'Waiting for platform to initialize' }); gPendingTasks.push({ appId, taskId, options, onFinished }); if (gStarted && !gDrainTimerId) gDrainTimerId = setTimeout(drain, DRAIN_TIMER_SECS); }