diff --git a/dashboard/src/components/GraphItem.vue b/dashboard/src/components/GraphItem.vue index 9abf81144..929085abe 100644 --- a/dashboard/src/components/GraphItem.vue +++ b/dashboard/src/components/GraphItem.vue @@ -69,7 +69,7 @@ function createGraphOptions({ yscale, period, highMark }) { borderWidth: 0.5 } } - } + }, }, scales: { x: { diff --git a/src/metrics.js b/src/metrics.js index d22f1ef0f..8b193dded 100644 --- a/src/metrics.js +++ b/src/metrics.js @@ -18,7 +18,8 @@ const apps = require('./apps.js'), path = require('path'), { Readable } = require('stream'), safe = require('safetydance'), - superagent = require('./superagent.js'); + superagent = require('./superagent.js'), + _ = require('./underscore.js'); function translateContainerStatsSync(stats) { assert.strictEqual(typeof stats, 'object'); @@ -154,7 +155,7 @@ async function readSystemMetrics() { const networkMetrics = await readNetworkMetrics(); // { memoryUsed, swapUsed, userMsecs, sysMsecs, blockRead, blockWrite, networkRead, networkWrite } - return { ...memoryMetrics, ...cpuMetrics, ...diskMetrics, ...networkMetrics }; + return { ts: new Date(), ...memoryMetrics, ...cpuMetrics, ...diskMetrics, ...networkMetrics }; } async function sendToGraphite() { @@ -374,13 +375,11 @@ async function get(options) { return result; } -async function pipeContainerMetrics(name, metricsStream, ac) { +async function pipeContainerToMap(name, statsMap, ac) { assert.strictEqual(typeof name, 'string'); - assert.strictEqual(typeof metricsStream, 'object'); + assert.ok(statsMap instanceof Map); assert.strictEqual(typeof ac, 'object'); - let oldMetrics = null; - // we used to poll before instead of a stream. but docker caches metrics internally and rate logic has to handle dups const [error, statsStream] = await safe(docker.getStats(name, { stream: true })); if (error) return; // container stopped or missing, silently ignore @@ -393,6 +392,8 @@ async function pipeContainerMetrics(name, metricsStream, ac) { const { ts, networkRead, networkWrite, blockRead, blockWrite, memoryUsed, cpuUsageMsecs } = metrics; + const oldMetrics = statsMap.get(name)?.raw || null; + const gap = oldMetrics ? (ts - oldMetrics.ts) : null; const cpuPercent = oldMetrics ? (cpuUsageMsecs - oldMetrics.cpuUsageMsecs) * 100 / gap : null; @@ -401,11 +402,9 @@ async function pipeContainerMetrics(name, metricsStream, ac) { const networkReadRate = oldMetrics ? (networkRead - oldMetrics.networkRead) / (gap/1000) : null; const networkWriteRate = oldMetrics ? (networkWrite - oldMetrics.networkWrite) / (gap/1000) : null; - oldMetrics = metrics; - - const nowSecs = ts.getTime() / 1000; // conver to secs to match graphite return value - const result = {}; - result[name] = { + const nowSecs = ts.getTime() / 1000; // convert to secs to match graphite return value but this is thrown away and patched during streaming time + statsMap.set(name, { + raw: metrics, cpu: [ cpuPercent, nowSecs ], memory: [ memoryUsed, nowSecs ], @@ -418,8 +417,7 @@ async function pipeContainerMetrics(name, metricsStream, ac) { networkWriteRate: [ networkWriteRate, nowSecs ], networkReadTotal: metrics.networkRead, networkWriteTotal: metrics.networkWrite, - }; - metricsStream.push(result); + }); }); ac.signal.addEventListener('abort', () => { // there is event.type and ac.signal.reason @@ -427,71 +425,79 @@ async function pipeContainerMetrics(name, metricsStream, ac) { }, { once: true }); } -async function pipeSystemMetrics(metricsStream, ac) { - assert.strictEqual(typeof metricsStream, 'object'); - assert.strictEqual(typeof ac, 'object'); +async function pipeSystemToMap(statsMap) { + assert.ok(statsMap instanceof Map); - const INTERVAL_MSECS = 1000; - let oldMetrics = null; + const [error, metrics] = await safe(readSystemMetrics()); + if (error) return; // silently ignore error - const intervalId = setInterval(async () => { - const [error, metrics] = await safe(readSystemMetrics()); - if (error) return metricsStream.destroy(error); + const oldMetrics = statsMap.get('system')?.raw || null; - const cpuPercent = oldMetrics ? (metrics.userMsecs + metrics.sysMsecs - oldMetrics.userMsecs - oldMetrics.sysMsecs) * 100 / INTERVAL_MSECS : null; - const blockReadRate = oldMetrics ? (metrics.blockRead - oldMetrics.blockRead) / (INTERVAL_MSECS/1000) : null; - const blockWriteRate = oldMetrics ? (metrics.blockWrite - oldMetrics.blockWrite) / (INTERVAL_MSECS/1000) : null; - const networkReadRate = oldMetrics ? (metrics.networkRead - oldMetrics.networkRead) / (INTERVAL_MSECS/1000) : null; - const networkWriteRate = oldMetrics ? (metrics.networkWrite - oldMetrics.networkWrite) / (INTERVAL_MSECS/1000) : null; + const gap = oldMetrics ? (metrics.ts - oldMetrics.ts) : null; - oldMetrics = metrics; + const cpuPercent = oldMetrics ? (metrics.userMsecs + metrics.sysMsecs - oldMetrics.userMsecs - oldMetrics.sysMsecs) * 100 / gap : null; + const blockReadRate = oldMetrics ? (metrics.blockRead - oldMetrics.blockRead) / (gap/1000) : null; + const blockWriteRate = oldMetrics ? (metrics.blockWrite - oldMetrics.blockWrite) / (gap/1000) : null; + const networkReadRate = oldMetrics ? (metrics.networkRead - oldMetrics.networkRead) / (gap/1000) : null; + const networkWriteRate = oldMetrics ? (metrics.networkWrite - oldMetrics.networkWrite) / (gap/1000) : null; - const nowSecs = Date.now() / 1000; // to match graphite return value - const systemStats = { - cpu: [ cpuPercent, nowSecs ], - memory: [ metrics.memoryUsed, nowSecs ], - swap: [ metrics.swapUsed, nowSecs ], + const nowSecs = Date.now() / 1000; // convert to secs to match graphite return value but this is thrown away and patched during streaming time + const systemStats = { + raw: metrics, + cpu: [ cpuPercent, nowSecs ], + memory: [ metrics.memoryUsed, nowSecs ], + swap: [ metrics.swapUsed, nowSecs ], - blockReadRate: [ blockReadRate, nowSecs ], - blockWriteRate: [ blockWriteRate, nowSecs ], - blockReadTotal: metrics.blockRead, - blockWriteTotal: metrics.blockWrite, + blockReadRate: [ blockReadRate, nowSecs ], + blockWriteRate: [ blockWriteRate, nowSecs ], + blockReadTotal: metrics.blockRead, + blockWriteTotal: metrics.blockWrite, - networkReadRate: [ networkReadRate, nowSecs ], - networkWriteRate: [ networkWriteRate, nowSecs ], - networkReadTotal: metrics.networkRead, - networkWriteTotal: metrics.networkWrite, - }; - metricsStream.push({ system: systemStats }); - }, INTERVAL_MSECS); - - ac.signal.addEventListener('abort', () => { // there is event.type and ac.signal.reason - clearInterval(intervalId); - }, { once: true }); + networkReadRate: [ networkReadRate, nowSecs ], + networkWriteRate: [ networkWriteRate, nowSecs ], + networkReadTotal: metrics.networkRead, + networkWriteTotal: metrics.networkWrite, + }; + statsMap.set('system', systemStats); } async function getStream(options) { assert.strictEqual(typeof options, 'object'); const ac = new AbortController(); + const statsMap = new Map(); + let intervalId = null; const metricsStream = new Readable({ objectMode: true, read(/*size*/) { /* ignored, we push via interval */ }, destroy(error, callback) { + clearInterval(intervalId); ac.abort(error); callback(error); } }); - if (options.system) pipeSystemMetrics(metricsStream, ac); - for (const appId of (options.appIds || [])) { - pipeContainerMetrics(appId, metricsStream, ac); - } + const containerNames = (options.appIds || []).concat(options.serviceIds || []); + await Promise.all(containerNames.map(containerName => pipeContainerToMap(containerName, statsMap, ac))); - for (const serviceId of (options.serviceIds || [])) { - pipeContainerMetrics(serviceId, metricsStream, ac); - } + const INTERVAL_MSECS = 1000; + intervalId = setInterval(async () => { + if (options.system) await pipeSystemToMap(statsMap); + + const result = {}; + const nowSecs = Date.now() / 1000; // to match graphite return value + + // patch the stats to have the current timestamp + for (const [id, stats] of statsMap) { + result[id] = structuredClone(_.omit(stats, ['raw'])); + for (const statValue of Object.values(result[id])) { + if (Array.isArray(statValue)) statValue[1] = nowSecs; + } + } + + metricsStream.push(result); + }, INTERVAL_MSECS); return metricsStream; } diff --git a/src/underscore.js b/src/underscore.js index dc8e55d65..52b2d956b 100644 --- a/src/underscore.js +++ b/src/underscore.js @@ -11,12 +11,14 @@ exports = module.exports = { // IMPORTANT: this file is required from the migration logic. avoid requires const assert = require('assert'); +// note: returns shallow copy. use structuredClone() on top to get a deep copy function pick(obj, keys) { assert(Array.isArray(keys)); return Object.fromEntries(Object.entries(obj).filter(([key]) => keys.includes(key))); } +// note: returns shallow copy. use structuredClone() on top to get a deep copy function omit(obj, keys) { assert(Array.isArray(keys));