database: rework connection logic
This commit is contained in:
+18
-75
@@ -14,7 +14,6 @@ exports = module.exports = {
|
|||||||
|
|
||||||
var assert = require('assert'),
|
var assert = require('assert'),
|
||||||
async = require('async'),
|
async = require('async'),
|
||||||
BoxError = require('./boxerror.js'),
|
|
||||||
child_process = require('child_process'),
|
child_process = require('child_process'),
|
||||||
constants = require('./constants.js'),
|
constants = require('./constants.js'),
|
||||||
debug = require('debug')('box:database'),
|
debug = require('debug')('box:database'),
|
||||||
@@ -22,8 +21,7 @@ var assert = require('assert'),
|
|||||||
once = require('once'),
|
once = require('once'),
|
||||||
util = require('util');
|
util = require('util');
|
||||||
|
|
||||||
var gConnectionPool = null,
|
var gConnectionPool = null;
|
||||||
gDefaultConnection = null;
|
|
||||||
|
|
||||||
const gDatabase = {
|
const gDatabase = {
|
||||||
hostname: '127.0.0.1',
|
hostname: '127.0.0.1',
|
||||||
@@ -43,14 +41,16 @@ function initialize(callback) {
|
|||||||
gDatabase.hostname = require('child_process').execSync('docker inspect -f "{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}" mysql-server').toString().trim();
|
gDatabase.hostname = require('child_process').execSync('docker inspect -f "{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}" mysql-server').toString().trim();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// https://github.com/mysqljs/mysql#pool-options
|
||||||
gConnectionPool = mysql.createPool({
|
gConnectionPool = mysql.createPool({
|
||||||
connectionLimit: 5, // this has to be > 1 since we store one connection as 'default'. the rest for transactions
|
connectionLimit: 5,
|
||||||
host: gDatabase.hostname,
|
host: gDatabase.hostname,
|
||||||
user: gDatabase.username,
|
user: gDatabase.username,
|
||||||
password: gDatabase.password,
|
password: gDatabase.password,
|
||||||
port: gDatabase.port,
|
port: gDatabase.port,
|
||||||
database: gDatabase.name,
|
database: gDatabase.name,
|
||||||
multipleStatements: false,
|
multipleStatements: false,
|
||||||
|
waitForConnections: true, // getConnection() will wait until a connection is avaiable
|
||||||
ssl: false,
|
ssl: false,
|
||||||
timezone: 'Z' // mysql follows the SYSTEM timezone. on Cloudron, this is UTC
|
timezone: 'Z' // mysql follows the SYSTEM timezone. on Cloudron, this is UTC
|
||||||
});
|
});
|
||||||
@@ -58,55 +58,20 @@ function initialize(callback) {
|
|||||||
gConnectionPool.on('connection', function (connection) {
|
gConnectionPool.on('connection', function (connection) {
|
||||||
// connection objects are re-used. so we have to attach to the event here (once) to prevent crash
|
// connection objects are re-used. so we have to attach to the event here (once) to prevent crash
|
||||||
// note the pool also has an 'acquire' event but that is called whenever we do a getConnection()
|
// note the pool also has an 'acquire' event but that is called whenever we do a getConnection()
|
||||||
connection.on('error', (error) => debug(`Connection ${connection.threadId} error: ${error.message}`));
|
connection.on('error', (error) => debug(`Connection ${connection.threadId} error: ${error.message} ${error.code}`));
|
||||||
|
|
||||||
connection.query('USE ' + gDatabase.name);
|
connection.query('USE ' + gDatabase.name);
|
||||||
connection.query('SET SESSION sql_mode = \'strict_all_tables\'');
|
connection.query('SET SESSION sql_mode = \'strict_all_tables\'');
|
||||||
});
|
});
|
||||||
|
|
||||||
reconnect(callback);
|
callback();
|
||||||
}
|
}
|
||||||
|
|
||||||
function uninitialize(callback) {
|
function uninitialize(callback) {
|
||||||
if (gConnectionPool) {
|
if (!gConnectionPool) return callback(null);
|
||||||
|
|
||||||
gConnectionPool.end(callback);
|
gConnectionPool.end(callback);
|
||||||
gDefaultConnection = null;
|
|
||||||
gConnectionPool = null;
|
gConnectionPool = null;
|
||||||
} else {
|
|
||||||
callback(null);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
function reconnect(callback) {
|
|
||||||
callback = callback ? once(callback) : function () {};
|
|
||||||
|
|
||||||
debug('reconnect: connecting to database');
|
|
||||||
|
|
||||||
gConnectionPool.getConnection(function (error, connection) {
|
|
||||||
if (error) {
|
|
||||||
debug(`reconnect: db connection error. ${error.message} fatal:${error.fatal} code:${error.code}. Will retry in 10 seconds`);
|
|
||||||
return setTimeout(reconnect.bind(null, callback), 10000);
|
|
||||||
}
|
|
||||||
|
|
||||||
debug('reconnect: connected to database');
|
|
||||||
|
|
||||||
connection.on('error', function (error) {
|
|
||||||
// by design, we catch all normal errors by providing callbacks.
|
|
||||||
// this function should be invoked only when we have no callbacks pending and we have a fatal error
|
|
||||||
assert(error.fatal, 'Non-fatal error on connection object');
|
|
||||||
|
|
||||||
debug(`reconnect: db connection error. ${error.message} fatal:${error.fatal} code:${error.code}. Will retry in 10 seconds`);
|
|
||||||
|
|
||||||
gDefaultConnection = null;
|
|
||||||
|
|
||||||
// This is most likely an issue an can cause double callbacks from reconnect()
|
|
||||||
setTimeout(reconnect.bind(null, callback), 10000);
|
|
||||||
});
|
|
||||||
|
|
||||||
gDefaultConnection = connection;
|
|
||||||
|
|
||||||
callback(null);
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
function clear(callback) {
|
function clear(callback) {
|
||||||
@@ -119,40 +84,12 @@ function clear(callback) {
|
|||||||
child_process.exec(cmd, callback);
|
child_process.exec(cmd, callback);
|
||||||
}
|
}
|
||||||
|
|
||||||
function beginTransaction(callback) {
|
|
||||||
assert.strictEqual(typeof callback, 'function');
|
|
||||||
|
|
||||||
if (gConnectionPool === null) return callback(new BoxError(BoxError.DATABASE_ERROR, 'No database connection pool.'));
|
|
||||||
|
|
||||||
gConnectionPool.getConnection(function (error, connection) {
|
|
||||||
if (error) return callback(error);
|
|
||||||
|
|
||||||
connection.beginTransaction(function (error) {
|
|
||||||
if (error) return callback(error);
|
|
||||||
|
|
||||||
return callback(null, connection);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
function query() {
|
function query() {
|
||||||
var args = Array.prototype.slice.call(arguments);
|
var args = Array.prototype.slice.call(arguments);
|
||||||
var callback = args[args.length - 1];
|
var callback = args[args.length - 1];
|
||||||
assert.strictEqual(typeof callback, 'function');
|
assert.strictEqual(typeof callback, 'function');
|
||||||
|
|
||||||
if (gDefaultConnection === null) return callback(new BoxError(BoxError.DATABASE_ERROR, 'No connection to database'));
|
gConnectionPool.query.apply(gConnectionPool, args); // this is same as getConnection/query/release
|
||||||
|
|
||||||
gDefaultConnection.query.apply(gDefaultConnection, args);
|
|
||||||
}
|
|
||||||
|
|
||||||
function rollback(connection, transactionError, callback) {
|
|
||||||
connection.rollback(function (error) {
|
|
||||||
if (error) debug('rollback: error when rolling back', error);
|
|
||||||
|
|
||||||
connection.release();
|
|
||||||
|
|
||||||
callback(transactionError);
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
function transaction(queries, callback) {
|
function transaction(queries, callback) {
|
||||||
@@ -161,16 +98,21 @@ function transaction(queries, callback) {
|
|||||||
|
|
||||||
callback = once(callback);
|
callback = once(callback);
|
||||||
|
|
||||||
beginTransaction(function (error, connection) {
|
gConnectionPool.getConnection(function (error, connection) {
|
||||||
if (error) return callback(error);
|
if (error) return callback(error);
|
||||||
|
|
||||||
|
const releaseConnection = (error) => { connection.release(); callback(error); };
|
||||||
|
|
||||||
|
connection.beginTransaction(function (error) {
|
||||||
|
if (error) return releaseConnection(error);
|
||||||
|
|
||||||
async.mapSeries(queries, function iterator(query, done) {
|
async.mapSeries(queries, function iterator(query, done) {
|
||||||
connection.query(query.query, query.args, done);
|
connection.query(query.query, query.args, done);
|
||||||
}, function seriesDone(error, results) {
|
}, function seriesDone(error, results) {
|
||||||
if (error) return rollback(connection, error, callback);
|
if (error) return connection.rollback(() => releaseConnection(error));
|
||||||
|
|
||||||
connection.commit(function (error) {
|
connection.commit(function (error) {
|
||||||
if (error) return rollback(connection, error, callback);
|
if (error) return connection.rollback(() => releaseConnection(error));
|
||||||
|
|
||||||
connection.release();
|
connection.release();
|
||||||
|
|
||||||
@@ -178,6 +120,7 @@ function transaction(queries, callback) {
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
function importFromFile(file, callback) {
|
function importFromFile(file, callback) {
|
||||||
|
|||||||
Reference in New Issue
Block a user