12e073e8cf
mostly because code is being autogenerated by all the AI stuff using this prefix. it's also used in the stack trace.
288 lines
10 KiB
JavaScript
288 lines
10 KiB
JavaScript
'use strict';
|
|
|
|
// Public surface of the tasks module: task CRUD helpers, process control,
// and the string constants identifying task types and error codes.
exports = module.exports = {
    // database accessors
    get,
    add,
    update,
    setCompleted,
    setCompletedByType,
    listByTypePaged,

    // log streaming
    getLogs,

    // process lifecycle
    startTask,
    stopTask,
    stopAllTasks,

    // API response sanitization
    removePrivateFields,

    // exposed under a '_' prefixed name
    _del: del,

    // Task types. When adding one here, also fill in the function table in
    // taskworker and the dashboard's constants.js.
    // The '_' prefix is removed for lookup.
    TASK_APP: 'app',

    // A "prefix" lets us locate the tasks of a specific app or backup target.
    TASK_APP_BACKUP_PREFIX: 'appBackup_',
    TASK_FULL_BACKUP_PREFIX: 'backup_', // full backup
    TASK_CLEAN_BACKUPS_PREFIX: 'cleanBackups_',

    TASK_BOX_UPDATE: 'boxUpdate',
    TASK_CHECK_CERTS: 'checkCerts',
    TASK_SYNC_DYNDNS: 'syncDyndns',
    TASK_PREPARE_DASHBOARD_LOCATION: 'prepareDashboardLocation',
    TASK_SYNC_EXTERNAL_LDAP: 'syncExternalLdap',
    TASK_CHANGE_MAIL_LOCATION: 'changeMailLocation',
    TASK_SYNC_DNS_RECORDS: 'syncDnsRecords',

    // Error codes placed in the 'code' field of a task error.
    ESTOPPED: 'stopped',
    ECRASHED: 'crashed',
    ETIMEOUT: 'timeout',

    // Task types used for testing.
    _TASK_IDENTITY: 'identity',
    _TASK_CRASH: 'crash',
    _TASK_ERROR: 'error',
    _TASK_SLEEP: 'sleep'
};
|
|
|
|
const assert = require('node:assert'),
|
|
BoxError = require('./boxerror.js'),
|
|
database = require('./database.js'),
|
|
debug = require('debug')('box:tasks'),
|
|
logs = require('./logs.js'),
|
|
path = require('node:path'),
|
|
paths = require('./paths.js'),
|
|
safe = require('safetydance'),
|
|
shell = require('./shell.js')('tasks'),
|
|
_ = require('./underscore.js');
|
|
|
|
// AbortControllers of the tasks started by this process, keyed by task id.
// Reassigned wholesale by stopAllTasks(), hence 'let'.
let gTasks = {};

// Helper scripts (run through shell.sudo) that start and stop a task.
const START_TASK_CMD = path.join(__dirname, 'scripts', 'starttask.sh');
const STOP_TASK_CMD = path.join(__dirname, 'scripts', 'stoptask.sh');

// Columns selected from the tasks table. The array is interpolated directly
// into SQL strings, where it stringifies to a comma separated list.
const TASKS_FIELDS = [
    'id',
    'type',
    'argsJson',
    'percent',
    'pending',
    'completed',
    'message',
    'errorJson',
    'creationTime',
    'resultJson',
    'ts'
];
|
|
|
|
// Converts a raw tasks table row into the task object used by the rest of the
// code. The row is modified in place and also returned.
//  - argsJson/resultJson/errorJson are parsed into args/result/error
//  - id becomes a string, pending/completed become booleans
//  - running/active/success are derived flags (see below)
function postProcess(task) {
    assert.strictEqual(typeof task, 'object');
    assert(task.argsJson === null || typeof task.argsJson === 'string');

    task.args = safe.JSON.parse(task.argsJson) || []; // missing or unparsable args become []
    delete task.argsJson;

    task.id = String(task.id);

    task.pending = Boolean(task.pending);
    task.completed = Boolean(task.completed);

    task.result = JSON.parse(task.resultJson); // note: throws on malformed JSON, unlike args/error
    delete task.resultJson;

    task.error = safe.JSON.parse(task.errorJson);
    delete task.errorJson;

    // pending   - task is scheduled to run at some point
    // completed - task finished and its exit/crash was cleanly collected. internal flag.
    // running   - this process currently tracks an AbortController for the task
    // active    - task is running or still pending. once this is false, the task is done
    //             and clients can stop polling it
    // success   - task completed and no error was recorded
    task.running = Boolean(gTasks[task.id]);
    task.active = task.running || task.pending;
    task.success = task.completed && !task.error;

    // a task that is no longer active but never got marked as completed has no
    // error in the db. report it as crashed
    const isDone = !task.active;
    if (isDone && !task.completed) {
        task.error = { message: 'Task was stopped because the server restarted or crashed', code: exports.ECRASHED };
    }

    return task;
}
|
|
|
|
// Looks up a single task by id. Resolves to the post-processed task, or null
// when no row with that id exists.
async function get(id) {
    assert.strictEqual(typeof id, 'string');

    const rows = await database.query(`SELECT ${TASKS_FIELDS} FROM tasks WHERE id = ?`, [ id ]);

    return rows.length === 0 ? null : postProcess(rows[0]);
}
|
|
|
|
// Updates the columns named by the keys of 'task' for the given task id.
// The 'result' and 'error' keys are JSON-serialized into the resultJson and
// errorJson columns; every other key is written as-is to the column of the
// same name. Throws BoxError.NOT_FOUND unless exactly one row was affected.
async function update(id, task) {
    assert.strictEqual(typeof id, 'string');
    assert.strictEqual(typeof task, 'object');

    debug(`updating task ${id} with: ${JSON.stringify(task)}`);

    const assignments = [];
    const values = [];

    for (const key in task) {
        const isJsonColumn = key === 'result' || key === 'error';

        assignments.push(isJsonColumn ? `${key}Json = ?` : `${key} = ?`);
        values.push(isJsonColumn ? JSON.stringify(task[key]) : task[key]);
    }

    values.push(id); // binds the trailing 'WHERE id = ?'

    const outcome = await database.query(`UPDATE tasks SET ${assignments.join(', ')} WHERE id = ?`, values);
    if (outcome.affectedRows !== 1) throw new BoxError(BoxError.NOT_FOUND, 'Task not found');
}
|
|
|
|
// Marks the task as completed and stores the given fields along with it.
// Fields in 'task' take precedence over the default 'completed: true'.
async function setCompleted(id, task) {
    assert.strictEqual(typeof id, 'string');
    assert.strictEqual(typeof task, 'object');

    debug(`setCompleted - ${id}: ${JSON.stringify(task)}`);

    const changes = { completed: true, ...task };
    await update(id, changes);
}
|
|
|
|
// Marks the most recently created task of the given type as completed.
// Throws BoxError.NOT_FOUND when there is no task of that type.
async function setCompletedByType(type, task) {
    assert.strictEqual(typeof type, 'string');
    assert.strictEqual(typeof task, 'object');

    // first page with a page size of one: the listing puts the latest task first
    const matches = await listByTypePaged(type, 1, 1);
    if (matches.length !== 1) throw new BoxError(BoxError.NOT_FOUND, 'No such task');

    const [ latest ] = matches;
    await setCompleted(latest.id, task);
}
|
|
|
|
// Inserts a new task of the given type with the given argument list. The task
// starts out pending, at 0 percent, with the message 'Queued'.
// Resolves to the id of the new task as a string.
async function add(type, args) {
    assert.strictEqual(typeof type, 'string');
    assert(Array.isArray(args));

    const values = [ type, JSON.stringify(args), 0, 'Queued', true ];
    const { insertId } = await database.query('INSERT INTO tasks (type, argsJson, percent, message, pending) VALUES (?, ?, ?, ?, ?)', values);

    return String(insertId);
}
|
|
|
|
// Runs the task with the given id through the start script and waits for it to end.
//
// options: logFile, timeout, nice, memoryLimit, oomScoreAdjust (all optional).
//
// Resolves to the task's result when the db says the task completed without error.
// Resolves to null when the box code is shutting down (see stopAllTasks()) or when
// the task row has disappeared.
// Throws the task's stored error when it completed with one. Otherwise throws a
// { message, code } object whose code is one of ETIMEOUT, ESTOPPED or ECRASHED.
async function startTask(id, options) {
    assert.strictEqual(typeof id, 'string');
    assert.strictEqual(typeof options, 'object');

    const logFile = options.logFile || `${paths.TASKS_LOG_DIR}/${id}.log`;
    debug(`startTask - starting task ${id} with options ${JSON.stringify(options)}. logs at ${logFile}`);

    // registering the controller is what marks the task as running
    const controller = new AbortController();
    gTasks[id] = controller;

    const sudoOptions = {
        preserveEnv: true,
        encoding: 'utf8',
        abortSignal: controller.signal,
        timeout: options.timeout || 0,
        // custom stop because kill won't do. the task is running in some other process tree
        onTimeout: async () => {
            debug(`onTimeout: ${id}`);
            await stopTask(id);
        }
    };

    // deliberately not awaited: the child process has to be created immediately
    // to prevent a race with stopTask()
    safe(update(id, { pending: false }), { debug });

    const sudoArgs = [
        START_TASK_CMD,
        id,
        logFile,
        options.nice || 0,
        options.memoryLimit || 400,
        options.oomScoreAdjust || 0
    ];
    const [sudoError] = await safe(shell.sudo(sudoArgs, sudoOptions));

    // stopAllTasks() empties gTasks on shutdown. in that case don't record the task as "crashed"
    const shuttingDown = !gTasks[id];
    if (shuttingDown) {
        debug(`startTask: ${id} completed as a result of box shutdown`);
        return null;
    }

    delete gTasks[id];

    const task = await get(id);
    if (!task) return null; // task disappeared on us. this can happen when db got cleared in tests

    // the task itself recorded its completion, so the db result can be trusted
    if (task.completed) {
        debug(`startTask: ${id} completed. error: %o`, task.error);
        if (task.error) throw task.error;
        return task.result;
    }

    assert.ok(sudoError, 'sudo should have errored because task did not complete!');

    // taskworker.sh forwards the exit code of the actual worker. It's either a raw signal number OR the exit code
    const { timedOut, code } = sudoError;
    let taskError = null;

    if (timedOut) {
        taskError = { message: `Task ${id} timed out`, code: exports.ETIMEOUT };
    } else {
        switch (code) {
        case 70: // set by taskworker SIGTERM
            taskError = { message: `Task ${id} stopped`, code: exports.ESTOPPED };
            break;
        case 9: // SIGKILL. SIGTERM with oom gets set as 2 by nodejs
            taskError = { message: `Task ${id} ran out of memory or terminated`, code: exports.ECRASHED };
            break;
        case 50:
            taskError = { message: `Task ${id} crashed with code ${code}`, code: exports.ECRASHED };
            break;
        default:
            taskError = { message: `Task ${id} crashed with unknown code ${code}`, code: exports.ECRASHED };
            break;
        }
    }

    debug(`startTask: ${id} done. error: %o`, taskError);

    throw taskError;
}
|
|
|
|
// Stops a task that was started by this process.
// Throws BoxError.BAD_STATE when the id is not tracked in gTasks.
async function stopTask(id) {
    assert.strictEqual(typeof id, 'string');

    const isTracked = Boolean(gTasks[id]);
    if (!isTracked) throw new BoxError(BoxError.BAD_STATE, 'task is not active');

    debug(`stopTask: stopping task ${id}`);

    // note: this is stopping the systemd-run task. the sudo will exit when this exits
    await shell.sudo([ STOP_TASK_CMD, id ], {});
}
|
|
|
|
// Aborts every task started by this process and then runs the stop script
// with 'all'. A failure of the stop script is only logged, never thrown.
async function stopAllTasks() {
    const controllers = Object.values(gTasks);
    debug(`stopAllTasks: ${controllers.length} tasks are running. sending abort signal`);

    // must happen before aborting: an empty gTasks signals startTask() to not
    // set the completion status as "crashed"
    gTasks = {};

    // cleanup all the sudos and systemd-run
    for (const controller of controllers) controller.abort();

    const [error] = await safe(shell.sudo([ STOP_TASK_CMD, 'all' ], { cwd: paths.baseDir() }));
    if (error) debug(`stopAllTasks: error stopping tasks: ${error.message}`);
}
|
|
|
|
// Lists tasks, latest first, one page at a time.
//  type    - task type to filter by. a falsy type (null or '') lists tasks of all types
//  page    - page number; the row offset is (page-1)*perPage
//  perPage - number of tasks per page
// Resolves to an array of post-processed tasks.
async function listByTypePaged(type, page, perPage) {
    assert(typeof type === 'string' || type === null);
    assert.strictEqual(typeof page, 'number');
    assert.strictEqual(typeof perPage, 'number');

    const offset = (page-1)*perPage;
    const whereClause = type ? ' WHERE TYPE=?' : '';
    const params = type ? [ type, offset, perPage ] : [ offset, perPage ];

    // ordering puts the latest task first
    const query = `SELECT ${TASKS_FIELDS} FROM tasks${whereClause} ORDER BY creationTime DESC, id DESC LIMIT ?,?`;

    const rows = await database.query(query, params);
    for (const row of rows) postProcess(row); // converts each row in place

    return rows;
}
|
|
|
|
// Returns a log stream for the task's log file under paths.TASKS_LOG_DIR.
//  options.lines  - number, passed on to logs.tail
//  options.format - string, output format of the stream ('json' when empty)
//  options.follow - boolean, passed on to logs.tail
// Throws BoxError.FS_ERROR when the task is not active and its log file does not exist.
async function getLogs(task, options) {
    assert.strictEqual(typeof task, 'object');
    assert(options && typeof options === 'object');

    const { lines, format, follow } = options;
    assert.strictEqual(typeof lines, 'number');
    assert.strictEqual(typeof format, 'string');
    assert.strictEqual(typeof follow, 'boolean');

    const logFile = `${paths.TASKS_LOG_DIR}/${task.id}.log`;

    // the file existence is only checked for tasks that are no longer active
    const logMissing = !task.active && !safe.fs.existsSync(logFile);
    if (logMissing) throw new BoxError(BoxError.FS_ERROR, 'Log file removed/missing'); // logrotated

    const tailProcess = logs.tail([ logFile ], { lines, follow });
    const logStream = new logs.LogStream({ format: format || 'json', source: task.id });

    // the caller has to call destroy() on logStream. destroy() of Transform emits 'close'
    logStream.on('close', () => tailProcess.terminate());

    tailProcess.stdout.pipe(logStream);

    return logStream;
}
|
|
|
|
// Reduces a task to the fields that may be returned by API calls. Everything
// not listed here is strictly private and gets dropped.
function removePrivateFields(task) {
    const publicFields = [
        'id',
        'type',
        'percent',
        'message',
        'error',
        'running',
        'active',
        'creationTime',
        'result',
        'ts',
        'success'
    ];

    return _.pick(task, publicFields);
}
|
|
|
|
// Deletes the task row with the given id.
// Throws BoxError.NOT_FOUND unless exactly one row was deleted.
async function del(id) {
    assert.strictEqual(typeof id, 'string');

    const { affectedRows } = await database.query('DELETE FROM tasks WHERE id = ?', [ id ]);
    if (affectedRows !== 1) throw new BoxError(BoxError.NOT_FOUND, 'Task not found');
}
|