InfluxDB’s Flux language lets you combine data from different measurements as if they were in the same table.

Let’s say you have two measurements in your InfluxDB bucket: cpu and mem. The cpu measurement records CPU utilization, and mem records memory usage. You want to see them side by side, aligned by time.

```flux
// Fetch CPU data
cpuData = from(bucket: "my_bucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_user")

// Fetch Memory data
memData = from(bucket: "my_bucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "mem" and r._field == "used_percent")

// Join the two datasets
join(
  tables: {cpu: cpuData, mem: memData},
  // Join on time and hostname. host is a tag column, so it has no leading
  // underscore; only system columns such as _time and _value do.
  on: ["_time", "host"]
)
|> yield(name: "cpu_and_mem_usage")
```

This script first defines cpuData and memData by querying your InfluxDB bucket for specific fields within their respective measurements. The range(start: -1h) ensures we’re looking at data from the last hour.

The magic happens with the join() function. It takes a record of tables ({cpu: cpuData, mem: memData}) and an on parameter. The on parameter specifies the columns that must match for rows to be joined. Here, we’re joining on _time and host (the host tag carries no leading underscore; only system columns such as _time and _value do), meaning a CPU record will only be joined with a memory record if they occurred at the exact same time and on the exact same host.

The output table contains columns from both cpuData and memData. For our example, you’d see _time, host, _value_cpu (the user CPU usage), and _value_mem (the percent memory used). Flux appends the keys of the tables record as suffixes (_cpu, _mem) to columns that exist in both inputs but are not listed in on, so other shared columns such as _field and _measurement are typically suffixed the same way.
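To make the column naming concrete, here is a minimal sketch that renames the suffixed value columns after the join. It assumes the join keys are _time and the host tag and that the tables record uses the keys cpu and mem; the output names cpu_user and mem_used_percent are made up for illustration.

```flux
cpuData = from(bucket: "my_bucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_user")

memData = from(bucket: "my_bucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "mem" and r._field == "used_percent")

join(tables: {cpu: cpuData, mem: memData}, on: ["_time", "host"])
  // Give the suffixed value columns friendlier (hypothetical) names.
  |> rename(columns: {_value_cpu: "cpu_user", _value_mem: "mem_used_percent"})
  // Keep only the columns needed for a side-by-side view.
  |> keep(columns: ["_time", "host", "cpu_user", "mem_used_percent"])
  |> yield(name: "cpu_and_mem_renamed")
```

Trimming the result with keep() also removes the suffixed metadata columns, which makes the table easier to read in a dashboard.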

The core problem Flux’s join() solves is bringing disparate time-series data together for correlation and analysis, which is a common requirement when monitoring systems. You might have metrics from different agents, different services, or different hardware components, and you need to see how they relate. Without join(), you’d typically have to do this post-processing in another tool, or by running multiple queries and manually aligning the results, which is tedious and error-prone.

Internally, join() works by comparing rows from the specified tables based on the on columns. For each matching pair of rows, it creates a new row containing the columns from both input rows. Rows without a match in the other table are dropped: the built-in join() only performs an inner join, and "inner" is the only value its method parameter accepts. To keep unmatched rows, use the functions in the separate join package instead (for example, join.left() to keep all rows from the left table).
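For joins that keep unmatched rows, recent Flux versions ship a separate join package. The following is a minimal sketch of a left join with join.left(), assuming your Flux version includes that package and that both streams are regrouped by host so their group keys match. The output column names cpu_user and mem_used_percent are illustrative.

```flux
import "join"

// Regroup both streams by host so that their group keys match.
cpuData = from(bucket: "my_bucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_user")
  |> group(columns: ["host"])

memData = from(bucket: "my_bucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "mem" and r._field == "used_percent")
  |> group(columns: ["host"])

join.left(
  left: cpuData,
  right: memData,
  // The predicate may only contain equality comparisons chained with "and".
  on: (l, r) => l._time == r._time and l.host == r.host,
  // Build the output row; mem_used_percent is null when no memory row matched.
  as: (l, r) => ({_time: l._time, host: l.host, cpu_user: l._value, mem_used_percent: r._value}),
)
  |> yield(name: "cpu_left_join_mem")
```

Because the as function constructs each output row explicitly, there are no automatic suffixes to deal with; you choose the column names yourself.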

The on parameter is crucial. You can join on any combination of columns. If you only specify _time, every CPU row is joined with every memory row that carries the identical timestamp, regardless of host. Timestamps have to match exactly, down to the nanosecond, so series that are not sampled on the same schedule may produce few or no matches unless you align them first, for example with aggregateWindow(). Including host (or any other common tag like region or service) ensures you’re comparing apples to apples within specific contexts.
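When the two series are not written at identical timestamps, one common approach is to downsample both onto the same time grid before joining. Here is a minimal sketch using aggregateWindow(); the one-minute window and the choice of mean are assumptions, so pick values that suit your collection interval.

```flux
// Downsample both streams to one-minute means so their timestamps line up.
cpuData = from(bucket: "my_bucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_user")
  |> aggregateWindow(every: 1m, fn: mean, createEmpty: false)

memData = from(bucket: "my_bucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "mem" and r._field == "used_percent")
  |> aggregateWindow(every: 1m, fn: mean, createEmpty: false)

// Each window now produces one row per series with a shared timestamp.
join(tables: {cpu: cpuData, mem: memData}, on: ["_time", "host"])
  |> yield(name: "cpu_and_mem_aligned")
```

Setting createEmpty: false skips windows that contain no data, so the join is not fed rows with null values.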

A subtle point is how join() treats data types in the join columns. Flux is strictly typed and does not convert values implicitly: a column that holds the string "1" in one stream and the integer 1 in the other will not join cleanly, and the query may fail with a type error. Convert such columns explicitly before the join, with functions like int(), float(), or string(), and keep your data schema consistent to avoid unexpected behavior.
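As an illustration of explicit conversion, the sketch below builds two tiny in-memory tables with array.from() and normalizes a join column before joining. The slot column and all sample values are hypothetical; they only stand in for a key that was stored with different types in the two sources.

```flux
import "array"

// Hypothetical sample rows: slot is an integer in this stream...
left = array.from(rows: [{_time: 2024-01-01T00:00:00Z, slot: 1, cpu: 12.5}])

// ...and a string in this one, so convert it explicitly before joining.
right = array.from(rows: [{_time: 2024-01-01T00:00:00Z, slot: "1", mem: 40.0}])
  |> map(fn: (r) => ({r with slot: int(v: r.slot)}))

// Both slot columns are integers now, so the rows can be matched.
join(tables: {l: left, r: right}, on: ["_time", "slot"])
  |> yield(name: "joined_on_normalized_key")
```

Doing the conversion in a map() step right after the source keeps the type fix in one visible place instead of relying on whatever the join happens to tolerate.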

The next hurdle you’ll likely encounter is performing calculations after the join, like finding the correlation coefficient between CPU and memory usage over time.
