Query plans

A query plan is a sequence of steps that the InfluxDB 3 Querier devises and executes to calculate the result of a query. The Querier uses DataFusion and Arrow to build and execute query plans. These plans call DataFusion and InfluxDB-specific operators that read data from the Object store and the Ingester and apply query transformations, such as deduplicating, filtering, aggregating, merging, projecting, and sorting, to calculate the final result.

Like many other databases, the Querier contains a Query Optimizer. After it parses an incoming query, the Querier builds a logical plan: a sequence of high-level steps, such as scanning, filtering, and sorting, required for the query. Following the logical plan, the Querier then builds an optimized physical plan that aims to calculate the correct result in the least amount of time. The plan takes advantage of data partitioning by the Ingester to parallelize plan operations and prune unnecessary data before executing the plan. The Querier also applies the common techniques of predicate and projection pushdown to further prune data as early as possible.

Display syntax

Logical and physical query plans are represented (for example, in an EXPLAIN report) in tree syntax.

  • Each plan is represented as an upside-down tree composed of nodes.
  • A parent node awaits the output of its child nodes.
  • Data flows up from the bottom innermost nodes of the tree to the outermost root node at the top.
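As a rough illustration of this bottom-up flow, the following Python sketch parses an indented plan (two spaces per level, as in an EXPLAIN report) into a tree and lists the nodes in dependency order, children before parents. The `PlanNode`, `parse_plan`, and `execution_order` names are hypothetical helpers written for this example; they are not part of InfluxDB or DataFusion. The plan text is the physical plan from the example EXPLAIN report in this document.

```python
from dataclasses import dataclass, field


@dataclass
class PlanNode:
    name: str
    children: list = field(default_factory=list)


def parse_plan(text, indent=2):
    """Build a tree from plan lines indented `indent` spaces per level."""
    root = None
    stack = []  # stack[d] is the most recent node seen at depth d
    for line in text.splitlines():
        if not line.strip():
            continue
        depth = (len(line) - len(line.lstrip(" "))) // indent
        node = PlanNode(line.strip())
        if depth == 0:
            root = node
        else:
            stack[depth - 1].children.append(node)
        del stack[depth:]  # forget nodes deeper than the current one
        stack.append(node)
    return root


def execution_order(node):
    """Post-order walk: children (inputs) are listed before their parent."""
    order = []
    for child in node.children:
        order.extend(execution_order(child))
    order.append(node.name.split(":")[0])
    return order


plan = """\
SortPreservingMergeExec: [city@0 ASC NULLS LAST,time@2 DESC]
  UnionExec
    SortExec: expr=[city@0 ASC NULLS LAST,time@2 DESC]
      ParquetExec: file_groups={...}, projection=[city, min_temp, time]
    SortExec: expr=[city@0 ASC NULLS LAST,time@2 DESC]
      ParquetExec: file_groups={...}, projection=[city, min_temp, time]
"""

print(execution_order(parse_plan(plan)))
```

The printed list starts with the leaf ParquetExec nodes and ends with the root SortPreservingMergeExec. It shows dependencies only; an actual engine streams data between nodes and can run sibling branches in parallel.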

Example logical and physical plan

The following query generates an EXPLAIN report that includes a logical and a physical plan:

EXPLAIN SELECT city, min_temp, time FROM h2o ORDER BY city ASC, time DESC;

The output is the following:

Figure 1. EXPLAIN report

| plan_type     | plan                                                                     |
+---------------+--------------------------------------------------------------------------+
| logical_plan  | Sort: h2o.city ASC NULLS LAST, h2o.time DESC NULLS FIRST                 |
|               |   TableScan: h2o projection=[city, min_temp, time]                       |
| physical_plan | SortPreservingMergeExec: [city@0 ASC NULLS LAST,time@2 DESC]             |
|               |   UnionExec                                                              |
|               |     SortExec: expr=[city@0 ASC NULLS LAST,time@2 DESC]                   |
|               |       ParquetExec: file_groups={...}, projection=[city, min_temp, time]  |
|               |     SortExec: expr=[city@0 ASC NULLS LAST,time@2 DESC]                   |
|               |       ParquetExec: file_groups={...}, projection=[city, min_temp, time]  |
|               |                                                                          |

Output from EXPLAIN SELECT city, min_temp, time FROM h2o ORDER BY city ASC, time DESC;

The leaf nodes in the Figure 1 physical plan are parallel ParquetExec nodes:

      ParquetExec: file_groups={...}, projection=[city, min_temp, time]
...
      ParquetExec: file_groups={...}, projection=[city, min_temp, time]

Data flow

A physical plan node represents a specific implementation of ExecutionPlan that receives an input stream, applies expressions for filtering and sorting, and then yields an output stream to its parent node.

The following diagram shows the data flow and sequence of ExecutionPlan nodes in the Figure 1 physical plan:

SortPreservingMergeExec
  UnionExec
    SortExec
      ParquetExec
    SortExec
      ParquetExec

InfluxDB Clustered includes the following plan expressions:

Logical plan

A logical plan for a query:

  • is a high-level plan that expresses the “intent” of a query and the steps required for calculating the result
  • requires information about the data schema
  • is independent of the physical execution, cluster configuration, data source (Ingester or Object store), or how data is organized or partitioned
  • is displayed as a tree of DataFusion LogicalPlan nodes

LogicalPlan nodes

Each node in an InfluxDB Clustered logical plan tree represents a LogicalPlan implementation that receives criteria extracted from the query and applies relational operators and optimizations for transforming input data to an output table.

The following are some LogicalPlan nodes used in InfluxDB logical plans.

TableScan

TableScan retrieves rows from a table provider by reference or from the context.

Projection

Projection evaluates an arbitrary list of expressions on the input; equivalent to an SQL SELECT statement with an expression list.

Filter

Filter removes rows from the input that do not satisfy the specified expression; equivalent to an SQL WHERE clause with a predicate expression.

Sort

Sort sorts the input according to a list of sort expressions; used to implement SQL ORDER BY.

For details and a list of LogicalPlan implementations, see Enum datafusion::logical_expr::LogicalPlan Variants in the DataFusion documentation.

Physical plan

A physical plan, or execution plan, for a query:

  • is an optimized plan that derives from the logical plan and contains the low-level steps for query execution
  • considers the cluster configuration (for example, CPU and memory allocation) and data organization (for example, partitions, the number of files, and whether files overlap). For example:
    • If you run the same query with the same data on different clusters with different configurations, each cluster may generate a different physical plan for the query.
    • If you run the same query on the same cluster at different times, the physical plan may differ each time, depending on the data at query time.
  • if generated using EXPLAIN ANALYZE, includes runtime metrics sampled during query execution
  • is displayed as a tree of ExecutionPlan nodes

ExecutionPlan nodes

Each node in an InfluxDB Clustered physical plan represents a call to a specific implementation of the DataFusion ExecutionPlan that receives input data, query criteria expressions, and an output schema.

The following are some ExecutionPlan nodes used in InfluxDB physical plans.

DeduplicateExec

InfluxDB DeduplicateExec takes an input stream of RecordBatch sorted on sort_key and applies InfluxDB-specific deduplication logic. The output is dependent on the order of the input rows that have the same key.
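The following Python sketch shows why the input order matters. It assumes a simple last-row-wins rule for rows that share a key, which is a simplification chosen for illustration and not the actual InfluxDB deduplication logic. The `deduplicate` function is hypothetical; the row values come from the overlapping-data example later in this document.

```python
def deduplicate(sorted_rows, key_columns):
    """Collapse adjacent rows that share a key; the later row wins.

    Assumes `sorted_rows` is already sorted on `key_columns`, so rows
    with the same key are adjacent. (Assumption: last row wins.)
    """
    output = []
    previous_key = None
    for row in sorted_rows:
        key = tuple(row[column] for column in key_columns)
        if output and key == previous_key:
            output[-1] = row  # replace the earlier duplicate
        else:
            output.append(row)
        previous_key = key
    return output


rows = [
    {"state": "CA", "city": "SJ", "time": 600, "min_temp": 69.5},
    {"state": "CA", "city": "SJ", "time": 600, "min_temp": 68.5},
    {"state": "CA", "city": "SJ", "time": 700, "min_temp": 75.5},
]
key = ["state", "city", "time"]

print(deduplicate(rows, key))
# Swapping the two duplicate rows changes which value survives.
print(deduplicate([rows[1], rows[0], rows[2]], key))
```

Both calls return two rows, but the surviving `min_temp` for time 600 differs between them, which is the order dependence described above.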

EmptyExec

DataFusion EmptyExec is an execution plan for an empty relation and indicates that the table doesn’t contain data for the time range of the query.

FilterExec

The execution plan for the Filter LogicalPlan.

DataFusion FilterExec evaluates a boolean predicate against all input batches to determine which rows to include in the output batches.
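A minimal Python sketch of this batch-at-a-time filtering follows. Plain lists of dictionaries stand in for Arrow record batches, and `filter_batches` and `predicate` are hypothetical names for this example rather than DataFusion APIs.

```python
def filter_batches(batches, predicate):
    """Yield each input batch with only the rows that satisfy `predicate`."""
    for batch in batches:
        yield [row for row in batch if predicate(row)]


def predicate(row):
    # Mirrors: time >= 200 AND time < 700 AND state = 'MA'
    return 200 <= row["time"] < 700 and row["state"] == "MA"


batches = [
    [{"state": "CA", "time": 600}, {"state": "MA", "time": 400}],
    [{"state": "MA", "time": 700}, {"state": "MA", "time": 550}],
]

result = list(filter_batches(batches, predicate))
print(result)
```

Each output batch corresponds to one input batch, so the operator can pass results to its parent as a stream instead of collecting all rows first.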

ParquetExec

DataFusion ParquetExec scans one or more Parquet partitions.

ParquetExec expressions

file_groups

A file group is a list of files to scan. Files are referenced by path:

  • 1/1/b862a7e9b.../243db601-....parquet
  • 1/1/b862a7e9b.../f5fb7c7d-....parquet

In InfluxDB 3, the path structure represents how data is organized.

A path has the following structure:

<namespace_id>/<table_id>/<partition_hash_id>/<uuid_of_the_file>.parquet
    1         /    1    /b862a7e9b329ee6a4.../243db601-f3f1-4....parquet
  • namespace_id: the namespace (database) being queried
  • table_id: the table (measurement) being queried
  • partition_hash_id: the partition this file belongs to. You can count partition IDs to find how many partitions the query reads.
  • uuid_of_the_file: the file identifier.

ParquetExec processes groups in parallel and reads the files in each group sequentially.
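To count the partitions that a plan reads, you could split each path into the components listed above, as in the following sketch. `parse_parquet_path` is a hypothetical helper, not an InfluxDB tool, and the paths are the truncated examples from this section with their elisions kept as-is.

```python
from collections import Counter


def parse_parquet_path(path):
    """Split a file_groups path into its four components."""
    namespace_id, table_id, partition_hash_id, file_name = path.split("/")
    suffix = ".parquet"
    if file_name.endswith(suffix):
        file_name = file_name[: -len(suffix)]
    return {
        "namespace_id": namespace_id,
        "table_id": table_id,
        "partition_hash_id": partition_hash_id,
        "uuid_of_the_file": file_name,
    }


# Paths copied from the example above; the "..." elisions are kept as-is.
paths = [
    "1/1/b862a7e9b.../243db601-....parquet",
    "1/1/b862a7e9b.../f5fb7c7d-....parquet",
]

parsed = [parse_parquet_path(path) for path in paths]
files_per_partition = Counter(item["partition_hash_id"] for item in parsed)

print(len(files_per_partition), "partition(s)")
print(dict(files_per_partition))
```

Here both files share one partition hash ID, so the query reads two files from a single partition.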

projection

projection lists the table columns that the query plan needs to read to execute the query. The parameter name projection refers to projection pushdown, the action of filtering columns.

Consider the following sample data that contains many columns:

h2o,state=CA,city=SF min_temp=68.4,max_temp=85.7,area=500u 600

| table | state | city | min_temp | max_temp | area | time |
+-------+-------+------+----------+----------+------+------+
| h2o   | CA    | SF   | 68.4     | 85.7     | 500u | 600  |

However, the following SQL query specifies only three columns (city, state, and time):

SELECT city, count(1)
FROM h2o
WHERE time >= to_timestamp(200) AND time < to_timestamp(700)
  AND state = 'MA'
GROUP BY city
ORDER BY city ASC;

When processing the query, the Querier specifies the three required columns in the projection and the projection is “pushed down” to leaf nodes. Columns not specified are pruned as early as possible during query execution.

projection=[city, state, time]
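The effect of pushing a projection down to a leaf node can be sketched in Python as follows. The `scan` function is a hypothetical stand-in for a leaf scan operator, and the row is the sample data shown above.

```python
def scan(rows, projection):
    """Leaf scan that emits only the projected columns of each row."""
    for row in rows:
        yield {column: row[column] for column in projection}


# The sample row from above, as a dictionary of column values.
stored_rows = [
    {"state": "CA", "city": "SF", "min_temp": 68.4, "max_temp": 85.7,
     "area": 500, "time": 600},
]

projection = ["city", "state", "time"]
projected = list(scan(stored_rows, projection))
print(projected)
```

Nodes above the scan never see `min_temp`, `max_temp`, or `area`. With a columnar format such as Parquet, a scan can also skip reading the unprojected columns from storage.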

output_ordering

output_ordering specifies the sort order for the output. The Querier specifies output_ordering if the output should be ordered and if the Querier knows the order.

When storing data to Parquet files, InfluxDB sorts the data to improve storage compression and query efficiency, and the planner tries to preserve that order for as long as possible. Generally, the output_ordering value that ParquetExec receives is the ordering (or a subset of the ordering) of stored data.

By design, RecordBatchesExec data isn’t sorted.

In the following example, the query planner specifies the output sort order state ASC, city ASC, time ASC, followed by the InfluxDB-generated __chunk_order column:

output_ordering=[state@2 ASC, city@1 ASC, time@3 ASC, __chunk_order@0 ASC]

predicate

predicate is the data filter specified in the query and used for row filtering when scanning Parquet files.

For example, given the following SQL query:

SELECT city, count(1)
FROM h2o
WHERE time >= to_timestamp(200) AND time < to_timestamp(700)
  AND state = 'MA'
GROUP BY city
ORDER BY city ASC;

The predicate value is the boolean expression in the WHERE clause:

predicate=time@5 >= 200 AND time@5 < 700 AND state@4 = MA

pruning_predicate

pruning_predicate is created from the predicate value and is used for pruning data and files from the chosen partitions.

For example, given the following predicate parsed from the SQL:

predicate=time@5 >= 200 AND time@5 < 700 AND state@4 = MA

The Querier creates the following pruning_predicate:

pruning_predicate=time_max@0 >= 200 AND time_min@1 < 700 AND state_min@2 <= MA AND MA <= state_max@3

By default, the pruning predicate filters files by time.

Before the physical plan is generated, an additional partition pruning step uses predicates on partitioning columns to prune partitions.
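The file-level pruning performed by the pruning_predicate shown earlier can be sketched as a check against each file's minimum and maximum column values. In the following Python example, `may_contain_matches` is a hypothetical function and the per-file statistics are invented for illustration.

```python
def may_contain_matches(stats):
    """Evaluate the pruning predicate against one file's min/max statistics.

    Mirrors: time_max >= 200 AND time_min < 700
             AND state_min <= 'MA' AND 'MA' <= state_max
    """
    return (
        stats["time_max"] >= 200
        and stats["time_min"] < 700
        and stats["state_min"] <= "MA" <= stats["state_max"]
    )


# Hypothetical per-file statistics, invented for this example.
files = {
    "file_a": {"time_min": 400, "time_max": 600,
               "state_min": "CA", "state_max": "MA"},
    "file_b": {"time_min": 700, "time_max": 900,
               "state_min": "CA", "state_max": "MA"},
    "file_c": {"time_min": 300, "time_max": 500,
               "state_min": "CA", "state_max": "CA"},
}

kept = [name for name, stats in files.items() if may_contain_matches(stats)]
print(kept)
```

Only `file_a` passes: `file_b` starts at or after time 700, and the state range of `file_c` cannot include MA. A file that passes may still contain no matching rows, so the row-level predicate is still applied during the scan.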

ProjectionExec

DataFusion ProjectionExec evaluates an arbitrary list of expressions on the input; the execution plan for the Projection LogicalPlan.

RecordBatchesExec

The InfluxDB RecordBatchesExec implementation retrieves and scans recently written, yet-to-be-persisted data from the InfluxDB 3 Ingester.

When generating the plan, the Querier sends the query criteria, such as database, table, and columns, to the Ingester to retrieve data not yet persisted to Parquet files. If the Ingester has data that meets the criteria (the chunk size is non-zero), then the plan includes RecordBatchesExec.

RecordBatchesExec attributes

chunks

chunks is the number of data chunks from the Ingester. Often one (1), but it can be many.

projection

projection specifies a list of columns to read and output.

__chunk_order in a list of columns is an InfluxDB-generated column used to keep the chunks and files ordered for deduplication, for example:

projection=[__chunk_order, city, state, time]

For details and other DataFusion ExecutionPlan implementations, see Struct datafusion::datasource::physical_plan implementors in the DataFusion documentation.

SortExec

The execution plan for the Sort LogicalPlan.

DataFusion SortExec supports sorting datasets that are larger than the memory allotted by the memory manager, by spilling to disk.

SortPreservingMergeExec

DataFusion SortPreservingMergeExec takes an input execution plan and a list of sort expressions and, provided each partition of the input plan is sorted with respect to these sort expressions, yields a single partition sorted with respect to them.
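The same idea, a k-way merge of inputs that are already sorted, can be sketched with Python's standard `heapq.merge`. This is an analogy for illustration, not the DataFusion implementation; the partitions and the `sort_key` function are hypothetical and use the city ASC, time DESC ordering from the example query.

```python
import heapq


def sort_key(row):
    # city ASC, time DESC (negating the numeric time reverses its order)
    return (row["city"], -row["time"])


# Two input partitions, each already sorted by city ASC, time DESC.
partition_1 = [
    {"city": "Bedford", "time": 400},
    {"city": "SF", "time": 600},
]
partition_2 = [
    {"city": "Bedford", "time": 600},
    {"city": "Boston", "time": 550},
    {"city": "SF", "time": 650},
]

merged = list(heapq.merge(partition_1, partition_2, key=sort_key))
for row in merged:
    print(row["city"], row["time"])
```

Because each input is already sorted, the merge only compares the current head of each partition and never re-sorts the full data set.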

UnionExec

DataFusion UnionExec is the UNION ALL execution plan for combining multiple inputs that have the same schema. UnionExec concatenates the partitions and does not mix or copy data within or across partitions.
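As a simplified model, if each input is a list of partitions, the union is the concatenation of those lists. The following Python sketch uses a hypothetical `union` function to show that partitions pass through unchanged.

```python
def union(*inputs):
    """Combine inputs by concatenating their partition lists.

    Each input is a list of partitions. Partitions are passed through
    as-is, without merging rows or copying data between them.
    """
    output_partitions = []
    for partitions in inputs:
        output_partitions.extend(partitions)
    return output_partitions


input_a = [[{"city": "SF", "time": 600}], [{"city": "SJ", "time": 600}]]
input_b = [[{"city": "Boston", "time": 550}]]

result = union(input_a, input_b)
print(len(result))
```

The result has three partitions, one per input partition, and each is the same object that was passed in.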

Overlapping data and deduplication

Overlapping data refers to files or batches in which the time ranges (represented by timestamps) intersect. Two chunks of data overlap if both chunks contain data for the same portion of time.

Example of overlapping data

For example, the following chunks represent line protocol written to InfluxDB:

// Chunk 4: stored parquet file
// - time range: 400-600
// - no duplicates in its own chunk
// - overlaps chunk 3
[
 "h2o,state=CA,city=SF min_temp=68.4,max_temp=85.7,area=500u 600",
 "h2o,state=CA,city=SJ min_temp=69.5,max_temp=89.2 600",  // duplicates row 3 in chunk 5
 "h2o,state=MA,city=Bedford max_temp=80.75,area=742u 400", // overlaps chunk 3
 "h2o,state=MA,city=Boston min_temp=65.40,max_temp=82.67 400", // overlaps chunk 3
],

// Chunk 5: Ingester data
// - time range: 550-700
// - overlaps & duplicates data in chunk 4
[
"h2o,state=MA,city=Bedford max_temp=88.75,area=742u 600", // overlaps chunk 4
"h2o,state=CA,city=SF min_temp=68.4,max_temp=85.7,area=500u 650",
"h2o,state=CA,city=SJ min_temp=68.5,max_temp=90.0 600", // duplicates row 2 in chunk 4
"h2o,state=CA,city=SJ min_temp=75.5,max_temp=84.08 700",
"h2o,state=MA,city=Boston min_temp=67.4 550", // overlaps chunk 4
]
  • Chunk 4 spans the time range 400-600 and represents data persisted to a Parquet file in the Object store.
  • Chunk 5 spans the time range 550-700 and represents yet-to-be-persisted data from the Ingester.
  • The chunks overlap the range 550-600.
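The overlap check itself is an interval intersection. The following Python sketch uses a hypothetical `overlap` function and treats each time range as inclusive, which is an assumption made for this example.

```python
def overlap(range_a, range_b):
    """Return the intersecting (start, end) of two inclusive time ranges,
    or None if they don't intersect."""
    start = max(range_a[0], range_b[0])
    end = min(range_a[1], range_b[1])
    return (start, end) if start <= end else None


chunk_4 = (400, 600)  # persisted Parquet file
chunk_5 = (550, 700)  # Ingester data

print(overlap(chunk_4, chunk_5))
print(overlap((100, 300), chunk_4))
```

The first call returns the range 550 to 600. The second returns None because the ranges don't intersect, so those two chunks could not contain duplicates of each other.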

If data overlaps at query time, the Querier must include the deduplication process in the query plan, which uses the same multi-column sort-merge operators used by the Ingester. Compared to an ingestion plan that uses sort-merge operators, a query plan is more complex and ensures that data streams through the plan after deduplication.

Because sort-merge operations used in deduplication have a non-trivial execution cost, InfluxDB 3 tries to avoid the need for deduplication. Due to how InfluxDB organizes data, a Parquet file never contains duplicates of the data it stores; only overlapped data can contain duplicates. During compaction, the Compactor sorts stored data to reduce overlaps and optimize query performance. For data that doesn’t have overlaps, the Querier doesn’t need to include the deduplication process and the query plan can further distribute non-overlapping data for parallel processing.

DataFusion query plans

For more information about DataFusion query plans and the DataFusion API used in InfluxDB 3, see the following:

