scheduler: make stopJobs async

This commit is contained in:
Girish Ramakrishnan
2015-10-19 19:04:53 -07:00
parent 682c2721d2
commit 7a840ad15f
+39 -21
View File
@@ -30,29 +30,40 @@ function sync(callback) {
// stop tasks of apps that went away
var allAppIds = allApps.map(function (app) { return app.id; });
var removedAppIds = _.difference(Object.keys(gTasks), allAppIds);
removedAppIds.forEach(stopJobs);
async.eachSeries(removedAppIds, stopJobs, function (error) {
if (error) debug('Error stopping jobs : %j', error);
// start tasks of new apps
allApps.forEach(function (app) { resetTasks(app.id, app.manifest.addons.scheduler || null); });
// start tasks of new apps
allApps.forEach(function (app) {
resetTasks(app.id, app.manifest.addons.scheduler || null);
});
debug('Done syncing');
});
});
}
function stopJobs(appId) {
function stopJobs(appId, callback) {
assert.strictEqual(typeof appId, 'string');
debug('stopJobs for %s', appId);
for (var job in gTasks[appId].jobs) {
job.cronJob.stop();
}
async.eachSeries(Object.keys(gTasks[appId].jobs), function (taskName, iteratorDone) {
gTasks[appId].jobs[taskName].cronJob.stop();
killTask(appId, taskName, iteratorDone);
}, function (error) {
if (error) return callback(error);
delete gTasks[appId];
delete gTasks[appId];
callback();
});
}
function startJobs(appId, tasksConfig) {
function createCronJobs(appId, tasksConfig) {
gTasks[appId] = { tasksConfig: tasksConfig, jobs: { } };
debug('startJobs for %s', appId);
debug('creating cron jobs for %s', appId);
Object.keys(tasksConfig).forEach(function (taskName) {
var task = tasksConfig[taskName];
@@ -61,7 +72,7 @@ function startJobs(appId, tasksConfig) {
var job = new CronJob({
cronTime: '00 ' + task.schedule, // at this point, the pattern has been validated
onTick: runTask.bind(null, appId, taskName, task),
onTick: doTask.bind(null, appId, taskName, task),
start: true
});
@@ -71,21 +82,32 @@ function startJobs(appId, tasksConfig) {
function resetTasks(appId, tasksConfig) {
assert.strictEqual(typeof appId, 'string');
assert.strictEqual(typeof tasksConfig, 'object');
assert.strictEqual(typeof tasksConfig, 'object'); // can be null
// cleanup existing state
if (appId in gTasks) {
if (_.isEqual(gTasks[appId].tasksConfig, tasksConfig)) return; // nothing changed
stopJobs(appId);
stopJobs(appId); // something changes, stop all the existing jobs
}
if (!tasksConfig) return;
startJobs(appId, tasksConfig);
createCronJobs(appId, tasksConfig);
}
function runTask(appId, taskName, task, callback) {
function killTask(appId, taskName, callback) {
var containerId = gTasks[appId].jobs[taskName].containerId;
if (!containerId) return callback();
async.series([
docker.stopContainer.bind(null, containerId),
docker.deleteContainer.bind(null, containerId)
], callback);
}
function doTask(appId, taskName, task, callback) {
assert.strictEqual(typeof appId, 'string');
assert.strictEqual(typeof taskName, 'string');
assert.strictEqual(typeof task, 'object');
@@ -96,12 +118,8 @@ function runTask(appId, taskName, task, callback) {
var containerId = gTasks[appId].jobs[taskName].containerId;
if (containerId) {
debug('task %s/%s is already running');
async.series([
docker.stopContainer.bind(null, containerId),
docker.deleteContainer.bind(null, containerId)
], callback);
return;
debug('task %s/%s is already running. killing it');
return killTask(appId, taskName, callback);
}
apps.get(appId, function (error, app) {