InfluxDB continuous queries and tasks are fundamentally about time-series data housekeeping, but most people think of them as just "summarizing data," which is only half the story.

Let’s watch this in action. Imagine we have a sensor spitting out temperature readings every second.

# InfluxDB CLI
> CREATE DATABASE sensor_data
> USE sensor_data
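# "autogen" already exists after CREATE DATABASE, so create a separate 1h policy for raw readings and make it the default
> CREATE RETENTION POLICY "one_hour" ON sensor_data DURATION 1h REPLICATION 1 DEFAULT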
> INSERT temperature,location=room1 value=22.5
> INSERT temperature,location=room1 value=22.6
# ... imagine this runs for an hour ...

Now we want to downsample this raw, per-second data to hourly averages. That saves space and speeds up queries that don't need per-second precision. A continuous query (CQ) does this automatically: InfluxDB runs it periodically on a schedule derived from its GROUP BY time() interval, not each time a point is ingested.

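-- InfluxQL Continuous Query
-- Fully qualify INTO so the rollups go to the "autogen" policy (infinite duration by default), not the 1h raw-data policy
CREATE CONTINUOUS QUERY cq_hourly_temperature ON sensor_data
RESAMPLE EVERY 1h FOR 1h
BEGIN
  SELECT mean(value) AS average_temperature
  INTO "sensor_data"."autogen"."temperature_hourly"
  FROM temperature
  GROUP BY time(1h), location
END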

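This CQ, cq_hourly_temperature, belongs to the sensor_data database. RESAMPLE EVERY 1h FOR 1h sets its schedule: EVERY 1h runs the query once an hour, and FOR 1h makes each run cover the most recent one-hour interval. Since GROUP BY time(1h) on its own already implies exactly this schedule, the clause is redundant here; it earns its keep when you widen FOR (for example, FOR 2h) so that each run also recomputes the previous interval and picks up late-arriving points. The SELECT statement reads the raw temperature measurement, computes the mean of the value field over each 1-hour interval (time(1h)) separately for each value of the location tag, and writes the results to a new measurement, temperature_hourly, with the mean stored in a field named average_temperature.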
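To make the GROUP BY semantics concrete, here is a minimal Python sketch of what the SELECT computes, assuming readings arrive as (Unix timestamp, location, value) tuples. It models the arithmetic only; hourly_means and the tuple layout are hypothetical and not part of any InfluxDB API.

```python
from collections import defaultdict

HOUR = 3600  # GROUP BY time(1h) interval, in seconds


def hourly_means(points):
    """Approximate SELECT mean(value) ... GROUP BY time(1h), location.

    points: iterable of (unix_seconds, location, value) tuples (hypothetical layout).
    Returns {(location, interval_start): mean_value}. Intervals are aligned to
    whole hours since the Unix epoch, and (as in InfluxQL) each result is
    keyed by the start of its interval.
    """
    sums = defaultdict(float)
    counts = defaultdict(int)
    for ts, location, value in points:
        interval_start = ts - ts % HOUR  # floor to the hour boundary
        sums[(location, interval_start)] += value
        counts[(location, interval_start)] += 1
    return {key: sums[key] / counts[key] for key in sums}


# Usage: two room1 readings and one room2 reading in the same hour,
# plus one room1 reading in the following hour.
base = 1_699_999_200  # an hour boundary (Unix seconds)
readings = [
    (base, "room1", 22.5),
    (base + 1, "room1", 22.6),
    (base + 2, "room2", 19.0),
    (base + HOUR, "room1", 23.0),
]
for (location, start), avg in sorted(hourly_means(readings).items()):
    print(location, start, round(avg, 2))
```

Each (location, hour) pair gets its own mean, which is exactly why the output keeps location as a tag.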

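Once this CQ is created, InfluxDB runs it automatically at the top of every hour, each time aggregating the hour that just ended. After the first full hour you'll find your downsampled data in the temperature_hourly measurement. Note that a CQ only processes new intervals as they close; it does not backfill older data. To backfill, run the SELECT ... INTO statement manually with a WHERE time clause.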
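The timing can be sketched the same way. The hypothetical helper runs_covering below assumes EVERY matches the 1h GROUP BY interval, that runs fire on epoch-aligned hour boundaries, and that each run covers the FOR window ending at the run time; it reports which runs will include a given point.

```python
HOUR = 3600


def runs_covering(ts, every=HOUR, for_=HOUR):
    """Return the CQ run times (Unix seconds) whose time range includes ts.

    Assumptions (a simplified model, not InfluxDB code): runs fire on
    epoch-aligned multiples of `every`, each run covers [run - for_, run)
    as RESAMPLE ... FOR specifies, and for_ >= every.
    """
    runs = []
    run = ts - ts % every + every  # first boundary strictly after ts
    while run - for_ <= ts:  # this run's window still reaches back to ts
        runs.append(run)
        run += every
    return runs


point = 1_699_999_200 + 15 * 60  # a reading taken at quarter past an hour
print(runs_covering(point))                 # FOR 1h: one run, at the next hour boundary
print(runs_covering(point, for_=2 * HOUR))  # FOR 2h: that run plus the one after it
```

With FOR 1h a point is aggregated exactly once; with FOR 2h it is aggregated again an hour later, which is how RESAMPLE lets a CQ absorb late writes.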

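InfluxDB 2.x, however, drops CQs in favor of tasks. Tasks offer more flexibility: you set the schedule explicitly (every or cron, plus an optional offset that delays each run so late data can arrive), and the logic is written in Flux, InfluxDB's data scripting language, so a task can do anything a Flux query can, such as writing its output to another bucket.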

Here’s the equivalent using a Task:

// Flux Task script
option task = {name: "my_hourly_downsample_task", every: 1h}

// Query the raw data for the interval this run is responsible for.
// In a task, now() is the run's scheduled time, so -task.every means
// "the hour that just ended" for an hourly task.
from(bucket: "sensor_data")
    |> range(start: -task.every)
    |> filter(fn: (r) => r._measurement == "temperature" and r._field == "value")
    // Calculate the hourly average
    |> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
    |> set(key: "_measurement", value: "temperature_hourly") // Rename measurement
    |> yield(name: "hourly_average")

// Register the task (this would be done via the InfluxDB UI or API)
// Example using influx CLI:
// influx task create -f task_script.flux

In this Flux script:

  • option task = {name: ..., every: 1h} defines the task and when it runs. every: 1h runs it once an hour; you could instead write cron: "0 * * * *" for cron-style scheduling, but every and cron are mutually exclusive, so a task sets only one of them.
  • from(bucket: "sensor_data") specifies the data source.
  • range(start: -task.every) defines the window of data to process for this specific task execution. Inside a task, now() is set to the run's scheduled time, so each hourly run reads exactly the preceding hour. (The v.timeRangeStart and v.timeRangeStop variables belong to the dashboard and Data Explorer UI and are not available to tasks.)
  • filter(fn: (r) => r._measurement == "temperature" and r._field == "value") selects the relevant raw data.
  • aggregateWindow(every: 1h, fn: mean, createEmpty: false) is the core of the downsampling: it groups data into 1-hour windows and computes the mean. createEmpty: false prevents creating records for empty windows.
  • set(key: "_measurement", value: "temperature_hourly") renames the output measurement.
  • yield(name: "hourly_average") names the result stream; it does not store anything. A task's results are discarded unless the script writes them out with to().
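As a rough model of what one run of this task produces, the Python sketch below applies the same steps to (Unix timestamp, location, value) tuples that are assumed to be already filtered to the temperature measurement's value field. task_run is a hypothetical name; the sketch mirrors aggregateWindow's defaults (epoch-aligned windows clipped to the range, each output stamped with its window's _stop) but is not InfluxDB code.

```python
from collections import defaultdict

HOUR = 3600


def task_run(points, now, every=HOUR, window=HOUR):
    """Simplified model of one run of the hourly downsampling task.

    points: (unix_seconds, location, value) rows, assumed pre-filtered.
    now: the run's scheduled time; range(start: -task.every) reads [now - every, now).
    Returns sorted (location, _time, mean) rows. _time is the range-clipped stop
    of each window (aggregateWindow's default timeSrc: "_stop"). Windows with
    no data produce no row, mirroring createEmpty: false.
    """
    start, stop = now - every, now
    groups = defaultdict(list)
    for ts, location, value in points:
        if start <= ts < stop:  # range(): start inclusive, stop exclusive
            win_start = ts - ts % window  # epoch-aligned window start
            win_stop = min(win_start + window, stop)  # clipped to the range
            groups[(location, win_stop)].append(value)
    return sorted(
        (location, t, sum(vals) / len(vals)) for (location, t), vals in groups.items()
    )


base = 1_699_999_200  # an hour boundary (Unix seconds)
raw = [
    (base - 10, "room1", 99.0),    # before the range: ignored
    (base + 5, "room1", 22.5),
    (base + 6, "room1", 22.6),
    (base + 7, "room2", 19.0),
    (base + HOUR, "room1", 50.0),  # at the exclusive range stop: ignored
]
for row in task_run(raw, now=base + HOUR):
    print(row)
```

One difference from the CQ is worth noticing: InfluxQL stamps each hourly mean with the start of its interval, while aggregateWindow stamps it with the end, so the 10:00 to 11:00 average lands at 11:00.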

The real power here is that a task can write its results to a different bucket with to(). This enables a crucial data-tiering pattern: keep high-resolution data in a "hot" bucket with a short retention period, and keep the downsampled, aggregated data in a "cold" bucket for long-term storage.

// Example of writing to a different bucket
option task = {name: "my_hourly_downsample_task", every: 1h}

from(bucket: "sensor_data")
    |> range(start: -task.every)
    |> filter(fn: (r) => r._measurement == "temperature" and r._field == "value")
    |> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
    |> set(key: "_measurement", value: "temperature_hourly")
    // Write the results to the 'aggregated_data' bucket
    |> to(bucket: "aggregated_data", org: "my_org")

The one thing most people don't realize is how aggregateWindow lines up with the task schedule. aggregateWindow doesn't read the schedule at all: its windows are aligned to the Unix epoch (a 1h window always starts on the hour, UTC), and by default each output row is stamped with its window's stop time. Because an hourly task fires on the hour and range(start: -task.every) covers exactly one such clock-aligned hour, each run produces exactly one window per series, with no partial or overlapping windows to stitch together. That only holds when the schedule and the aggregation window are congruent: run the same script at 10:17 and range(start: -1h) spans 09:17 to 10:17, which aggregateWindow splits into two partial windows.
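The alignment argument can be checked with a small sketch. windows_for_range is a hypothetical helper that lists the epoch-aligned windows aggregateWindow(every: 1h) would form over a range, clipped to the range bounds the way Flux clips _start and _stop; it is a model, not Flux's implementation.

```python
HOUR = 3600


def windows_for_range(start, stop, every=HOUR):
    """Return (window_start, window_stop) pairs that epoch-aligned windows of
    length `every` produce for the range [start, stop), clipped to the range."""
    windows = []
    win_start = start - start % every  # aligned boundary at or before start
    while win_start < stop:
        windows.append((max(win_start, start), min(win_start + every, stop)))
        win_start += every
    return windows


on_the_hour = 1_699_999_200 + 12 * HOUR  # any hour boundary works
print(windows_for_range(on_the_hour - HOUR, on_the_hour))  # scheduled run: one full window

late = on_the_hour + 17 * 60  # an ad hoc run at :17
print(windows_for_range(late - HOUR, late))  # two partial windows
```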

The next concept to explore is how to combine multiple aggregations (e.g., average, min, max, count) within a single task run.
