diff --git a/src/scheduler.js b/src/scheduler.js index 3712f71f8..2c039102b 100644 --- a/src/scheduler.js +++ b/src/scheduler.js @@ -17,14 +17,16 @@ var appdb = require('./appdb.js'), var NOOP_CALLBACK = function (error) { if (error) console.error(error); }; -// appId -> { tasksConfig (manifest), jobs -> { containerId, cronJob } } +// appId -> { schedulerConfig (manifest), cronjobs, containerIds } +var gState = null; // null indicates that we will load state on first sync + function loadState() { - var tasks = safe.JSON.parse(safe.fs.readFileSync(paths.SCHEDULER_FILE, 'utf8')); - return tasks || { }; + var state = safe.JSON.parse(safe.fs.readFileSync(paths.SCHEDULER_FILE, 'utf8')); + return state || { }; } -function saveState(tasks) { - safe.fs.writeFileSync(paths.SCHEDULER_FILE, JSON.stringify(tasks, null, 4), 'utf8'); +function saveState(state) { + safe.fs.writeFileSync(paths.SCHEDULER_FILE, JSON.stringify(_.omit(state, 'cronJobs'), null, 4), 'utf8'); } function sync(callback) { @@ -34,88 +36,54 @@ function sync(callback) { debug('Syncing'); - var state = loadState(); + if (gState === null) gState = loadState(); apps.getAll(function (error, allApps) { if (error) return callback(error); // stop tasks of apps that went away var allAppIds = allApps.map(function (app) { return app.id; }); - var removedAppIds = _.difference(Object.keys(state), allAppIds); + var removedAppIds = _.difference(Object.keys(gState), allAppIds); async.eachSeries(removedAppIds, function (appId, iteratorDone) { - stopJobs(appId, state[appId], iteratorDone); + stopJobs(appId, gState[appId], iteratorDone); }, function (error) { if (error) debug('Error stopping jobs : %j', error); - state = _.omit(state, removedAppIds); + gState = _.omit(gState, removedAppIds); // start tasks of new apps - allApps.forEach(function (app) { - var newAppState = resetAppState(app.id, state[app.id] || null, app.manifest.addons.scheduler || null); + async.eachSeries(allApps, function (app, iteratorDone) { + var appState = gState[app.id] || null; + var schedulerConfig = app.manifest.addons.scheduler || null; - if (newAppState) state[app.id] = newAppState; else delete state[app.id]; + if (!appState && !schedulerConfig) return iteratorDone(); // nothing changed + if (appState && _.isEqual(appState.schedulerConfig, schedulerConfig)) return iteratorDone(); // nothing changed + + stopJobs(app.id, appState, function (error) { + if (error) debug('Error stopping jobs for %s : %s', app.id, error.message); + + if (!schedulerConfig) { + delete gState[app.id]; + return iteratorDone(); + } + + gState[app.id] = { + schedulerConfig: schedulerConfig, + cronJobs: createCronJobs(app.id, schedulerConfig), + containerIds: { } + }; + + saveState(gState); + + iteratorDone(); + }); }); - saveState(state); - debug('Done syncing'); }); }); } -function stopJobs(appId, appState, callback) { - assert.strictEqual(typeof appId, 'string'); - - debug('stopJobs for %s', appId); - - async.eachSeries(Object.keys(appState.jobs), function (taskName, iteratorDone) { - appState.jobs[taskName].cronJob.stop(); - killTask(appState.jobs[taskName].containerId, iteratorDone); - }, callback); -} - -function createCronJobs(appId, tasksConfig) { - debug('creating cron jobs for app %s', appId); - - var jobs = { }; - - Object.keys(tasksConfig).forEach(function (taskName) { - var task = tasksConfig[taskName]; - - debug('scheduling task for %s/%s @ 00 %s : %s', appId, taskName, task.schedule, task.command); - - var cronJob = new CronJob({ - cronTime: '00 ' + task.schedule, // at this point, the pattern has been validated - onTick: doTask.bind(null, appId, taskName), - start: true - }); - - jobs[taskName] = { cronJob: cronJob, containerId: null }; - }); - - return jobs; -} - -function resetAppState(appId, appState, tasksConfig) { - assert.strictEqual(typeof appId, 'string'); - assert.strictEqual(typeof appState, 'object'); - assert.strictEqual(typeof tasksConfig, 'object'); - - if (appState) { - // cleanup existing state - if (_.isEqual(appState.tasksConfig, tasksConfig)) return; // nothing changed - - stopJobs(appId, appState); // something changed, stop all the existing jobs - } - - if (!tasksConfig) return null; - - return { - tasksConfig: tasksConfig, - jobs: createCronJobs(appId, tasksConfig) - }; -} - function killTask(containerId, callback) { if (!containerId) return callback(); @@ -129,6 +97,45 @@ function killTask(containerId, callback) { }); } +function stopJobs(appId, appState, callback) { + assert.strictEqual(typeof appId, 'string'); + assert.strictEqual(typeof appState, 'object'); + + debug('stopJobs for %s', appId); + + if (!appState) return callback(); + + async.eachSeries(Object.keys(appState.schedulerConfig), function (taskName, iteratorDone) { + if (appState.cronJobs[taskName]) appState.cronJobs[taskName].stop(); // could be null across restarts + + killTask(appState.containerIds[taskName], iteratorDone); + }, callback); +} + +function createCronJobs(appId, schedulerConfig) { + debug('creating cron jobs for app %s', appId); + + if (!schedulerConfig) return null; + + var jobs = { }; + + Object.keys(schedulerConfig).forEach(function (taskName) { + var task = schedulerConfig[taskName]; + + debug('scheduling task for %s/%s @ 00 %s : %s', appId, taskName, task.schedule, task.command); + + var cronJob = new CronJob({ + cronTime: '00 ' + task.schedule, // at this point, the pattern has been validated + onTick: doTask.bind(null, appId, taskName), + start: true + }); + + jobs[taskName] = cronJob; + }); + + return jobs; +} + function doTask(appId, taskName, callback) { assert.strictEqual(typeof appId, 'string'); assert.strictEqual(typeof taskName, 'string'); @@ -136,8 +143,7 @@ function doTask(appId, taskName, callback) { callback = callback || NOOP_CALLBACK; - var state = loadState(); - var job = state[appId].jobs[taskName]; + var appState = gState[appId]; apps.get(appId, function (error, app) { if (error) return callback(error); @@ -147,17 +153,17 @@ function doTask(appId, taskName, callback) { return callback(); } - if (job.containerId) debug('task %s/%s is already running. killing it'); + if (appState.containerIds[taskName]) debug('task %s/%s is already running. killing it'); - killTask(job.containerId, function (error) { + killTask(appState.containerIds[taskName], function (error) { if (error) return callback(error); debug('task %s/%s starting', app.id, taskName); - docker.createSubcontainer(app, [ '/bin/sh', '-c', state[appId].tasksConfig[taskName].command ], function (error, container) { - job.containerId = container.id; + docker.createSubcontainer(app, [ '/bin/sh', '-c', gState[appId].schedulerConfig[taskName].command ], function (error, container) { + appState.containerIds[taskName] = container.id; - saveState(state); + saveState(gState); docker.startContainer(container.id, callback); });