metrics: group the metrics when sending live stats

Without this, the tooltip doesn't work: it needs to have the same
consistent timestamps across all grouped metrics.
This commit is contained in:
Girish Ramakrishnan
2025-07-08 16:45:55 +02:00
parent d09f932834
commit 0952b6d68f
3 changed files with 63 additions and 55 deletions

View File

@@ -18,7 +18,8 @@ const apps = require('./apps.js'),
path = require('path'),
{ Readable } = require('stream'),
safe = require('safetydance'),
superagent = require('./superagent.js');
superagent = require('./superagent.js'),
_ = require('./underscore.js');
function translateContainerStatsSync(stats) {
assert.strictEqual(typeof stats, 'object');
@@ -154,7 +155,7 @@ async function readSystemMetrics() {
const networkMetrics = await readNetworkMetrics();
// { memoryUsed, swapUsed, userMsecs, sysMsecs, blockRead, blockWrite, networkRead, networkWrite }
return { ...memoryMetrics, ...cpuMetrics, ...diskMetrics, ...networkMetrics };
return { ts: new Date(), ...memoryMetrics, ...cpuMetrics, ...diskMetrics, ...networkMetrics };
}
async function sendToGraphite() {
@@ -374,13 +375,11 @@ async function get(options) {
return result;
}
async function pipeContainerMetrics(name, metricsStream, ac) {
async function pipeContainerToMap(name, statsMap, ac) {
assert.strictEqual(typeof name, 'string');
assert.strictEqual(typeof metricsStream, 'object');
assert.ok(statsMap instanceof Map);
assert.strictEqual(typeof ac, 'object');
let oldMetrics = null;
// we used to poll before instead of a stream. but docker caches metrics internally and rate logic has to handle dups
const [error, statsStream] = await safe(docker.getStats(name, { stream: true }));
if (error) return; // container stopped or missing, silently ignore
@@ -393,6 +392,8 @@ async function pipeContainerMetrics(name, metricsStream, ac) {
const { ts, networkRead, networkWrite, blockRead, blockWrite, memoryUsed, cpuUsageMsecs } = metrics;
const oldMetrics = statsMap.get(name)?.raw || null;
const gap = oldMetrics ? (ts - oldMetrics.ts) : null;
const cpuPercent = oldMetrics ? (cpuUsageMsecs - oldMetrics.cpuUsageMsecs) * 100 / gap : null;
@@ -401,11 +402,9 @@ async function pipeContainerMetrics(name, metricsStream, ac) {
const networkReadRate = oldMetrics ? (networkRead - oldMetrics.networkRead) / (gap/1000) : null;
const networkWriteRate = oldMetrics ? (networkWrite - oldMetrics.networkWrite) / (gap/1000) : null;
oldMetrics = metrics;
const nowSecs = ts.getTime() / 1000; // conver to secs to match graphite return value
const result = {};
result[name] = {
const nowSecs = ts.getTime() / 1000; // convert to secs to match graphite return value but this is thrown away and patched during streaming time
statsMap.set(name, {
raw: metrics,
cpu: [ cpuPercent, nowSecs ],
memory: [ memoryUsed, nowSecs ],
@@ -418,8 +417,7 @@ async function pipeContainerMetrics(name, metricsStream, ac) {
networkWriteRate: [ networkWriteRate, nowSecs ],
networkReadTotal: metrics.networkRead,
networkWriteTotal: metrics.networkWrite,
};
metricsStream.push(result);
});
});
ac.signal.addEventListener('abort', () => { // there is event.type and ac.signal.reason
@@ -427,71 +425,79 @@ async function pipeContainerMetrics(name, metricsStream, ac) {
}, { once: true });
}
async function pipeSystemMetrics(metricsStream, ac) {
assert.strictEqual(typeof metricsStream, 'object');
assert.strictEqual(typeof ac, 'object');
async function pipeSystemToMap(statsMap) {
assert.ok(statsMap instanceof Map);
const INTERVAL_MSECS = 1000;
let oldMetrics = null;
const [error, metrics] = await safe(readSystemMetrics());
if (error) return; // silently ignore error
const intervalId = setInterval(async () => {
const [error, metrics] = await safe(readSystemMetrics());
if (error) return metricsStream.destroy(error);
const oldMetrics = statsMap.get('system')?.raw || null;
const cpuPercent = oldMetrics ? (metrics.userMsecs + metrics.sysMsecs - oldMetrics.userMsecs - oldMetrics.sysMsecs) * 100 / INTERVAL_MSECS : null;
const blockReadRate = oldMetrics ? (metrics.blockRead - oldMetrics.blockRead) / (INTERVAL_MSECS/1000) : null;
const blockWriteRate = oldMetrics ? (metrics.blockWrite - oldMetrics.blockWrite) / (INTERVAL_MSECS/1000) : null;
const networkReadRate = oldMetrics ? (metrics.networkRead - oldMetrics.networkRead) / (INTERVAL_MSECS/1000) : null;
const networkWriteRate = oldMetrics ? (metrics.networkWrite - oldMetrics.networkWrite) / (INTERVAL_MSECS/1000) : null;
const gap = oldMetrics ? (metrics.ts - oldMetrics.ts) : null;
oldMetrics = metrics;
const cpuPercent = oldMetrics ? (metrics.userMsecs + metrics.sysMsecs - oldMetrics.userMsecs - oldMetrics.sysMsecs) * 100 / gap : null;
const blockReadRate = oldMetrics ? (metrics.blockRead - oldMetrics.blockRead) / (gap/1000) : null;
const blockWriteRate = oldMetrics ? (metrics.blockWrite - oldMetrics.blockWrite) / (gap/1000) : null;
const networkReadRate = oldMetrics ? (metrics.networkRead - oldMetrics.networkRead) / (gap/1000) : null;
const networkWriteRate = oldMetrics ? (metrics.networkWrite - oldMetrics.networkWrite) / (gap/1000) : null;
const nowSecs = Date.now() / 1000; // to match graphite return value
const systemStats = {
cpu: [ cpuPercent, nowSecs ],
memory: [ metrics.memoryUsed, nowSecs ],
swap: [ metrics.swapUsed, nowSecs ],
const nowSecs = Date.now() / 1000; // convert to secs to match graphite return value but this is thrown away and patched during streaming time
const systemStats = {
raw: metrics,
cpu: [ cpuPercent, nowSecs ],
memory: [ metrics.memoryUsed, nowSecs ],
swap: [ metrics.swapUsed, nowSecs ],
blockReadRate: [ blockReadRate, nowSecs ],
blockWriteRate: [ blockWriteRate, nowSecs ],
blockReadTotal: metrics.blockRead,
blockWriteTotal: metrics.blockWrite,
blockReadRate: [ blockReadRate, nowSecs ],
blockWriteRate: [ blockWriteRate, nowSecs ],
blockReadTotal: metrics.blockRead,
blockWriteTotal: metrics.blockWrite,
networkReadRate: [ networkReadRate, nowSecs ],
networkWriteRate: [ networkWriteRate, nowSecs ],
networkReadTotal: metrics.networkRead,
networkWriteTotal: metrics.networkWrite,
};
metricsStream.push({ system: systemStats });
}, INTERVAL_MSECS);
ac.signal.addEventListener('abort', () => { // there is event.type and ac.signal.reason
clearInterval(intervalId);
}, { once: true });
networkReadRate: [ networkReadRate, nowSecs ],
networkWriteRate: [ networkWriteRate, nowSecs ],
networkReadTotal: metrics.networkRead,
networkWriteTotal: metrics.networkWrite,
};
statsMap.set('system', systemStats);
}
async function getStream(options) {
assert.strictEqual(typeof options, 'object');
const ac = new AbortController();
const statsMap = new Map();
let intervalId = null;
const metricsStream = new Readable({
objectMode: true,
read(/*size*/) { /* ignored, we push via interval */ },
destroy(error, callback) {
clearInterval(intervalId);
ac.abort(error);
callback(error);
}
});
if (options.system) pipeSystemMetrics(metricsStream, ac);
for (const appId of (options.appIds || [])) {
pipeContainerMetrics(appId, metricsStream, ac);
}
const containerNames = (options.appIds || []).concat(options.serviceIds || []);
await Promise.all(containerNames.map(containerName => pipeContainerToMap(containerName, statsMap, ac)));
for (const serviceId of (options.serviceIds || [])) {
pipeContainerMetrics(serviceId, metricsStream, ac);
}
const INTERVAL_MSECS = 1000;
intervalId = setInterval(async () => {
if (options.system) await pipeSystemToMap(statsMap);
const result = {};
const nowSecs = Date.now() / 1000; // to match graphite return value
// patch the stats to have the current timestamp
for (const [id, stats] of statsMap) {
result[id] = structuredClone(_.omit(stats, ['raw']));
for (const statValue of Object.values(result[id])) {
if (Array.isArray(statValue)) statValue[1] = nowSecs;
}
}
metricsStream.push(result);
}, INTERVAL_MSECS);
return metricsStream;
}