scheduler: kill existing tasks if they are still running

This commit is contained in:
Girish Ramakrishnan
2015-10-19 18:27:18 -07:00
parent fb56795cbd
commit 682c2721d2

View File

@@ -7,6 +7,7 @@ exports = module.exports = {
var appdb = require('./appdb.js'),
apps = require('./apps.js'),
assert = require('assert'),
async = require('async'),
CronJob = require('cron').CronJob,
debug = require('debug')('box:src/scheduler'),
docker = require('./docker.js').connection,
@@ -14,7 +15,7 @@ var appdb = require('./appdb.js'),
var NOOP_CALLBACK = function (error) { if (error) console.error(error); };
var gTasks = { }; // appId -> { tasksConfig (manifest), jobs -> { execContainer, cronJob } }
var gTasks = { }; // appId -> { tasksConfig (manifest), jobs -> { containerId, cronJob } }
function sync(callback) {
assert(!callback || typeof callback === 'function');
@@ -92,15 +93,14 @@ function runTask(appId, taskName, task, callback) {
callback = callback || NOOP_CALLBACK;
function onTaskCompleted(error) {
debug('task %s/%s completed. error: %s', appId, taskName, error ? error.message : 'null');
var containerId = gTasks[appId].jobs[taskName].containerId;
gTasks[appId].jobs[taskName].execContainer = null;
}
// https://github.com/docker/docker/pull/9167 (no way to stop a running exec)
if (gTasks[appId].jobs[taskName].execContainer) {
if (containerId) {
debug('task %s/%s is already running');
async.series([
docker.stopContainer.bind(null, containerId),
docker.deleteContainer.bind(null, containerId)
], callback);
return;
}
@@ -112,23 +112,12 @@ function runTask(appId, taskName, task, callback) {
return callback();
}
var container = docker.getContainer(app.containerId);
debug('task %s/%s starting', app.id, taskName);
debug('task %s/%s started', app.id, taskName);
docker.createSubcontainer(app, [ '/bin/sh', '-c', task.command ], function (error, container) {
gTasks[appId].jobs[taskName].containerId = container.id;
container.exec({ Cmd: [ '/bin/sh', '-c', task.command ], AttachStdout: true, AttachStderr: true }, function (error, execContainer) {
if (error) return callback(error);
gTasks[appId].jobs[taskName].execContainer = execContainer;
execContainer.start(function (error, stream) {
if (error) return callback(error);
stream.setEncoding('utf8');
stream.on('error', onTaskCompleted);
stream.on('data', function (d) { debug('%s/%s : %s', app.id, taskName, d); });
stream.on('end', onTaskCompleted);
});
docker.startContainer(container.id, callback);
});
});
}