Track state changes across task executions
Problem
It’s common to use InfluxDB tasks to evaluate and assign states to your time series data and then detect changes in those states. Tasks process data in batches, but what happens if there is a state change across the batch boundary? The task won’t recognize it without knowing the final state of the previous task execution. This guide walks through creating a task that assigns a state to rows and then uses results from the previous task execution to detect any state changes across the batch boundary so you don’t miss any state changes.
Solution
Explicitly assign levels to your data based on thresholds.
Solution Advantages
This is the easiest solution to understand if you have never written a task with the monitor
package.
Solution Disadvantages
You have to explicitly define your thresholds, which potentially requires more code.
Solution Overview
Create a task where you:
- Boilerplate. Import packages and define task options.
- Query your data.
- Assign states to your data based on thresholds. Store this data in a variable, i.e. “states”.
- Write the “states” to a bucket.
- Find the latest value from the previous task run and store it in a variable “last_state_previous_task”.
- Union “states” and “last_state_previous_task”. Store this data in a variable “unioned_states”.
- Discover state changes in “unioned_states”. Store this data in a variable “state_changes”.
- Notify on state changes that span across the last two tasks to catch any state changes that occur across task executions.
Solution Explained
- Import packages and define task options and secrets. Import the following packages:
-
Flux Telegram package: This package
-
Flux InfluxDB secrets package: This package contains the secrets.get() function which allows you to retrieve secrets from the InfluxDB secret store. Learn how to manage secrets in InfluxDB to use this package.
-
Flux InfluxDB monitoring package: This package contains functions and tools for monitoring your data.
import "contrib/sranka/telegram" import "influxdata/influxdb/secrets" import "influxdata/influxdb/monitor" option task = {name: "State changes across tasks", every: 30m, offset: 5m} telegram_token = secrets.get(key: "telegram_token") telegram_channel_ID = secrets.get(key: "telegram_channel_ID")
-
Query the data you want to monitor.
data = from(bucket: "example-bucket") // Query for data from the last successful task run or from the 1 every duration ago. // This ensures that you won’t miss any data. |> range(start: tasks.lastSuccess(orTime: -task.every)) |> filter(fn: (r) => r._measurement == "example-measurement") |> filter(fn: (r) => r.tagKey1 == "example-tag-value") |> filter(fn: (r) => r._field == "example-field")
Where
data
might look like:_measurement tagKey1 _field _value _time example-measurement example-tag-value example-field 30.0 2022-01-01T00:00:00Z example-measurement example-tag-value example-field 50.0 2022-01-01T00:00:00Z -
Assign states to your data based on thresholds. Store this data in a variable, i.e. “states”. To simplify this example, there are only two states: “ok” and “crit.” Store states in the
_level
column (required by themonitor
package).states = data |> map(fn: (r) => ({r with _level: if r._value > 40.0 then "crit" else "ok"}))
Where
states
might look like:_measurement tagKey1 _field _value _level _time example-measurement example-tag-value example-field 30.0 ok 2022-01-01T00:00:00Z example-measurement example-tag-value example-field 50.0 crit 2022-01-01T00:01:00Z -
Write “states” back to InfluxDB. You can write the data to a new measurement or to a new bucket. To write the data to a new measurement, use
set()
to update the value of the_measurement
column in your “states” data.states // (Optional) Change the measurement name to write the data to a new measurement |> set(key: "_measurement", value: "new-measurement") |> to(bucket : "example-bucket")
-
Find the latest value from the previous task run and store it in a variable “last_state_previous_task”,
last_state_previous_task = from(bucket: "example-bucket") |> range(start: date.sub(d: task.every, from: tasks.lastSuccess(orTime: -task.every)) |> filter(fn: (r) => r._measurement == "example-measurement") |> filter(fn: (r) => r.tagKey == "example-tag-value") |> filter(fn: (r) => r._field == "example-field") |> last()
Where
last_state_previous_task
might look like:_measurement tagKey1 _field _value _level _time example-measurement example-tag-value example-field 55.0 crit 2021-12-31T23:59:00Z -
Union “states” and “last_state_previous_task”. Store this data in a variable “unioned_states”. Use
sort()
to ensure rows are ordered by time.unioned_states = union(tables: [states, last_state_previous_task]) |> sort(columns: ["_time"], desc: true)
Where
unioned_states
might look like:_measurement tagKey1 _field _value _level _time example-measurement example-tag-value example-field 55.0 crit 2021-12-31T23:59:00Z example-measurement example-tag-value example-field 30.0 ok 2022-01-01T00:00:00Z example-measurement example-tag-value example-field 50.0 crit 2022-01-01T00:01:00Z -
Use
monitor.stateChangesOnly()
to return only rows where the state changed in “unioned_states”. Store this data in a variable, “state_changes”.state_changes = unioned_states |> monitor.stateChangesOnly()
Where
state_changes
might look like:_measurement tagKey1 _field _value _level _time example-measurement example-tag-value example-field 30.0 ok 2022-01-01T00:00:00Z example-measurement example-tag-value example-field 50.0 crit 2022-01-01T00:01:00Z -
Notify on state changes that span across the last two tasks to catch any state changes that occur across task executions.
state_changes = data |> map( fn: (r) => ({ _value: telegram.message( token: telegram_token, channel: telegram_channel_ID, text: "state change at ${r._value} at ${r._time}", ), }), )
Using the unioned data, the following alerts would be sent to Telegram:
state change at 30.0 at 2022-01-01T00:00:00Z
state change at 50.0 at 2022-01-01T00:01:00Z
Was this page helpful?
Thank you for your feedback!
Support and feedback
Thank you for being part of our community! We welcome and encourage your feedback and bug reports for and this documentation. To find support, use the following resources:
Customers with an annual or support contract can contact InfluxData Support.