diff --git a/src/routes/test/tasks-test.js b/src/routes/test/tasks-test.js index 2eef288f5..88f32ee43 100644 --- a/src/routes/test/tasks-test.js +++ b/src/routes/test/tasks-test.js @@ -31,7 +31,6 @@ describe('Tasks API', function () { expect(response.body.active).to.be(false); // finished expect(response.body.success).to.be(true); expect(response.body.result).to.be('ping'); - expect(response.body.completed).to.be(true); expect(response.body.error).to.be(null); }); @@ -63,7 +62,9 @@ describe('Tasks API', function () { setTimeout(async function () { const response = await superagent.post(`${serverUrl}/api/v1/tasks/${taskId}/stop`) - .query({ access_token: owner.token }); + .query({ access_token: owner.token }) + .send({}) + .ok(() => true); expect(response.status).to.equal(204); }, 100); @@ -77,7 +78,6 @@ describe('Tasks API', function () { expect(response.body.active).to.be(false); // finished expect(response.body.success).to.be(false); expect(response.body.result).to.be(null); - expect(response.body.completed).to.be(true); expect(response.body.error.message).to.contain('stopped'); }); @@ -96,7 +96,6 @@ describe('Tasks API', function () { expect(response.body.tasks[0].active).to.be(false); // finished expect(response.body.tasks[0].success).to.be(true); // finished expect(response.body.tasks[0].result).to.be('ping'); - expect(response.body.tasks[0].completed).to.be(true); expect(response.body.tasks[0].error).to.be(null); }); }); diff --git a/src/scripts/kill-child.sh b/src/scripts/kill-child.sh index 38af0686c..9ad404a1e 100755 --- a/src/scripts/kill-child.sh +++ b/src/scripts/kill-child.sh @@ -23,6 +23,8 @@ function killtree() { killtree "${cpid}" || true done echo "kill-child: killing $pid" + kill -SIGTERM "${pid}" 2>/dev/null || true + sleep 1 kill -SIGKILL "${pid}" 2>/dev/null || true } diff --git a/src/scripts/starttask.sh b/src/scripts/starttask.sh index 8b5ce6833..ef2b19174 100755 --- a/src/scripts/starttask.sh +++ b/src/scripts/starttask.sh @@ -50,7 +50,9 @@ if ! systemd-run --unit "${service_name}" --wait --uid=${id} --gid=${id} \ echo "Service ${service_name} failed to run" # this only happens if the path to task worker itself is wrong fi +# ExecMainCode=0 means killed by signal in ExecMainStatus. ExecMainCode=1 means exited cleanly with code in ExecMainStatus exit_code=$(systemctl show "${service_name}" -p ExecMainCode | sed 's/ExecMainCode=//g') +exit_status=$(systemctl show "${service_name}" -p ExecMainStatus | sed 's/ExecMainStatus=//g') -echo "Service ${service_name} finished with exit code ${exit_code}" -exit "${exit_code}" +echo "Service ${service_name} finished with exit code ${exit_code} and status ${exit_status}" +exit "${exit_status}" diff --git a/src/shell.js b/src/shell.js index 8ce344463..94b257628 100644 --- a/src/shell.js +++ b/src/shell.js @@ -5,6 +5,7 @@ const assert = require('assert'), child_process = require('child_process'), debug = require('debug')('box:shell'), path = require('path'), + safe = require('safetydance'), _ = require('./underscore.js'); exports = module.exports = shell; @@ -48,10 +49,10 @@ function spawn(tag, file, args, options) { const abortSignal = options.abortSignal || null; // note: we use our own handler and not the child_process one return new Promise((resolve, reject) => { - const spawnOptions = _.omit(options, [ 'maxLines', 'logger', 'abortSignal', 'onMessage', 'input', 'encoding' ]); + const spawnOptions = _.omit(options, [ 'maxLines', 'logger', 'abortSignal', 'onMessage', 'input', 'encoding', 'timeout', 'onTimeout' ]); const cp = child_process.spawn(file, args, spawnOptions); const stdoutBuffers = [], stderrBuffers = []; - let stdoutLineCount = 0, stderrLineCount = 0; + let stdoutLineCount = 0, stderrLineCount = 0, killTimerId = null, timedOut = false, terminated = false; cp.stdout.on('data', (data) => { if (logger) return logger(data); @@ -69,7 +70,11 @@ function spawn(tag, file, args, options) { cp.on('close', function (code, signal) { // always called. after 'exit' or 'error' const stdoutBuffer = Buffer.concat(stdoutBuffers); const stdout = options.encoding ? stdoutBuffer.toString(options.encoding) : stdoutBuffer; - if (code === 0) return resolve(stdout); + + if (killTimerId) clearTimeout(killTimerId); + + // if terminated or timedout, the code is ignored + if (!terminated && !timedOut && code === 0) return resolve(stdout); const stderrBuffer = Buffer.concat(stderrBuffers); const stderr = options.encoding ? stderrBuffer.toString(options.encoding) : stderrBuffer; @@ -81,6 +86,8 @@ function spawn(tag, file, args, options) { e.stderrLineCount = stderrLineCount; e.code = code; e.signal = signal; + e.timedOut = timedOut; + e.terminated = terminated; debug(`${tag}: ${file} ${args.join(' ').replace(/\n/g, '\\n')} errored`, e); @@ -91,16 +98,31 @@ function spawn(tag, file, args, options) { debug(`${tag}: ${file} ${args.join(' ').replace(/\n/g, '\\n')} errored`, error); }); - abortSignal?.addEventListener('abort', () => { - debug(`${tag}: aborting ${cp.pid}`); + cp.terminate = function () { + terminated = true; + // many approaches to kill sudo launched process failed. we now have a sudo wrapper to kill the full tree child_process.execFile('/usr/bin/sudo', [ KILL_CHILD_CMD, cp.pid, process.pid ], { encoding: 'utf8' }, (error, stdout, stderr) => { if (error) debug(`${tag}: failed to kill children`, stdout, stderr); - else debug(`${tag}: aborted ${cp.pid}`, stdout, stderr); + else debug(`${tag}: terminated ${cp.pid}`, stdout, stderr); }); + }; + + abortSignal?.addEventListener('abort', () => { + debug(`${tag}: aborting ${cp.pid}`); + cp.terminate(); }, { once: true }); if (options.onMessage) cp.on('message', options.onMessage); // ipc mode messages + if (options.timeout) { + killTimerId = setTimeout(async () => { + debug(`${tag}: timedout`); + timedOut = true; + if (typeof options.onTimeout !== 'function') return cp.terminate(); + await safe(options.onTimeout(), { debug }); + }, options.timeout); + } + // https://github.com/nodejs/node/issues/25231 if ('input' in options) { // when empty, just closes cp.stdin.write(options.input); diff --git a/src/tasks.js b/src/tasks.js index 368db34d8..854cd0116 100644 --- a/src/tasks.js +++ b/src/tasks.js @@ -78,24 +78,18 @@ function postProcess(task) { task.error = safe.JSON.parse(task.errorJson); delete task.errorJson; - return task; -} - -function updateStatus(result) { - assert.strictEqual(typeof result, 'object'); - // result.pending - task is scheduled to run at some point - // result.completed - task finished and exit/crash was cleanly collected - result.running = !!gTasks[result.id]; // running means actively running - result.active = result.running || result.pending; // active mean task is 'done' or not. at this point, clients can stop polling this task. - result.success = result.completed && !result.error; // if task has completed without an error + // result.completed - task finished and exit/crash was cleanly collected. internal flag. + task.running = !!gTasks[task.id]; // running means actively running + task.active = task.running || task.pending; // active mean task is 'done' or not. at this point, clients can stop polling this task. + task.success = task.completed && !task.error; // if task has completed without an error // the error in db will be empty if we didn't get a chance to handle task exit - if (!result.active && !result.completed && !result.error) { - result.error = { message: 'Task was stopped because the server restarted or crashed', code: exports.ECRASHED }; + if (!task.active && !task.completed && !task.error) { + task.error = { message: 'Task was stopped because the server restarted or crashed', code: exports.ECRASHED }; } - return result; + return task; } async function get(id) { @@ -104,7 +98,7 @@ async function get(id) { const result = await database.query(`SELECT ${TASKS_FIELDS} FROM tasks WHERE id = ?`, [ id ]); if (result.length === 0) return null; - return updateStatus(postProcess(result[0])); + return postProcess(result[0]); } async function update(id, task) { @@ -163,67 +157,53 @@ async function startTask(id, options) { const logFile = options.logFile || `${paths.TASKS_LOG_DIR}/${id}.log`; debug(`startTask - starting task ${id} with options ${JSON.stringify(options)}. logs at ${logFile}`); - let killTimerId = null, timedOut = false; - - await update(id, { pending: false }); - const ac = new AbortController(); gTasks[id] = ac; - if (options.timeout) { - killTimerId = setTimeout(async function () { - debug(`startTask: task ${id} took too long. killing`); - timedOut = true; - const [error] = await safe(stopTask(id)); - if (error) debug(`startTask: error stopping task: ${error.message}`); - }, options.timeout); - } - - const sudoArgs = [ START_TASK_CMD, id, logFile, options.nice || 0, options.memoryLimit || 400, options.oomScoreAdjust || 0 ]; const sudoOptions = { preserveEnv: true, + encoding: 'utf8', abortSignal: ac.signal, + timeout: options.timeout || 0, + onTimeout: async () => { // custom stop because kill won't do. the task is running in some other process tree + debug(`onTimeout: ${id}`); + await stopTask(id); + } }; - const [sudoError] = await safe(shell.sudo(sudoArgs, sudoOptions)); - const code = sudoError ? sudoError.code : 0; + safe(update(id, { pending: false }), { debug }); // background. we have to create the cp immediately to prevent race with stopTask() - debug(`startTask: ${id} completed with code ${code} ${!gTasks[id] ? '. this will be ignored' : ''}`); + const [sudoError] = await safe(shell.sudo([ START_TASK_CMD, id, logFile, options.nice || 0, options.memoryLimit || 400, options.oomScoreAdjust || 0 ], sudoOptions)); - if (options.timeout) clearTimeout(killTimerId); - - if (!gTasks[id]) return; // when box code is shutting down, don't update the task status as "crashed". see stopAllTasks() - - const [getError, task] = await safe(get(id)); - - let taskError = null; - if (!getError && !task.completed) { // taskworker crashed or was killed by us - if (code === 0) { - taskError = { - message: `Task ${id} ${timedOut ? 'timed out' : 'stopped'}` , - code: timedOut ? exports.ETIMEOUT : exports.ESTOPPED - }; - } else { // task crashed. for code, maybe we can check systemctl show box-task-1707 -p ExecMainStatus - taskError = { - message: code === 2 ? `Task ${id} crashed as it ran out of memory` : `Task ${id} crashed with code ${code}`, - code: exports.ECRASHED - }; - } - - // note that despite the update() here, we should handle the case where the box code was restarted and never got taskworker exit - await safe(setCompleted(id, { error: taskError })); - } else if (!getError && task.error) { - taskError = task.error; - } else if (!task) { // db got cleared in tests - taskError = new BoxError(BoxError.NOT_FOUND, `No such task ${id}`); + if (!gTasks[id]) { // when box code is shutting down, don't update the task status as "crashed". see stopAllTasks() + debug(`startTask: ${id} completed as a result of box shutdown`); + return null; } delete gTasks[id]; + const task = await get(id); + if (!task) return null; // task disappeared on us. this can happen when db got cleared in tests + + if (task.completed) { // task completed. we can trust the db result + debug(`startTask: ${id} completed. error: %o`, task.error); + if (task.error) throw task.error; + return task.result; + } + + assert.ok(sudoError, 'sudo should have errorred because task did not complete!'); + + // taskworker.sh forwards the exit code of the actual worker. It's either a raw signal number OR the exit code + let taskError = null; + if (sudoError.timedOut) taskError = { message: `Task ${id} timed out`, code: exports.ETIMEOUT }; + else if (sudoError.code === 70) taskError = { message: `Task ${id} stopped`, code: exports.ESTOPPED }; // set by taskworker SIGTERM + else if (sudoError.code === 9 /* SIGKILL */) taskError = { message: `Task ${id} ran out of memory or terminated`, code: exports.ECRASHED }; // SIGTERM with oom gets set as 2 by nodejs + else if (sudoError.code === 50) taskError = { message:`Task ${id} crashed with code ${sudoError.code}`, code: exports.ECRASHED }; + else taskError = { message:`Task ${id} crashed with unknown code ${sudoError.code}`, code: exports.ECRASHED }; + debug(`startTask: ${id} done. error: %o`, taskError); - if (taskError) throw taskError; - return task.result; + throw taskError; } async function stopTask(id) { @@ -233,7 +213,7 @@ async function stopTask(id) { debug(`stopTask: stopping task ${id}`); - await shell.sudo([ STOP_TASK_CMD, id, ], {}); + await shell.sudo([ STOP_TASK_CMD, id, ], {}); // note: this is stopping the systemd-run task. the sudo will exit when this exits } async function stopAllTasks() { @@ -265,7 +245,6 @@ async function listByTypePaged(type, page, perPage) { const results = await database.query(query, data); results.forEach(postProcess); - results.forEach(updateStatus); return results; } @@ -292,7 +271,7 @@ async function getLogs(task, options) { // removes all fields that are strictly private and should never be returned by API calls function removePrivateFields(task) { - return _.pick(task, ['id', 'type', 'percent', 'message', 'error', 'running', 'completed', 'active', 'creationTime', 'result', 'ts', 'success']); + return _.pick(task, ['id', 'type', 'percent', 'message', 'error', 'running', 'active', 'creationTime', 'result', 'ts', 'success']); } async function del(id) { diff --git a/src/taskworker.js b/src/taskworker.js index f73ad5160..cafc1cf8c 100755 --- a/src/taskworker.js +++ b/src/taskworker.js @@ -17,6 +17,7 @@ const apptask = require('./apptask.js'), reverseProxy = require('./reverseproxy.js'), safe = require('safetydance'), tasks = require('./tasks.js'), + timers = require('timers/promises'), updater = require('./updater.js'); const TASKS = { // indexed by task type @@ -34,7 +35,7 @@ const TASKS = { // indexed by task type _identity: async (arg, progressCallback) => { progressCallback({ percent: 20 }); return arg; }, _error: async (arg, progressCallback) => { progressCallback({ percent: 20 }); throw new Error(`Failed for arg: ${arg}`); }, _crash: (arg) => { throw new Error(`Crashing for arg: ${arg}`); }, // the test looks for this debug string in the log file - _sleep: async (arg) => setTimeout(process.exit, arg) + _sleep: async (arg) => await timers.setTimeout(parseInt(arg, 10)) }; if (process.argv.length !== 4) { @@ -62,6 +63,8 @@ async function setupNetworking() { } // this is also used as the 'uncaughtException' handler which can only have synchronous functions +// taskworker.sh forwards the exit code of the actual worker. It's either a raw signal number OR the exit code. So, choose exit codes > 31 +// 50 - internal error , 70 - SIGTERM exit function exitSync(status) { if (status.error) fs.write(logFd, status.error.stack + '\n', function () {}); fs.fsyncSync(logFd); @@ -87,7 +90,7 @@ async function main() { process.on('SIGTERM', () => { debug('Terminated'); - exitSync({ code: 0 }); + exitSync({ code: 70 }); }); // ensure we log task crashes with the task logs. neither console.log nor debug are sync for some reason @@ -115,9 +118,9 @@ async function main() { debug(`Task took ${(new Date() - startTime)/1000} seconds`); await safe(tasks.setCompleted(taskId, progress)); - exitSync({ error: runError, code: runError ? 50 : 0 }); + exitSync({ error: runError, code: 0 }); // code itself ran fine, but resulted in some error. so exit with success } catch (error) { - exitSync({ error, code: 1 }); // do not call setCompleted() intentionally. the task code must be resilient enough to handle it + exitSync({ error, code: 50 }); // do not call setCompleted() intentionally. the task code must be resilient enough to handle it } } diff --git a/src/test/tasks-test.js b/src/test/tasks-test.js index c7aa62c6e..7e72798c6 100644 --- a/src/test/tasks-test.js +++ b/src/test/tasks-test.js @@ -121,6 +121,17 @@ describe('task', function () { const [error, result] = await safe(tasks.startTask(taskId, {})); if (!error) throw new Error('expecting task to stop'); expect(error.message).to.contain('stopped'); + expect(error.code).to.be(tasks.ESTOPPED); + expect(result).to.not.be.ok(); + }); + + it('task timesout', async function () { + const taskId = await tasks.add(tasks._TASK_SLEEP, [ 10000 ]); + + const [error, result] = await safe(tasks.startTask(taskId, { timeout: 2000 })); + if (!error) throw new Error('expecting task to timeout'); + expect(error.code).to.be(tasks.ETIMEOUT); + expect(error.message).to.contain('timed out'); expect(result).to.not.be.ok(); }); });