Files
cloudron-box/src/scheduler.js
Girish Ramakrishnan c634bdbd34 scheduler: do not create jobs of suspended apps
otherwise, when an app is uninstalling, it creates the docker containers
by calling getDynamicEnvironment. This ends up adding addonConfigs for the
docker addon and prevents the app from getting uninstalled.
2024-03-12 00:55:06 +01:00

155 lines
5.9 KiB
JavaScript

'use strict';
// Public interface of the scheduler module:
//   sync        - reconciles the cron jobs with the current list of apps
//   suspendJobs - marks an app as suspended (its tasks are skipped)
//   resumeJobs  - removes the suspended mark of an app
exports = module.exports = {
sync,
suspendJobs,
resumeJobs
};
const apps = require('./apps.js'),
assert = require('assert'),
BoxError = require('./boxerror.js'),
constants = require('./constants.js'),
CronJob = require('cron').CronJob,
debug = require('debug')('box:scheduler'),
docker = require('./docker.js'),
safe = require('safetydance'),
_ = require('underscore');
// in-memory scheduler state; populated and pruned by sync()
const gState = {}; // appId -> { containerId, schedulerConfig (manifest+crontab), cronJobs }
const gSuspendedAppIds = new Set(); // ids of apps suspended because some apptask is running
// TODO: this should probably also stop existing jobs to completely prevent race but the code is not re-entrant
// Marks the app as suspended by recording its id in gSuspendedAppIds.
// runTask() returns early for ids in that set, so scheduled ticks become no-ops.
// Cron jobs that already exist are not stopped here.
function suspendJobs(appId) {
    debug(`suspendJobs: ${appId}`);

    gSuspendedAppIds.add(appId);
}
// Clears the suspended mark of the app by removing its id from gSuspendedAppIds.
// Removing an id that was never suspended is a no-op.
function resumeJobs(appId) {
    debug(`resumeJobs: ${appId}`);

    gSuspendedAppIds.delete(appId);
}
// Runs one tick of a scheduled task by (re)starting the task container named `${appId}-${taskName}`.
//
// Nothing is done when:
//   - the app is in gSuspendedAppIds
//   - the app is not installed, not running or not healthy
//   - the task container is already running and was started less than 30 minutes ago
//
// Throws BoxError.NOT_FOUND when apps.get() yields no app. Errors of docker.restartContainer() propagate.
async function runTask(appId, taskName) {
    assert.strictEqual(typeof appId, 'string');
    assert.strictEqual(typeof taskName, 'string');

    if (gSuspendedAppIds.has(appId)) return;

    const app = await apps.get(appId);
    if (!app) throw new BoxError(BoxError.NOT_FOUND, 'App not found');

    const appIsReady = app.installationState === apps.ISTATE_INSTALLED
        && app.runState === apps.RSTATE_RUNNING
        && app.health === apps.HEALTH_HEALTHY;
    if (!appIsReady) return;

    const maxJobDurationMs = 30 * 60 * 1000; // 30 minutes
    const containerName = `${appId}-${taskName}`;

    // an inspect failure is treated like "not running" and falls through to the restart below
    const [inspectError, info] = await safe(docker.inspect(containerName));
    const alreadyRunning = !inspectError && info?.State?.Running === true;

    if (alreadyRunning) {
        const startedAt = new Date(info.State.StartedAt); // iso 8601
        const elapsedMs = Date.now() - startedAt.getTime();
        if (elapsedMs < maxJobDurationMs) return; // previous run is still within its time budget

        debug(`runTask: ${containerName} is running too long, restarting`);
    }

    await docker.restartContainer(containerName);
}
// Creates one task container and one started CronJob per entry of schedulerConfig.
//
// schedulerConfig maps taskName -> { schedule, command }. The command is run via '/bin/sh -c' in a
// subcontainer named `${app.id}-${taskName}`.
//
// Returns an object taskName -> CronJob. Tasks whose container could not be created are left out
// (unless the failure reason is BoxError.ALREADY_EXISTS).
async function createJobs(app, schedulerConfig) {
    assert.strictEqual(typeof app, 'object');
    assert(schedulerConfig && typeof schedulerConfig === 'object');

    const appId = app.id;
    const jobs = {};

    for (const [taskName, { schedule, command }] of Object.entries(schedulerConfig)) {
        const containerName = `${app.id}-${taskName}`;

        // stopJobs only removes the containers of jobs known since the previous sync, so after a restart of the
        // box code stale containers can still be around. Delete first to re-create with the latest config.
        await safe(docker.deleteContainer(containerName)); // errors are deliberately ignored

        const [createError] = await safe(docker.createSubcontainer(app, containerName, [ '/bin/sh', '-c', command ], {} /* options */), { debug });
        const creationFailed = createError && createError.reason !== BoxError.ALREADY_EXISTS;
        if (creationFailed) continue;

        debug(`createJobs: ${taskName} (${app.fqdn}) will run in container ${containerName}`);

        // '@service' tasks get a one-shot Date trigger 2 seconds from now. For everything else, a random
        // leading field (0-59, presumably the seconds field of the cron pattern) is prepended so that the
        // crons do NOT all start at once, which decreases memory pressure. Time ticks faster in tests.
        const cronTime = schedule === '@service'
            ? new Date(Date.now() + 2*1000)
            : (constants.TEST ? '*/5 ' : `${Math.floor(60*Math.random())} `) + schedule;

        // only the app id is captured in the closure, so an outdated app object is never used by mistake
        const onTick = async () => {
            const [runError] = await safe(runTask(appId, taskName));
            if (runError) debug(`could not run task ${taskName} : ${runError.message}`);
        };

        jobs[taskName] = new CronJob({ cronTime, onTick, start: true });
    }

    return jobs;
}
// Stops the cron jobs recorded in appState and deletes the matching task containers.
//
// appState is an entry of gState ({ schedulerConfig, cronJobs, ... }) or null; null means there is nothing to stop.
// Container deletion failures are logged and do not abort the loop.
async function stopJobs(appId, appState) {
    assert.strictEqual(typeof appId, 'string');
    assert.strictEqual(typeof appState, 'object'); // note: null passes this check and is handled below

    if (!appState) return;

    const { schedulerConfig, cronJobs } = appState;

    for (const taskName of Object.keys(schedulerConfig)) {
        const cronJob = cronJobs ? cronJobs[taskName] : null;
        if (cronJob) cronJob.stop();

        const containerName = `${appId}-${taskName}`;
        const [deleteError] = await safe(docker.deleteContainer(containerName));
        if (deleteError) debug(`stopJobs: failed to delete task container with name ${containerName} : ${deleteError.message}`);
    }
}
// Reconciles the in-memory scheduler state (gState) with the current list of apps.
//
//   1. Jobs of apps that no longer exist are stopped and their state is dropped.
//   2. For every existing app whose scheduler config or containerId changed (or that has no jobs yet),
//      the old jobs are stopped and, unless the app is suspended, new jobs are created.
//
// Does nothing when constants.TEST is set. Errors from stopJobs() are logged and do not abort the sync.
async function sync() {
    if (constants.TEST) return;

    const allApps = await apps.list();
    const allAppIds = allApps.map(app => app.id);

    // state entries without a matching app belong to apps that were removed
    const removedAppIds = _.difference(Object.keys(gState), allAppIds);
    if (removedAppIds.length !== 0) debug(`sync: stopping jobs of removed apps ${JSON.stringify(removedAppIds)}`);

    for (const appId of removedAppIds) {
        debug(`sync: removing jobs of ${appId}`);
        const [error] = await safe(stopJobs(appId, gState[appId]));
        if (error) debug(`sync: error stopping jobs of removed app ${appId}: ${error.message}`);
        delete gState[appId];
    }

    for (const app of allApps) {
        const appState = gState[app.id] || null;
        const schedulerConfig = apps.getSchedulerConfig(app);

        if (!appState && !schedulerConfig) continue; // nothing to do

        if (appState && appState.cronJobs) { // we had created jobs for this app previously
            if (_.isEqual(appState.schedulerConfig, schedulerConfig) && appState.containerId === app.containerId) continue; // nothing changed
        }

        debug(`sync: clearing jobs of ${app.id} (${app.fqdn})`);

        const [error] = await safe(stopJobs(app.id, appState));
        if (error) debug(`sync: error stopping jobs of ${app.id} : ${error.message}`);

        if (!schedulerConfig) { // updated app version removed scheduler addon
            delete gState[app.id];
            continue;
        }

        // Do not create jobs of suspended apps: createJobs() creates docker containers, which must not happen
        // while an apptask is working on the app. The check must use app.id; the previous `app.d` was always
        // undefined, so the guard never matched.
        if (gSuspendedAppIds.has(app.id)) continue;

        const cronJobs = await createJobs(app, schedulerConfig); // if docker is down, the next sync() will recreate everything for this app
        gState[app.id] = { containerId: app.containerId, schedulerConfig, cronJobs };
    }
}