17 releases
| Version | Release date |
|---|---|
| 0.3.1 | May 20, 2026 |
| 0.3.0 | May 12, 2026 |
| 0.2.13 | Apr 28, 2026 |
| 0.2.8 | Mar 17, 2026 |
| 0.1.0 | Jan 15, 2026 |
MySQL Stored Procedure Reaction
A Drasi reaction plugin that invokes MySQL stored procedures when continuous query results change.
Overview
The MySQL Stored Procedure reaction enables you to:
- Execute different stored procedures for ADD, UPDATE, and DELETE operations
- Map query result fields to stored procedure parameters using @after.fieldName and @before.fieldName syntax
- Configure default templates for all queries or per-query routes for specific queries
- Handle multiple queries with different stored procedure configurations
- Automatically retry failed procedure calls with exponential backoff
- Configure connection pooling and timeouts
Installation
Add the dependency to your Cargo.toml:
[dependencies]
drasi-reaction-storedproc-mysql = { path = "path/to/drasi-core/components/reactions/storedproc-mysql" }
Quick Start
1. Create Stored Procedures in MySQL
DELIMITER //
CREATE PROCEDURE add_user(
    IN p_id INT,
    IN p_name VARCHAR(255),
    IN p_email VARCHAR(255)
)
BEGIN
    INSERT INTO users_sync (id, name, email)
    VALUES (p_id, p_name, p_email);
END //
CREATE PROCEDURE update_user(
    IN p_id INT,
    IN p_name VARCHAR(255),
    IN p_email VARCHAR(255)
)
BEGIN
    UPDATE users_sync
    SET name = p_name, email = p_email
    WHERE id = p_id;
END //
CREATE PROCEDURE delete_user(
    IN p_id INT
)
BEGIN
    DELETE FROM users_sync WHERE id = p_id;
END //
DELIMITER ;
2. Create the Reaction
use drasi_reaction_storedproc_mysql::{MySqlStoredProcReaction, QueryConfig, TemplateSpec};
use drasi_lib::DrasiLib;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Configure the reaction: connection details plus one template per operation type
    let reaction = MySqlStoredProcReaction::builder("user-sync")
        .with_hostname("localhost")
        .with_port(3306)
        .with_database("mydb")
        .with_user("root")
        .with_password("password")
        .with_query("user-changes")
        .with_default_template(QueryConfig {
            added: Some(TemplateSpec::new("CALL add_user(@after.id, @after.name, @after.email)")),
            updated: Some(TemplateSpec::new("CALL update_user(@after.id, @after.name, @after.email)")),
            deleted: Some(TemplateSpec::new("CALL delete_user(@before.id)")),
        })
        .build()
        .await?;
    // Register the reaction with Drasi and start processing
    let drasi = DrasiLib::builder()
        .with_id("my-app")
        .with_reaction(reaction)
        .build()
        .await?;
    drasi.start().await?;
    // Keep running until Ctrl+C
    tokio::signal::ctrl_c().await?;
    Ok(())
}
Configuration
Builder API
Traditional Username/Password Authentication
Use a default template that applies to all queries:
use drasi_reaction_storedproc_mysql::{MySqlStoredProcReaction, QueryConfig, TemplateSpec};
let reaction = MySqlStoredProcReaction::builder("my-reaction")
    .with_hostname("localhost")
    .with_port(3306)
    .with_database("mydb")
    .with_user("root")
    .with_password("secret")
    .with_ssl(true) // Enable SSL/TLS
    .with_query("query1")
    .with_default_template(QueryConfig {
        added: Some(TemplateSpec::new("CALL add_record(@after.id, @after.name)")),
        updated: Some(TemplateSpec::new("CALL update_record(@after.id, @after.name)")),
        deleted: Some(TemplateSpec::new("CALL delete_record(@before.id)")),
    })
    .with_command_timeout_ms(30000)
    .with_retry_attempts(3)
    .build()
    .await?;
Cloud Identity Provider Authentication
For cloud-managed MySQL databases, you can use identity providers instead of passwords:
Azure AD Authentication (Azure Database for MySQL):
use drasi_lib::identity::AzureIdentityProvider;
use drasi_reaction_storedproc_mysql::{MySqlStoredProcReaction, QueryConfig, TemplateSpec};
// Choose one of the two constructors below.
// For Azure Kubernetes Service with Workload Identity:
let identity_provider = AzureIdentityProvider::with_workload_identity("myuser@myserver")?;
// For local development or Azure VMs with Managed Identity:
// let identity_provider = AzureIdentityProvider::with_default_credentials("myuser@myserver")?;
let reaction = MySqlStoredProcReaction::builder("my-reaction")
    .with_hostname("myserver.mysql.database.azure.com")
    .with_port(3306)
    .with_database("mydb")
    .with_identity_provider(identity_provider)
    .with_ssl(true) // Required for Azure
    .with_query("query1")
    .with_default_template(QueryConfig {
        added: Some(TemplateSpec::new("CALL add_record(@after.id, @after.name)")),
        updated: Some(TemplateSpec::new("CALL update_record(@after.id, @after.name)")),
        deleted: Some(TemplateSpec::new("CALL delete_record(@before.id)")),
    })
    .build()
    .await?;
AWS IAM Authentication (Amazon RDS for MySQL/Aurora MySQL):
use drasi_identity_aws::AwsIdentityProvider;
use drasi_reaction_storedproc_mysql::{MySqlStoredProcReaction, QueryConfig, TemplateSpec};
// Choose one of the two constructors below.
// Using IAM user credentials:
let identity_provider = AwsIdentityProvider::new(
    "myuser",
).await?;
// Or assuming an IAM role:
// let identity_provider = AwsIdentityProvider::with_assumed_role(
//     "myuser",
//     "arn:aws:iam::123456789012:role/RDSAccessRole",
//     None
// ).await?;
let reaction = MySqlStoredProcReaction::builder("my-reaction")
    .with_hostname("mydb.rds.amazonaws.com")
    .with_port(3306)
    .with_database("mydb")
    .with_identity_provider(identity_provider)
    .with_ssl(true) // Recommended for RDS
    .with_query("query1")
    .with_default_template(QueryConfig {
        added: Some(TemplateSpec::new("CALL add_record(@after.id, @after.name)")),
        updated: Some(TemplateSpec::new("CALL update_record(@after.id, @after.name)")),
        deleted: Some(TemplateSpec::new("CALL delete_record(@before.id)")),
    })
    .build()
    .await?;
Password Provider (programmatic username/password):
use drasi_lib::identity::PasswordIdentityProvider;
use drasi_reaction_storedproc_mysql::{MySqlStoredProcReaction, QueryConfig, TemplateSpec};
let identity_provider = PasswordIdentityProvider::new("root", "secret");
let reaction = MySqlStoredProcReaction::builder("my-reaction")
    .with_hostname("localhost")
    .with_port(3306)
    .with_database("mydb")
    .with_identity_provider(identity_provider)
    .with_query("query1")
    .with_default_template(QueryConfig {
        added: Some(TemplateSpec::new("CALL add_record(@after.id, @after.name)")),
        updated: Some(TemplateSpec::new("CALL update_record(@after.id, @after.name)")),
        deleted: Some(TemplateSpec::new("CALL delete_record(@before.id)")),
    })
    .build()
    .await?;
Note: When using identity providers, do not call .with_user() or .with_password(). The identity provider handles authentication automatically.
See the Identity Provider README for detailed setup instructions for Azure AD and AWS IAM authentication.
Builder API with Query-Specific Routes
Configure different stored procedures for different queries:
use drasi_reaction_storedproc_mysql::{MySqlStoredProcReaction, QueryConfig, TemplateSpec};
let reaction = MySqlStoredProcReaction::builder("my-reaction")
    .with_hostname("localhost")
    .with_port(3306)
    .with_database("mydb")
    .with_user("root")
    .with_password("secret")
    .with_query("user-changes")
    .with_query("order-changes")
    // Default template for most queries
    .with_default_template(QueryConfig {
        added: Some(TemplateSpec::new("CALL default_add(@after.id)")),
        updated: None,
        deleted: None,
    })
    // Special route for critical queries
    .with_route("order-changes", QueryConfig {
        added: Some(TemplateSpec::new("CALL process_order(@after.order_id, @after.total)")),
        updated: Some(TemplateSpec::new("CALL update_order(@after.order_id, @after.status)")),
        deleted: Some(TemplateSpec::new("CALL cancel_order(@before.order_id)")),
    })
    .build()
    .await?;
Configuration Options
| Option | Description | Type | Default |
|---|---|---|---|
| hostname | Database hostname | String | "localhost" |
| port | Database port | u16 | 3306 |
| user | Database user | String | Required |
| password | Database password | String | Required |
| database | Database name | String | Required |
| ssl | Enable SSL/TLS | bool | false |
| default_template | Default template for all queries | Option<QueryConfig> | None |
| routes | Per-query template configurations | HashMap<String, QueryConfig> | Empty |
| command_timeout_ms | Command timeout | u64 | 30000 |
| retry_attempts | Number of retries | u32 | 3 |
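To make the defaults in the table concrete, here is a minimal sketch of a settings struct whose Default implementation mirrors them. MySqlOptionsSketch, QueryConfigSketch, and missing_required are hypothetical names used only for illustration; they are not the crate's real types, and the sketch covers only the plain username/password case.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the crate's QueryConfig: one raw template
/// string per operation type.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct QueryConfigSketch {
    pub added: Option<String>,
    pub updated: Option<String>,
    pub deleted: Option<String>,
}

/// Hypothetical mirror of the options table (not the crate's real type).
#[derive(Debug, Clone, PartialEq)]
pub struct MySqlOptionsSketch {
    pub hostname: String,
    pub port: u16,
    pub user: Option<String>,     // "Required" in the table: no default
    pub password: Option<String>, // "Required" in the table: no default
    pub database: Option<String>, // "Required" in the table: no default
    pub ssl: bool,
    pub default_template: Option<QueryConfigSketch>,
    pub routes: HashMap<String, QueryConfigSketch>,
    pub command_timeout_ms: u64,
    pub retry_attempts: u32,
}

impl Default for MySqlOptionsSketch {
    fn default() -> Self {
        Self {
            hostname: "localhost".to_string(),
            port: 3306,
            user: None,
            password: None,
            database: None,
            ssl: false,
            default_template: None,
            routes: HashMap::new(),
            command_timeout_ms: 30_000,
            retry_attempts: 3,
        }
    }
}

impl MySqlOptionsSketch {
    /// Names of the required options that are still unset.
    pub fn missing_required(&self) -> Vec<&'static str> {
        let mut missing = Vec::new();
        if self.user.is_none() {
            missing.push("user");
        }
        if self.password.is_none() {
            missing.push("password");
        }
        if self.database.is_none() {
            missing.push("database");
        }
        missing
    }
}
```

Calling MySqlOptionsSketch::default().missing_required() on a fresh value lists user, password, and database, which matches the three rows marked Required.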
Parameter Mapping
Context-Aware Field Access
The reaction uses context-aware field mapping with @after and @before prefixes:
ADD Operations
Use @after.fieldName to access the newly added data:
QueryConfig {
    added: Some(TemplateSpec::new("CALL add_user(@after.id, @after.name, @after.email)")),
    updated: None,
    deleted: None,
}
Query result for ADD:
{
  "type": "add",
  "data": {
    "id": 1,
    "name": "Alice",
    "email": "alice@example.com"
  }
}
Executes:
CALL add_user(1, 'Alice', 'alice@example.com')
UPDATE Operations
Use @before.fieldName for old values and @after.fieldName for new values:
QueryConfig {
    added: None,
    updated: Some(TemplateSpec::new("CALL update_user(@after.id, @before.email, @after.email)")),
    deleted: None,
}
Query result for UPDATE:
{
  "type": "update",
  "data": {
    "before": {
      "id": 1,
      "email": "alice@oldmail.com"
    },
    "after": {
      "id": 1,
      "email": "alice@newmail.com"
    }
  }
}
Executes:
CALL update_user(1, 'alice@oldmail.com', 'alice@newmail.com')
DELETE Operations
Use @before.fieldName to access the deleted data:
QueryConfig {
    added: None,
    updated: None,
    deleted: Some(TemplateSpec::new("CALL delete_user(@before.id)")),
}
Query result for DELETE:
{
  "type": "delete",
  "data": {
    "id": 1,
    "name": "Alice"
  }
}
Executes:
CALL delete_user(1)
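As an illustration of how the @after and @before placeholders in a template can be recognized, the following sketch scans a template string and lists each placeholder with its context. It is an assumption-laden example, not the crate's parser: Context and extract_placeholders are hypothetical names, and the set of characters allowed in a field path (ASCII letters, digits, underscore, dot) is a guess.

```rust
/// Which side of a change a placeholder reads from (hypothetical type).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Context {
    Before,
    After,
}

/// Returns every `@after.<path>` / `@before.<path>` placeholder in the
/// template, in order of appearance. Other `@` tokens are ignored.
pub fn extract_placeholders(template: &str) -> Vec<(Context, String)> {
    let mut found = Vec::new();
    let mut rest = template;
    while let Some(at) = rest.find('@') {
        // Skip past the '@' (a one-byte character, so this is a valid boundary).
        rest = &rest[at + 1..];
        let (context, tail) = if let Some(t) = rest.strip_prefix("after.") {
            (Context::After, t)
        } else if let Some(t) = rest.strip_prefix("before.") {
            (Context::Before, t)
        } else {
            continue; // not a recognized prefix; keep scanning
        };
        // Assumed path alphabet: ASCII alphanumerics, '_' and '.'.
        let end = tail
            .find(|c: char| !(c.is_ascii_alphanumeric() || c == '_' || c == '.'))
            .unwrap_or(tail.len());
        let path = tail[..end].trim_end_matches('.');
        if !path.is_empty() {
            found.push((context, path.to_string()));
        }
        rest = &tail[end..];
    }
    found
}
```

For the UPDATE template shown earlier, this yields (After, "id"), (Before, "email"), (After, "email"), which lines up with the three arguments of the executed CALL.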
Nested Field Access
Access deeply nested fields using dot notation:
TemplateSpec::new("CALL add_address(@after.user.id, @after.location.city, @after.location.floor)")
Query result:
{
  "user": {
    "id": 123
  },
  "location": {
    "city": "Seattle",
    "floor": 5
  }
}
Executes:
CALL add_address(123, 'Seattle', 5)
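Dot notation lookup of this kind can be sketched as a walk over nested maps, one path segment at a time. The Value enum below is a hypothetical, deliberately tiny stand-in for whatever JSON type the reaction uses internally, and resolve and sample_result are illustrative names rather than part of the crate.

```rust
use std::collections::HashMap;

/// Minimal JSON-like value, just enough for the example (hypothetical type).
#[derive(Debug, Clone, PartialEq)]
pub enum Value {
    Number(i64),
    Text(String),
    Object(HashMap<String, Value>),
}

/// Follows a dotted path such as "location.city" through nested objects.
/// Returns None if a segment is missing or a non-object is reached early.
pub fn resolve<'a>(root: &'a Value, path: &str) -> Option<&'a Value> {
    path.split('.').try_fold(root, |current, segment| match current {
        Value::Object(fields) => fields.get(segment),
        _ => None,
    })
}

/// Builds the query result shown above.
pub fn sample_result() -> Value {
    let user = Value::Object(HashMap::from([("id".to_string(), Value::Number(123))]));
    let location = Value::Object(HashMap::from([
        ("city".to_string(), Value::Text("Seattle".to_string())),
        ("floor".to_string(), Value::Number(5)),
    ]));
    Value::Object(HashMap::from([
        ("user".to_string(), user),
        ("location".to_string(), location),
    ]))
}
```

With the sample result, resolve finds 123 for user.id, "Seattle" for location.city, and 5 for location.floor, and returns None for a path that does not exist. How the reaction itself treats a missing field is not stated here, so the None case is only one possible design.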
Template Routing Priority
When a query result arrives, the reaction determines which stored procedure to call using the following priority:
1. Query-specific route: if a route exists for the query ID and defines a template for the operation type, use that template
2. Default template: otherwise, use the default template's entry for the operation type
3. Skip: if neither defines a template for the operation type, skip processing
Example:
let reaction = MySqlStoredProcReaction::builder("my-reaction")
    .with_hostname("localhost")
    .with_port(3306)
    .with_database("mydb")
    .with_user("root")
    .with_password("secret")
    // Default template - used by most queries
    .with_default_template(QueryConfig {
        added: Some(TemplateSpec::new("CALL default_add(@after.id)")),
        updated: Some(TemplateSpec::new("CALL default_update(@after.id)")),
        deleted: None, // No default for deletes
    })
    // Special handling for critical-query
    .with_route("critical-query", QueryConfig {
        added: Some(TemplateSpec::new("CALL critical_add(@after.id, @after.priority)")),
        updated: None, // Falls back to default_update
        deleted: Some(TemplateSpec::new("CALL critical_delete(@before.id)")),
    })
    .with_query("normal-query")
    .with_query("critical-query")
    .build()
    .await?;
In this example:
- normal-query ADD → Uses default_add
- normal-query UPDATE → Uses default_update
- normal-query DELETE → Skipped (no template)
- critical-query ADD → Uses critical_add (route override)
- critical-query UPDATE → Uses default_update (fallback)
- critical-query DELETE → Uses critical_delete (route)
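The fallback behaviour in this example can be sketched as a per-operation lookup: try the route's entry for the operation first, then the default template's entry, and skip if both are empty. The types and functions below (Operation, QueryConfigSketch, select_template, example_config) are hypothetical and use plain strings in place of TemplateSpec; they illustrate the priority order and are not the crate's implementation.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Operation {
    Add,
    Update,
    Delete,
}

/// Hypothetical stand-in for QueryConfig, holding raw template strings.
#[derive(Debug, Clone, Default)]
pub struct QueryConfigSketch {
    pub added: Option<String>,
    pub updated: Option<String>,
    pub deleted: Option<String>,
}

impl QueryConfigSketch {
    fn template_for(&self, op: Operation) -> Option<&str> {
        match op {
            Operation::Add => self.added.as_deref(),
            Operation::Update => self.updated.as_deref(),
            Operation::Delete => self.deleted.as_deref(),
        }
    }
}

/// Route entry first, then default entry; None means "skip this change".
pub fn select_template<'a>(
    routes: &'a HashMap<String, QueryConfigSketch>,
    default_template: Option<&'a QueryConfigSketch>,
    query_id: &str,
    op: Operation,
) -> Option<&'a str> {
    routes
        .get(query_id)
        .and_then(|route| route.template_for(op))
        .or_else(|| default_template.and_then(|d| d.template_for(op)))
}

/// Rebuilds the routes and default template from the example above.
pub fn example_config() -> (HashMap<String, QueryConfigSketch>, QueryConfigSketch) {
    let default_template = QueryConfigSketch {
        added: Some("CALL default_add(@after.id)".to_string()),
        updated: Some("CALL default_update(@after.id)".to_string()),
        deleted: None,
    };
    let critical = QueryConfigSketch {
        added: Some("CALL critical_add(@after.id, @after.priority)".to_string()),
        updated: None,
        deleted: Some("CALL critical_delete(@before.id)".to_string()),
    };
    let routes = HashMap::from([("critical-query".to_string(), critical)]);
    (routes, default_template)
}
```

Running select_template over the six query and operation combinations reproduces the outcomes listed above, including the skipped normal-query DELETE.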
Error Handling
The reaction includes automatic retry logic with exponential backoff:
- Initial retry: 100ms delay
- Subsequent retries: 200ms, 400ms, 800ms, etc.
- Max retries: Configurable (default: 3)
- Timeout: Configurable per command (default: 30s)
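The delay schedule above doubles from a 100ms base. A minimal sketch of that schedule and of a retry loop built on it follows; backoff_delay, backoff_schedule, and retry_with_backoff are hypothetical helpers, the sleep function is injected so the example stays synchronous and testable, and the assumption that the configured count means retries after the first attempt (rather than total attempts) is mine, not something the text states.

```rust
use std::time::Duration;

/// Base delay taken from the list above.
const BASE_DELAY_MS: u64 = 100;

/// Delay before the given retry (zero-based): 100ms, 200ms, 400ms, ...
pub fn backoff_delay(retry: u32) -> Duration {
    let factor = 2u64.saturating_pow(retry);
    Duration::from_millis(BASE_DELAY_MS.saturating_mul(factor))
}

/// The full list of delays for a given number of retries.
pub fn backoff_schedule(max_retries: u32) -> Vec<Duration> {
    (0..max_retries).map(backoff_delay).collect()
}

/// Runs `op` once, then retries up to `max_retries` times (assumed meaning),
/// calling `sleep` with the backoff delay before each retry.
pub fn retry_with_backoff<T, E>(
    max_retries: u32,
    mut op: impl FnMut() -> Result<T, E>,
    mut sleep: impl FnMut(Duration),
) -> Result<T, E> {
    let mut retry = 0;
    loop {
        match op() {
            Ok(value) => return Ok(value),
            // Out of retries: surface the last error.
            Err(err) if retry >= max_retries => return Err(err),
            Err(_) => {
                sleep(backoff_delay(retry));
                retry += 1;
            }
        }
    }
}
```

In an async setting the injected sleep would typically be replaced by an awaited timer; the doubling logic stays the same.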
Connection Pooling
The MySQL reaction keeps a pool of database connections and reuses them across stored procedure calls instead of opening a new connection for each change. Connections are managed automatically.
Plugin Packaging
This reaction is compiled as a dynamic plugin (cdylib) that can be loaded by drasi-server at runtime.
Key files:
- Cargo.toml: includes crate-type = ["lib", "cdylib"]
- src/descriptor.rs: implements ReactionPluginDescriptor with kind "storedproc-mysql", configuration DTO, and OpenAPI schema generation
- src/lib.rs: invokes drasi_plugin_sdk::export_plugin! to export the plugin entry point
Building:
cargo build -p drasi-reaction-storedproc-mysql
The compiled .so (Linux) / .dylib (macOS) / .dll (Windows) is placed in target/debug/ and can be copied to the server's plugins/ directory.
For more details on the plugin descriptor pattern and configuration DTOs, see the Reaction Developer Guide.
License
Copyright 2025 The Drasi Authors.
Licensed under the Apache License, Version 2.0.