scheduler: load/save state

This commit is contained in:
Girish Ramakrishnan
2015-10-19 22:41:42 -07:00
parent 32ddda404c
commit d0360e9e68
2 changed files with 57 additions and 38 deletions

View File

@@ -11,11 +11,21 @@ var appdb = require('./appdb.js'),
CronJob = require('cron').CronJob,
debug = require('debug')('box:src/scheduler'),
docker = require('./docker.js').connection,
paths = require('./paths.js'),
safe = require('safetydance'),
_ = require('underscore');
var NOOP_CALLBACK = function (error) { if (error) console.error(error); };
var gTasks = { }; // appId -> { tasksConfig (manifest), jobs -> { containerId, cronJob } }
// appId -> { tasksConfig (manifest), jobs -> { containerId, cronJob } }
function loadState() {
var tasks = safe.JSON.parse(safe.fs.readFileSync(paths.SCHEDULER_FILE, 'utf8'));
return tasks || { };
}
function saveState(tasks) {
safe.fs.writeFileSync(paths.SCHEDULER_FILE, JSON.stringify(tasks, null, 4), 'utf8');
}
function sync(callback) {
assert(!callback || typeof callback === 'function');
@@ -24,81 +34,87 @@ function sync(callback) {
debug('Syncing');
var state = loadState();
apps.getAll(function (error, allApps) {
if (error) return callback(error);
// stop tasks of apps that went away
var allAppIds = allApps.map(function (app) { return app.id; });
var removedAppIds = _.difference(Object.keys(gTasks), allAppIds);
async.eachSeries(removedAppIds, stopJobs, function (error) {
var removedAppIds = _.difference(Object.keys(state), allAppIds);
async.eachSeries(removedAppIds, function (appId, iteratorDone) {
stopJobs(appId, state[appId], iteratorDone);
}, function (error) {
if (error) debug('Error stopping jobs : %j', error);
state = _.omit(state, removedAppIds);
// start tasks of new apps
allApps.forEach(function (app) {
resetTasks(app.id, app.manifest.addons.scheduler || null);
state[app.id] = resetAppState(app.id, state[app.id] || null, app.manifest.addons.scheduler || null);
});
saveState(state);
debug('Done syncing');
});
});
}
function stopJobs(appId, callback) {
function stopJobs(appId, appState, callback) {
assert.strictEqual(typeof appId, 'string');
debug('stopJobs for %s', appId);
async.eachSeries(Object.keys(gTasks[appId].jobs), function (taskName, iteratorDone) {
gTasks[appId].jobs[taskName].cronJob.stop();
killTask(appId, taskName, iteratorDone);
}, function (error) {
if (error) return callback(error);
delete gTasks[appId];
callback();
});
async.eachSeries(Object.keys(appState.jobs), function (taskName, iteratorDone) {
appState.jobs[taskName].cronJob.stop();
killTask(appState.jobs[taskName].containerId, iteratorDone);
}, callback);
}
function createCronJobs(appId, tasksConfig) {
gTasks[appId] = { tasksConfig: tasksConfig, jobs: { } };
debug('creating cron jobs for %s', appId);
var jobs = { };
Object.keys(tasksConfig).forEach(function (taskName) {
var task = tasksConfig[taskName];
debug('scheduling task %s/%s @ 00 %s : %s', appId, taskName, task.schedule, task.command);
var job = new CronJob({
var cronJob = new CronJob({
cronTime: '00 ' + task.schedule, // at this point, the pattern has been validated
onTick: doTask.bind(null, appId, taskName, task),
onTick: doTask.bind(null, appId, taskName),
start: true
});
gTasks[appId].jobs[taskName] = { cronJob: job };
jobs[taskName] = { cronJob: cronJob, containerId: null };
});
return jobs;
}
function resetTasks(appId, tasksConfig) {
function resetAppState(appId, appState, tasksConfig) {
assert.strictEqual(typeof appId, 'string');
assert.strictEqual(typeof tasksConfig, 'object'); // can be null
assert.strictEqual(typeof appState, 'object');
assert.strictEqual(typeof tasksConfig, 'object');
// cleanup existing state
if (appId in gTasks) {
if (_.isEqual(gTasks[appId].tasksConfig, tasksConfig)) return; // nothing changed
if (appState) {
// cleanup existing state
if (_.isEqual(appState.tasksConfig, tasksConfig)) return; // nothing changed
stopJobs(appId); // something changes, stop all the existing jobs
stopJobs(appId, appState); // something changed, stop all the existing jobs
}
if (!tasksConfig) return;
if (!tasksConfig) return null;
createCronJobs(appId, tasksConfig);
return {
tasksConfig: tasksConfig,
jobs: createCronJobs(appId, tasksConfig)
};
}
function killTask(appId, taskName, callback) {
var containerId = gTasks[appId].jobs[taskName].containerId;
function killTask(containerId, callback) {
if (!containerId) return callback();
async.series([
@@ -107,19 +123,19 @@ function killTask(appId, taskName, callback) {
], callback);
}
function doTask(appId, taskName, task, callback) {
function doTask(appId, taskName, callback) {
assert.strictEqual(typeof appId, 'string');
assert.strictEqual(typeof taskName, 'string');
assert.strictEqual(typeof task, 'object');
assert(!callback || typeof callback === 'function');
callback = callback || NOOP_CALLBACK;
var containerId = gTasks[appId].jobs[taskName].containerId;
var state = loadState();
var job = state[appId].jobs[taskName];
if (containerId) {
if (job.containerId) {
debug('task %s/%s is already running. killing it');
return killTask(appId, taskName, callback);
return killTask(job.containerId, callback);
}
apps.get(appId, function (error, app) {
@@ -132,8 +148,10 @@ function doTask(appId, taskName, task, callback) {
debug('task %s/%s starting', app.id, taskName);
docker.createSubcontainer(app, [ '/bin/sh', '-c', task.command ], function (error, container) {
gTasks[appId].jobs[taskName].containerId = container.id;
docker.createSubcontainer(app, [ '/bin/sh', '-c', state[appId].tasksConfig[taskName].command ], function (error, container) {
job.containerId = container.id;
saveState(state);
docker.startContainer(container.id, callback);
});