278 lines
9.0 KiB
JavaScript
278 lines
9.0 KiB
JavaScript
'use strict';
|
|
|
|
// public interface of the tasks module. the functions referenced here are
// declared further down in this file (function declarations are hoisted)
exports = module.exports = {
    // task records
    get,
    add,
    update,
    setCompleted,
    setCompletedByType,
    listByTypePaged,

    getLogs,

    // task worker processes
    startTask,
    stopTask,
    stopAllTasks,

    removePrivateFields,

    // task types. if you add a task here, fill up the function table in taskworker
    TASK_APP: 'app',
    TASK_BACKUP: 'backup',
    TASK_UPDATE: 'update',
    TASK_RENEW_CERTS: 'renewcerts',
    TASK_PREPARE_DASHBOARD_DOMAIN: 'prepareDashboardDomain',
    TASK_CLEAN_BACKUPS: 'cleanBackups',
    TASK_SYNC_EXTERNAL_LDAP: 'syncExternalLdap',

    // error codes
    ESTOPPED: 'stopped',
    ECRASHED: 'crashed',
    ETIMEOUT: 'timeout',

    // testing
    _TASK_IDENTITY: '_identity',
    _TASK_CRASH: '_crash',
    _TASK_ERROR: '_error',
    _TASK_SLEEP: '_sleep'
};
|
|
|
|
let assert = require('assert'),
|
|
async = require('async'),
|
|
BoxError = require('./boxerror.js'),
|
|
child_process = require('child_process'),
|
|
debug = require('debug')('box:tasks'),
|
|
paths = require('./paths.js'),
|
|
safe = require('safetydance'),
|
|
spawn = require('child_process').spawn,
|
|
split = require('split'),
|
|
taskdb = require('./taskdb.js'),
|
|
_ = require('underscore');
|
|
|
|
// child processes of the currently running tasks, indexed by task id.
// startTask() adds an entry when it forks the worker and removes it once the worker exit was handled
let gTasks = {};

// callback for calls whose outcome nobody waits for: an error is only written to the debug log
const NOOP_CALLBACK = (error) => {
    if (error) debug(error);
};
|
|
|
|
// decorates a task record, in place, with fields that are derived at runtime:
//   active  - true when a worker process for this task id is registered in gTasks
//   success - true when percent is 100 and no error is set
//   error   - filled with an ECRASHED error for tasks that are neither active, complete nor failed
function postProcess(result) {
    assert.strictEqual(typeof result, 'object');

    // we rely on 'percent' to determine success. maybe this can become a db field
    const complete = result.percent === 100;

    result.active = Boolean(gTasks[result.id]);
    result.success = complete && !result.error;

    if (result.active || complete || result.error) return;

    // the error in db will be empty if we didn't get a chance to handle task exit
    result.error = { message: 'Cloudron crashed/stopped', code: exports.ECRASHED };
}
|
|
|
|
// looks up the task with the given id in taskdb and passes it to callback(error, task).
// the task is run through postProcess() first, so it carries the runtime fields as well
function get(id, callback) {
    assert.strictEqual(typeof id, 'string');
    assert.strictEqual(typeof callback, 'function');

    taskdb.get(id, (dbError, record) => {
        if (dbError) return callback(dbError);

        postProcess(record);
        callback(null, record);
    });
}
|
|
|
|
// writes the given task fields to the db record with the given id.
// callback is called with the db error on failure and without arguments on success
function update(id, task, callback) {
    assert.strictEqual(typeof id, 'string');
    assert.strictEqual(typeof task, 'object');
    assert.strictEqual(typeof callback, 'function');

    debug(`${id}: ${JSON.stringify(task)}`);

    taskdb.update(id, task, (dbError) => {
        if (dbError) return callback(dbError);

        callback();
    });
}
|
|
|
|
// marks the task as complete by updating it with percent 100 plus the fields given in 'task'.
// fields in 'task' are applied on top of the default, so a 'percent' passed in wins
function setCompleted(id, task, callback) {
    assert.strictEqual(typeof id, 'string');
    assert.strictEqual(typeof task, 'object');
    assert.strictEqual(typeof callback, 'function');

    debug(`setCompleted - ${id}: ${JSON.stringify(task)}`);

    const completedFields = _.extend({ percent: 100 }, task);

    update(id, completedFields, callback);
}
|
|
|
|
// completes the one task that listByTypePaged() returns for this type when asked for page 1 with 1 entry per page.
// which task that is depends on the ordering of taskdb.listByTypePaged (not visible here).
// fails with BoxError.NOT_FOUND when the listing does not contain exactly one task
function setCompletedByType(type, task, callback) {
    assert.strictEqual(typeof type, 'string');
    assert.strictEqual(typeof task, 'object');
    assert.strictEqual(typeof callback, 'function');

    listByTypePaged(type, 1, 1, (listError, found) => {
        if (listError) return callback(listError);
        if (found.length !== 1) return callback(new BoxError(BoxError.NOT_FOUND));

        const target = found[0];

        setCompleted(target.id, task, (completeError) => {
            if (completeError) return callback(completeError);

            callback();
        });
    });
}
|
|
|
|
// creates a task record of the given type with the given args array.
// the record starts out at 1 percent with the message 'Queued'. calls back with (error, taskId)
function add(type, args, callback) {
    assert.strictEqual(typeof type, 'string');
    assert(Array.isArray(args));
    assert.strictEqual(typeof callback, 'function');

    const queuedTask = { type: type, percent: 1, message: 'Queued', args: args };

    taskdb.add(queuedTask, (dbError, taskId) => {
        if (dbError) return callback(dbError);

        callback(null, taskId);
    });
}
|
|
|
|
// forks taskworker.js for the given task and calls back once the worker has exited.
//   options.logFile - file that receives the worker's stdout and stderr. defaults to <TASKS_LOG_DIR>/<taskId>.log
//   options.timeout - when set, the task is stopped via stopTask() after this many milliseconds
// callback(taskError, result): taskError is undefined when the task reached 100 percent without an error.
// result is task.result when the task record could be read, null otherwise.
// when the log file cannot be opened, callback is called with a BoxError.FS_ERROR and nothing is forked
function startTask(taskId, options, callback) {
    assert.strictEqual(typeof taskId, 'string');
    assert.strictEqual(typeof options, 'object');
    assert.strictEqual(typeof callback, 'function');

    const logFile = options.logFile || `${paths.TASKS_LOG_DIR}/${taskId}.log`;

    // opened in append mode (the original note says this is for apptask logs).
    // NOTE(review): the fd is not closed explicitly in this function - confirm it gets released
    const fd = safe.fs.openSync(logFile, 'a');
    if (!fd) {
        debug(`startTask: unable to get log filedescriptor ${safe.error.message}`);
        return callback(new BoxError(BoxError.FS_ERROR, safe.error));
    }

    debug(`startTask - starting task ${taskId}. logs at ${logFile}`);

    let killTimerId = null, timedOut = false;

    // fork requires ipc. stdout and stderr of the worker both go to the log file
    const worker = child_process.fork(`${__dirname}/taskworker.js`, [ taskId ], { stdio: [ 'pipe', fd, fd, 'ipc' ]});
    gTasks[taskId] = worker;

    worker.once('exit', (code, signal) => {
        debug(`startTask: ${taskId} completed with code ${code} and signal ${signal}`);

        if (options.timeout) clearTimeout(killTimerId);

        get(taskId, (error, task) => {
            let taskError;

            if (!error && task.percent !== 100) { // task crashed or was killed by us
                let message, errorCode;

                if (code === 0) {
                    message = `Task ${taskId} ${timedOut ? 'timed out' : 'stopped'}`;
                    errorCode = timedOut ? exports.ETIMEOUT : exports.ESTOPPED;
                } else {
                    message = `Task ${taskId} crashed with code ${code} and signal ${signal}`;
                    errorCode = exports.ECRASHED;
                }

                taskError = { message: message, code: errorCode };

                // note that despite the update() here, we should handle the case where the box code was restarted and never got taskworker exit
                setCompleted(taskId, { error: taskError }, NOOP_CALLBACK);
            } else if (!error && task.error) {
                taskError = task.error;
            } else if (!task) { // db got cleared in tests
                taskError = new BoxError(BoxError.NOT_FOUND, `No such task ${taskId}`);
            }

            // from here on the task is no longer reported as active
            delete gTasks[taskId];

            callback(taskError, task ? task.result : null);

            debug(`startTask: ${taskId} done`);
        });
    });

    if (!options.timeout) return;

    killTimerId = setTimeout(() => {
        debug(`startTask: task ${taskId} took too long. killing`);

        timedOut = true; // read by the exit handler to tell a timeout apart from a plain stop
        stopTask(taskId, NOOP_CALLBACK);
    }, options.timeout);
}
|
|
|
|
// sends SIGTERM to the worker process of the given task.
// calls back with BoxError.BAD_STATE when no worker is registered for the id, with null otherwise.
// the callback fires right after the signal was sent, it does not wait for the worker to exit
function stopTask(id, callback) {
    assert.strictEqual(typeof id, 'string');
    assert.strictEqual(typeof callback, 'function');

    const worker = gTasks[id];
    if (!worker) return callback(new BoxError(BoxError.BAD_STATE, 'task is not active'));

    debug(`stopTask: stopping task ${id}`);

    worker.kill('SIGTERM'); // this will end up calling the 'exit' signal handler

    callback(null);
}
|
|
|
|
// calls stopTask() for every task id registered in gTasks, one after the other.
// errors of individual stopTask() calls are dropped on purpose
function stopAllTasks(callback) {
    assert.strictEqual(typeof callback, 'function');

    const activeIds = Object.keys(gTasks);

    async.eachSeries(activeIds, (id, iteratorDone) => {
        stopTask(id, () => iteratorDone()); // ignore any error
    }, callback);
}
|
|
|
|
// lists one page of tasks from taskdb. 'type' is a task type string or null
// (presumably null means no type filter - that is decided in taskdb, verify there).
// every task is run through postProcess() before being passed to callback(error, tasks)
function listByTypePaged(type, page, perPage, callback) {
    assert(typeof type === 'string' || type === null);
    assert.strictEqual(typeof page, 'number');
    assert.strictEqual(typeof perPage, 'number');
    assert.strictEqual(typeof callback, 'function');

    taskdb.listByTypePaged(type, page, perPage, (dbError, records) => {
        if (dbError) return callback(dbError);

        records.forEach((record) => postProcess(record));

        callback(null, records);
    });
}
|
|
|
|
// streams the log file <TASKS_LOG_DIR>/<taskId>.log using /usr/bin/tail.
//   options.lines  - number of trailing lines to output. -1 outputs the whole file (tail --lines=+1)
//   options.format - 'json' (also used for an empty string) emits one JSON object per line, anything else emits the raw lines
//   options.follow - keep streaming as the file grows
// callback(null, stream): the stream has a close() function that kills the tail process with SIGKILL
function getLogs(taskId, options, callback) {
    assert.strictEqual(typeof taskId, 'string');
    assert(options && typeof options === 'object');
    assert.strictEqual(typeof callback, 'function');

    assert.strictEqual(typeof options.lines, 'number');
    assert.strictEqual(typeof options.format, 'string');
    assert.strictEqual(typeof options.follow, 'boolean');

    debug(`Getting logs for ${taskId}`);

    const lines = options.lines === -1 ? '+1' : options.lines;
    const format = options.format || 'json';
    const follow = options.follow;

    const cmd = '/usr/bin/tail';
    const args = [ '--lines=' + lines ];

    if (follow) args.push('--follow', '--retry', '--quiet'); // same as -F. to make it work if file doesn't exist, --quiet to not output file headers, which are no logs
    args.push(`${paths.TASKS_LOG_DIR}/${taskId}.log`);

    const cp = spawn(cmd, args);

    // an 'error' event without a listener is thrown as an uncaught exception. the child process emits it
    // when it could not be spawned or killed, so log it instead of taking the whole process down
    cp.on('error', (error) => debug(`getLogs: tail process for ${taskId} errored: ${error.message}`));

    const transformStream = split(function mapper(line) {
        if (format !== 'json') return line + '\n';

        const data = line.split(' '); // logs are <ISOtimestamp> <msg>
        let timestamp = (new Date(data[0])).getTime();
        if (isNaN(timestamp)) timestamp = 0; // first word is not a parseable date
        const message = line.slice(data[0].length+1);

        // ignore faulty empty logs
        if (!timestamp && !message) return;

        return JSON.stringify({
            realtimeTimestamp: timestamp * 1000, // epoch milliseconds scaled by 1000
            message: message,
            source: taskId
        }) + '\n';
    });

    transformStream.close = cp.kill.bind(cp, 'SIGKILL'); // closing stream kills the child process

    cp.stdout.pipe(transformStream);

    callback(null, transformStream);
}
|
|
|
|
// removes all fields that are strictly private and should never be returned by API calls.
// returns a new object that holds only the fields listed below, the task passed in is not modified
function removePrivateFields(task) {
    const publicFields = [ 'id', 'type', 'percent', 'message', 'error', 'active', 'creationTime', 'result', 'ts', 'success' ];

    return _.pick(task, ...publicFields);
}
|