scheduler: reduce container churn
When we have a lot of apps, Docker has a tough time keeping up with the container churn. The reason we don't use `docker exec` is that there is no way to delete or manage exec containers. Fixes #732
This commit is contained in:
178
src/scheduler.js
178
src/scheduler.js
@@ -16,62 +16,64 @@ let apps = require('./apps.js'),
|
||||
// appId -> { schedulerConfig (manifest), cronjobs }
|
||||
var gState = { };
|
||||
|
||||
function sync() {
|
||||
apps.getAll(function (error, allApps) {
|
||||
if (error) return debug(`sync: error getting app list. ${error.message}`);
|
||||
|
||||
var allAppIds = allApps.map(function (app) { return app.id; });
|
||||
var removedAppIds = _.difference(Object.keys(gState), allAppIds);
|
||||
if (removedAppIds.length !== 0) debug(`sync: stopping jobs of removed apps ${JSON.stringify(removedAppIds)}`);
|
||||
function runTask(appId, taskName, callback) {
|
||||
assert.strictEqual(typeof appId, 'string');
|
||||
assert.strictEqual(typeof taskName, 'string');
|
||||
assert.strictEqual(typeof callback, 'function');
|
||||
|
||||
async.eachSeries(removedAppIds, function (appId, iteratorDone) {
|
||||
stopJobs(appId, gState[appId], iteratorDone);
|
||||
}, function (error) {
|
||||
if (error) debug(`sync: error stopping jobs of removed apps: ${error.message}`);
|
||||
const JOB_MAX_TIME = 30 * 60 * 1000; // 30 minutes
|
||||
const containerName = `${appId}-${taskName}`;
|
||||
|
||||
gState = _.omit(gState, removedAppIds);
|
||||
apps.get(appId, function (error, app) {
|
||||
if (error) return callback(error);
|
||||
|
||||
async.eachSeries(allApps, function (app, iteratorDone) {
|
||||
var appState = gState[app.id] || null;
|
||||
var schedulerConfig = app.manifest.addons ? app.manifest.addons.scheduler : null;
|
||||
if (app.installationState !== apps.ISTATE_INSTALLED || app.runState !== apps.RSTATE_RUNNING || app.health !== apps.HEALTH_HEALTHY) return callback();
|
||||
|
||||
if (!appState && !schedulerConfig) return iteratorDone(); // nothing changed
|
||||
docker.inspectByName(containerName, function (err, data) {
|
||||
if (!err && data && data.State.Running === true) {
|
||||
const jobStartTime = new Date(data.State.StartedAt); // iso 8601
|
||||
if (new Date() - jobStartTime < JOB_MAX_TIME) return callback();
|
||||
}
|
||||
|
||||
if (appState && _.isEqual(appState.schedulerConfig, schedulerConfig) && appState.cronJobs) {
|
||||
return iteratorDone(); // nothing changed
|
||||
}
|
||||
|
||||
stopJobs(app.id, appState, function (error) {
|
||||
if (error) debug(`sync: error stopping jobs of ${app.id} : ${error.message}`);
|
||||
|
||||
if (!schedulerConfig) {
|
||||
delete gState[app.id];
|
||||
return iteratorDone();
|
||||
}
|
||||
|
||||
gState[app.id] = {
|
||||
schedulerConfig: schedulerConfig,
|
||||
cronJobs: createCronJobs(app, schedulerConfig)
|
||||
};
|
||||
|
||||
iteratorDone();
|
||||
});
|
||||
});
|
||||
docker.restartContainer(containerName, callback);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
function killContainer(containerName, callback) {
|
||||
assert.strictEqual(typeof containerName, 'string');
|
||||
function createCronJobs(app, schedulerConfig, callback) {
|
||||
assert.strictEqual(typeof app, 'object');
|
||||
assert(schedulerConfig && typeof schedulerConfig === 'object');
|
||||
assert.strictEqual(typeof callback, 'function');
|
||||
|
||||
async.series([
|
||||
docker.stopContainerByName.bind(null, containerName),
|
||||
docker.deleteContainerByName.bind(null, containerName)
|
||||
], function (error) {
|
||||
if (error) debug(`killContainer: failed to kill task with name ${containerName} : ${error.message}`);
|
||||
const appId = app.id;
|
||||
let jobs = { };
|
||||
|
||||
callback(error);
|
||||
async.eachSeries(Object.keys(schedulerConfig), function (taskName, iteratorDone) {
|
||||
const task = schedulerConfig[taskName];
|
||||
const randomSecond = Math.floor(60*Math.random()); // don't start all crons to decrease memory pressure
|
||||
const cronTime = (constants.TEST ? '*/5 ' : `${randomSecond} `) + task.schedule; // time ticks faster in tests
|
||||
|
||||
const containerName = `${app.id}-${taskName}`;
|
||||
const cmd = schedulerConfig[taskName].command;
|
||||
|
||||
docker.createSubcontainer(app, containerName, [ '/bin/sh', '-c', cmd ], { } /* options */, function (error) {
|
||||
if (error) return iteratorDone(error);
|
||||
|
||||
var cronJob = new CronJob({
|
||||
cronTime: cronTime, // at this point, the pattern has been validated
|
||||
onTick: () => runTask(appId, taskName, (error) => { // put the app id in closure, so we don't use the outdated app object by mistake
|
||||
if (error) debug(`could not run task ${taskName} : ${error.message}`);
|
||||
}),
|
||||
start: true
|
||||
});
|
||||
|
||||
jobs[taskName] = cronJob;
|
||||
|
||||
iteratorDone();
|
||||
});
|
||||
}, function (error) {
|
||||
callback(error, jobs);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -87,69 +89,59 @@ function stopJobs(appId, appState, callback) {
|
||||
appState.cronJobs[taskName].stop();
|
||||
}
|
||||
|
||||
killContainer(`${appId}-${taskName}`, iteratorDone);
|
||||
const containerName = `${appId}-${taskName}`;
|
||||
|
||||
docker.stopContainerByName(containerName, function (error) {
|
||||
if (error) debug(`stopJobs: failed to stop task container with name ${containerName} : ${error.message}`);
|
||||
|
||||
docker.deleteContainerByName(containerName, function (error) {
|
||||
if (error) debug(`stopJobs: failed to delete task container with name ${containerName} : ${error.message}`);
|
||||
|
||||
iteratorDone();
|
||||
});
|
||||
});
|
||||
}, callback);
|
||||
}
|
||||
|
||||
function createCronJobs(app, schedulerConfig) {
|
||||
assert.strictEqual(typeof app, 'object');
|
||||
assert(schedulerConfig && typeof schedulerConfig === 'object');
|
||||
function sync() {
|
||||
apps.getAll(function (error, allApps) {
|
||||
if (error) return debug(`sync: error getting app list. ${error.message}`);
|
||||
|
||||
const appId = app.id;
|
||||
var jobs = { };
|
||||
var allAppIds = allApps.map(function (app) { return app.id; });
|
||||
var removedAppIds = _.difference(Object.keys(gState), allAppIds);
|
||||
if (removedAppIds.length !== 0) debug(`sync: stopping jobs of removed apps ${JSON.stringify(removedAppIds)}`);
|
||||
|
||||
Object.keys(schedulerConfig).forEach(function (taskName) {
|
||||
var task = schedulerConfig[taskName];
|
||||
async.eachSeries(removedAppIds, function (appId, iteratorDone) {
|
||||
debug(`sync: removing jobs of ${appId}`);
|
||||
stopJobs(appId, gState[appId], iteratorDone);
|
||||
}, function (error) {
|
||||
if (error) debug(`sync: error stopping jobs of removed apps: ${error.message}`);
|
||||
|
||||
const randomSecond = Math.floor(60*Math.random()); // don't start all crons to decrease memory pressure
|
||||
gState = _.omit(gState, removedAppIds);
|
||||
|
||||
var cronTime = (constants.TEST ? '*/5 ' : `${randomSecond} `) + task.schedule; // time ticks faster in tests
|
||||
async.eachSeries(allApps, function (app, iteratorDone) {
|
||||
debug(`sync: adding jobs of ${app.id}`);
|
||||
var appState = gState[app.id] || null;
|
||||
var schedulerConfig = app.manifest.addons ? app.manifest.addons.scheduler : null;
|
||||
|
||||
var cronJob = new CronJob({
|
||||
cronTime: cronTime, // at this point, the pattern has been validated
|
||||
onTick: () => runTask(appId, taskName, (error) => { // put the app id in closure, so we don't use the outdated app object by mistake
|
||||
if (error) debug(`could not run task ${taskName} : ${error.message}`);
|
||||
}),
|
||||
start: true
|
||||
});
|
||||
if (!appState && !schedulerConfig) return iteratorDone(); // nothing to do
|
||||
if (appState && _.isEqual(appState.schedulerConfig, schedulerConfig) && appState.cronJobs) return iteratorDone(); // nothing changed
|
||||
|
||||
jobs[taskName] = cronJob;
|
||||
});
|
||||
stopJobs(app.id, appState, function (error) {
|
||||
if (error) debug(`sync: error stopping jobs of ${app.id} : ${error.message}`);
|
||||
|
||||
return jobs;
|
||||
}
|
||||
if (!schedulerConfig) { // updated app version removed scheduler addon
|
||||
delete gState[app.id];
|
||||
return iteratorDone();
|
||||
}
|
||||
|
||||
function runTask(appId, taskName, callback) {
|
||||
assert.strictEqual(typeof appId, 'string');
|
||||
assert.strictEqual(typeof taskName, 'string');
|
||||
assert.strictEqual(typeof callback, 'function');
|
||||
createCronJobs(app, schedulerConfig, function (error, cronJobs) {
|
||||
if (error) return iteratorDone(error); // if docker is down, the next sync() will recreate everything for this app
|
||||
|
||||
const JOB_MAX_TIME = 30 * 60 * 1000; // 30 minutes
|
||||
gState[app.id] = { schedulerConfig, cronJobs };
|
||||
|
||||
apps.get(appId, function (error, app) {
|
||||
if (error) return callback(error);
|
||||
|
||||
if (app.installationState !== apps.ISTATE_INSTALLED || app.runState !== apps.RSTATE_RUNNING || app.health !== apps.HEALTH_HEALTHY) {
|
||||
return callback();
|
||||
}
|
||||
|
||||
const containerName = `${app.id}-${taskName}`;
|
||||
|
||||
docker.inspectByName(containerName, function (err, data) {
|
||||
if (!err && data && data.State.Running === true) {
|
||||
const jobStartTime = new Date(data.State.StartedAt); // iso 8601
|
||||
if (new Date() - jobStartTime < JOB_MAX_TIME) return callback();
|
||||
}
|
||||
|
||||
killContainer(containerName, function (error) {
|
||||
if (error) return callback(error);
|
||||
const cmd = gState[appId].schedulerConfig[taskName].command;
|
||||
|
||||
// NOTE: if you change container name here, fix addons.js to return correct container names
|
||||
docker.createSubcontainer(app, containerName, [ '/bin/sh', '-c', cmd ], { } /* options */, function (error, container) {
|
||||
if (error) return callback(error);
|
||||
|
||||
docker.startContainer(container.id, callback);
|
||||
iteratorDone();
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user