shell: add timeout logic and rework error handling
what's important: * if task code ran, it exits with 0. this exit code applies regardless of (error, result) * when it exited cleanly, we will get the values from the database * if the task timed out, the box code kills it and it has a flag tracking timedOut. we can ignore the exit code in this case. * if task code was stopped, box code will send SIGTERM which ideally it will handle and end with 70. * if task code crashed and it caught the exception, it will return 50 * if task code crashed and node nuked us, it will exit with 1 * if task code was killed with some unhandleable signal, taskworker.sh will return the signal (9=SIGKILL)
This commit is contained in:
+41
-62
@@ -78,24 +78,18 @@ function postProcess(task) {
|
||||
task.error = safe.JSON.parse(task.errorJson);
|
||||
delete task.errorJson;
|
||||
|
||||
return task;
|
||||
}
|
||||
|
||||
function updateStatus(result) {
|
||||
assert.strictEqual(typeof result, 'object');
|
||||
|
||||
// result.pending - task is scheduled to run at some point
|
||||
// result.completed - task finished and exit/crash was cleanly collected
|
||||
result.running = !!gTasks[result.id]; // running means actively running
|
||||
result.active = result.running || result.pending; // active mean task is 'done' or not. at this point, clients can stop polling this task.
|
||||
result.success = result.completed && !result.error; // if task has completed without an error
|
||||
// result.completed - task finished and exit/crash was cleanly collected. internal flag.
|
||||
task.running = !!gTasks[task.id]; // running means actively running
|
||||
task.active = task.running || task.pending; // active mean task is 'done' or not. at this point, clients can stop polling this task.
|
||||
task.success = task.completed && !task.error; // if task has completed without an error
|
||||
|
||||
// the error in db will be empty if we didn't get a chance to handle task exit
|
||||
if (!result.active && !result.completed && !result.error) {
|
||||
result.error = { message: 'Task was stopped because the server restarted or crashed', code: exports.ECRASHED };
|
||||
if (!task.active && !task.completed && !task.error) {
|
||||
task.error = { message: 'Task was stopped because the server restarted or crashed', code: exports.ECRASHED };
|
||||
}
|
||||
|
||||
return result;
|
||||
return task;
|
||||
}
|
||||
|
||||
async function get(id) {
|
||||
@@ -104,7 +98,7 @@ async function get(id) {
|
||||
const result = await database.query(`SELECT ${TASKS_FIELDS} FROM tasks WHERE id = ?`, [ id ]);
|
||||
if (result.length === 0) return null;
|
||||
|
||||
return updateStatus(postProcess(result[0]));
|
||||
return postProcess(result[0]);
|
||||
}
|
||||
|
||||
async function update(id, task) {
|
||||
@@ -163,67 +157,53 @@ async function startTask(id, options) {
|
||||
const logFile = options.logFile || `${paths.TASKS_LOG_DIR}/${id}.log`;
|
||||
debug(`startTask - starting task ${id} with options ${JSON.stringify(options)}. logs at ${logFile}`);
|
||||
|
||||
let killTimerId = null, timedOut = false;
|
||||
|
||||
await update(id, { pending: false });
|
||||
|
||||
const ac = new AbortController();
|
||||
gTasks[id] = ac;
|
||||
|
||||
if (options.timeout) {
|
||||
killTimerId = setTimeout(async function () {
|
||||
debug(`startTask: task ${id} took too long. killing`);
|
||||
timedOut = true;
|
||||
const [error] = await safe(stopTask(id));
|
||||
if (error) debug(`startTask: error stopping task: ${error.message}`);
|
||||
}, options.timeout);
|
||||
}
|
||||
|
||||
const sudoArgs = [ START_TASK_CMD, id, logFile, options.nice || 0, options.memoryLimit || 400, options.oomScoreAdjust || 0 ];
|
||||
const sudoOptions = {
|
||||
preserveEnv: true,
|
||||
encoding: 'utf8',
|
||||
abortSignal: ac.signal,
|
||||
timeout: options.timeout || 0,
|
||||
onTimeout: async () => { // custom stop because kill won't do. the task is running in some other process tree
|
||||
debug(`onTimeout: ${id}`);
|
||||
await stopTask(id);
|
||||
}
|
||||
};
|
||||
|
||||
const [sudoError] = await safe(shell.sudo(sudoArgs, sudoOptions));
|
||||
const code = sudoError ? sudoError.code : 0;
|
||||
safe(update(id, { pending: false }), { debug }); // background. we have to create the cp immediately to prevent race with stopTask()
|
||||
|
||||
debug(`startTask: ${id} completed with code ${code} ${!gTasks[id] ? '. this will be ignored' : ''}`);
|
||||
const [sudoError] = await safe(shell.sudo([ START_TASK_CMD, id, logFile, options.nice || 0, options.memoryLimit || 400, options.oomScoreAdjust || 0 ], sudoOptions));
|
||||
|
||||
if (options.timeout) clearTimeout(killTimerId);
|
||||
|
||||
if (!gTasks[id]) return; // when box code is shutting down, don't update the task status as "crashed". see stopAllTasks()
|
||||
|
||||
const [getError, task] = await safe(get(id));
|
||||
|
||||
let taskError = null;
|
||||
if (!getError && !task.completed) { // taskworker crashed or was killed by us
|
||||
if (code === 0) {
|
||||
taskError = {
|
||||
message: `Task ${id} ${timedOut ? 'timed out' : 'stopped'}` ,
|
||||
code: timedOut ? exports.ETIMEOUT : exports.ESTOPPED
|
||||
};
|
||||
} else { // task crashed. for code, maybe we can check systemctl show box-task-1707 -p ExecMainStatus
|
||||
taskError = {
|
||||
message: code === 2 ? `Task ${id} crashed as it ran out of memory` : `Task ${id} crashed with code ${code}`,
|
||||
code: exports.ECRASHED
|
||||
};
|
||||
}
|
||||
|
||||
// note that despite the update() here, we should handle the case where the box code was restarted and never got taskworker exit
|
||||
await safe(setCompleted(id, { error: taskError }));
|
||||
} else if (!getError && task.error) {
|
||||
taskError = task.error;
|
||||
} else if (!task) { // db got cleared in tests
|
||||
taskError = new BoxError(BoxError.NOT_FOUND, `No such task ${id}`);
|
||||
if (!gTasks[id]) { // when box code is shutting down, don't update the task status as "crashed". see stopAllTasks()
|
||||
debug(`startTask: ${id} completed as a result of box shutdown`);
|
||||
return null;
|
||||
}
|
||||
|
||||
delete gTasks[id];
|
||||
|
||||
const task = await get(id);
|
||||
if (!task) return null; // task disappeared on us. this can happen when db got cleared in tests
|
||||
|
||||
if (task.completed) { // task completed. we can trust the db result
|
||||
debug(`startTask: ${id} completed. error: %o`, task.error);
|
||||
if (task.error) throw task.error;
|
||||
return task.result;
|
||||
}
|
||||
|
||||
assert.ok(sudoError, 'sudo should have errorred because task did not complete!');
|
||||
|
||||
// taskworker.sh forwards the exit code of the actual worker. It's either a raw signal number OR the exit code
|
||||
let taskError = null;
|
||||
if (sudoError.timedOut) taskError = { message: `Task ${id} timed out`, code: exports.ETIMEOUT };
|
||||
else if (sudoError.code === 70) taskError = { message: `Task ${id} stopped`, code: exports.ESTOPPED }; // set by taskworker SIGTERM
|
||||
else if (sudoError.code === 9 /* SIGKILL */) taskError = { message: `Task ${id} ran out of memory or terminated`, code: exports.ECRASHED }; // SIGTERM with oom gets set as 2 by nodejs
|
||||
else if (sudoError.code === 50) taskError = { message:`Task ${id} crashed with code ${sudoError.code}`, code: exports.ECRASHED };
|
||||
else taskError = { message:`Task ${id} crashed with unknown code ${sudoError.code}`, code: exports.ECRASHED };
|
||||
|
||||
debug(`startTask: ${id} done. error: %o`, taskError);
|
||||
|
||||
if (taskError) throw taskError;
|
||||
return task.result;
|
||||
throw taskError;
|
||||
}
|
||||
|
||||
async function stopTask(id) {
|
||||
@@ -233,7 +213,7 @@ async function stopTask(id) {
|
||||
|
||||
debug(`stopTask: stopping task ${id}`);
|
||||
|
||||
await shell.sudo([ STOP_TASK_CMD, id, ], {});
|
||||
await shell.sudo([ STOP_TASK_CMD, id, ], {}); // note: this is stopping the systemd-run task. the sudo will exit when this exits
|
||||
}
|
||||
|
||||
async function stopAllTasks() {
|
||||
@@ -265,7 +245,6 @@ async function listByTypePaged(type, page, perPage) {
|
||||
|
||||
const results = await database.query(query, data);
|
||||
results.forEach(postProcess);
|
||||
results.forEach(updateStatus);
|
||||
return results;
|
||||
}
|
||||
|
||||
@@ -292,7 +271,7 @@ async function getLogs(task, options) {
|
||||
|
||||
// removes all fields that are strictly private and should never be returned by API calls
|
||||
function removePrivateFields(task) {
|
||||
return _.pick(task, ['id', 'type', 'percent', 'message', 'error', 'running', 'completed', 'active', 'creationTime', 'result', 'ts', 'success']);
|
||||
return _.pick(task, ['id', 'type', 'percent', 'message', 'error', 'running', 'active', 'creationTime', 'result', 'ts', 'success']);
|
||||
}
|
||||
|
||||
async function del(id) {
|
||||
|
||||
Reference in New Issue
Block a user