Kafka event handler

Apache Kafka is a distributed streaming platform designed for building real-time data pipelines and streaming apps. Configure Kapacitor to send alert messages to a Kafka cluster.

Configuration

Configuration options and their default values for the Kafka event handler are set in your kapacitor.conf. The following is an example configuration:

[[kafka]]
  enabled = true
  id = "localhost"
  brokers = []
  timeout = "10s"
  batch-size = 100
  batch-timeout = "1s"
  use-ssl = false
  ssl-ca = ""
  ssl-cert = ""
  ssl-key = ""
  insecure-skip-verify = false

Multiple Kafka clients may be configured with multiple [[kafka]] sections in TOML. The id acts as a unique identifier for each configured Kafka client.

enabled

Set to true to enable the Kafka event handler.

id

A unique identifier for the Kafka cluster.

brokers

List of Kafka broker addresses using the host:port format.

timeout

Timeout on network operations with the Kafka brokers. If set to 0, a default of 10s is used.

batch-size

The number of messages batched before being sent to Kafka. If set to 0, a default of 100 is used.

batch-timeout

The maximum amount of time to wait before flushing an incomplete batch. If set to 0, a default of 1s is used.
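How batch-size and batch-timeout interact can be pictured with the short Python sketch below. It is only an illustration of the flush rule described above, not Kapacitor's implementation: the Batcher class, its method names, and the choice to start the timer at the oldest pending message are all assumptions.

```python
class Batcher:
    """Illustrative batcher: flush when full, or when the batch has waited too long."""

    def __init__(self, batch_size=100, batch_timeout=1.0):
        # Mirror the documented fallback: a value of 0 means "use the default".
        self.batch_size = batch_size or 100
        self.batch_timeout = batch_timeout or 1.0
        self.pending = []     # messages waiting to be sent
        self.first_at = None  # arrival time of the oldest pending message
        self.flushed = []     # batches that would have been sent to Kafka

    def add(self, message, now):
        if not self.pending:
            self.first_at = now
        self.pending.append(message)
        # A full batch is sent immediately.
        if len(self.pending) >= self.batch_size:
            self._flush()

    def tick(self, now):
        # Called periodically: send an incomplete batch once it has waited long enough.
        if self.pending and now - self.first_at >= self.batch_timeout:
            self._flush()

    def _flush(self):
        self.flushed.append(self.pending)
        self.pending = []
        self.first_at = None
```

With batch_size=2, two messages are flushed together as soon as the second arrives, while a single leftover message is flushed only after batch_timeout seconds have passed.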

use-ssl

Enable SSL communication. Must be true for other SSL options to take effect.

ssl-ca

Path to certificate authority file.

ssl-cert

Path to host certificate file.

ssl-key

Path to certificate private key file.

insecure-skip-verify

Use SSL but skip chain and host verification. (Required if using a self-signed certificate.)

Options

The following Kafka event handler options can be set in a handler file or when using .kafka() in a TICKscript.

Name                  Type     Description
cluster               string   ID of the Kafka cluster as configured in kapacitor.conf.
topic                 string   Kafka topic. In TICKscripts, this is set using .kafkaTopic().
template              string   Message template.
disablePartitionById  boolean  Disable partitioning Kafka messages by message ID.
partitionAlgorithm    string   Algorithm used to assign message IDs to Kafka partitions: crc32 (default), murmur2, or fnv-1a.

Kafka message partitioning

In Kapacitor 1.6+, messages with the same ID are sent to the same Kafka partition. Previously, messages were sent to the Kafka partition with the least amount of data, regardless of the message ID. Messages with no ID are spread randomly between partitions. This aligns the Kapacitor concept of message IDs with the Kafka concept of message keys.

To revert to the previous behavior, use the disablePartitionById option.

When partitioning by ID, use the partitionHashAlgorithm to specify the method used to assign message IDs to Kafka partitions. Kapacitor supports the following partitioning algorithms:

  • crc32: (default) aligns with librdkafka and confluent-kafka-go
  • murmur2: aligns with canonical Java partitioning logic
  • fnv-1a: aligns with Shopify’s sarama project
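As a rough illustration of why messages with the same ID land on the same partition, the Python sketch below hashes the ID and reduces it modulo the partition count. The function name partition_for and the modulo step are assumptions made for illustration, not Kapacitor's actual code. Only the crc32 case is shown, using zlib.crc32 from the Python standard library.

```python
import random
import zlib


def partition_for(message_id: str, num_partitions: int) -> int:
    """Pick a partition for a message (illustrative, not Kapacitor's code)."""
    if not message_id:
        # Messages with no ID are spread randomly between partitions.
        return random.randrange(num_partitions)
    # A deterministic hash means equal IDs always map to the same partition.
    return zlib.crc32(message_id.encode("utf-8")) % num_partitions


# The same ID yields the same partition on every call.
first = partition_for("cpu:host-01", 6)
second = partition_for("cpu:host-01", 6)
print(first == second)  # True
```

A different hash function will generally map the same ID to a different partition, so if other producers write to the same topic and need to agree on placement, it helps to pick the algorithm that matches theirs.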

Example: handler file

id: kafka-event-handler
topic: kapacitor-topic-name
kind: kafka
options:
  cluster: kafka-cluster
  topic: kafka-topic-name
  template: kafka-template-name
  disablePartitionById: false
  partitionAlgorithm: crc32

Example: TICKscript

|alert()
  // ...
  .kafka()
    .cluster('kafka-cluster')
    .kafkaTopic('kafka-topic-name')
    .template('kafka-template-name')
    .disablePartitionById(FALSE)
    .partitionAlgorithm('crc32')

Using the Kafka event handler

With the Kafka event handler enabled in your kapacitor.conf, you can send alerts to a Kafka cluster in two ways: use the .kafka() attribute in your TICKscripts, or define a Kafka handler that subscribes to a topic and sends published alerts to Kafka.

The examples below use the following Kafka configuration defined in the kapacitor.conf:

Kafka settings in kapacitor.conf

[[kafka]]
  enabled = true
  id = "infra-monitoring"
  brokers = ["123.45.67.89:9092", "123.45.67.90:9092"]
  timeout = "10s"
  batch-size = 100
  batch-timeout = "1s"
  use-ssl = true
  ssl-ca = "/etc/ssl/certs/ca.crt"
  ssl-cert = "/etc/ssl/certs/cert.crt"
  ssl-key = "/etc/ssl/certs/cert-key.key"
  insecure-skip-verify = true

Send alerts to a Kafka cluster from a TICKscript

The following TICKscript uses the .kafka() event handler to send the message, “Hey, check your CPU”, whenever idle CPU usage drops below 10%. It publishes the messages to the cpu-alerts topic in the infra-monitoring Kafka cluster defined in the kapacitor.conf.

kafka-cpu-alert.tick

stream
  |from()
    .measurement('cpu')
  |alert()
    .crit(lambda: "usage_idle" < 10)
    .message('Hey, check your CPU')
    .kafka()
      .kafkaTopic('cpu-alerts')

Send alerts to a Kafka cluster from a defined handler

The following setup sends an alert to the cpu topic with the message, “Hey, check your CPU”. A Kafka handler is added that subscribes to the cpu topic and publishes all alert messages to the cpu-alerts topic associated with the infra-monitoring Kafka cluster defined in the kapacitor.conf.

Create a TICKscript that publishes alert messages to a topic. The TICKscript below sends an alert message to the cpu topic any time idle CPU usage drops below 10% (that is, CPU usage exceeds 90%).

cpu_alert.tick

stream
  |from()
    .measurement('cpu')
  |alert()
    .crit(lambda: "usage_idle" < 10)
    .message('Hey, check your CPU')
    .topic('cpu')

Add and enable the TICKscript:

kapacitor define cpu_alert -tick cpu_alert.tick
kapacitor enable cpu_alert

Create a handler file that subscribes to the cpu topic and uses the Kafka event handler to send alerts to the cpu-alerts topic in Kafka.

kafka_cpu_handler.yaml

id: kafka-cpu-alert
topic: cpu
kind: kafka
options:
  topic: 'cpu-alerts'

Add the handler:

kapacitor define-topic-handler kafka_cpu_handler.yaml
