scheduler: do not save cronjob object in state
the cronjob object has lots of js stuff and stringify fails
This commit is contained in:
+80
-74
@@ -17,14 +17,16 @@ var appdb = require('./appdb.js'),
|
||||
|
||||
var NOOP_CALLBACK = function (error) { if (error) console.error(error); };
|
||||
|
||||
// appId -> { tasksConfig (manifest), jobs -> { containerId, cronJob } }
|
||||
// appId -> { schedulerConfig (manifest), cronjobs, containerIds }
|
||||
var gState = null; // null indicates that we will load state on first sync
|
||||
|
||||
function loadState() {
|
||||
var tasks = safe.JSON.parse(safe.fs.readFileSync(paths.SCHEDULER_FILE, 'utf8'));
|
||||
return tasks || { };
|
||||
var state = safe.JSON.parse(safe.fs.readFileSync(paths.SCHEDULER_FILE, 'utf8'));
|
||||
return state || { };
|
||||
}
|
||||
|
||||
function saveState(tasks) {
|
||||
safe.fs.writeFileSync(paths.SCHEDULER_FILE, JSON.stringify(tasks, null, 4), 'utf8');
|
||||
function saveState(state) {
|
||||
safe.fs.writeFileSync(paths.SCHEDULER_FILE, JSON.stringify(_.omit(state, 'cronJobs'), null, 4), 'utf8');
|
||||
}
|
||||
|
||||
function sync(callback) {
|
||||
@@ -34,88 +36,54 @@ function sync(callback) {
|
||||
|
||||
debug('Syncing');
|
||||
|
||||
var state = loadState();
|
||||
if (gState === null) gState = loadState();
|
||||
|
||||
apps.getAll(function (error, allApps) {
|
||||
if (error) return callback(error);
|
||||
|
||||
// stop tasks of apps that went away
|
||||
var allAppIds = allApps.map(function (app) { return app.id; });
|
||||
var removedAppIds = _.difference(Object.keys(state), allAppIds);
|
||||
var removedAppIds = _.difference(Object.keys(gState), allAppIds);
|
||||
async.eachSeries(removedAppIds, function (appId, iteratorDone) {
|
||||
stopJobs(appId, state[appId], iteratorDone);
|
||||
stopJobs(appId, gState[appId], iteratorDone);
|
||||
}, function (error) {
|
||||
if (error) debug('Error stopping jobs : %j', error);
|
||||
|
||||
state = _.omit(state, removedAppIds);
|
||||
gState = _.omit(gState, removedAppIds);
|
||||
|
||||
// start tasks of new apps
|
||||
allApps.forEach(function (app) {
|
||||
var newAppState = resetAppState(app.id, state[app.id] || null, app.manifest.addons.scheduler || null);
|
||||
async.eachSeries(allApps, function (app, iteratorDone) {
|
||||
var appState = gState[app.id] || null;
|
||||
var schedulerConfig = app.manifest.addons.scheduler || null;
|
||||
|
||||
if (newAppState) state[app.id] = newAppState; else delete state[app.id];
|
||||
if (!appState && !schedulerConfig) return iteratorDone(); // nothing changed
|
||||
if (appState && _.isEqual(appState.schedulerConfig, schedulerConfig)) return iteratorDone(); // nothing changed
|
||||
|
||||
stopJobs(app.id, appState, function (error) {
|
||||
if (error) debug('Error stopping jobs for %s : %s', app.id, error.message);
|
||||
|
||||
if (!schedulerConfig) {
|
||||
delete gState[app.id];
|
||||
return iteratorDone();
|
||||
}
|
||||
|
||||
gState[app.id] = {
|
||||
schedulerConfig: schedulerConfig,
|
||||
cronJobs: createCronJobs(app.id, schedulerConfig),
|
||||
containerIds: { }
|
||||
};
|
||||
|
||||
saveState(gState);
|
||||
|
||||
iteratorDone();
|
||||
});
|
||||
});
|
||||
|
||||
saveState(state);
|
||||
|
||||
debug('Done syncing');
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
function stopJobs(appId, appState, callback) {
|
||||
assert.strictEqual(typeof appId, 'string');
|
||||
|
||||
debug('stopJobs for %s', appId);
|
||||
|
||||
async.eachSeries(Object.keys(appState.jobs), function (taskName, iteratorDone) {
|
||||
appState.jobs[taskName].cronJob.stop();
|
||||
killTask(appState.jobs[taskName].containerId, iteratorDone);
|
||||
}, callback);
|
||||
}
|
||||
|
||||
function createCronJobs(appId, tasksConfig) {
|
||||
debug('creating cron jobs for app %s', appId);
|
||||
|
||||
var jobs = { };
|
||||
|
||||
Object.keys(tasksConfig).forEach(function (taskName) {
|
||||
var task = tasksConfig[taskName];
|
||||
|
||||
debug('scheduling task for %s/%s @ 00 %s : %s', appId, taskName, task.schedule, task.command);
|
||||
|
||||
var cronJob = new CronJob({
|
||||
cronTime: '00 ' + task.schedule, // at this point, the pattern has been validated
|
||||
onTick: doTask.bind(null, appId, taskName),
|
||||
start: true
|
||||
});
|
||||
|
||||
jobs[taskName] = { cronJob: cronJob, containerId: null };
|
||||
});
|
||||
|
||||
return jobs;
|
||||
}
|
||||
|
||||
function resetAppState(appId, appState, tasksConfig) {
|
||||
assert.strictEqual(typeof appId, 'string');
|
||||
assert.strictEqual(typeof appState, 'object');
|
||||
assert.strictEqual(typeof tasksConfig, 'object');
|
||||
|
||||
if (appState) {
|
||||
// cleanup existing state
|
||||
if (_.isEqual(appState.tasksConfig, tasksConfig)) return; // nothing changed
|
||||
|
||||
stopJobs(appId, appState); // something changed, stop all the existing jobs
|
||||
}
|
||||
|
||||
if (!tasksConfig) return null;
|
||||
|
||||
return {
|
||||
tasksConfig: tasksConfig,
|
||||
jobs: createCronJobs(appId, tasksConfig)
|
||||
};
|
||||
}
|
||||
|
||||
function killTask(containerId, callback) {
|
||||
if (!containerId) return callback();
|
||||
|
||||
@@ -129,6 +97,45 @@ function killTask(containerId, callback) {
|
||||
});
|
||||
}
|
||||
|
||||
function stopJobs(appId, appState, callback) {
|
||||
assert.strictEqual(typeof appId, 'string');
|
||||
assert.strictEqual(typeof appState, 'object');
|
||||
|
||||
debug('stopJobs for %s', appId);
|
||||
|
||||
if (!appState) return callback();
|
||||
|
||||
async.eachSeries(Object.keys(appState.schedulerConfig), function (taskName, iteratorDone) {
|
||||
if (appState.cronJobs[taskName]) appState.cronJobs[taskName].stop(); // could be null across restarts
|
||||
|
||||
killTask(appState.containerIds[taskName], iteratorDone);
|
||||
}, callback);
|
||||
}
|
||||
|
||||
function createCronJobs(appId, schedulerConfig) {
|
||||
debug('creating cron jobs for app %s', appId);
|
||||
|
||||
if (!schedulerConfig) return null;
|
||||
|
||||
var jobs = { };
|
||||
|
||||
Object.keys(schedulerConfig).forEach(function (taskName) {
|
||||
var task = schedulerConfig[taskName];
|
||||
|
||||
debug('scheduling task for %s/%s @ 00 %s : %s', appId, taskName, task.schedule, task.command);
|
||||
|
||||
var cronJob = new CronJob({
|
||||
cronTime: '00 ' + task.schedule, // at this point, the pattern has been validated
|
||||
onTick: doTask.bind(null, appId, taskName),
|
||||
start: true
|
||||
});
|
||||
|
||||
jobs[taskName] = cronJob;
|
||||
});
|
||||
|
||||
return jobs;
|
||||
}
|
||||
|
||||
function doTask(appId, taskName, callback) {
|
||||
assert.strictEqual(typeof appId, 'string');
|
||||
assert.strictEqual(typeof taskName, 'string');
|
||||
@@ -136,8 +143,7 @@ function doTask(appId, taskName, callback) {
|
||||
|
||||
callback = callback || NOOP_CALLBACK;
|
||||
|
||||
var state = loadState();
|
||||
var job = state[appId].jobs[taskName];
|
||||
var appState = gState[appId];
|
||||
|
||||
apps.get(appId, function (error, app) {
|
||||
if (error) return callback(error);
|
||||
@@ -147,17 +153,17 @@ function doTask(appId, taskName, callback) {
|
||||
return callback();
|
||||
}
|
||||
|
||||
if (job.containerId) debug('task %s/%s is already running. killing it');
|
||||
if (appState.containerIds[taskName]) debug('task %s/%s is already running. killing it');
|
||||
|
||||
killTask(job.containerId, function (error) {
|
||||
killTask(appState.containerIds[taskName], function (error) {
|
||||
if (error) return callback(error);
|
||||
|
||||
debug('task %s/%s starting', app.id, taskName);
|
||||
|
||||
docker.createSubcontainer(app, [ '/bin/sh', '-c', state[appId].tasksConfig[taskName].command ], function (error, container) {
|
||||
job.containerId = container.id;
|
||||
docker.createSubcontainer(app, [ '/bin/sh', '-c', gState[appId].schedulerConfig[taskName].command ], function (error, container) {
|
||||
appState.containerIds[taskName] = container.id;
|
||||
|
||||
saveState(state);
|
||||
saveState(gState);
|
||||
|
||||
docker.startContainer(container.id, callback);
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user