'use strict';

exports = module.exports = {
    sync,
    suspendJobs,
    resumeJobs
};

let apps = require('./apps.js'),
    assert = require('assert'),
    async = require('async'),
    BoxError = require('./boxerror.js'),
    constants = require('./constants.js'),
    CronJob = require('cron').CronJob,
    debug = require('debug')('box:scheduler'),
    docker = require('./docker.js'),
    _ = require('underscore');

// appId -> { containerId, schedulerConfig (manifest), cronJobs }
// cronJobs is null when the last createJobs() attempt for the app failed; sync() then retries the app.
let gState = { };

let gSuspendedAppIds = new Set(); // suspended because some apptask is running

/**
 * Suspends the scheduled tasks of an app: while suspended, runTask() returns without running anything.
 * The cron jobs themselves keep ticking.
 *
 * TODO: this should probably also stop existing jobs to completely prevent race but the code is not re-entrant
 *
 * @param {string} appId
 */
function suspendJobs(appId) {
    debug(`suspendJobs: ${appId}`);

    gSuspendedAppIds.add(appId);
}

/**
 * Undoes suspendJobs(): scheduled tasks of the app are run again on their next tick.
 *
 * @param {string} appId
 */
function resumeJobs(appId) {
    debug(`resumeJobs: ${appId}`);

    gSuspendedAppIds.delete(appId);
}

/**
 * Runs one scheduled task of an app by restarting its pre-created task container.
 *
 * Does nothing (callback without error) when the app's jobs are suspended, or when the app is not installed,
 * not running or not healthy. If the task container is already running and was started less than
 * JOB_MAX_TIME ago, the current run is left alone. Otherwise docker.restartContainer() is called on it.
 *
 * @param {string} appId
 * @param {string} taskName - key of the task in the app's scheduler config
 * @param {function(Error=)} callback
 */
function runTask(appId, taskName, callback) {
    assert.strictEqual(typeof appId, 'string');
    assert.strictEqual(typeof taskName, 'string');
    assert.strictEqual(typeof callback, 'function');

    const JOB_MAX_TIME = 30 * 60 * 1000; // 30 minutes
    const containerName = `${appId}-${taskName}`;

    if (gSuspendedAppIds.has(appId)) return callback();

    apps.get(appId, function (error, app) {
        if (error) return callback(error);

        if (app.installationState !== apps.ISTATE_INSTALLED || app.runState !== apps.RSTATE_RUNNING || app.health !== apps.HEALTH_HEALTHY) return callback();

        docker.inspectByName(containerName, function (error, data) {
            if (!error && data && data.State.Running === true) {
                const jobStartTime = new Date(data.State.StartedAt); // iso 8601
                if (new Date() - jobStartTime < JOB_MAX_TIME) return callback(); // previous run still within its time budget
            }

            docker.restartContainer(containerName, callback);
        });
    });
}

/**
 * Creates one task container and one started CronJob per task in the app's scheduler config.
 *
 * Each task container is deleted first (errors ignored) and then re-created, so it always reflects the latest
 * config. If creating a container fails, the cron jobs already started by this call are stopped before the
 * error is reported. The caller does not keep track of jobs from a failed call, so otherwise they would keep
 * running forever, alongside the ones created by the next attempt.
 *
 * @param {object} app - app object; id and fqdn are read here and the object is passed to docker.createSubcontainer
 * @param {object} schedulerConfig - taskName -> task; each task's `schedule` and `command` are read here
 * @param {function(?Error, object)} callback - called with (error, jobs), where jobs maps taskName -> CronJob
 */
function createJobs(app, schedulerConfig, callback) {
    assert.strictEqual(typeof app, 'object');
    assert(schedulerConfig && typeof schedulerConfig === 'object');
    assert.strictEqual(typeof callback, 'function');

    const appId = app.id;
    const jobs = { };

    async.eachSeries(Object.keys(schedulerConfig), function (taskName, iteratorDone) {
        const task = schedulerConfig[taskName];
        const randomSecond = Math.floor(60*Math.random()); // don't start all crons to decrease memory pressure
        const cronTime = (constants.TEST ? '*/5 ' : `${randomSecond} `) + task.schedule; // time ticks faster in tests

        const containerName = `${appId}-${taskName}`;
        const cmd = task.command;

        // stopJobs only deletes jobs since previous run. This means that when box code restarts, none of the containers
        // are removed. The deleteContainer here ensures we re-create the cron containers with the latest config
        docker.deleteContainer(containerName, function ( /* ignoredError */) {
            docker.createSubcontainer(app, containerName, [ '/bin/sh', '-c', cmd ], { } /* options */, function (error) {
                if (error && error.reason !== BoxError.ALREADY_EXISTS) return iteratorDone(error);

                debug(`createJobs: ${taskName} (${app.fqdn}) will run in container ${containerName}`);

                jobs[taskName] = new CronJob({
                    cronTime, // at this point, the pattern has been validated
                    onTick: () => runTask(appId, taskName, (error) => { // put the app id in closure, so we don't use the outdated app object by mistake
                        if (error) debug(`could not run task ${taskName} : ${error.message}`);
                    }),
                    start: true
                });

                iteratorDone();
            });
        });
    }, function (error) {
        // on failure, stop the jobs started so far; the caller cannot track them and they would run forever
        if (error) Object.keys(jobs).forEach((taskName) => jobs[taskName].stop());

        callback(error, jobs);
    });
}

/**
 * Stops the cron jobs of an app and deletes its task containers.
 *
 * Container deletion failures are only logged, so the callback never receives an error.
 *
 * @param {string} appId
 * @param {?object} appState - the app's gState entry, or null when there is none (then this is a no-op)
 * @param {function()} callback
 */
function stopJobs(appId, appState, callback) {
    assert.strictEqual(typeof appId, 'string');
    assert.strictEqual(typeof appState, 'object');
    assert.strictEqual(typeof callback, 'function');

    if (!appState) return callback();

    async.eachSeries(Object.keys(appState.schedulerConfig), function (taskName, iteratorDone) {
        if (appState.cronJobs && appState.cronJobs[taskName]) appState.cronJobs[taskName].stop();

        const containerName = `${appId}-${taskName}`;

        docker.deleteContainer(containerName, function (error) {
            if (error) debug(`stopJobs: failed to delete task container with name ${containerName} : ${error.message}`);

            iteratorDone();
        });
    }, callback);
}

/**
 * Reconciles the scheduler's cron jobs with the current list of apps.
 *
 * Jobs of apps that no longer exist are stopped. For every other app, jobs are re-created when the scheduler
 * config or the app's containerId changed, or when the previous attempt failed. They are removed when the app
 * no longer has a scheduler addon. A failure for one app is logged and does not prevent the remaining apps
 * from being synced. Errors are only logged; the function has no callback.
 */
function sync() {
    apps.getAll(function (error, allApps) {
        if (error) return debug(`sync: error getting app list. ${error.message}`);

        const allAppIds = allApps.map((app) => app.id);

        const removedAppIds = _.difference(Object.keys(gState), allAppIds);
        if (removedAppIds.length !== 0) debug(`sync: stopping jobs of removed apps ${JSON.stringify(removedAppIds)}`);

        async.eachSeries(removedAppIds, function (appId, iteratorDone) {
            debug(`sync: removing jobs of ${appId}`);
            stopJobs(appId, gState[appId], iteratorDone);
        }, function (error) {
            if (error) debug(`sync: error stopping jobs of removed apps: ${error.message}`);

            gState = _.omit(gState, removedAppIds);

            async.eachSeries(allApps, function (app, iteratorDone) {
                const appState = gState[app.id] || null;
                const schedulerConfig = app.manifest.addons ? app.manifest.addons.scheduler : null;

                if (!appState && !schedulerConfig) return iteratorDone(); // nothing to do

                if (appState && appState.cronJobs) { // we had created jobs for this app previously
                    if (_.isEqual(appState.schedulerConfig, schedulerConfig) && appState.containerId === app.containerId) return iteratorDone(); // nothing changed
                }

                debug(`sync: adding jobs of ${app.id} (${app.fqdn})`);

                stopJobs(app.id, appState, function (error) {
                    if (error) debug(`sync: error stopping jobs of ${app.id} : ${error.message}`);

                    if (!schedulerConfig) { // updated app version removed scheduler addon
                        delete gState[app.id];
                        return iteratorDone();
                    }

                    createJobs(app, schedulerConfig, function (error, cronJobs) {
                        if (error) {
                            // if docker is down, the next sync() will recreate everything for this app. createJobs() has already
                            // stopped the jobs it started. Record the new config with no jobs: the null cronJobs forces a retry
                            // on the next sync(), and stopJobs() can clean up task containers created before the failure.
                            // Continue with the remaining apps instead of aborting the whole sync.
                            debug(`sync: error creating jobs of ${app.id} : ${error.message}`);
                            gState[app.id] = { containerId: app.containerId, schedulerConfig, cronJobs: null };
                            return iteratorDone();
                        }

                        gState[app.id] = { containerId: app.containerId, schedulerConfig, cronJobs };

                        iteratorDone();
                    });
                });
            }, function (error) {
                if (error) return debug('sync: error creating jobs', error.message);
            });
        });
    });
}