docker.js and services.js: async'ify

This commit is contained in:
Girish Ramakrishnan
2021-08-25 19:41:46 -07:00
parent 1cc11fece8
commit 42774eac8c
24 changed files with 1618 additions and 2130 deletions

View File

@@ -8,13 +8,12 @@ exports = module.exports = {
const apps = require('./apps.js'),
assert = require('assert'),
async = require('async'),
BoxError = require('./boxerror.js'),
constants = require('./constants.js'),
CronJob = require('cron').CronJob,
debug = require('debug')('box:scheduler'),
docker = require('./docker.js'),
util = require('util'),
safe = require('safetydance'),
_ = require('underscore');
// appId -> { containerId, schedulerConfig (manifest), cronjobs }
@@ -32,42 +31,37 @@ function resumeJobs(appId) {
gSuspendedAppIds.delete(appId);
}
function runTask(appId, taskName, callback) {
async function runTask(appId, taskName) {
assert.strictEqual(typeof appId, 'string');
assert.strictEqual(typeof taskName, 'string');
assert.strictEqual(typeof callback, 'function');
const JOB_MAX_TIME = 30 * 60 * 1000; // 30 minutes
const containerName = `${appId}-${taskName}`;
if (gSuspendedAppIds.has(appId)) return callback();
if (gSuspendedAppIds.has(appId)) return;
util.callbackify(apps.get)(appId, function (error, app) {
if (error) return callback(error);
if (!app) return callback(new BoxError(BoxError.NOT_FOUND, 'App not found'));
const app = await apps.get(appId);
if (!app) throw new BoxError(BoxError.NOT_FOUND, 'App not found');
if (app.installationState !== apps.ISTATE_INSTALLED || app.runState !== apps.RSTATE_RUNNING || app.health !== apps.HEALTH_HEALTHY) return callback();
if (app.installationState !== apps.ISTATE_INSTALLED || app.runState !== apps.RSTATE_RUNNING || app.health !== apps.HEALTH_HEALTHY) return;
docker.inspectByName(containerName, function (error, data) {
if (!error && data && data.State.Running === true) {
const jobStartTime = new Date(data.State.StartedAt); // iso 8601
if (new Date() - jobStartTime < JOB_MAX_TIME) return callback();
}
const [error, data] = await safe(docker.inspectByName(containerName));
if (!error && data && data.State.Running === true) {
const jobStartTime = new Date(data.State.StartedAt); // iso 8601
if (new Date() - jobStartTime < JOB_MAX_TIME) return;
}
docker.restartContainer(containerName, callback);
});
});
await docker.restartContainer(containerName);
}
function createJobs(app, schedulerConfig, callback) {
async function createJobs(app, schedulerConfig) {
assert.strictEqual(typeof app, 'object');
assert(schedulerConfig && typeof schedulerConfig === 'object');
assert.strictEqual(typeof callback, 'function');
const appId = app.id;
let jobs = { };
const jobs = { };
async.eachSeries(Object.keys(schedulerConfig), function (taskName, iteratorDone) {
for (const taskName of Object.keys(schedulerConfig)) {
const task = schedulerConfig[taskName];
const randomSecond = Math.floor(60*Math.random()); // don't start all crons to decrease memory pressure
const cronTime = (constants.TEST ? '*/5 ' : `${randomSecond} `) + task.schedule; // time ticks faster in tests
@@ -77,97 +71,79 @@ function createJobs(app, schedulerConfig, callback) {
// stopJobs only deletes jobs since previous run. This means that when box code restarts, none of the containers
// are removed. The deleteContainer here ensures we re-create the cron containers with the latest config
docker.deleteContainer(containerName, function ( /* ignoredError */) {
docker.createSubcontainer(app, containerName, [ '/bin/sh', '-c', cmd ], { } /* options */, function (error) {
if (error && error.reason !== BoxError.ALREADY_EXISTS) return iteratorDone(error);
await safe(docker.deleteContainer(containerName)); // ignore error
const [error] = await safe(docker.createSubcontainer(app, containerName, [ '/bin/sh', '-c', cmd ], { } /* options */));
if (error && error.reason !== BoxError.ALREADY_EXISTS) continue;
debug(`createJobs: ${taskName} (${app.fqdn}) will run in container ${containerName}`);
debug(`createJobs: ${taskName} (${app.fqdn}) will run in container ${containerName}`);
var cronJob = new CronJob({
cronTime: cronTime, // at this point, the pattern has been validated
onTick: () => runTask(appId, taskName, (error) => { // put the app id in closure, so we don't use the outdated app object by mistake
if (error) debug(`could not run task ${taskName} : ${error.message}`);
}),
start: true
});
jobs[taskName] = cronJob;
iteratorDone();
});
const cronJob = new CronJob({
cronTime: cronTime, // at this point, the pattern has been validated
onTick: async () => {
const [error] = await safe(runTask(appId, taskName)); // put the app id in closure, so we don't use the outdated app object by mistake
if (error) debug(`could not run task ${taskName} : ${error.message}`);
},
start: true
});
}, function (error) {
callback(error, jobs);
});
jobs[taskName] = cronJob;
}
return jobs;
}
function stopJobs(appId, appState, callback) {
async function stopJobs(appId, appState) {
assert.strictEqual(typeof appId, 'string');
assert.strictEqual(typeof appState, 'object');
assert.strictEqual(typeof callback, 'function');
if (!appState) return callback();
if (!appState) return;
async.eachSeries(Object.keys(appState.schedulerConfig), function (taskName, iteratorDone) {
for (const taskName of Object.keys(appState.schedulerConfig)) {
if (appState.cronJobs && appState.cronJobs[taskName]) appState.cronJobs[taskName].stop();
const containerName = `${appId}-${taskName}`;
docker.deleteContainer(containerName, function (error) {
if (error) debug(`stopJobs: failed to delete task container with name ${containerName} : ${error.message}`);
iteratorDone();
});
}, callback);
const [error] = await safe(docker.deleteContainer(containerName));
if (error) debug(`stopJobs: failed to delete task container with name ${containerName} : ${error.message}`);
}
}
function sync() {
async function sync() {
if (constants.TEST) return;
util.callbackify(apps.list)(function (error, allApps) {
if (error) return debug(`sync: error getting app list. ${error.message}`);
const allApps = await apps.list();
var allAppIds = allApps.map(function (app) { return app.id; });
var removedAppIds = _.difference(Object.keys(gState), allAppIds);
if (removedAppIds.length !== 0) debug(`sync: stopping jobs of removed apps ${JSON.stringify(removedAppIds)}`);
const allAppIds = allApps.map(function (app) { return app.id; });
const removedAppIds = _.difference(Object.keys(gState), allAppIds);
if (removedAppIds.length !== 0) debug(`sync: stopping jobs of removed apps ${JSON.stringify(removedAppIds)}`);
async.eachSeries(removedAppIds, function (appId, iteratorDone) {
debug(`sync: removing jobs of ${appId}`);
stopJobs(appId, gState[appId], iteratorDone);
}, function (error) {
if (error) debug(`sync: error stopping jobs of removed apps: ${error.message}`);
for (const appId of removedAppIds) {
debug(`sync: removing jobs of ${appId}`);
const [error] = await safe(stopJobs(appId, gState[appId]));
if (error) debug(`sync: error stopping jobs of removed app ${appId}: ${error.message}`);
}
gState = _.omit(gState, removedAppIds);
gState = _.omit(gState, removedAppIds);
async.eachSeries(allApps, function (app, iteratorDone) {
var appState = gState[app.id] || null;
var schedulerConfig = app.manifest.addons ? app.manifest.addons.scheduler : null;
for (const app of allApps) {
const appState = gState[app.id] || null;
const schedulerConfig = app.manifest.addons ? app.manifest.addons.scheduler : null;
if (!appState && !schedulerConfig) return iteratorDone(); // nothing to do
if (appState && appState.cronJobs) { // we had created jobs for this app previously
if (_.isEqual(appState.schedulerConfig, schedulerConfig) && appState.containerId === app.containerId) return iteratorDone(); // nothing changed
}
if (!appState && !schedulerConfig) continue; // nothing to do
if (appState && appState.cronJobs) { // we had created jobs for this app previously
if (_.isEqual(appState.schedulerConfig, schedulerConfig) && appState.containerId === app.containerId) continue; // nothing changed
}
debug(`sync: adding jobs of ${app.id} (${app.fqdn})`);
debug(`sync: adding jobs of ${app.id} (${app.fqdn})`);
stopJobs(app.id, appState, function (error) {
if (error) debug(`sync: error stopping jobs of ${app.id} : ${error.message}`);
const [error] = await safe(stopJobs(app.id, appState));
if (error) debug(`sync: error stopping jobs of ${app.id} : ${error.message}`);
if (!schedulerConfig) { // updated app version removed scheduler addon
delete gState[app.id];
return iteratorDone();
}
if (!schedulerConfig) { // updated app version removed scheduler addon
delete gState[app.id];
continue;
}
createJobs(app, schedulerConfig, function (error, cronJobs) {
if (error) return iteratorDone(error); // if docker is down, the next sync() will recreate everything for this app
gState[app.id] = { containerId: app.containerId, schedulerConfig, cronJobs };
iteratorDone();
});
});
}, function (error) {
if (error) return debug('sync: error creating jobs', error.message);
});
});
});
const cronJobs = await createJobs(app, schedulerConfig); // if docker is down, the next sync() will recreate everything for this app
gState[app.id] = { containerId: app.containerId, schedulerConfig, cronJobs };
}
}