Files
cloudron-box/src/scheduler.js
Girish Ramakrishnan d137cdf881 update cron module
CronJob -> CronJob.from
CronJob(time) -> CronTime
2024-04-19 18:31:47 +02:00

155 lines
5.9 KiB
JavaScript

'use strict';
exports = module.exports = {
sync,
suspendJobs,
resumeJobs
};
const apps = require('./apps.js'),
assert = require('assert'),
BoxError = require('./boxerror.js'),
constants = require('./constants.js'),
{ CronJob } = require('cron'),
debug = require('debug')('box:scheduler'),
docker = require('./docker.js'),
safe = require('safetydance'),
_ = require('underscore');
const gState = {}; // appId -> { containerId, schedulerConfig (manifest+crontab), cronjobs }
const gSuspendedAppIds = new Set(); // suspended because some apptask is running
// TODO: this should probably also stop existing jobs to completely prevent race but the code is not re-entrant
function suspendJobs(appId) {
debug(`suspendJobs: ${appId}`);
gSuspendedAppIds.add(appId);
}
function resumeJobs(appId) {
debug(`resumeJobs: ${appId}`);
gSuspendedAppIds.delete(appId);
}
async function runTask(appId, taskName) {
assert.strictEqual(typeof appId, 'string');
assert.strictEqual(typeof taskName, 'string');
const JOB_MAX_TIME = 30 * 60 * 1000; // 30 minutes
const containerName = `${appId}-${taskName}`;
if (gSuspendedAppIds.has(appId)) return;
const app = await apps.get(appId);
if (!app) throw new BoxError(BoxError.NOT_FOUND, 'App not found');
if (app.installationState !== apps.ISTATE_INSTALLED || app.runState !== apps.RSTATE_RUNNING || app.health !== apps.HEALTH_HEALTHY) return;
const [error, data] = await safe(docker.inspect(containerName));
if (!error && data?.State?.Running === true) {
const jobStartTime = new Date(data.State.StartedAt); // iso 8601
if ((new Date() - jobStartTime) < JOB_MAX_TIME) return;
debug(`runTask: ${containerName} is running too long, restarting`);
}
await docker.restartContainer(containerName);
}
async function createJobs(app, schedulerConfig) {
assert.strictEqual(typeof app, 'object');
assert(schedulerConfig && typeof schedulerConfig === 'object');
const appId = app.id;
const jobs = {};
for (const taskName of Object.keys(schedulerConfig)) {
const { schedule, command } = schedulerConfig[taskName];
const containerName = `${app.id}-${taskName}`;
// stopJobs only deletes jobs since previous sync. This means that when box code restarts, none of the containers
// are removed. The deleteContainer here ensures we re-create the cron containers with the latest config
await safe(docker.deleteContainer(containerName)); // ignore error
const [error] = await safe(docker.createSubcontainer(app, containerName, [ '/bin/sh', '-c', command ], {} /* options */), { debug });
if (error && error.reason !== BoxError.ALREADY_EXISTS) continue;
debug(`createJobs: ${taskName} (${app.fqdn}) will run in container ${containerName}`);
let cronTime;
if (schedule === '@service') {
cronTime = new Date(Date.now() + 2*1000); // 2 seconds from now
} else {
// random is so that all crons start at once to decrease memory pressure
cronTime = (constants.TEST ? '*/5 ' : `${Math.floor(60*Math.random())} `) + schedule; // time ticks faster in tests
}
const cronJob = CronJob.from({
cronTime,
onTick: async () => {
const [error] = await safe(runTask(appId, taskName)); // put the app id in closure, so we don't use the outdated app object by mistake
if (error) debug(`could not run task ${taskName} : ${error.message}`);
},
start: true
});
jobs[taskName] = cronJob;
}
return jobs;
}
async function stopJobs(appId, appState) {
assert.strictEqual(typeof appId, 'string');
assert.strictEqual(typeof appState, 'object');
if (!appState) return;
for (const taskName of Object.keys(appState.schedulerConfig)) {
if (appState.cronJobs && appState.cronJobs[taskName]) appState.cronJobs[taskName].stop();
const containerName = `${appId}-${taskName}`;
const [error] = await safe(docker.deleteContainer(containerName));
if (error) debug(`stopJobs: failed to delete task container with name ${containerName} : ${error.message}`);
}
}
async function sync() {
if (constants.TEST) return;
const allApps = await apps.list();
const allAppIds = allApps.map(app => app.id);
const removedAppIds = _.difference(Object.keys(gState), allAppIds);
if (removedAppIds.length !== 0) debug(`sync: stopping jobs of removed apps ${JSON.stringify(removedAppIds)}`);
for (const appId of removedAppIds) {
debug(`sync: removing jobs of ${appId}`);
const [error] = await safe(stopJobs(appId, gState[appId]));
if (error) debug(`sync: error stopping jobs of removed app ${appId}: ${error.message}`);
delete gState[appId];
}
for (const app of allApps) {
const appState = gState[app.id] || null;
const schedulerConfig = apps.getSchedulerConfig(app);
if (!appState && !schedulerConfig) continue; // nothing to do
if (appState && appState.cronJobs) { // we had created jobs for this app previously
if (_.isEqual(appState.schedulerConfig, schedulerConfig) && appState.containerId === app.containerId) continue; // nothing changed
}
debug(`sync: clearing jobs of ${app.id} (${app.fqdn})`);
const [error] = await safe(stopJobs(app.id, appState));
if (error) debug(`sync: error stopping jobs of ${app.id} : ${error.message}`);
if (!schedulerConfig) { // updated app version removed scheduler addon
delete gState[app.id];
continue;
}
if (gSuspendedAppIds.has(app.id)) continue; // do not create jobs of suspended apps
const cronJobs = await createJobs(app, schedulerConfig); // if docker is down, the next sync() will recreate everything for this app
gState[app.id] = { containerId: app.containerId, schedulerConfig, cronJobs };
}
}