import apps from './apps.js';
import assert from 'node:assert';
import BoxError from './boxerror.js';
import cloudron from './cloudron.js';
import constants from './constants.js';
import { CronJob } from 'cron';
import logger from './logger.js';
import docker from './docker.js';
import safe from '@cloudron/safetydance';
import _ from './underscore.js';

const { log } = logger('scheduler');

const gState = {}; // appId -> { containerId, schedulerConfig (manifest+crontab), cronJobs (taskName -> CronJob) }
const gSuspendedAppIds = new Set(); // suspended because some apptask is running

// Marks an app as suspended: runTask() becomes a no-op for it and sync() will not create jobs for it.
// TODO: this should probably also stop existing jobs to completely prevent race but the code is not re-entrant
function suspendAppJobs(appId) {
    log(`suspendAppJobs: ${appId}`);
    gSuspendedAppIds.add(appId);
}

// Clears the suspended mark set by suspendAppJobs().
function resumeAppJobs(appId) {
    log(`resumeAppJobs: ${appId}`);
    gSuspendedAppIds.delete(appId);
}

// Runs one scheduler task of an app by (re)starting its task container.
// Does nothing when the app is suspended, is not installed/running/healthy, or when the task
// container is already running and was started less than JOB_MAX_TIME ago.
// Throws BoxError.NOT_FOUND when the app does not exist; errors of docker.restartContainer() propagate.
async function runTask(appId, taskName) {
    assert.strictEqual(typeof appId, 'string');
    assert.strictEqual(typeof taskName, 'string');

    const JOB_MAX_TIME = 30 * 60 * 1000; // 30 minutes
    const containerName = `${appId}-${taskName}`;

    if (gSuspendedAppIds.has(appId)) return;

    const app = await apps.get(appId);
    if (!app) throw new BoxError(BoxError.NOT_FOUND, 'App not found');

    if (app.installationState !== apps.ISTATE_INSTALLED || app.runState !== apps.RSTATE_RUNNING || app.health !== apps.HEALTH_HEALTHY) return;

    // an inspect error is not fatal here; we fall through and let restartContainer() report the problem
    const [error, data] = await safe(docker.inspect(containerName));
    if (!error && data?.State?.Running === true) {
        const jobStartTime = new Date(data.State.StartedAt); // iso 8601
        if ((new Date() - jobStartTime) < JOB_MAX_TIME) return; // previous run is still within its time budget
        log(`runTask: ${containerName} is running too long, restarting`);
    }

    await docker.restartContainer(containerName);
}

// Creates the task container and the cron job for every task in schedulerConfig.
// schedulerConfig is an object of taskName -> { schedule, command }.
// Returns an object of taskName -> CronJob. Tasks whose container could not be created are skipped
// and are therefore missing from the returned object.
async function createJobs(app, schedulerConfig) {
    assert.strictEqual(typeof app, 'object');
    assert(schedulerConfig && typeof schedulerConfig === 'object');

    const appId = app.id;
    const jobs = {};
    const tz = await cloudron.getTimeZone();

    for (const taskName of Object.keys(schedulerConfig)) {
        const { schedule, command } = schedulerConfig[taskName];
        const containerName = `${app.id}-${taskName}`;

        // stopJobs only deletes jobs since previous sync. This means that when box code restarts, none of the containers
        // are removed. The deleteContainer here ensures we re-create the cron containers with the latest config
        await safe(docker.deleteContainer(containerName)); // ignore error
        const [error] = await safe(docker.createSubcontainer(app, containerName, [ '/bin/sh', '-c', command ], {} /* options */), { debug: log });
        if (error && error.reason !== BoxError.ALREADY_EXISTS) continue; // no job for this task; sync() notices the gap

        log(`createJobs: ${taskName} (${app.fqdn}) will run in container ${containerName}`);

        let cronTime;
        if (schedule === '@service') {
            cronTime = new Date(Date.now() + 2*1000); // 2 seconds from now
        } else {
            // a leading field is prepended to the schedule. the random value is so that all crons do not start at once, to decrease memory pressure
            cronTime = (constants.TEST ? '*/5 ' : `${Math.floor(60*Math.random())} `) + schedule; // time ticks faster in tests
        }

        const cronJob = CronJob.from({
            cronTime,
            onTick: async () => {
                const [taskError] = await safe(runTask(appId, taskName)); // put the app id in closure, so we don't use the outdated app object by mistake
                if (taskError) log(`could not run task ${taskName} : ${taskError.message}`);
            },
            start: true,
            timeZone: tz
        });

        jobs[taskName] = cronJob;
    }

    return jobs;
}

// Stops the cron jobs and deletes the task containers of every task in appState.schedulerConfig.
// appState may be null, in which case this is a no-op. Container deletion errors are logged, not thrown.
async function deleteAppJobs(appId, appState) {
    assert.strictEqual(typeof appId, 'string');
    assert.strictEqual(typeof appState, 'object');

    if (!appState) return;

    for (const taskName of Object.keys(appState.schedulerConfig)) {
        appState.cronJobs?.[taskName]?.stop();

        const containerName = `${appId}-${taskName}`;
        const [error] = await safe(docker.deleteContainer(containerName));
        if (error) log(`deleteAppJobs: failed to delete task container with name ${containerName} : ${error.message}`);
    }
}

// Removes the jobs and task containers of every app tracked in gState and forgets their state.
async function deleteJobs() {
    for (const appId of Object.keys(gState)) {
        log(`deleteJobs: removing jobs of ${appId}`);

        const [error] = await safe(deleteAppJobs(appId, gState[appId]));
        if (error) log(`deleteJobs: error stopping jobs of removed app ${appId}: ${error.message}`);

        delete gState[appId];
    }
}

// Reconciles gState with the current list of apps:
//  - jobs of apps that no longer exist are removed
//  - jobs of apps whose scheduler config or container changed are torn down and recreated
//  - tasks of unchanged apps that have no cron job (container creation failed earlier) are retried
// Does nothing in test mode.
async function sync() {
    if (constants.TEST) return;

    const allApps = await apps.list();

    const allAppIds = allApps.map(app => app.id);
    const removedAppIds = _.difference(Object.keys(gState), allAppIds);
    if (removedAppIds.length !== 0) log(`sync: stopping jobs of removed apps ${JSON.stringify(removedAppIds)}`);

    for (const appId of removedAppIds) {
        log(`sync: removing jobs of ${appId}`);

        const [error] = await safe(deleteAppJobs(appId, gState[appId]));
        if (error) log(`sync: error stopping jobs of removed app ${appId}: ${error.message}`);

        delete gState[appId];
    }

    for (const app of allApps) {
        const appState = gState[app.id] || null;
        const schedulerConfig = apps.getSchedulerConfig(app);

        if (!appState && !schedulerConfig) continue; // nothing to do

        if (appState && appState.cronJobs) { // we had created jobs for this app previously
            if (_.isEqual(appState.schedulerConfig, schedulerConfig) && appState.containerId === app.containerId) {
                // createJobs() skips tasks whose container could not be created (e.g. docker was down).
                // without this check such tasks would never get a job until the config changes
                const missingTaskNames = Object.keys(schedulerConfig || {}).filter((taskName) => !appState.cronJobs[taskName]);
                if (missingTaskNames.length === 0) continue; // nothing changed
                if (gSuspendedAppIds.has(app.id)) continue; // do not create jobs of suspended apps

                log(`sync: retrying jobs ${JSON.stringify(missingTaskNames)} of ${app.id} (${app.fqdn})`);

                // only the missing tasks are (re)created, so the jobs that already work are left untouched
                const retryConfig = Object.fromEntries(missingTaskNames.map((taskName) => [ taskName, schedulerConfig[taskName] ]));
                Object.assign(appState.cronJobs, await createJobs(app, retryConfig));
                continue;
            }
        }

        log(`sync: clearing jobs of ${app.id} (${app.fqdn})`);

        const [error] = await safe(deleteAppJobs(app.id, appState));
        if (error) log(`sync: error stopping jobs of ${app.id} : ${error.message}`);

        if (!schedulerConfig) { // updated app version removed scheduler addon
            delete gState[app.id];
            continue;
        }

        if (gSuspendedAppIds.has(app.id)) continue; // do not create jobs of suspended apps

        // tasks that could not be created (e.g. docker is down) are retried by the next sync()
        const cronJobs = await createJobs(app, schedulerConfig);
        gState[app.id] = { containerId: app.containerId, schedulerConfig, cronJobs };
    }
}

export default {
    sync,
    deleteJobs,

    suspendAppJobs,
    resumeAppJobs
};