Downsample data with client libraries

Query and downsample time series data stored in InfluxDB and write the downsampled data back to InfluxDB.

This guide uses Python and the InfluxDB 3 Python client library, but you can use your runtime of choice and any of the available InfluxDB 3 client libraries. This guide also assumes you have already set up your Python project and virtual environment.

Install dependencies

Use pip to install the following dependencies:

  • influxdb3-python (provides the influxdb_client_3 module)
  • pandas

pip install influxdb3-python pandas

Prepare InfluxDB buckets

The downsampling process involves two InfluxDB buckets. Each bucket has a retention period that specifies how long data persists in the database before it expires and is deleted. By using two buckets, you can store unmodified, high-resolution data in a bucket with a shorter retention period and then downsampled, low-resolution data in a bucket with a longer retention period.
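To get a rough feel for this trade-off, the following sketch counts how many points a single series holds under each retention policy. The 10-second sampling interval, the 30-day and 365-day retention periods, and the helper name points_retained are illustrative assumptions, not values that InfluxDB prescribes.

```python
from datetime import timedelta


def points_retained(sample_interval: timedelta, retention: timedelta) -> int:
    """Return how many points one series holds when it is sampled at a
    fixed interval and kept for the whole retention period."""
    return int(retention / sample_interval)


# Hypothetical raw bucket: one point every 10 seconds, kept for 30 days.
raw_points = points_retained(timedelta(seconds=10), timedelta(days=30))

# Hypothetical downsampled bucket: one point per hour, kept for 365 days.
downsampled_points = points_retained(timedelta(hours=1), timedelta(days=365))

print(raw_points)          # 259200
print(downsampled_points)  # 8760
```

Under these assumed numbers, the downsampled bucket covers roughly twelve times the time span with about 3% of the points.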

Ensure you have a bucket for each of the following:

  • One to query unmodified data from
  • The other to write downsampled data to

For information about creating buckets, see Create a bucket.

Create InfluxDB clients

Use the InfluxDBClient3 class in the influxdb_client_3 module to instantiate two InfluxDB clients:

  • One configured to connect to your InfluxDB bucket with unmodified data.
  • The other configured to connect to the InfluxDB bucket that you want to write downsampled data to.

Provide the following credentials for each client:

  • host: InfluxDB Cloud Serverless region URL (without the protocol)
  • org: InfluxDB organization name
  • token: InfluxDB API token with read and write permissions on the buckets you want to query and write to.
  • database: InfluxDB bucket name

from influxdb_client_3 import InfluxDBClient3
import pandas

# Instantiate an InfluxDBClient3 client configured for your unmodified bucket
influxdb_raw = InfluxDBClient3(
    host='cloud2.influxdata.com',
    token='API_TOKEN',
    database='RAW_BUCKET_NAME'
)

# Instantiate an InfluxDBClient3 client configured for your downsampled database.
# When writing, the org= argument is required by the client (but ignored by InfluxDB).
influxdb_downsampled = InfluxDBClient3(
    host='cloud2.influxdata.com',
    token='API_TOKEN',
    database='DOWNSAMPLED_BUCKET_NAME',
    org=''
)
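Because the host value must not include the protocol, it can help to normalize the URL and keep the token out of source code. The following is a minimal sketch of one way to do that. The helper names host_without_protocol and client_settings and the environment variable names INFLUX_HOST, INFLUX_TOKEN, and INFLUX_ORG are hypothetical and not part of the client library.

```python
import os
from urllib.parse import urlparse


def host_without_protocol(url: str) -> str:
    """Return only the host portion of a URL, dropping any scheme and path."""
    # urlparse only fills in netloc when a "//" separator is present,
    # so add one for bare hostnames.
    parsed = urlparse(url if "://" in url else f"//{url}")
    return parsed.netloc


def client_settings(database: str, env=os.environ) -> dict:
    """Build keyword arguments for a client from environment-style settings.

    The variable names used here are assumptions chosen for this example.
    """
    return {
        "host": host_without_protocol(env["INFLUX_HOST"]),
        "token": env["INFLUX_TOKEN"],
        "database": database,
        "org": env.get("INFLUX_ORG", ""),
    }


# A plain dict stands in for os.environ so the example runs anywhere.
example_env = {
    "INFLUX_HOST": "https://cloud2.influxdata.com",
    "INFLUX_TOKEN": "API_TOKEN",
}
settings = client_settings("RAW_BUCKET_NAME", env=example_env)
print(settings["host"])  # cloud2.influxdata.com
```

The resulting dictionary could then be unpacked into the constructor, for example InfluxDBClient3(**settings).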

Query InfluxDB

Define a query that performs time-based aggregations

The most common method used to downsample time series data is to perform aggregate or selector operations on intervals of time. For example, return the average value for each hour in the queried time range.
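To make the idea concrete, here is a small pure-Python sketch of an hourly average. It is only an illustration of the concept. In practice the query engine performs this aggregation for you. The function name hourly_means and the sample readings are made up for this example.

```python
from collections import defaultdict
from datetime import datetime, timezone
from statistics import mean


def hourly_means(points):
    """Average (timestamp, value) pairs within each one-hour interval.

    Returns a list of (interval_start, mean_value) tuples sorted by time.
    """
    buckets = defaultdict(list)
    for timestamp, value in points:
        # Truncate each timestamp to the start of its hour.
        interval_start = timestamp.replace(minute=0, second=0, microsecond=0)
        buckets[interval_start].append(value)
    return [(start, mean(values)) for start, values in sorted(buckets.items())]


# Made-up temperature readings for illustration.
readings = [
    (datetime(2024, 1, 1, 8, 0, tzinfo=timezone.utc), 21.0),
    (datetime(2024, 1, 1, 8, 30, tzinfo=timezone.utc), 23.0),
    (datetime(2024, 1, 1, 9, 15, tzinfo=timezone.utc), 22.5),
]

for start, value in hourly_means(readings):
    print(start.isoformat(), value)
```

The two readings in the 08:00 hour collapse into a single averaged point, and the 09:00 hour keeps its single value.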

Use either SQL or InfluxQL to downsample data by applying aggregate or selector functions to time intervals.

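With SQL:

  1. In the SELECT clause, use the DATE_BIN function to assign each row's time to an interval, and apply an aggregate or selector function to each queried field.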

  2. Include a GROUP BY clause that groups by intervals returned from the DATE_BIN function in your SELECT clause and any other queried tags. The example below uses GROUP BY 1 to group by the first column in the SELECT clause.

  3. Include an ORDER BY clause that sorts data by time.

For more information, see Aggregate data with SQL - Downsample data by applying interval-based aggregates.

SELECT
  DATE_BIN(INTERVAL '1 hour', time) AS time,
  room,
  AVG(temp) AS temp,
  AVG(hum) AS hum,
  AVG(co) AS co
FROM home
--In WHERE, time refers to <source_table>.time
WHERE time >= now() - INTERVAL '24 hours'
--1 refers to the DATE_BIN column
GROUP BY 1, room
ORDER BY time

With InfluxQL:

  1. In the SELECT clause, apply an aggregate or selector function to queried fields.

  2. Include a GROUP BY clause that groups by time() at a specified interval.

SELECT
  MEAN(temp) AS temp,
  MEAN(hum) AS hum,
  MEAN(co) AS co
FROM home
WHERE time >= now() - 24h
GROUP BY time(1h)

Execute the query

  1. Assign the query string to a variable.

  2. Use the query method of your instantiated client to query raw data from InfluxDB. Provide the following arguments.

    • query: Query string to execute
    • language: sql or influxql

  3. Use the to_pandas method to convert the returned Arrow table to a Pandas DataFrame.

# ...

query = '''
SELECT
  DATE_BIN(INTERVAL '1 hour', time) AS time,
  room,
  AVG(temp) AS temp,
  AVG(hum) AS hum,
  AVG(co) AS co
FROM home
--In WHERE, time refers to <source_table>.time
WHERE time >= now() - INTERVAL '24 hours'
--1 refers to the DATE_BIN column
GROUP BY 1, room
ORDER BY 1
'''

table = influxdb_raw.query(query=query, language="sql")
data_frame = table.to_pandas()

The equivalent InfluxQL query:

# ...

query = '''
SELECT
  MEAN(temp) AS temp,
  MEAN(hum) AS hum,
  MEAN(co) AS co
FROM home
WHERE time >= now() - 24h
GROUP BY time(1h)
'''

table = influxdb_raw.query(query=query, language="influxql")
data_frame = table.to_pandas()
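If you downsample several measurements or intervals, you might prefer to generate the SQL query string rather than hard-code it. The following sketch builds a query in the same shape as the SQL example above. The function build_downsample_sql and its parameters are hypothetical. It interpolates values directly into the string, so it assumes all inputs are trusted identifiers and literals.

```python
def build_downsample_sql(measurement, fields, tags, interval, lookback):
    """Build a DATE_BIN-based downsampling query as a string.

    All arguments are interpolated as-is, so pass only trusted values.
    """
    select_columns = [f"DATE_BIN(INTERVAL '{interval}', time) AS time"]
    select_columns += list(tags)
    select_columns += [f"AVG({field}) AS {field}" for field in fields]
    # Position 1 refers to the DATE_BIN column in the SELECT clause.
    group_by = ", ".join(["1"] + list(tags))
    lines = [
        "SELECT",
        "  " + ",\n  ".join(select_columns),
        f"FROM {measurement}",
        f"WHERE time >= now() - INTERVAL '{lookback}'",
        f"GROUP BY {group_by}",
        "ORDER BY 1",
    ]
    return "\n".join(lines)


query = build_downsample_sql(
    measurement="home",
    fields=["temp", "hum", "co"],
    tags=["room"],
    interval="1 hour",
    lookback="24 hours",
)
print(query)
```

The returned string can be passed as the query argument with language="sql".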

Write the downsampled data back to InfluxDB

  1. For InfluxQL query results, delete (drop) the iox::measurement column from the DataFrame before writing data back to InfluxDB. This prevents measurement name conflicts when you query your downsampled data later.

  2. Use the sort_values method to sort data in the Pandas DataFrame by time to ensure writing back to InfluxDB is as performant as possible.

  3. Use the write method of your instantiated downsampled client to write the query results back to your InfluxDB bucket for downsampled data. Include the following arguments:

    • record: Pandas DataFrame containing downsampled data
    • data_frame_measurement_name: Destination measurement name
    • data_frame_timestamp_column: Column containing timestamps for each point
    • data_frame_tag_columns: List of tag columns

    Columns not listed in the data_frame_tag_columns or data_frame_timestamp_column arguments are written to InfluxDB as fields.

# ...

data_frame = data_frame.sort_values(by="time")

influxdb_downsampled.write(
    record=data_frame,
    data_frame_measurement_name="home_ds",
    data_frame_timestamp_column="time",
    data_frame_tag_columns=['room']
)
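To see how these arguments divide DataFrame columns into tags, fields, and a timestamp, the following simplified sketch renders one row as a line protocol string. The client library performs its own serialization. This sketch is only an illustration and assumes numeric field values, a timestamp already expressed as an integer, and no characters that need escaping. The function name to_line_protocol and the sample row are made up for this example.

```python
def to_line_protocol(row, measurement, timestamp_column, tag_columns):
    """Render one row (a dict of column name to value) as line protocol.

    Simplified: no escaping, and every field is written as a bare number.
    """
    tags = {name: row[name] for name in tag_columns}
    # Any column that is neither a tag nor the timestamp becomes a field.
    fields = {
        name: value
        for name, value in row.items()
        if name not in tag_columns and name != timestamp_column
    }
    tag_part = "".join(f",{name}={value}" for name, value in sorted(tags.items()))
    field_part = ",".join(f"{name}={value}" for name, value in fields.items())
    return f"{measurement}{tag_part} {field_part} {row[timestamp_column]}"


# Made-up downsampled row; the timestamp is in nanoseconds since the epoch.
row = {
    "time": 1641024000000000000,
    "room": "Kitchen",
    "temp": 22.5,
    "hum": 36.0,
    "co": 0.5,
}

line = to_line_protocol(
    row,
    measurement="home_ds",
    timestamp_column="time",
    tag_columns=["room"],
)
print(line)  # home_ds,room=Kitchen temp=22.5,hum=36.0,co=0.5 1641024000000000000
```

Here room becomes a tag, time becomes the timestamp, and the remaining temp, hum, and co columns become fields.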

Full downsampling script

With SQL:

from influxdb_client_3 import InfluxDBClient3
import pandas

influxdb_raw = InfluxDBClient3(
    host='cloud2.influxdata.com',
    token='API_TOKEN',
    database='RAW_BUCKET_NAME'
)

# When writing, the org= argument is required by the client (but ignored by InfluxDB).
influxdb_downsampled = InfluxDBClient3(
    host='cloud2.influxdata.com',
    token='API_TOKEN',
    database='DOWNSAMPLED_BUCKET_NAME',
    org=''
)

query = '''
SELECT
  DATE_BIN(INTERVAL '1 hour', time) AS time,
  room,
  AVG(temp) AS temp,
  AVG(hum) AS hum,
  AVG(co) AS co
FROM home
--In WHERE, time refers to <source_table>.time
WHERE time >= now() - INTERVAL '24 hours'
--1 refers to the DATE_BIN column
GROUP BY 1, room
ORDER BY 1
'''

table = influxdb_raw.query(query=query, language="sql")
data_frame = table.to_pandas()
data_frame = data_frame.sort_values(by="time")

influxdb_downsampled.write(
    record=data_frame,
    data_frame_measurement_name="home_ds",
    data_frame_timestamp_column="time",
    data_frame_tag_columns=['room']
)

With InfluxQL:

from influxdb_client_3 import InfluxDBClient3
import pandas

influxdb_raw = InfluxDBClient3(
    host='cloud2.influxdata.com',
    org='ORG_NAME',
    token='API_TOKEN',
    database='RAW_BUCKET_NAME'
)

# When writing, the org= argument is required by the client (but ignored by InfluxDB).
influxdb_downsampled = InfluxDBClient3(
    host='cloud2.influxdata.com',
    token='API_TOKEN',
    database='DOWNSAMPLED_BUCKET_NAME',
    org=''
)

query = '''
SELECT
  MEAN(temp) AS temp,
  MEAN(hum) AS hum,
  MEAN(co) AS co
FROM home
WHERE time >= now() - 24h
GROUP BY time(1h)
'''

# Run the query first so that data_frame exists before it is modified.
table = influxdb_raw.query(query=query, language="influxql")
data_frame = table.to_pandas()

# To prevent naming conflicts when querying downsampled data,
# drop the iox::measurement column before writing the data
# with the new measurement.
data_frame = data_frame.drop(columns=['iox::measurement'])

data_frame = data_frame.sort_values(by="time")

influxdb_downsampled.write(
    record=data_frame,
    data_frame_measurement_name="home_ds",
    data_frame_timestamp_column="time",
    data_frame_tag_columns=['room']
)
