Documentation

Python client library for InfluxDB v3

The InfluxDB v3 influxdb3-python Python client library integrates InfluxDB Cloud Dedicated write and query operations with Python scripts and applications.

InfluxDB client libraries provide configurable batch writing of data to InfluxDB Cloud Dedicated. Client libraries can be used to construct line protocol data, transform data from other formats to line protocol, and batch write line protocol data to InfluxDB HTTP APIs.

InfluxDB v3 client libraries can query InfluxDB Cloud Dedicated using SQL or InfluxQL. The influxdb3-python Python client library wraps the Apache Arrow pyarrow.flight client in a convenient InfluxDB v3 interface for executing SQL and InfluxQL queries, requesting server metadata, and retrieving data from InfluxDB Cloud Dedicated using the Flight protocol with gRPC.

Installation

Install the client library and dependencies using pip:

pip install influxdb3-python

Importing the module

The influxdb3-python client library package provides the influxdb_client_3 module.

Import the module:

import influxdb_client_3

Import specific class methods from the module:

from influxdb_client_3 import InfluxDBClient3, Point, WriteOptions

API reference

The influxdb_client_3 module includes the following classes and functions.

Classes

Class InfluxDBClient3

Provides an interface for interacting with InfluxDB APIs for writing and querying data.

Syntax

__init__(self, host=None, org=None, database=None, token=None,
        write_client_options=None, flight_client_options=None, **kwargs)

Initializes and returns an InfluxDBClient3 instance with the following:

  • A singleton write client configured for writing to the database.
  • A singleton Flight client configured for querying the database.

Parameters

  • org (str): The organization name (for InfluxDB Cloud Dedicated, set this to an empty string ("")).
  • database (str): The database to use for writing and querying.
  • write_client_options (dict): Options to use when writing to InfluxDB. If None, writes are synchronous.
  • flight_client_options (dict): Options to use when querying InfluxDB.

Non-batch writing

When writing data in non-batching mode, the client immediately tries to write the data, doesn’t retry failed requests, and doesn’t invoke response callbacks.

Batch writing

In batching mode, the client adds the record or records to a batch, and then schedules the batch for writing to InfluxDB. The client writes the batch to InfluxDB after reaching write_client_options.batch_size or write_client_options.flush_interval. If a write fails, the client reschedules the write according to the write_client_options retry options. When using batching mode, you can define success_callback, error_callback, and retry_callback functions.

To use batching mode, pass WriteOptions as key-value pairs to the client write_client_options parameter–for example:

  1. Instantiate WriteOptions() with defaults or with WriteOptions.write_type=WriteType.batching.

      # Create a WriteOptions instance for batch writes with batch size, flush, and retry defaults.
      write_options = WriteOptions()
    
  2. Pass write_options from the preceding step to the write_client_options function.

    wco = write_client_options(WriteOptions=write_options)
    

    The output is a dict with WriteOptions key-value pairs.

  3. Initialize the client, setting the write_client_options argument to wco from the preceding step.

    from influxdb_client_3 import Point, InfluxDBClient3
    
    points = [
        Point("home")
                .tag("room", "Kitchen")
                .field("temp", 25.3)
                .field('hum', 20.2)
                .field('co', 9),
        Point("home")
                .tag("room", "Living Room")
                .field("temp", 24.0)
                .field('hum', 20.0)
                .field('co', 5)]
    
    with InfluxDBClient3(token="
    DATABASE_TOKEN
    "
    ,
    host="cluster-id.influxdb.io", database="
    DATABASE_NAME
    "
    ,
    write_client_options=wco) as client: client.write(record=points)

Synchronous writing

Synchronous mode is the default mode for writing data (in batch and non-batch modes). To specify synchronous mode, set _write_client_options=None or _write_client_options.write_type=WriteType.synchronous.

Examples

Initialize a client

The following example initializes a client for writing and querying data in a InfluxDB Cloud Dedicated database. When writing or querying, the client waits synchronously for the response.

Given client.write_client_options doesn’t set WriteOptions, the client uses the default non-batch writing mode.

from influxdb_client_3 import InfluxDBClient3

client = InfluxDBClient3(token="
DATABASE_TOKEN
"
,
host="cluster-id.influxdb.io", database="
DATABASE_NAME
"
)

Replace the following:

  • DATABASE_NAME: the name of your InfluxDB Cloud Dedicated database
  • DATABASE_TOKEN: an InfluxDB Cloud Dedicated database token with read permissions on the specified database
Initialize a client for batch writing

The following example shows how to initialize a client for batch writing data to the database. When writing data, the client uses batch mode with default options and invokes the callback function, if specified, for the response status (success, error, or retryable error).

  from influxdb_client_3 import InfluxDBClient3,
                                write_client_options,
                                WriteOptions,
                                InfluxDBError

  # Define callbacks for write responses
  def success(self, data: str):
      print(f"Successfully wrote batch: data: {data}")

  def error(self, data: str, exception: InfluxDBError):
      print(f"Failed writing batch: config: {self}, data: {data},
      error: {exception}")

  def retry(self, data: str, exception: InfluxDBError):
      print(f"Failed retry writing batch: config: {self}, data: {data},
      error: {exception}")

  # Instantiate WriteOptions for batching
  write_options = WriteOptions()
  wco = write_client_options(success_callback=success,
                              error_callback=error,
                              retry_callback=retry,
                              WriteOptions=write_options)

  # Use the with...as statement to ensure the file is properly closed and resources
  # are released.
  with InfluxDBClient3(token="
DATABASE_TOKEN
"
, host="cluster-id.influxdb.io",
org="", database="
DATABASE_NAME
"
,
write_client_options=wco) as client: client.write_file(file='./home.csv', timestamp_column='time', tag_columns=["room"])

Replace the following:

  • DATABASE_NAME: the name of your InfluxDB Cloud Dedicated database
  • DATABASE_TOKEN: an InfluxDB Cloud Dedicated database token with read permissions on the specified database

InfluxDBClient3 instance methods

InfluxDBClient3.write

Writes a record or a list of records to InfluxDB.

Syntax

write(self, record=None, **kwargs)

Parameters

  • record: A record or list of records to write. A record can be a Point object, a dict that represents a point, a line protocol string, or a DataFrame.
  • database: The database to write to. Default is to write to the database specified for the client.
  • **kwargs: Additional write options–for example:
    • write_precision: Optional. Default is "ns". Specifies the precision ("ms", "s", "us", "ns") for timestamps in record.
    • write_client_options: Optional. Specifies callback functions and options for batch writing mode.

Examples

Write a line protocol string
from influxdb_client_3 import InfluxDBClient3

points = "home,room=Living\ Room temp=21.1,hum=35.9,co=0i 1641024000"

client = InfluxDBClient3(token="
DATABASE_TOKEN
"
, host="cluster-id.influxdb.io",
database="
DATABASE_NAME
"
)
client.write(record=points, write_precision="s")
Write data using points

The influxdb_client_3.Point class provides an interface for constructing a data point for a measurement and setting fields, tags, and the timestamp for the point. The following example shows how to create a Point object, and then write the data to InfluxDB.

from influxdb_client_3 import Point, InfluxDBClient3
point = Point("home").tag("room", "Kitchen").field("temp", 72)
...
client.write(point)
Write data using a dict

InfluxDBClient3 can serialize a dictionary object into line protocol. If you pass a dict to InfluxDBClient3.write, the client expects the dict to have the following point attributes:

  • measurement (str): the measurement name
  • tags (dict): a dictionary of tag key-value pairs
  • fields (dict): a dictionary of field key-value pairs
  • time: the timestamp for the record

The following example shows how to define a dict that represents a point, and then write the data to InfluxDB.

from influxdb_client_3 import InfluxDBClient3

# Using point dictionary structure
points = {
          "measurement": "home",
          "tags": {"room": "Kitchen", "sensor": "K001"},
          "fields": {"temp": 72.2, "hum": 36.9, "co": 4},
          "time": 1641067200
          }

client = InfluxDBClient3(token="
DATABASE_TOKEN
"
,
host="cluster-id.influxdb.io", database="
DATABASE_NAME
"
,
org="") client.write(record=points, write_precision="s")

InfluxDBClient3.write_file

Writes data from a file to InfluxDB. Execution is synchronous.

Syntax

write_file(self, file, measurement_name=None, tag_columns=[],
          timestamp_column='time', **kwargs)

Parameters

  • file (str): A path to a file containing records to write to InfluxDB. The filename must end with one of the following supported extensions. For more information about encoding and formatting data, see the documentation for each supported format:

  • measurement_name: Defines the measurement name for records in the file. The specified value takes precedence over measurement and iox::measurement columns in the file. If no value is specified for the parameter, and a measurement column exists in the file, the measurement column value is used for the measurement name. If no value is specified for the parameter, and no measurement column exists, the iox::measurement column value is used for the measurement name.

  • tag_columns: A list containing the names of tag columns. Columns not included in the list and not specified by another parameter are assumed to be fields.

  • timestamp_column: The name of the column that contains timestamps. Default is 'time'.

  • database: The database to write to. Default is to write to the database specified for the client.

  • **kwargs: Additional write options–for example:

    • write_precision: Optional. Default is "ns". Specifies the precision ("ms", "s", "us", "ns") for timestamps in record.
    • write_client_options: Optional. Specifies callback functions and options for batch writing mode.

Examples

Write data from a file

The following example shows how to configure write options for batching, retries, and callbacks, and how to write data from CSV and JSON files to InfluxDB:

from influxdb_client_3 import InfluxDBClient3, write_client_options,
                              WritePrecision, WriteOptions, InfluxDBError

class BatchingCallback(object):

  # Define callbacks for write responses
  def success(self, data: str):
      print(f"Successfully wrote batch: data: {data}")

  def error(self, data: str, exception: InfluxDBError):
      print(f"Failed writing batch: config: {self}, data: {data},
      error: {exception}")

  def retry(self, data: str, exception: InfluxDBError):
      print(f"Failed retry writing batch: config: {self}, data: {data},
      error: {exception}")

# Instantiate the callbacks
callback = BatchingCallback()

write_options = WriteOptions(batch_size=500,
                            flush_interval=10_000,
                            jitter_interval=2_000,
                            retry_interval=5_000,
                            max_retries=5,
                            max_retry_delay=30_000,
                            exponential_base=2)

wco = write_client_options(success_callback=callback.success,
                          error_callback=callback.error,
                          retry_callback=callback.retry,
                          WriteOptions=write_options)

with  InfluxDBClient3(token="
DATABASE_TOKEN
"
, host="cluster-id.influxdb.io",
database="
DATABASE_NAME
"
,
_write_client_options=wco) as client: client.write_file(file='./home.csv', timestamp_column='time', tag_columns=["room"]) client.write_file(file='./home.json', timestamp_column='time', tag_columns=["room"], date_unit='ns')

InfluxDBClient3.query

Sends a Flight request to execute the specified SQL or InfluxQL query. Returns all data in the query result as an Arrow table.

Syntax

query(self, query, language="sql", mode="all", **kwargs )

Parameters

  • query (str): the SQL or InfluxQL to execute.
  • language (str): the query language used in the query parameter–"sql" or "influxql". Default is "sql".
  • mode (str): Specifies the output to return from the pyarrow.flight.FlightStreamReader. Default is "all".
    • all: Read the entire contents of the stream and return it as a pyarrow.Table.
    • chunk: Read the next message (a FlightStreamChunk) and return data and app_metadata. Returns null if there are no more messages.
    • pandas: Read the contents of the stream and return it as a pandas.DataFrame.
    • reader: Convert the FlightStreamReader into a pyarrow.RecordBatchReader.
    • schema: Return the schema for all record batches in the stream.
  • **kwargs: FlightCallOptions

Examples

Query using SQL
table = client.query("SELECT * FROM measurement WHERE time >= now() - INTERVAL '90 days'")
# Filter columns.
print(table.select(['room', 'temp']))
# Use PyArrow to aggregate data.
print(table.group_by('hum').aggregate([]))
Query using InfluxQL
query = "SELECT * FROM measurement WHERE time >= -90d"
table = client.query(query=query, language="influxql")
# Filter columns.
print(table.select(['room', 'temp']))
Read all data from the stream and return a pandas DataFrame
query = "SELECT * FROM measurement WHERE time >= now() - INTERVAL '90 days'"
pd = client.query(query=query, mode="pandas")
# Print the pandas DataFrame formatted as a Markdown table.
print(pd.to_markdown())
View the schema for all batches in the stream
table = client.query('''
  SELECT *
  FROM measurement
  WHERE time >= now() - INTERVAL '90 days''''
)
# Get the schema attribute value.
print(table.schema)
Retrieve the result schema and no data
query = "SELECT * FROM measurement WHERE time >= now() - INTERVAL '90 days'"
schema = client.query(query=query, mode="schema")
print(schema)
Specify a timeout

Pass timeout=<number of seconds> for FlightCallOptions to use a custom timeout.

query = "SELECT * FROM measurement WHERE time >= now() - INTERVAL '90 days'"
client.query(query=query, timeout=5)

InfluxDBClient3.close

Sends all remaining records from the batch to InfluxDB, and then closes the underlying write client and Flight client to release resources.

Syntax

close(self)

Examples

client.close()

Class Point

influxdb_client_3.Point

Provides an interface for constructing a time series data point for a measurement, and setting fields, tags, and timestamp.

The following example shows how to create a Point, and then write the data to InfluxDB.

point = Point("home").tag("room", "Living Room").field("temp", 72)
client.write(point)

Class WriteOptions

influxdb_client_3.WriteOptions

Options for configuring client behavior (batch size, retry, callbacks, etc.) when writing data to InfluxDB.

For client configuration examples, see Initialize a client.

Syntax

__init__(self, write_type: WriteType = WriteType.batching,
                 batch_size=1_000, flush_interval=1_000,
                 jitter_interval=0,
                 retry_interval=5_000,
                 max_retries=5,
                 max_retry_delay=125_000,
                 max_retry_time=180_000,
                 exponential_base=2,
                 max_close_wait=300_000,
                 write_scheduler=ThreadPoolScheduler(max_workers=1)) -> None:

Functions

Function write_client_options(**kwargs)

influxdb_client_3.write_client_options(kwargs)
  • Takes the following parameters:

    • kwargs: keyword arguments for WriteApi
  • Returns a dictionary of write client options.

Function flight_client_options(**kwargs)

influxdb_client_3.flight_client_options(kwargs)
  • Takes the following parameters:

    • kwargs: keyword arguments for FlightClient
  • Returns a dictionary of Flight client options.

Examples

Specify the root certificate path
from influxdb_client_3 import InfluxDBClient3, flight_client_options
import certifi

fh = open(certifi.where(), "r")
cert = fh.read()
fh.close()

client = InfluxDBClient3(
    token="DATABASE_TOKEN",
    host="cluster-id.influxdb.io",
    database="DATABASE_NAME",
    flight_client_options=flight_client_options(
        tls_root_certs=cert))

Constants

  • influxdb_client_3.SYNCHRONOUS: Represents synchronous write mode
  • influxdb_client_3.WritePrecision: Enum class that represents write precision

Exceptions

  • influxdb_client_3.InfluxDBError: Exception class raised for InfluxDB-related errors

Was this page helpful?

Thank you for your feedback!


The future of Flux

Flux is going into maintenance mode. You can continue using it as you currently are without any changes to your code.

Flux is going into maintenance mode and will not be supported in InfluxDB 3.0. This was a decision based on the broad demand for SQL and the continued growth and adoption of InfluxQL. We are continuing to support Flux for users in 1.x and 2.x so you can continue using it with no changes to your code. If you are interested in transitioning to InfluxDB 3.0 and want to future-proof your code, we suggest using InfluxQL.

For information about the future of Flux, see the following: