tasks: rework the startTask API

it is now async. change was required to reset the pending flag
This commit is contained in:
Girish Ramakrishnan
2025-06-17 18:54:12 +02:00
parent 4770b32287
commit d9c104613c
13 changed files with 178 additions and 202 deletions

View File

@@ -156,10 +156,9 @@ async function add(type, args) {
return String(result.insertId);
}
function startTask(id, options, onTaskFinished) {
async function startTask(id, options) {
assert.strictEqual(typeof id, 'string');
assert.strictEqual(typeof options, 'object');
assert(typeof onTaskFinished === 'undefined' || typeof onTaskFinished === 'function');
const logFile = options.logFile || `${paths.TASKS_LOG_DIR}/${id}.log`;
debug(`startTask - starting task ${id} with options ${JSON.stringify(options)}. logs at ${logFile}`);
@@ -168,56 +167,60 @@ function startTask(id, options, onTaskFinished) {
const sudoOptions = { preserveEnv: true, logStream: null };
if (constants.TEST) sudoOptions.logStream = fs.createWriteStream('/dev/null'); // without this output is messed up, not sure why
gTasks[id] = shell.sudo([ START_TASK_CMD, id, logFile, options.nice || 0, options.memoryLimit || 400, options.oomScoreAdjust || 0 ], sudoOptions, async function (sudoError) {
if (!gTasks[id]) return; // ignore task exit since we are shutting down. see stopAllTasks
const code = sudoError ? sudoError.code : 0;
const p = new Promise((resolve, reject) => {
gTasks[id] = shell.sudo([ START_TASK_CMD, id, logFile, options.nice || 0, options.memoryLimit || 400, options.oomScoreAdjust || 0 ], sudoOptions, async function (sudoError) {
if (!gTasks[id]) return; // ignore task exit since we are shutting down. see stopAllTasks
debug(`startTask: ${id} completed with code ${code}`);
const code = sudoError ? sudoError.code : 0;
if (options.timeout) clearTimeout(killTimerId);
debug(`startTask: ${id} completed with code ${code}`);
const [getError, task] = await safe(get(id));
if (options.timeout) clearTimeout(killTimerId);
let taskError = null;
if (!getError && task.percent !== 100) { // taskworker crashed or was killed by us
if (code === 0) {
taskError = {
message: `Task ${id} ${timedOut ? 'timed out' : 'stopped'}` ,
code: timedOut ? exports.ETIMEOUT : exports.ESTOPPED
};
} else { // task crashed. for code, maybe we can check systemctl show box-task-1707 -p ExecMainStatus
taskError = {
message: code === 2 ? `Task ${id} crashed as it ran out of memory` : `Task ${id} crashed with code ${code}`,
code: exports.ECRASHED
};
const [getError, task] = await safe(get(id));
let taskError = null;
if (!getError && task.percent !== 100) { // taskworker crashed or was killed by us
if (code === 0) {
taskError = {
message: `Task ${id} ${timedOut ? 'timed out' : 'stopped'}` ,
code: timedOut ? exports.ETIMEOUT : exports.ESTOPPED
};
} else { // task crashed. for code, maybe we can check systemctl show box-task-1707 -p ExecMainStatus
taskError = {
message: code === 2 ? `Task ${id} crashed as it ran out of memory` : `Task ${id} crashed with code ${code}`,
code: exports.ECRASHED
};
}
// note that despite the update() here, we should handle the case where the box code was restarted and never got taskworker exit
await safe(setCompleted(id, { error: taskError }));
} else if (!getError && task.error) {
taskError = task.error;
} else if (!task) { // db got cleared in tests
taskError = new BoxError(BoxError.NOT_FOUND, `No such task ${id}`);
}
// note that despite the update() here, we should handle the case where the box code was restarted and never got taskworker exit
await safe(setCompleted(id, { error: taskError }));
} else if (!getError && task.error) {
taskError = task.error;
} else if (!task) { // db got cleared in tests
taskError = new BoxError(BoxError.NOT_FOUND, `No such task ${id}`);
delete gTasks[id];
debug(`startTask: ${id} done. error: %o`, taskError);
if (taskError) reject(taskError); else resolve(task.result);
});
if (options.timeout) {
killTimerId = setTimeout(async function () {
debug(`startTask: task ${id} took too long. killing`);
timedOut = true;
const [error] = await safe(stopTask(id));
if (error) debug(`startTask: error stopping task: ${error.message}`);
}, options.timeout);
}
delete gTasks[id];
if (onTaskFinished) onTaskFinished(taskError, task ? task.result : null);
debug(`startTask: ${id} done. error: %o`, taskError);
});
if (options.timeout) {
killTimerId = setTimeout(async function () {
debug(`startTask: task ${id} took too long. killing`);
timedOut = true;
const [error] = await safe(stopTask(id));
if (error) debug(`startTask: error stopping task: ${error.message}`);
}, options.timeout);
}
update(id, { pending: false }); // fixme: make async
await update(id, { pending: false });
return p;
}
async function stopTask(id) {