rework task API to be two-phase
this lets us avoid this EE based API. we now add and then start explicitly.
This commit is contained in:
+51
-47
@@ -2,6 +2,7 @@
|
||||
|
||||
exports = module.exports = {
|
||||
get: get,
|
||||
add: add,
|
||||
update: update,
|
||||
listByTypePaged: listByTypePaged,
|
||||
|
||||
@@ -33,7 +34,6 @@ let assert = require('assert'),
|
||||
child_process = require('child_process'),
|
||||
DatabaseError = require('./databaseerror.js'),
|
||||
debug = require('debug')('box:tasks'),
|
||||
EventEmitter = require('events'),
|
||||
paths = require('./paths.js'),
|
||||
safe = require('safetydance'),
|
||||
spawn = require('child_process').spawn,
|
||||
@@ -69,6 +69,13 @@ TaskError.INTERNAL_ERROR = 'Internal Error';
|
||||
TaskError.BAD_STATE = 'Bad State';
|
||||
TaskError.NOT_FOUND = 'Not Found';
|
||||
|
||||
function postProcess(result) {
|
||||
assert.strictEqual(typeof result, 'object');
|
||||
|
||||
result.active = !!gTasks[result.id];
|
||||
result.success = result.percent === 100 && !result.errorMessage;
|
||||
}
|
||||
|
||||
function get(id, callback) {
|
||||
assert.strictEqual(typeof id, 'string');
|
||||
assert.strictEqual(typeof callback, 'function');
|
||||
@@ -77,9 +84,7 @@ function get(id, callback) {
|
||||
if (error && error.reason == DatabaseError.NOT_FOUND) return callback(new TaskError(TaskError.NOT_FOUND));
|
||||
if (error) return callback(new TaskError(TaskError.INTERNAL_ERROR, error));
|
||||
|
||||
// add some virtual fields
|
||||
task.active = !!gTasks[id];
|
||||
task.success = task.percent === 100 && !task.errorMessage;
|
||||
postProcess(task);
|
||||
|
||||
callback(null, task);
|
||||
});
|
||||
@@ -100,54 +105,53 @@ function update(id, task, callback) {
|
||||
});
|
||||
}
|
||||
|
||||
function startTask(type, args, options) {
|
||||
function add(type, args, callback) {
|
||||
assert.strictEqual(typeof type, 'string');
|
||||
assert(Array.isArray(args));
|
||||
assert(!options || typeof options === 'object');
|
||||
|
||||
let events = new EventEmitter();
|
||||
|
||||
options = options || {};
|
||||
assert.strictEqual(typeof callback, 'function');
|
||||
|
||||
taskdb.add({ type: type, percent: 0, message: 'Starting', args: args }, function (error, taskId) {
|
||||
if (error) return events.emit('error', new TaskError(TaskError.INTERNAL_ERROR, error));
|
||||
if (error) return callback(new TaskError(TaskError.INTERNAL_ERROR, error));
|
||||
|
||||
const logFile = options.logFile || `${paths.TASKS_LOG_DIR}/${taskId}.log`;
|
||||
let fd = safe.fs.openSync(logFile, 'w'); // will autoclose
|
||||
if (!fd) {
|
||||
debug(`startTask: unable to get log filedescriptor ${safe.error.message}`);
|
||||
return events.emit('error', new TaskError(TaskError.INTERNAL_ERROR, error.message));
|
||||
}
|
||||
|
||||
debug(`startTask - starting task ${type}. logs at ${logFile} id ${taskId}`);
|
||||
|
||||
gTasks[taskId] = child_process.fork(`${__dirname}/taskworker.js`, [ taskId ], { stdio: [ 'pipe', fd, fd, 'ipc' ]}); // fork requires ipc
|
||||
gTasks[taskId].once('exit', function (code, signal) {
|
||||
debug(`startTask: ${taskId} completed with code ${code} and signal ${signal}`);
|
||||
|
||||
get(taskId, function (error, task) {
|
||||
if (!error && task.percent !== 100) { // task crashed or was killed by us (code 50)
|
||||
error = code === 0 ? new Error(`Task ${taskId} stopped`) : new Error(`Task ${taskId} crashed with code ${code} and signal ${signal}`);
|
||||
update(taskId, { percent: 100, errorMessage: error.message }, NOOP_CALLBACK);
|
||||
} else if (!error && task.errorMessage) {
|
||||
error = new Error(task.errorMessage);
|
||||
} else if (!task) { // db got cleared in tests
|
||||
error = new Error(`No such task ${taskId}`);
|
||||
}
|
||||
|
||||
gTasks[taskId] = null;
|
||||
|
||||
events.emit('finish', error, task ? task.result : null);
|
||||
|
||||
debug(`startTask: ${taskId} done`);
|
||||
});
|
||||
});
|
||||
|
||||
events.id = taskId;
|
||||
events.emit('start', taskId);
|
||||
callback(null, taskId);
|
||||
});
|
||||
}
|
||||
|
||||
return events;
|
||||
function startTask(taskId, options, callback) {
|
||||
assert.strictEqual(typeof taskId, 'string');
|
||||
assert.strictEqual(typeof options, 'object');
|
||||
assert.strictEqual(typeof callback, 'function');
|
||||
|
||||
const logFile = options.logFile || `${paths.TASKS_LOG_DIR}/${taskId}.log`;
|
||||
let fd = safe.fs.openSync(logFile, 'w'); // will autoclose
|
||||
if (!fd) {
|
||||
debug(`startTask: unable to get log filedescriptor ${safe.error.message}`);
|
||||
return callback(new TaskError(TaskError.INTERNAL_ERROR, safe.error));
|
||||
}
|
||||
|
||||
debug(`startTask - starting task ${taskId}. logs at ${logFile}`);
|
||||
|
||||
gTasks[taskId] = child_process.fork(`${__dirname}/taskworker.js`, [ taskId ], { stdio: [ 'pipe', fd, fd, 'ipc' ]}); // fork requires ipc
|
||||
gTasks[taskId].once('exit', function (code, signal) {
|
||||
debug(`startTask: ${taskId} completed with code ${code} and signal ${signal}`);
|
||||
|
||||
get(taskId, function (error, task) {
|
||||
if (!error && task.percent !== 100) { // task crashed or was killed by us (code 50)
|
||||
error = code === 0 ? new Error(`Task ${taskId} stopped`) : new Error(`Task ${taskId} crashed with code ${code} and signal ${signal}`);
|
||||
update(taskId, { percent: 100, errorMessage: error.message }, NOOP_CALLBACK);
|
||||
} else if (!error && task.errorMessage) {
|
||||
error = new Error(task.errorMessage);
|
||||
} else if (!task) { // db got cleared in tests
|
||||
error = new Error(`No such task ${taskId}`);
|
||||
}
|
||||
|
||||
delete gTasks[taskId];
|
||||
|
||||
callback(error, task ? task.result : null);
|
||||
|
||||
debug(`startTask: ${taskId} done`);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
function stopTask(id, callback) {
|
||||
@@ -172,7 +176,7 @@ function listByTypePaged(type, page, perPage, callback) {
|
||||
taskdb.listByTypePaged(type, page, perPage, function (error, tasks) {
|
||||
if (error) return callback(new TaskError(TaskError.INTERNAL_ERROR, error));
|
||||
|
||||
tasks.forEach((task) => { task.active = !!gTasks[task.id]; });
|
||||
tasks.forEach(postProcess);
|
||||
|
||||
callback(null, tasks);
|
||||
});
|
||||
@@ -228,6 +232,6 @@ function getLogs(taskId, options, callback) {
|
||||
|
||||
// removes all fields that are strictly private and should never be returned by API calls
|
||||
function removePrivateFields(task) {
|
||||
var result = _.pick(task, 'id', 'type', 'percent', 'message', 'errorMessage', 'active', 'creationTime', 'result', 'ts');
|
||||
var result = _.pick(task, 'id', 'type', 'percent', 'message', 'errorMessage', 'active', 'creationTime', 'result', 'ts', 'success');
|
||||
return result;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user