14 releases (7 breaking)
| 0.21.0 | May 30, 2026 |
|---|---|
| 0.19.1 | Mar 29, 2026 |
| 0.18.1 | Dec 8, 2025 |
| 0.18.0 | Nov 30, 2025 |
| 0.1.1 | May 22, 2025 |
#12 in #ingress
Used in 2 crates
540KB
12K
SLoC
RMQTT Bridge Ingress Kafka
Overview
rmqtt-bridge-ingress-kafka is an RMQTT plugin that consumes messages from one or more Apache Kafka topics and publishes them to the local RMQTT broker. It acts as an ingress bridge, bridging Kafka messages into MQTT with flexible topic mapping and consumer group management.
Key features:
- Consume messages from Kafka topics with consumer groups
- Flexible topic transformation with
${kafka.key}variable substitution - Configurable consumer offset start position
- Configurable Kafka consumer properties (librdkafka settings)
- Partition-level consumption control
- Message expiration interval
Usage
Add Dependency
Add the following to your Cargo.toml:
[dependencies]
rmqtt-bridge-ingress-kafka = { version = "*", features = ["plugin"] }
Register Plugin
Add the plugin to your RMQTT node configuration:
[[plugins]]
name = "rmqtt-bridge-ingress-kafka"
register = true
Optionally, specify a custom configuration file path:
[[plugins]]
name = "rmqtt-bridge-ingress-kafka"
register = true
config_file = "/path/to/rmqtt-bridge-ingress-kafka.toml"
Configuration
The configuration file uses TOML format. Below is the complete list of configuration options.
Bridge Instance ([[bridges]])
Each bridge instance defines a connection to a Kafka cluster.
| Option | Type | Default | Description |
|---|---|---|---|
enable |
Boolean | false |
Whether to enable this bridge instance |
name |
String | - | Unique bridge name identifier |
servers |
String | - | Comma-separated list of Kafka broker addresses. Format: host1:port1,host2:port2 |
client_id_prefix |
String | - | Kafka client ID prefix |
expiry_interval |
Duration | 5m |
Message expiration time. 0 means no expiration |
Kafka Properties ([bridges.properties])
Additional librdkafka consumer properties. See librdkafka Configuration for all available options.
| Property | Type | Default | Description |
|---|---|---|---|
message.timeout.ms |
String | 5000 |
Message timeout in milliseconds |
enable.auto.commit |
String | true |
Whether to enable auto-commit of consumer offsets |
| Any other librdkafka property | String | - | See librdkafka configuration documentation |
Bridge Entry ([[bridges.entries]])
Each bridge entry defines a topic consumption and forwarding rule.
| Option | Type | Default | Description |
|---|---|---|---|
remote.topic |
String | - | Kafka topic to consume from |
remote.group_id |
String | - | Kafka consumer group ID |
remote.start_partition |
Integer | -1 |
Start partition index (inclusive). -1 means all partitions |
remote.stop_partition |
Integer | -1 |
Stop partition index (inclusive). -1 means all partitions |
remote.offset |
String | end |
Consumer offset start position. Values: beginning, end, stored, or a specific offset number |
local.qos |
Integer | - | QoS for publishing to the local broker. Values: 0, 1, 2. If not set, follows Kafka message metadata |
local.topic |
String | - | Local MQTT topic to publish to. Supports ${kafka.key} variable for the Kafka message key |
local.retain |
Boolean | false |
Whether to retain the published message locally |
Example Configuration
[[bridges]]
enable = true
name = "bridge_kafka_1"
servers = "127.0.0.1:9092"
client_id_prefix = "kafka_001"
expiry_interval = "5m"
[bridges.properties]
"message.timeout.ms" = "5000"
[[bridges.entries]]
remote.topic = "remote-topic1-ingress"
remote.group_id = "group_id_001"
local.topic = "local/topic1/ingress/${kafka.key}"
Dependencies
- rmqtt (feature: plugin)
- rdkafka
- tokio
License
This project is licensed under the MIT License. See the LICENSE file for details.
Dependencies
~24–42MB
~597K SLoC