InfluxDB doesn’t just store time-series data; it can actively help you find the weird stuff within it, often without needing a separate anomaly detection service.
Let’s watch InfluxDB find an anomaly. Imagine we’re tracking server CPU usage, with one sample every 10 seconds. The Go program below simulates that stream.
package main

import (
	"fmt"
	"log"
	"math/rand"
	"time"

	client "github.com/influxdata/influxdb-client-go/v2"
)

const (
	bucket = "my-bucket"
	org    = "my-org"
	token  = "my-token"              // Replace with your actual token
	url    = "http://localhost:8086" // Replace with your InfluxDB URL
)

func main() {
	// Initialize client
	c := client.NewClient(url, token)
	defer c.Close()

	// Use the asynchronous write API and log any write errors it reports
	writeAPI := c.WriteAPI(org, bucket)
	errCh := writeAPI.Errors()
	go func() {
		for err := range errCh {
			log.Printf("write error: %v", err)
		}
	}()

	// Backdate the series so it ends just before the current time. Points
	// stamped in the future fall outside a query range that stops at now().
	// The series spans about 34 minutes, so a query needs a range at least
	// that wide to see all of it.
	const interval = 10 * time.Second
	start := time.Now().Add(-201 * interval)

	// writeUsage queues one usage_percent sample for the given 10-second step
	writeUsage := func(step int, usage float64) {
		point := client.NewPoint("cpu_usage",
			map[string]string{"host": "server-01"},
			map[string]interface{}{"usage_percent": usage},
			start.Add(time.Duration(step)*interval))
		writeAPI.WritePoint(point)
	}

	// Generate some normal data: CPU usage between 10% and 25%
	for i := 0; i < 100; i++ {
		writeUsage(i, 10.0+rand.Float64()*15.0)
	}

	// Inject an anomaly: suddenly high CPU usage
	writeUsage(100, 95.0)
	log.Println("Anomaly injected!")

	// Generate more normal data
	for i := 0; i < 100; i++ {
		writeUsage(101+i, 10.0+rand.Float64()*15.0)
	}

	// Flush buffered writes and wait for them to complete
	writeAPI.Flush()
	fmt.Println("Data generation complete. Check for anomalies.")
}
This Go program writes a stream of CPU usage data to InfluxDB. Most of it is within a predictable range (10-25%). Then, it injects a single data point with a usage of 95%, simulating an anomaly.
The core idea is to use InfluxDB’s query language, Flux, to define what "normal" looks like and then flag anything that deviates significantly. We can do this by looking at moving averages and standard deviations.
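As a rough worked example of that idea, assume the window is long enough that its statistics approach those of the generator, which draws uniformly between 10 and 25. Here $x_t$ is a sample and $\mu_t$, $\sigma_t$ are the mean and standard deviation of its window; the symbols are introduced only for this illustration.

```latex
\begin{align*}
z_t &= \frac{x_t - \mu_t}{\sigma_t}, \\
\mu &= \frac{10 + 25}{2} = 17.5, \qquad
\sigma = \frac{25 - 10}{\sqrt{12}} \approx 4.33, \\
z &= \frac{95 - 17.5}{4.33} \approx 17.9 .
\end{align*}
```

A 95% reading therefore sits roughly 18 standard deviations above the mean, far beyond a typical cutoff such as $|z| > 3$. Short windows will give noisier estimates of $\mu_t$ and $\sigma_t$ than these idealized values.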
Here’s a Flux query that identifies an anomaly based on exceeding a certain number of standard deviations from the mean over a rolling window:
// Time range to analyze; widen or narrow it to match how much history you want to scan
dataRange = 1h

// Raw CPU usage for the specified host and measurement
rawCPU = from(bucket: "my-bucket")
    |> range(start: -dataRange)
    |> filter(fn: (r) => r._measurement == "cpu_usage" and r.host == "server-01")
    |> filter(fn: (r) => r._field == "usage_percent")

// Rolling mean and standard deviation.
// 'every: 10s' starts a new window every 10 seconds.
// 'period: 1m' makes each window 1 minute long, so consecutive windows overlap.
// aggregateWindow stamps each result with the window's stop time in _time.
rollingStats = join(
    tables: {
        mean: rawCPU |> aggregateWindow(every: 10s, period: 1m, fn: mean, createEmpty: false),
        stddev: rawCPU |> aggregateWindow(every: 10s, period: 1m, fn: stddev, createEmpty: false),
    },
    on: ["_time", "host"],
)
    |> keep(columns: ["_time", "host", "_value_mean", "_value_stddev"])
    // Rename columns for clarity
    |> rename(columns: {_value_mean: "rollingMean", _value_stddev: "rollingStdDev"})

// Round each raw timestamp down to a 10-second boundary so it lines up with
// the stop time of the window covering the preceding minute.
// The original timestamp is preserved in sampleTime.
alignedCPU = rawCPU
    |> duplicate(column: "_time", as: "sampleTime")
    |> truncateTimeColumn(unit: 10s)

// Join the rolling statistics back to the raw data and identify anomalies
anomalies = join(tables: {raw: alignedCPU, stats: rollingStats}, on: ["_time", "host"])
    |> filter(fn: (r) =>
        // Skip windows whose standard deviation is zero, then flag points
        // more than 3 standard deviations from the rolling mean
        r.rollingStdDev > 0.0 and
        (r._value > r.rollingMean + 3.0 * r.rollingStdDev or r._value < r.rollingMean - 3.0 * r.rollingStdDev)
    )

anomalies |> yield(name: "anomalies")

This Flux query breaks down as follows:
- rawCPU: We first grab all the raw CPU usage data for server-01 within the last hour (dataRange).
- rollingStats: This is the crucial part. We use aggregateWindow twice.
  - The first aggregateWindow calculates the mean of usage_percent. every: 10s means a new calculation is triggered every 10 seconds. period: 1m specifies that each calculation uses 1 minute of data. This effectively creates a rolling mean.
  - We then join this with another aggregateWindow that calculates the stddev (standard deviation) over the same rolling window. Both results carry the window's stop time in _time, so joining on _time and host pairs the mean and standard deviation of each window.
  - We keep and rename the columns for clarity.
- alignedCPU: Raw timestamps rarely fall exactly on a window boundary, so we round each one down to the nearest 10 seconds. That boundary is the stop time of the window holding the preceding minute of data, which excludes the point itself. The unrounded timestamp is kept in sampleTime.
- anomalies: We join rollingStats back to the aligned raw data on _time and host, so every raw point carries the rolling mean and standard deviation of the minute before it.
  - The filter then identifies points where the actual _value (the CPU usage) is more than 3 standard deviations away from the calculated rollingMean. This is the same as requiring a z-score, (_value - rollingMean) / rollingStdDev, with an absolute value above 3. The check r.rollingStdDev > 0.0 skips windows where the standard deviation is zero (for example, when all data points in a window are identical), since any deviation would otherwise count as an anomaly.
  - yield(name: "anomalies") presents the identified anomalous data points.
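The output of this query should highlight the data point with 95% CPU usage, which falls far outside the normal statistical deviation of the preceding minute’s data. At a 10-second sampling interval, a one-minute window holds only about six samples, so its standard deviation is a noisy estimate; if ordinary points are flagged as well, a longer period helps.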
The system works by treating statistical measures as first-class citizens within the query language. You’re not just retrieving data; you’re computing its statistical properties over time and then filtering based on those properties. The aggregateWindow function is key here; it allows you to define sliding or tumbling windows over your data, enabling the calculation of metrics like mean, median, standard deviation, and more, on a per-window basis. By joining these computed statistics back to the original data points, you can then apply thresholds and identify outliers.
What most people miss is how easily you can chain these statistical operations. You can calculate a rolling average, then use that average to normalize your data, and then calculate a rolling standard deviation on the normalized data to find unusual patterns of deviation, not just extreme values. This allows for detecting subtler anomalies.
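A small Go sketch of that chaining, under one possible reading: "normalize" is taken to mean subtracting the rolling mean from each value, and the second pass measures how much those residuals vary. The helper names (rolling, residualVolatility) and the data are hypothetical.

```go
package main

import (
	"fmt"
	"math"
)

// rolling applies fn to each trailing window of n values and returns one
// result per full window, so the output has len(xs)-n+1 elements.
func rolling(xs []float64, n int, fn func([]float64) float64) []float64 {
	if n <= 0 || len(xs) < n {
		return nil
	}
	out := make([]float64, 0, len(xs)-n+1)
	for i := n; i <= len(xs); i++ {
		out = append(out, fn(xs[i-n:i]))
	}
	return out
}

// mean returns the arithmetic mean of a non-empty slice.
func mean(xs []float64) float64 {
	var sum float64
	for _, x := range xs {
		sum += x
	}
	return sum / float64(len(xs))
}

// stdDev returns the sample standard deviation, or 0 for fewer than two values.
func stdDev(xs []float64) float64 {
	if len(xs) < 2 {
		return 0
	}
	m := mean(xs)
	var ss float64
	for _, x := range xs {
		ss += (x - m) * (x - m)
	}
	return math.Sqrt(ss / float64(len(xs)-1))
}

// residualVolatility subtracts the rolling mean from each value, then
// returns the rolling standard deviation of those residuals.
func residualVolatility(xs []float64, n int) []float64 {
	means := rolling(xs, n, mean)
	residuals := make([]float64, len(means))
	for i, m := range means {
		// means[i] covers xs[i : i+n], so pair it with that window's last value.
		residuals[i] = xs[i+n-1] - m
	}
	return rolling(residuals, n, stdDev)
}

func main() {
	// A steady ramp followed by jitter; every value stays between 10 and 25.
	usage := []float64{10, 11, 12, 13, 14, 15, 16, 17, 14, 20, 13, 21, 12, 22}
	fmt.Printf("%.2f\n", residualVolatility(usage, 4))
}
```

The ramp produces constant residuals and therefore zero volatility, while the jittery tail produces a clearly higher figure even though no single value is extreme. A fixed threshold on the raw values would miss that change.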
The next step is often to automate these anomaly detection queries and trigger alerts or actions when anomalies are found.