CDC Webhooks User Guide
Version: v5.5
Last Updated: January 4, 2026
Table of Contents
- Introduction
- Getting Started
- Creating CDC Streams
- Event Filtering
- Event Transformation
- Delivery Configuration
- Circuit Breaker
- Exactly-Once Delivery
- Security
- Monitoring
- Best Practices
- API Reference
Introduction
HeliosDB’s CDC-to-Webhook integration enables real-time streaming of database changes to external systems. Changes are captured from the Write-Ahead Log (WAL) and delivered to HTTP endpoints with exactly-once semantics.
Key Features
- Real-time: Sub-50ms delivery latency (P99)
- Reliable: Exactly-once delivery guarantee
- Scalable: 100K+ events per second
- Resilient: Circuit breaker protection
- Flexible: Powerful filtering and transformation
Getting Started
Prerequisites
- HeliosDB v5.5 or later
- Network access to webhook endpoints
- Webhook endpoints that accept POST requests
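For local testing, the last prerequisite can be met with a small receiver. The following is a minimal sketch using only Python's standard library; the names `handle_event`, `WebhookHandler`, and `serve`, the port, and the response body are illustrative assumptions rather than part of HeliosDB. The response shape mirrors the idempotent receiver example in Best Practices, and replying with a 2xx status on success is assumed to be what the sender expects.

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer


def handle_event(body: bytes) -> tuple:
    """Parse a JSON event body and return (HTTP status, response payload)."""
    try:
        event = json.loads(body)
    except ValueError:  # covers malformed JSON and undecodable bytes
        return 400, {"status": "error", "reason": "invalid JSON"}
    if not isinstance(event, dict):
        return 400, {"status": "error", "reason": "expected a JSON object"}
    return 200, {"status": "success", "event_id": event.get("event_id")}


class WebhookHandler(BaseHTTPRequestHandler):
    """Accepts POST requests and answers with a small JSON acknowledgement."""

    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        status, payload = handle_event(self.rfile.read(length))
        data = json.dumps(payload).encode()
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(data)))
        self.end_headers()
        self.wfile.write(data)


def serve(port: int = 8080) -> None:
    """Run the receiver until interrupted (hypothetical local test port)."""
    HTTPServer(("127.0.0.1", port), WebhookHandler).serve_forever()
```

Calling `serve()` starts the receiver on `127.0.0.1:8080`. A production endpoint would also verify the request signature described under Security before processing the event.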
Quick Start
-- Create a simple CDC stream
CREATE CHANGE DATA CAPTURE ON orders
  TO KAFKA 'localhost:9092'
  TOPIC 'orders-events'
  FORMAT JSON;

-- Verify the stream
SHOW CDC STREAMS;

Creating CDC Streams
Basic Syntax
CREATE CHANGE DATA CAPTURE ON table_name
  TO destination_type 'address'
  [TOPIC 'topic_name']
  [FORMAT JSON|AVRO]
  [AS 'stream_name'];

Kafka Destination
CREATE CHANGE DATA CAPTURE ON users
  TO KAFKA 'kafka.example.com:9092'
  TOPIC 'user-events'
  FORMAT JSON
  AS 'users_stream';

AWS Kinesis Destination
CREATE CHANGE DATA CAPTURE ON transactions
  TO KINESIS 'transactions-stream'
  REGION 'us-east-1'
  FORMAT AVRO;

HTTP Webhook Destination
use heliosdb_webhooks::{WebhookServer, WebhookRegistration};

let server = WebhookServer::new(config).await?;

// Register webhook endpoint
server.register(WebhookRegistration {
    url: "https://api.example.com/webhooks/db-changes".to_string(),
    secret: "your-webhook-secret".to_string(),
    tables: vec!["users".to_string(), "orders".to_string()],
    operations: vec![OperationType::Insert, OperationType::Update],
}).await?;

Managing Streams
-- List all streams
SHOW CDC STREAMS;

-- View stream details
SHOW CDC STREAM STATUS users_stream;

-- Pause a stream
ALTER CDC STREAM users_stream PAUSE;

-- Resume a stream
ALTER CDC STREAM users_stream RESUME;

-- Drop a stream
DROP CHANGE DATA CAPTURE users_stream;

Event Filtering
Table Filtering
# Filter by table
filters:
  - name: product_updates
    tables:
      - products
      - product_inventory

Operation Filtering
# Filter by operation
filters:
  - name: inserts_only
    tables:
      - orders
    operations:
      - INSERT

Column Filtering
# Filter by changed columns
filters:
  - name: price_changes
    tables:
      - products
    columns:
      products:
        - price
        - discount

Custom Predicates
# Filter with SQL predicate
filters:
  - name: high_value_orders
    tables:
      - orders
    operations:
      - INSERT
    predicate: "total > 1000"

Rust API
use heliosdb_cdc::EventFilter;

let filter = EventFilter::new()
    .tables(vec!["orders".to_string()])
    .operations(vec![OperationType::Insert, OperationType::Update])
    .columns("orders", vec!["status".to_string()])
    .predicate("total > 100")
    .build();

processor.add_filter(filter).await?;

Event Transformation
Event Format
Default event structure:
{
  "event_id": "evt_abc123",
  "timestamp": "2026-01-04T12:00:00Z",
  "sequence": 12345,
  "operation": "INSERT",
  "database": "production",
  "table": "orders",
  "key": {"id": 123},
  "value": {
    "id": 123,
    "customer_id": 456,
    "total": 99.99,
    "status": "pending"
  },
  "old_value": null,
  "transaction_id": "txn_xyz789"
}

Jinja2 Templates
transformations:
  - name: slack_format
    target: https://hooks.slack.com/services/...
    template: |
      {
        "text": "New {{ operation }} on {{ table }}",
        "blocks": [
          {
            "type": "section",
            "text": {
              "type": "mrkdwn",
              "text": "*{{ operation }}* on `{{ table }}`\nKey: {{ key }}"
            }
          }
        ]
      }

JSONPath Extraction
transformations:
  - name: extract_fields
    target: https://analytics.io/webhooks/orders
    jsonpath:
      order_id: "$.key.id"
      customer: "$.value.customer_id"
      amount: "$.value.total"
      operation: "$.operation"

Delivery Configuration
Retry Strategy
delivery:
  max_retries: 3
  initial_retry_delay_ms: 100
  max_retry_delay_ms: 30000
  timeout_secs: 30

Retry schedule with exponential backoff:
- Attempt 1: Immediate
- Attempt 2: 100ms + jitter
- Attempt 3: 200ms + jitter
- Attempt 4: 400ms + jitter
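The schedule above can be reproduced with a few lines of arithmetic. This is a minimal sketch, not HeliosDB's implementation: the function names are hypothetical, the multiplier of 2.0 is taken from the Rust `RetryConfig` example in this guide, and the jitter fraction of 10% is an assumption, since the guide does not state how much jitter is applied.

```python
import random


def retry_delays_ms(max_retries=3, initial_ms=100, max_ms=30000, multiplier=2.0):
    """Base delay before each retry: grows geometrically, capped at max_ms."""
    return [min(int(initial_ms * multiplier ** n), max_ms) for n in range(max_retries)]


def with_jitter(delay_ms, fraction=0.1, rng=random):
    """Add a random amount between 0 and fraction * delay_ms (assumed jitter model)."""
    return delay_ms + rng.uniform(0, delay_ms * fraction)


if __name__ == "__main__":
    for attempt, delay in enumerate(retry_delays_ms(), start=2):
        print(f"Attempt {attempt}: {delay}ms base, {with_jitter(delay):.1f}ms with jitter")
```

With the defaults, the base delays are 100, 200, and 400 ms, matching attempts 2 through 4. Raising `max_retries` shows the effect of `max_retry_delay_ms`: once the doubled delay exceeds the cap, every later retry waits the capped value.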
Worker Pool
worker_pool:
  num_workers: 100
  max_concurrent_requests: 1000
  connection_pool_size: 500

Rust Configuration
use heliosdb_webhooks::{WebhookConfig, RetryConfig};

let config = WebhookConfig {
    worker_pool_size: 100,
    max_concurrent_requests: 1000,
    retry: RetryConfig {
        max_attempts: 3,
        initial_delay_ms: 100,
        max_delay_ms: 30000,
        multiplier: 2.0,
        jitter: true,
    },
    timeout_secs: 30,
    ..Default::default()
};

Circuit Breaker
The circuit breaker prevents cascading failures when webhook endpoints are unhealthy.
States
| State | Description | Behavior |
|---|---|---|
| Closed | Normal operation | All requests flow through |
| Open | Too many failures | Requests rejected immediately |
| Half-Open | Testing recovery | Limited requests allowed |
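The transitions between these three states can be sketched as a small state machine. This is an illustrative model only: the parameter names follow the `circuit_breaker` configuration keys in this guide, while the class itself, the choice to count consecutive failures, and the rule that any failure in half-open reopens the circuit are assumptions about behavior the guide does not spell out.

```python
import time


class CircuitBreaker:
    """Toy per-endpoint circuit breaker (closed -> open -> half_open -> closed)."""

    def __init__(self, failure_threshold=5, success_threshold=2,
                 timeout_secs=60, half_open_max_calls=3, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.success_threshold = success_threshold
        self.timeout_secs = timeout_secs
        self.half_open_max_calls = half_open_max_calls
        self.clock = clock  # injectable so tests can control time
        self.state = "closed"
        self.failures = 0
        self.successes = 0
        self.half_open_calls = 0
        self.opened_at = 0.0

    def allow_request(self):
        """Return True if a delivery attempt may be sent right now."""
        if self.state == "open":
            if self.clock() - self.opened_at < self.timeout_secs:
                return False  # still cooling down: reject immediately
            # Cool-down elapsed: start probing the endpoint.
            self.state = "half_open"
            self.successes = 0
            self.half_open_calls = 0
        if self.state == "half_open":
            if self.half_open_calls >= self.half_open_max_calls:
                return False  # probe budget exhausted
            self.half_open_calls += 1
        return True

    def record_success(self):
        if self.state == "half_open":
            self.successes += 1
            if self.successes >= self.success_threshold:
                self.state = "closed"
                self.failures = 0
        elif self.state == "closed":
            self.failures = 0  # assumption: only consecutive failures count

    def record_failure(self):
        if self.state == "half_open":
            self._open()  # assumption: one failed probe reopens the circuit
        elif self.state == "closed":
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self._open()

    def _open(self):
        self.state = "open"
        self.opened_at = self.clock()
```

A caller would check `allow_request()` before each delivery and then report the outcome with `record_success()` or `record_failure()`.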
Configuration
circuit_breaker:
  failure_threshold: 5      # Open after 5 failures
  success_threshold: 2      # Close after 2 successes in half-open
  timeout_secs: 60          # Stay open for 60 seconds
  half_open_max_calls: 3    # Max calls in half-open state

Monitoring Circuit Breaker
// Check circuit state
let state = circuit_breaker.get_state("https://api.example.com").await?;

match state {
    CircuitState::Closed => println!("Operating normally"),
    CircuitState::Open => println!("Circuit open - endpoint unhealthy"),
    CircuitState::HalfOpen => println!("Testing endpoint recovery"),
}

Exactly-Once Delivery
How It Works
- Each event generates a unique idempotency key
- Delivered events are logged with their idempotency key
- Retries check the delivery log before sending
- Duplicate deliveries are prevented within the delivery window
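The four steps above can be sketched with an in-memory delivery log. Everything here is a hypothetical illustration: the key derivation (a SHA-256 hash of event ID and URL), the `DeliveryLog` class, and `deliver_once` are assumptions, and a real log would need to be durable rather than a dictionary.

```python
import hashlib
import time


def idempotency_key(event_id, webhook_url):
    """Derive a stable key per (event, endpoint) pair; the scheme is an assumption."""
    return hashlib.sha256(f"{event_id}|{webhook_url}".encode()).hexdigest()


class DeliveryLog:
    """Remembers delivered keys for delivery_window_secs, then forgets them."""

    def __init__(self, delivery_window_secs=86400, clock=time.time):
        self.window = delivery_window_secs
        self.clock = clock
        self._delivered = {}  # idempotency key -> delivery timestamp

    def was_delivered(self, event_id, webhook_url):
        key = idempotency_key(event_id, webhook_url)
        delivered_at = self._delivered.get(key)
        if delivered_at is None:
            return False
        if self.clock() - delivered_at > self.window:
            del self._delivered[key]  # outside the window: no longer deduplicated
            return False
        return True

    def mark_delivered(self, event_id, webhook_url):
        self._delivered[idempotency_key(event_id, webhook_url)] = self.clock()


def deliver_once(log, event_id, webhook_url, send):
    """Send unless the log already records this delivery; return True if sent."""
    if log.was_delivered(event_id, webhook_url):
        return False
    send(event_id, webhook_url)
    log.mark_delivered(event_id, webhook_url)
    return True
```

In this sketch, a crash between `send` and `mark_delivered` would cause the event to be sent again on retry, which is one reason the Best Practices section recommends idempotent receivers.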
Configuration
exactly_once:
  delivery_window_secs: 86400  # 24 hours

Verifying Delivery
// Check if event was delivered
let delivered = exactly_once.was_delivered(event_id, webhook_url).await?;

if delivered {
    println!("Event already delivered, skipping");
}

Security
HMAC Signature
All webhook requests include an HMAC-SHA256 signature:
POST /webhooks/db-changes HTTP/1.1
Host: api.example.com
Content-Type: application/json
X-HeliosDB-Signature: sha256=f7bc83f430538424b13298e6aa6fb143ef4d59a14946175997479dbc2d1a3cd8
X-HeliosDB-Event-ID: evt_001
X-HeliosDB-Timestamp: 2026-01-04T12:00:00Z

Verifying Signatures (Receiver Side)
import hashlib
import hmac

def verify_signature(payload, signature, secret):
    # payload: raw request body as bytes; signature: X-HeliosDB-Signature header value
    expected = 'sha256=' + hmac.new(
        secret.encode(), payload, hashlib.sha256
    ).hexdigest()
    # Constant-time comparison, so timing does not reveal how much of the value matched
    return hmac.compare_digest(signature, expected)

TLS Configuration
security:
  tls:
    enabled: true
    cert_path: /etc/heliosdb/tls/cert.pem
    key_path: /etc/heliosdb/tls/key.pem
  ip_whitelist:
    - 10.0.0.0/8
    - 172.16.0.0/12

mTLS (Mutual TLS)
let client = create_mtls_client(
    "/path/to/ca.pem",
    "/path/to/client-cert.pem",
    "/path/to/client-key.pem",
)?;

Monitoring
Prometheus Metrics
# Event processing rate
rate(heliosdb_cdc_events_processed_total[5m])

# Webhook delivery latency (P99)
histogram_quantile(0.99, rate(heliosdb_webhook_delivery_duration_seconds_bucket[5m]))

# Delivery success rate
sum(rate(heliosdb_webhook_delivery_total{status="success"}[5m])) /
sum(rate(heliosdb_webhook_delivery_total[5m]))

# Circuit breaker state
heliosdb_circuit_breaker_state == 2  # Open circuits

# Event queue backlog
heliosdb_event_queue_size{queue_type="overflow"}

Available Metrics
| Metric | Type | Description |
|---|---|---|
| heliosdb_cdc_events_processed_total | Counter | Events processed |
| heliosdb_cdc_events_filtered_total | Counter | Events filtered out |
| heliosdb_webhook_delivery_duration_seconds | Histogram | Delivery latency |
| heliosdb_webhook_delivery_total | Counter | Delivery attempts |
| heliosdb_webhook_retry_total | Counter | Retry attempts |
| heliosdb_circuit_breaker_state | Gauge | Circuit state |
| heliosdb_event_queue_size | Gauge | Queue size |
Grafana Dashboard
Import the HeliosDB CDC dashboard for real-time monitoring of:
- Event throughput
- Delivery latency
- Success rates
- Circuit breaker status
- Queue backlogs
Best Practices
1. Use Appropriate Filters
# Filter to reduce volume
filters:
  - name: relevant_changes
    tables:
      - orders        # Only tables you need
    operations:
      - INSERT
      - UPDATE        # Skip DELETEs if not needed
    columns:
      orders:
        - status
        - total       # Only columns you care about

2. Configure Retries Appropriately
# Balance between reliability and latency
delivery:
  max_retries: 3               # Enough for transient failures
  initial_retry_delay_ms: 100  # Quick first retry
  max_retry_delay_ms: 5000     # Don't wait too long

3. Monitor Circuit Breakers
# Alert on open circuits
ALERT WebhookCircuitOpen
  IF heliosdb_circuit_breaker_state == 2
  FOR 5m
  LABELS { severity = "warning" }

4. Implement Idempotent Receivers
def handle_webhook(event):
    event_id = event['event_id']

    # Check if already processed
    if already_processed(event_id):
        return {"status": "duplicate", "event_id": event_id}

    # Process event
    process_event(event)

    # Mark as processed
    mark_processed(event_id)

    return {"status": "success", "event_id": event_id}

5. Set Appropriate Timeouts
delivery:
  timeout_secs: 30  # Match your endpoint's expected response time

API Reference
CdcConfig
pub struct CdcConfig {
    pub wal_path: String,
    pub checkpoint_interval: usize,
    pub batch_size: usize,
    pub poll_interval_ms: u64,
}

WebhookConfig
pub struct WebhookConfig {
    pub bind_address: String,
    pub worker_pool_size: usize,
    pub max_concurrent_requests: usize,
    pub timeout_secs: u64,
    pub retry: RetryConfig,
    pub circuit_breaker: CircuitBreakerConfig,
    pub exactly_once: ExactlyOnceConfig,
}

Event Types
pub enum OperationType {
    Insert,
    Update,
    Delete,
    Truncate,
    SchemaChange,
}

pub struct CdcEvent {
    pub event_id: String,
    pub timestamp: DateTime<Utc>,
    pub sequence: u64,
    pub operation: OperationType,
    pub database: String,
    pub table: String,
    pub key: Vec<u8>,
    pub value: Option<Vec<u8>>,
    pub old_value: Option<Vec<u8>>,
    pub transaction_id: String,
}

Related Documentation
- README.md - Feature overview
- TROUBLESHOOTING.md - Common issues
- Design Document - Technical architecture
- SQL Interface - SQL commands