diff --git a/src/scheduler.js b/src/scheduler.js index b38e778da..41eeca1e7 100644 --- a/src/scheduler.js +++ b/src/scheduler.js @@ -16,62 +16,64 @@ let apps = require('./apps.js'), // appId -> { schedulerConfig (manifest), cronjobs } var gState = { }; -function sync() { - apps.getAll(function (error, allApps) { - if (error) return debug(`sync: error getting app list. ${error.message}`); - var allAppIds = allApps.map(function (app) { return app.id; }); - var removedAppIds = _.difference(Object.keys(gState), allAppIds); - if (removedAppIds.length !== 0) debug(`sync: stopping jobs of removed apps ${JSON.stringify(removedAppIds)}`); +function runTask(appId, taskName, callback) { + assert.strictEqual(typeof appId, 'string'); + assert.strictEqual(typeof taskName, 'string'); + assert.strictEqual(typeof callback, 'function'); - async.eachSeries(removedAppIds, function (appId, iteratorDone) { - stopJobs(appId, gState[appId], iteratorDone); - }, function (error) { - if (error) debug(`sync: error stopping jobs of removed apps: ${error.message}`); + const JOB_MAX_TIME = 30 * 60 * 1000; // 30 minutes + const containerName = `${appId}-${taskName}`; - gState = _.omit(gState, removedAppIds); + apps.get(appId, function (error, app) { + if (error) return callback(error); - async.eachSeries(allApps, function (app, iteratorDone) { - var appState = gState[app.id] || null; - var schedulerConfig = app.manifest.addons ? app.manifest.addons.scheduler : null; + if (app.installationState !== apps.ISTATE_INSTALLED || app.runState !== apps.RSTATE_RUNNING || app.health !== apps.HEALTH_HEALTHY) return callback(); - if (!appState && !schedulerConfig) return iteratorDone(); // nothing changed + docker.inspectByName(containerName, function (err, data) { + if (!err && data && data.State.Running === true) { + const jobStartTime = new Date(data.State.StartedAt); // iso 8601 + if (new Date() - jobStartTime < JOB_MAX_TIME) return callback(); + } - if (appState && _.isEqual(appState.schedulerConfig, schedulerConfig) && appState.cronJobs) { - return iteratorDone(); // nothing changed - } - - stopJobs(app.id, appState, function (error) { - if (error) debug(`sync: error stopping jobs of ${app.id} : ${error.message}`); - - if (!schedulerConfig) { - delete gState[app.id]; - return iteratorDone(); - } - - gState[app.id] = { - schedulerConfig: schedulerConfig, - cronJobs: createCronJobs(app, schedulerConfig) - }; - - iteratorDone(); - }); - }); + docker.restartContainer(containerName, callback); }); }); } -function killContainer(containerName, callback) { - assert.strictEqual(typeof containerName, 'string'); +function createCronJobs(app, schedulerConfig, callback) { + assert.strictEqual(typeof app, 'object'); + assert(schedulerConfig && typeof schedulerConfig === 'object'); assert.strictEqual(typeof callback, 'function'); - async.series([ - docker.stopContainerByName.bind(null, containerName), - docker.deleteContainerByName.bind(null, containerName) - ], function (error) { - if (error) debug(`killContainer: failed to kill task with name ${containerName} : ${error.message}`); + const appId = app.id; + let jobs = { }; - callback(error); + async.eachSeries(Object.keys(schedulerConfig), function (taskName, iteratorDone) { + const task = schedulerConfig[taskName]; + const randomSecond = Math.floor(60*Math.random()); // don't start all crons to decrease memory pressure + const cronTime = (constants.TEST ? '*/5 ' : `${randomSecond} `) + task.schedule; // time ticks faster in tests + + const containerName = `${app.id}-${taskName}`; + const cmd = schedulerConfig[taskName].command; + + docker.createSubcontainer(app, containerName, [ '/bin/sh', '-c', cmd ], { } /* options */, function (error) { + if (error) return iteratorDone(error); + + var cronJob = new CronJob({ + cronTime: cronTime, // at this point, the pattern has been validated + onTick: () => runTask(appId, taskName, (error) => { // put the app id in closure, so we don't use the outdated app object by mistake + if (error) debug(`could not run task ${taskName} : ${error.message}`); + }), + start: true + }); + + jobs[taskName] = cronJob; + + iteratorDone(); + }); + }, function (error) { + callback(error, jobs); }); } @@ -87,69 +89,59 @@ function stopJobs(appId, appState, callback) { appState.cronJobs[taskName].stop(); } - killContainer(`${appId}-${taskName}`, iteratorDone); + const containerName = `${appId}-${taskName}`; + + docker.stopContainerByName(containerName, function (error) { + if (error) debug(`stopJobs: failed to stop task container with name ${containerName} : ${error.message}`); + + docker.deleteContainerByName(containerName, function (error) { + if (error) debug(`stopJobs: failed to delete task container with name ${containerName} : ${error.message}`); + + iteratorDone(); + }); + }); }, callback); } -function createCronJobs(app, schedulerConfig) { - assert.strictEqual(typeof app, 'object'); - assert(schedulerConfig && typeof schedulerConfig === 'object'); +function sync() { + apps.getAll(function (error, allApps) { + if (error) return debug(`sync: error getting app list. ${error.message}`); - const appId = app.id; - var jobs = { }; + var allAppIds = allApps.map(function (app) { return app.id; }); + var removedAppIds = _.difference(Object.keys(gState), allAppIds); + if (removedAppIds.length !== 0) debug(`sync: stopping jobs of removed apps ${JSON.stringify(removedAppIds)}`); - Object.keys(schedulerConfig).forEach(function (taskName) { - var task = schedulerConfig[taskName]; + async.eachSeries(removedAppIds, function (appId, iteratorDone) { + debug(`sync: removing jobs of ${appId}`); + stopJobs(appId, gState[appId], iteratorDone); + }, function (error) { + if (error) debug(`sync: error stopping jobs of removed apps: ${error.message}`); - const randomSecond = Math.floor(60*Math.random()); // don't start all crons to decrease memory pressure + gState = _.omit(gState, removedAppIds); - var cronTime = (constants.TEST ? '*/5 ' : `${randomSecond} `) + task.schedule; // time ticks faster in tests + async.eachSeries(allApps, function (app, iteratorDone) { + debug(`sync: adding jobs of ${app.id}`); + var appState = gState[app.id] || null; + var schedulerConfig = app.manifest.addons ? app.manifest.addons.scheduler : null; - var cronJob = new CronJob({ - cronTime: cronTime, // at this point, the pattern has been validated - onTick: () => runTask(appId, taskName, (error) => { // put the app id in closure, so we don't use the outdated app object by mistake - if (error) debug(`could not run task ${taskName} : ${error.message}`); - }), - start: true - }); + if (!appState && !schedulerConfig) return iteratorDone(); // nothing to do + if (appState && _.isEqual(appState.schedulerConfig, schedulerConfig) && appState.cronJobs) return iteratorDone(); // nothing changed - jobs[taskName] = cronJob; - }); + stopJobs(app.id, appState, function (error) { + if (error) debug(`sync: error stopping jobs of ${app.id} : ${error.message}`); - return jobs; -} + if (!schedulerConfig) { // updated app version removed scheduler addon + delete gState[app.id]; + return iteratorDone(); + } -function runTask(appId, taskName, callback) { - assert.strictEqual(typeof appId, 'string'); - assert.strictEqual(typeof taskName, 'string'); - assert.strictEqual(typeof callback, 'function'); + createCronJobs(app, schedulerConfig, function (error, cronJobs) { + if (error) return iteratorDone(error); // if docker is down, the next sync() will recreate everything for this app - const JOB_MAX_TIME = 30 * 60 * 1000; // 30 minutes + gState[app.id] = { schedulerConfig, cronJobs }; - apps.get(appId, function (error, app) { - if (error) return callback(error); - - if (app.installationState !== apps.ISTATE_INSTALLED || app.runState !== apps.RSTATE_RUNNING || app.health !== apps.HEALTH_HEALTHY) { - return callback(); - } - - const containerName = `${app.id}-${taskName}`; - - docker.inspectByName(containerName, function (err, data) { - if (!err && data && data.State.Running === true) { - const jobStartTime = new Date(data.State.StartedAt); // iso 8601 - if (new Date() - jobStartTime < JOB_MAX_TIME) return callback(); - } - - killContainer(containerName, function (error) { - if (error) return callback(error); - const cmd = gState[appId].schedulerConfig[taskName].command; - - // NOTE: if you change container name here, fix addons.js to return correct container names - docker.createSubcontainer(app, containerName, [ '/bin/sh', '-c', cmd ], { } /* options */, function (error, container) { - if (error) return callback(error); - - docker.startContainer(container.id, callback); + iteratorDone(); + }); }); }); });