graphs: directly stream docker stats

The docker stats API caches the stats, so reading it frequently returns the same value.
This trips our "rate" code into thinking the rate is 0.

One approach was to keep polling like we do now and skip stale entries based on stats.read.
This works fine, but the streaming approach seems better since we no longer need to poll at all.
This commit is contained in:
Girish Ramakrishnan
2025-07-03 19:01:40 +02:00
parent 03b7445cb9
commit 813409a8fb
4 changed files with 31 additions and 31 deletions

View File

@@ -43,10 +43,8 @@ let systemMemory = {};
let systemCpus = {};
let metricStream = null;
const LIVE_REFRESH_INTERVAL_MSECS = 500;
async function liveRefresh() {
metricStream = await appsModel.getMetricStream(app.id, LIVE_REFRESH_INTERVAL_MSECS);
metricStream = await appsModel.getMetricStream(app.id);
metricStream.onerror = (error) => console.log('event stream error:', error);
metricStream.onmessage = (message) => {
const data = JSON.parse(message.data);

View File

@@ -407,8 +407,8 @@ function create() {
if (result.status !== 200) return [result];
return [null, result.body];
},
async getMetricStream(id, intervalMsecs) {
return new EventSource(`${API_ORIGIN}/api/v1/apps/${id}/metricstream?access_token=${accessToken}&intervalMsecs=${intervalMsecs}`);
async getMetricStream(id) {
return new EventSource(`${API_ORIGIN}/api/v1/apps/${id}/metricstream?access_token=${accessToken}`);
},
async repair(id, data) {
let result;

View File

@@ -25,11 +25,8 @@ const apps = require('./apps.js'),
services = require('./services.js'),
superagent = require('./superagent.js');
async function readContainerMetric(name) {
assert.strictEqual(typeof name, 'string');
const [error, stats] = await safe(docker.getStats(name, { stream: false }));
if (error || Object.keys(stats.memory_stats).length === 0) return null; // the container is missing or stopped. better not to inspect and check State since a race is possible
function translateContainerStatsSync(stats) {
assert.strictEqual(typeof stats, 'object');
const networkRead = stats.networks ? stats.networks.eth0.rx_bytes : 0; // in host mode (turn), networks is missing
const networkWrite = stats.networks ? stats.networks.eth0.tx_bytes : 0; // in host mode (turn), networks is missing
@@ -42,8 +39,9 @@ async function readContainerMetric(name) {
const blockWrite = blkioStats.filter(entry => entry.op === 'write').reduce((sum, entry) => sum + entry.value, 0);
const cpuUsageMsecs = stats.cpu_stats.cpu_usage.total_usage / 1e6; // convert from nano to msecs (to match system metrics)
const systemUsageMsecs = stats.cpu_stats.system_cpu_usage / 1e6;
return { networkRead, networkWrite, blockRead, blockWrite, memoryUsed, memoryMax, cpuUsageMsecs };
return { ts: new Date(stats.read), networkRead, networkWrite, blockRead, blockWrite, memoryUsed, memoryMax, cpuUsageMsecs, systemUsageMsecs };
}
async function readContainerMetrics() {
@@ -57,8 +55,10 @@ async function readContainerMetrics() {
const metrics = {};
for (const containerName of containerNames) {
const stats = await readContainerMetric(containerName);
if (stats) metrics[containerName] = stats;
const [error, stats] = await safe(docker.getStats(containerName, { stream: false }));
if (error || Object.keys(stats.memory_stats).length === 0) continue; // the container is missing or stopped. better not to inspect and check State since a race is possible
metrics[containerName] = translateContainerStatsSync(stats);
}
return metrics;
@@ -422,32 +422,37 @@ async function getContainerStream(name, options) {
assert.strictEqual(typeof name, 'string');
assert.strictEqual(typeof options, 'object');
const intervalMsecs = options.intervalMsecs || 5000;
let intervalId = null, oldMetrics = null;
let oldMetrics = null;
const metricsStream = new Readable({
read(/*size*/) { /* ignored, we push via interval */ },
destroy(error, callback) {
clearInterval(intervalId);
statsStream.destroy(); // double destroy is a no-op
callback(error);
}
});
intervalId = setInterval(async () => {
const [error, metrics] = await safe(readContainerMetric(name));
if (error) return metricsStream.destroy(error);
const [error, statsStream] = await safe(docker.getStats(name, { stream: true }));
if (error) throw new Error(`Container stopped or missing: ${error.message}`);
const { networkRead, networkWrite, blockRead, blockWrite, memoryUsed, cpuUsageMsecs } = metrics;
statsStream.on('error', (error) => metricsStream.destroy(error)); // double destroy is a no-op
statsStream.on('data', (data) => {
const stats = JSON.parse(data.toString('utf8'));
const metrics = translateContainerStatsSync(stats);
const cpuPercent = oldMetrics ? (cpuUsageMsecs - oldMetrics.cpuUsageMsecs) * 0.1 / (intervalMsecs/1000) : null;
const blockReadRate = oldMetrics ? (blockRead - oldMetrics.blockRead) / (intervalMsecs/1000) : null;
const blockWriteRate = oldMetrics ? (blockWrite - oldMetrics.blockWrite) / (intervalMsecs/1000) : null;
const networkReadRate = oldMetrics ? (networkRead - oldMetrics.networkRead) / (intervalMsecs/1000) : null;
const networkWriteRate = oldMetrics ? (networkWrite - oldMetrics.networkWrite) / (intervalMsecs/1000) : null;
const { ts, networkRead, networkWrite, blockRead, blockWrite, memoryUsed, cpuUsageMsecs } = metrics;
const gap = oldMetrics ? (ts - oldMetrics.ts) : null;
const cpuPercent = oldMetrics ? (cpuUsageMsecs - oldMetrics.cpuUsageMsecs) * 100 / gap : null;
const blockReadRate = oldMetrics ? (blockRead - oldMetrics.blockRead) / (gap/1000) : null;
const blockWriteRate = oldMetrics ? (blockWrite - oldMetrics.blockWrite) / (gap/1000) : null;
const networkReadRate = oldMetrics ? (networkRead - oldMetrics.networkRead) / (gap/1000) : null;
const networkWriteRate = oldMetrics ? (networkWrite - oldMetrics.networkWrite) / (gap/1000) : null;
oldMetrics = metrics;
const nowSecs = Date.now() / 1000; // to match graphite return value
const nowSecs = ts.getTime() / 1000; // to match graphite return value
metricsStream.push(JSON.stringify({
cpu: [ cpuPercent, nowSecs ],
memory: [ memoryUsed, nowSecs ],
@@ -462,7 +467,7 @@ async function getContainerStream(name, options) {
networkReadTotal: metrics.networkRead,
networkWriteTotal: metrics.networkWrite,
}));
}, intervalMsecs);
});
return metricsStream;
}

View File

@@ -1085,10 +1085,7 @@ async function getMetrics(req, res, next) {
async function getMetricStream(req, res, next) {
if (req.headers.accept !== 'text/event-stream') return next(new HttpError(400, 'This API call requires EventStream'));
const intervalMsecs = typeof req.query.intervalMsecs !== 'undefined' ? parseInt(req.query.intervalMsecs, 10) : 5000;
if (!intervalMsecs || intervalMsecs < 100) return next(new HttpError(400, 'intervalSecs query param must be atleast 100'));
const [error, metricStream] = await safe(metrics.getContainerStream(req.resources.app.id, { intervalMsecs }));
const [error, metricStream] = await safe(metrics.getContainerStream(req.resources.app.id, {}));
if (error) return next(BoxError.toHttpError(error));
res.writeHead(200, {