'use strict';

exports = module.exports = {
    sync,
    deleteJobs,

    suspendAppJobs,
    resumeAppJobs
};

const apps = require('./apps.js'),
    assert = require('assert'),
    BoxError = require('./boxerror.js'),
    cloudron = require('./cloudron.js'),
    constants = require('./constants.js'),
    { CronJob } = require('cron'),
    debug = require('debug')('box:scheduler'),
    docker = require('./docker.js'),
    safe = require('safetydance'),
    _ = require('underscore');

// appId -> { containerId, schedulerConfig (manifest+crontab), cronJobs }
const gState = {};

// ids of apps whose jobs are suspended because some apptask is running
const gSuspendedAppIds = new Set();

/**
 * Marks an app as suspended. While suspended, runTask() returns without doing anything
 * for this app and sync() does not create new jobs for it.
 *
 * TODO: this should probably also stop existing jobs to completely prevent race but the code is not re-entrant
 *
 * @param {string} appId
 */
function suspendAppJobs(appId) {
    debug(`suspendAppJobs: ${appId}`);
    gSuspendedAppIds.add(appId);
}

/**
 * Clears the suspended mark set by suspendAppJobs().
 *
 * @param {string} appId
 */
function resumeAppJobs(appId) {
    debug(`resumeAppJobs: ${appId}`);
    gSuspendedAppIds.delete(appId);
}

/**
 * Runs one scheduled task of an app by (re)starting its task container.
 *
 * Does nothing when the app is suspended, when the app is not installed + running + healthy,
 * or when the task container is already running and was started less than JOB_MAX_TIME ago.
 *
 * @param {string} appId
 * @param {string} taskName
 * @throws {BoxError} NOT_FOUND when the app does not exist
 */
async function runTask(appId, taskName) {
    assert.strictEqual(typeof appId, 'string');
    assert.strictEqual(typeof taskName, 'string');

    const JOB_MAX_TIME = 30 * 60 * 1000; // 30 minutes
    const containerName = `${appId}-${taskName}`;

    if (gSuspendedAppIds.has(appId)) return;

    const app = await apps.get(appId);
    if (!app) throw new BoxError(BoxError.NOT_FOUND, 'App not found');

    if (app.installationState !== apps.ISTATE_INSTALLED || app.runState !== apps.RSTATE_RUNNING || app.health !== apps.HEALTH_HEALTHY) return;

    // an inspect error is deliberately not fatal: we fall through and try to restart the container
    const [error, data] = await safe(docker.inspect(containerName));
    if (!error && data?.State?.Running === true) {
        const jobStartTime = new Date(data.State.StartedAt); // iso 8601
        if ((new Date() - jobStartTime) < JOB_MAX_TIME) return; // previous run is still within its time budget
        debug(`runTask: ${containerName} is running too long, restarting`);
    }

    await docker.restartContainer(containerName);
}

/**
 * Creates one task container and one cron job for every task in schedulerConfig.
 *
 * Tasks whose container could not be created are skipped (unless the error is ALREADY_EXISTS).
 *
 * @param {object} app - app object; id and fqdn are read here and the object is passed to docker.createSubcontainer()
 * @param {object} schedulerConfig - taskName -> { schedule, command }
 * @returns {Promise<object>} taskName -> CronJob for every task that was set up
 */
async function createJobs(app, schedulerConfig) {
    assert.strictEqual(typeof app, 'object');
    assert(schedulerConfig && typeof schedulerConfig === 'object');

    const appId = app.id;
    const jobs = {};
    const tz = await cloudron.getTimeZone();

    for (const taskName of Object.keys(schedulerConfig)) {
        const { schedule, command } = schedulerConfig[taskName];
        const containerName = `${app.id}-${taskName}`;

        // stopJobs only deletes jobs since previous sync. This means that when box code restarts, none of the containers
        // are removed. The deleteContainer here ensures we re-create the cron containers with the latest config
        await safe(docker.deleteContainer(containerName)); // ignore error
        const [error] = await safe(docker.createSubcontainer(app, containerName, [ '/bin/sh', '-c', command ], {} /* options */), { debug });
        if (error && error.reason !== BoxError.ALREADY_EXISTS) continue;

        debug(`createJobs: ${taskName} (${app.fqdn}) will run in container ${containerName}`);

        let cronTime;
        if (schedule === '@service') {
            cronTime = new Date(Date.now() + 2*1000); // 2 seconds from now
        } else {
            // the random seconds field is so that all crons do not start at once, to decrease memory pressure
            cronTime = (constants.TEST ? '*/5 ' : `${Math.floor(60*Math.random())} `) + schedule; // time ticks faster in tests
        }

        const cronJob = CronJob.from({
            cronTime,
            onTick: async () => {
                const [error] = await safe(runTask(appId, taskName)); // put the app id in closure, so we don't use the outdated app object by mistake
                if (error) debug(`could not run task ${taskName} : ${error.message}`);
            },
            start: true,
            timeZone: tz
        });

        jobs[taskName] = cronJob;
    }

    return jobs;
}

/**
 * Stops the cron jobs recorded in appState and deletes the matching task containers.
 * Container deletion errors are logged and otherwise ignored.
 *
 * @param {string} appId
 * @param {object|null} appState - entry of gState for this app; null means there is nothing to delete
 */
async function deleteAppJobs(appId, appState) {
    assert.strictEqual(typeof appId, 'string');
    assert.strictEqual(typeof appState, 'object'); // note: this also lets null through

    if (!appState) return;

    for (const taskName of Object.keys(appState.schedulerConfig)) {
        if (appState.cronJobs && appState.cronJobs[taskName]) appState.cronJobs[taskName].stop();

        const containerName = `${appId}-${taskName}`;
        const [error] = await safe(docker.deleteContainer(containerName));
        if (error) debug(`deleteAppJobs: failed to delete task container with name ${containerName} : ${error.message}`);
    }
}

/**
 * Stops and removes the jobs of every app currently tracked in gState and empties gState.
 * Errors are logged per app and do not abort the loop.
 */
async function deleteJobs() {
    for (const appId of Object.keys(gState)) {
        debug(`deleteJobs: removing jobs of ${appId}`);

        const [error] = await safe(deleteAppJobs(appId, gState[appId]));
        if (error) debug(`deleteJobs: error stopping jobs of removed app ${appId}: ${error.message}`);

        delete gState[appId];
    }
}

/**
 * Reconciles gState with the current list of apps:
 *  - jobs of apps that no longer exist are removed
 *  - jobs of apps whose scheduler config or containerId changed are torn down and re-created
 *  - suspended apps get their old jobs torn down (when changed) but no new jobs
 *
 * Does nothing when constants.TEST is set.
 */
async function sync() {
    if (constants.TEST) return;

    const allApps = await apps.list();

    const allAppIds = allApps.map(app => app.id);
    const removedAppIds = _.difference(Object.keys(gState), allAppIds);
    if (removedAppIds.length !== 0) debug(`sync: stopping jobs of removed apps ${JSON.stringify(removedAppIds)}`);

    for (const appId of removedAppIds) {
        debug(`sync: removing jobs of ${appId}`);

        const [error] = await safe(deleteAppJobs(appId, gState[appId]));
        if (error) debug(`sync: error stopping jobs of removed app ${appId}: ${error.message}`);

        delete gState[appId];
    }

    for (const app of allApps) {
        const appState = gState[app.id] || null;
        const schedulerConfig = apps.getSchedulerConfig(app);

        if (!appState && !schedulerConfig) continue; // nothing to do
        if (appState && appState.cronJobs) { // we had created jobs for this app previously
            if (_.isEqual(appState.schedulerConfig, schedulerConfig) && appState.containerId === app.containerId) continue; // nothing changed
        }

        debug(`sync: clearing jobs of ${app.id} (${app.fqdn})`);

        const [error] = await safe(deleteAppJobs(app.id, appState));
        if (error) debug(`sync: error stopping jobs of ${app.id} : ${error.message}`);

        // The old jobs are torn down at this point. Drop the entry right away so that gState never describes
        // stopped jobs: otherwise a later sync() that sees the same config + containerId again would treat the
        // app as "nothing changed" and the jobs would never be re-created.
        delete gState[app.id];

        if (!schedulerConfig) continue; // updated app version removed scheduler addon

        if (gSuspendedAppIds.has(app.id)) continue; // do not create jobs of suspended apps

        const cronJobs = await createJobs(app, schedulerConfig); // if docker is down, the next sync() will recreate everything for this app
        gState[app.id] = { containerId: app.containerId, schedulerConfig, cronJobs };
    }
}