run upto 5 apptasks in parallel

fixes #482
This commit is contained in:
Girish Ramakrishnan
2015-09-04 22:06:43 -07:00
parent 212d0bd55a
commit b3816615db
2 changed files with 25 additions and 11 deletions
+21 -5
View File
@@ -9,6 +9,7 @@ function Locker() {
this._operation = null;
this._timestamp = null;
this._watcherId = -1;
this._lockDepth = 0; // recursive locks
}
util.inherits(Locker, EventEmitter);
@@ -24,6 +25,7 @@ Locker.prototype.lock = function (operation) {
if (this._operation !== null) return new Error('Already locked for ' + this._operation);
this._operation = operation;
++this._lockDepth;
this._timestamp = new Date();
var that = this;
this._watcherId = setInterval(function () { debug('Lock unreleased %s', that._operation); }, 1000 * 60 * 5);
@@ -35,17 +37,31 @@ Locker.prototype.lock = function (operation) {
return null;
};
Locker.prototype.recursiveLock = function (operation) {
if (this._operation === operation) {
++this._lockDepth;
debug('Re-acquired : %s Depth : %s', this._operation, this._lockDepth);
return null;
}
return this.lock(operation);
};
Locker.prototype.unlock = function (operation) {
assert.strictEqual(typeof operation, 'string');
if (this._operation !== operation) throw new Error('Mismatched unlock. Current lock is for ' + this._operation); // throw because this is a programming error
debug('Released : %s', this._operation);
if (--this._lockDepth === 0) {
debug('Released : %s', this._operation);
this._operation = null;
this._timestamp = null;
clearInterval(this._watcherId);
this._watcherId = -1;
this._operation = null;
this._timestamp = null;
clearInterval(this._watcherId);
this._watcherId = -1;
} else {
debug('Recursive lock released : %s. Depth : %s', this._operation, this._lockDepth);
}
this.emit('unlocked', operation);
+4 -6
View File
@@ -17,10 +17,7 @@ var appdb = require('./appdb.js'),
var gActiveTasks = { };
var gPendingTasks = [ ];
// Task concurrency is 1 for two reasons:
// 1. The backup scripts (app and box) turn off swap after finish disregarding other backup processes
// 2. apptask getFreePort has race with multiprocess
var TASK_CONCURRENCY = 1;
var TASK_CONCURRENCY = 5;
var NOOP_CALLBACK = function (error) { console.error(error); };
function initialize(callback) {
@@ -54,7 +51,8 @@ function uninitialize(callback) {
function startNextTask() {
if (gPendingTasks.length === 0) return;
assert.strictEqual(Object.keys(gActiveTasks).length, 0); // since we allow only one task at a time
assert(Object.keys(gActiveTasks).length < TASK_CONCURRENCY);
startAppTask(gPendingTasks.shift());
}
@@ -63,7 +61,7 @@ function startAppTask(appId) {
assert.strictEqual(typeof appId, 'string');
assert(!(appId in gActiveTasks));
var lockError = locker.lock(locker.OP_APPTASK);
var lockError = locker.recursiveLock(locker.OP_APPTASK);
if (lockError || Object.keys(gActiveTasks).length >= TASK_CONCURRENCY) {
debug('Reached concurrency limit, queueing task for %s', appId);