InfluxDB 3.0 is a complete rewrite that shifts from a single-node time-series database to a distributed, cloud-native architecture, meaning you can’t just "upgrade" InfluxDB 2.x in place.
Let’s walk through migrating your InfluxDB 2.x data to InfluxDB 3.0. We’ll use the influxdb-client for Python to export data from 2.x and then use the InfluxDB 3.0 SDKs or APIs to import it.
First, ensure you have InfluxDB 3.0 running. This could be InfluxDB Cloud or a self-hosted InfluxDB 3.0 cluster. For this example, let’s assume you’re targeting a self-hosted InfluxDB 3.0 instance.
Exporting Data from InfluxDB 2.x
You’ll need to query your InfluxDB 2.x instance and write the results to a file. A common and efficient format for this is CSV.
from influxdb_client import InfluxDBClient
import csv
# InfluxDB 2.x connection details
influxdb_2x_url = "http://localhost:8086" # Replace with your InfluxDB 2.x URL
influxdb_2x_token = "YOUR_INFLUXDB_2X_TOKEN" # Replace with your InfluxDB 2.x token
influxdb_2x_org = "your_org" # Replace with your InfluxDB 2.x organization
influxdb_2x_bucket = "your_bucket" # Replace with your InfluxDB 2.x bucket
# Output CSV file
output_csv_file = "influxdb_export.csv"
# Initialize InfluxDB 2.x client
client_2x = InfluxDBClient(url=influxdb_2x_url, token=influxdb_2x_token, org=influxdb_2x_org)
query_api_2x = client_2x.query_api()
# Construct your Flux query. This query selects all data from the specified bucket.
# You might need to adjust the time range or add filters for your specific needs.
flux_query = f"""
from(bucket: "{influxdb_2x_bucket}")
|> range(start: 0)
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
"""
print(f"Exporting data from InfluxDB 2.x bucket '{influxdb_2x_bucket}' to {output_csv_file}...")
# Execute the query and write to CSV
try:
    tables = query_api_2x.query(flux_query)
    # Flatten all result tables into a single list of records
    records = [record for table in tables for record in table.records]
    if not records:
        print("No data found in the specified bucket.")
    else:
        # Columns handled separately or added by the query engine
        skip = {"result", "table", "_start", "_stop", "_time", "_measurement"}
        # After pivot(), tags and fields both appear as ordinary columns.
        # Collect them across all records because tables can differ in schema.
        data_columns = sorted({key for record in records for key in record.values} - skip)
        header = ["_time", "_measurement"] + data_columns
        with open(output_csv_file, 'w', newline='') as csvfile:
            csv_writer = csv.writer(csvfile)
            csv_writer.writerow(header)
            for record in records:
                row_data = [record.get_time().isoformat(), record.get_measurement()]
                # Columns missing from this record are written as empty cells
                row_data.extend(record.values.get(col_name) for col_name in data_columns)
                csv_writer.writerow(row_data)
        print("Data export complete.")
except Exception as e:
    print(f"An error occurred during export: {e}")
finally:
    client_2x.close()
This script queries your InfluxDB 2.x instance, pivots the data to have fields as columns, and writes it to a CSV file. The pivot function is crucial here: a Flux result has one row per field value, and pivot turns that into one row per timestamp with a column for each field, which is the tabular structure CSV needs. Tag keys are written as columns as well, so they are preserved.
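To make the reshaping concrete, here is a minimal pure-Python sketch of what a pivot on _field does to a handful of rows. The function name pivot_fields and the sample values are hypothetical and only illustrate the idea; the real work is done by Flux on the server.

```python
from collections import defaultdict

def pivot_fields(rows):
    """Turn one-row-per-field records into one row per (time, tag set)."""
    wide = defaultdict(dict)
    for row in rows:
        # Everything except _field/_value identifies the output row
        key = tuple(sorted((k, v) for k, v in row.items() if k not in ("_field", "_value")))
        wide[key][row["_field"]] = row["_value"]
    # Merge the identifying columns with the collected field columns
    return [{**dict(key), **fields} for key, fields in wide.items()]

# Hypothetical sample data: two fields at the first timestamp, one at the second
narrow = [
    {"_time": "2024-01-01T00:00:00Z", "host": "a", "_field": "cpu", "_value": 0.5},
    {"_time": "2024-01-01T00:00:00Z", "host": "a", "_field": "mem", "_value": 2048},
    {"_time": "2024-01-01T00:01:00Z", "host": "a", "_field": "cpu", "_value": 0.7},
]
wide_rows = pivot_fields(narrow)
for r in wide_rows:
    print(r)
```

Rows that lack a field at a given timestamp simply have no such key, which is why the CSV export has to tolerate missing columns.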
Importing Data into InfluxDB 3.0
InfluxDB 3.0 accepts writes as line protocol over HTTP and serves queries over Apache Arrow Flight. The InfluxDB 3.0 SDKs (available for Python, Java, JavaScript, etc.) wrap both, so you can use one of them to load this data.
Here’s an example using the InfluxDB 3.0 Python SDK to import the CSV data. You’ll need to install the influxdb3-python package, plus pandas for reading the CSV: pip install influxdb3-python pandas.
from influxdb_client_3 import InfluxDBClient3, InfluxDBError
import pandas as pd
# InfluxDB 3.0 connection details
# If using InfluxDB Cloud, use the appropriate service URL and API key.
# Example for InfluxDB Cloud:
# influxdb_3x_url = "YOUR_INFLUXDB_CLOUD_HOST"
# influxdb_3x_token = "YOUR_INFLUXDB_CLOUD_API_KEY"
# influxdb_3x_org = "YOUR_INFLUXDB_CLOUD_ORG_NAME" # This is typically your username
# Example for self-hosted InfluxDB 3.0:
influxdb_3x_url = "http://localhost:8086" # Replace with your InfluxDB 3.0 URL
influxdb_3x_token = "YOUR_INFLUXDB_3X_TOKEN" # Replace with your InfluxDB 3.0 token
influxdb_3x_org = "your_org" # Replace with your InfluxDB 3.0 organization name
# Input CSV file from export
input_csv_file = "influxdb_export.csv"
# InfluxDB 3.0 database/table name
# In InfluxDB 3.0, data is organized into databases and tables, not buckets.
# You'll need to create a database and table if they don't exist.
influxdb_3x_database = "my_database" # Replace with your desired database name
influxdb_3x_table = "my_table" # Replace with your desired table name
# Initialize InfluxDB 3.0 client
client_3x = InfluxDBClient3(host=influxdb_3x_url, token=influxdb_3x_token, org=influxdb_3x_org)
print(f"Importing data from {input_csv_file} to InfluxDB 3.0 database '{influxdb_3x_database}', table '{influxdb_3x_table}'...")
# Read the CSV file into a Pandas DataFrame
try:
    # Read the CSV file into a Pandas DataFrame
    df = pd.read_csv(input_csv_file)
    # Ensure the '_time' column is in datetime format
    df['_time'] = pd.to_datetime(df['_time'])
    # 'time' is the timestamp column name in InfluxDB 3.0, so rename '_time'.
    # A '_measurement' column, if the export has one, is kept as 'measurement'.
    # To keep measurements in separate tables, split the DataFrame by that
    # column and write each part to its own table instead.
    df.rename(columns={'_time': 'time', '_measurement': 'measurement'}, inplace=True)
    # The SDK has no create_database or create_table method on the client.
    # Manage these via UI, CLI, or API; this script assumes they exist or are
    # created implicitly by the write operation if permissions allow.
    # write() serializes the DataFrame and sends it to the given database.
    # data_frame_measurement_name is the target table.
    # Columns not listed in data_frame_tag_columns are written as fields,
    # so list your 2.x tag keys there if you want to keep them as tags.
    client_3x.write(
        record=df,
        database=influxdb_3x_database,
        data_frame_measurement_name=influxdb_3x_table,
        data_frame_timestamp_column='time',
        # data_frame_tag_columns=['host', 'region'],  # Hypothetical tag keys
    )
    print("Data import complete.")
except FileNotFoundError:
    print(f"Error: The file {input_csv_file} was not found.")
except InfluxDBError as e:
    print(f"InfluxDB 3.0 error occurred: {e}")
except Exception as e:
    print(f"An unexpected error occurred: {e}")
finally:
    client_3x.close()
This Python script reads the exported CSV, converts the _time column to datetime objects, renames it to time (as time is the timestamp column name in InfluxDB 3.0), and then uses client_3x.write() with the DataFrame options to ingest the data into your specified InfluxDB 3.0 database and table.
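If the export contains several measurements and you want each one in its own table, the rows can be grouped before writing. The sketch below uses only the standard library and assumes the CSV has a _measurement column, which depends on how the export was written; the function name group_rows_by_measurement and the sample data are hypothetical.

```python
import csv
import io
from collections import defaultdict

def group_rows_by_measurement(csv_text):
    """Group CSV rows by their _measurement value, dropping empty cells."""
    reader = csv.DictReader(io.StringIO(csv_text))
    groups = defaultdict(list)
    for row in reader:
        measurement = row.pop("_measurement")
        # Empty cells are columns that belong to other measurements
        groups[measurement].append({k: v for k, v in row.items() if v != ""})
    return dict(groups)

# Hypothetical export with two measurements sharing one file
sample = """_time,_measurement,host,usage,free
2024-01-01T00:00:00Z,cpu,a,0.5,
2024-01-01T00:00:00Z,mem,a,,2048
2024-01-01T00:01:00Z,cpu,a,0.7,
"""
grouped = group_rows_by_measurement(sample)
for name, rows in grouped.items():
    print(name, len(rows))
```

Each group could then be written to a table named after its measurement. Note that csv.DictReader returns every value as a string, so numeric columns would still need converting before the write.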
Key Considerations for Migration:
- Schema Evolution: InfluxDB 2.x uses a schema-less model where fields can be added dynamically. InfluxDB 3.0, built on Apache Arrow, has a more structured schema. While it supports schema evolution, it’s good practice to have a consistent schema for your tables. The pivot in the export step helps consolidate fields.
- Data Model: InfluxDB 2.x uses bucket, measurement, tag set, and field set. InfluxDB 3.0 uses database, table, and columns. In most cases a bucket maps to a database and a measurement maps to a table, with tags and fields becoming columns. A common pattern is to use a database per application or environment and tables for different data types or logical groupings.
- Query Language: InfluxDB 2.x uses Flux. InfluxDB 3.0 primarily uses SQL (InfluxQL is also supported), with results served over Arrow Flight. You’ll need to rewrite your Flux queries into SQL.
- Data Volume: For very large datasets, consider breaking the export/import into smaller chunks to manage memory and avoid timeouts. You can adjust the range() in your Flux query to export data by time chunks.
- InfluxDB 3.0 Setup: Ensure your InfluxDB 3.0 environment (Cloud or self-hosted) is properly configured with the necessary databases and tables, or that your ingestion process has permissions to create them.
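The time-chunk idea from the Data Volume point can be sketched as follows. This is a minimal illustration, not a tuned implementation: the helper names time_windows and flux_for_window, the bucket name, and the one-day window size are all assumptions. It relies on range() treating start as inclusive and stop as exclusive, so adjacent windows do not overlap.

```python
from datetime import datetime, timedelta, timezone

def time_windows(start, stop, step):
    """Yield (window_start, window_stop) pairs covering [start, stop)."""
    current = start
    while current < stop:
        window_stop = min(current + step, stop)
        yield current, window_stop
        current = window_stop

def flux_for_window(bucket, window_start, window_stop):
    """Build the export query for one window (expects UTC datetimes)."""
    fmt = "%Y-%m-%dT%H:%M:%SZ"
    return (
        f'from(bucket: "{bucket}")\n'
        f'  |> range(start: {window_start.strftime(fmt)}, stop: {window_stop.strftime(fmt)})\n'
        '  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")'
    )

# Hypothetical 2.5-day range split into one-day chunks
export_start = datetime(2024, 1, 1, tzinfo=timezone.utc)
export_stop = datetime(2024, 1, 3, 12, tzinfo=timezone.utc)
windows = list(time_windows(export_start, export_stop, timedelta(days=1)))
queries = [flux_for_window("your_bucket", w_start, w_stop) for w_start, w_stop in windows]
print(queries[0])
```

Each query can be run and written to its own CSV file, which also makes it easy to resume after a failure without re-exporting earlier windows.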
This step-by-step process provides a solid foundation for migrating your InfluxDB 2.x data to the new InfluxDB 3.0 architecture. Remember to thoroughly test your migration with a subset of data before performing a full migration.
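One simple way to check a test migration is to summarize the exported CSV and compare the numbers with a count query on the target table. The sketch below covers only the CSV side and is an assumption about how you might do it: the function name summarize_export is hypothetical, and the first/last comparison assumes every _time value uses the same ISO 8601 format so that string order matches time order.

```python
import csv
import io

def summarize_export(csv_file):
    """Return the row count and the earliest and latest _time in an export."""
    reader = csv.DictReader(csv_file)
    row_count = 0
    first = last = None
    for row in reader:
        row_count += 1
        ts = row["_time"]
        if first is None or ts < first:
            first = ts
        if last is None or ts > last:
            last = ts
    return {"rows": row_count, "first": first, "last": last}

# Hypothetical export contents; in practice pass open("influxdb_export.csv", newline="")
sample_file = io.StringIO(
    "_time,host,usage\n"
    "2024-01-01T00:01:00+00:00,a,0.7\n"
    "2024-01-01T00:00:00+00:00,a,0.5\n"
    "2024-01-01T00:02:00+00:00,b,0.9\n"
)
summary = summarize_export(sample_file)
print(summary)
```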