diff --git a/src/apps.js b/src/apps.js index 6cbfdd392..27df88875 100644 --- a/src/apps.js +++ b/src/apps.js @@ -59,6 +59,7 @@ exports = module.exports = { var appdb = require('./appdb.js'), appstore = require('./appstore.js'), AppstoreError = require('./appstore.js').AppstoreError, + appTaskManager = require('./apptaskmanager.js'), assert = require('assert'), async = require('async'), backups = require('./backups.js'), @@ -575,10 +576,10 @@ function mailboxNameForLocation(location, manifest) { return (location ? location : manifest.title.toLowerCase().replace(/[^a-zA-Z0-9]/g, '')) + '.app'; } -function startAppTask(appId, args, callback) { - const logFile = path.join(paths.LOG_DIR, appId, 'apptask.log'); - - if (!fs.existsSync(path.dirname(logFile))) safe.fs.mkdirSync(path.dirname(logFile)); // ensure directory +function scheduleTask(appId, args, callback) { + assert.strictEqual(typeof appId, 'string'); + assert.strictEqual(typeof args, 'object'); + assert.strictEqual(typeof callback, 'function'); tasks.add(tasks.TASK_APP, [ appId, args ], function (error, taskId) { if (error) return callback(new AppsError(AppsError.INTERNAL_ERROR, error)); @@ -586,7 +587,7 @@ function startAppTask(appId, args, callback) { appdb.update(appId, { taskId: taskId }, function (error) { if (error) return callback(new AppsError(AppsError.INTERNAL_ERROR, error)); - tasks.startTask(taskId, { logFile }, function (error) { + appTaskManager.scheduleTask(appId, taskId, function (error) { if (error && (error.crashed || error.stopped)) { debug(`Apptask crashed/stopped: ${error.message}`); appdb.update(appId, { installationState: appdb.ISTATE_ERROR, errorMessage: error.message, taskId: null }, NOOP_CALLBACK); @@ -727,7 +728,7 @@ function install(data, user, auditSource, callback) { const restoreConfig = backupId ? { backupId: backupId, backupFormat: backupFormat } : null; - startAppTask(appId, { restoreConfig }, function (error, result) { + scheduleTask(appId, { restoreConfig }, function (error, result) { if (error) return callback(error); eventlog.add(eventlog.ACTION_APP_INSTALL, auditSource, { appId: appId, app: result }); @@ -873,7 +874,7 @@ function configure(appId, data, user, auditSource, callback) { if (error && error.reason === DatabaseError.NOT_FOUND) return callback(new AppsError(AppsError.BAD_STATE)); if (error) return callback(new AppsError(AppsError.INTERNAL_ERROR, error)); - startAppTask(appId, { oldConfig: getAppConfig(app) }, function (error, result) { + scheduleTask(appId, { oldConfig: getAppConfig(app) }, function (error, result) { if (error) return callback(error); eventlog.add(eventlog.ACTION_APP_CONFIGURE, auditSource, { appId: appId, app: result }); @@ -949,7 +950,7 @@ function update(appId, data, auditSource, callback) { if (error && error.reason === DatabaseError.NOT_FOUND) return callback(new AppsError(AppsError.BAD_STATE)); // might be a bad guess if (error) return callback(new AppsError(AppsError.INTERNAL_ERROR, error)); - startAppTask(appId, { updateConfig: updateConfig }, function (error, result) { + scheduleTask(appId, { updateConfig: updateConfig }, function (error, result) { if (error) return callback(error); eventlog.add(eventlog.ACTION_APP_UPDATE, auditSource, { appId: appId, toManifest: manifest, fromManifest: app.manifest, force: data.force, app: app }); @@ -1052,7 +1053,7 @@ function restore(appId, data, auditSource, callback) { if (error) return callback(new AppsError(AppsError.INTERNAL_ERROR, error)); const restoreConfig = data.backupId ? { backupId: data.backupId, backupFormat: backupInfo.format, oldManifest: app.manifest } : null; // when null, apptask simply reinstalls - startAppTask(appId, { restoreConfig }, function (error, result) { + scheduleTask(appId, { restoreConfig }, function (error, result) { if (error) return callback(error); eventlog.add(eventlog.ACTION_APP_RESTORE, auditSource, { app: app, backupId: backupInfo.id, fromManifest: app.manifest, toManifest: backupInfo.manifest }); @@ -1162,7 +1163,7 @@ function clone(appId, data, user, auditSource, callback) { const restoreConfig = { backupId: backupId, backupFormat: backupInfo.format }; - startAppTask(newAppId, { restoreConfig }, function (error, result) { + scheduleTask(newAppId, { restoreConfig }, function (error, result) { if (error) return callback(error); eventlog.add(eventlog.ACTION_APP_CLONE, auditSource, { appId: newAppId, oldAppId: appId, backupId: backupId, oldApp: app, newApp: result }); @@ -1197,7 +1198,7 @@ function uninstall(appId, auditSource, callback) { if (error && error.reason === DatabaseError.NOT_FOUND) return callback(new AppsError(AppsError.NOT_FOUND, 'No such app')); if (error) return callback(new AppsError(AppsError.INTERNAL_ERROR, error)); - startAppTask(appId, {}, function (error, result) { + scheduleTask(appId, {}, function (error, result) { if (error) return callback(error); eventlog.add(eventlog.ACTION_APP_UNINSTALL, auditSource, { appId: appId, app: result }); @@ -1219,7 +1220,7 @@ function start(appId, callback) { if (error && error.reason === DatabaseError.NOT_FOUND) return callback(new AppsError(AppsError.BAD_STATE)); // might be a bad guess if (error) return callback(new AppsError(AppsError.INTERNAL_ERROR, error)); - startAppTask(appId, {}, callback); + scheduleTask(appId, {}, callback); }); } @@ -1233,7 +1234,7 @@ function stop(appId, callback) { if (error && error.reason === DatabaseError.NOT_FOUND) return callback(new AppsError(AppsError.BAD_STATE)); // might be a bad guess if (error) return callback(new AppsError(AppsError.INTERNAL_ERROR, error)); - startAppTask(appId, {}, callback); + scheduleTask(appId, {}, callback); }); } @@ -1381,7 +1382,7 @@ function backup(appId, callback) { if (error && error.reason === DatabaseError.NOT_FOUND) return callback(new AppsError(AppsError.BAD_STATE)); // might be a bad guess if (error) return callback(new AppsError(AppsError.INTERNAL_ERROR, error)); - startAppTask(appId, { }, (error, result) => { + scheduleTask(appId, { }, (error, result) => { if (error) return callback(error); callback(null, { taskId: result.taskId }); diff --git a/src/apptaskmanager.js b/src/apptaskmanager.js new file mode 100644 index 000000000..745a270b8 --- /dev/null +++ b/src/apptaskmanager.js @@ -0,0 +1,103 @@ +'use strict'; + +exports = module.exports = { + resumeTasks: resumeTasks, + scheduleTask: scheduleTask +}; + +var appdb = require('./appdb.js'), + apps = require('./apps.js'), + assert = require('assert'), + debug = require('debug')('box:taskmanager'), + fs = require('fs'), + locker = require('./locker.js'), + safe = require('safetydance'), + path = require('path'), + paths = require('./paths.js'), + tasks = require('./tasks.js'); + +let gActiveTasks = { }; // indexed by app id +let gPendingTasks = [ ]; +let gPaused = true; // tracks if the platform is ready + +const TASK_CONCURRENCY = 3; +const NOOP_CALLBACK = function (error) { if (error) debug(error); }; + +// callback is called when task is finished +function scheduleTask(appId, taskId, callback) { + assert.strictEqual(typeof appId, 'string'); + assert.strictEqual(typeof taskId, 'string'); + assert.strictEqual(typeof callback, 'function'); + + if (appId in gActiveTasks) { + return callback(new Error(`Task for %s is already active: ${appId}`)); + } + + if (gPaused) { + debug(`Platform not ready yet, queueing task id ${taskId}`); + gPendingTasks.push({ appId, taskId, callback }); + return; + } + + if (Object.keys(gActiveTasks).length >= TASK_CONCURRENCY) { + debug(`Reached concurrency limit, queueing task id ${taskId}`); + gPendingTasks.push({ appId, taskId, callback }); + return; + } + + var lockError = locker.recursiveLock(locker.OP_APPTASK); + + if (lockError) { + debug(`Locked for another operation, queueing task id ${taskId}`); + gPendingTasks.push({ appId, taskId, callback }); + return; + } + + gActiveTasks[appId] = {}; + + const logFile = path.join(paths.LOG_DIR, appId, 'apptask.log'); + + if (!fs.existsSync(path.dirname(logFile))) safe.fs.mkdirSync(path.dirname(logFile)); // ensure directory + + tasks.startTask(taskId, { logFile }, function (error, result) { + callback(error, result); + + delete gActiveTasks[appId]; + locker.unlock(locker.OP_APPTASK); // unlock event will trigger next task + }); +} + +function startNextTask() { + if (gPendingTasks.length === 0) return; + + assert(Object.keys(gActiveTasks).length < TASK_CONCURRENCY); + + const t = gPendingTasks.shift(); + scheduleTask(t.appId, t.taskId, t.callback); +} + +// resume app tasks when platform is ready or after a crash +function resumeTasks(callback) { + assert.strictEqual(typeof callback, 'function'); + + debug('resuming tasks'); + + locker.on('unlocked', startNextTask); + + gPaused = false; + + apps.getAll(function (error, result) { + if (error) return callback(error); + + result.forEach(function (app) { + if (app.installationState === appdb.ISTATE_INSTALLED && app.runState === appdb.RSTATE_RUNNING) return; + if (app.installationState === appdb.ISTATE_ERROR) return; + + debug(`resumeTask: starting app task for ${app.fqdn} ${app.id} and state ${app.installationState}`); + + scheduleTask(app.id, app.taskId, NOOP_CALLBACK); + }); + + callback(null); + }); +} diff --git a/src/auditsource.js b/src/auditsource.js index 82073f2a9..946938bd2 100644 --- a/src/auditsource.js +++ b/src/auditsource.js @@ -4,7 +4,6 @@ exports = module.exports = { CRON: { userId: null, username: 'cron' }, HEALTH_MONITOR: { userId: null, username: 'healthmonitor' }, SYSADMIN: { userId: null, username: 'sysadmin' }, - TASK_MANAGER: { userId: null, username: 'taskmanager' }, APP_TASK: { userId: null, username: 'apptask' }, fromRequest: fromRequest diff --git a/src/platform.js b/src/platform.js index 0383ed8f7..93e4516dc 100644 --- a/src/platform.js +++ b/src/platform.js @@ -10,6 +10,7 @@ exports = module.exports = { var addons = require('./addons.js'), apps = require('./apps.js'), + appTaskManager = require('./apptaskmanager.js'), assert = require('assert'), async = require('async'), debug = require('debug')('box:platform'), @@ -23,7 +24,7 @@ var addons = require('./addons.js'), settings = require('./settings.js'), sftp = require('./sftp.js'), shell = require('./shell.js'), - taskmanager = require('./taskmanager.js'), + tasks = require('./tasks.js'), _ = require('underscore'); var NOOP_CALLBACK = function (error) { if (error) debug(error); }; @@ -75,13 +76,13 @@ function start(callback) { } function stop(callback) { - taskmanager.pauseTasks(callback); + tasks.stopAllTasks(callback); } function onPlatformReady() { debug('onPlatformReady: platform is ready'); exports._isReady = true; - taskmanager.resumeTasks(); + appTaskManager.resumeTasks(NOOP_CALLBACK); applyPlatformConfig(NOOP_CALLBACK); pruneInfraImages(NOOP_CALLBACK); diff --git a/src/taskmanager.js b/src/taskmanager.js deleted file mode 100644 index 38c97de2e..000000000 --- a/src/taskmanager.js +++ /dev/null @@ -1,202 +0,0 @@ -'use strict'; - -exports = module.exports = { - resumeTasks: resumeTasks, - pauseTasks: pauseTasks, - - stopAppTask: stopAppTask, - startAppTask: startAppTask, - restartAppTask: restartAppTask, - - // exported for testing - _stopPendingTasks: stopPendingTasks, - _waitForPendingTasks: waitForPendingTasks -}; - -var appdb = require('./appdb.js'), - apps = require('./apps.js'), - assert = require('assert'), - async = require('async'), - auditSource = require('./auditsource.js'), - child_process = require('child_process'), - debug = require('debug')('box:taskmanager'), - fs = require('fs'), - locker = require('./locker.js'), - mkdirp = require('mkdirp'), - path = require('path'), - paths = require('./paths.js'), - eventlog = require('./eventlog.js'), - util = require('util'), - _ = require('underscore'); - -var gActiveTasks = { }; -var gPendingTasks = [ ]; - -var TASK_CONCURRENCY = 3; -var NOOP_CALLBACK = function (error) { if (error) debug(error); }; -var gPaused = true; - -// resume app tasks when platform is ready or after a crash -function resumeTasks(callback) { - callback = callback || NOOP_CALLBACK; - - debug('resuming tasks'); - - locker.on('unlocked', startNextTask); - - gPaused = false; - - apps.getAll(function (error, result) { - if (error) return callback(error); - - result.forEach(function (app) { - if (app.installationState === appdb.ISTATE_INSTALLED && app.runState === appdb.RSTATE_RUNNING) return; - if (app.installationState === appdb.ISTATE_ERROR) return; - - debug('Creating process for %s (%s) with state %s', app.fqdn, app.id, app.installationState); - restartAppTask(app.id, NOOP_CALLBACK); // restart because the auto-installer could have queued up tasks already - }); - - callback(null); - }); -} - -function pauseTasks(callback) { - assert.strictEqual(typeof callback, 'function'); - - gPendingTasks = [ ]; // clear this first, otherwise stopAppTask will resume them - - locker.removeListener('unlocked', startNextTask); - - gPaused = true; - - async.eachSeries(Object.keys(gActiveTasks), stopAppTask, callback); -} - -function stopPendingTasks(callback) { - assert.strictEqual(typeof callback, 'function'); - - gPendingTasks = []; - - async.eachSeries(Object.keys(gActiveTasks), stopAppTask, callback); -} - -function waitForPendingTasks(callback) { - assert.strictEqual(typeof callback, 'function'); - - function checkTasks() { - if (Object.keys(gActiveTasks).length === 0 && gPendingTasks.length === 0) return callback(); - setTimeout(checkTasks, 1000); - } - - checkTasks(); -} - -function startNextTask() { - if (gPendingTasks.length === 0) return; - - assert(Object.keys(gActiveTasks).length < TASK_CONCURRENCY); - - startAppTask(gPendingTasks.shift(), NOOP_CALLBACK); -} - -// WARNING callback has to be called in sync for the concurrency check to work! -function startAppTask(appId, callback) { - assert.strictEqual(typeof appId, 'string'); - assert.strictEqual(typeof callback, 'function'); - - if (appId in gActiveTasks) { - return callback(new Error(util.format('Task for %s is already active', appId))); - } - - if (gPaused) { - debug('Platform not ready yet, queueing task for %s', appId); - gPendingTasks.push(appId); - return callback(); - } - - if (Object.keys(gActiveTasks).length >= TASK_CONCURRENCY) { - debug('Reached concurrency limit, queueing task for %s', appId); - gPendingTasks.push(appId); - return callback(); - } - - var lockError = locker.recursiveLock(locker.OP_APPTASK); - - if (lockError) { - debug('Locked for another operation, queueing task for %s', appId); - gPendingTasks.push(appId); - return callback(); - } - - var logFilePath = path.join(paths.LOG_DIR, appId, 'apptask.log'); - var fd; - - // have to use sync here to avoid async callback, breaking concurrency check - try { - mkdirp.sync(path.join(paths.LOG_DIR, appId)); // ensure log folder - fd = fs.openSync(logFilePath, 'a'); // will autoclose - } catch (e) { - debug('Unable to get log filedescriptor, queueing task for %s', appId, e); - gPendingTasks.push(appId); - return callback(); - } - - // when running tests, we have to inject the DEBUG env. in cloudron, the value is inherited - const env = process.env.BOX_ENV === 'test' ? _.extend({}, process.env, { DEBUG: 'box*,connect-lastmile' }) : process.env; - - // when parent process dies, apptask processes are killed because KillMode=control-group in systemd unit file - gActiveTasks[appId] = child_process.fork(__dirname + '/apptask.js', [ appId ], { stdio: [ 'pipe', fd, fd, 'ipc' ], env: env }); - - var pid = gActiveTasks[appId].pid; - debug('Started task of %s pid: %s. See logs at %s', appId, pid, logFilePath); - - eventlog.add(eventlog.ACTION_APP_TASK_START, auditSource.TASK_MANAGER, { appId: appId, logFile: logFilePath }, NOOP_CALLBACK); - - gActiveTasks[appId].once('exit', function (code, signal) { - debug('Task for %s pid %s completed with status %s', appId, pid, code); - if (code === null /* signal */ || (code !== 0 && code !== 50)) { // apptask crashed - debug('Apptask crashed with code %s and signal %s', code, signal); - appdb.update(appId, { installationState: appdb.ISTATE_ERROR, errorMessage: 'Apptask crashed with code ' + code + ' and signal ' + signal }, NOOP_CALLBACK); - eventlog.add(eventlog.ACTION_APP_TASK_CRASH, auditSource.TASK_MANAGER, { appId: appId, crashLogFile: logFilePath }, NOOP_CALLBACK); - } else if (code === 50) { // task exited cleanly but with an error - eventlog.add(eventlog.ACTION_APP_TASK_CRASH, auditSource.TASK_MANAGER, { appId: appId, crashLogFile: logFilePath }, NOOP_CALLBACK); - } else { - eventlog.add(eventlog.ACTION_APP_TASK_SUCCESS, auditSource.TASK_MANAGER, { appId: appId, logFile: logFilePath }, NOOP_CALLBACK); - } - delete gActiveTasks[appId]; - locker.unlock(locker.OP_APPTASK); // unlock event will trigger next task - }); - - callback(); -} - -function stopAppTask(appId, callback) { - assert.strictEqual(typeof appId, 'string'); - assert.strictEqual(typeof callback, 'function'); - - if (gActiveTasks[appId]) { - debug('stopAppTask : Killing existing task of %s with pid %s', appId, gActiveTasks[appId].pid); - gActiveTasks[appId].once('exit', function () { callback(); }); - gActiveTasks[appId].kill('SIGTERM'); // this will end up calling the 'exit' handler - return; - } - - if (gPendingTasks.indexOf(appId) !== -1) { - debug('stopAppTask: Removing pending task : %s', appId); - gPendingTasks = _.without(gPendingTasks, appId); - } else { - debug('stopAppTask: no task for %s to be stopped', appId); - } - - callback(); -} - -function restartAppTask(appId, callback) { - callback = callback || NOOP_CALLBACK; - - async.series([ - stopAppTask.bind(null, appId), - startAppTask.bind(null, appId) - ], callback); -} diff --git a/src/tasks.js b/src/tasks.js index b62f13c10..d58f8c670 100644 --- a/src/tasks.js +++ b/src/tasks.js @@ -10,6 +10,7 @@ exports = module.exports = { startTask: startTask, stopTask: stopTask, + stopAllTasks: stopAllTasks, removePrivateFields: removePrivateFields, @@ -31,6 +32,7 @@ exports = module.exports = { }; let assert = require('assert'), + async = require('async'), child_process = require('child_process'), DatabaseError = require('./databaseerror.js'), debug = require('debug')('box:tasks'), @@ -169,6 +171,14 @@ function stopTask(id, callback) { callback(null); } +function stopAllTasks(callback) { + assert.strictEqual(typeof callback, 'function'); + + async.eachSeries(Object.keys(gTasks), function (id, iteratorDone) { + stopTask(id, () => iteratorDone()); // ignore any error + }, callback); +} + function listByTypePaged(type, page, perPage, callback) { assert(typeof type === 'string' || type === null); assert.strictEqual(typeof page, 'number');