port taskmanager to use tasks
This commit is contained in:
@@ -0,0 +1,103 @@
|
||||
'use strict';
|
||||
|
||||
exports = module.exports = {
|
||||
resumeTasks: resumeTasks,
|
||||
scheduleTask: scheduleTask
|
||||
};
|
||||
|
||||
var appdb = require('./appdb.js'),
|
||||
apps = require('./apps.js'),
|
||||
assert = require('assert'),
|
||||
debug = require('debug')('box:taskmanager'),
|
||||
fs = require('fs'),
|
||||
locker = require('./locker.js'),
|
||||
safe = require('safetydance'),
|
||||
path = require('path'),
|
||||
paths = require('./paths.js'),
|
||||
tasks = require('./tasks.js');
|
||||
|
||||
let gActiveTasks = { }; // indexed by app id
|
||||
let gPendingTasks = [ ];
|
||||
let gPaused = true; // tracks if the platform is ready
|
||||
|
||||
const TASK_CONCURRENCY = 3;
|
||||
const NOOP_CALLBACK = function (error) { if (error) debug(error); };
|
||||
|
||||
// callback is called when task is finished
|
||||
function scheduleTask(appId, taskId, callback) {
|
||||
assert.strictEqual(typeof appId, 'string');
|
||||
assert.strictEqual(typeof taskId, 'string');
|
||||
assert.strictEqual(typeof callback, 'function');
|
||||
|
||||
if (appId in gActiveTasks) {
|
||||
return callback(new Error(`Task for %s is already active: ${appId}`));
|
||||
}
|
||||
|
||||
if (gPaused) {
|
||||
debug(`Platform not ready yet, queueing task id ${taskId}`);
|
||||
gPendingTasks.push({ appId, taskId, callback });
|
||||
return;
|
||||
}
|
||||
|
||||
if (Object.keys(gActiveTasks).length >= TASK_CONCURRENCY) {
|
||||
debug(`Reached concurrency limit, queueing task id ${taskId}`);
|
||||
gPendingTasks.push({ appId, taskId, callback });
|
||||
return;
|
||||
}
|
||||
|
||||
var lockError = locker.recursiveLock(locker.OP_APPTASK);
|
||||
|
||||
if (lockError) {
|
||||
debug(`Locked for another operation, queueing task id ${taskId}`);
|
||||
gPendingTasks.push({ appId, taskId, callback });
|
||||
return;
|
||||
}
|
||||
|
||||
gActiveTasks[appId] = {};
|
||||
|
||||
const logFile = path.join(paths.LOG_DIR, appId, 'apptask.log');
|
||||
|
||||
if (!fs.existsSync(path.dirname(logFile))) safe.fs.mkdirSync(path.dirname(logFile)); // ensure directory
|
||||
|
||||
tasks.startTask(taskId, { logFile }, function (error, result) {
|
||||
callback(error, result);
|
||||
|
||||
delete gActiveTasks[appId];
|
||||
locker.unlock(locker.OP_APPTASK); // unlock event will trigger next task
|
||||
});
|
||||
}
|
||||
|
||||
function startNextTask() {
|
||||
if (gPendingTasks.length === 0) return;
|
||||
|
||||
assert(Object.keys(gActiveTasks).length < TASK_CONCURRENCY);
|
||||
|
||||
const t = gPendingTasks.shift();
|
||||
scheduleTask(t.appId, t.taskId, t.callback);
|
||||
}
|
||||
|
||||
// resume app tasks when platform is ready or after a crash
|
||||
function resumeTasks(callback) {
|
||||
assert.strictEqual(typeof callback, 'function');
|
||||
|
||||
debug('resuming tasks');
|
||||
|
||||
locker.on('unlocked', startNextTask);
|
||||
|
||||
gPaused = false;
|
||||
|
||||
apps.getAll(function (error, result) {
|
||||
if (error) return callback(error);
|
||||
|
||||
result.forEach(function (app) {
|
||||
if (app.installationState === appdb.ISTATE_INSTALLED && app.runState === appdb.RSTATE_RUNNING) return;
|
||||
if (app.installationState === appdb.ISTATE_ERROR) return;
|
||||
|
||||
debug(`resumeTask: starting app task for ${app.fqdn} ${app.id} and state ${app.installationState}`);
|
||||
|
||||
scheduleTask(app.id, app.taskId, NOOP_CALLBACK);
|
||||
});
|
||||
|
||||
callback(null);
|
||||
});
|
||||
}
|
||||
Reference in New Issue
Block a user