diff --git a/src/apps.js b/src/apps.js index d98a11223..02e3ce817 100644 --- a/src/apps.js +++ b/src/apps.js @@ -126,6 +126,8 @@ AppsError.PLAN_LIMIT = 'Plan Limit'; AppsError.ACCESS_DENIED = 'Access denied'; AppsError.BAD_CERTIFICATE = 'Invalid certificate'; +const NOOP_CALLBACK = function (error) { if (error) debug(error); }; + // validate the port bindings function validatePortBindings(portBindings, manifest) { assert.strictEqual(typeof portBindings, 'object'); @@ -578,19 +580,17 @@ function startAppTask(appId, args, callback) { if (!fs.existsSync(path.dirname(logFile))) safe.fs.mkdirSync(path.dirname(logFile)); // ensure directory - let task = tasks.startTask(tasks.TASK_APP, [ appId, args ], { logFile }); - task.on('start', function (taskId) { + tasks.add(tasks.TASK_APP, [ appId, args ], function (error, taskId) { + if (error) return callback(new AppsError(AppsError.INTERNAL_ERROR, error)); + appdb.update(appId, { taskId: taskId }, function (error) { if (error) return callback(new AppsError(AppsError.INTERNAL_ERROR, error)); - get(appId, function (error, result) { - if (error) return callback(error); + tasks.startTask(taskId, { logFile }, NOOP_CALLBACK); - callback(null, result); - }); + callback(null, taskId); }); }); - task.on('error', (error) => callback(new AppsError(AppsError.INTERNAL_ERROR, error))); } function install(data, user, auditSource, callback) { diff --git a/src/backups.js b/src/backups.js index e64d988ba..a2abd5c73 100644 --- a/src/backups.js +++ b/src/backups.js @@ -1020,19 +1020,21 @@ function startBackupTask(auditSource, callback) { let error = locker.lock(locker.OP_FULL_BACKUP); if (error) return callback(new BackupsError(BackupsError.BAD_STATE, `Cannot backup now: ${error.message}`)); - let task = tasks.startTask(tasks.TASK_BACKUP, []); - task.on('error', (error) => callback(new BackupsError(BackupsError.INTERNAL_ERROR, error))); - task.on('start', (taskId) => { + tasks.add(tasks.TASK_BACKUP, [ ], function (error, taskId) { + if (error) return callback(new BackupsError(BackupsError.INTERNAL_ERROR, error)); + eventlog.add(eventlog.ACTION_BACKUP_START, auditSource, { taskId }); + + tasks.startTask(taskId, {}, function (error, result) { + locker.unlock(locker.OP_FULL_BACKUP); + + const errorMessage = error ? error.message : ''; + + eventlog.add(eventlog.ACTION_BACKUP_FINISH, auditSource, { taskId: taskId, errorMessage: errorMessage, backupId: result }); + }); + callback(null, taskId); }); - task.on('finish', (error, result) => { - locker.unlock(locker.OP_FULL_BACKUP); - - const errorMessage = error ? error.message : ''; - - eventlog.add(eventlog.ACTION_BACKUP_FINISH, auditSource, { taskId: task.id, errorMessage: errorMessage, backupId: result }); - }); } function ensureBackup(auditSource, callback) { @@ -1256,18 +1258,21 @@ function cleanup(auditSource, progressCallback, callback) { } function startCleanupTask(auditSource, callback) { - let task = tasks.startTask(tasks.TASK_CLEAN_BACKUPS, [ auditSource ]); - task.on('error', (error) => callback(new BackupsError(BackupsError.INTERNAL_ERROR, error))); - task.on('start', (taskId) => { + + tasks.add(tasks.TASK_CLEAN_BACKUPS, [ auditSource ], function (error, taskId) { + if (error) return callback(new BackupsError(BackupsError.INTERNAL_ERROR, error)); + eventlog.add(eventlog.ACTION_BACKUP_CLEANUP_START, auditSource, { taskId }); - callback(null, taskId); - }); - task.on('finish', (error, result) => { // result is { removedBoxBackups, removedAppBackups } - eventlog.add(eventlog.ACTION_BACKUP_CLEANUP_FINISH, auditSource, { - errorMessage: error ? error.message : null, - removedBoxBackups: result ? result.removedBoxBackups : [], - removedAppBackups: result ? result.removedAppBackups : [] + + tasks.startTask(taskId, {}, (error, result) => { // result is { removedBoxBackups, removedAppBackups } + eventlog.add(eventlog.ACTION_BACKUP_CLEANUP_FINISH, auditSource, { + errorMessage: error ? error.message : null, + removedBoxBackups: result ? result.removedBoxBackups : [], + removedAppBackups: result ? result.removedAppBackups : [] + }); }); + + callback(null, taskId); }); } diff --git a/src/cloudron.js b/src/cloudron.js index d59105474..03cc136a0 100644 --- a/src/cloudron.js +++ b/src/cloudron.js @@ -307,9 +307,13 @@ function prepareDashboardDomain(domain, auditSource, callback) { const conflict = result.filter(app => app.fqdn === fqdn); if (conflict.length) return callback(new CloudronError(CloudronError.BAD_STATE, 'Dashboard location conflicts with an existing app')); - let task = tasks.startTask(tasks.TASK_PREPARE_DASHBOARD_DOMAIN, [ domain, auditSource ]); - task.on('error', (error) => callback(new CloudronError(CloudronError.INTERNAL_ERROR, error))); - task.on('start', (taskId) => callback(null, taskId)); + tasks.add(tasks.TASK_PREPARE_DASHBOARD_DOMAIN, [ domain, auditSource ], function (error, taskId) { + if (error) return callback(new CloudronError(CloudronError.INTERNAL_ERROR, error)); + + tasks.startTask(taskId, {}, NOOP_CALLBACK); + + callback(null, taskId); + }); }); }); } @@ -378,7 +382,11 @@ function renewCerts(options, auditSource, callback) { assert.strictEqual(typeof auditSource, 'object'); assert.strictEqual(typeof callback, 'function'); - let task = tasks.startTask(tasks.TASK_RENEW_CERTS, [ options, auditSource ]); - task.on('error', (error) => callback(new CloudronError(CloudronError.INTERNAL_ERROR, error))); - task.on('start', (taskId) => callback(null, taskId)); + tasks.add(tasks.TASK_RENEW_CERTS, [ options, auditSource ], function (error, taskId) { + if (error) return callback(new CloudronError(CloudronError.INTERNAL_ERROR, error)); + + tasks.startTask(taskId, {}, NOOP_CALLBACK); + + callback(null, taskId); + }); } diff --git a/src/routes/test/tasks-test.js b/src/routes/test/tasks-test.js index 9bc83ffed..26b3c3a84 100644 --- a/src/routes/test/tasks-test.js +++ b/src/routes/test/tasks-test.js @@ -53,105 +53,105 @@ describe('Tasks API', function () { after(cleanup); it('can get task', function (done) { - let taskId = null; - let task = tasks.startTask(tasks._TASK_IDENTITY, [ 'ping' ]); - task.on('error', done); - task.on('start', (tid) => { taskId = tid; }); + tasks.add(tasks._TASK_IDENTITY, [ 'ping' ], function (error, taskId) { + if (error) return done(error); - task.on('finish', function () { - superagent.get(SERVER_URL + '/api/v1/tasks/' + taskId) - .query({ access_token: token }) - .end(function (err, res) { - expect(res.statusCode).to.equal(200); - expect(res.body.percent).to.be(100); - expect(res.body.args).to.be(undefined); - expect(res.body.active).to.be(false); // finished - expect(res.body.result).to.be('ping'); - expect(res.body.errorMessage).to.be(null); - done(); - }); + tasks.startTask(taskId, {}, function () { + superagent.get(SERVER_URL + '/api/v1/tasks/' + taskId) + .query({ access_token: token }) + .end(function (err, res) { + expect(res.statusCode).to.equal(200); + expect(res.body.percent).to.be(100); + expect(res.body.args).to.be(undefined); + expect(res.body.active).to.be(false); // finished + expect(res.body.success).to.be(true); + expect(res.body.result).to.be('ping'); + expect(res.body.errorMessage).to.be(null); + done(); + }); + }); }); }); it('can get logs', function (done) { - let taskId = null; - let task = tasks.startTask(tasks._TASK_CRASH, [ 'ping' ]); - task.on('error', done); - task.on('start', (tid) => { taskId = tid; }); + tasks.add(tasks._TASK_CRASH, [ 'ping' ], function (error, taskId) { + if (error) return done(error); - task.on('finish', function () { - superagent.get(SERVER_URL + '/api/v1/tasks/' + taskId + '/logs') - .query({ access_token: token }) - .end(function (err, res) { - expect(res.statusCode).to.equal(200); - done(); - }); + tasks.startTask(taskId, {}, function () { + superagent.get(SERVER_URL + '/api/v1/tasks/' + taskId + '/logs') + .query({ access_token: token }) + .end(function (err, res) { + expect(res.statusCode).to.equal(200); + done(); + }); + }); }); }); it('cannot stop inactive task', function (done) { - let taskId = null; - let task = tasks.startTask(tasks._TASK_IDENTITY, [ 'ping' ]); - task.on('error', done); - task.on('start', (tid) => { taskId = tid; }); + tasks.add(tasks._TASK_IDENTITY, [ 'ping' ], function (error, taskId) { + if (error) return done(error); - task.on('finish', function () { - superagent.post(SERVER_URL + '/api/v1/tasks/' + taskId + '/stop') - .query({ access_token: token }) - .end(function (err, res) { - expect(res.statusCode).to.equal(409); - done(); - }); + tasks.startTask(taskId, {}, function () { + superagent.post(SERVER_URL + '/api/v1/tasks/' + taskId + '/stop') + .query({ access_token: token }) + .end(function (err, res) { + expect(res.statusCode).to.equal(409); + done(); + }); + }); }); }); it('can stop task', function (done) { - let taskId = null; - let task = tasks.startTask(tasks._TASK_SLEEP, [ 10000 ]); - task.on('error', done); - task.on('start', (tid) => { - taskId = tid; - superagent.post(SERVER_URL + '/api/v1/tasks/' + taskId + '/stop') - .query({ access_token: token }) - .end(function (err, res) { - expect(res.statusCode).to.equal(204); - }); - }); - task.on('finish', () => { - superagent.get(SERVER_URL + '/api/v1/tasks/' + taskId) - .query({ access_token: token }) - .end(function (err, res) { - expect(res.statusCode).to.equal(200); - expect(res.body.percent).to.be(100); - expect(res.body.active).to.be(false); // finished - expect(res.body.result).to.be(null); - expect(res.body.errorMessage).to.contain('signal SIGTERM'); - done(); - }); + tasks.add(tasks._TASK_SLEEP, [ 10000 ], function (error, taskId) { + if (error) return done(error); + + tasks.startTask(taskId, {}, function () { + superagent.get(SERVER_URL + '/api/v1/tasks/' + taskId) + .query({ access_token: token }) + .end(function (err, res) { + expect(res.statusCode).to.equal(200); + expect(res.body.percent).to.be(100); + expect(res.body.active).to.be(false); // finished + expect(res.body.success).to.be(false); + expect(res.body.result).to.be(null); + expect(res.body.errorMessage).to.contain('signal SIGTERM'); + done(); + }); + }); + + setTimeout(function () { + superagent.post(SERVER_URL + '/api/v1/tasks/' + taskId + '/stop') + .query({ access_token: token }) + .end(function (err, res) { + expect(res.statusCode).to.equal(204); + }); + }, 100); }); }); it('can list tasks', function (done) { - let taskId = null; - let task = tasks.startTask(tasks._TASK_IDENTITY, [ 'ping' ]); - task.on('error', done); - task.on('start', (tid) => { taskId = tid; }); + tasks.add(tasks._TASK_IDENTITY, [ 'ping' ], function (error, taskId) { + if (error) return done(error); - task.on('finish', function () { - superagent.get(SERVER_URL + '/api/v1/tasks?type=' + tasks._TASK_IDENTITY) - .query({ access_token: token }) - .end(function (err, res) { - expect(res.statusCode).to.equal(200); - expect(res.body.tasks.length >= 1).to.be(true); - expect(res.body.tasks[0].id).to.be(taskId); - expect(res.body.tasks[0].percent).to.be(100); - expect(res.body.tasks[0].args).to.be(undefined); - expect(res.body.tasks[0].active).to.be(false); // finished - expect(res.body.tasks[0].result).to.be('ping'); - expect(res.body.tasks[0].errorMessage).to.be(null); - done(); - }); + tasks.startTask(taskId, {}, function () { + superagent.get(SERVER_URL + '/api/v1/tasks?type=' + tasks._TASK_IDENTITY) + .query({ access_token: token }) + .end(function (err, res) { + expect(res.statusCode).to.equal(200); + expect(res.body.tasks.length >= 1).to.be(true); + expect(res.body.tasks[0].id).to.be(taskId); + expect(res.body.tasks[0].percent).to.be(100); + expect(res.body.tasks[0].args).to.be(undefined); + expect(res.body.tasks[0].active).to.be(false); // finished + expect(res.body.tasks[0].success).to.be(true); // finished + expect(res.body.tasks[0].result).to.be('ping'); + expect(res.body.tasks[0].errorMessage).to.be(null); + done(); + }); + }); }); }); }); diff --git a/src/tasks.js b/src/tasks.js index 9ab369a8b..b97e1a057 100644 --- a/src/tasks.js +++ b/src/tasks.js @@ -2,6 +2,7 @@ exports = module.exports = { get: get, + add: add, update: update, listByTypePaged: listByTypePaged, @@ -33,7 +34,6 @@ let assert = require('assert'), child_process = require('child_process'), DatabaseError = require('./databaseerror.js'), debug = require('debug')('box:tasks'), - EventEmitter = require('events'), paths = require('./paths.js'), safe = require('safetydance'), spawn = require('child_process').spawn, @@ -69,6 +69,13 @@ TaskError.INTERNAL_ERROR = 'Internal Error'; TaskError.BAD_STATE = 'Bad State'; TaskError.NOT_FOUND = 'Not Found'; +function postProcess(result) { + assert.strictEqual(typeof result, 'object'); + + result.active = !!gTasks[result.id]; + result.success = result.percent === 100 && !result.errorMessage; +} + function get(id, callback) { assert.strictEqual(typeof id, 'string'); assert.strictEqual(typeof callback, 'function'); @@ -77,9 +84,7 @@ function get(id, callback) { if (error && error.reason == DatabaseError.NOT_FOUND) return callback(new TaskError(TaskError.NOT_FOUND)); if (error) return callback(new TaskError(TaskError.INTERNAL_ERROR, error)); - // add some virtual fields - task.active = !!gTasks[id]; - task.success = task.percent === 100 && !task.errorMessage; + postProcess(task); callback(null, task); }); @@ -100,54 +105,53 @@ function update(id, task, callback) { }); } -function startTask(type, args, options) { +function add(type, args, callback) { assert.strictEqual(typeof type, 'string'); assert(Array.isArray(args)); - assert(!options || typeof options === 'object'); - - let events = new EventEmitter(); - - options = options || {}; + assert.strictEqual(typeof callback, 'function'); taskdb.add({ type: type, percent: 0, message: 'Starting', args: args }, function (error, taskId) { - if (error) return events.emit('error', new TaskError(TaskError.INTERNAL_ERROR, error)); + if (error) return callback(new TaskError(TaskError.INTERNAL_ERROR, error)); - const logFile = options.logFile || `${paths.TASKS_LOG_DIR}/${taskId}.log`; - let fd = safe.fs.openSync(logFile, 'w'); // will autoclose - if (!fd) { - debug(`startTask: unable to get log filedescriptor ${safe.error.message}`); - return events.emit('error', new TaskError(TaskError.INTERNAL_ERROR, error.message)); - } - - debug(`startTask - starting task ${type}. logs at ${logFile} id ${taskId}`); - - gTasks[taskId] = child_process.fork(`${__dirname}/taskworker.js`, [ taskId ], { stdio: [ 'pipe', fd, fd, 'ipc' ]}); // fork requires ipc - gTasks[taskId].once('exit', function (code, signal) { - debug(`startTask: ${taskId} completed with code ${code} and signal ${signal}`); - - get(taskId, function (error, task) { - if (!error && task.percent !== 100) { // task crashed or was killed by us (code 50) - error = code === 0 ? new Error(`Task ${taskId} stopped`) : new Error(`Task ${taskId} crashed with code ${code} and signal ${signal}`); - update(taskId, { percent: 100, errorMessage: error.message }, NOOP_CALLBACK); - } else if (!error && task.errorMessage) { - error = new Error(task.errorMessage); - } else if (!task) { // db got cleared in tests - error = new Error(`No such task ${taskId}`); - } - - gTasks[taskId] = null; - - events.emit('finish', error, task ? task.result : null); - - debug(`startTask: ${taskId} done`); - }); - }); - - events.id = taskId; - events.emit('start', taskId); + callback(null, taskId); }); +} - return events; +function startTask(taskId, options, callback) { + assert.strictEqual(typeof taskId, 'string'); + assert.strictEqual(typeof options, 'object'); + assert.strictEqual(typeof callback, 'function'); + + const logFile = options.logFile || `${paths.TASKS_LOG_DIR}/${taskId}.log`; + let fd = safe.fs.openSync(logFile, 'w'); // will autoclose + if (!fd) { + debug(`startTask: unable to get log filedescriptor ${safe.error.message}`); + return callback(new TaskError(TaskError.INTERNAL_ERROR, safe.error)); + } + + debug(`startTask - starting task ${taskId}. logs at ${logFile}`); + + gTasks[taskId] = child_process.fork(`${__dirname}/taskworker.js`, [ taskId ], { stdio: [ 'pipe', fd, fd, 'ipc' ]}); // fork requires ipc + gTasks[taskId].once('exit', function (code, signal) { + debug(`startTask: ${taskId} completed with code ${code} and signal ${signal}`); + + get(taskId, function (error, task) { + if (!error && task.percent !== 100) { // task crashed or was killed by us (code 50) + error = code === 0 ? new Error(`Task ${taskId} stopped`) : new Error(`Task ${taskId} crashed with code ${code} and signal ${signal}`); + update(taskId, { percent: 100, errorMessage: error.message }, NOOP_CALLBACK); + } else if (!error && task.errorMessage) { + error = new Error(task.errorMessage); + } else if (!task) { // db got cleared in tests + error = new Error(`No such task ${taskId}`); + } + + delete gTasks[taskId]; + + callback(error, task ? task.result : null); + + debug(`startTask: ${taskId} done`); + }); + }); } function stopTask(id, callback) { @@ -172,7 +176,7 @@ function listByTypePaged(type, page, perPage, callback) { taskdb.listByTypePaged(type, page, perPage, function (error, tasks) { if (error) return callback(new TaskError(TaskError.INTERNAL_ERROR, error)); - tasks.forEach((task) => { task.active = !!gTasks[task.id]; }); + tasks.forEach(postProcess); callback(null, tasks); }); @@ -228,6 +232,6 @@ function getLogs(taskId, options, callback) { // removes all fields that are strictly private and should never be returned by API calls function removePrivateFields(task) { - var result = _.pick(task, 'id', 'type', 'percent', 'message', 'errorMessage', 'active', 'creationTime', 'result', 'ts'); + var result = _.pick(task, 'id', 'type', 'percent', 'message', 'errorMessage', 'active', 'creationTime', 'result', 'ts', 'success'); return result; } diff --git a/src/test/tasks-test.js b/src/test/tasks-test.js index 7c076ef3d..f966f9447 100644 --- a/src/test/tasks-test.js +++ b/src/test/tasks-test.js @@ -32,64 +32,65 @@ describe('task', function () { after(cleanup); it('can run valid task - success', function (done) { - let taskId = null; - let task = tasks.startTask(tasks._TASK_IDENTITY, [ 'ping' ]); - task.on('error', done); - task.on('start', (tid) => { taskId = tid; }); - task.on('finish', function (error, result) { + tasks.add(tasks._TASK_IDENTITY, [ 'ping' ], function (error, taskId) { if (error) return done(error); - expect(result).to.equal('ping'); - expect(taskId).to.be.ok(); - done(); + + tasks.startTask(taskId, {}, function (error, result) { + if (error) return done(error); + expect(result).to.equal('ping'); + expect(taskId).to.be.ok(); + done(); + }); }); }); it('can run valid task - error', function (done) { - let taskId = null; - let task = tasks.startTask(tasks._TASK_ERROR, [ 'ping' ]); - task.on('error', done); - task.on('start', (tid) => { taskId = tid; }); - task.on('finish', function (error, result) { - expect(error).to.be.ok(); - expect(error.message).to.be('Failed for arg: ping'); - expect(result).to.be(null); - expect(taskId).to.be.ok(); - done(); + tasks.add(tasks._TASK_ERROR, [ 'ping' ], function (error, taskId) { + if (error) return done(error); + + tasks.startTask(taskId, {}, function (error, result) { + expect(error).to.be.ok(); + expect(error.message).to.be('Failed for arg: ping'); + expect(result).to.be(null); + expect(taskId).to.be.ok(); + done(); + }); }); }); it('can get logs of crash', function (done) { - let taskId = null; - let task = tasks.startTask(tasks._TASK_CRASH, [ 'ping' ]); - task.on('error', done); - task.on('start', (tid) => { taskId = tid; }); - task.on('finish', function (error, result) { - expect(error).to.be.ok(); - expect(error.message).to.contain(`Task ${taskId} crashed`); - expect(result).to.be(null); - expect(taskId).to.be.ok(); + tasks.add(tasks._TASK_CRASH, [ 'ping' ], function (error, taskId) { + if (error) return done(error); - let logs = fs.readFileSync(`${paths.TASKS_LOG_DIR}/${taskId}.log`, 'utf8'); - expect(logs).to.contain('Crashing for arg: ping'); - done(); + tasks.startTask(taskId, {}, function (error, result) { + expect(error).to.be.ok(); + + expect(error.message).to.contain(`Task ${taskId} crashed`); + expect(result).to.be(null); + expect(taskId).to.be.ok(); + + let logs = fs.readFileSync(`${paths.TASKS_LOG_DIR}/${taskId}.log`, 'utf8'); + expect(logs).to.contain('Crashing for arg: ping'); + done(); + }); }); }); it('can stop task', function (done) { - let taskId = null; - let task = tasks.startTask(tasks._TASK_SLEEP, [ 10000 ]); - task.on('error', done); - task.on('start', (tid) => { - taskId = tid; - tasks.stopTask(taskId, () => {}); - }); - task.on('finish', function (error, result) { - expect(error).to.be.ok(); - expect(error.message).to.contain('SIGTERM'); - expect(result).to.be(null); - expect(taskId).to.be.ok(); - done(); + tasks.add(tasks._TASK_SLEEP, [ 10000 ], function (error, taskId) { + if (error) return done(error); + + tasks.startTask(taskId, {}, function (error, result) { + expect(error).to.be.ok(); + expect(error.message).to.contain('SIGTERM'); + expect(result).to.be(null); + expect(taskId).to.be.ok(); + done(); + }); + + setTimeout(() => { + tasks.stopTask(taskId, () => {}); + }, 100); }); }); - }); diff --git a/src/updater.js b/src/updater.js index 1f751652b..aaa603ae3 100644 --- a/src/updater.js +++ b/src/updater.js @@ -244,16 +244,18 @@ function updateToLatest(options, auditSource, callback) { error = locker.lock(locker.OP_BOX_UPDATE); if (error) return callback(new UpdaterError(UpdaterError.BAD_STATE, `Cannot update now: ${error.message}`)); - let task = tasks.startTask(tasks.TASK_UPDATE, [ boxUpdateInfo, options ]); - task.on('error', (error) => callback(new UpdaterError(UpdaterError.INTERNAL_ERROR, error))); - task.on('start', (taskId) => { - eventlog.add(eventlog.ACTION_UPDATE, auditSource, { taskId, boxUpdateInfo }); - callback(null, taskId); - }); - task.on('finish', (error) => { - locker.unlock(locker.OP_BOX_UPDATE); + tasks.add(tasks.TASK_UPDATE, [ boxUpdateInfo, options ], function (error, taskId) { + if (error) return callback(new UpdaterError(UpdaterError.INTERNAL_ERROR, error)); - debug('Update failed with error', error); + eventlog.add(eventlog.ACTION_UPDATE, auditSource, { taskId, boxUpdateInfo }); + + tasks.startTask(taskId, {}, (error) => { + locker.unlock(locker.OP_BOX_UPDATE); + + debug('Update failed with error', error); + }); + + callback(null, taskId); }); }); }