'use strict'; exports = module.exports = { get, add, update, setCompleted, setCompletedByType, listByTypePaged, getLogs, startTask, stopTask, stopAllTasks, removePrivateFields, _del: del, // task types. if you add a task here, fill up the function table in taskworker and dashboard client.js TASK_APP: 'app', TASK_BACKUP: 'backup', TASK_UPDATE: 'update', TASK_CHECK_CERTS: 'checkCerts', TASK_SYNC_DYNDNS: 'syncDyndns', TASK_PREPARE_DASHBOARD_LOCATION: 'prepareDashboardLocation', TASK_CLEAN_BACKUPS: 'cleanBackups', TASK_SYNC_EXTERNAL_LDAP: 'syncExternalLdap', TASK_CHANGE_MAIL_LOCATION: 'changeMailLocation', TASK_SYNC_DNS_RECORDS: 'syncDnsRecords', TASK_UPDATE_DISK_USAGE: 'updateDiskUsage', // error codes ESTOPPED: 'stopped', ECRASHED: 'crashed', ETIMEOUT: 'timeout', // testing _TASK_IDENTITY: '_identity', _TASK_CRASH: '_crash', _TASK_ERROR: '_error', _TASK_SLEEP: '_sleep' }; const assert = require('assert'), BoxError = require('./boxerror.js'), constants = require('./constants.js'), database = require('./database.js'), debug = require('debug')('box:tasks'), fs = require('fs'), logs = require('./logs.js'), path = require('path'), paths = require('./paths.js'), safe = require('safetydance'), shell = require('./shell.js')('tasks'), _ = require('underscore'); let gTasks = {}; // indexed by task id const START_TASK_CMD = path.join(__dirname, 'scripts/starttask.sh'); const STOP_TASK_CMD = path.join(__dirname, 'scripts/stoptask.sh'); const TASKS_FIELDS = [ 'id', 'type', 'argsJson', 'percent', 'message', 'errorJson', 'creationTime', 'resultJson', 'ts' ]; function postProcess(task) { assert.strictEqual(typeof task, 'object'); assert(task.argsJson === null || typeof task.argsJson === 'string'); task.args = safe.JSON.parse(task.argsJson) || []; delete task.argsJson; task.id = String(task.id); task.result = JSON.parse(task.resultJson); delete task.resultJson; task.error = safe.JSON.parse(task.errorJson); delete task.errorJson; return task; } function updateStatus(result) { assert.strictEqual(typeof result, 'object'); // running means actively running // pending means not actively running // active mean task is 'done' or not. at this point, clients can stop polling this task. // the apptaskmanager sets percent=1 when queued. just a hack to figure non-started but scheduled tasks result.running = !!gTasks[result.id]; result.active = result.running || result.percent === 1; result.pending = !gTasks[result.id] && result.active; // we rely on 'percent' to determine success. maybe this can become a db field result.success = result.percent === 100 && !result.error; // the error in db will be empty if we didn't get a chance to handle task exit if (!result.active && result.percent !== 100 && !result.error) { result.error = { message: 'Task was stopped because the server was restarted or crashed', code: exports.ECRASHED }; } return result; } async function get(id) { assert.strictEqual(typeof id, 'string'); const result = await database.query(`SELECT ${TASKS_FIELDS} FROM tasks WHERE id = ?`, [ id ]); if (result.length === 0) return null; return updateStatus(postProcess(result[0])); } async function update(id, task) { assert.strictEqual(typeof id, 'string'); assert.strictEqual(typeof task, 'object'); debug(`update ${id}: ${JSON.stringify(task)}`); const args = [], fields = []; for (const k in task) { if (k === 'result' || k === 'error') { fields.push(`${k}Json = ?`); args.push(JSON.stringify(task[k])); } else { fields.push(k + ' = ?'); args.push(task[k]); } } args.push(id); const result = await database.query('UPDATE tasks SET ' + fields.join(', ') + ' WHERE id = ?', args); if (result.affectedRows !== 1) throw new BoxError(BoxError.NOT_FOUND, 'Task not found'); } async function setCompleted(id, task) { assert.strictEqual(typeof id, 'string'); assert.strictEqual(typeof task, 'object'); debug(`setCompleted - ${id}: ${JSON.stringify(task)}`); await update(id, Object.assign({ percent: 100 }, task)); } async function setCompletedByType(type, task) { assert.strictEqual(typeof type, 'string'); assert.strictEqual(typeof task, 'object'); const results = await listByTypePaged(type, 1, 1); if (results.length !== 1) throw new BoxError(BoxError.NOT_FOUND, 'No such task'); await setCompleted(results[0].id, task); } async function add(type, args) { assert.strictEqual(typeof type, 'string'); assert(Array.isArray(args)); const result = await database.query('INSERT INTO tasks (type, argsJson, percent, message) VALUES (?, ?, ?, ?)', [ type, JSON.stringify(args), 0, 'Queued' ]); return String(result.insertId); } function startTask(id, options, onTaskFinished) { assert.strictEqual(typeof id, 'string'); assert.strictEqual(typeof options, 'object'); assert(typeof onTaskFinished === 'undefined' || typeof onTaskFinished === 'function'); const logFile = options.logFile || `${paths.TASKS_LOG_DIR}/${id}.log`; debug(`startTask - starting task ${id} with options ${JSON.stringify(options)}. logs at ${logFile}`); let killTimerId = null, timedOut = false; const sudoOptions = { preserveEnv: true, logStream: null }; if (constants.TEST) sudoOptions.logStream = fs.createWriteStream('/dev/null'); // without this output is messed up, not sure why gTasks[id] = shell.sudo([ START_TASK_CMD, id, logFile, options.nice || 0, options.memoryLimit || 400, options.oomScoreAdjust || 0 ], sudoOptions, async function (sudoError) { if (!gTasks[id]) return; // ignore task exit since we are shutting down. see stopAllTasks const code = sudoError ? sudoError.code : 0; debug(`startTask: ${id} completed with code ${code}`); if (options.timeout) clearTimeout(killTimerId); const [getError, task] = await safe(get(id)); let taskError = null; if (!getError && task.percent !== 100) { // taskworker crashed or was killed by us if (code === 0) { taskError = { message: `Task ${id} ${timedOut ? 'timed out' : 'stopped'}` , code: timedOut ? exports.ETIMEOUT : exports.ESTOPPED }; } else { // task crashed. for code, maybe we can check systemctl show box-task-1707 -p ExecMainStatus taskError = { message: code === 2 ? `Task ${id} crashed as it ran out of memory` : `Task ${id} crashed with code ${code}`, code: exports.ECRASHED }; } // note that despite the update() here, we should handle the case where the box code was restarted and never got taskworker exit await safe(setCompleted(id, { error: taskError })); } else if (!getError && task.error) { taskError = task.error; } else if (!task) { // db got cleared in tests taskError = new BoxError(BoxError.NOT_FOUND, `No such task ${id}`); } delete gTasks[id]; if (onTaskFinished) onTaskFinished(taskError, task ? task.result : null); debug(`startTask: ${id} done. error: %o`, taskError); }); if (options.timeout) { killTimerId = setTimeout(async function () { debug(`startTask: task ${id} took too long. killing`); timedOut = true; const [error] = await safe(stopTask(id)); if (error) debug(`startTask: error stopping task: ${error.message}`); }, options.timeout); } } async function stopTask(id) { assert.strictEqual(typeof id, 'string'); if (!gTasks[id]) throw new BoxError(BoxError.BAD_STATE, 'task is not active'); debug(`stopTask: stopping task ${id}`); await shell.promises.sudo([ STOP_TASK_CMD, id, ], {}); } async function stopAllTasks() { debug('stopAllTasks: stopping all tasks'); gTasks = {}; // this signals startTask() to not set completion status as "crashed" const [error] = await safe(shell.promises.sudo([ STOP_TASK_CMD, 'all' ], { cwd: paths.baseDir() })); if (error) debug(`stopAllTasks: error stopping stasks: ${error.message}`); } async function listByTypePaged(type, page, perPage) { assert(typeof type === 'string' || type === null); assert.strictEqual(typeof page, 'number'); assert.strictEqual(typeof perPage, 'number'); const data = []; let query = `SELECT ${TASKS_FIELDS} FROM tasks`; if (type) { query += ' WHERE TYPE=?'; data.push(type); } query += ' ORDER BY creationTime DESC, id DESC LIMIT ?,?'; // put latest task first data.push((page-1)*perPage); data.push(perPage); const results = await database.query(query, data); results.forEach(postProcess); results.forEach(updateStatus); return results; } async function getLogs(task, options) { assert.strictEqual(typeof task, 'object'); assert(options && typeof options === 'object'); assert.strictEqual(typeof options.lines, 'number'); assert.strictEqual(typeof options.format, 'string'); assert.strictEqual(typeof options.follow, 'boolean'); const logFile = `${paths.TASKS_LOG_DIR}/${task.id}.log`; if (!task.active && !safe.fs.existsSync(logFile)) throw new BoxError(BoxError.FS_ERROR, 'Log file removed/missing'); // logrotated const cp = logs.tail([`${paths.TASKS_LOG_DIR}/${task.id}.log`], { lines: options.lines, follow: options.follow }); const logStream = new logs.LogStream({ format: options.format || 'json', source: task.id }); logStream.on('close', () => cp.terminate()); // the caller has to call destroy() on logStream. destroy() of Transform emits 'close' cp.stdout.pipe(logStream); return logStream; } // removes all fields that are strictly private and should never be returned by API calls function removePrivateFields(task) { return _.pick(task, 'id', 'type', 'percent', 'message', 'error', 'active', 'pending', 'creationTime', 'result', 'ts', 'success'); } async function del(id) { assert.strictEqual(typeof id, 'string'); const result = await database.query('DELETE FROM tasks WHERE id = ?', [ id ]); if (result.affectedRows !== 1) throw new BoxError(BoxError.NOT_FOUND, 'Task not found'); }