Files
cloudron-box/src/tasks.js
Girish Ramakrishnan 1ad94708b4 apps have to be reconfigured in the main thread
they cannot be done in the task process
2023-08-21 21:35:09 +05:30

298 lines
10 KiB
JavaScript

'use strict';
// Public interface of the tasks module: the task CRUD/lifecycle functions, followed by the
// task type identifiers, the error codes stored in a task's error object and the task types
// that are only used for testing.
exports = module.exports = {
get,
add,
update,
setCompleted,
setCompletedByType,
listByTypePaged,
getLogs,
startTask,
stopTask,
stopAllTasks,
removePrivateFields,
// del() is exported with a leading underscore; presumably it is only meant for tests - TODO confirm
_del: del,
// task types. if you add a task here, fill up the function table in taskworker and dashboard client.js
TASK_APP: 'app',
TASK_BACKUP: 'backup',
TASK_UPDATE: 'update',
TASK_CHECK_CERTS: 'checkCerts',
TASK_SYNC_DYNDNS: 'syncDyndns',
TASK_PREPARE_DASHBOARD_LOCATION: 'prepareDashboardLocation',
TASK_CLEAN_BACKUPS: 'cleanBackups',
TASK_SYNC_EXTERNAL_LDAP: 'syncExternalLdap',
TASK_CHANGE_MAIL_LOCATION: 'changeMailLocation',
TASK_SYNC_DNS_RECORDS: 'syncDnsRecords',
TASK_UPDATE_DISK_USAGE: 'updateDiskUsage',
// error codes. these end up in the 'code' property of a task's error (see startTask and updateStatus)
ESTOPPED: 'stopped',
ECRASHED: 'crashed',
ETIMEOUT: 'timeout',
// testing
_TASK_IDENTITY: '_identity',
_TASK_CRASH: '_crash',
_TASK_ERROR: '_error',
_TASK_SLEEP: '_sleep'
};
const assert = require('assert'),
BoxError = require('./boxerror.js'),
constants = require('./constants.js'),
database = require('./database.js'),
debug = require('debug')('box:tasks'),
fs = require('fs'),
logs = require('./logs.js'),
path = require('path'),
paths = require('./paths.js'),
safe = require('safetydance'),
shell = require('./shell.js'),
_ = require('underscore');
// registry of the tasks started by this process. startTask() stores the value returned by shell.sudo() here,
// removes the entry when the task exits and stopAllTasks() resets the whole object
let gTasks = {}; // indexed by task id
// scripts (located relative to this file) that are run via sudo to start and stop a task
const START_TASK_CMD = path.join(__dirname, 'scripts/starttask.sh');
const STOP_TASK_CMD = path.join(__dirname, 'scripts/stoptask.sh');
// columns of the tasks table that get() and listByTypePaged() select
const TASKS_FIELDS = [ 'id', 'type', 'argsJson', 'percent', 'message', 'errorJson', 'creationTime', 'resultJson', 'ts' ];
/**
 * Converts a raw row of the tasks table into the task object used by the rest of this module.
 *
 * The row is modified in place: the `argsJson`, `resultJson` and `errorJson` columns are replaced
 * by the parsed `args`, `result` and `error` properties and `id` is converted to a string.
 *
 * @param {object} task - row as returned by the database
 * @returns {object} the same (mutated) object
 */
function postProcess(task) {
    assert.strictEqual(typeof task, 'object');
    assert(task.argsJson === null || typeof task.argsJson === 'string');

    // args falls back to an empty list when nothing (parseable) is stored
    task.args = safe.JSON.parse(task.argsJson) || [];
    delete task.argsJson;

    task.id = String(task.id);

    // parse like args/error: with a plain JSON.parse a single corrupt resultJson value
    // would make get() and listByTypePaged() throw for the whole query
    task.result = safe.JSON.parse(task.resultJson);
    delete task.resultJson;

    task.error = safe.JSON.parse(task.errorJson);
    delete task.errorJson;

    return task;
}
/**
 * Adds the derived `active`, `success` and `pending` flags to a post-processed task.
 *
 * The task is modified in place. A task that is neither running in this process, nor at
 * 100 percent, nor carrying an error gets a synthetic ECRASHED error.
 *
 * @param {object} result - post-processed task
 * @returns {object} the same (mutated) object
 */
function updateStatus(result) {
    assert.strictEqual(typeof result, 'object');

    const isRunning = Boolean(gTasks[result.id]);
    const isFinished = result.percent === 100;
    const hasError = Boolean(result.error);

    result.active = isRunning;
    // success and pending are derived from 'percent'. maybe these can become db fields
    result.success = isFinished && !hasError;
    result.pending = result.percent === 1;

    if (isRunning || isFinished || hasError) return result;

    // nothing was recorded in the db, so we never got a chance to handle the task exit
    result.error = {
        message: 'Task was stopped because the server was restarted or crashed',
        code: exports.ECRASHED
    };

    return result;
}
/**
 * Looks up a single task by id.
 *
 * @param {string} id - task id
 * @returns {Promise<object|null>} the post-processed task including its status flags, or null when no row matches
 */
async function get(id) {
    assert.strictEqual(typeof id, 'string');

    const rows = await database.query(`SELECT ${TASKS_FIELDS} FROM tasks WHERE id = ?`, [ id ]);

    return rows.length === 0 ? null : updateStatus(postProcess(rows[0]));
}
/**
 * Updates columns of an existing task.
 *
 * Every key of `task` is used as a column name. The `result` and `error` keys are special:
 * they are stored JSON encoded in the `resultJson` and `errorJson` columns.
 *
 * @param {string} id - task id
 * @param {object} task - fields to update
 * @returns {Promise<void>}
 * @throws {BoxError} NOT_FOUND when the query did not affect exactly one row
 */
async function update(id, task) {
    assert.strictEqual(typeof id, 'string');
    assert.strictEqual(typeof task, 'object');

    debug(`update ${id}: ${JSON.stringify(task)}`);

    const assignments = [];
    const values = [];

    for (const column in task) {
        const storedAsJson = column === 'result' || column === 'error';

        assignments.push(storedAsJson ? `${column}Json = ?` : `${column} = ?`);
        values.push(storedAsJson ? JSON.stringify(task[column]) : task[column]);
    }

    values.push(id); // for the WHERE clause

    const outcome = await database.query(`UPDATE tasks SET ${assignments.join(', ')} WHERE id = ?`, values);
    if (outcome.affectedRows === 1) return;

    throw new BoxError(BoxError.NOT_FOUND, 'Task not found');
}
/**
 * Marks a task as completed by setting its percent to 100 together with the given fields.
 *
 * A `percent` key in `task` takes precedence over the default of 100.
 *
 * @param {string} id - task id
 * @param {object} task - additional fields to store, e.g. `result` or `error`
 * @returns {Promise<void>}
 */
async function setCompleted(id, task) {
    assert.strictEqual(typeof id, 'string');
    assert.strictEqual(typeof task, 'object');

    debug(`setCompleted - ${id}: ${JSON.stringify(task)}`);

    const completedFields = { percent: 100, ...task };
    await update(id, completedFields);
}
/**
 * Marks the most recent task of the given type as completed.
 *
 * @param {string} type - task type
 * @param {object} task - fields to store along with the completion, see setCompleted
 * @returns {Promise<void>}
 * @throws {BoxError} NOT_FOUND when there is no task of that type
 */
async function setCompletedByType(type, task) {
    assert.strictEqual(typeof type, 'string');
    assert.strictEqual(typeof task, 'object');

    // first page with a single entry is the latest task of this type
    const latest = await listByTypePaged(type, 1, 1);
    if (latest.length !== 1) throw new BoxError(BoxError.NOT_FOUND, 'No such task');

    const [ newest ] = latest;
    await setCompleted(newest.id, task);
}
/**
 * Creates a new task row. The task starts out at 0 percent with the message 'Queued'.
 *
 * @param {string} type - task type
 * @param {Array} args - task arguments, stored JSON encoded
 * @returns {Promise<string>} id of the new task
 */
async function add(type, args) {
    assert.strictEqual(typeof type, 'string');
    assert(Array.isArray(args));

    const values = [ type, JSON.stringify(args), 0, 'Queued' ];
    const { insertId } = await database.query('INSERT INTO tasks (type, argsJson, percent, message) VALUES (?, ?, ?, ?)', values);

    return String(insertId);
}
/**
 * Starts the task with the given id by running START_TASK_CMD via shell.sudo().
 *
 * The value returned by shell.sudo() is remembered in gTasks[id] until the task exits. When the
 * task exits and its db row is not at 100 percent, an error is recorded using setCompleted():
 * ESTOPPED or ETIMEOUT for exit code 0 and ECRASHED for any other exit code.
 *
 * @param {string} id - task id
 * @param {object} options
 * @param {string} [options.logFile] - log file path, defaults to `${paths.TASKS_LOG_DIR}/${id}.log`
 * @param {number} [options.nice] - passed to the start script, defaults to 0
 * @param {number} [options.memoryLimit] - passed to the start script, defaults to 400 (unit is not visible here, presumably MB - TODO confirm)
 * @param {number} [options.timeout] - when set, stopTask() is called after this many milliseconds
 * @param {function} [onTaskFinished] - called as onTaskFinished(error, result) after the task exited.
 *      not called when the gTasks entry is already gone at exit time (see stopAllTasks)
 */
function startTask(id, options, onTaskFinished) {
assert.strictEqual(typeof id, 'string');
assert.strictEqual(typeof options, 'object');
assert(typeof onTaskFinished === 'undefined' || typeof onTaskFinished === 'function');
const logFile = options.logFile || `${paths.TASKS_LOG_DIR}/${id}.log`;
debug(`startTask - starting task ${id} with options ${JSON.stringify(options)}. logs at ${logFile}`);
// timedOut is set by the timeout handler at the bottom and read by the exit handler to tell a timeout from a stop
let killTimerId = null, timedOut = false;
const sudoOptions = { preserveEnv: true, logStream: null };
if (constants.TEST) sudoOptions.logStream = fs.createWriteStream('/dev/null'); // without this output is messed up, not sure why
gTasks[id] = shell.sudo('startTask', [ START_TASK_CMD, id, logFile, options.nice || 0, options.memoryLimit || 400 ], sudoOptions, async function (sudoError) {
if (!gTasks[id]) return; // ignore task exit since we are shutting down. see stopAllTasks
const code = sudoError ? sudoError.code : 0;
debug(`startTask: ${id} completed with code ${code}`);
if (options.timeout) clearTimeout(killTimerId);
// re-read the task from the db to find out what the task itself managed to record
const [getError, task] = await safe(get(id));
let taskError = null;
if (!getError && task.percent !== 100) { // taskworker crashed or was killed by us
if (code === 0) {
taskError = {
message: `Task ${id} ${timedOut ? 'timed out' : 'stopped'}` ,
code: timedOut ? exports.ETIMEOUT : exports.ESTOPPED
};
} else { // task crashed. for code, maybe we can check systemctl show box-task-1707 -p ExecMainStatus
taskError = {
// exit code 2 is reported as out of memory
message: code === 2 ? `Task ${id} crashed as it ran out of memory` : `Task ${id} crashed with code ${code}`,
code: exports.ECRASHED
};
}
// note that despite the update() here, we should handle the case where the box code was restarted and never got taskworker exit
await safe(setCompleted(id, { error: taskError }));
} else if (!getError && task.error) {
// the task ran to completion but recorded an error of its own
taskError = task.error;
// NOTE(review): this branch is presumably also reached when get() itself failed, since task is not set then - confirm
} else if (!task) { // db got cleared in tests
taskError = new BoxError(BoxError.NOT_FOUND, `No such task ${id}`);
}
// remove the entry before notifying the caller
delete gTasks[id];
if (onTaskFinished) onTaskFinished(taskError, task ? task.result : null);
debug(`startTask: ${id} done. error: %o`, taskError);
});
if (options.timeout) {
// the timer is cleared by the exit handler above when the task exits in time
killTimerId = setTimeout(async function () {
debug(`startTask: task ${id} took too long. killing`);
timedOut = true;
const [error] = await safe(stopTask(id));
if (error) debug(`startTask: error stopping task: ${error.message}`);
}, options.timeout);
}
}
/**
 * Stops a task that was started by this process by running STOP_TASK_CMD via sudo.
 *
 * @param {string} id - task id
 * @returns {Promise<void>}
 * @throws {BoxError} BAD_STATE when the task is not in the list of active tasks
 */
async function stopTask(id) {
    assert.strictEqual(typeof id, 'string');

    const isActive = Boolean(gTasks[id]);
    if (!isActive) throw new BoxError(BoxError.BAD_STATE, 'task is not active');

    debug(`stopTask: stopping task ${id}`);

    const stopArgs = [ STOP_TASK_CMD, id ];
    await shell.promises.sudo('stopTask', stopArgs, {});
}
/**
 * Stops all tasks by running STOP_TASK_CMD with the 'all' argument via sudo.
 *
 * The list of active tasks is cleared first, so that the exit handlers installed by
 * startTask() ignore the resulting task exits. Failure to stop is only logged.
 *
 * @returns {Promise<void>}
 */
async function stopAllTasks() {
    debug('stopAllTasks: stopping all tasks');

    gTasks = {}; // this signals startTask() to not set completion status as "crashed"

    const [error] = await safe(shell.promises.sudo('stopTask', [ STOP_TASK_CMD, 'all' ], { cwd: paths.baseDir() }));
    if (error) debug(`stopAllTasks: error stopping tasks: ${error.message}`);
}
/**
 * Lists tasks with the latest task first, optionally restricted to one task type.
 *
 * @param {string|null} type - task type to filter by. a falsy value lists tasks of all types
 * @param {number} page - page number, the first page is 1
 * @param {number} perPage - number of tasks per page
 * @returns {Promise<Array<object>>} post-processed tasks including their status flags
 */
async function listByTypePaged(type, page, perPage) {
    assert(typeof type === 'string' || type === null);
    assert.strictEqual(typeof page, 'number');
    assert.strictEqual(typeof perPage, 'number');

    const clauses = [ `SELECT ${TASKS_FIELDS} FROM tasks` ];
    const data = [];

    if (type) {
        clauses.push(' WHERE TYPE=?');
        data.push(type);
    }

    clauses.push(' ORDER BY creationTime DESC, id DESC LIMIT ?,?'); // put latest task first
    data.push((page - 1) * perPage, perPage); // offset and row count of LIMIT

    const results = await database.query(clauses.join(''), data);

    for (const row of results) postProcess(row);
    for (const row of results) updateStatus(row);

    return results;
}
/**
 * Returns a stream of the log of the given task.
 *
 * The log file is read by a child process created with logs.tail(), whose output is piped
 * into the returned stream.
 *
 * @param {object} task - task as returned by get(). `id` and `active` are used
 * @param {object} options
 * @param {number} options.lines - passed on to logs.tail()
 * @param {string} options.format - output format, an empty string selects 'json'
 * @param {boolean} options.follow - passed on to logs.tail()
 * @returns {Promise<object>} the log stream. calling its close() kills the child process
 * @throws {BoxError} FS_ERROR when the task is not active and its log file does not exist
 */
async function getLogs(task, options) {
    assert.strictEqual(typeof task, 'object');
    assert(options && typeof options === 'object');
    assert.strictEqual(typeof options.lines, 'number');
    assert.strictEqual(typeof options.format, 'string');
    assert.strictEqual(typeof options.follow, 'boolean');

    const { lines, format, follow } = options;
    const logFile = `${paths.TASKS_LOG_DIR}/${task.id}.log`;

    const canTail = task.active || safe.fs.existsSync(logFile);
    if (!canTail) throw new BoxError(BoxError.FS_ERROR, 'Log file removed/missing'); // logrotated

    const tailProcess = logs.tail([ logFile ], { lines, follow });

    const logStream = new logs.LogStream({ format: format || 'json', source: task.id });
    logStream.close = tailProcess.kill.bind(tailProcess, 'SIGKILL'); // hook for caller. closing stream kills the child process

    tailProcess.stdout.pipe(logStream);

    return logStream;
}
/**
 * Returns a copy of the task that only contains the fields which may be returned by API calls.
 * All other fields are considered private and are dropped.
 *
 * @param {object} task
 * @returns {object} copy of the task restricted to the public fields
 */
function removePrivateFields(task) {
    const publicFields = [
        'id', 'type', 'percent', 'message', 'error', 'active',
        'pending', 'creationTime', 'result', 'ts', 'success'
    ];

    return _.pick(task, ...publicFields);
}
/**
 * Deletes the row of the given task.
 *
 * @param {string} id - task id
 * @returns {Promise<void>}
 * @throws {BoxError} NOT_FOUND when the query did not delete exactly one row
 */
async function del(id) {
    assert.strictEqual(typeof id, 'string');

    const { affectedRows } = await database.query('DELETE FROM tasks WHERE id = ?', [ id ]);
    if (affectedRows === 1) return;

    throw new BoxError(BoxError.NOT_FOUND, 'Task not found');
}