13 releases (7 breaking)

| Version | Released |
|---|---|
| 0.21.0 | May 30, 2026 |
| 0.19.1 | Mar 29, 2026 |
| 0.18.1 | Dec 8, 2025 |
| 0.18.0 | Nov 30, 2025 |
| 0.1.0 | Apr 20, 2025 |
RMQTT Bridge Egress Kafka
Overview
rmqtt-bridge-egress-kafka is an RMQTT plugin that forwards local MQTT publish messages to one or more Apache Kafka topics. It acts as an egress bridge, subscribing to local MQTT topics and producing the messages to Kafka with topic transformation support.
Key features:
- Forward MQTT messages to Kafka topics
- Topic transformation with ${local.topic} variable substitution
- Configurable Kafka producer properties (librdkafka settings)
- Multiple bridges, each with multiple entries
- Automatic reconnection
Usage
Add Dependency
Add the following to your Cargo.toml:
[dependencies]
rmqtt-bridge-egress-kafka = { version = "*", features = ["plugin"] }
Register Plugin
Add the plugin to your RMQTT node configuration:
[[plugins]]
name = "rmqtt-bridge-egress-kafka"
register = true
Optionally, specify a custom configuration file path:
[[plugins]]
name = "rmqtt-bridge-egress-kafka"
register = true
config_file = "/path/to/rmqtt-bridge-egress-kafka.toml"
Configuration
The configuration file uses TOML format. Below is the complete list of configuration options.
Bridge Instance ([[bridges]])
Each bridge instance defines a connection to a Kafka cluster.
| Option | Type | Default | Description |
|---|---|---|---|
| enable | Boolean | false | Whether to enable this bridge instance |
| name | String | - | Unique bridge name identifier |
| servers | String | - | Comma-separated list of Kafka broker addresses. Format: host1:port1,host2:port2 |
| client_id_prefix | String | - | Kafka client ID prefix |
| concurrent_client_limit | Integer | 3 | Maximum number of clients connected to the remote Kafka broker |
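
The servers format above can be checked before a configuration is deployed. The following is a minimal sketch, assuming only the host:port,host:port format stated in the table. `parse_servers` is a hypothetical helper written for illustration, not part of the plugin's API, and librdkafka itself may accept other address forms.

```rust
/// Splits a `servers` value such as "host1:port1,host2:port2" into
/// (host, port) pairs. Hypothetical helper for illustration only;
/// it is not part of rmqtt-bridge-egress-kafka.
fn parse_servers(servers: &str) -> Result<Vec<(String, u16)>, String> {
    let mut brokers = Vec::new();
    for entry in servers.split(',') {
        let entry = entry.trim();
        if entry.is_empty() {
            return Err("empty broker address".to_string());
        }
        // Split on the last ':' so that everything before it is the host.
        let (host, port) = entry
            .rsplit_once(':')
            .ok_or_else(|| format!("missing port in '{entry}'"))?;
        if host.is_empty() {
            return Err(format!("missing host in '{entry}'"));
        }
        // A port must fit in 16 bits; anything else is rejected.
        let port: u16 = port
            .parse()
            .map_err(|_| format!("invalid port in '{entry}'"))?;
        brokers.push((host.to_string(), port));
    }
    Ok(brokers)
}
```

Calling `parse_servers("127.0.0.1:9092")` returns a single pair, while a value without a port, such as `"host1"`, returns an error.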
Kafka Properties ([bridges.properties])
Additional librdkafka producer properties. See librdkafka Configuration for all available options.
| Property | Type | Default | Description |
|---|---|---|---|
| message.timeout.ms | String | 5000 | Message timeout in milliseconds |
| Any other librdkafka property | String | - | See librdkafka configuration documentation |
Bridge Entry ([[bridges.entries]])
Each bridge entry defines a topic forwarding rule.
| Option | Type | Default | Description |
|---|---|---|---|
| local.topic_filter | String | - | Local MQTT topic filter. All messages matching this filter will be forwarded |
| remote.topic | String | - | Kafka topic to produce to. Supports ${local.topic} variable substitution |
| remote.queue_timeout | Duration | 0m | How long to retry if the librdkafka producer queue is full. 0 means never block |
| remote.partition | Integer | - | Destination partition of the record (optional) |
| remote.skip_levels | Integer | 0 | Number of leading topic levels to skip when substituting ${local.topic} |
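
To make the interaction between local.topic_filter, remote.topic and remote.skip_levels concrete, here is a minimal Rust sketch of how an entry could be evaluated. It is not the plugin's implementation. `matches_filter` and `render_remote_topic` are hypothetical names, the filter matching follows standard MQTT wildcard rules (`+` for one level, `#` for all remaining levels) and ignores special cases such as `$`-prefixed topics, and the substitution is a plain string replacement. Whether the plugin rewrites `/` separators is not stated here; Kafka topic names permit only alphanumerics, `.`, `_` and `-`, so the result may need further handling.

```rust
/// Returns true if `topic` matches the MQTT `filter`.
/// Assumes a well-formed filter where `#`, if present, is the last level.
/// Hypothetical helper for illustration only.
fn matches_filter(filter: &str, topic: &str) -> bool {
    let mut filter_levels = filter.split('/');
    let mut topic_levels = topic.split('/');
    loop {
        match (filter_levels.next(), topic_levels.next()) {
            // '#' matches the parent level and everything below it.
            (Some("#"), _) => return true,
            // '+' matches exactly one level, whatever its content.
            (Some("+"), Some(_)) => continue,
            // Literal levels must be equal.
            (Some(f), Some(t)) if f == t => continue,
            // Both exhausted at the same time: full match.
            (None, None) => return true,
            _ => return false,
        }
    }
}

/// Builds a Kafka topic from a `remote.topic` template by dropping the
/// first `skip_levels` levels of the local topic and substituting the
/// remainder for `${local.topic}`. Hypothetical helper for illustration.
fn render_remote_topic(template: &str, local_topic: &str, skip_levels: usize) -> String {
    let remaining: Vec<&str> = local_topic.split('/').skip(skip_levels).collect();
    template.replace("${local.topic}", &remaining.join("/"))
}
```

Under these assumptions, a message on `local/topic1/egress/a` matches the filter `local/topic1/egress/#`, and with a skip_levels value of 3 the template `remote-topic1-egress-${local.topic}` renders as `remote-topic1-egress-a`.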
Example Configuration
[[bridges]]
enable = true
name = "bridge_kafka_1"
servers = "127.0.0.1:9092"
client_id_prefix = "kafka_001"
concurrent_client_limit = 3
[bridges.properties]
"message.timeout.ms" = "5000"
[[bridges.entries]]
local.topic_filter = "local/topic1/egress/#"
remote.topic = "remote-topic1-egress-${local.topic}"
remote.queue_timeout = "0m"
Dependencies
- rmqtt (feature: plugin)
- rdkafka
- tokio
License
This project is licensed under the MIT License. See the LICENSE file for details.