metrics: overlay app metrics over system metrics
This commit is contained in:
154
src/metrics.js
154
src/metrics.js
@@ -1,12 +1,8 @@
|
||||
'use strict';
|
||||
|
||||
exports = module.exports = {
|
||||
getSystem,
|
||||
getSystemStream,
|
||||
|
||||
getContainer,
|
||||
getContainerStream,
|
||||
|
||||
get,
|
||||
getStream,
|
||||
sendToGraphite
|
||||
};
|
||||
|
||||
@@ -22,7 +18,6 @@ const apps = require('./apps.js'),
|
||||
path = require('path'),
|
||||
{ Readable } = require('stream'),
|
||||
safe = require('safetydance'),
|
||||
services = require('./services.js'),
|
||||
superagent = require('./superagent.js');
|
||||
|
||||
function translateContainerStatsSync(stats) {
|
||||
@@ -361,7 +356,7 @@ async function readSystemFromGraphite(options) {
|
||||
// Disk:
|
||||
// writing: fio --name=rate-test --filename=tempfile --rw=write --bs=4k --ioengine=libaio --rate=20M --size=5000M --runtime=150 --direct=1. test with iotop
|
||||
// reading: fio --name=rate-test --filename=tempfile --rw=read --bs=4k --ioengine=libaio --rate=20M --size=5000M --runtime=150 --direct=1. test with iotop
|
||||
async function getSystem(options) {
|
||||
async function get(options) {
|
||||
assert.strictEqual(typeof options, 'object');
|
||||
|
||||
const result = {};
|
||||
@@ -379,74 +374,18 @@ async function getSystem(options) {
|
||||
return result;
|
||||
}
|
||||
|
||||
// Returns an object-mode Readable that emits a system metrics snapshot once per
// second. Rates are deltas against the previous sample, so the first emission
// carries null rates. Destroying the stream stops the polling timer.
async function getSystemStream(options) {
    assert.strictEqual(typeof options, 'object');

    const POLL_MSECS = 1000;

    let timer = null;
    let previous = null; // last sample, used for delta/rate computation

    const stream = new Readable({
        objectMode: true,
        read(/*size*/) { /* data is pushed from the poll timer instead */ },
        destroy(error, callback) {
            clearInterval(timer);
            callback(error);
        }
    });

    // per-second rate of a monotonically increasing counter field; null until we have two samples
    const perSecond = (sample, field) => previous === null ? null : (sample[field] - previous[field]) / (POLL_MSECS / 1000);

    timer = setInterval(async () => {
        const [error, sample] = await safe(readSystemMetrics());
        if (error) return stream.destroy(error);

        // cpu usage is the cpu-time delta expressed as a percentage of the wall-clock interval
        const cpuPercent = previous === null ? null : (sample.userMsecs + sample.sysMsecs - previous.userMsecs - previous.sysMsecs) * 100 / POLL_MSECS;
        const blockReadRate = perSecond(sample, 'blockRead');
        const blockWriteRate = perSecond(sample, 'blockWrite');
        const networkReadRate = perSecond(sample, 'networkRead');
        const networkWriteRate = perSecond(sample, 'networkWrite');

        previous = sample;

        const nowSecs = Date.now() / 1000; // seconds, to match graphite return values

        stream.push({
            cpu: [ cpuPercent, nowSecs ],
            memory: [ sample.memoryUsed, nowSecs ],
            swap: [ sample.swapUsed, nowSecs ],

            blockReadRate: [ blockReadRate, nowSecs ],
            blockWriteRate: [ blockWriteRate, nowSecs ],
            blockReadTotal: sample.blockRead,
            blockWriteTotal: sample.blockWrite,

            networkReadRate: [ networkReadRate, nowSecs ],
            networkWriteRate: [ networkWriteRate, nowSecs ],
            networkReadTotal: sample.networkRead,
            networkWriteTotal: sample.networkWrite,
        });
    }, POLL_MSECS);

    return stream;
}
|
||||
|
||||
async function getContainerStream(name, options) {
|
||||
async function pipeContainerMetrics(name, metricsStream, ac) {
|
||||
assert.strictEqual(typeof name, 'string');
|
||||
assert.strictEqual(typeof options, 'object');
|
||||
assert.strictEqual(typeof metricsStream, 'object');
|
||||
assert.strictEqual(typeof ac, 'object');
|
||||
|
||||
let oldMetrics = null;
|
||||
|
||||
const metricsStream = new Readable({
|
||||
objectMode: true,
|
||||
read(/*size*/) { /* ignored, we push via interval */ },
|
||||
destroy(error, callback) {
|
||||
statsStream.destroy(); // double destroy is a no-op
|
||||
callback(error);
|
||||
}
|
||||
});
|
||||
|
||||
// we used to poll before instead of a stream. but docker caches metrics internally and rate logic has to handle dups
|
||||
const [error, statsStream] = await safe(docker.getStats(name, { stream: true }));
|
||||
if (error) throw new Error(`Container stopped or missing: ${error.message}`);
|
||||
if (error) return; // container stopped or missing, silently ignore
|
||||
|
||||
statsStream.on('error', (error) => metricsStream.destroy(error)); // double destroy is a no-op
|
||||
statsStream.on('error', (error) => debug(error));
|
||||
statsStream.on('data', (data) => {
|
||||
const stats = JSON.parse(data.toString('utf8'));
|
||||
const metrics = translateContainerStatsSync(stats);
|
||||
@@ -465,7 +404,8 @@ async function getContainerStream(name, options) {
|
||||
oldMetrics = metrics;
|
||||
|
||||
const nowSecs = ts.getTime() / 1000; // convert to secs to match graphite return value
|
||||
metricsStream.push({
|
||||
const result = {};
|
||||
result[name] = {
|
||||
cpu: [ cpuPercent, nowSecs ],
|
||||
memory: [ memoryUsed, nowSecs ],
|
||||
|
||||
@@ -478,8 +418,80 @@ async function getContainerStream(name, options) {
|
||||
networkWriteRate: [ networkWriteRate, nowSecs ],
|
||||
networkReadTotal: metrics.networkRead,
|
||||
networkWriteTotal: metrics.networkWrite,
|
||||
});
|
||||
};
|
||||
metricsStream.push(result);
|
||||
});
|
||||
|
||||
ac.signal.addEventListener('abort', () => { // there is event.type and ac.signal.reason
|
||||
statsStream.destroy(ac.signal.reason);
|
||||
}, { once: true });
|
||||
}
|
||||
|
||||
// Polls system metrics once per second and pushes them into metricsStream under
// the 'system' key, so they can be overlaid with per-container metrics. Rates
// are deltas against the previous sample (null on the first emission). The
// polling timer is torn down when the AbortController fires.
async function pipeSystemMetrics(metricsStream, ac) {
    assert.strictEqual(typeof metricsStream, 'object');
    assert.strictEqual(typeof ac, 'object');

    const POLL_MSECS = 1000;
    let previous = null; // last sample, used for delta/rate computation

    // per-second rate of a monotonically increasing counter field; null until we have two samples
    const perSecond = (sample, field) => previous === null ? null : (sample[field] - previous[field]) / (POLL_MSECS / 1000);

    const timer = setInterval(async () => {
        const [error, sample] = await safe(readSystemMetrics());
        if (error) return metricsStream.destroy(error);

        // cpu usage is the cpu-time delta expressed as a percentage of the wall-clock interval
        const cpuPercent = previous === null ? null : (sample.userMsecs + sample.sysMsecs - previous.userMsecs - previous.sysMsecs) * 100 / POLL_MSECS;
        const blockReadRate = perSecond(sample, 'blockRead');
        const blockWriteRate = perSecond(sample, 'blockWrite');
        const networkReadRate = perSecond(sample, 'networkRead');
        const networkWriteRate = perSecond(sample, 'networkWrite');

        previous = sample;

        const nowSecs = Date.now() / 1000; // seconds, to match graphite return values

        metricsStream.push({
            system: {
                cpu: [ cpuPercent, nowSecs ],
                memory: [ sample.memoryUsed, nowSecs ],
                swap: [ sample.swapUsed, nowSecs ],

                blockReadRate: [ blockReadRate, nowSecs ],
                blockWriteRate: [ blockWriteRate, nowSecs ],
                blockReadTotal: sample.blockRead,
                blockWriteTotal: sample.blockWrite,

                networkReadRate: [ networkReadRate, nowSecs ],
                networkWriteRate: [ networkWriteRate, nowSecs ],
                networkReadTotal: sample.networkRead,
                networkWriteTotal: sample.networkWrite,
            }
        });
    }, POLL_MSECS);

    // there is event.type and ac.signal.reason
    ac.signal.addEventListener('abort', () => clearInterval(timer), { once: true });
}
|
||||
|
||||
// Returns an object-mode Readable that overlays system metrics and per-container
// metrics (apps and services) into one stream. Destroying the stream aborts all
// producers via the shared AbortController.
//
// options: { system: bool, appIds: [string], serviceIds: [string] }
async function getStream(options) {
    assert.strictEqual(typeof options, 'object');

    const ac = new AbortController();

    const metricsStream = new Readable({
        objectMode: true,
        read(/*size*/) { /* ignored, we push via interval */ },
        destroy(error, callback) {
            ac.abort(error); // stops all pipe* producers
            callback(error);
        }
    });

    // the pipe* functions are async; without a catch, a rejection there would
    // become an unhandled promise rejection instead of tearing down the stream
    const track = (promise) => promise.catch((error) => metricsStream.destroy(error));

    if (options.system) track(pipeSystemMetrics(metricsStream, ac));

    // '?? []' keeps callers that omit appIds/serviceIds from crashing the for...of
    for (const appId of options.appIds ?? []) {
        track(pipeContainerMetrics(appId, metricsStream, ac));
    }

    for (const serviceId of options.serviceIds ?? []) {
        track(pipeContainerMetrics(serviceId, metricsStream, ac));
    }

    return metricsStream;
}
|
||||
|
||||
Reference in New Issue
Block a user