experimental.join() function

Flux 0.65.0+

The experimental.join() function is subject to change at any time. By using this function, you accept the risks of experimental functions.

The experimental.join() function joins two streams of tables on the group key and _time column. Use the fn parameter to map new output tables using values from input tables.

To join streams of tables with different fields or measurements, use group() or drop() to remove _field and _measurement from the group key before joining. See an example below.

import "experimental"

// ...

experimental.join(
  left: left,
  right: right,
  fn: (left, right) => ({left with lv: left._value, rv: right._value })
)
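As a minimal sketch of the drop() approach mentioned above, the query below removes _measurement and _field from each stream before joining. The bucket, measurement, and field names are placeholders, and the sketch assumes the two streams otherwise share the same group key (same tags and the same range).

```flux
import "experimental"

// Placeholder bucket, measurement, and field names.
// Dropping _measurement and _field also removes them from the group key.
s1 = from(bucket: "example-bucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "foo" and r._field == "bar")
  |> drop(columns: ["_measurement", "_field"])

s2 = from(bucket: "example-bucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "baz" and r._field == "quz")
  |> drop(columns: ["_measurement", "_field"])

// With matching group keys, rows are paired on _time.
experimental.join(
  left: s1,
  right: s2,
  fn: (left, right) => ({
    left with
    bar_value: left._value,
    quz_value: right._value
  })
)
```

Unlike group(), drop() removes the columns entirely, so the output rows no longer carry _measurement or _field.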

Parameters

left

First of two streams of tables to join.

right

Second of two streams of tables to join.

fn

A function with left and right arguments that maps a new output record using values from the left and right input records. The return value must be a record.
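The fn parameter does not have to extend the left record. The sketch below builds the output record explicitly. It assumes a Flux version that includes the array package, and the timestamps and values are made-up sample data. Both sample streams have an empty group key, so rows are paired on _time alone.

```flux
import "array"
import "experimental"

// Hypothetical in-memory sample data (not from a real bucket).
left = array.from(rows: [
  {_time: 2021-01-01T00:00:00Z, _value: 80.1},
  {_time: 2021-01-01T00:01:00Z, _value: 80.2}
])

right = array.from(rows: [
  {_time: 2021-01-01T00:00:00Z, _value: 72.1},
  {_time: 2021-01-01T00:01:00Z, _value: 72.2}
])

experimental.join(
  left: left,
  right: right,
  // Return a new record instead of using "left with".
  fn: (left, right) => ({
    _time: left._time,
    ratio: left._value / right._value
  })
)
```

Building the record by hand keeps only the columns you name, which can be useful when the left record carries columns you do not want in the output.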

Examples

Input and output tables

Given the following input tables:

left

_time _field _value
0001 temp 80.1
0002 temp 80.2
0003 temp 79.9
0004 temp 80.0

right

_time _field _value
0001 temp 72.1
0002 temp 72.2
0003 temp 71.9
0004 temp 72.0

The following experimental.join() call would output the table shown after the query:

import "experimental"

experimental.join(
  left: left,
  right: right,
  fn: (left, right) => ({
    left with
    lv: left._value,
    rv: right._value,
    diff: left._value - right._value
  })
)

_time _field lv rv diff
0001 temp 80.1 72.1 8.0
0002 temp 80.2 72.2 8.0
0003 temp 79.9 71.9 8.0
0004 temp 80.0 72.0 8.0

Join two streams of tables

import "experimental"

s1 = from(bucket: "example-bucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "foo")

s2 = from(bucket: "example-bucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "bar")

experimental.join(
  left: s1,
  right: s2,
  fn: (left, right) => ({
    left with
    s1_value: left._value,
    s2_value: right._value
  })
)

Join two streams of tables with different fields and measurements

import "experimental"

s1 = from(bucket: "example-bucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "foo" and r._field == "bar")
  |> group(columns: ["_time", "_measurement", "_field", "_value"], mode: "except")

s2 = from(bucket: "example-bucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "baz" and r._field == "quz")
  |> group(columns: ["_time", "_measurement", "_field", "_value"], mode: "except")

experimental.join(
  left: s1,
  right: s2,
  fn: (left, right) => ({
    left with
    bar_value: left._value,
    quz_value: right._value
  })
)
