9f9575f46a
Restart service does not rebuild automatically; we should add a route for that. We need to figure out where to scale services etc. if we randomly create containers like that.
97 lines
3.2 KiB
JavaScript
97 lines
3.2 KiB
JavaScript
'use strict';

// Public interface of this module: scheduleTask is the only exported function.
exports = module.exports = {
    scheduleTask
};
|
|
|
|
let assert = require('assert'),
|
|
BoxError = require('./boxerror.js'),
|
|
debug = require('debug')('box:apptaskmanager'),
|
|
fs = require('fs'),
|
|
locker = require('./locker.js'),
|
|
safe = require('safetydance'),
|
|
path = require('path'),
|
|
paths = require('./paths.js'),
|
|
scheduler = require('./scheduler.js'),
|
|
services = require('./services.js'),
|
|
tasks = require('./tasks.js');
|
|
|
|
// App tasks that are currently running, indexed by app id.
const gActiveTasks = {};

// Tasks that could not be started yet; entries are appended at the back and consumed from the front.
const gPendingTasks = [];

// Flipped to true by initializeSync() on the first scheduleTask() call.
let gInitialized = false;

// Upper bound on the number of entries in gActiveTasks.
const TASK_CONCURRENCY = 3;

// Callback for calls whose result is not needed: logs the error (if any) and does nothing else.
const NOOP_CALLBACK = (error) => {
    if (error) debug(error);
};
|
|
|
|
/**
 * Maps the lock operation that is blocking an app task to the progress message shown for that task.
 *
 * @param {*} lockOperation - operation reported by the failed lock attempt (compared against the locker.OP_* constants)
 * @returns {string} message for the known operations, '' for anything else
 */
function waitText(lockOperation) {
    switch (lockOperation) {
    case locker.OP_BOX_UPDATE:
        return 'Waiting for Cloudron to finish updating. See the Settings view';
    case locker.OP_PLATFORM_START:
        return 'Waiting for Cloudron to initialize';
    case locker.OP_FULL_BACKUP:
        return 'Wait for Cloudron to finish backup. See the Backups view';
    default:
        return ''; // cannot happen
    }
}
|
|
|
|
/**
 * One-time setup: marks the module as initialized and subscribes startNextTask
 * to the 'unlocked' event of locker, so a queued task gets another chance to
 * start each time that event fires.
 */
function initializeSync() {
    gInitialized = true;

    locker.on('unlocked', startNextTask);
}
|
|
|
|
/**
 * Starts the task `taskId` for the app `appId`, or queues it when it cannot start right now.
 *
 * The task is queued (and its progress message updated) when TASK_CONCURRENCY app tasks
 * are already active or when the OP_APPTASK lock cannot be acquired. Queued tasks are
 * retried by startNextTask().
 *
 * @param {string} appId - id of the app the task belongs to; only one task per app may be active
 * @param {string} taskId - id of the task to start
 * @param {object} options - options forwarded to tasks.startTask(); a `logFile` entry is added to a copy
 * @param {function} callback - called as callback(error, result) when the task is finished, or
 *     immediately with a BoxError.CONFLICT error when the app already has an active task
 */
function scheduleTask(appId, taskId, options, callback) {
    assert.strictEqual(typeof appId, 'string');
    assert.strictEqual(typeof taskId, 'string');
    assert.strictEqual(typeof options, 'object');
    assert.strictEqual(typeof callback, 'function');

    if (!gInitialized) initializeSync();

    if (appId in gActiveTasks) {
        return callback(new BoxError(BoxError.CONFLICT, `Task for ${appId} is already active`));
    }

    if (Object.keys(gActiveTasks).length >= TASK_CONCURRENCY) {
        debug(`Reached concurrency limit, queueing task id ${taskId}`);
        tasks.update(taskId, { percent: 1, message: 'Waiting for other app tasks to complete' }, NOOP_CALLBACK);
        // keep the options with the queued entry, startNextTask() needs them to re-schedule
        gPendingTasks.push({ appId, taskId, options, callback });
        return;
    }

    const lockError = locker.recursiveLock(locker.OP_APPTASK);

    if (lockError) {
        debug(`Could not get lock. ${lockError.message}, queueing task id ${taskId}`);
        tasks.update(taskId, { percent: 1, message: waitText(lockError.operation) }, NOOP_CALLBACK);
        gPendingTasks.push({ appId, taskId, options, callback });
        return;
    }

    gActiveTasks[appId] = {};

    const logFile = path.join(paths.LOG_DIR, appId, 'apptask.log');
    const logDir = path.dirname(logFile);

    if (!fs.existsSync(logDir)) safe.fs.mkdirSync(logDir); // ensure directory

    scheduler.suspendJobs(appId);

    // copy instead of assigning into `options`, so the caller's object is left untouched
    const taskOptions = Object.assign({}, options, { logFile });

    tasks.startTask(taskId, taskOptions, function (error, result) {
        try {
            callback(error, result);
        } finally {
            // runs even if the callback throws, otherwise the app would stay "active" and the lock held
            delete gActiveTasks[appId];
            locker.unlock(locker.OP_APPTASK); // unlock event will trigger next task

            // post app task hooks
            services.rebuildService('sftp', error => { if (error) debug('Unable to rebuild sftp:', error); });
            scheduler.resumeJobs(appId);
        }
    });
}

/**
 * Takes the oldest queued task (if any) and hands it back to scheduleTask() with the
 * arguments it was originally queued with. scheduleTask() may queue it again when it
 * still cannot run.
 */
function startNextTask() {
    if (gPendingTasks.length === 0) return;

    assert(Object.keys(gActiveTasks).length < TASK_CONCURRENCY);

    const t = gPendingTasks.shift();
    scheduleTask(t.appId, t.taskId, t.options, t.callback);
}
|
|
|