'use strict';

exports = module.exports = {
    sync: sync
};

let apps = require('./apps.js'),
    assert = require('assert'),
    async = require('async'),
    BoxError = require('./boxerror.js'),
    constants = require('./constants.js'),
    CronJob = require('cron').CronJob,
    debug = require('debug')('box:scheduler'),
    docker = require('./docker.js'),
    _ = require('underscore');

// appId -> { schedulerConfig (manifest), cronJobs }
let gState = { };

/**
 * Runs one scheduled task of an app by (re)starting its task container.
 *
 * Nothing is started unless the app is installed, running and healthy. A task
 * container that is still running is left alone until it has been running for
 * JOB_MAX_TIME; after that it is restarted.
 *
 * @param {string} appId - id of the app owning the task
 * @param {string} taskName - key of the task in the scheduler config
 * @param {function} callback - called with (error); error is unset when the run was skipped
 */
function runTask(appId, taskName, callback) {
    assert.strictEqual(typeof appId, 'string');
    assert.strictEqual(typeof taskName, 'string');
    assert.strictEqual(typeof callback, 'function');

    const JOB_MAX_TIME = 30 * 60 * 1000; // 30 minutes
    const containerName = `${appId}-${taskName}`;

    // look the app up on every tick so that the current state is used
    apps.get(appId, function (error, app) {
        if (error) return callback(error);

        if (app.installationState !== apps.ISTATE_INSTALLED || app.runState !== apps.RSTATE_RUNNING || app.health !== apps.HEALTH_HEALTHY) return callback();

        docker.inspectByName(containerName, function (err, data) {
            // inspect errors are ignored on purpose: we then simply try to restart the container
            if (!err && data && data.State.Running === true) {
                const jobStartTime = new Date(data.State.StartedAt); // iso 8601
                if (new Date() - jobStartTime < JOB_MAX_TIME) return callback(); // previous run still in progress
            }

            docker.restartContainer(containerName, callback);
        });
    });
}

/**
 * Creates one task container and one started CronJob per entry of schedulerConfig.
 *
 * A task container that already exists is reused and its cron job is still
 * registered. If any task fails, the cron jobs started so far are stopped
 * before the error is reported, so no untracked job keeps ticking.
 *
 * @param {object} app - app object; app.id is used to name containers
 * @param {object} schedulerConfig - taskName -> { schedule, command }
 * @param {function} callback - called with (error, jobs) where jobs maps taskName -> CronJob
 */
function createCronJobs(app, schedulerConfig, callback) {
    assert.strictEqual(typeof app, 'object');
    assert(schedulerConfig && typeof schedulerConfig === 'object');
    assert.strictEqual(typeof callback, 'function');

    const appId = app.id;
    const jobs = { };

    async.eachSeries(Object.keys(schedulerConfig), function (taskName, iteratorDone) {
        const task = schedulerConfig[taskName];
        const randomSecond = Math.floor(60*Math.random()); // don't start all crons to decrease memory pressure
        const cronTime = (constants.TEST ? '*/5 ' : `${randomSecond} `) + task.schedule; // time ticks faster in tests
        const containerName = `${appId}-${taskName}`;
        const cmd = task.command;

        docker.createSubcontainer(app, containerName, [ '/bin/sh', '-c', cmd ], { } /* options */, function (error) {
            // an already existing container (presumably from an earlier run) is not a failure.
            // the cron job must still be registered, otherwise the task would never be scheduled
            if (error && error.reason !== BoxError.ALREADY_EXISTS) return iteratorDone(error);

            jobs[taskName] = new CronJob({
                cronTime: cronTime, // at this point, the pattern has been validated
                onTick: () => runTask(appId, taskName, (error) => { // put the app id in closure, so we don't use the outdated app object by mistake
                    if (error) debug(`could not run task ${taskName} : ${error.message}`);
                }),
                start: true
            });

            iteratorDone();
        });
    }, function (error) {
        // the caller drops the jobs on error, so stop them here to avoid orphaned (and later duplicated) cron jobs
        if (error) Object.keys(jobs).forEach((taskName) => jobs[taskName].stop());

        callback(error, jobs);
    });
}

/**
 * Stops the cron jobs of an app and stops and deletes the matching task containers.
 *
 * Failures to stop or delete a container are only logged; the remaining tasks
 * are still processed.
 *
 * @param {string} appId - id of the app
 * @param {object|null} appState - gState entry of the app ({ schedulerConfig, cronJobs }) or null
 * @param {function} callback - called once all tasks were processed
 */
function stopJobs(appId, appState, callback) {
    assert.strictEqual(typeof appId, 'string');
    assert.strictEqual(typeof appState, 'object'); // null passes this check as well
    assert.strictEqual(typeof callback, 'function');

    if (!appState) return callback();

    async.eachSeries(Object.keys(appState.schedulerConfig), function (taskName, iteratorDone) {
        if (appState.cronJobs && appState.cronJobs[taskName]) { // could be null across restarts
            appState.cronJobs[taskName].stop();
        }

        const containerName = `${appId}-${taskName}`;

        docker.stopContainerByName(containerName, function (error) {
            if (error) debug(`stopJobs: failed to stop task container with name ${containerName} : ${error.message}`);

            docker.deleteContainerByName(containerName, function (error) {
                if (error) debug(`stopJobs: failed to delete task container with name ${containerName} : ${error.message}`);

                iteratorDone();
            });
        });
    }, callback);
}

/**
 * Reconciles gState with the current list of apps.
 *
 * Jobs of apps that no longer exist are stopped and forgotten. For every
 * remaining app whose scheduler config differs from the recorded one (or that
 * has no recorded cron jobs), the old jobs are stopped and new ones created.
 * Errors are only logged.
 */
function sync() {
    apps.getAll(function (error, allApps) {
        if (error) return debug(`sync: error getting app list. ${error.message}`);

        const allAppIds = allApps.map(function (app) { return app.id; });
        const removedAppIds = _.difference(Object.keys(gState), allAppIds);
        if (removedAppIds.length !== 0) debug(`sync: stopping jobs of removed apps ${JSON.stringify(removedAppIds)}`);

        async.eachSeries(removedAppIds, function (appId, iteratorDone) {
            debug(`sync: removing jobs of ${appId}`);
            stopJobs(appId, gState[appId], iteratorDone);
        }, function (error) {
            if (error) debug(`sync: error stopping jobs of removed apps: ${error.message}`);

            gState = _.omit(gState, removedAppIds);

            async.eachSeries(allApps, function (app, iteratorDone) {
                const appState = gState[app.id] || null;
                const schedulerConfig = app.manifest.addons ? app.manifest.addons.scheduler : null;

                if (!appState && !schedulerConfig) return iteratorDone(); // nothing to do
                if (appState && _.isEqual(appState.schedulerConfig, schedulerConfig) && appState.cronJobs) return iteratorDone(); // nothing changed

                debug(`sync: adding jobs of ${app.id}`);

                stopJobs(app.id, appState, function (error) {
                    if (error) debug(`sync: error stopping jobs of ${app.id} : ${error.message}`);

                    if (!schedulerConfig) { // updated app version removed scheduler addon
                        delete gState[app.id];
                        return iteratorDone();
                    }

                    createCronJobs(app, schedulerConfig, function (error, cronJobs) {
                        if (error) return iteratorDone(error); // if docker is down, the next sync() will recreate everything for this app

                        gState[app.id] = { schedulerConfig, cronJobs };

                        iteratorDone();
                    });
                });
            }, function (error) {
                if (error) return debug('sync: error creating jobs', error);
            });
        });
    });
}