diff --git a/migrations/20181116191032-tasks-add-table.js b/migrations/20181116191032-tasks-add-table.js index 7d64cb04a..500e656ff 100644 --- a/migrations/20181116191032-tasks-add-table.js +++ b/migrations/20181116191032-tasks-add-table.js @@ -1,16 +1,17 @@ 'use strict'; exports.up = function(db, callback) { - var cmd = "CREATE TABLE tasks(" + - "id VARCHAR(32) NOT NULL UNIQUE," + - "argsJson TEXT," + - "percent INTEGER DEFAULT 0," + - "message TEXT," + - "errorMessage TEXT," + - "result TEXT," + - "creationTime TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP," + - "ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP," + - "PRIMARY KEY (id))"; + var cmd = 'CREATE TABLE tasks(' + + 'id int NOT NULL AUTO_INCREMENT,' + + 'type VARCHAR(32) NOT NULL,' + + 'argsJson TEXT,' + + 'percent INTEGER DEFAULT 0,' + + 'message TEXT,' + + 'errorMessage TEXT,' + + 'result TEXT,' + + 'creationTime TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,' + + 'ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,' + + 'PRIMARY KEY (id))'; db.runSql(cmd, function (error) { if (error) console.error(error); diff --git a/migrations/schema.sql b/migrations/schema.sql index df84eb282..61bc25f2e 100644 --- a/migrations/schema.sql +++ b/migrations/schema.sql @@ -202,7 +202,8 @@ CREATE TABLE IF NOT EXISTS subdomains( UNIQUE (subdomain, domain)); CREATE TABLE IF NOT EXISTS tasks( - id VARCHAR(32) NOT NULL UNIQUE, + id int NOT NULL AUTO_INCREMENT, + type VARCHAR(32) NOT NULL, percent INTEGER DEFAULT 0, message TEXT, errorMessage TEXT, diff --git a/src/routes/backups.js b/src/routes/backups.js index 3986adeee..c354ac469 100644 --- a/src/routes/backups.js +++ b/src/routes/backups.js @@ -36,10 +36,10 @@ function list(req, res, next) { function startBackup(req, res, next) { // note that cloudron.backup only waits for backup initiation and not for backup to complete // backup progress can be checked up ny polling the progress api call - tasks.startTask(tasks.TASK_BACKUP, {}, auditSource(req), function (error) { + tasks.startTask(tasks.TASK_BACKUP, {}, auditSource(req), function (error, taskId) { if (error && error.reason === TasksError.BAD_STATE) return next(new HttpError(409, error.message)); if (error) return next(new HttpError(500, error)); - next(new HttpSuccess(202, {})); + next(new HttpSuccess(202, { taskId })); }); } diff --git a/src/routes/cloudron.js b/src/routes/cloudron.js index d4a250382..6c7287961 100644 --- a/src/routes/cloudron.js +++ b/src/routes/cloudron.js @@ -65,12 +65,12 @@ function getDisks(req, res, next) { function update(req, res, next) { // this only initiates the update, progress can be checked via the progress route - updater.updateToLatest(auditSource(req), function (error) { + updater.updateToLatest(auditSource(req), function (error, taskId) { if (error && error.reason === UpdaterError.ALREADY_UPTODATE) return next(new HttpError(422, error.message)); if (error && error.reason === UpdaterError.BAD_STATE) return next(new HttpError(409, error.message)); if (error) return next(new HttpError(500, error)); - next(new HttpSuccess(202, {})); + next(new HttpSuccess(202, { taskId })); }); } diff --git a/src/routes/sysadmin.js b/src/routes/sysadmin.js index 181a56b06..62253efe7 100644 --- a/src/routes/sysadmin.js +++ b/src/routes/sysadmin.js @@ -26,11 +26,11 @@ function backup(req, res, next) { // note that cloudron.backup only waits for backup initiation and not for backup to complete // backup progress can be checked up ny polling the progress api call var auditSource = { userId: null, username: 'sysadmin' }; - tasks.startTask(tasks.TASK_BACKUP, {}, auditSource, function (error) { + tasks.startTask(tasks.TASK_BACKUP, {}, auditSource, function (error, taskId) { if (error && error.reason === TasksError.BAD_STATE) return next(new HttpError(409, error.message)); if (error) return next(new HttpError(500, error)); - next(new HttpSuccess(202, {})); + next(new HttpSuccess(202, { taskId })); }); } @@ -39,12 +39,12 @@ function update(req, res, next) { // this only initiates the update, progress can be checked via the progress route var auditSource = { userId: null, username: 'sysadmin' }; - updater.updateToLatest(auditSource, function (error) { + updater.updateToLatest(auditSource, function (error, taskId) { if (error && error.reason === UpdaterError.ALREADY_UPTODATE) return next(new HttpError(422, error.message)); if (error && error.reason === UpdaterError.BAD_STATE) return next(new HttpError(409, error.message)); if (error) return next(new HttpError(500, error)); - next(new HttpSuccess(202, {})); + next(new HttpSuccess(202, { taskId })); }); } diff --git a/src/taskdb.js b/src/taskdb.js index 16ed3c8bc..99250c0c9 100644 --- a/src/taskdb.js +++ b/src/taskdb.js @@ -1,17 +1,18 @@ 'use strict'; exports = module.exports = { + get: get, + add: add, update: update, - get: get + del: del }; let assert = require('assert'), database = require('./database.js'), DatabaseError = require('./databaseerror'), - safe = require('safetydance'), - _ = require('underscore'); + safe = require('safetydance'); -const TASKS_FIELDS = [ 'id', 'argsJson', 'percent', 'message', 'errorMessage', 'creationTime', 'result', 'ts' ]; +const TASKS_FIELDS = [ 'id', 'type', 'argsJson', 'percent', 'message', 'errorMessage', 'creationTime', 'result', 'ts' ]; function postProcess(result) { assert.strictEqual(typeof result, 'object'); @@ -19,39 +20,42 @@ function postProcess(result) { assert(result.argsJson === null || typeof result.argsJson === 'string'); result.args = safe.JSON.parse(result.argsJson) || {}; delete result.argsJson; + + result.id = String(result.id); } -function update(id, progress, callback) { - assert.strictEqual(typeof id, 'string'); - assert.strictEqual(typeof progress, 'object'); +function add(task, callback) { + assert.strictEqual(typeof task, 'object'); assert.strictEqual(typeof callback, 'function'); - let data = _.extend({ id: id }, progress); + const query = 'INSERT INTO tasks (type, argsJson, percent, message) VALUES (?, ?, ?, ?)'; + const args = [ task.type, JSON.stringify(task.args), task.percent, task.message ]; - let keys = [ ], - questionMarks = Array(Object.keys(data).length).fill('?').join(','), - fields = [ ], values = [ ]; - - for (var f in data) { - let key, value; - if (f === 'args') { - key = 'argsJson'; - value = JSON.stringify(data[f]); - } else { - key = f; - value = data[f]; - } - keys.push(key); - fields.push(`${key} = ?`); - values.push(value); // for the INSERT fields - } - - values = values.concat(values); // for the UPDATE fields - - database.query(`INSERT INTO tasks (${keys.join(', ')}) VALUES (${questionMarks}) ON DUPLICATE KEY UPDATE ${fields}`, values, function (error) { + database.query(query, args, function (error, result) { if (error) return callback(new DatabaseError(DatabaseError.INTERNAL_ERROR, error)); - callback(null); + callback(null, String(result.insertId)); + }); +} + +function update(id, data, callback) { + assert.strictEqual(typeof id, 'string'); + assert.strictEqual(typeof data, 'object'); + assert.strictEqual(typeof callback, 'function'); + + let args = [ ]; + let fields = [ ]; + for (let k in data) { + fields.push(k + ' = ?'); + args.push(data[k]); + } + args.push(id); + + database.query('UPDATE tasks SET ' + fields.join(', ') + ' WHERE id = ?', args, function (error, result) { + if (error) return callback(new DatabaseError(DatabaseError.INTERNAL_ERROR, error)); + if (result.affectedRows !== 1) return callback(new DatabaseError(DatabaseError.NOT_FOUND)); + + return callback(null); }); } @@ -68,3 +72,15 @@ function get(id, callback) { callback(null, result[0]); }); } + +function del(id, callback) { + assert.strictEqual(typeof id, 'string'); + assert.strictEqual(typeof callback, 'function'); + + database.query('DELETE FROM tasks WHERE id = ?', [ id ], function (error, result) { + if (error) return callback(new DatabaseError(DatabaseError.INTERNAL_ERROR, error)); + if (result.affectedRows !== 1) return callback(new DatabaseError(DatabaseError.NOT_FOUND)); + + callback(null); + }); +} diff --git a/src/tasks.js b/src/tasks.js index ef7637f00..8e6a47ad9 100644 --- a/src/tasks.js +++ b/src/tasks.js @@ -1,14 +1,15 @@ 'use strict'; exports = module.exports = { - update: update, get: get, + update: update, startTask: startTask, stopTask: stopTask, TaskError: TaskError, + // task types TASK_BACKUP: 'backup', TASK_UPDATE: 'update', TASK_MIGRATE: 'migrate' @@ -29,7 +30,7 @@ let assert = require('assert'), const NOOP_CALLBACK = function (error) { if (error) debug(error); }; -const TASKS = { +const TASKS = { // indexed by task type backup: { lock: locker.OP_FULL_BACKUP, logFile: paths.BACKUP_LOG_FILE, @@ -48,7 +49,7 @@ const TASKS = { } }; -let gTasks = {}; +let gTasks = {}; // indexed by task id function TaskError(reason, errorOrMessage) { assert.strictEqual(typeof reason, 'string'); @@ -73,20 +74,6 @@ TaskError.INTERNAL_ERROR = 'Internal Error'; TaskError.BAD_STATE = 'Bad State'; TaskError.NOT_FOUND = 'Not Found'; -function update(id, progress, callback) { - assert.strictEqual(typeof id, 'string'); - assert.strictEqual(typeof progress, 'object'); - assert.strictEqual(typeof callback, 'function'); - - debug(`${id}: ${JSON.stringify(progress)}`); - - taskdb.update(id, progress, function (error) { - if (error) return callback(new TaskError(TaskError.INTERNAL_ERROR, error)); - - callback(); - }); -} - function get(id, callback) { assert.strictEqual(typeof id, 'string'); assert.strictEqual(typeof callback, 'function'); @@ -101,21 +88,28 @@ function get(id, callback) { }); } -function clear(id, args, callback) { +function update(id, task, callback) { assert.strictEqual(typeof id, 'string'); - assert(args && typeof args === 'object'); + assert.strictEqual(typeof task, 'object'); assert.strictEqual(typeof callback, 'function'); - update(id, { percent: 0, message: 'Starting', result: '', errorMessage: '', args: args }, callback); + debug(`${id}: ${JSON.stringify(task)}`); + + taskdb.update(id, task, function (error) { + if (error && error.reason == DatabaseError.NOT_FOUND) return callback(new TaskError(TaskError.NOT_FOUND)); + if (error) return callback(new TaskError(TaskError.INTERNAL_ERROR, error)); + + callback(); + }); } -function startTask(id, args, auditSource, callback) { - assert.strictEqual(typeof id, 'string'); +function startTask(type, args, auditSource, callback) { + assert.strictEqual(typeof type, 'string'); assert(args && typeof args === 'object'); assert.strictEqual(typeof auditSource, 'object'); assert.strictEqual(typeof callback, 'function'); - const taskInfo = TASKS[id]; + const taskInfo = TASKS[type]; if (!taskInfo) return callback(new TaskError(TaskError.NOT_FOUND, 'No such task')); let error = locker.lock(taskInfo.lock); @@ -128,39 +122,39 @@ function startTask(id, args, auditSource, callback) { return callback(new TaskError(TaskError.INTERNAL_ERROR, error.message)); } - debug(`startTask - starting task ${id}. logs at ${taskInfo.logFile}`); + taskdb.add({ type: type, percent: 0, message: 'Starting', args: args }, function (error, taskId) { + if (error) return callback(new TaskError(TaskError.INTERNAL_ERROR, error)); - // when parent process dies, this process is killed because KillMode=control-group in systemd unit file - assert(!gTasks[id], 'Task is already running'); + debug(`startTask - starting task ${type}. logs at ${taskInfo.logFile}. id ${taskId}`); - clear(id, args, NOOP_CALLBACK); - eventlog.add(taskInfo.startEventId, auditSource, args); + eventlog.add(taskInfo.startEventId, auditSource, args); - gTasks[id] = child_process.fork(taskInfo.program, [], { stdio: [ 'pipe', fd, fd, 'ipc' ]}); // fork requires ipc - gTasks[id].once('exit', function (code, signal) { - debug(`startTask: ${id} completed with code ${code} and signal ${signal}`); + gTasks[taskId] = child_process.fork(taskInfo.program, [ taskId ], { stdio: [ 'pipe', fd, fd, 'ipc' ]}); // fork requires ipc + gTasks[taskId].once('exit', function (code, signal) { + debug(`startTask: ${taskId} completed with code ${code} and signal ${signal}`); - get(id, function (error, progress) { - if (!error && progress.percent !== 100) { // task crashed or was killed by us (code 50) - error = code === 0 ? new Error(`${id} task stopped`) : new Error(`${id} task crashed with code ${code} and signal ${signal}`); - update(id, { percent: 100, errorMessage: error.message }, NOOP_CALLBACK); - } else if (!error && progress.errorMessage) { - error = new Error(progress.errorMessage); - } + get(taskId, function (error, task) { + if (!error && task.percent !== 100) { // task crashed or was killed by us (code 50) + error = code === 0 ? new Error(`${taskId} task stopped`) : new Error(`${taskId} task crashed with code ${code} and signal ${signal}`); + update(taskId, { percent: 100, errorMessage: error.message }, NOOP_CALLBACK); + } else if (!error && task.errorMessage) { + error = new Error(task.errorMessage); + } - eventlog.add(taskInfo.finishEventId, auditSource, _.extend({ errorMessage: error ? error.message : null }, progress ? progress.result : {})); + eventlog.add(taskInfo.finishEventId, auditSource, _.extend({ errorMessage: error ? error.message : null }, task ? task.result : {})); - locker.unlock(taskInfo.lock); + locker.unlock(taskInfo.lock); - if (error) taskInfo.onFailure(error); + if (error) taskInfo.onFailure(error); - gTasks[id] = null; + gTasks[taskId] = null; - debug(`startTask: ${id} done`); + debug(`startTask: ${taskId} done`); + }); }); - }); - callback(null); + callback(null, taskId); + }); } function stopTask(id, auditSource, callback) { @@ -168,9 +162,6 @@ function stopTask(id, auditSource, callback) { assert.strictEqual(typeof auditSource, 'object'); assert.strictEqual(typeof callback, 'function'); - const taskInfo = TASKS[id]; - if (!taskInfo) return callback(new TaskError(TaskError.NOT_FOUND, 'No such task')); - if (!gTasks[id]) return callback(new TaskError(TaskError.BAD_STATE, 'task is not active')); debug(`stopTask: stopping task ${id}`); diff --git a/src/tasks/backuptask.js b/src/tasks/backuptask.js index b5f29d65f..07da5003e 100755 --- a/src/tasks/backuptask.js +++ b/src/tasks/backuptask.js @@ -14,14 +14,17 @@ process.on('SIGTERM', function () { process.exit(0); }); +assert.strictEqual(process.argv.length, 3, 'Pass the taskid as argument'); +const taskId = process.argv[2]; + // Main process starts here debug('Staring backup'); database.initialize(function (error) { if (error) return process.exit(50); - backups.backupBoxAndApps((progress) => tasks.update(tasks.TASK_BACKUP, progress, NOOP_CALLBACK), function (error, backupId) { + backups.backupBoxAndApps((progress) => tasks.update(taskId, progress, NOOP_CALLBACK), function (error, backupId) { const progress = { percent: 100, result: backupId || '', errorMessage: error ? error.message : '' }; - tasks.update(tasks.TASK_BACKUP, progress, () => process.exit(error ? 50 : 0)); + tasks.update(taskId, progress, () => process.exit(error ? 50 : 0)); }); }); diff --git a/src/tasks/updatertask.js b/src/tasks/updatertask.js index 9004e14d6..53596b7e0 100755 --- a/src/tasks/updatertask.js +++ b/src/tasks/updatertask.js @@ -2,7 +2,8 @@ require('supererror')({ splatchError: true }); -let database = require('../database.js'), +let assert = require('assert'), + database = require('../database.js'), debug = require('debug')('box:updatertask'), tasks = require('../tasks.js'), updater = require('../updater.js'); @@ -20,19 +21,22 @@ function exit(error) { process.exit(50); } +assert.strictEqual(process.argv.length, 3, 'Pass the taskid as argument'); +const taskId = process.argv[2]; + // Main process starts here debug('Staring update'); database.initialize(function (error) { if (error) return exit(error); - tasks.get(tasks.TASK_UPDATE, function (error, result) { + tasks.get(taskId, function (error, result) { if (error) return exit(error); if (!result.args.boxUpdateInfo) return exit(new Error('Invalid args:' + JSON.stringify(result))); - updater.update(result.args.boxUpdateInfo, (progress) => tasks.update(tasks.TASK_UPDATE, progress, NOOP_CALLBACK), function (updateError) { + updater.update(result.args.boxUpdateInfo, (progress) => tasks.update(taskId, progress, NOOP_CALLBACK), function (updateError) { const progress = { percent: 100, errorMessage: updateError ? updateError.message : '' }; - tasks.update(tasks.TASK_UPDATE, progress, () => exit(updateError)); + tasks.update(taskId, progress, () => exit(updateError)); }); }); }); diff --git a/src/test/backups-test.js b/src/test/backups-test.js index 56a8f7fe5..be0a83088 100644 --- a/src/test/backups-test.js +++ b/src/test/backups-test.js @@ -23,11 +23,11 @@ var async = require('async'), tasks = require('../tasks.js'); function createBackup(callback) { - tasks.startTask(tasks.TASK_BACKUP, [], { username: 'test' }, function (error) { // this call does not wait for the backup! + tasks.startTask(tasks.TASK_BACKUP, [], { username: 'test' }, function (error, taskId) { // this call does not wait for the backup! if (error) return callback(error); function waitForBackup() { - tasks.get(tasks.TASK_BACKUP, function (error, p) { + tasks.get(taskId, function (error, p) { if (error) return callback(error); if (p.percent !== 100) return setTimeout(waitForBackup, 1000); diff --git a/src/test/database-test.js b/src/test/database-test.js index 8628a77ab..a020303f7 100644 --- a/src/test/database-test.js +++ b/src/test/database-test.js @@ -21,6 +21,7 @@ var appdb = require('../appdb.js'), mailboxdb = require('../mailboxdb.js'), maildb = require('../maildb.js'), settingsdb = require('../settingsdb.js'), + taskdb = require('../taskdb.js'), tokendb = require('../tokendb.js'), userdb = require('../userdb.js'), _ = require('underscore'); @@ -1098,6 +1099,56 @@ describe('database', function () { }); }); + describe('tasks', function () { + let taskId; + + let TASK = { + type: 'tasktype', + args: { x: 1 }, + percent: 0, + message: 'starting task' + }; + + it('add succeeds', function (done) { + taskdb.add(TASK, function (error, id) { + expect(error).to.be(null); + expect(id).to.be.ok(); + taskId = id; + done(); + }); + }); + + it('get succeeds', function (done) { + taskdb.get(taskId, function (error, task) { + expect(error).to.be(null); + expect(_.pick(task, Object.keys(TASK))).to.eql(TASK); + done(); + }); + }); + + it('update succeeds', function (done) { + TASK.percent = 34; + TASK.message = 'almost ther'; + taskdb.update(taskId, { percent: TASK.percent, message: TASK.message }, function (error) { + expect(error).to.be(null); + taskdb.get(taskId, function (error, task) { + expect(_.pick(task, Object.keys(TASK))).to.eql(TASK); + done(); + }); + }); + }); + + it('del succeeds', function (done) { + taskdb.del(taskId, function (error) { + expect(error).to.be(null); + taskdb.get(taskId, function (error) { + expect(error.reason).to.be(DatabaseError.NOT_FOUND); + done(); + }); + }); + }); + }); + describe('client', function () { var CLIENT_0 = { id: 'cid-0', diff --git a/src/updater.js b/src/updater.js index 673b2e421..315db5016 100644 --- a/src/updater.js +++ b/src/updater.js @@ -188,10 +188,10 @@ function updateToLatest(auditSource, callback) { if (!boxUpdateInfo) return callback(new UpdaterError(UpdaterError.ALREADY_UPTODATE, 'No update available')); if (!boxUpdateInfo.sourceTarballUrl) return callback(new UpdaterError(UpdaterError.BAD_STATE, 'No automatic update available')); - tasks.startTask(tasks.TASK_UPDATE, { boxUpdateInfo }, auditSource, function (error) { + tasks.startTask(tasks.TASK_UPDATE, { boxUpdateInfo }, auditSource, function (error, taskId) { if (error && error.reason === TaskError.BAD_STATE) return callback(new UpdaterError(UpdaterError.BAD_STATE, error.message)); if (error) return callback(new UpdaterError(UpdaterError.INTERNAL_ERROR, error)); - callback(null); + callback(null, taskId); }); }