88 lines
2.8 KiB
JavaScript
88 lines
2.8 KiB
JavaScript
'use strict';
|
|
|
|
exports = module.exports = {
|
|
scheduleTask: scheduleTask
|
|
};
|
|
|
|
let assert = require('assert'),
|
|
BoxError = require('./boxerror.js'),
|
|
debug = require('debug')('box:apptaskmanager'),
|
|
fs = require('fs'),
|
|
locker = require('./locker.js'),
|
|
safe = require('safetydance'),
|
|
path = require('path'),
|
|
paths = require('./paths.js'),
|
|
tasks = require('./tasks.js');
|
|
|
|
// Apps that currently have a running task, keyed by app id. An app id is
// present exactly while its task is running.
let gActiveTasks = {};

// FIFO queue of { appId, taskId, callback } entries waiting for a free slot
// or for the lock to become available.
let gPendingTasks = [];

// Set to true by initializeSync() once the 'unlocked' listener is installed.
let gInitialized = false;

// Upper bound on the number of app tasks that may be active at the same time.
const TASK_CONCURRENCY = 3;

// Callback for fire-and-forget calls: logs the error, if there is one.
const NOOP_CALLBACK = (error) => {
    if (error) debug(error);
};
|
|
|
|
/**
 * Maps the lock operation that is blocking an app task to the progress
 * message shown while the task waits.
 *
 * @param {string} lockOperation - operation reported by the locker as holding the lock
 * @returns {string} the wait message, or '' for an operation not listed here
 */
function waitText(lockOperation) {
    switch (lockOperation) {
    case locker.OP_BOX_UPDATE:
        return 'Waiting for Cloudron to finish updating. See the Settings view';
    case locker.OP_PLATFORM_START:
        return 'Waiting for Cloudron to initialize';
    case locker.OP_FULL_BACKUP:
        return 'Wait for Cloudron to finish backup. See the Backups view';
    default:
        return ''; // cannot happen
    }
}
|
|
|
|
/**
 * One-time lazy setup, invoked by scheduleTask() while gInitialized is false.
 * Marks the module as initialized and subscribes startNextTask to the
 * locker's 'unlocked' event so queued tasks get another chance to run.
 */
function initializeSync() {
    // flag first: it stays set even if registering the listener throws
    gInitialized = true;

    locker.on('unlocked', startNextTask);
}
|
|
|
|
/**
 * Starts the task of an app right away, or queues it until it can run.
 *
 * The task is queued (and its progress message updated) when TASK_CONCURRENCY
 * app tasks are already active or when the OP_APPTASK lock cannot be taken.
 * Queued entries are retried by startNextTask(), which initializeSync()
 * subscribes to the locker's 'unlocked' event.
 *
 * @param {string} appId - id of the app; only one task per app may be active
 * @param {string} taskId - id of the task to run
 * @param {function} callback - called with (error, result) when the task has
 *   finished, or immediately with a BoxError.CONFLICT error when the app
 *   already has an active task
 */
function scheduleTask(appId, taskId, callback) {
    assert.strictEqual(typeof appId, 'string');
    assert.strictEqual(typeof taskId, 'string');
    assert.strictEqual(typeof callback, 'function');

    if (!gInitialized) initializeSync();

    if (appId in gActiveTasks) {
        return callback(new BoxError(BoxError.CONFLICT, `Task for ${appId} is already active`));
    }

    // Parks the task in the pending queue and surfaces the reason as its progress message.
    const enqueue = (message) => {
        tasks.update(taskId, { percent: 1, message }, NOOP_CALLBACK);
        gPendingTasks.push({ appId, taskId, callback });
    };

    if (Object.keys(gActiveTasks).length >= TASK_CONCURRENCY) {
        debug(`Reached concurrency limit, queueing task id ${taskId}`);
        enqueue('Waiting for other app tasks to complete');
        return;
    }

    const lockError = locker.recursiveLock(locker.OP_APPTASK);

    if (lockError) {
        debug(`Could not get lock. ${lockError.message}, queueing task id ${taskId}`);
        enqueue(waitText(lockError.operation));
        return;
    }

    gActiveTasks[appId] = {};

    const logFile = path.join(paths.LOG_DIR, appId, 'apptask.log');
    const logDir = path.dirname(logFile);

    if (!fs.existsSync(logDir)) safe.fs.mkdirSync(logDir); // ensure directory

    tasks.startTask(taskId, { logFile, timeout: 20 * 60 * 60 * 1000 /* 20 hours */ }, function (error, result) {
        try {
            callback(error, result);
        } finally {
            // Release the slot and the lock even when the callback throws; otherwise this
            // app could never be scheduled again and a concurrency slot would be lost.
            delete gActiveTasks[appId];
            locker.unlock(locker.OP_APPTASK); // unlock event will trigger next task
        }
    });
}
|
|
|
|
/**
 * Listener for the locker's 'unlocked' event: takes the oldest queued task
 * (if there is one) and hands it back to scheduleTask(). At most one queued
 * task is dequeued per invocation.
 */
function startNextTask() {
    const nothingQueued = gPendingTasks.length === 0;
    if (nothingQueued) return;

    // a free slot is expected here; the queue is left untouched if this fails
    assert(Object.keys(gActiveTasks).length < TASK_CONCURRENCY);

    const { appId, taskId, callback } = gPendingTasks.shift();
    scheduleTask(appId, taskId, callback);
}
|
|
|