Files
cloudron-box/src/tasks.js
2021-06-16 14:21:19 -07:00

292 lines
9.4 KiB
JavaScript

'use strict';
// Public interface of the tasks module: the functions implemented in this file, followed by the string
// constants (task types, error codes) that callers are expected to reference instead of raw strings.
exports = module.exports = {
// functions
get,
add,
update,
setCompleted,
setCompletedByType,
listByTypePaged,
getLogs,
startTask,
stopTask,
stopAllTasks,
removePrivateFields,
// task types. if you add a task type here, also fill in the function table in taskworker and in the dashboard's client.js
TASK_APP: 'app',
TASK_BACKUP: 'backup',
TASK_UPDATE: 'update',
TASK_CHECK_CERTS: 'checkCerts',
TASK_SETUP_DNS_AND_CERT: 'setupDnsAndCert',
TASK_CLEAN_BACKUPS: 'cleanBackups',
TASK_SYNC_EXTERNAL_LDAP: 'syncExternalLdap',
TASK_CHANGE_MAIL_LOCATION: 'changeMailLocation',
TASK_SYNC_DNS_RECORDS: 'syncDnsRecords',
// error codes. these are used as the 'code' of a task's error object (see postProcess and startTask)
ESTOPPED: 'stopped',
ECRASHED: 'crashed',
ETIMEOUT: 'timeout',
// task types that exist only for testing
_TASK_IDENTITY: '_identity',
_TASK_CRASH: '_crash',
_TASK_ERROR: '_error',
_TASK_SLEEP: '_sleep'
};
const assert = require('assert'),
BoxError = require('./boxerror.js'),
debug = require('debug')('box:tasks'),
path = require('path'),
paths = require('./paths.js'),
shell = require('./shell.js'),
spawn = require('child_process').spawn,
split = require('split'),
taskdb = require('./taskdb.js'),
_ = require('underscore');
// Tasks started by this process. startTask() stores the value returned by shell.sudo() under the task id and
// deletes it when the task exits; stopAllTasks() replaces the whole object to signal that we are shutting down.
let gTasks = {}; // indexed by task id
// callback for fire-and-forget calls: logs the error (if any) and does nothing else
const NOOP_CALLBACK = function (error) { if (error) debug(error); };
// helper scripts, located in scripts/ next to this file, that are run through shell.sudo() to start/stop a task
const START_TASK_CMD = path.join(__dirname, 'scripts/starttask.sh');
const STOP_TASK_CMD = path.join(__dirname, 'scripts/stoptask.sh');
// Decorates a task record, in place, with the state flags that are derived from it.
//   result - task object. reads 'id', 'percent' and 'error'; writes 'active', 'success', 'pending' and possibly 'error'
// Returns nothing, the caller keeps using the object it passed in.
function postProcess(result) {
    assert.strictEqual(typeof result, 'object');

    const isActive = Boolean(gTasks[result.id]); // an entry in gTasks means this process is running the task
    const isFinished = result.percent === 100;

    result.active = isActive;

    // 'percent' doubles as the state marker: 100 is finished and 1 is pending. maybe these can become db fields
    result.success = isFinished && !result.error;
    result.pending = result.percent === 1;

    // nothing to repair if the task is still running, ran to the end or already carries an error
    if (isActive || isFinished || result.error) return;

    // the error in db will be empty if we didn't get a chance to handle task exit
    result.error = { message: 'Cloudron crashed/stopped', code: exports.ECRASHED };
}
// Looks up a single task and decorates it with the derived state flags (see postProcess).
//   id       - task id (string)
//   callback - function (error, task). the error from taskdb is passed through untouched
function get(id, callback) {
    assert.strictEqual(typeof id, 'string');
    assert.strictEqual(typeof callback, 'function');

    taskdb.get(id, (dbError, record) => {
        if (dbError) return callback(dbError);

        postProcess(record);

        callback(null, record);
    });
}
// Writes the given fields of a task to the database.
//   id       - task id (string)
//   task     - object holding the fields to change; it is logged and handed to taskdb.update as is
//   callback - function (error). called without arguments on success
function update(id, task, callback) {
    assert.strictEqual(typeof id, 'string');
    assert.strictEqual(typeof task, 'object');
    assert.strictEqual(typeof callback, 'function');

    const serialized = JSON.stringify(task);
    debug(`${id}: ${serialized}`);

    taskdb.update(id, task, (dbError) => {
        if (dbError) return callback(dbError);

        callback(); // deliberately drops anything else the db layer may pass along
    });
}
// Marks a task as completed by storing percent 100 together with the given fields.
//   id       - task id (string)
//   task     - extra fields to store (for example 'error' or 'result'). a 'percent' in here wins over the default of 100
//   callback - function (error), forwarded to update()
function setCompleted(id, task, callback) {
    assert.strictEqual(typeof id, 'string');
    assert.strictEqual(typeof task, 'object');
    assert.strictEqual(typeof callback, 'function');

    debug(`setCompleted - ${id}: ${JSON.stringify(task)}`);

    // extend a fresh object so that the caller's 'task' is left untouched
    const completedFields = _.extend({ percent: 100 }, task);

    update(id, completedFields, callback);
}
// Marks the task that comes first in the listing of the given type as completed.
//   type     - task type (string)
//   task     - fields to store with the completion, see setCompleted()
//   callback - function (error). gets a BoxError.NOT_FOUND when the listing does not yield exactly one task
function setCompletedByType(type, task, callback) {
    assert.strictEqual(typeof type, 'string');
    assert.strictEqual(typeof task, 'object');
    assert.strictEqual(typeof callback, 'function');

    // page 1 with one entry per page: we only care about the first task of this type
    listByTypePaged(type, 1, 1, (listError, matches) => {
        if (listError) return callback(listError);
        if (matches.length !== 1) return callback(new BoxError(BoxError.NOT_FOUND));

        const target = matches[0];

        setCompleted(target.id, task, (completeError) => {
            if (completeError) return callback(completeError);

            callback();
        });
    });
}
// Creates the database record of a new task. The record starts out at 0 percent with the message 'Queued'.
//   type     - task type (string)
//   args     - array of arguments stored with the task
//   callback - function (error, taskId)
function add(type, args, callback) {
    assert.strictEqual(typeof type, 'string');
    assert(Array.isArray(args));
    assert.strictEqual(typeof callback, 'function');

    const record = { type, percent: 0, message: 'Queued', args };

    taskdb.add(record, (dbError, taskId) => {
        if (dbError) return callback(dbError);

        callback(null, taskId);
    });
}
// Starts the worker of a task through the starttask script and reports back once the worker has exited.
//   id       - task id (string)
//   options  - object with the optional fields:
//                logFile     - file the task logs to. defaults to <TASKS_LOG_DIR>/<id>.log
//                nice        - passed to the start script. defaults to 0
//                memoryLimit - passed to the start script. defaults to 400
//                timeout     - milliseconds after which the task is stopped. no timeout when falsy
//   callback - function (error, result). error is undefined on success. otherwise it is the error stored with the task,
//              a { message, code } object with one of the ESTOPPED/ETIMEOUT/ECRASHED codes, or a BoxError.NOT_FOUND
//              when the task record could not be read. result is the task's 'result' field (null without a record).
//              The callback is NOT called for tasks that exit after stopAllTasks().
function startTask(id, options, callback) {
    assert.strictEqual(typeof id, 'string');
    assert.strictEqual(typeof options, 'object');
    assert.strictEqual(typeof callback, 'function');

    const logFile = options.logFile || `${paths.TASKS_LOG_DIR}/${id}.log`;
    debug(`startTask - starting task ${id} with options ${JSON.stringify(options)}. logs at ${logFile}`);

    let killTimerId = null, timedOut = false;

    gTasks[id] = shell.sudo('startTask', [ START_TASK_CMD, id, logFile, options.nice || 0, options.memoryLimit || 400 ], { preserveEnv: true }, function (error) {
        // the worker is gone, so the kill timer must never fire. this has to happen before the shutdown check below,
        // otherwise the timer stays armed and later tries to stop a task that no longer exists. a null id is a no-op
        clearTimeout(killTimerId);

        if (!gTasks[id]) return; // ignore task exit since we are shutting down. see stopAllTasks

        const code = error ? error.code : 0;
        const signal = error ? error.signal : 0;

        debug(`startTask: ${id} completed with code ${code} and signal ${signal}`);

        get(id, function (getError, task) {
            let taskError;

            if (!getError && task.percent !== 100) { // taskworker crashed or was killed by us
                if (code === 0) { // clean exit without finishing: we stopped it, either on request or on timeout
                    taskError = {
                        message: `Task ${id} ${timedOut ? 'timed out' : 'stopped'}`,
                        code: timedOut ? exports.ETIMEOUT : exports.ESTOPPED
                    };
                } else { // task crashed
                    taskError = {
                        message: signal === 9 ? `Task ${id} crashed as it ran out of memory` : `Task ${id} crashed with code ${code} and signal ${signal}`,
                        code: exports.ECRASHED
                    };
                }

                // note that despite the update() here, we should handle the case where the box code was restarted and never got taskworker exit
                setCompleted(id, { error: taskError }, NOOP_CALLBACK);
            } else if (!getError && task.error) { // the task finished and recorded its own error
                taskError = task.error;
            } else if (!task) { // db got cleared in tests
                taskError = new BoxError(BoxError.NOT_FOUND, `No such task ${id}`);
            }

            delete gTasks[id];

            callback(taskError, task ? task.result : null);

            debug(`startTask: ${id} done`);
        });
    });

    if (options.timeout) {
        killTimerId = setTimeout(function () {
            debug(`startTask: task ${id} took too long. killing`);
            timedOut = true; // lets the exit handler above tell a timeout apart from a plain stop
            stopTask(id, NOOP_CALLBACK);
        }, options.timeout);
    }
}
// Asks the stoptask script to stop a task that was started by this process.
//   id       - task id (string)
//   callback - function (error). gets a BoxError.BAD_STATE when the task is not active, null otherwise.
//              The callback is called right away; the outcome of the script is only logged.
function stopTask(id, callback) {
    assert.strictEqual(typeof id, 'string');
    assert.strictEqual(typeof callback, 'function');

    const handle = gTasks[id];
    if (!handle) return callback(new BoxError(BoxError.BAD_STATE, 'task is not active'));

    debug(`stopTask: stopping task ${id}`);

    const scriptArgs = [ STOP_TASK_CMD, id ];
    shell.sudo('stopTask', scriptArgs, {}, NOOP_CALLBACK);

    callback(null);
}
// Stops every task by running the stoptask script with 'all'.
//   callback - handed to shell.sudo() as the completion callback of the script
function stopAllTasks(callback) {
    assert.strictEqual(typeof callback, 'function');

    debug('stopTask: stopping all tasks');

    // forget all tasks first. this signals startTask() to not set completion status as "crashed"
    gTasks = {};

    const scriptArgs = [ STOP_TASK_CMD, 'all' ];
    const sudoOptions = { cwd: paths.baseDir() };

    shell.sudo('stopTask', scriptArgs, sudoOptions, callback);
}
// Lists one page of tasks, each decorated with the derived state flags (see postProcess).
//   type     - task type (string) or null
//   page     - page number (number)
//   perPage  - number of tasks per page (number)
//   callback - function (error, tasks)
function listByTypePaged(type, page, perPage, callback) {
    assert(type === null || typeof type === 'string');
    assert.strictEqual(typeof page, 'number');
    assert.strictEqual(typeof perPage, 'number');
    assert.strictEqual(typeof callback, 'function');

    taskdb.listByTypePaged(type, page, perPage, (dbError, tasks) => {
        if (dbError) return callback(dbError);

        tasks.forEach((task) => postProcess(task));

        callback(null, tasks);
    });
}
// Provides the log of a task as a stream by running tail on <TASKS_LOG_DIR>/<taskId>.log.
//   taskId   - task id (string)
//   options  - object with:
//                lines  - number of lines to output, -1 for the whole file
//                format - 'json' (also used when empty) turns each line into a JSON record, anything else passes lines through
//                follow - true to keep the stream open and emit lines as they get appended
//   callback - function (error, stream). the stream has a close() that kills the tail process
function getLogs(taskId, options, callback) {
    assert.strictEqual(typeof taskId, 'string');
    assert(options && typeof options === 'object');
    assert.strictEqual(typeof callback, 'function');

    assert.strictEqual(typeof options.lines, 'number');
    assert.strictEqual(typeof options.format, 'string');
    assert.strictEqual(typeof options.follow, 'boolean');

    debug(`Getting logs for ${taskId}`);

    const lineCount = options.lines === -1 ? '+1' : options.lines; // '+1' makes tail start at the first line
    const format = options.format || 'json';
    const follow = options.follow;

    const tailArgs = [ `--lines=${lineCount}` ];
    // same as -F. --retry makes it work if the file doesn't exist, --quiet suppresses file headers, which are no logs
    if (follow) tailArgs.push('--follow', '--retry', '--quiet');
    tailArgs.push(`${paths.TASKS_LOG_DIR}/${taskId}.log`);

    const cp = spawn('/usr/bin/tail', tailArgs);

    // converts one raw log line into one line of output
    const toOutputLine = (line) => {
        if (format !== 'json') return `${line}\n`;

        // logs are <ISOtimestamp> <msg>
        const stamp = line.split(' ')[0];
        const parsed = new Date(stamp).getTime();
        const timestamp = Number.isNaN(parsed) ? 0 : parsed;
        const message = line.slice(stamp.length + 1);

        // ignore faulty empty logs
        if (timestamp === 0 && message === '') return;

        const entry = {
            realtimeTimestamp: timestamp * 1000,
            message: message,
            source: taskId
        };

        return `${JSON.stringify(entry)}\n`;
    };

    const transformStream = split(toOutputLine);
    transformStream.close = cp.kill.bind(cp, 'SIGKILL'); // closing stream kills the child process

    cp.stdout.pipe(transformStream);

    callback(null, transformStream);
}
// Returns a copy of the task that carries only the fields that may be returned by API calls.
// Everything that is not on the list below is strictly private and gets dropped.
function removePrivateFields(task) {
    const publicFields = [
        'id', 'type', 'percent', 'message', 'error', 'active',
        'pending', 'creationTime', 'result', 'ts', 'success'
    ];

    return _.pick(task, publicFields);
}