implement scheduler

- scan for apps every 10 minutes and schedules tasks
- uses docker.exec
    - there is no way to control exec container. docker developers
      feel exec is for debugging purposes primarily
- future version will be based on docker run instead

part of #519
This commit is contained in:
Girish Ramakrishnan
2015-10-18 08:40:24 -07:00
parent 8d6dd62ef4
commit 990b7a2d20
5 changed files with 2193 additions and 3284 deletions
+6
View File
@@ -90,6 +90,12 @@ var KNOWN_ADDONS = {
backup: NOOP,
restore: setupSendMail
},
scheduler: {
setup: NOOP,
teardown: NOOP,
backup: NOOP,
restore: NOOP
},
simpleauth: {
setup: setupSimpleAuth,
teardown: teardownSimpleAuth,
+14 -2
View File
@@ -11,6 +11,7 @@ var apps = require('./apps.js'),
CronJob = require('cron').CronJob,
debug = require('debug')('box:cron'),
janitor = require('./janitor.js'),
scheduler = require('./scheduler.js'),
settings = require('./settings.js'),
updateChecker = require('./updatechecker.js');
@@ -20,7 +21,8 @@ var gAutoupdaterJob = null,
gHeartbeatJob = null,
gBackupJob = null,
gCleanupTokensJob = null,
gDockerVolumeCleanerJob = null;
gDockerVolumeCleanerJob = null,
gSchedulerSyncJob = null;
var gInitialized = false;
@@ -101,6 +103,14 @@ function recreateJobs(unusedTimeZone, callback) {
timeZone: allSettings[settings.TIME_ZONE_KEY]
});
if (gSchedulerSyncJob) gSchedulerSyncJob.stop();
gSchedulerSyncJob = new CronJob({
cronTime: '00 */10 * * * *', // every 10 minutes
onTick: scheduler.sync,
start: true,
timeZone: allSettings[settings.TIME_ZONE_KEY]
});
autoupdatePatternChanged(allSettings[settings.AUTOUPDATE_PATTERN_KEY]);
if (callback) callback();
@@ -161,8 +171,10 @@ function uninitialize(callback) {
gDockerVolumeCleanerJob.stop();
gDockerVolumeCleanerJob = null;
gSchedulerSyncJob.stop();
gSchedulerSyncJob = null;
gInitialized = false;
callback();
}
+134
View File
@@ -0,0 +1,134 @@
'use strict';
exports = module.exports = {
sync: sync
};
var appdb = require('./appdb.js'),
apps = require('./apps.js'),
assert = require('assert'),
CronJob = require('cron').CronJob,
debug = require('debug')('box:src/scheduler'),
docker = require('./docker.js').connection,
_ = require('underscore');
var NOOP_CALLBACK = function (error) { if (error) console.error(error); };
var gTasks = { }; // appId -> { task, jobs -> { execContainer, job } }
function sync(callback) {
assert(!callback || typeof callback === 'function');
callback = callback || NOOP_CALLBACK;
debug('Syncing');
apps.getAll(function (error, allApps) {
if (error) return callback(error);
// stop tasks of apps that went away
var allAppIds = allApps.map(function (app) { return app.id; });
var removedAppIds = _.difference(Object.keys(gTasks), allAppIds);
removedAppIds.forEach(stopJobs);
// start tasks of new apps
allApps.forEach(function (app) { resetTasks(app.id, app.manifest.addons.scheduler || null); });
});
}
function stopJobs(appId) {
assert.strictEqual(typeof appId, 'string');
debug('stopJobs for %s', appId);
for (var job in gTasks[appId].jobs) {
job.job.stop();
}
delete gTasks[appId];
}
function startJobs(appId, tasks) {
gTasks[appId] = { tasks: tasks, jobs: { } };
debug('startJobs for %s', appId);
Object.keys(tasks).forEach(function (taskName) {
var task = tasks[taskName];
debug('scheduling task %s/%s @ 00 %s : %s', appId, taskName, task.schedule, task.command);
var job = new CronJob({
cronTime: '00 ' + task.schedule, // at this point, the pattern has been validated
onTick: runTask.bind(null, appId, taskName, task),
start: true
});
gTasks[appId].jobs[taskName] = { job: job };
});
}
function resetTasks(appId, tasks) {
assert.strictEqual(typeof appId, 'string');
assert.strictEqual(typeof tasks, 'object');
// cleanup existing state
if (appId in gTasks) {
if (_.isEqual(gTasks[appId].tasks, tasks)) return; // nothing changed
stopJobs(appId);
}
if (!tasks) return;
startJobs(appId, tasks);
}
function runTask(appId, taskName, task, callback) {
assert.strictEqual(typeof appId, 'string');
assert.strictEqual(typeof taskName, 'string');
assert.strictEqual(typeof task, 'object');
assert(!callback || typeof callback === 'function');
callback = callback || NOOP_CALLBACK;
function onTaskCompleted(error) {
debug('task %s/%s completed. error: %s', appId, taskName, error ? error.message : 'null');
gTasks[appId].jobs[taskName].execContainer = null;
}
// https://github.com/docker/docker/pull/9167 (no way to stop a running exec)
if (gTasks[appId].jobs[taskName].execContainer) {
debug('task %s/%s is already running');
return;
}
apps.get(appId, function (error, app) {
if (error) return callback(error);
if (app.installationState !== appdb.ISTATE_INSTALLED || app.runState !== appdb.RSTATE_RUNNING) {
debug('task %s skipped. app %s is not installed/running', taskName, app.id);
return callback();
}
var container = docker.getContainer(app.containerId);
debug('task %s/%s started', app.id, taskName);
container.exec({ Cmd: [ '/bin/sh', '-c', task.command ], AttachStdout: true, AttachStderr: true }, function (error, execContainer) {
if (error) return callback(error);
gTasks[appId].jobs[taskName].execContainer = execContainer;
execContainer.start(function (error, stream) {
if (error) return callback(error);
stream.setEncoding('utf8');
stream.on('error', onTaskCompleted);
stream.on('data', function (d) { debug('%s/%s : %s', app.id, taskName, d); });
stream.on('end', onTaskCompleted);
});
});
});
}