'use strict';

exports = module.exports = {
    get,
    add,
    update,
    setCompleted,
    setCompletedByType,
    list,

    getLogs,

    startTask,
    stopTask,
    stopAllTasks,

    removePrivateFields,

    _del: del,

    // task types. if you add a task here, fill up the function table in taskworker and dashboard constants.js
    // '_' prefix is removed for lookup
    TASK_APP: 'app',

    // "prefix" allows us to locate the tasks of a specific app or backup site
    TASK_APP_BACKUP_PREFIX: 'appBackup_',
    TASK_FULL_BACKUP_PREFIX: 'backup_', // full backup
    TASK_CLEAN_BACKUPS_PREFIX: 'cleanBackups_',

    TASK_BOX_UPDATE: 'boxUpdate',
    TASK_CHECK_CERTS: 'checkCerts',
    TASK_SYNC_DYNDNS: 'syncDyndns',
    TASK_PREPARE_DASHBOARD_LOCATION: 'prepareDashboardLocation',
    TASK_SYNC_EXTERNAL_LDAP: 'syncExternalLdap',
    TASK_CHANGE_MAIL_LOCATION: 'changeMailLocation',
    TASK_SYNC_DNS_RECORDS: 'syncDnsRecords',
    TASK_CHECK_BACKUP_INTEGRITY: 'checkBackupIntegrity',

    // error codes
    ESTOPPED: 'stopped',
    ECRASHED: 'crashed',
    ETIMEOUT: 'timeout',

    // testing
    _TASK_IDENTITY: 'identity',
    _TASK_CRASH: 'crash',
    _TASK_ERROR: 'error',
    _TASK_SLEEP: 'sleep'
};

const assert = require('node:assert'),
    BoxError = require('./boxerror.js'),
    database = require('./database.js'),
    debug = require('debug')('box:tasks'),
    logs = require('./logs.js'),
    mysql = require('mysql2'),
    path = require('node:path'),
    paths = require('./paths.js'),
    safe = require('safetydance'),
    shell = require('./shell.js')('tasks'),
    _ = require('./underscore.js');

let gTasks = {}; // holds AbortControllers indexed by task id

const START_TASK_CMD = path.join(__dirname, 'scripts/starttask.sh');
const STOP_TASK_CMD = path.join(__dirname, 'scripts/stoptask.sh');

// columns selected for every task query. interpolating the array into a template string joins it with commas
const TASKS_FIELDS = [ 'id', 'type', 'argsJson', 'percent', 'pending', 'completed', 'message', 'errorJson', 'creationTime', 'resultJson', 'ts' ];

// converts a raw db row into the task object used by the rest of the code. the row is mutated in place and returned.
//  - the *Json columns are parsed into args/result/error and the raw columns are removed
//  - id is stringified, pending/completed are coerced to booleans
//  - running/active/success are derived (see comments below)
function postProcess(task) {
    assert.strictEqual(typeof task, 'object');
    assert(task.argsJson === null || typeof task.argsJson === 'string');

    task.args = safe.JSON.parse(task.argsJson) || [];
    delete task.argsJson;

    task.id = String(task.id);

    task.pending = !!task.pending;
    task.completed = !!task.completed;

    // parsed with safe.JSON.parse like args and error, so that one unparsable resultJson row cannot make get()/list() throw
    task.result = safe.JSON.parse(task.resultJson);
    delete task.resultJson;

    task.error = safe.JSON.parse(task.errorJson);
    delete task.errorJson;

    // task.pending - task is scheduled to run at some point
    // task.completed - task finished and exit/crash was cleanly collected. internal flag.
    task.running = !!gTasks[task.id]; // running means actively running (this process holds an AbortController for it)
    task.active = task.running || task.pending; // active means the task is running or still queued. once it is not active, clients can stop polling this task.
    task.success = task.completed && !task.error; // if task has completed without an error

    // the error in db will be empty if task is done but the completed flag is not set
    if (!task.active && !task.completed) {
        task.error = { message: 'Task was stopped because the server restarted or crashed', code: exports.ECRASHED };
    }

    return task;
}

// returns the post-processed task with the given id or null when there is no such row
async function get(id) {
    assert.strictEqual(typeof id, 'string');

    const result = await database.query(`SELECT ${TASKS_FIELDS} FROM tasks WHERE id = ?`, [ id ]);
    if (result.length === 0) return null;

    return postProcess(result[0]);
}

// updates the given fields of a task. 'result' and 'error' are JSON serialized into the resultJson/errorJson columns.
// every other key of `task` is used verbatim as the column name, so keys must only come from trusted code.
// throws BoxError.NOT_FOUND when the query did not affect exactly one row
async function update(id, task) {
    assert.strictEqual(typeof id, 'string');
    assert.strictEqual(typeof task, 'object');

    debug(`updating task ${id} with: ${JSON.stringify(task)}`);

    const args = [], fields = [];
    for (const k in task) {
        if (k === 'result' || k === 'error') {
            fields.push(`${k}Json = ?`);
            args.push(JSON.stringify(task[k]));
        } else {
            fields.push(k + ' = ?');
            args.push(task[k]);
        }
    }
    args.push(id); // for the WHERE clause

    const result = await database.query('UPDATE tasks SET ' + fields.join(', ') + ' WHERE id = ?', args);
    if (result.affectedRows !== 1) throw new BoxError(BoxError.NOT_FOUND, 'Task not found');
}

// same as update() but sets completed to true by default. a 'completed' key in `task` overrides that default
async function setCompleted(id, task) {
    assert.strictEqual(typeof id, 'string');
    assert.strictEqual(typeof task, 'object');

    debug(`setCompleted - ${id}: ${JSON.stringify(task)}`);

    await update(id, Object.assign({ completed: true }, task));
}
// lists tasks, latest first. page is 1-based.
// options.type filters by exact type. otherwise, options.prefix filters by types starting with the prefix
async function list(page, perPage, options) {
    assert.strictEqual(typeof page, 'number');
    assert.strictEqual(typeof perPage, 'number');
    assert.strictEqual(typeof options, 'object');

    const data = [];
    let query = `SELECT ${TASKS_FIELDS} FROM tasks`;

    if (options.type) {
        query += ' WHERE TYPE=?';
        data.push(options.type);
    } else if (options.prefix) {
        // '%' and '_' are LIKE wildcards and all our prefixes end with '_'. escape them (and the escape
        // character itself) so that the prefix is matched literally
        const pattern = String(options.prefix).replace(/[!%_]/g, '!$&') + '%';
        query += ' WHERE TYPE LIKE ' + mysql.escape(pattern) + ' ESCAPE \'!\'';
    }

    query += ' ORDER BY creationTime DESC, id DESC LIMIT ?,?'; // put latest task first

    data.push((page-1)*perPage);
    data.push(perPage);

    const results = await database.query(query, data);
    results.forEach(postProcess); // postProcess mutates the rows in place

    return results;
}

// marks the latest task of the given type as completed with the given fields.
// throws BoxError.NOT_FOUND when there is no task of that type
async function setCompletedByType(type, task) {
    assert.strictEqual(typeof type, 'string');
    assert.strictEqual(typeof task, 'object');

    const results = await list(1, 1, { type });
    if (results.length !== 1) throw new BoxError(BoxError.NOT_FOUND, 'No such task');

    await setCompleted(results[0].id, task);
}

// inserts a new pending task ('Queued', 0 percent) and returns its id as a string
async function add(type, args) {
    assert.strictEqual(typeof type, 'string');
    assert(Array.isArray(args));

    const result = await database.query('INSERT INTO tasks (type, argsJson, percent, message, pending) VALUES (?, ?, ?, ?, ?)', [ type, JSON.stringify(args), 0, 'Queued', true ]);
    return String(result.insertId);
}

// stops a task started by this process. throws BoxError.BAD_STATE when the task is not tracked in gTasks
async function stopTask(id) {
    assert.strictEqual(typeof id, 'string');

    if (!gTasks[id]) throw new BoxError(BoxError.BAD_STATE, 'task is not active');

    debug(`stopTask: stopping task ${id}`);

    await shell.sudo([ STOP_TASK_CMD, id, ], {}); // note: this is stopping the systemd-run task. the sudo will exit when this exits
}

// runs the task with START_TASK_CMD and waits for it to finish.
// options: logFile, timeout, nice, memoryLimit, oomScoreAdjust (all optional)
// returns the task result when the task completed without error. returns null when the box is shutting down
// or the task row has disappeared.
// throws the task's error object ({ message, code }) when the task failed, was stopped, timed out or crashed
async function startTask(id, options) {
    assert.strictEqual(typeof id, 'string');
    assert.strictEqual(typeof options, 'object');

    const logFile = options.logFile || `${paths.TASKS_LOG_DIR}/${id}.log`;
    debug(`startTask - starting task ${id} with options ${JSON.stringify(options)}. logs at ${logFile}`);

    const ac = new AbortController();
    gTasks[id] = ac; // from here on, postProcess() reports the task as running and stopTask() accepts it

    const sudoOptions = {
        preserveEnv: true,
        encoding: 'utf8',
        abortSignal: ac.signal,
        timeout: options.timeout || 0,
        onTimeout: async () => { // custom stop because kill won't do. the task is running in some other process tree
            debug(`onTimeout: ${id}`);
            await stopTask(id);
        }
    };

    safe(update(id, { pending: false }), { debug }); // background. we have to create the cp immediately to prevent race with stopTask()

    const [sudoError] = await safe(shell.sudo([ START_TASK_CMD, id, logFile, options.nice || 0, options.memoryLimit || 400, options.oomScoreAdjust || 0 ], sudoOptions));

    if (!gTasks[id]) { // when box code is shutting down, don't update the task status as "crashed". see stopAllTasks()
        debug(`startTask: ${id} completed as a result of box shutdown`);
        return null;
    }

    delete gTasks[id];

    const task = await get(id);
    if (!task) return null; // task disappeared on us. this can happen when db got cleared in tests

    if (task.completed) { // task completed. we can trust the db result
        debug(`startTask: ${id} completed. error: %o`, task.error);
        if (task.error) throw task.error;
        return task.result;
    }

    // the task never recorded its own completion. derive the error from how the sudo exited
    assert.ok(sudoError, 'sudo should have errored because task did not complete!');

    // taskworker.sh forwards the exit code of the actual worker. It's either a raw signal number OR the exit code
    let taskError = null;
    if (sudoError.timedOut) taskError = { message: `Task ${id} timed out`, code: exports.ETIMEOUT };
    else if (sudoError.code === 70) taskError = { message: `Task ${id} stopped`, code: exports.ESTOPPED }; // set by taskworker SIGTERM
    else if (sudoError.code === 9 /* SIGKILL */) taskError = { message: `Task ${id} ran out of memory or terminated`, code: exports.ECRASHED }; // SIGTERM with oom gets set as 2 by nodejs
    else if (sudoError.code === 50) taskError = { message:`Task ${id} crashed with code ${sudoError.code}`, code: exports.ECRASHED };
    else taskError = { message:`Task ${id} crashed with unknown code ${sudoError.code}`, code: exports.ECRASHED };

    debug(`startTask: ${id} done. error: %o`, taskError);

    await safe(setCompleted(id, { error: taskError }), { debug }); // best effort. the error is thrown to the caller either way
    throw taskError;
}

// aborts all tasks started by this process and asks STOP_TASK_CMD to stop 'all'. errors are only logged
async function stopAllTasks() {
    const acs = Object.values(gTasks);
    debug(`stopAllTasks: ${acs.length} tasks are running. sending abort signal`);
    gTasks = {}; // this signals startTask() to not set completion status as "crashed"
    acs.forEach(ac => ac.abort()); // cleanup all the sudos and systemd-run

    const [error] = await safe(shell.sudo([ STOP_TASK_CMD, 'all' ], { cwd: paths.baseDir() }));
    if (error) debug(`stopAllTasks: error stopping stasks: ${error.message}`);
}

// returns a LogStream of the task's log file. options: lines (number), format (string), follow (boolean)
// throws BoxError.FS_ERROR when the task is not active and its log file does not exist
async function getLogs(task, options) {
    assert.strictEqual(typeof task, 'object');
    assert(options && typeof options === 'object');

    assert.strictEqual(typeof options.lines, 'number');
    assert.strictEqual(typeof options.format, 'string');
    assert.strictEqual(typeof options.follow, 'boolean');

    const logFile = `${paths.TASKS_LOG_DIR}/${task.id}.log`;

    if (!task.active && !safe.fs.existsSync(logFile)) throw new BoxError(BoxError.FS_ERROR, 'Log file removed/missing'); // logrotated

    const cp = logs.tail([logFile], { lines: options.lines, follow: options.follow });

    const logStream = new logs.LogStream({ format: options.format || 'json', source: task.id });
    logStream.on('close', () => cp.terminate()); // the caller has to call destroy() on logStream. destroy() of Transform emits 'close'

    cp.stdout.pipe(logStream);

    return logStream;
}

// removes all fields that are strictly private and should never be returned by API calls
function removePrivateFields(task) {
    return _.pick(task, ['id', 'type', 'percent', 'message', 'error', 'running', 'active', 'creationTime', 'result', 'ts', 'success']);
}

// deletes the task row. exported as _del. throws BoxError.NOT_FOUND when the query did not affect exactly one row
async function del(id) {
    assert.strictEqual(typeof id, 'string');

    const result = await database.query('DELETE FROM tasks WHERE id = ?', [ id ]);
    if (result.affectedRows !== 1) throw new BoxError(BoxError.NOT_FOUND, 'Task not found');
}