Make tasks indexed by id instead of type
The caas migrate logic is broken at this point until it uses new task framework
This commit is contained in:
+39
-48
@@ -1,14 +1,15 @@
|
||||
'use strict';
|
||||
|
||||
exports = module.exports = {
|
||||
update: update,
|
||||
get: get,
|
||||
update: update,
|
||||
|
||||
startTask: startTask,
|
||||
stopTask: stopTask,
|
||||
|
||||
TaskError: TaskError,
|
||||
|
||||
// task types
|
||||
TASK_BACKUP: 'backup',
|
||||
TASK_UPDATE: 'update',
|
||||
TASK_MIGRATE: 'migrate'
|
||||
@@ -29,7 +30,7 @@ let assert = require('assert'),
|
||||
|
||||
const NOOP_CALLBACK = function (error) { if (error) debug(error); };
|
||||
|
||||
const TASKS = {
|
||||
const TASKS = { // indexed by task type
|
||||
backup: {
|
||||
lock: locker.OP_FULL_BACKUP,
|
||||
logFile: paths.BACKUP_LOG_FILE,
|
||||
@@ -48,7 +49,7 @@ const TASKS = {
|
||||
}
|
||||
};
|
||||
|
||||
let gTasks = {};
|
||||
let gTasks = {}; // indexed by task id
|
||||
|
||||
function TaskError(reason, errorOrMessage) {
|
||||
assert.strictEqual(typeof reason, 'string');
|
||||
@@ -73,20 +74,6 @@ TaskError.INTERNAL_ERROR = 'Internal Error';
|
||||
TaskError.BAD_STATE = 'Bad State';
|
||||
TaskError.NOT_FOUND = 'Not Found';
|
||||
|
||||
function update(id, progress, callback) {
|
||||
assert.strictEqual(typeof id, 'string');
|
||||
assert.strictEqual(typeof progress, 'object');
|
||||
assert.strictEqual(typeof callback, 'function');
|
||||
|
||||
debug(`${id}: ${JSON.stringify(progress)}`);
|
||||
|
||||
taskdb.update(id, progress, function (error) {
|
||||
if (error) return callback(new TaskError(TaskError.INTERNAL_ERROR, error));
|
||||
|
||||
callback();
|
||||
});
|
||||
}
|
||||
|
||||
function get(id, callback) {
|
||||
assert.strictEqual(typeof id, 'string');
|
||||
assert.strictEqual(typeof callback, 'function');
|
||||
@@ -101,21 +88,28 @@ function get(id, callback) {
|
||||
});
|
||||
}
|
||||
|
||||
function clear(id, args, callback) {
|
||||
function update(id, task, callback) {
|
||||
assert.strictEqual(typeof id, 'string');
|
||||
assert(args && typeof args === 'object');
|
||||
assert.strictEqual(typeof task, 'object');
|
||||
assert.strictEqual(typeof callback, 'function');
|
||||
|
||||
update(id, { percent: 0, message: 'Starting', result: '', errorMessage: '', args: args }, callback);
|
||||
debug(`${id}: ${JSON.stringify(task)}`);
|
||||
|
||||
taskdb.update(id, task, function (error) {
|
||||
if (error && error.reason == DatabaseError.NOT_FOUND) return callback(new TaskError(TaskError.NOT_FOUND));
|
||||
if (error) return callback(new TaskError(TaskError.INTERNAL_ERROR, error));
|
||||
|
||||
callback();
|
||||
});
|
||||
}
|
||||
|
||||
function startTask(id, args, auditSource, callback) {
|
||||
assert.strictEqual(typeof id, 'string');
|
||||
function startTask(type, args, auditSource, callback) {
|
||||
assert.strictEqual(typeof type, 'string');
|
||||
assert(args && typeof args === 'object');
|
||||
assert.strictEqual(typeof auditSource, 'object');
|
||||
assert.strictEqual(typeof callback, 'function');
|
||||
|
||||
const taskInfo = TASKS[id];
|
||||
const taskInfo = TASKS[type];
|
||||
if (!taskInfo) return callback(new TaskError(TaskError.NOT_FOUND, 'No such task'));
|
||||
|
||||
let error = locker.lock(taskInfo.lock);
|
||||
@@ -128,39 +122,39 @@ function startTask(id, args, auditSource, callback) {
|
||||
return callback(new TaskError(TaskError.INTERNAL_ERROR, error.message));
|
||||
}
|
||||
|
||||
debug(`startTask - starting task ${id}. logs at ${taskInfo.logFile}`);
|
||||
taskdb.add({ type: type, percent: 0, message: 'Starting', args: args }, function (error, taskId) {
|
||||
if (error) return callback(new TaskError(TaskError.INTERNAL_ERROR, error));
|
||||
|
||||
// when parent process dies, this process is killed because KillMode=control-group in systemd unit file
|
||||
assert(!gTasks[id], 'Task is already running');
|
||||
debug(`startTask - starting task ${type}. logs at ${taskInfo.logFile}. id ${taskId}`);
|
||||
|
||||
clear(id, args, NOOP_CALLBACK);
|
||||
eventlog.add(taskInfo.startEventId, auditSource, args);
|
||||
eventlog.add(taskInfo.startEventId, auditSource, args);
|
||||
|
||||
gTasks[id] = child_process.fork(taskInfo.program, [], { stdio: [ 'pipe', fd, fd, 'ipc' ]}); // fork requires ipc
|
||||
gTasks[id].once('exit', function (code, signal) {
|
||||
debug(`startTask: ${id} completed with code ${code} and signal ${signal}`);
|
||||
gTasks[taskId] = child_process.fork(taskInfo.program, [ taskId ], { stdio: [ 'pipe', fd, fd, 'ipc' ]}); // fork requires ipc
|
||||
gTasks[taskId].once('exit', function (code, signal) {
|
||||
debug(`startTask: ${taskId} completed with code ${code} and signal ${signal}`);
|
||||
|
||||
get(id, function (error, progress) {
|
||||
if (!error && progress.percent !== 100) { // task crashed or was killed by us (code 50)
|
||||
error = code === 0 ? new Error(`${id} task stopped`) : new Error(`${id} task crashed with code ${code} and signal ${signal}`);
|
||||
update(id, { percent: 100, errorMessage: error.message }, NOOP_CALLBACK);
|
||||
} else if (!error && progress.errorMessage) {
|
||||
error = new Error(progress.errorMessage);
|
||||
}
|
||||
get(taskId, function (error, task) {
|
||||
if (!error && task.percent !== 100) { // task crashed or was killed by us (code 50)
|
||||
error = code === 0 ? new Error(`${taskId} task stopped`) : new Error(`${taskId} task crashed with code ${code} and signal ${signal}`);
|
||||
update(taskId, { percent: 100, errorMessage: error.message }, NOOP_CALLBACK);
|
||||
} else if (!error && task.errorMessage) {
|
||||
error = new Error(task.errorMessage);
|
||||
}
|
||||
|
||||
eventlog.add(taskInfo.finishEventId, auditSource, _.extend({ errorMessage: error ? error.message : null }, progress ? progress.result : {}));
|
||||
eventlog.add(taskInfo.finishEventId, auditSource, _.extend({ errorMessage: error ? error.message : null }, task ? task.result : {}));
|
||||
|
||||
locker.unlock(taskInfo.lock);
|
||||
locker.unlock(taskInfo.lock);
|
||||
|
||||
if (error) taskInfo.onFailure(error);
|
||||
if (error) taskInfo.onFailure(error);
|
||||
|
||||
gTasks[id] = null;
|
||||
gTasks[taskId] = null;
|
||||
|
||||
debug(`startTask: ${id} done`);
|
||||
debug(`startTask: ${taskId} done`);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
callback(null);
|
||||
callback(null, taskId);
|
||||
});
|
||||
}
|
||||
|
||||
function stopTask(id, auditSource, callback) {
|
||||
@@ -168,9 +162,6 @@ function stopTask(id, auditSource, callback) {
|
||||
assert.strictEqual(typeof auditSource, 'object');
|
||||
assert.strictEqual(typeof callback, 'function');
|
||||
|
||||
const taskInfo = TASKS[id];
|
||||
if (!taskInfo) return callback(new TaskError(TaskError.NOT_FOUND, 'No such task'));
|
||||
|
||||
if (!gTasks[id]) return callback(new TaskError(TaskError.BAD_STATE, 'task is not active'));
|
||||
|
||||
debug(`stopTask: stopping task ${id}`);
|
||||
|
||||
Reference in New Issue
Block a user