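Flux can calculate moving averages with the movingAverage() function, which averages a fixed number of points, and with timedMovingAverage(), which averages over a time window.
Here’s a Flux query that calculates a 5-minute moving average of CPU usage: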
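from(bucket: "example-bucket") // placeholder bucket name: replace with your own
    |> range(start: -15m)
    |> filter(fn: (r) => r["_measurement"] == "cpu" and r["_field"] == "usage_user")
    // 5-minute windows, recomputed every minute (movingAverage() only accepts an integer n)
    |> timedMovingAverage(every: 1m, period: 5m)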
Let’s break down how this works and explore the concepts behind it.
The core problem moving averages solve is smoothing out noisy time-series data to reveal underlying trends. Raw sensor data, for instance, can fluctuate wildly due to transient events. A moving average, by averaging a subset of recent data points, effectively dampens these short-term variations.
Consider a scenario where you’re monitoring server CPU usage. A single high spike might be an anomaly, but a sustained period of elevated usage is a genuine concern. A moving average helps distinguish between these.
The movingAverage() function in Flux takes one required argument, n: the number of points to average, given as an integer. It does not accept a duration; for a time-based window, Flux provides a separate function, timedMovingAverage(every:, period:). The two approaches behave differently:
- Point-based windowing (movingAverage(n: 10)): each output value is the mean of the current point and the n - 1 points before it in the same table. This is simple to reason about, but the span of time each window covers depends on your ingest rate, so it can be misleading if data arrives at highly variable intervals.
- Duration-based windowing (timedMovingAverage(every: 1m, period: 5m)): Flux groups points into windows that are period long, starts a new window each time the every interval elapses, and averages the points in each window. Each result is timestamped with its window’s stop time, and a window that contains no points produces no output row.
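Duration-based smoothing is windowing followed by aggregation. The sketch below approximates timedMovingAverage(every: 10s, period: 30s) by hand with window(), mean(), and duplicate(), so each step is visible. The six readings are hypothetical, built with array.from(), and the pipeline illustrates the technique rather than reproducing the standard library’s source exactly.

```flux
import "array"

// Hypothetical sample data: six readings, one every 10 seconds
data = array.from(
    rows: [
        {_time: 2021-01-01T00:00:00Z, _value: 10.0},
        {_time: 2021-01-01T00:00:10Z, _value: 20.0},
        {_time: 2021-01-01T00:00:20Z, _value: 30.0},
        {_time: 2021-01-01T00:00:30Z, _value: 40.0},
        {_time: 2021-01-01T00:00:40Z, _value: 50.0},
        {_time: 2021-01-01T00:00:50Z, _value: 60.0},
    ],
)
    // range() adds the _start and _stop bounds that window() clips windows to
    |> range(start: 2021-01-01T00:00:00Z, stop: 2021-01-01T00:01:00Z)

smoothed = data
    // Overlapping 30-second windows, a new one starting every 10 seconds
    |> window(every: 10s, period: 30s)
    // One average per window
    |> mean()
    // mean() drops _time, so stamp each average with its window's stop time
    |> duplicate(column: "_stop", as: "_time")
    // Collapse the per-window tables back into a single table
    |> window(every: inf)

smoothed |> yield(name: "hand_rolled_timed_average")
```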
Let’s look at a more elaborate example, including data generation for demonstration:
import "generate"

// Generate 100 points of sample CPU data at 1-second intervals.
// To keep the output reproducible, a deterministic pattern that
// bounces between 40 and 60 stands in for random noise.
cpu = generate.from(
    count: 100,
    fn: (n) => 40.0 + float(v: (n * 13) % 21),
    start: 2021-01-01T00:00:00Z,
    stop: 2021-01-01T00:01:40Z,
)
    // range() adds the _start/_stop columns that windowing functions need
    |> range(start: 2021-01-01T00:00:00Z, stop: 2021-01-01T00:01:40Z)
    |> map(fn: (r) => ({r with _measurement: "cpu", _field: "usage_user", host: "serverA"}))

// Calculate a 3-point moving average
cpu
    |> movingAverage(n: 3)
    |> yield(name: "cpu_moving_avg_3_points")

// Calculate a 10-second moving average, recomputed every second
cpu
    |> timedMovingAverage(every: 1s, period: 10s)
    |> yield(name: "cpu_moving_avg_10s")
In the first call, movingAverage(n: 3), we’re using point-based windowing. Each output value is the mean of the current point and the two points before it in the same table. The first two input points don’t have enough predecessors, so movingAverage() produces no output for them: the output has n - 1 = 2 fewer rows than the input.
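The dropped rows are easiest to see with numbers you can check by hand. In this sketch, four hypothetical readings go through movingAverage(n: 3): the first output is (10 + 20 + 30) / 3 = 20, the second is (20 + 30 + 40) / 3 = 30, and the first two inputs produce no output of their own.

```flux
import "array"

// Four hypothetical readings, 10 seconds apart
readings = array.from(
    rows: [
        {_time: 2021-01-01T00:00:00Z, _value: 10.0},
        {_time: 2021-01-01T00:00:10Z, _value: 20.0},
        {_time: 2021-01-01T00:00:20Z, _value: 30.0},
        {_time: 2021-01-01T00:00:30Z, _value: 40.0},
    ],
)

// 4 input rows - (n - 1) = 2 output rows: 20.0 and 30.0
averaged = readings |> movingAverage(n: 3)

averaged |> yield(name: "three_point_average")
```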
In the second call, timedMovingAverage(every: 1s, period: 10s), we’re using duration-based windowing. Every second, Flux closes a 10-second window, averages all the _value points that fall inside it, and stamps the result with the window’s stop time. A window that contains no points produces no output row, so the output density depends on your data’s arrival rate.
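Here is a small sketch of that effect, using hypothetical readings with a gap in the middle. With tumbling 10-second windows (every and period both 10s), only the first and last windows contain points, so only two averages come out: (10 + 20) / 2 = 15 and (30 + 40) / 2 = 35.

```flux
import "array"

// Hypothetical readings with a gap between 00:00:05Z and 00:00:30Z
readings = array.from(
    rows: [
        {_time: 2021-01-01T00:00:00Z, _value: 10.0},
        {_time: 2021-01-01T00:00:05Z, _value: 20.0},
        {_time: 2021-01-01T00:00:30Z, _value: 30.0},
        {_time: 2021-01-01T00:00:35Z, _value: 40.0},
    ],
)
    // range() adds the _start/_stop bounds used for windowing
    |> range(start: 2021-01-01T00:00:00Z, stop: 2021-01-01T00:00:40Z)

// Four 10-second windows, but only [00:00, 00:10) and [00:30, 00:40) hold data,
// so only two averages come out: 15.0 and 35.0
smoothed = readings |> timedMovingAverage(every: 10s, period: 10s)

smoothed |> yield(name: "gappy_average")
```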
A detail that’s easy to overlook is how each function handles gaps in your time series. movingAverage() counts rows, not time, so a 3-point window can silently span a long gap between points; it skips null values and returns null only when every value in the window is null. timedMovingAverage() works on time windows, so a window with no points simply produces no output row. That accurately reflects that no average could be computed, but if you need a continuous output series, regularize the data first: aggregateWindow(every: ..., fn: mean, createEmpty: true) emits a null-valued row for each empty interval, and fill(usePrevious: true) or fill(value: ...) can replace those nulls before you apply movingAverage().
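One way to get a continuous series, sketched here with hypothetical data, is to regularize first and smooth second: aggregateWindow() with createEmpty: true emits a null row for each empty 10-second interval, fill(usePrevious: true) carries the last value forward, and only then does movingAverage() run. Carrying values forward is an assumption that suits slowly changing metrics; fill(value: ...) with a constant may fit other data better.

```flux
import "array"

// Hypothetical readings: nothing arrives between 00:00:10Z and 00:00:50Z
readings = array.from(
    rows: [
        {_time: 2021-01-01T00:00:00Z, _value: 10.0},
        {_time: 2021-01-01T00:00:10Z, _value: 20.0},
        {_time: 2021-01-01T00:00:50Z, _value: 30.0},
    ],
)
    // range() adds the _start/_stop bounds aggregateWindow() needs
    |> range(start: 2021-01-01T00:00:00Z, stop: 2021-01-01T00:01:00Z)

filled = readings
    // One row per 10-second interval; empty intervals get a null _value
    |> aggregateWindow(every: 10s, fn: mean, createEmpty: true)
    // Carry the last known value forward into the empty intervals
    |> fill(usePrevious: true)
    // Six regular rows in, 6 - (3 - 1) = 4 averages out
    |> movingAverage(n: 3)

filled |> yield(name: "regularized_moving_average")
```

After the fill, each 3-point window covers three consecutive 10-second intervals, so the point count and the time span agree.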
The movingAverage function is a fundamental tool for time-series analysis, enabling clearer trend identification and more robust anomaly detection by filtering out short-term noise.
The next logical step after calculating moving averages is often to compare them against the raw data or to calculate multiple moving averages with different window sizes to identify crossovers.
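As a sketch of the crossover idea, the query below computes a short (2-point) and a long (4-point) moving average over hypothetical readings, joins them on _time with join(), and reports the spread between them. The window sizes are arbitrary choices for illustration. A change in the sign of spread marks a crossover; here it flips from positive to negative between 00:00:40Z and 00:00:50Z.

```flux
import "array"

// Hypothetical series that rises and then falls
readings = array.from(
    rows: [
        {_time: 2021-01-01T00:00:00Z, _value: 10.0},
        {_time: 2021-01-01T00:00:10Z, _value: 20.0},
        {_time: 2021-01-01T00:00:20Z, _value: 30.0},
        {_time: 2021-01-01T00:00:30Z, _value: 40.0},
        {_time: 2021-01-01T00:00:40Z, _value: 30.0},
        {_time: 2021-01-01T00:00:50Z, _value: 20.0},
        {_time: 2021-01-01T00:01:00Z, _value: 10.0},
    ],
)

short = readings |> movingAverage(n: 2)
long = readings |> movingAverage(n: 4)

// Inner join on _time; the two _value columns become _value_short and _value_long
crossover = join(tables: {short: short, long: long}, on: ["_time"])
    // Positive spread: the short-term average is above the long-term average
    |> map(fn: (r) => ({_time: r._time, spread: r._value_short - r._value_long}))

crossover |> yield(name: "ma_spread")
```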