diff --git a/src/backups.js b/src/backups.js index cdc1543fc..0daf8f59a 100644 --- a/src/backups.js +++ b/src/backups.js @@ -10,6 +10,7 @@ exports = module.exports = { get: get, + startBackupTask: startBackupTask, ensureBackup: ensureBackup, restore: restore, @@ -44,7 +45,10 @@ var addons = require('./addons.js'), database = require('./database.js'), DatabaseError = require('./databaseerror.js'), debug = require('debug')('box:backups'), + eventlog = require('./eventlog.js'), fs = require('fs'), + locker = require('./locker.js'), + mailer = require('./mailer.js'), mkdirp = require('mkdirp'), once = require('once'), path = require('path'), @@ -918,6 +922,25 @@ function backupBoxAndApps(progressCallback, callback) { }); } +function startBackupTask(auditSource, callback) { + let error = locker.lock(locker.OP_FULL_BACKUP); + if (error) return callback(error); + + let task = tasks.startTask(tasks.TASK_BACKUP, [], auditSource); + task.on('error', (error) => callback(new BackupsError(BackupsError.INTERNAL_ERROR, error))); + task.on('start', (taskId) => { + eventlog.add(eventlog.ACTION_BACKUP_START, auditSource, { taskId }); + callback(null, taskId); + }); + task.on('finish', (error, result) => { + locker.unlock(locker.OP_FULL_BACKUP); + + if (error) mailer.backupFailed(error); + + eventlog.add(eventlog.ACTION_BACKUP_FINISH, auditSource, { errorMessage: error ? error.message : null, backupId: result }); + }); +} + function ensureBackup(auditSource, callback) { assert.strictEqual(typeof auditSource, 'object'); @@ -937,7 +960,7 @@ function ensureBackup(auditSource, callback) { return callback(null); } - tasks.startTask(tasks.TASK_BACKUP, {}, auditSource, callback); + startBackupTask(auditSource, callback); }); }); } diff --git a/src/routes/backups.js b/src/routes/backups.js index c354ac469..ff9942aeb 100644 --- a/src/routes/backups.js +++ b/src/routes/backups.js @@ -9,9 +9,7 @@ var backupdb = require('../backupdb.js'), backups = require('../backups.js'), BackupsError = require('../backups.js').BackupsError, HttpError = require('connect-lastmile').HttpError, - HttpSuccess = require('connect-lastmile').HttpSuccess, - tasks = require('../tasks.js'), - TasksError = require('../tasks.js').TasksError; + HttpSuccess = require('connect-lastmile').HttpSuccess; function auditSource(req) { var ip = req.headers['x-forwarded-for'] || req.connection.remoteAddress || null; @@ -34,10 +32,8 @@ function list(req, res, next) { } function startBackup(req, res, next) { - // note that cloudron.backup only waits for backup initiation and not for backup to complete - // backup progress can be checked up ny polling the progress api call - tasks.startTask(tasks.TASK_BACKUP, {}, auditSource(req), function (error, taskId) { - if (error && error.reason === TasksError.BAD_STATE) return next(new HttpError(409, error.message)); + backups.startBackupTask(auditSource(req), function (error, taskId) { + if (error && error.reason === BackupsError.BAD_STATE) return next(new HttpError(409, error.message)); if (error) return next(new HttpError(500, error)); next(new HttpSuccess(202, { taskId })); diff --git a/src/routes/sysadmin.js b/src/routes/sysadmin.js index 62253efe7..427b78a54 100644 --- a/src/routes/sysadmin.js +++ b/src/routes/sysadmin.js @@ -11,12 +11,12 @@ exports = module.exports = { var apps = require('../apps.js'), AppsError = apps.AppsError, addons = require('../addons.js'), + backups = require('../backups.js'), + BackupsError = require('../backups.js').BackupsError, cloudron = require('../cloudron.js'), debug = require('debug')('box:routes/sysadmin'), HttpError = require('connect-lastmile').HttpError, HttpSuccess = require('connect-lastmile').HttpSuccess, - tasks = require('../tasks.js'), - TasksError = require('../tasks.js').TasksError, updater = require('../updater.js'), UpdaterError = require('../updater.js').UpdaterError; @@ -26,8 +26,8 @@ function backup(req, res, next) { // note that cloudron.backup only waits for backup initiation and not for backup to complete // backup progress can be checked up ny polling the progress api call var auditSource = { userId: null, username: 'sysadmin' }; - tasks.startTask(tasks.TASK_BACKUP, {}, auditSource, function (error, taskId) { - if (error && error.reason === TasksError.BAD_STATE) return next(new HttpError(409, error.message)); + backups.startBackupTask(auditSource, function (error, taskId) { + if (error && error.reason === BackupsError.BAD_STATE) return next(new HttpError(409, error.message)); if (error) return next(new HttpError(500, error)); next(new HttpSuccess(202, { taskId })); diff --git a/src/taskdb.js b/src/taskdb.js index c2e0c9260..969ac99da 100644 --- a/src/taskdb.js +++ b/src/taskdb.js @@ -19,7 +19,7 @@ function postProcess(result) { assert.strictEqual(typeof result, 'object'); assert(result.argsJson === null || typeof result.argsJson === 'string'); - result.args = safe.JSON.parse(result.argsJson) || {}; + result.args = safe.JSON.parse(result.argsJson) || []; delete result.argsJson; result.id = String(result.id); diff --git a/src/tasks.js b/src/tasks.js index eaf390e5d..9799a1324 100644 --- a/src/tasks.js +++ b/src/tasks.js @@ -12,7 +12,7 @@ exports = module.exports = { TaskError: TaskError, - // task types + // task types. if you add a task here, fill up the function table in taskworker TASK_BACKUP: 'backup', TASK_UPDATE: 'update', TASK_MIGRATE: 'migrate' @@ -22,36 +22,16 @@ let assert = require('assert'), child_process = require('child_process'), DatabaseError = require('./databaseerror.js'), debug = require('debug')('box:tasks'), - eventlog = require('./eventlog.js'), - locker = require('./locker.js'), - mailer = require('./mailer.js'), + EventEmitter = require('events'), paths = require('./paths.js'), safe = require('safetydance'), spawn = require('child_process').spawn, split = require('split'), taskdb = require('./taskdb.js'), - util = require('util'), - _ = require('underscore'); + util = require('util'); const NOOP_CALLBACK = function (error) { if (error) debug(error); }; -const TASKS = { // indexed by task type - backup: { - lock: locker.OP_FULL_BACKUP, - program: __dirname + '/tasks/backuptask.js', - onFailure: mailer.backupFailed, - startEventId: eventlog.ACTION_BACKUP_START, - finishEventId: eventlog.ACTION_BACKUP_FINISH - }, - update: { - lock: locker.OP_BOX_UPDATE, - program: __dirname + '/tasks/updatertask.js', - onFailure: NOOP_CALLBACK, - startEventId: eventlog.ACTION_UPDATE, - finishEventId: eventlog.ACTION_UPDATE - } -}; - let gTasks = {}; // indexed by task id function TaskError(reason, errorOrMessage) { @@ -106,33 +86,26 @@ function update(id, task, callback) { }); } -function startTask(type, args, auditSource, callback) { +function startTask(type, args, auditSource) { assert.strictEqual(typeof type, 'string'); - assert(args && typeof args === 'object'); + assert(Array.isArray(args)); assert.strictEqual(typeof auditSource, 'object'); - assert.strictEqual(typeof callback, 'function'); - const taskInfo = TASKS[type]; - if (!taskInfo) return callback(new TaskError(TaskError.NOT_FOUND, 'No such task')); - - let error = locker.lock(taskInfo.lock); - if (error) return callback(new TaskError(TaskError.BAD_STATE, error.message)); + let events = new EventEmitter(); taskdb.add({ type: type, percent: 0, message: 'Starting', args: args }, function (error, taskId) { - if (error) return callback(new TaskError(TaskError.INTERNAL_ERROR, error)); + if (error) return events.emit('error', new TaskError(TaskError.INTERNAL_ERROR, error)); - let fd = safe.fs.openSync(`${paths.TASKS_LOG_DIR}/${taskId}.log`, 'a'); // will autoclose + const logFile = `${paths.TASKS_LOG_DIR}/${taskId}.log`; + let fd = safe.fs.openSync(logFile, 'a'); // will autoclose if (!fd) { debug(`startTask: unable to get log filedescriptor ${safe.error.message}`); - locker.unlock(taskInfo.lock); - return callback(new TaskError(TaskError.INTERNAL_ERROR, error.message)); + return events.emit('error', new TaskError(TaskError.INTERNAL_ERROR, error.message)); } - debug(`startTask - starting task ${type}. logs at ${taskInfo.logFile}. id ${taskId}`); + debug(`startTask - starting task ${type}. logs at ${logFile} id ${taskId}`); - eventlog.add(taskInfo.startEventId, auditSource, args); - - gTasks[taskId] = child_process.fork(taskInfo.program, [ taskId ], { stdio: [ 'pipe', fd, fd, 'ipc' ]}); // fork requires ipc + gTasks[taskId] = child_process.fork(`${__dirname}/taskworker.js`, [ taskId ], { stdio: [ 'pipe', fd, fd, 'ipc' ]}); // fork requires ipc gTasks[taskId].once('exit', function (code, signal) { debug(`startTask: ${taskId} completed with code ${code} and signal ${signal}`); @@ -144,20 +117,18 @@ function startTask(type, args, auditSource, callback) { error = new Error(task.errorMessage); } - eventlog.add(taskInfo.finishEventId, auditSource, _.extend({ errorMessage: error ? error.message : null }, task ? task.result : {})); - - locker.unlock(taskInfo.lock); - - if (error) taskInfo.onFailure(error); - gTasks[taskId] = null; + events.emit('finish', error, task.result); + debug(`startTask: ${taskId} done`); }); }); - callback(null, taskId); + events.emit('start', taskId); }); + + return events; } function stopTask(id, auditSource, callback) { diff --git a/src/tasks/backuptask.js b/src/tasks/backuptask.js deleted file mode 100755 index 07da5003e..000000000 --- a/src/tasks/backuptask.js +++ /dev/null @@ -1,30 +0,0 @@ -'use strict'; - -require('supererror')({ splatchError: true }); - -var assert = require('assert'), - backups = require('../backups.js'), - database = require('../database.js'), - debug = require('debug')('box:backuptask'), - tasks = require('../tasks.js'); - -const NOOP_CALLBACK = function (error) { if (error) debug(error); }; - -process.on('SIGTERM', function () { - process.exit(0); -}); - -assert.strictEqual(process.argv.length, 3, 'Pass the taskid as argument'); -const taskId = process.argv[2]; - -// Main process starts here -debug('Staring backup'); -database.initialize(function (error) { - if (error) return process.exit(50); - - backups.backupBoxAndApps((progress) => tasks.update(taskId, progress, NOOP_CALLBACK), function (error, backupId) { - const progress = { percent: 100, result: backupId || '', errorMessage: error ? error.message : '' }; - - tasks.update(taskId, progress, () => process.exit(error ? 50 : 0)); - }); -}); diff --git a/src/tasks/updatertask.js b/src/tasks/updatertask.js deleted file mode 100755 index 53596b7e0..000000000 --- a/src/tasks/updatertask.js +++ /dev/null @@ -1,42 +0,0 @@ -'use strict'; - -require('supererror')({ splatchError: true }); - -let assert = require('assert'), - database = require('../database.js'), - debug = require('debug')('box:updatertask'), - tasks = require('../tasks.js'), - updater = require('../updater.js'); - -const NOOP_CALLBACK = function (error) { if (error) debug(error); }; - -process.on('SIGTERM', function () { - process.exit(0); -}); - -function exit(error) { - if (!error) process.exit(0); - - debug(error); - process.exit(50); -} - -assert.strictEqual(process.argv.length, 3, 'Pass the taskid as argument'); -const taskId = process.argv[2]; - -// Main process starts here -debug('Staring update'); -database.initialize(function (error) { - if (error) return exit(error); - - tasks.get(taskId, function (error, result) { - if (error) return exit(error); - if (!result.args.boxUpdateInfo) return exit(new Error('Invalid args:' + JSON.stringify(result))); - - updater.update(result.args.boxUpdateInfo, (progress) => tasks.update(taskId, progress, NOOP_CALLBACK), function (updateError) { - const progress = { percent: 100, errorMessage: updateError ? updateError.message : '' }; - - tasks.update(taskId, progress, () => exit(updateError)); - }); - }); -}); diff --git a/src/taskworker.js b/src/taskworker.js new file mode 100755 index 000000000..b85dd4f87 --- /dev/null +++ b/src/taskworker.js @@ -0,0 +1,44 @@ +'use strict'; + +require('supererror')({ splatchError: true }); + +var assert = require('assert'), + backups = require('./backups.js'), + database = require('./database.js'), + debug = require('debug')('box:taskworker'), + tasks = require('./tasks.js'), + updater = require('./updater.js'); + +const NOOP_CALLBACK = function (error) { if (error) debug(error); }; + +const TASKS = { // indexed by task type + backup: backups.backupBoxAndApps, + updater: updater.update, +}; + +process.on('SIGTERM', function () { + process.exit(0); +}); + +assert.strictEqual(process.argv.length, 3, 'Pass the taskid as argument'); +const taskId = process.argv[2]; + +// Main process starts here +debug(`Staring task ${taskId}`); + +database.initialize(function (error) { + if (error) return process.exit(50); + + tasks.get(taskId, function (error, task) { + if (error) return process.exit(50); + + const progressCallback = (progress) => tasks.update(taskId, progress, NOOP_CALLBACK); + const resultCallback = (error, result) => { + const progress = { percent: 100, result: result || '', errorMessage: error ? error.message : '' }; + + tasks.update(taskId, progress, () => process.exit(error ? 50 : 0)); + }; + + TASKS[task.type].apply(null, task.args.concat(progressCallback).concat(resultCallback)); + }); +}); diff --git a/src/test/backups-test.js b/src/test/backups-test.js index be0a83088..24a80801a 100644 --- a/src/test/backups-test.js +++ b/src/test/backups-test.js @@ -23,7 +23,7 @@ var async = require('async'), tasks = require('../tasks.js'); function createBackup(callback) { - tasks.startTask(tasks.TASK_BACKUP, [], { username: 'test' }, function (error, taskId) { // this call does not wait for the backup! + backups.startBackupTask({ username: 'test' }, function (error, taskId) { // this call does not wait for the backup! if (error) return callback(error); function waitForBackup() { diff --git a/src/test/setupTest b/src/test/setupTest index 3418c73c8..7e5723fe6 100755 --- a/src/test/setupTest +++ b/src/test/setupTest @@ -13,7 +13,7 @@ cd $HOME/.cloudron_test mkdir -p configs mkdir -p appsdata mkdir -p boxdata/appicons boxdata/mail boxdata/certs boxdata/mail/dkim/localhost boxdata/mail/dkim/foobar.com -mkdir -p platformdata/addons/mail platformdata/nginx/cert platformdata/nginx/applications platformdata/collectd/collectd.conf.d platformdata/addons platformdata/logrotate.d platformdata/backup platformdata/logs/backup +mkdir -p platformdata/addons/mail platformdata/nginx/cert platformdata/nginx/applications platformdata/collectd/collectd.conf.d platformdata/addons platformdata/logrotate.d platformdata/backup platformdata/logs/tasks # put cert openssl req -x509 -newkey rsa:2048 -keyout platformdata/nginx/cert/host.key -out platformdata/nginx/cert/host.cert -days 3650 -subj '/CN=localhost' -nodes -config <(cat /etc/ssl/openssl.cnf <(printf "\n[SAN]\nsubjectAltName=DNS:*.localhost")) diff --git a/src/updater.js b/src/updater.js index 315db5016..097300deb 100644 --- a/src/updater.js +++ b/src/updater.js @@ -14,6 +14,8 @@ var assert = require('assert'), config = require('./config.js'), crypto = require('crypto'), debug = require('debug')('box:updater'), + eventlog = require('./eventlog.js'), + locker = require('./locker.js'), mkdirp = require('mkdirp'), os = require('os'), path = require('path'), @@ -21,7 +23,6 @@ var assert = require('assert'), safe = require('safetydance'), shell = require('./shell.js'), tasks = require('./tasks.js'), - TaskError = require('./tasks.js').TaskError, updateChecker = require('./updatechecker.js'), util = require('util'); @@ -188,10 +189,18 @@ function updateToLatest(auditSource, callback) { if (!boxUpdateInfo) return callback(new UpdaterError(UpdaterError.ALREADY_UPTODATE, 'No update available')); if (!boxUpdateInfo.sourceTarballUrl) return callback(new UpdaterError(UpdaterError.BAD_STATE, 'No automatic update available')); - tasks.startTask(tasks.TASK_UPDATE, { boxUpdateInfo }, auditSource, function (error, taskId) { - if (error && error.reason === TaskError.BAD_STATE) return callback(new UpdaterError(UpdaterError.BAD_STATE, error.message)); - if (error) return callback(new UpdaterError(UpdaterError.INTERNAL_ERROR, error)); + let error = locker.lock(locker.OP_BOX_UPDATE); + if (error) return callback(error); + let task = tasks.startTask(tasks.TASK_UPDATE, [ boxUpdateInfo ], auditSource); + task.on('error', (error) => callback(new UpdaterError(UpdaterError.INTERNAL_ERROR, error))); + task.on('start', (taskId) => { + eventlog.add(eventlog.ACTION_UPDATE, auditSource, { taskId }); callback(null, taskId); }); + task.on('finish', (error) => { + locker.unlock(locker.OP_BOX_UPDATE); + + debug('Update failed with error', error); + }); }