'use strict';

exports = module.exports = {
    sync,
    suspendJobs,
    resumeJobs
};

const apps = require('./apps.js'),
    assert = require('assert'),
    BoxError = require('./boxerror.js'),
    constants = require('./constants.js'),
    CronJob = require('cron').CronJob,
    debug = require('debug')('box:scheduler'),
    docker = require('./docker.js'),
    safe = require('safetydance'),
    _ = require('underscore');

// appId -> { containerId, schedulerConfig (manifest), cronJobs }
let gState = { };
const gSuspendedAppIds = new Set(); // suspended because some apptask is running

// Marks an app's scheduler jobs as suspended. While suspended, runTask() returns without
// starting any task container for the app.
// TODO: this should probably also stop existing jobs to completely prevent race but the code is not re-entrant
function suspendJobs(appId) {
    debug(`suspendJobs: ${appId}`);
    gSuspendedAppIds.add(appId);
}

// Lifts a suspension placed by suspendJobs(). Already-registered cron jobs resume running tasks on their next tick.
function resumeJobs(appId) {
    debug(`resumeJobs: ${appId}`);
    gSuspendedAppIds.delete(appId);
}

// Runs a single scheduler task by (re)starting its pre-created container `${appId}-${taskName}`.
// Does nothing when the app is suspended or is not installed, running and healthy. Also does nothing
// when the task container is still running and was started less than JOB_MAX_TIME ago. A longer-running
// container is restarted.
// Throws BoxError.NOT_FOUND if the app no longer exists; docker restart errors propagate to the caller.
async function runTask(appId, taskName) {
    assert.strictEqual(typeof appId, 'string');
    assert.strictEqual(typeof taskName, 'string');

    const JOB_MAX_TIME = 30 * 60 * 1000; // 30 minutes
    const containerName = `${appId}-${taskName}`;

    if (gSuspendedAppIds.has(appId)) return;

    const app = await apps.get(appId);
    if (!app) throw new BoxError(BoxError.NOT_FOUND, 'App not found');

    if (app.installationState !== apps.ISTATE_INSTALLED || app.runState !== apps.RSTATE_RUNNING || app.health !== apps.HEALTH_HEALTHY) return;

    const [error, data] = await safe(docker.inspect(containerName));
    if (!error && data && data.State.Running === true) {
        const jobStartTime = new Date(data.State.StartedAt); // iso 8601
        if (new Date() - jobStartTime < JOB_MAX_TIME) return; // previous run still in progress; let it finish
    }

    await docker.restartContainer(containerName);
}

// Creates one task container and one started CronJob per task in schedulerConfig
// (the manifest's `addons.scheduler` object, keyed by task name, each entry having `schedule` and `command`).
// A task whose container cannot be created is skipped and has no entry in the result. The caller can
// detect the gap and retry on a later sync().
// Returns an object mapping taskName -> CronJob.
async function createJobs(app, schedulerConfig) {
    assert.strictEqual(typeof app, 'object');
    assert(schedulerConfig && typeof schedulerConfig === 'object');

    const appId = app.id;
    const jobs = { };

    for (const taskName of Object.keys(schedulerConfig)) {
        const task = schedulerConfig[taskName];
        const randomSecond = Math.floor(60*Math.random()); // don't start all crons to decrease memory pressure
        const cronTime = (constants.TEST ? '*/5 ' : `${randomSecond} `) + task.schedule; // time ticks faster in tests

        const containerName = `${app.id}-${taskName}`;
        const cmd = task.command;

        // stopJobs only deletes jobs since previous run. This means that when box code restarts, none of the containers
        // are removed. The deleteContainer here ensures we re-create the cron containers with the latest config
        await safe(docker.deleteContainer(containerName)); // ignore error
        const [error] = await safe(docker.createSubcontainer(app, containerName, [ '/bin/sh', '-c', cmd ], { } /* options */));
        if (error && error.reason !== BoxError.ALREADY_EXISTS) {
            // best-effort: leave this task out so that sync() notices the missing job and retries later
            debug(`createJobs: could not create container ${containerName} for ${taskName} (${app.fqdn}) : ${error.message}`);
            continue;
        }

        debug(`createJobs: ${taskName} (${app.fqdn}) will run in container ${containerName}`);

        const cronJob = new CronJob({
            cronTime, // at this point, the pattern has been validated
            onTick: async () => {
                // put the app id in closure, so we don't use the outdated app object by mistake
                const [error] = await safe(runTask(appId, taskName));
                if (error) debug(`could not run task ${taskName} : ${error.message}`);
            },
            start: true
        });

        jobs[taskName] = cronJob;
    }

    return jobs;
}

// Stops every CronJob recorded in appState and deletes the task containers named after
// appState.schedulerConfig. Container deletion failures are logged, not thrown. A null appState is a no-op.
async function stopJobs(appId, appState) {
    assert.strictEqual(typeof appId, 'string');
    assert.strictEqual(typeof appState, 'object');

    if (!appState) return;

    for (const taskName of Object.keys(appState.schedulerConfig)) {
        if (appState.cronJobs && appState.cronJobs[taskName]) appState.cronJobs[taskName].stop();

        const containerName = `${appId}-${taskName}`;
        const [error] = await safe(docker.deleteContainer(containerName));
        if (error) debug(`stopJobs: failed to delete task container with name ${containerName} : ${error.message}`);
    }
}

// True when every task in appState.schedulerConfig has a cron job. createJobs() omits tasks whose
// container could not be created.
function hasAllCronJobs(appState) {
    return Object.keys(appState.schedulerConfig).every((taskName) => !!appState.cronJobs[taskName]);
}

// Reconciles the in-memory cron jobs (gState) with the current app list:
//  - stops jobs and deletes containers of apps that no longer exist
//  - (re)creates jobs for apps whose scheduler config or container changed, or whose previous job creation was incomplete
//  - drops state for apps whose new version no longer uses the scheduler addon
// Does nothing in tests.
async function sync() {
    if (constants.TEST) return;

    const allApps = await apps.list();

    const allAppIds = allApps.map((app) => app.id);
    const removedAppIds = _.difference(Object.keys(gState), allAppIds);
    if (removedAppIds.length !== 0) debug(`sync: stopping jobs of removed apps ${JSON.stringify(removedAppIds)}`);

    for (const appId of removedAppIds) {
        debug(`sync: removing jobs of ${appId}`);
        const [error] = await safe(stopJobs(appId, gState[appId]));
        if (error) debug(`sync: error stopping jobs of removed app ${appId}: ${error.message}`);
    }

    gState = _.omit(gState, removedAppIds);

    for (const app of allApps) {
        const appState = gState[app.id] || null;
        const schedulerConfig = app.manifest.addons ? app.manifest.addons.scheduler : null;

        if (!appState && !schedulerConfig) continue; // nothing to do

        // we had created jobs for this app previously. cronJobs is always an object (possibly empty when docker
        // was down), so also require that every task actually got a job before treating the state as up to date
        if (appState && appState.cronJobs && hasAllCronJobs(appState)) {
            if (_.isEqual(appState.schedulerConfig, schedulerConfig) && appState.containerId === app.containerId) continue; // nothing changed
        }

        debug(`sync: adding jobs of ${app.id} (${app.fqdn})`);

        const [error] = await safe(stopJobs(app.id, appState));
        if (error) debug(`sync: error stopping jobs of ${app.id} : ${error.message}`);

        if (!schedulerConfig) { // updated app version removed scheduler addon
            delete gState[app.id];
            continue;
        }

        const cronJobs = await createJobs(app, schedulerConfig); // if docker is down, the next sync() will recreate everything for this app
        gState[app.id] = { containerId: app.containerId, schedulerConfig, cronJobs };
    }
}