diff --git a/src/paths.js b/src/paths.js index 930745f07..e39f16c47 100644 --- a/src/paths.js +++ b/src/paths.js @@ -12,6 +12,7 @@ exports = module.exports = { NGINX_CERT_DIR: path.join(config.baseDir(), 'data/nginx/cert'), ADDON_CONFIG_DIR: path.join(config.baseDir(), 'data/addons'), + SCHEDULER_FILE: path.join(config.baseDir(), 'data/addons/scheduler.json'), COLLECTD_APPCONFIG_DIR: path.join(config.baseDir(), 'data/collectd/collectd.conf.d'), diff --git a/src/scheduler.js b/src/scheduler.js index b8e4904f4..87083c11a 100644 --- a/src/scheduler.js +++ b/src/scheduler.js @@ -11,11 +11,21 @@ var appdb = require('./appdb.js'), CronJob = require('cron').CronJob, debug = require('debug')('box:src/scheduler'), docker = require('./docker.js').connection, + paths = require('./paths.js'), + safe = require('safetydance'), _ = require('underscore'); var NOOP_CALLBACK = function (error) { if (error) console.error(error); }; -var gTasks = { }; // appId -> { tasksConfig (manifest), jobs -> { containerId, cronJob } } +// appId -> { tasksConfig (manifest), jobs -> { containerId, cronJob } } +function loadState() { + var tasks = safe.JSON.parse(safe.fs.readFileSync(paths.SCHEDULER_FILE, 'utf8')); + return tasks || { }; +} + +function saveState(tasks) { + safe.fs.writeFileSync(paths.SCHEDULER_FILE, JSON.stringify(tasks, null, 4), 'utf8'); +} function sync(callback) { assert(!callback || typeof callback === 'function'); @@ -24,81 +34,87 @@ function sync(callback) { debug('Syncing'); + var state = loadState(); + apps.getAll(function (error, allApps) { if (error) return callback(error); // stop tasks of apps that went away var allAppIds = allApps.map(function (app) { return app.id; }); - var removedAppIds = _.difference(Object.keys(gTasks), allAppIds); - async.eachSeries(removedAppIds, stopJobs, function (error) { + var removedAppIds = _.difference(Object.keys(state), allAppIds); + async.eachSeries(removedAppIds, function (appId, iteratorDone) { + stopJobs(appId, state[appId], iteratorDone); + }, function (error) { if (error) debug('Error stopping jobs : %j', error); + state = _.omit(state, removedAppIds); + // start tasks of new apps allApps.forEach(function (app) { - resetTasks(app.id, app.manifest.addons.scheduler || null); + state[app.id] = resetAppState(app.id, state[app.id] || null, app.manifest.addons.scheduler || null); }); + saveState(state); + debug('Done syncing'); }); }); } -function stopJobs(appId, callback) { +function stopJobs(appId, appState, callback) { assert.strictEqual(typeof appId, 'string'); debug('stopJobs for %s', appId); - async.eachSeries(Object.keys(gTasks[appId].jobs), function (taskName, iteratorDone) { - gTasks[appId].jobs[taskName].cronJob.stop(); - killTask(appId, taskName, iteratorDone); - }, function (error) { - if (error) return callback(error); - - delete gTasks[appId]; - - callback(); - }); + async.eachSeries(Object.keys(appState.jobs), function (taskName, iteratorDone) { + appState.jobs[taskName].cronJob.stop(); + killTask(appState.jobs[taskName].containerId, iteratorDone); + }, callback); } function createCronJobs(appId, tasksConfig) { - gTasks[appId] = { tasksConfig: tasksConfig, jobs: { } }; - debug('creating cron jobs for %s', appId); + var jobs = { }; + Object.keys(tasksConfig).forEach(function (taskName) { var task = tasksConfig[taskName]; debug('scheduling task %s/%s @ 00 %s : %s', appId, taskName, task.schedule, task.command); - var job = new CronJob({ + var cronJob = new CronJob({ cronTime: '00 ' + task.schedule, // at this point, the pattern has been validated - onTick: doTask.bind(null, appId, taskName, task), + onTick: doTask.bind(null, appId, taskName), start: true }); - gTasks[appId].jobs[taskName] = { cronJob: job }; + jobs[taskName] = { cronJob: cronJob, containerId: null }; }); + + return jobs; } -function resetTasks(appId, tasksConfig) { +function resetAppState(appId, appState, tasksConfig) { assert.strictEqual(typeof appId, 'string'); - assert.strictEqual(typeof tasksConfig, 'object'); // can be null + assert.strictEqual(typeof appState, 'object'); + assert.strictEqual(typeof tasksConfig, 'object'); - // cleanup existing state - if (appId in gTasks) { - if (_.isEqual(gTasks[appId].tasksConfig, tasksConfig)) return; // nothing changed + if (appState) { + // cleanup existing state + if (_.isEqual(appState.tasksConfig, tasksConfig)) return; // nothing changed - stopJobs(appId); // something changes, stop all the existing jobs + stopJobs(appId, appState); // something changed, stop all the existing jobs } - if (!tasksConfig) return; + if (!tasksConfig) return null; - createCronJobs(appId, tasksConfig); + return { + tasksConfig: tasksConfig, + jobs: createCronJobs(appId, tasksConfig) + }; } -function killTask(appId, taskName, callback) { - var containerId = gTasks[appId].jobs[taskName].containerId; - +function killTask(containerId, callback) { if (!containerId) return callback(); async.series([ @@ -107,19 +123,19 @@ function killTask(appId, taskName, callback) { ], callback); } -function doTask(appId, taskName, task, callback) { +function doTask(appId, taskName, callback) { assert.strictEqual(typeof appId, 'string'); assert.strictEqual(typeof taskName, 'string'); - assert.strictEqual(typeof task, 'object'); assert(!callback || typeof callback === 'function'); callback = callback || NOOP_CALLBACK; - var containerId = gTasks[appId].jobs[taskName].containerId; + var state = loadState(); + var job = state[appId].jobs[taskName]; - if (containerId) { + if (job.containerId) { debug('task %s/%s is already running. killing it'); - return killTask(appId, taskName, callback); + return killTask(job.containerId, callback); } apps.get(appId, function (error, app) { @@ -132,8 +148,10 @@ function doTask(appId, taskName, task, callback) { debug('task %s/%s starting', app.id, taskName); - docker.createSubcontainer(app, [ '/bin/sh', '-c', task.command ], function (error, container) { - gTasks[appId].jobs[taskName].containerId = container.id; + docker.createSubcontainer(app, [ '/bin/sh', '-c', state[appId].tasksConfig[taskName].command ], function (error, container) { + job.containerId = container.id; + + saveState(state); docker.startContainer(container.id, callback); });