196 lines
6.7 KiB
JavaScript
196 lines
6.7 KiB
JavaScript
'use strict';
|
|
|
|
exports = module.exports = {
|
|
sync: sync
|
|
};
|
|
|
|
var appdb = require('./appdb.js'),
|
|
apps = require('./apps.js'),
|
|
assert = require('assert'),
|
|
async = require('async'),
|
|
config = require('./config.js'),
|
|
CronJob = require('cron').CronJob,
|
|
debug = require('debug')('box:src/scheduler'),
|
|
docker = require('./docker.js'),
|
|
paths = require('./paths.js'),
|
|
safe = require('safetydance'),
|
|
_ = require('underscore');
|
|
|
|
var NOOP_CALLBACK = function (error) { if (error) debug('Unhandled error: ', error); };
|
|
|
|
// appId -> { schedulerConfig (manifest), cronjobs, containerIds }
|
|
var gState = (function loadState() {
|
|
var state = safe.JSON.parse(safe.fs.readFileSync(paths.SCHEDULER_FILE, 'utf8'));
|
|
return state || { };
|
|
})();
|
|
|
|
function saveState(state) {
|
|
// do not save cronJobs
|
|
var safeState = { };
|
|
for (var appId in state) {
|
|
safeState[appId] = {
|
|
schedulerConfig: state[appId].schedulerConfig,
|
|
containerIds: state[appId].containerIds
|
|
};
|
|
}
|
|
safe.fs.writeFileSync(paths.SCHEDULER_FILE, JSON.stringify(safeState, null, 4), 'utf8');
|
|
}
|
|
|
|
function sync(callback) {
|
|
assert(!callback || typeof callback === 'function');
|
|
|
|
callback = callback || NOOP_CALLBACK;
|
|
|
|
debug('Syncing');
|
|
|
|
apps.getAll(function (error, allApps) {
|
|
if (error) return callback(error);
|
|
|
|
// stop tasks of apps that went away
|
|
var allAppIds = allApps.map(function (app) { return app.id; });
|
|
var removedAppIds = _.difference(Object.keys(gState), allAppIds);
|
|
async.eachSeries(removedAppIds, function (appId, iteratorDone) {
|
|
stopJobs(appId, gState[appId], true /* killContainers */, iteratorDone);
|
|
}, function (error) {
|
|
if (error) debug('Error stopping jobs : %j', error);
|
|
|
|
gState = _.omit(gState, removedAppIds);
|
|
|
|
// start tasks of new apps
|
|
async.eachSeries(allApps, function (app, iteratorDone) {
|
|
var appState = gState[app.id] || null;
|
|
var schedulerConfig = app.manifest.addons.scheduler || null;
|
|
|
|
if (!appState && !schedulerConfig) return iteratorDone(); // nothing changed
|
|
|
|
if (appState && _.isEqual(appState.schedulerConfig, schedulerConfig) && appState.cronJobs) {
|
|
return iteratorDone(); // nothing changed
|
|
}
|
|
|
|
var killContainers = appState && !appState.cronJobs ? true : false; // keep the old containers on 'startup'
|
|
stopJobs(app.id, appState, killContainers, function (error) {
|
|
if (error) debug('Error stopping jobs for %s : %s', app.id, error.message);
|
|
|
|
if (!schedulerConfig) {
|
|
delete gState[app.id];
|
|
return iteratorDone();
|
|
}
|
|
|
|
gState[app.id] = {
|
|
schedulerConfig: schedulerConfig,
|
|
cronJobs: createCronJobs(app.id, schedulerConfig),
|
|
containerIds: { }
|
|
};
|
|
|
|
saveState(gState);
|
|
|
|
iteratorDone();
|
|
});
|
|
});
|
|
|
|
debug('Done syncing');
|
|
});
|
|
});
|
|
}
|
|
|
|
function killContainer(containerId, callback) {
|
|
if (!containerId) return callback();
|
|
|
|
async.series([
|
|
docker.stopContainer.bind(null, containerId),
|
|
docker.deleteContainer.bind(null, containerId)
|
|
], function (error) {
|
|
if (error) debug('Failed to kill task with containerId %s : %s', containerId, error.message);
|
|
|
|
callback(error);
|
|
});
|
|
}
|
|
|
|
function stopJobs(appId, appState, killContainers, callback) {
|
|
assert.strictEqual(typeof appId, 'string');
|
|
assert.strictEqual(typeof appState, 'object');
|
|
assert.strictEqual(typeof killContainers, 'boolean');
|
|
assert.strictEqual(typeof callback, 'function');
|
|
|
|
debug('stopJobs for %s', appId);
|
|
|
|
if (!appState) return callback();
|
|
|
|
async.eachSeries(Object.keys(appState.schedulerConfig), function (taskName, iteratorDone) {
|
|
if (appState.cronJobs && appState.cronJobs[taskName]) { // could be null across restarts
|
|
appState.cronJobs[taskName].stop();
|
|
}
|
|
|
|
if (!killContainers) return iteratorDone();
|
|
|
|
killContainer(appState.containerIds[taskName], iteratorDone);
|
|
}, callback);
|
|
}
|
|
|
|
function createCronJobs(appId, schedulerConfig) {
|
|
assert.strictEqual(typeof appId, 'string');
|
|
assert(schedulerConfig && typeof schedulerConfig === 'object');
|
|
|
|
debug('creating cron jobs for app %s', appId);
|
|
|
|
var jobs = { };
|
|
|
|
Object.keys(schedulerConfig).forEach(function (taskName) {
|
|
var task = schedulerConfig[taskName];
|
|
|
|
var cronTime = (config.TEST ? '*/5 ' : '00 ') + task.schedule; // time ticks faster in tests
|
|
|
|
debug('scheduling task for %s/%s @ %s : %s', appId, taskName, cronTime, task.command);
|
|
|
|
var cronJob = new CronJob({
|
|
cronTime: cronTime, // at this point, the pattern has been validated
|
|
onTick: doTask.bind(null, appId, taskName),
|
|
start: true
|
|
});
|
|
|
|
jobs[taskName] = cronJob;
|
|
});
|
|
|
|
return jobs;
|
|
}
|
|
|
|
function doTask(appId, taskName, callback) {
|
|
assert.strictEqual(typeof appId, 'string');
|
|
assert.strictEqual(typeof taskName, 'string');
|
|
assert(!callback || typeof callback === 'function');
|
|
|
|
callback = callback || NOOP_CALLBACK;
|
|
|
|
var appState = gState[appId];
|
|
|
|
debug('Executing task %s/%s', appId, taskName);
|
|
|
|
apps.get(appId, function (error, app) {
|
|
if (error) return callback(error);
|
|
|
|
if (app.installationState !== appdb.ISTATE_INSTALLED || app.runState !== appdb.RSTATE_RUNNING) {
|
|
debug('task %s skipped. app %s is not installed/running', taskName, app.id);
|
|
return callback();
|
|
}
|
|
|
|
if (appState.containerIds[taskName]) debug('task %s/%s has existing container %s. killing it', appId, taskName, appState.containerIds[taskName]);
|
|
|
|
killContainer(appState.containerIds[taskName], function (error) {
|
|
if (error) return callback(error);
|
|
|
|
debug('Creating createSubcontainer for %s/%s : %s', app.id, taskName, gState[appId].schedulerConfig[taskName].command);
|
|
|
|
// NOTE: if you change container name here, fix addons.js to return correct container names
|
|
docker.createSubcontainer(app, app.id + '-' + taskName, [ '/bin/sh', '-c', gState[appId].schedulerConfig[taskName].command ], { } /* options */, function (error, container) {
|
|
if (error) return callback(error);
|
|
|
|
appState.containerIds[taskName] = container.id;
|
|
|
|
saveState(gState);
|
|
|
|
docker.startContainer(container.id, callback);
|
|
});
|
|
});
|
|
});
|
|
}
|