diff --git a/src/scheduler.js b/src/scheduler.js index b27a4300e..b8e4904f4 100644 --- a/src/scheduler.js +++ b/src/scheduler.js @@ -30,29 +30,40 @@ function sync(callback) { // stop tasks of apps that went away var allAppIds = allApps.map(function (app) { return app.id; }); var removedAppIds = _.difference(Object.keys(gTasks), allAppIds); - removedAppIds.forEach(stopJobs); + async.eachSeries(removedAppIds, stopJobs, function (error) { + if (error) debug('Error stopping jobs : %j', error); - // start tasks of new apps - allApps.forEach(function (app) { resetTasks(app.id, app.manifest.addons.scheduler || null); }); + // start tasks of new apps + allApps.forEach(function (app) { + resetTasks(app.id, app.manifest.addons.scheduler || null); + }); + + debug('Done syncing'); + }); }); } -function stopJobs(appId) { +function stopJobs(appId, callback) { assert.strictEqual(typeof appId, 'string'); debug('stopJobs for %s', appId); - for (var job in gTasks[appId].jobs) { - job.cronJob.stop(); - } + async.eachSeries(Object.keys(gTasks[appId].jobs), function (taskName, iteratorDone) { + gTasks[appId].jobs[taskName].cronJob.stop(); + killTask(appId, taskName, iteratorDone); + }, function (error) { + if (error) return callback(error); - delete gTasks[appId]; + delete gTasks[appId]; + + callback(); + }); } -function startJobs(appId, tasksConfig) { +function createCronJobs(appId, tasksConfig) { gTasks[appId] = { tasksConfig: tasksConfig, jobs: { } }; - debug('startJobs for %s', appId); + debug('creating cron jobs for %s', appId); Object.keys(tasksConfig).forEach(function (taskName) { var task = tasksConfig[taskName]; @@ -61,7 +72,7 @@ function startJobs(appId, tasksConfig) { var job = new CronJob({ cronTime: '00 ' + task.schedule, // at this point, the pattern has been validated - onTick: runTask.bind(null, appId, taskName, task), + onTick: doTask.bind(null, appId, taskName, task), start: true }); @@ -71,21 +82,32 @@ function startJobs(appId, tasksConfig) { function resetTasks(appId, tasksConfig) { assert.strictEqual(typeof appId, 'string'); - assert.strictEqual(typeof tasksConfig, 'object'); + assert.strictEqual(typeof tasksConfig, 'object'); // can be null // cleanup existing state if (appId in gTasks) { if (_.isEqual(gTasks[appId].tasksConfig, tasksConfig)) return; // nothing changed - stopJobs(appId); + stopJobs(appId); // something changes, stop all the existing jobs } if (!tasksConfig) return; - startJobs(appId, tasksConfig); + createCronJobs(appId, tasksConfig); } -function runTask(appId, taskName, task, callback) { +function killTask(appId, taskName, callback) { + var containerId = gTasks[appId].jobs[taskName].containerId; + + if (!containerId) return callback(); + + async.series([ + docker.stopContainer.bind(null, containerId), + docker.deleteContainer.bind(null, containerId) + ], callback); +} + +function doTask(appId, taskName, task, callback) { assert.strictEqual(typeof appId, 'string'); assert.strictEqual(typeof taskName, 'string'); assert.strictEqual(typeof task, 'object'); @@ -96,12 +118,8 @@ function runTask(appId, taskName, task, callback) { var containerId = gTasks[appId].jobs[taskName].containerId; if (containerId) { - debug('task %s/%s is already running'); - async.series([ - docker.stopContainer.bind(null, containerId), - docker.deleteContainer.bind(null, containerId) - ], callback); - return; + debug('task %s/%s is already running. killing it'); + return killTask(appId, taskName, callback); } apps.get(appId, function (error, app) {