InfluxDB, the time-series database, can feel like a black box until you see its two core moves: writing timestamped points, and aggregating them over time windows when you query.

Let’s get some data in there. First, install the Python client:

pip install influxdb-client

Now, connect and write a single point. A point is a measurement, a set of tags, some fields, and a timestamp.

from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS

# Replace with your InfluxDB v2.x connection details
url = "http://localhost:8086"
token = "YOUR_API_TOKEN" # Get this from your InfluxDB UI -> Load Data -> API Tokens
org = "your_organization" # Your InfluxDB organization name
bucket = "your_bucket"   # The bucket you want to write to

with InfluxDBClient(url=url, token=token, org=org) as client:
    write_api = client.write_api(write_options=SYNCHRONOUS)

    # Create a point
    point = Point("cpu") \
        .tag("host", "server01") \
        .tag("region", "us-west") \
        .field("usage_user", 95.5) \
        .field("usage_system", 2.3) \
        .time(1678886400, WritePrecision.S) # Unix timestamp in seconds

    # Write the point
    write_api.write(bucket=bucket, record=point)
    print("Point written successfully.")

This Point object is the fundamental unit. You can chain .tag() and .field() calls. The .time() method is crucial; if you omit it, InfluxDB assigns the server’s current time. WritePrecision tells InfluxDB how precise your timestamp is (seconds, milliseconds, etc.). SYNCHRONOUS means the write call won’t return until the data is acknowledged by InfluxDB.
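It can help to see what a point looks like on the wire. The client serializes each Point to InfluxDB line protocol before sending it. The sketch below is a simplified, hand-written serializer using only the standard library; to_line_protocol and format_field_value are illustrative names, not part of influxdb-client, and the escaping covers only the common cases.

```python
def _escape(text, chars):
    """Backslash-escape every character from `chars` that occurs in `text`."""
    for ch in chars:
        text = text.replace(ch, "\\" + ch)
    return text


def format_field_value(value):
    """Render one field value the way line protocol expects it."""
    # bool must be checked before int, because bool is a subclass of int
    if isinstance(value, bool):
        return "true" if value else "false"
    if isinstance(value, int):
        return f"{value}i"  # integers carry an 'i' suffix
    if isinstance(value, float):
        return repr(value)
    if isinstance(value, str):
        escaped = value.replace("\\", "\\\\").replace('"', '\\"')
        return f'"{escaped}"'
    raise TypeError(f"unsupported field type: {type(value).__name__}")


def to_line_protocol(measurement, tags, fields, timestamp=None):
    """Build one line: measurement,tag_set field_set [timestamp]."""
    if not fields:
        raise ValueError("a point needs at least one field")
    head = _escape(measurement, ", ")
    for key in sorted(tags):  # tags sorted by key for a stable result
        head += f",{_escape(key, ',= ')}={_escape(tags[key], ',= ')}"
    field_set = ",".join(
        f"{_escape(key, ',= ')}={format_field_value(value)}"
        for key, value in sorted(fields.items())
    )
    line = f"{head} {field_set}"
    if timestamp is not None:
        line += f" {int(timestamp)}"
    return line


line = to_line_protocol(
    "cpu",
    {"host": "server01", "region": "us-west"},
    {"usage_user": 95.5, "usage_system": 2.3},
    1678886400,  # same Unix timestamp (seconds) as the point above
)
print(line)
# cpu,host=server01,region=us-west usage_system=2.3,usage_user=95.5 1678886400
```

The timestamp on the line is a bare integer, which is why the write has to state a precision: the number alone does not say whether it counts seconds or nanoseconds.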

You can also write multiple points at once, which is more efficient because many points travel in a single request.

    # Create multiple points
    points = [
        Point("memory")
            .tag("host", "server01")
            .field("used_percent", 75.2),
        Point("disk")
            .tag("host", "server01")
            .tag("path", "/data")
            .field("free_bytes", 1024 * 1024 * 1024 * 500), # 500 GiB
    ]

    # Write multiple points
    write_api.write(bucket=bucket, record=points)
    print("Multiple points written successfully.")
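When there are more points than you want in one request, a common pattern is to split them into fixed-size chunks and write each chunk as a list. The helper below is a minimal standard-library sketch; chunked is a hypothetical name and the chunk size of 5 is only for illustration.

```python
from itertools import islice


def chunked(items, size):
    """Yield successive lists of at most `size` items from any iterable."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(items)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk


# Stand-in data; in practice these would be Point objects.
readings = [f"reading-{i}" for i in range(12)]
sizes = [len(chunk) for chunk in chunked(readings, 5)]
print(sizes)  # [5, 5, 2]
```

Inside the with block above, each chunk could then be passed to write_api.write(bucket=bucket, record=chunk).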

Now, let’s query that data. The query language for InfluxDB v2.x is Flux.

from influxdb_client import InfluxDBClient

# ... (connection details as above) ...

with InfluxDBClient(url=url, token=token, org=org) as client:
    query_api = client.query_api()

    # Flux query to get the last 5 CPU usage points for server01
    query = f'''
    from(bucket: "{bucket}")
      |> range(start: -10m)
      |> filter(fn: (r) => r["_measurement"] == "cpu")
      |> filter(fn: (r) => r["host"] == "server01")
      |> filter(fn: (r) => r["_field"] == "usage_user")
      |> sort(columns: ["_time"], desc: true)
      |> limit(n: 5)
    '''

    tables = query_api.query(query, org=org)

    # Process the results
    for table in tables:
        for record in table.records:
            print(f"Time: {record.get_time()}, Host: {record.values.get('host')}, Usage: {record.get_value()}")

This Flux query does several things:

  1. from(bucket: "{bucket}"): Specifies which bucket to query.
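  2. range(start: -10m): Defines the time window for the query (the last 10 minutes). For absolute times, pass an unquoted RFC 3339 time literal such as start: 2023-03-15T00:00:00Z; a quoted string is not a valid time. Note that the sample cpu point written earlier carries a fixed March 2023 timestamp, so a relative -10m window will not include it.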
  3. filter(): Narrows down the results by measurement, host, and field.
  4. sort(): Orders the results by time in descending order.
  5. limit(): Keeps the first 5 records of each result table, which after the descending sort are the 5 most recent.
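The stages listed above can also be assembled programmatically, which makes the pipeline structure explicit. The following is a sketch under stated assumptions: build_flux_query is a hypothetical helper, not a client API, and it rejects any value outside a conservative character set instead of attempting to escape Flux string literals.

```python
import re

# Conservative whitelist for values placed inside Flux string literals.
_SAFE_VALUE = re.compile(r"[A-Za-z0-9_.\-/]+")
# Relative durations such as -10m, -24h, -7d.
_DURATION = re.compile(r"-\d+(ns|us|ms|s|m|h|d|w|mo|y)")


def _checked(value):
    """Return `value` unchanged, or raise if it contains unexpected characters."""
    if not _SAFE_VALUE.fullmatch(value):
        raise ValueError(f"unsupported characters in {value!r}")
    return value


def build_flux_query(bucket, measurement, field, host, start="-10m", limit=5):
    """Assemble the query shown above, one pipeline stage per list entry."""
    if not _DURATION.fullmatch(start):
        raise ValueError(f"expected a relative duration like -10m, got {start!r}")
    stages = [
        f'from(bucket: "{_checked(bucket)}")',
        f"range(start: {start})",
        f'filter(fn: (r) => r["_measurement"] == "{_checked(measurement)}")',
        f'filter(fn: (r) => r["host"] == "{_checked(host)}")',
        f'filter(fn: (r) => r["_field"] == "{_checked(field)}")',
        'sort(columns: ["_time"], desc: true)',
        f"limit(n: {int(limit)})",
    ]
    # Each stage feeds the next through the pipe-forward operator.
    return "\n  |> ".join(stages)


query = build_flux_query("your_bucket", "cpu", "usage_user", "server01")
print(query)
```

The resulting string can be passed to query_api.query() in place of the hand-written f-string.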

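The query_api.query() method returns a list of table objects (FluxTable), each containing records (FluxRecord). Each record exposes the field's value through .get_value() and its timestamp through .get_time(); other columns, such as tags, are available by name through .values.get('host').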

A powerful aspect of InfluxDB is its ability to aggregate data on the fly. Instead of storing every single raw data point for long periods, you can use Flux to downsample and aggregate.

    # Flux query to get average CPU usage per hour for the last day
    query_agg = f'''
    from(bucket: "{bucket}")
      |> range(start: -24h)
      |> filter(fn: (r) => r["_measurement"] == "cpu")
      |> filter(fn: (r) => r["host"] == "server01")
      |> filter(fn: (r) => r["_field"] == "usage_user")
      |> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
    '''

    tables_agg = query_api.query(query_agg, org=org)

    for table in tables_agg:
        for record in table.records:
            print(f"Average CPU Usage (1h window): {record.get_value()} at {record.get_time()}")

The aggregateWindow() function is key here. It groups data into time windows (every: 1h) and applies an aggregation function (fn: mean for average). createEmpty: false means it won’t output windows with no data. On its own this query only computes the averages; it is the building block for downsampling, where you keep high-resolution data for a short time and have a scheduled task write the aggregated, lower-resolution results to a longer-lived bucket, optimizing storage.
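To make the windowing concrete, here is a pure-Python sketch of roughly what aggregateWindow(every: 1h, fn: mean, createEmpty: false) computes. It is an approximation, not InfluxDB's implementation: aggregate_window_mean is a hypothetical name, timestamps are plain Unix seconds, windows are aligned to the epoch, and each result is stamped with its window's end, which matches the function's default behavior. Truncation of the first and last windows by range() is ignored.

```python
from collections import defaultdict


def aggregate_window_mean(points, every_seconds):
    """Average (unix_seconds, value) pairs per fixed window.

    Windows are start-inclusive and stop-exclusive. Windows without
    data are simply absent, as with createEmpty: false.
    """
    buckets = defaultdict(list)
    for ts, value in points:
        window_start = ts - ts % every_seconds
        buckets[window_start].append(value)
    return [
        (start + every_seconds, sum(values) / len(values))
        for start, values in sorted(buckets.items())
    ]


samples = [
    (1678886400, 90.0),  # 2023-03-15T13:20:00Z
    (1678887000, 80.0),  # 13:30:00Z, same window as the first sample
    (1678888800, 50.0),  # 14:00:00Z, first instant of the next window
    (1678896000, 70.0),  # 16:00:00Z, after an empty 15:00 window
]
hourly = aggregate_window_mean(samples, 3600)
print(hourly)
# [(1678888800, 85.0), (1678892400, 50.0), (1678899600, 70.0)]
```

Four raw samples collapse into three hourly values, and the empty 15:00 to 16:00 window produces no row at all.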

The true magic of Flux is its streaming nature. A query is a pipeline: each |> stage receives a stream of tables from the previous stage, transforms it, and passes the result on. Stages such as filter() can work on rows as they arrive, so intermediate results do not always have to be fully materialized in memory before the next step, although stages such as sort() must see all of their input first. It’s like a pipeline where each stage operates on data as it flows through.
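The pipeline idea can be illustrated with plain Python generators. This is an analogy only and says nothing about how the Flux engine is implemented; source, keep_host, and scale are hypothetical stand-ins for a data source and two transformations.

```python
from itertools import islice

pulled = []  # records which readings the source actually produced


def source(n):
    """Lazily produce n readings, alternating between two hosts."""
    for i in range(n):
        pulled.append(i)
        host = "server01" if i % 2 == 0 else "server02"
        yield {"host": host, "usage_user": float(i)}


def keep_host(rows, host):
    """Pass through only the rows for one host (like a filter stage)."""
    return (row for row in rows if row["host"] == host)


def scale(rows, factor):
    """Multiply the field value of each row (like a map stage)."""
    for row in rows:
        yield {**row, "usage_user": row["usage_user"] * factor}


# Chain the stages, then ask for only the first three results.
pipeline = islice(scale(keep_host(source(1_000_000), "server01"), 2.0), 3)
result = list(pipeline)

print(result)
print(len(pulled))  # 5
```

Although the source could produce a million readings, only five are ever generated, because each stage pulls from the previous one on demand and the final stage stops after three results.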

The next concept you’ll grapple with is data retention: in v2.x each bucket has a retention period that determines how long InfluxDB keeps data before automatically deleting it. After that come tasks, which let you set up continuous data aggregation and alerting.
