Create custom aggregate functions

To aggregate your data, use the Flux built-in aggregate functions or create custom aggregate functions using the reduce() function.

Aggregate function characteristics

Aggregate functions all have the same basic characteristics:

  • They operate on individual input tables and transform all records into a single record.
  • The output table has the same group key as the input table.

How reduce() works

The reduce() function operates on one row at a time using the function defined in the fn parameter. The fn function maps keys to specific values using two records specified by the following parameters:

Parameter Description
r A record that represents the current row.
accumulator A record that contains values used in each row’s aggregate calculation.

The reduce() function’s identity parameter defines the initial accumulator record.
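The mechanics of fn and identity mirror a classic fold. The following Python sketch (an analogy for illustration, not Flux) uses functools.reduce the same way: the lambda plays the role of fn, and the starting dictionary plays the role of identity.

```python
from functools import reduce

# Each row is modeled as a dict; the accumulator starts as the identity record.
rows = [{"_value": 1.6}, {"_value": 2.3}]
identity = {"sum": 0.0, "product": 1.0}

# fn(accumulator, r) returns a new accumulator record, just as Flux's fn
# returns a new record whose keys must match the identity record's keys.
result = reduce(
    lambda acc, r: {
        "sum": r["_value"] + acc["sum"],
        "product": r["_value"] * acc["product"],
    },
    rows,
    identity,
)
print(result)  # sum ≈ 3.9, product ≈ 3.68
```

The output of each step becomes the accumulator for the next, which is exactly how reduce() threads state through a table's rows.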

Example reduce() function

The following example reduce() function produces a sum and product of all values in an input table.

|> reduce(
    fn: (r, accumulator) => ({
        sum: r._value + accumulator.sum,
        product: r._value * accumulator.product,
    }),
    identity: {sum: 0.0, product: 1.0},
)

To illustrate how this function works, consider the following simplified table:

_time _value
2019-04-23T16:10:49Z 1.6
2019-04-23T16:10:59Z 2.3
2019-04-23T16:11:09Z 0.7
2019-04-23T16:11:19Z 1.2
2019-04-23T16:11:29Z 3.8
Input records

The fn function uses the data in the first row to define the r record. It defines the accumulator record using the identity parameter.

r           = { _time: 2019-04-23T16:10:49.00Z, _value: 1.6 }
accumulator = { sum  : 0.0, product : 1.0 }
Key mappings

It then uses the r and accumulator records to populate values in the key mappings:

// sum: r._value + accumulator.sum
sum: 1.6 + 0.0

// product: r._value * accumulator.product
product: 1.6 * 1.0
Output record

This produces an output record with the following key-value pairs:

{ sum: 1.6, product: 1.6 }

The function then processes the next row using this output record as the accumulator.

Because reduce() uses the output record as the accumulator when processing the next row, keys mapped in the fn function must match keys in the identity and accumulator records.

Processing the next row
// Input records for the second row
r           = { _time: 2019-04-23T16:10:59.00Z, _value: 2.3 }
accumulator = { sum  : 1.6, product : 1.6 }

// Key mappings for the second row
sum: 2.3 + 1.6
product: 2.3 * 1.6

// Output record of the second row
{ sum: 3.9, product: 3.68 }

It then uses the new output record as the accumulator for the next row. This cycle continues until all rows in the table are processed.

Final output record and table

After all records in the table are processed, reduce() uses the final output record to create a transformed table with one row and columns for each mapped key.

Final output record
{ sum: 9.6, product: 11.74656 }
Output table
sum product
9.6 11.74656
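As a quick arithmetic check, this plain Python loop (an analogy for the per-row cycle described above, not Flux) folds the five example values and reproduces the final output record:

```python
values = [1.6, 2.3, 0.7, 1.2, 3.8]

# Start from the identity record and apply the key mappings row by row.
acc = {"sum": 0.0, "product": 1.0}
for v in values:
    acc = {"sum": v + acc["sum"], "product": v * acc["product"]}

# Matches the final output record, up to floating-point rounding:
# sum ≈ 9.6, product ≈ 11.74656
print(acc)
```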

What happened to the _time column?

The reduce() function only keeps columns that:

  1. Are part of the input table’s group key.
  2. Are explicitly mapped in the fn function.

It drops all other columns. Because _time is not part of the group key and is not mapped in the fn function, it isn’t included in the output table.

Custom aggregate function examples

To create custom aggregate functions, use principles outlined in Creating custom functions and the reduce() function to aggregate rows in each input table.

Create a custom average function

This example illustrates how to create a function that averages values in a table. This is meant for demonstration purposes only. The built-in mean() function does the same thing and is much more performant.

average = (tables=<-, outputField="average") => tables
    |> reduce(
        // Define the initial accumulator record
        identity: {count: 0.0, sum: 0.0, avg: 0.0},
        fn: (r, accumulator) => ({
            // Increment the counter on each reduce loop
            count: accumulator.count + 1.0,
            // Add the _value to the existing sum
            sum: accumulator.sum + r._value,
            // Divide the existing sum by the existing count for a new average
            avg: (accumulator.sum + r._value) / (accumulator.count + 1.0),
        }),
    )
    // Drop the sum and the count columns since they are no longer needed
    |> drop(columns: ["sum", "count"])
    // Set the _field column of the output table to the value
    // provided in the outputField parameter
    |> set(key: "_field", value: outputField)
    // Rename avg column to _value
    |> rename(columns: {avg: "_value"})
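To see why the final avg value equals the table's mean, here is the same accumulator logic as a plain Python loop (an analogy, not Flux), run over the simplified table from earlier:

```python
values = [1.6, 2.3, 0.7, 1.2, 3.8]

acc = {"count": 0.0, "sum": 0.0, "avg": 0.0}
for v in values:
    count = acc["count"] + 1.0
    total = acc["sum"] + v
    # avg is recomputed on every row, so after the last row it is the
    # mean of the whole table: 9.6 / 5 = 1.92.
    acc = {"count": count, "sum": total, "avg": total / count}

print(acc["avg"])  # ≈ 1.92
```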

Aggregate multiple columns

Built-in aggregate functions only operate on one column. Use reduce() to create a custom aggregate function that aggregates multiple columns.

The following function expects input tables to have c1_value and c2_value columns and generates an average for each.

multiAvg = (tables=<-) => tables
    |> reduce(
        identity: {
            count: 1.0,
            c1_sum: 0.0,
            c1_avg: 0.0,
            c2_sum: 0.0,
            c2_avg: 0.0,
        },
        fn: (r, accumulator) => ({
            count: accumulator.count + 1.0,
            c1_sum: accumulator.c1_sum + r.c1_value,
            c1_avg: accumulator.c1_sum / accumulator.count,
            c2_sum: accumulator.c2_sum + r.c2_value,
            c2_avg: accumulator.c2_sum / accumulator.count,
        }),
    )
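The multi-column pattern is simply one accumulator record carrying several running totals. The following Python sketch shows a simplified version (an analogy, not Flux) that tracks both sums in one accumulator and divides at the end; the column names and sample values are hypothetical.

```python
# Hypothetical rows with the two expected columns.
rows = [
    {"c1_value": 1.0, "c2_value": 10.0},
    {"c1_value": 3.0, "c2_value": 30.0},
]

# One accumulator record carries the count and both running sums.
acc = {"count": 0.0, "c1_sum": 0.0, "c2_sum": 0.0}
for r in rows:
    acc = {
        "count": acc["count"] + 1.0,
        "c1_sum": acc["c1_sum"] + r["c1_value"],
        "c2_sum": acc["c2_sum"] + r["c2_value"],
    }

c1_avg = acc["c1_sum"] / acc["count"]  # 2.0
c2_avg = acc["c2_sum"] / acc["count"]  # 20.0
```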

Aggregate gross and net profit

Use reduce() to create a function that aggregates gross and net profit. This example expects profit and expenses columns in the input tables.

profitSummary = (tables=<-) => tables
    |> reduce(
        identity: {gross: 0.0, net: 0.0},
        fn: (r, accumulator) => ({
            gross: accumulator.gross + r.profit,
            net: accumulator.net + r.profit - r.expenses,
        }),
    )
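The accumulation is easy to sanity-check in plain Python (the sample rows are hypothetical): gross sums profit on its own, while net sums profit minus expenses.

```python
# Hypothetical rows with the two expected columns.
rows = [
    {"profit": 100.0, "expenses": 40.0},
    {"profit": 50.0, "expenses": 30.0},
]

acc = {"gross": 0.0, "net": 0.0}
for r in rows:
    acc = {
        "gross": acc["gross"] + r["profit"],
        "net": acc["net"] + r["profit"] - r["expenses"],
    }

print(acc)  # gross: 150.0, net: 80.0
```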
