
Use client libraries to downsample data

Query and downsample time series data stored in InfluxDB and write the downsampled data back to InfluxDB.

This guide uses Python and the InfluxDB v3 Python client library, but you can use your runtime of choice and any of the available InfluxDB v3 client libraries. This guide also assumes you have already set up your Python project and virtual environment.

Install dependencies

Use pip to install the following dependencies:

  • influxdb_client_3
  • pandas
pip install influxdb3-python pandas

Prepare InfluxDB databases

The downsampling process involves two InfluxDB databases. Each database has a retention period that specifies how long data persists in the database before it expires and is deleted. By using two databases, you can store unmodified, high-resolution data in a database with a shorter retention period and store downsampled, low-resolution data in a database with a longer retention period.

Ensure you have a database for each of the following:

  • One to query unmodified data from
  • The other to write downsampled data to

For information about creating databases, see Create a database.
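For example, if you manage your InfluxDB Cloud Dedicated cluster with the influxctl CLI, you could create the two databases with different retention periods. The database names and retention periods below are placeholders; adjust them for your use case.

# Shorter retention for raw, high-resolution data
influxctl database create --retention-period 30d RAW_DATABASE_NAME

# Longer retention for downsampled, low-resolution data
influxctl database create --retention-period 365d DOWNSAMPLED_DATABASE_NAME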

Create InfluxDB clients

Use the InfluxDBClient3 constructor in the influxdb_client_3 module to instantiate two InfluxDB clients:

  • One configured to connect to your InfluxDB database with unmodified data.
  • The other configured to connect to the InfluxDB database that you want to write downsampled data to.

Provide the following credentials for each client:

  • host: InfluxDB Cloud Dedicated cluster URL (without the protocol)
  • token: InfluxDB database token with read and write permissions on the databases you want to query and write to.
  • database: InfluxDB database name
from influxdb_client_3 import InfluxDBClient3
import pandas

# Instantiate an InfluxDBClient3 client configured for your unmodified database
influxdb_raw = InfluxDBClient3(
    host='cluster-id.influxdb.io',
    token='DATABASE_TOKEN',
    database='RAW_DATABASE_NAME'
)

# Instantiate an InfluxDBClient3 client configured for your downsampled database.
# When writing, the org= argument is required by the client (but ignored by InfluxDB).
influxdb_downsampled = InfluxDBClient3(
    host='cluster-id.influxdb.io',
    token='DATABASE_TOKEN',
    database='DOWNSAMPLED_DATABASE_NAME',
    org=''
)

Query InfluxDB

Define a query that performs time-based aggregations

The most common method used to downsample time series data is to perform aggregate or selector operations on intervals of time. For example, return the average value for each hour in the queried time range.

Use either SQL or InfluxQL to downsample data by applying aggregate or selector functions to time intervals.

To downsample data with SQL:

  1. In the SELECT clause:

    • Use DATE_BIN to calculate time intervals and assign them to a new time column.

    • Apply an aggregate or selector function (for example, AVG) to each queried field.

  2. Include a GROUP BY clause that groups by intervals returned from the DATE_BIN function in your SELECT clause and any other queried tags. The example below uses GROUP BY 1 to group by the first column in the SELECT clause.

  3. Include an ORDER BY clause that sorts data by time.

For more information, see Aggregate data with SQL - Downsample data by applying interval-based aggregates.

SELECT
  DATE_BIN(INTERVAL '1 hour', time) AS time,
  room,
  AVG(temp) AS temp,
  AVG(hum) AS hum,
  AVG(co) AS co
FROM home
--In WHERE, time refers to <source_table>.time
WHERE time >= now() - INTERVAL '24 hours'
--1 refers to the DATE_BIN column
GROUP BY 1, room
ORDER BY time
To downsample data with InfluxQL:

  1. In the SELECT clause, apply an aggregate or selector function to queried fields.

  2. Include a GROUP BY clause that groups by time() at a specified interval.

SELECT
  MEAN(temp) AS temp,
  MEAN(hum) AS hum,
  MEAN(co) AS co
FROM home
WHERE time >= now() - 24h
GROUP BY time(1h)

Execute the query

  1. Assign the query string to a variable.

  2. Use the query method of your instantiated client to query raw data from InfluxDB. Provide the following arguments:

    • query: Query string to execute
    • language: sql or influxql
  3. Use the to_pandas method to convert the returned Arrow table to a Pandas DataFrame.

# ...

query = '''
SELECT
  DATE_BIN(INTERVAL '1 hour', time) AS time,
  room,
  AVG(temp) AS temp,
  AVG(hum) AS hum,
  AVG(co) AS co
FROM home
--In WHERE, time refers to <source_table>.time
WHERE time >= now() - INTERVAL '24 hours'
--1 refers to the DATE_BIN column
GROUP BY 1, room
ORDER BY 1
'''

table = influxdb_raw.query(query=query, language="sql")
data_frame = table.to_pandas()
# ...

query = '''
SELECT
  MEAN(temp) AS temp,
  MEAN(hum) AS hum,
  MEAN(co) AS co
FROM home
WHERE time >= now() - 24h
GROUP BY time(1h)
'''

table = influxdb_raw.query(query=query, language="influxql")
data_frame = table.to_pandas()

Write the downsampled data back to InfluxDB

  1. For InfluxQL query results, drop the iox::measurement column before writing the data back to InfluxDB. This avoids measurement name conflicts when you query your downsampled data later (see the snippet after the example below).

  2. Use the sort_values method to sort data in the Pandas DataFrame by time to ensure writing back to InfluxDB is as performant as possible.

  3. Use the write method of your instantiated downsampled client to write the query results back to your InfluxDB database for downsampled data. Include the following arguments:

    • record: Pandas DataFrame containing downsampled data
    • data_frame_measurement_name: Destination measurement name
    • data_frame_timestamp_column: Column containing timestamps for each point
    • data_frame_tag_columns: List of tag columns

    Columns not listed in the data_frame_tag_columns or data_frame_timestamp_column arguments are written to InfluxDB as fields.

# ...

data_frame = data_frame.sort_values(by="time")

influxdb_downsampled.write(
    record=data_frame,
    data_frame_measurement_name="home_ds",
    data_frame_timestamp_column="time",
    data_frame_tag_columns=['room']
)
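
If you queried with InfluxQL, also drop the iox::measurement column from the DataFrame before sorting and writing it back:

# ...

# For InfluxQL query results only: drop iox::measurement to prevent
# measurement name conflicts when querying the downsampled data later
data_frame = data_frame.drop(columns=['iox::measurement'])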

Full downsampling script

The following scripts show the full downsampling process using an SQL query and an InfluxQL query, respectively.

from influxdb_client_3 import InfluxDBClient3
import pandas

influxdb_raw = InfluxDBClient3(
    host='cluster-id.influxdb.io',
    token='DATABASE_TOKEN',
    database='RAW_DATABASE_NAME'
)

# When writing, the org= argument is required by the client (but ignored by InfluxDB).
influxdb_downsampled = InfluxDBClient3(
    host='cluster-id.influxdb.io',
    token='DATABASE_TOKEN',
    database='DOWNSAMPLED_DATABASE_NAME',
    org=''
)

query = '''
SELECT
  DATE_BIN(INTERVAL '1 hour', time) AS time,
  room,
  AVG(temp) AS temp,
  AVG(hum) AS hum,
  AVG(co) AS co
FROM home
--In WHERE, time refers to <source_table>.time
WHERE time >= now() - INTERVAL '24 hours'
--1 refers to the DATE_BIN column
GROUP BY 1, room
ORDER BY 1
'''

table = influxdb_raw.query(query=query, language="sql")
data_frame = table.to_pandas()
data_frame = data_frame.sort_values(by="time")

influxdb_downsampled.write(
    record=data_frame,
    data_frame_measurement_name="home_ds",
    data_frame_timestamp_column="time",
    data_frame_tag_columns=['room']
)
from influxdb_client_3 import InfluxDBClient3
import pandas

influxdb_raw = InfluxDBClient3(
    host='cluster-id.influxdb.io',
    token='DATABASE_TOKEN',
    database='RAW_DATABASE_NAME'
)

# When writing, the org= argument is required by the client (but ignored by InfluxDB).
influxdb_downsampled = InfluxDBClient3(
    host='cluster-id.influxdb.io',
    token='DATABASE_TOKEN',
    database='DOWNSAMPLED_DATABASE_NAME',
    org=''
)

query = '''
SELECT
  MEAN(temp) AS temp,
  MEAN(hum) AS hum,
  MEAN(co) AS co
FROM home
WHERE time >= now() - 24h
GROUP BY time(1h)
'''

table = influxdb_raw.query(query=query, language="influxql")
data_frame = table.to_pandas()

# To prevent naming conflicts when querying downsampled data,
# drop the iox::measurement column before writing the data
# with the new measurement.
data_frame = data_frame.drop(columns=['iox::measurement'])

data_frame = data_frame.sort_values(by="time")

influxdb_downsampled.write(
    record=data_frame,
    data_frame_measurement_name="home_ds",
    data_frame_timestamp_column="time",
    data_frame_tag_columns=['room']
)
