Add taskworker that runs funcs out of process

This commit is contained in:
Girish Ramakrishnan
2018-12-09 03:20:00 -08:00
parent 3688371ce8
commit d43106b0af
11 changed files with 108 additions and 137 deletions
+24 -1
View File
@@ -10,6 +10,7 @@ exports = module.exports = {
get: get,
startBackupTask: startBackupTask,
ensureBackup: ensureBackup,
restore: restore,
@@ -44,7 +45,10 @@ var addons = require('./addons.js'),
database = require('./database.js'),
DatabaseError = require('./databaseerror.js'),
debug = require('debug')('box:backups'),
eventlog = require('./eventlog.js'),
fs = require('fs'),
locker = require('./locker.js'),
mailer = require('./mailer.js'),
mkdirp = require('mkdirp'),
once = require('once'),
path = require('path'),
@@ -918,6 +922,25 @@ function backupBoxAndApps(progressCallback, callback) {
});
}
function startBackupTask(auditSource, callback) {
let error = locker.lock(locker.OP_FULL_BACKUP);
if (error) return callback(error);
let task = tasks.startTask(tasks.TASK_BACKUP, [], auditSource);
task.on('error', (error) => callback(new BackupsError(BackupsError.INTERNAL_ERROR, error)));
task.on('start', (taskId) => {
eventlog.add(eventlog.ACTION_BACKUP_START, auditSource, { taskId });
callback(null, taskId);
});
task.on('finish', (error, result) => {
locker.unlock(locker.OP_FULL_BACKUP);
if (error) mailer.backupFailed(error);
eventlog.add(eventlog.ACTION_BACKUP_FINISH, auditSource, { errorMessage: error ? error.message : null, backupId: result });
});
}
function ensureBackup(auditSource, callback) {
assert.strictEqual(typeof auditSource, 'object');
@@ -937,7 +960,7 @@ function ensureBackup(auditSource, callback) {
return callback(null);
}
tasks.startTask(tasks.TASK_BACKUP, {}, auditSource, callback);
startBackupTask(auditSource, callback);
});
});
}
+3 -7
View File
@@ -9,9 +9,7 @@ var backupdb = require('../backupdb.js'),
backups = require('../backups.js'),
BackupsError = require('../backups.js').BackupsError,
HttpError = require('connect-lastmile').HttpError,
HttpSuccess = require('connect-lastmile').HttpSuccess,
tasks = require('../tasks.js'),
TasksError = require('../tasks.js').TasksError;
HttpSuccess = require('connect-lastmile').HttpSuccess;
function auditSource(req) {
var ip = req.headers['x-forwarded-for'] || req.connection.remoteAddress || null;
@@ -34,10 +32,8 @@ function list(req, res, next) {
}
function startBackup(req, res, next) {
// note that cloudron.backup only waits for backup initiation and not for backup to complete
// backup progress can be checked up ny polling the progress api call
tasks.startTask(tasks.TASK_BACKUP, {}, auditSource(req), function (error, taskId) {
if (error && error.reason === TasksError.BAD_STATE) return next(new HttpError(409, error.message));
backups.startBackupTask(auditSource(req), function (error, taskId) {
if (error && error.reason === BackupsError.BAD_STATE) return next(new HttpError(409, error.message));
if (error) return next(new HttpError(500, error));
next(new HttpSuccess(202, { taskId }));
+4 -4
View File
@@ -11,12 +11,12 @@ exports = module.exports = {
var apps = require('../apps.js'),
AppsError = apps.AppsError,
addons = require('../addons.js'),
backups = require('../backups.js'),
BackupsError = require('../backups.js').BackupsError,
cloudron = require('../cloudron.js'),
debug = require('debug')('box:routes/sysadmin'),
HttpError = require('connect-lastmile').HttpError,
HttpSuccess = require('connect-lastmile').HttpSuccess,
tasks = require('../tasks.js'),
TasksError = require('../tasks.js').TasksError,
updater = require('../updater.js'),
UpdaterError = require('../updater.js').UpdaterError;
@@ -26,8 +26,8 @@ function backup(req, res, next) {
// note that cloudron.backup only waits for backup initiation and not for backup to complete
// backup progress can be checked up ny polling the progress api call
var auditSource = { userId: null, username: 'sysadmin' };
tasks.startTask(tasks.TASK_BACKUP, {}, auditSource, function (error, taskId) {
if (error && error.reason === TasksError.BAD_STATE) return next(new HttpError(409, error.message));
backups.startBackupTask(auditSource, function (error, taskId) {
if (error && error.reason === BackupsError.BAD_STATE) return next(new HttpError(409, error.message));
if (error) return next(new HttpError(500, error));
next(new HttpSuccess(202, { taskId }));
+1 -1
View File
@@ -19,7 +19,7 @@ function postProcess(result) {
assert.strictEqual(typeof result, 'object');
assert(result.argsJson === null || typeof result.argsJson === 'string');
result.args = safe.JSON.parse(result.argsJson) || {};
result.args = safe.JSON.parse(result.argsJson) || [];
delete result.argsJson;
result.id = String(result.id);
+17 -46
View File
@@ -12,7 +12,7 @@ exports = module.exports = {
TaskError: TaskError,
// task types
// task types. if you add a task here, fill up the function table in taskworker
TASK_BACKUP: 'backup',
TASK_UPDATE: 'update',
TASK_MIGRATE: 'migrate'
@@ -22,36 +22,16 @@ let assert = require('assert'),
child_process = require('child_process'),
DatabaseError = require('./databaseerror.js'),
debug = require('debug')('box:tasks'),
eventlog = require('./eventlog.js'),
locker = require('./locker.js'),
mailer = require('./mailer.js'),
EventEmitter = require('events'),
paths = require('./paths.js'),
safe = require('safetydance'),
spawn = require('child_process').spawn,
split = require('split'),
taskdb = require('./taskdb.js'),
util = require('util'),
_ = require('underscore');
util = require('util');
const NOOP_CALLBACK = function (error) { if (error) debug(error); };
const TASKS = { // indexed by task type
backup: {
lock: locker.OP_FULL_BACKUP,
program: __dirname + '/tasks/backuptask.js',
onFailure: mailer.backupFailed,
startEventId: eventlog.ACTION_BACKUP_START,
finishEventId: eventlog.ACTION_BACKUP_FINISH
},
update: {
lock: locker.OP_BOX_UPDATE,
program: __dirname + '/tasks/updatertask.js',
onFailure: NOOP_CALLBACK,
startEventId: eventlog.ACTION_UPDATE,
finishEventId: eventlog.ACTION_UPDATE
}
};
let gTasks = {}; // indexed by task id
function TaskError(reason, errorOrMessage) {
@@ -106,33 +86,26 @@ function update(id, task, callback) {
});
}
function startTask(type, args, auditSource, callback) {
function startTask(type, args, auditSource) {
assert.strictEqual(typeof type, 'string');
assert(args && typeof args === 'object');
assert(Array.isArray(args));
assert.strictEqual(typeof auditSource, 'object');
assert.strictEqual(typeof callback, 'function');
const taskInfo = TASKS[type];
if (!taskInfo) return callback(new TaskError(TaskError.NOT_FOUND, 'No such task'));
let error = locker.lock(taskInfo.lock);
if (error) return callback(new TaskError(TaskError.BAD_STATE, error.message));
let events = new EventEmitter();
taskdb.add({ type: type, percent: 0, message: 'Starting', args: args }, function (error, taskId) {
if (error) return callback(new TaskError(TaskError.INTERNAL_ERROR, error));
if (error) return events.emit('error', new TaskError(TaskError.INTERNAL_ERROR, error));
let fd = safe.fs.openSync(`${paths.TASKS_LOG_DIR}/${taskId}.log`, 'a'); // will autoclose
const logFile = `${paths.TASKS_LOG_DIR}/${taskId}.log`;
let fd = safe.fs.openSync(logFile, 'a'); // will autoclose
if (!fd) {
debug(`startTask: unable to get log filedescriptor ${safe.error.message}`);
locker.unlock(taskInfo.lock);
return callback(new TaskError(TaskError.INTERNAL_ERROR, error.message));
return events.emit('error', new TaskError(TaskError.INTERNAL_ERROR, error.message));
}
debug(`startTask - starting task ${type}. logs at ${taskInfo.logFile}. id ${taskId}`);
debug(`startTask - starting task ${type}. logs at ${logFile} id ${taskId}`);
eventlog.add(taskInfo.startEventId, auditSource, args);
gTasks[taskId] = child_process.fork(taskInfo.program, [ taskId ], { stdio: [ 'pipe', fd, fd, 'ipc' ]}); // fork requires ipc
gTasks[taskId] = child_process.fork(`${__dirname}/taskworker.js`, [ taskId ], { stdio: [ 'pipe', fd, fd, 'ipc' ]}); // fork requires ipc
gTasks[taskId].once('exit', function (code, signal) {
debug(`startTask: ${taskId} completed with code ${code} and signal ${signal}`);
@@ -144,20 +117,18 @@ function startTask(type, args, auditSource, callback) {
error = new Error(task.errorMessage);
}
eventlog.add(taskInfo.finishEventId, auditSource, _.extend({ errorMessage: error ? error.message : null }, task ? task.result : {}));
locker.unlock(taskInfo.lock);
if (error) taskInfo.onFailure(error);
gTasks[taskId] = null;
events.emit('finish', error, task.result);
debug(`startTask: ${taskId} done`);
});
});
callback(null, taskId);
events.emit('start', taskId);
});
return events;
}
function stopTask(id, auditSource, callback) {
-30
View File
@@ -1,30 +0,0 @@
'use strict';
require('supererror')({ splatchError: true });
var assert = require('assert'),
backups = require('../backups.js'),
database = require('../database.js'),
debug = require('debug')('box:backuptask'),
tasks = require('../tasks.js');
const NOOP_CALLBACK = function (error) { if (error) debug(error); };
process.on('SIGTERM', function () {
process.exit(0);
});
assert.strictEqual(process.argv.length, 3, 'Pass the taskid as argument');
const taskId = process.argv[2];
// Main process starts here
debug('Staring backup');
database.initialize(function (error) {
if (error) return process.exit(50);
backups.backupBoxAndApps((progress) => tasks.update(taskId, progress, NOOP_CALLBACK), function (error, backupId) {
const progress = { percent: 100, result: backupId || '', errorMessage: error ? error.message : '' };
tasks.update(taskId, progress, () => process.exit(error ? 50 : 0));
});
});
-42
View File
@@ -1,42 +0,0 @@
'use strict';
require('supererror')({ splatchError: true });
let assert = require('assert'),
database = require('../database.js'),
debug = require('debug')('box:updatertask'),
tasks = require('../tasks.js'),
updater = require('../updater.js');
const NOOP_CALLBACK = function (error) { if (error) debug(error); };
process.on('SIGTERM', function () {
process.exit(0);
});
function exit(error) {
if (!error) process.exit(0);
debug(error);
process.exit(50);
}
assert.strictEqual(process.argv.length, 3, 'Pass the taskid as argument');
const taskId = process.argv[2];
// Main process starts here
debug('Staring update');
database.initialize(function (error) {
if (error) return exit(error);
tasks.get(taskId, function (error, result) {
if (error) return exit(error);
if (!result.args.boxUpdateInfo) return exit(new Error('Invalid args:' + JSON.stringify(result)));
updater.update(result.args.boxUpdateInfo, (progress) => tasks.update(taskId, progress, NOOP_CALLBACK), function (updateError) {
const progress = { percent: 100, errorMessage: updateError ? updateError.message : '' };
tasks.update(taskId, progress, () => exit(updateError));
});
});
});
+44
View File
@@ -0,0 +1,44 @@
'use strict';
require('supererror')({ splatchError: true });
var assert = require('assert'),
backups = require('./backups.js'),
database = require('./database.js'),
debug = require('debug')('box:taskworker'),
tasks = require('./tasks.js'),
updater = require('./updater.js');
const NOOP_CALLBACK = function (error) { if (error) debug(error); };
const TASKS = { // indexed by task type
backup: backups.backupBoxAndApps,
updater: updater.update,
};
process.on('SIGTERM', function () {
process.exit(0);
});
assert.strictEqual(process.argv.length, 3, 'Pass the taskid as argument');
const taskId = process.argv[2];
// Main process starts here
debug(`Staring task ${taskId}`);
database.initialize(function (error) {
if (error) return process.exit(50);
tasks.get(taskId, function (error, task) {
if (error) return process.exit(50);
const progressCallback = (progress) => tasks.update(taskId, progress, NOOP_CALLBACK);
const resultCallback = (error, result) => {
const progress = { percent: 100, result: result || '', errorMessage: error ? error.message : '' };
tasks.update(taskId, progress, () => process.exit(error ? 50 : 0));
};
TASKS[task.type].apply(null, task.args.concat(progressCallback).concat(resultCallback));
});
});
+1 -1
View File
@@ -23,7 +23,7 @@ var async = require('async'),
tasks = require('../tasks.js');
function createBackup(callback) {
tasks.startTask(tasks.TASK_BACKUP, [], { username: 'test' }, function (error, taskId) { // this call does not wait for the backup!
backups.startBackupTask({ username: 'test' }, function (error, taskId) { // this call does not wait for the backup!
if (error) return callback(error);
function waitForBackup() {
+1 -1
View File
@@ -13,7 +13,7 @@ cd $HOME/.cloudron_test
mkdir -p configs
mkdir -p appsdata
mkdir -p boxdata/appicons boxdata/mail boxdata/certs boxdata/mail/dkim/localhost boxdata/mail/dkim/foobar.com
mkdir -p platformdata/addons/mail platformdata/nginx/cert platformdata/nginx/applications platformdata/collectd/collectd.conf.d platformdata/addons platformdata/logrotate.d platformdata/backup platformdata/logs/backup
mkdir -p platformdata/addons/mail platformdata/nginx/cert platformdata/nginx/applications platformdata/collectd/collectd.conf.d platformdata/addons platformdata/logrotate.d platformdata/backup platformdata/logs/tasks
# put cert
openssl req -x509 -newkey rsa:2048 -keyout platformdata/nginx/cert/host.key -out platformdata/nginx/cert/host.cert -days 3650 -subj '/CN=localhost' -nodes -config <(cat /etc/ssl/openssl.cnf <(printf "\n[SAN]\nsubjectAltName=DNS:*.localhost"))
+13 -4
View File
@@ -14,6 +14,8 @@ var assert = require('assert'),
config = require('./config.js'),
crypto = require('crypto'),
debug = require('debug')('box:updater'),
eventlog = require('./eventlog.js'),
locker = require('./locker.js'),
mkdirp = require('mkdirp'),
os = require('os'),
path = require('path'),
@@ -21,7 +23,6 @@ var assert = require('assert'),
safe = require('safetydance'),
shell = require('./shell.js'),
tasks = require('./tasks.js'),
TaskError = require('./tasks.js').TaskError,
updateChecker = require('./updatechecker.js'),
util = require('util');
@@ -188,10 +189,18 @@ function updateToLatest(auditSource, callback) {
if (!boxUpdateInfo) return callback(new UpdaterError(UpdaterError.ALREADY_UPTODATE, 'No update available'));
if (!boxUpdateInfo.sourceTarballUrl) return callback(new UpdaterError(UpdaterError.BAD_STATE, 'No automatic update available'));
tasks.startTask(tasks.TASK_UPDATE, { boxUpdateInfo }, auditSource, function (error, taskId) {
if (error && error.reason === TaskError.BAD_STATE) return callback(new UpdaterError(UpdaterError.BAD_STATE, error.message));
if (error) return callback(new UpdaterError(UpdaterError.INTERNAL_ERROR, error));
let error = locker.lock(locker.OP_BOX_UPDATE);
if (error) return callback(error);
let task = tasks.startTask(tasks.TASK_UPDATE, [ boxUpdateInfo ], auditSource);
task.on('error', (error) => callback(new UpdaterError(UpdaterError.INTERNAL_ERROR, error)));
task.on('start', (taskId) => {
eventlog.add(eventlog.ACTION_UPDATE, auditSource, { taskId });
callback(null, taskId);
});
task.on('finish', (error) => {
locker.unlock(locker.OP_BOX_UPDATE);
debug('Update failed with error', error);
});
}