// cloudron-box/src/taskmanager.js

'use strict';

// Public interface of the task manager. `exports` and `module.exports` are
// bound to the same fresh object, which is then filled in one entry at a time.
exports = module.exports = {};

exports.initialize = initialize;
exports.uninitialize = uninitialize;

exports.stopAppTask = stopAppTask;
exports.startAppTask = startAppTask;
exports.restartAppTask = restartAppTask;

exports.stopPendingTasks = stopPendingTasks;
exports.waitForPendingTasks = waitForPendingTasks;
var appdb = require('./appdb.js'),
assert = require('assert'),
async = require('async'),
child_process = require('child_process'),
cloudron = require('./cloudron.js'),
debug = require('debug')('box:taskmanager'),
locker = require('./locker.js'),
2016-04-19 18:39:44 -07:00
sendFailureLogs = require('./logcollector.js').sendFailureLogs,
2016-02-09 12:14:04 -08:00
util = require('util'),
_ = require('underscore');
// Child processes of the apptasks that are currently running, keyed by app id.
var gActiveTasks = { };

// App ids whose task could not be started yet; startNextTask takes them from the front.
var gPendingTasks = [ ];

var gPlatformReady = false; // PaaS (addons) up and running

// Handle of the timer that flips gPlatformReady; kept so that uninitialize can cancel it.
var gPlatformReadyTimer = null;

// Upper bound on the number of apptask processes running at the same time.
var TASK_CONCURRENCY = 5;

// Callback for fire-and-forget calls: does nothing except report an error, if there is one.
var NOOP_CALLBACK = function (error) { if (error) console.error(error); };
// Hooks the task manager into the locker and schedules the "platform ready" signal.
// Until that signal fires, startAppTask only queues tasks.
//
// callback - function invoked (synchronously, without arguments) once the hooks are in place.
function initialize(callback) {
    assert.strictEqual(typeof callback, 'function');

    // Undo whatever an earlier initialize() left behind, so that calling it twice
    // neither leaks the previous timer nor registers the listeners a second time.
    clearTimeout(gPlatformReadyTimer);
    locker.removeListener('unlocked', startNextTask);
    cloudron.events.removeListener(cloudron.EVENT_CONFIGURED, resumeTasks);

    // every release of the lock is a chance to start a queued task
    locker.on('unlocked', startNextTask);

    gPlatformReadyTimer = setTimeout(function () {
        gPlatformReady = true;

        // tasks are only resumed on a configured cloudron; otherwise wait for the event
        if (cloudron.isConfiguredSync()) {
            resumeTasks();
        } else {
            cloudron.events.on(cloudron.EVENT_CONFIGURED, resumeTasks);
        }
    }, 30000); // wait 30 seconds to signal platform ready

    callback();
}
// Tears the task manager down: forgets queued tasks, cancels the platform-ready
// timer, detaches the event listeners and stops every active task one by one.
//
// callback - function passed to async.eachSeries as its completion callback.
function uninitialize(callback) {
    assert.strictEqual(typeof callback, 'function');

    gPendingTasks = [ ]; // clear this first, otherwise stopAppTask will resume them

    clearTimeout(gPlatformReadyTimer);
    gPlatformReadyTimer = null;

    // Back to the pre-initialize state, so that a later initialize() waits for the
    // platform again instead of letting startAppTask fork tasks right away.
    gPlatformReady = false;

    cloudron.events.removeListener(cloudron.EVENT_CONFIGURED, resumeTasks);
    locker.removeListener('unlocked', startNextTask);

    async.eachSeries(Object.keys(gActiveTasks), stopAppTask, callback);
}
// Empties the queue of pending tasks and then stops all active tasks in series.
//
// callback - function handed to async.eachSeries as its completion callback.
function stopPendingTasks(callback) {
    assert.strictEqual(typeof callback, 'function');

    // a fresh array: nothing that was queued may be picked up anymore
    gPendingTasks = [];

    var activeAppIds = Object.keys(gActiveTasks);
    async.eachSeries(activeAppIds, stopAppTask, callback);
}
// Invokes callback (without arguments) as soon as there is neither an active nor
// a pending task. The state is checked immediately and then once per second.
function waitForPendingTasks(callback) {
    assert.strictEqual(typeof callback, 'function');

    (function poll() {
        var busy = Object.keys(gActiveTasks).length !== 0 || gPendingTasks.length !== 0;

        if (busy) {
            setTimeout(poll, 1000); // look again in a second
            return;
        }

        callback();
    })();
}
2015-11-02 17:45:38 -08:00
// resume app installs and uninstalls
//
// Starts a task for every app in the database, except apps that are installed and
// running and apps in the error state.
//
// callback - optional function (error). It is called with the database error, or with
//            null once a task start has been requested for each remaining app; it does
//            not wait for those tasks to finish.
function resumeTasks(callback) {
    // This function is also registered as an event listener, in which case the first
    // argument is whatever the emitter passes along and not necessarily a function.
    if (typeof callback !== 'function') callback = NOOP_CALLBACK;

    debug('resuming tasks');

    appdb.getAll(function (error, apps) {
        if (error) return callback(error);

        apps.forEach(function (app) {
            if (app.installationState === appdb.ISTATE_INSTALLED && app.runState === appdb.RSTATE_RUNNING) return;
            if (app.installationState === appdb.ISTATE_ERROR) return;

            debug('Creating process for %s (%s) with state %s', app.location, app.id, app.installationState);
            startAppTask(app.id, NOOP_CALLBACK);
        });

        callback(null);
    });
}
// Takes the oldest app id off the pending queue and asks startAppTask to run it.
// Does nothing when the queue is empty.
function startNextTask() {
    if (gPendingTasks.length === 0) return;

    // invariant: this is only expected to run while a task slot is free
    assert(Object.keys(gActiveTasks).length < TASK_CONCURRENCY);

    startAppTask(gPendingTasks.shift(), NOOP_CALLBACK);
}
2016-02-09 12:14:04 -08:00
// Starts the apptask child process for an app, or queues the app when the task
// cannot run right now.
//
// appId    - string id of the app; passed to apptask.js as its only argument.
// callback - function (error). Receives an Error when a task for the app is already
//            active. Otherwise it is called without arguments as soon as the task has
//            been forked or queued; it does not wait for the task to finish.
//
// A task is queued (in gPendingTasks) when the platform is not ready yet, when
// TASK_CONCURRENCY tasks are already active, or when the apptask lock cannot be taken.
function startAppTask(appId, callback) {
    assert.strictEqual(typeof appId, 'string');
    assert.strictEqual(typeof callback, 'function');

    if (appId in gActiveTasks) {
        return callback(new Error(util.format('Task for %s is already active', appId)));
    }

    if (!gPlatformReady) {
        debug('Platform not ready yet, queueing task for %s', appId);
        queuePendingAppTask(appId);
        return callback();
    }

    if (Object.keys(gActiveTasks).length >= TASK_CONCURRENCY) {
        debug('Reached concurrency limit, queueing task for %s', appId);
        queuePendingAppTask(appId);
        return callback();
    }

    var lockError = locker.recursiveLock(locker.OP_APPTASK);

    if (lockError) {
        debug('Locked for another operation, queueing task for %s', appId);
        queuePendingAppTask(appId);
        return callback();
    }

    // The task runs now. Drop a queue entry left over from an earlier attempt,
    // otherwise a later startNextTask would run the task for this app once more.
    gPendingTasks = gPendingTasks.filter(function (pendingId) { return pendingId !== appId; });

    // when parent process dies, apptask processes are killed because KillMode=control-group in systemd unit file
    gActiveTasks[appId] = child_process.fork(__dirname + '/apptask.js', [ appId ]);

    var pid = gActiveTasks[appId].pid;
    debug('Started task of %s pid: %s', appId, pid);

    gActiveTasks[appId].once('exit', function (code, signal) {
        debug('Task for %s pid %s completed with status %s', appId, pid, code);

        // Exit codes 0 and 50 are the only ones not treated as a crash. What 50 stands
        // for is decided by apptask.js; here it only triggers sending the failure logs.
        if (code === null /* signal */ || (code !== 0 && code !== 50)) { // apptask crashed
            debug('Apptask crashed with code %s and signal %s', code, signal);
            sendFailureLogs('apptask', { unit: 'box' });
            appdb.update(appId, { installationState: appdb.ISTATE_ERROR, installationProgress: 'Apptask crashed with code ' + code + ' and signal ' + signal }, NOOP_CALLBACK);
        } else if (code === 50) {
            sendFailureLogs('apptask', { unit: 'box' });
        }

        delete gActiveTasks[appId];
        locker.unlock(locker.OP_APPTASK); // unlock event will trigger next task
    });

    callback();
}

// Appends appId to the pending queue unless it is already waiting there.
function queuePendingAppTask(appId) {
    if (gPendingTasks.indexOf(appId) === -1) gPendingTasks.push(appId);
}
// Stops whatever task exists for an app: removes it from the pending queue and
// sends SIGTERM to its running apptask process, if there is one.
//
// appId    - string id of the app.
// callback - function invoked without arguments; after the process emitted 'exit'
//            when a running task was signalled, immediately otherwise.
function stopAppTask(appId, callback) {
    assert.strictEqual(typeof appId, 'string');
    assert.strictEqual(typeof callback, 'function');

    // Clear the queue entry before touching the process: the exit of a killed task
    // releases the lock, and a still queued entry for this app could then be
    // started again right away.
    var wasPending = gPendingTasks.indexOf(appId) !== -1;
    if (wasPending) {
        debug('stopAppTask: Removing pending task : %s', appId);
        gPendingTasks = gPendingTasks.filter(function (pendingId) { return pendingId !== appId; });
    }

    var activeTask = gActiveTasks[appId];
    if (activeTask) {
        debug('stopAppTask : Killing existing task of %s with pid %s', appId, activeTask.pid);
        activeTask.once('exit', function () { callback(); });
        activeTask.kill('SIGTERM'); // this will end up calling the 'exit' handler
        return;
    }

    if (!wasPending) debug('stopAppTask: no task for %s to be stopped', appId);

    callback();
}
// Stops the task of an app and, once that is done, starts a new one.
//
// appId    - id of the app; handed unchanged to stopAppTask and startAppTask.
// callback - optional completion callback for async.series; NOOP_CALLBACK when omitted.
function restartAppTask(appId, callback) {
    var done = callback || NOOP_CALLBACK;

    var steps = [
        function (next) { stopAppTask(appId, next); },
        function (next) { startAppTask(appId, next); }
    ];

    async.series(steps, done);
}