Streaming & Real-Time Analytics User Guide
Table of Contents
- Overview
- Stream Processing Fundamentals
- Setting Up Streaming
- Continuous Queries
- Stream-Stream Joins
- Stream-Table Joins
- Aggregations & Complex Event Processing
- Exactly-Once Semantics
- Backpressure & Flow Control
- Integration Patterns
- Watermarks & Late Data
- State Management
- Performance Tuning
- Monitoring Streams
- Real-World Examples
- Troubleshooting
Overview
HeliosDB’s streaming and real-time analytics capabilities enable you to process infinite streams of data with sub-second latency. Unlike batch systems that process data in discrete chunks, HeliosDB streaming processes events as they arrive, maintaining state across distributed operators.
Key Concepts
Event Time vs. Processing Time: Event time is when an event occurred in the real world, while processing time is when the stream processor sees it. HeliosDB supports both semantics, with event-time being the default for accurate time-windowed analytics.
Continuous Queries: Unlike traditional SQL queries that terminate, continuous queries run indefinitely, emitting results as new data arrives. This is essential for real-time dashboards and monitoring.
Windowing: Data arrives as an unbounded stream, so we aggregate using windows—fixed time intervals (tumbling windows), overlapping periods (sliding windows), or dynamic gaps (session windows).
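To make tumbling windows concrete: the window an event falls into can be found by rounding its timestamp down to a multiple of the window size. This is illustrative arithmetic, not HeliosDB's internal window assigner:

```rust
/// Start of the tumbling window (in milliseconds) containing `event_time_ms`,
/// for non-overlapping windows of `size_ms` milliseconds.
fn tumbling_window_start(event_time_ms: i64, size_ms: i64) -> i64 {
    // Round down to the nearest multiple of the window size.
    event_time_ms - event_time_ms.rem_euclid(size_ms)
}
```

For example, an event at t = 125.5 s falls in the 1-minute window [120 s, 180 s).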
Joins: HeliosDB supports two join patterns: stream-stream (correlating events from two streams) and stream-table (enriching stream events with reference data). Join results are persisted or forwarded via sinks.
Exactly-Once Semantics: Critical for applications requiring zero data loss and no duplicates. HeliosDB uses two-phase commit protocols with deduplication to guarantee exactly-once processing.
Use Cases
- Real-time Event Analytics: Track user behavior, convert pageviews to revenue metrics in real-time
- IoT & Sensor Monitoring: Ingest millions of sensor readings, detect anomalies instantly
- Financial Transactions: Pattern detection for fraud, real-time risk scoring
- Network Traffic Analysis: Detect DDoS attacks, analyze flow patterns
- Application Metrics: Monitor API performance, request latencies, error rates
- Anomaly Detection: Stream machine learning models with continuous predictions
Stream Processing Fundamentals
Time Semantics
HeliosDB supports two time semantics configured at stream creation:
```rust
// Event-time semantics (recommended for time-windowed analytics)
let config = StreamConfig {
    name: "events".to_string(),
    time_semantics: TimeSemantics::EventTime,
    allowed_lateness: Duration::from_secs(60),
    watermark_interval: Duration::from_secs(1),
};
let stream = Stream::new(config);

// Processing-time semantics (simpler, no late data handling)
let config = StreamConfig {
    name: "metrics".to_string(),
    time_semantics: TimeSemantics::ProcessingTime,
    allowed_lateness: Duration::from_secs(0),
    watermark_interval: Duration::from_secs(1),
};
```

Event-time processing gives you accurate results regardless of delivery delay, but requires watermarks to determine window completion. Processing-time processing is simpler, but results depend on execution speed rather than event semantics.
Stream Data Model
Data flows as rows with typed fields:
```rust
use heliosdb_streaming::{Row, Value};
use std::collections::HashMap;
use chrono::Utc;

// Create a row with typed fields
let mut fields = HashMap::new();
fields.insert("user_id".to_string(), Value::Integer(12345));
fields.insert("event_type".to_string(), Value::String("purchase".to_string()));
fields.insert("amount".to_string(), Value::Float(99.99));
fields.insert("timestamp".to_string(), Value::Timestamp(Utc::now()));

let row = Row::new(fields)
    .with_event_time(Utc::now()); // Set event timestamp

// Access values
if let Some(Value::Integer(user_id)) = row.get("user_id") {
    println!("User: {}", user_id);
}
```

Values support the following types: Null, Boolean, Integer, Float, String, Bytes, Timestamp, Array, and Object.
Window Types
Tumbling Windows (fixed, non-overlapping):
- Best for: Hourly/daily aggregations, periodic summaries
- Example: 1-minute windows for request count tracking
Sliding Windows (overlapping):
- Best for: Detecting trends, moving averages
- Example: 1-minute window slides every 10 seconds
Session Windows (dynamic gaps):
- Best for: User sessions, activity bursts
- Example: 30-minute gap between events marks session end
Count Windows (event-count based):
- Best for: Micro-batching, load balancing
- Example: Emit results after every 1000 events
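For sliding windows, each event belongs to up to size/slide overlapping windows. A sketch of the assignment arithmetic (illustrative only, not HeliosDB's window assigner; windows are clamped to start at t >= 0):

```rust
/// Starts (in ms) of every sliding window of length `size_ms`, advancing every
/// `slide_ms`, that contains time `t`. Assumes size_ms >= slide_ms.
fn sliding_window_starts(t: i64, size_ms: i64, slide_ms: i64) -> Vec<i64> {
    let mut starts = Vec::new();
    let mut s = t - t.rem_euclid(slide_ms); // latest window start containing t
    while s > t - size_ms && s >= 0 {
        starts.push(s);
        s -= slide_ms;
    }
    starts.reverse();
    starts
}
```

With a 1-minute window sliding every 10 seconds, an event at t = 25 s belongs to the windows starting at 0 s, 10 s, and 20 s; once the stream is older than one window length, every event belongs to exactly 6 windows.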
Setting Up Streaming
Initialize Streaming Engine
```rust
use heliosdb_streaming::StreamingEngine;

#[tokio::main]
async fn main() -> Result<()> {
    // Create the streaming engine
    let engine = StreamingEngine::new().await?;

    // Create streams and queries
    // ... your streaming logic ...

    // Shut down gracefully
    engine.shutdown().await?;
    Ok(())
}
```

Create a Stream
```rust
use heliosdb_streaming::{Stream, StreamConfig, TimeSemantics};
use std::time::Duration;

// Create an input stream
let config = StreamConfig {
    name: "api_requests".to_string(),
    time_semantics: TimeSemantics::EventTime,
    allowed_lateness: Duration::from_secs(10),
    watermark_interval: Duration::from_secs(1),
};

let stream = Stream::new(config)
    .with_schema(vec![
        "user_id".to_string(),
        "endpoint".to_string(),
        "response_time".to_string(),
        "status".to_string(),
    ]);
```

Define Stream Schemas
Schemas are metadata describing stream fields. While HeliosDB is schema-flexible, defining schemas enables validation:
```rust
// Schema definition (for documentation and validation)
let schema = vec![
    "request_id".to_string(),  // String
    "timestamp".to_string(),   // Timestamp
    "method".to_string(),      // String (GET, POST, etc.)
    "path".to_string(),        // String
    "status_code".to_string(), // Integer
    "response_ms".to_string(), // Integer
    "user_id".to_string(),     // Integer
];

let stream = Stream::new(config).with_schema(schema);
```

Continuous Queries
Basic Continuous Query
A continuous query runs indefinitely, emitting results as new data arrives:
```rust
use heliosdb_streaming::{ContinuousQuery, Stream, Row, Value};
use std::collections::HashMap;
use std::sync::Arc;

// Create output sink
let sink = Arc::new(CallbackSink::new(|rows| {
    for row in rows {
        println!("Result: {:?}", row.fields());
    }
}));

// Create continuous query
let query = ContinuousQuery::new(
    "my_query".to_string(),
    stream.clone(),
).with_sink(sink);

// Execute
let handle = engine.create_continuous_query(query).await?;

// Query is now running continuously...

// Stop when done
engine.stop_continuous_query("my_query").await?;
```

Query Sinks
Continuous queries emit results to sinks:
Table Sink - Writes to a database table:
```rust
use heliosdb_streaming::TableSink;

let sink = Arc::new(TableSink::new("realtime_metrics".to_string()));
```

Callback Sink - Calls a Rust function:

```rust
use heliosdb_streaming::CallbackSink;

let sink = Arc::new(CallbackSink::new(|rows| {
    for row in rows {
        // Handle result rows (update dashboard, send alert, etc.)
        println!("Alert: {:?}", row);
    }
}));
```

Memory Sink - Stores in memory (testing):

```rust
use heliosdb_streaming::MemorySink;

let sink = Arc::new(MemorySink::new());
let results = sink.get_results();
```

Stream-Stream Joins
Join two streams to correlate events. Requires a time window to match events:
```rust
use heliosdb_streaming::StreamStreamJoin;
use std::time::Duration;

// Create second stream
let clicks = Stream::new(StreamConfig {
    name: "clicks".to_string(),
    ..Default::default()
});

let impressions = Stream::new(StreamConfig {
    name: "impressions".to_string(),
    ..Default::default()
});

// Join with 5-second window
let joiner = StreamStreamJoin::new(Duration::from_secs(5));
let joined = joiner.join(impressions, clicks).await?;

// Joined stream emits events where impression and click occur within 5 seconds
```

Stream-Stream Join Patterns
Inner Join - Only matching pairs:
```rust
// Both streams have an event within the window - emits combined row
// Example: match an ad impression with a subsequent click
```

Left Outer Join - All left events, matching right when available:

```rust
// Impression with click = emits combined row
// Impression without click = emits impression row with null click fields
```

Full Outer Join - All events from both streams:

```rust
// Click without impression = emits click row with null impression fields
```

Join Key Management
Specify how to match events (usually a user_id or correlation_id):
```rust
// Join on the user_id field
let joiner = StreamStreamJoin::new(Duration::from_secs(10))
    .with_left_key("user_id".to_string())
    .with_right_key("user_id".to_string());
```

Without explicit keys, all events in the window are considered for joining.
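Conceptually, a keyed stream-stream join buffers recent events per key and matches arrivals whose timestamps fall within the window. A simplified, self-contained sketch of the idea (not the HeliosDB implementation; a real engine also buffers the right side and evicts state past the watermark):

```rust
use std::collections::HashMap;

#[derive(Clone, Debug)]
struct Event {
    key: u64,
    time_ms: i64,
    payload: String,
}

/// Buffers left-side events by key; for each right-side event, emits every
/// (left, right) pair whose timestamps are within `window_ms` of each other.
struct KeyedWindowJoin {
    window_ms: i64,
    left: HashMap<u64, Vec<Event>>,
}

impl KeyedWindowJoin {
    fn new(window_ms: i64) -> Self {
        Self { window_ms, left: HashMap::new() }
    }

    fn on_left(&mut self, e: Event) {
        self.left.entry(e.key).or_default().push(e);
    }

    fn on_right(&self, e: &Event) -> Vec<(Event, Event)> {
        match self.left.get(&e.key) {
            Some(buf) => buf
                .iter()
                .filter(|l| (l.time_ms - e.time_ms).abs() <= self.window_ms)
                .map(|l| (l.clone(), e.clone()))
                .collect(),
            None => Vec::new(),
        }
    }
}
```

Keying by `user_id` (here, `key`) means only events sharing a key are compared, which is what prevents the Cartesian blow-up described above.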
Stream-Table Joins
Enrich stream events with reference data from a static table:
```rust
use heliosdb_streaming::StreamTableJoin;

// Load reference data
let products = vec![
    Row::new(maplit::hashmap! {
        "product_id".to_string() => Value::Integer(1),
        "name".to_string() => Value::String("Widget".to_string()),
        "category".to_string() => Value::String("Hardware".to_string()),
        "price".to_string() => Value::Float(19.99),
    }),
    // ... more products
];

// Create join
let joiner = StreamTableJoin::new(
    "products".to_string(),   // Table identifier
    "product_id".to_string(), // Join key
).with_join_type(JoinType::LeftOuter);

// Load data into cache
joiner.load_table(products)?;

// Join stream with table
let enriched = joiner.join(purchase_stream).await?;
```

Join Types for Stream-Table
| Type | Behavior | Use Case |
|---|---|---|
| Inner | Only emit if product found | Filtered pipeline |
| Left Outer | Emit all purchases, null product fields if not found | Catch missing products |
| Right Outer | Not typical for stream-table | N/A |
Table Update Patterns
For frequently-changing reference data:
```rust
// Periodic reload
let joiner = Arc::new(joiner);
tokio::spawn({
    let joiner = joiner.clone();
    async move {
        loop {
            tokio::time::sleep(Duration::from_secs(3600)).await;

            // Reload products from database
            let products = fetch_products_from_db().await;
            joiner.load_table(products).ok();
        }
    }
});
```

Aggregations & Complex Event Processing
Windowed Aggregations
Aggregate stream data within windows:
```rust
use heliosdb_streaming::{WindowedAggregation, WindowType, AggregateFunction};
use std::time::Duration;

let aggregation = WindowedAggregation::new(
    WindowType::Tumbling {
        size: Duration::from_secs(60),
    },
    vec![
        AggregateFunction::Count,
        AggregateFunction::Sum { field: "amount".to_string() },
        AggregateFunction::Avg { field: "response_time".to_string() },
        AggregateFunction::Min { field: "response_time".to_string() },
        AggregateFunction::Max { field: "response_time".to_string() },
    ],
    stream.clone(),
).with_group_by(vec!["status".to_string(), "endpoint".to_string()]);

let results = aggregation.execute().await?;
```

Supported Aggregations
- Count: Number of events
- Sum: Total of numeric field
- Avg: Average of numeric field
- Min: Minimum value
- Max: Maximum value
- Distinct: Unique values
- List: Collect all values
- ApproxDistinct: HyperLogLog for cardinality
Complex Event Processing (CEP)
Detect patterns in event sequences using MATCH_RECOGNIZE:
use heliosdb_streaming::{Pattern, PatternVariable, PatternCondition, PatternSequence, Measure};
```rust
// Define pattern: detect fraud (3+ failed logins in 5 minutes)
let pattern = Pattern {
    name: "fraud_pattern".to_string(),
    variables: vec![
        PatternVariable {
            name: "failed".to_string(),
            condition: PatternCondition::Equals {
                field: "result".to_string(),
                value: Value::String("failed".to_string()),
            },
        },
    ],
    sequence: PatternSequence::Sequence {
        patterns: vec![
            PatternSequence::Single { var: "failed".to_string() },
            PatternSequence::Single { var: "failed".to_string() },
            PatternSequence::Single { var: "failed".to_string() },
        ],
    },
    within: Some(Duration::from_secs(300)),
    measures: vec![
        Measure {
            name: "login_attempts".to_string(),
            expression: MeasureExpression::Count,
        },
    ],
};

let matcher = PatternMatcher::new(pattern);
let matches = matcher.match_stream(login_stream).await?;
```

CEP Pattern Syntax
Patterns follow SQL’s MATCH_RECOGNIZE:
- `PATTERN (A B+ C)` - A followed by one or more B, then C
- `PATTERN (A{3})` - Exactly 3 consecutive A
- `PATTERN (A? B)` - Optional A, then B
- `PATTERN (A*)` - Zero or more A

Variables bind events matching conditions; measures compute aggregate values across the matched events.
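The essence of a pattern like the fraud example above (three consecutive failures within a time bound) can be captured by a small state machine. This is an illustrative sketch of the matching logic, not the HeliosDB `PatternMatcher`:

```rust
/// Detects N consecutive "failed" events occurring within `within_ms`.
struct ConsecutiveFailures {
    times: Vec<i64>, // timestamps of the current run of failures
    needed: usize,
    within_ms: i64,
}

impl ConsecutiveFailures {
    fn new(needed: usize, within_ms: i64) -> Self {
        Self { times: Vec::new(), needed, within_ms }
    }

    /// Feed one login result; returns true when the pattern completes.
    fn on_event(&mut self, time_ms: i64, failed: bool) -> bool {
        if !failed {
            self.times.clear(); // a success breaks the consecutive run
            return false;
        }
        self.times.push(time_ms);
        // Discard failures that fall outside the time window
        self.times.retain(|t| time_ms - t <= self.within_ms);
        self.times.len() >= self.needed
    }
}
```

A real CEP engine generalizes this to arbitrary variables, quantifiers, and measures, but the per-key "partial match plus time bound" state is the same idea.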
Exactly-Once Semantics
Critical guarantee: each event is processed exactly once, no data loss, no duplicates.
Enable Exactly-Once Processing
use heliosdb_streaming::{TransactionCoordinator, IdempotentProducer, MessageDeduplicator};
```rust
// Create coordinator for transactions
let coordinator = TransactionCoordinator::new(Duration::from_secs(30))?;

// Create deduplicator for idempotent writes
let dedup = MessageDeduplicator::new(10_000); // 10K message capacity

// When producing results, use deduplication
let msg_id = uuid::Uuid::new_v4();
if !dedup.is_duplicate(&msg_id)? {
    sink.emit(result_rows).await?;
    dedup.mark_processed(&msg_id)?;
}
```

Two-Phase Commit Protocol
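The core of such a deduplicator can be approximated with a bounded set of recently seen message IDs. This is a sketch of the idea (not the library's internals); the capacity must cover the maximum expected redelivery window:

```rust
use std::collections::{HashSet, VecDeque};

/// Remembers the last `capacity` message IDs; older IDs are forgotten,
/// so duplicates arriving after eviction would slip through.
struct BoundedDeduplicator {
    seen: HashSet<u64>,
    order: VecDeque<u64>, // insertion order, for eviction
    capacity: usize,
}

impl BoundedDeduplicator {
    fn new(capacity: usize) -> Self {
        Self { seen: HashSet::new(), order: VecDeque::new(), capacity }
    }

    /// Returns true if `id` was already processed; otherwise records it.
    fn check_and_mark(&mut self, id: u64) -> bool {
        if self.seen.contains(&id) {
            return true;
        }
        if self.order.len() == self.capacity {
            if let Some(old) = self.order.pop_front() {
                self.seen.remove(&old);
            }
        }
        self.order.push_back(id);
        self.seen.insert(id);
        false
    }
}
```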
Ensures atomicity across multiple systems:
```rust
// Phase 1: Prepare (all participants acknowledge)
let txn = coordinator.begin_transaction().await?;
txn.add_participant("kafka_producer".to_string());
txn.add_participant("postgres_sink".to_string());

// Process events
for event in &events {
    process_event(event)?;
}

// Phase 2: Commit
coordinator.commit(txn.id).await?;
// If any participant fails, automatic rollback
```

Offset Management
Track positions in source streams to enable recovery:
use heliosdb_streaming::{OffsetManager, Checkpoint};
```rust
let offset_mgr = OffsetManager::new()?;

// Save offset after processing
let checkpoint = Checkpoint {
    stream_id: "kafka-topic".to_string(),
    partition: 0,
    offset: 12345,
    timestamp: Utc::now(),
};

offset_mgr.save_checkpoint(&checkpoint).await?;

// On recovery, resume from the saved offset
let last = offset_mgr.get_checkpoint("kafka-topic", 0).await?;
kafka_source.seek_to_offset(last.offset).await?;
```

Failure Recovery
Recovery guarantees:
- Duplicate Detection: Message IDs prevent re-processing after crash
- Idempotent Operations: Writing same message twice produces same result
- Offset Tracking: Know exactly where to resume
```rust
// Process with idempotency
let msg_id = event.id.clone();

// Checkpoint before processing
offset_mgr.save_offset(&msg_id).await?;

// Process with deduplication
if !dedup.seen(&msg_id)? {
    write_to_sink(&event).await?;
}
```

Backpressure & Flow Control
Backpressure Strategies
Handle cases where downstream can’t keep up:
use heliosdb_streaming::{BackpressureController, BackpressureStrategy};
```rust
let controller = BackpressureController::new(
    BackpressureStrategy::Block, // Wait for space
    10_000,                      // Max buffer size
);

// Or drop old data
let controller = BackpressureController::new(
    BackpressureStrategy::DropOldest,
    10_000,
);
```

Strategies:
- Block: Pause source until buffer drains (no data loss, latency increases)
- DropOldest: Remove oldest events when full (data loss, latency stable)
- DropNewest: Drop incoming events when full (prevent memory explosion)
- Signal: Track backpressure but don’t drop (for monitoring)
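The DropOldest strategy amounts to a bounded buffer that evicts from the front when full. A minimal sketch of that behavior (illustrative only, not the `BackpressureController` API):

```rust
use std::collections::VecDeque;

/// Bounded buffer implementing the DropOldest strategy: when full, the oldest
/// event is discarded so the newest is always accepted.
struct DropOldestBuffer<T> {
    buf: VecDeque<T>,
    max: usize,
    dropped: u64, // count of evictions, for metrics
}

impl<T> DropOldestBuffer<T> {
    fn new(max: usize) -> Self {
        Self { buf: VecDeque::new(), max, dropped: 0 }
    }

    fn push(&mut self, item: T) {
        if self.buf.len() == self.max {
            self.buf.pop_front(); // sacrifice the oldest event
            self.dropped += 1;
        }
        self.buf.push_back(item);
    }
}
```

Tracking the `dropped` counter is what lets the metrics section below report `dropped_records`.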
Flow Control in Practice
```rust
// Acquire space before accepting an event
let decision = controller.acquire().await;

match decision {
    BackpressureDecision::Accept => {
        // Process event
        process(event).await?;
    }
    BackpressureDecision::Reject => {
        // Drop or retry
        eprintln!("Dropping event due to backpressure");
    }
}

// Release space when done
controller.release();
```

Adaptive Backpressure
Dynamically adjust based on downstream lag:
use heliosdb_streaming::AdaptiveBackpressureController;
```rust
let adaptive = AdaptiveBackpressureController::new(
    10_000, // Initial buffer
    20_000, // Max buffer
);

// System automatically adjusts the buffer based on lag
let decision = adaptive.acquire().await;
```

Integration Patterns
Kafka Integration
Consume from Kafka topics:
use heliosdb_streaming::{KafkaSourceConnector, KafkaConfig};
```rust
let kafka_config = KafkaConfig {
    bootstrap_servers: vec!["localhost:9092".to_string()],
    topic: "user_events".to_string(),
    group_id: Some("heliosdb-consumer".to_string()),
    client_id: Some("processor-1".to_string()),
    auto_offset_reset: "latest".to_string(),
    enable_auto_commit: false, // Manual commit for exactly-once
    max_poll_records: 500,
};

let source = KafkaSourceConnector::new(kafka_config);
let stream = source.start().await?;

// Process stream
// ... filtering, joining, aggregating ...

// Commit offset after successful processing
offset_mgr.commit_offset(&stream_id, &offset).await?;
```

Kafka Sink
Write results to Kafka:
use heliosdb_streaming::KafkaSinkConnector;
```rust
let sink_config = KafkaConfig {
    bootstrap_servers: vec!["localhost:9092".to_string()],
    topic: "alerts".to_string(),
    ..Default::default()
};

let sink = KafkaSinkConnector::new(sink_config);

// Write results
sink.write(result_rows).await?;
sink.flush().await?;
```

Database Integration
Write results to PostgreSQL/MySQL:
use heliosdb_streaming::{DatabaseSinkConnector, DatabaseType, WriteMode};
```rust
let db_sink = DatabaseSinkConnector::new(DatabaseSinkConfig {
    database_type: DatabaseType::PostgreSQL,
    host: "localhost".to_string(),
    port: 5432,
    database: "analytics".to_string(),
    table: "real_time_metrics".to_string(),
    write_mode: WriteMode::Upsert,
});

db_sink.write(result_rows).await?;
```

REST API Integration
Expose streaming metrics via HTTP:
```rust
use axum::{Router, routing::get, Json};
use std::sync::Arc;

let metrics = Arc::new(sink.get_results());

let app = Router::new()
    .route("/metrics", get({
        let metrics = metrics.clone();
        move || {
            let m = metrics.clone();
            async move { Json(m.clone()) }
        }
    }));

axum::Server::bind(&"0.0.0.0:3000".parse()?)
    .serve(app.into_make_service())
    .await?;
```

Watermarks & Late Data
Watermark Strategy
Watermarks signal that no earlier events will arrive:
use heliosdb_streaming::{WatermarkGenerator, WatermarkStrategy};use std::time::Duration;
```rust
// Bounded out-of-order: assume a maximum 10-second delay
let generator = WatermarkGenerator::new(
    WatermarkStrategy::BoundedOutOfOrderness {
        max_out_of_order: Duration::from_secs(10),
    },
    Duration::from_secs(60), // allowed_lateness
);

// Update the watermark as events arrive
for event in events {
    if let Some(new_watermark) = generator.update(event.event_time) {
        println!("Watermark advanced to: {}", new_watermark);
        // This triggers window completion
    }
}
```

Handling Late Data
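The bounded-out-of-orderness strategy boils down to one invariant: the watermark trails the maximum event time seen so far by a fixed delay, and never regresses. A self-contained sketch of that logic (not the HeliosDB `WatermarkGenerator` itself):

```rust
/// Watermark = (maximum observed event time) - (maximum tolerated delay).
struct BoundedOutOfOrderness {
    max_ts_ms: i64,
    max_delay_ms: i64,
}

impl BoundedOutOfOrderness {
    fn new(max_delay_ms: i64) -> Self {
        Self { max_ts_ms: i64::MIN, max_delay_ms }
    }

    /// Observe one event timestamp; returns the current watermark.
    fn on_event(&mut self, ts_ms: i64) -> i64 {
        // Out-of-order (smaller) timestamps never move the watermark backwards.
        self.max_ts_ms = self.max_ts_ms.max(ts_ms);
        self.max_ts_ms - self.max_delay_ms
    }
}
```

A window [start, end) is considered complete once the watermark passes `end`, which is why a larger tolerated delay means later (but more accurate) window results.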
Data arriving after window close:
use heliosdb_streaming::{LateDataHandler, LateDataDecision};
```rust
let handler = LateDataHandler::new(Duration::from_secs(60));

// On late arrival
match handler.handle_late_event(&event)? {
    LateDataDecision::IncludeInWindow => {
        // Add to window, emit a correction
        update_window(&event)?;
        sink.emit(correction_rows).await?;
    }
    LateDataDecision::Discard => {
        // Event is too late, drop it
    }
}
```

Allowed Lateness Configuration
```rust
let config = StreamConfig {
    name: "events".to_string(),
    time_semantics: TimeSemantics::EventTime,
    allowed_lateness: Duration::from_secs(60), // Accept events up to 1 min late
    watermark_interval: Duration::from_secs(1),
};
```

Tradeoff: a larger `allowed_lateness` keeps windows open longer, allowing more corrections at the cost of holding more state.
State Management
Checkpoint Basics
Persist operator state for failure recovery:
use heliosdb_streaming::{CheckpointCoordinator, StateSnapshot};
```rust
let coordinator = CheckpointCoordinator::new()?;

// Trigger a checkpoint periodically
tokio::spawn({
    let coordinator = coordinator.clone();
    async move {
        loop {
            tokio::time::sleep(Duration::from_secs(60)).await;

            // The spawned task returns (), so handle errors instead of using `?`
            match coordinator.trigger_checkpoint().await {
                Ok(snapshot) => println!("Checkpoint {}: {:?}", snapshot.id, snapshot.status),
                Err(e) => eprintln!("Checkpoint failed: {}", e),
            }
        }
    }
});
```

State Backends
Store operator state locally or remotely:
In-Memory (fast, no persistence):
```rust
use heliosdb_streaming::InMemoryStateBackend;

let backend = InMemoryStateBackend::new();
```

File-based (persistent, local):

```rust
use heliosdb_streaming::FileStateBackend;

let backend = FileStateBackend::new("/data/checkpoints".to_string())?;
```

Encrypted (secure storage):

```rust
use heliosdb_streaming::EncryptedStateBackend;

let backend = EncryptedStateBackend::new(
    "/data/encrypted_state".to_string(),
    encryption_key.clone(),
)?;
```

Operator State
Manage mutable state within operators:
```rust
use heliosdb_streaming::OperatorState;

struct SessionAggregator {
    state: OperatorState,
}

impl SessionAggregator {
    fn process_event(&mut self, event: &Row) -> Result<()> {
        // Get value state
        let mut count = self.state.get_value("count")
            .and_then(|v| v.as_i64())
            .unwrap_or(0);

        count += 1;

        // Update state
        self.state.set_value("count".to_string(), Value::Integer(count))?;

        Ok(())
    }
}
```

Performance Tuning
Parallelism
Increase processing throughput with parallel tasks:
```rust
// Process multiple stream partitions in parallel
let handles: Vec<_> = (0..4).map(|partition| {
    let stream = streams[partition].clone();
    tokio::spawn(async move {
        while let Some(event) = stream.recv().await {
            process_event(event).await.ok();
        }
    })
}).collect();

futures::future::join_all(handles).await;
```

Batching
Reduce overhead by processing events in batches:
```rust
use heliosdb_streaming::DynamicBatchSizer;

let batcher = DynamicBatchSizer::new(
    100,   // Min batch
    1_000, // Max batch
);

let mut batch = Vec::new();
while let Some(event) = stream.recv().await {
    batch.push(event);

    if batch.len() >= batcher.ideal_batch_size() {
        sink.write(batch.clone()).await?;
        batch.clear();
    }
}
```

State Size Management
Limit state growth in session windows:
```rust
// Session window with a 30-minute timeout
let window = SessionWindow::new(Duration::from_secs(1800))
    .with_max_sessions(10_000); // Limit concurrent sessions
```

Bloom Filter Optimization
Speed up stream-stream joins with Bloom filters:
```rust
use bloomfilter::Bloom;

let mut bloom = Bloom::new(100_000, 0.01); // 100K elements, 1% FPR

// Add keys from the first stream
for event in &stream1 {
    if let Some(Value::Integer(id)) = event.get("id") {
        bloom.set(&id.to_string());
    }
}

// Only look up in the second stream if the Bloom filter says "maybe present"
for event in &stream2 {
    if let Some(Value::Integer(id)) = event.get("id") {
        if bloom.check(&id.to_string()) {
            // Probable match, do the actual lookup
        }
    }
}
```

Monitoring Streams
Metrics Collection
Track stream health:
```rust
use heliosdb_streaming::BackpressureMetrics;

let metrics = controller.metrics();
println!("Total records: {}", metrics.total_records);
println!("Dropped: {}", metrics.dropped_records);
println!("Backpressure events: {}", metrics.backpressure_events);
println!("Avg buffer: {:.2}", metrics.average_buffer_size);
```

Watermark Monitoring
```rust
let wm_gen = WatermarkGenerator::new(strategy, Duration::from_secs(60));
let current = wm_gen.current_watermark();
println!("Current watermark: {}", current);
```
Track how far behind source you are:
```rust
use heliosdb_streaming::ConsumerLagMonitor;

let lag_monitor = ConsumerLagMonitor::new();

// Periodically check lag
tokio::spawn({
    let monitor = lag_monitor.clone();
    async move {
        loop {
            tokio::time::sleep(Duration::from_secs(10)).await;

            let lag = monitor.estimate_lag().await;
            if lag > Duration::from_secs(30) {
                eprintln!("WARNING: consumer lag {} seconds", lag.as_secs());
            }
        }
    }
});
```

Query Status
```rust
// Check whether the continuous query is running
if let Some(handle) = engine.get_continuous_query("my_query") {
    let metrics = handle.metrics();
    println!("Processed: {} rows", metrics.total_rows);
    println!("Errors: {}", metrics.error_count);
}
```

Real-World Examples
Example 1: Real-Time E-Commerce Analytics
Track conversion funnel in real-time—detect drop-offs immediately:
```rust
#[tokio::main]
async fn main() -> Result<()> {
    let engine = StreamingEngine::new().await?;

    // Create streams for each funnel step
    let views = Stream::new(StreamConfig {
        name: "product_views".to_string(),
        ..Default::default()
    });

    let carts = Stream::new(StreamConfig {
        name: "add_to_cart".to_string(),
        ..Default::default()
    });

    let purchases = Stream::new(StreamConfig {
        name: "purchases".to_string(),
        ..Default::default()
    });

    // Join views → cart adds (30-min window)
    let view_to_cart = engine.stream_stream_join(
        views.clone(),
        carts.clone(),
        Duration::from_secs(1800),
    ).await?;

    // Join cart → purchase (24-hour window)
    let view_to_purchase = engine.stream_stream_join(
        view_to_cart,
        purchases.clone(),
        Duration::from_secs(86400),
    ).await?;

    // Aggregate by product per hour
    let hourly_funnel = WindowedAggregation::new(
        WindowType::Tumbling {
            size: Duration::from_secs(3600),
        },
        vec![
            AggregateFunction::Count,
            AggregateFunction::Sum {
                field: "purchase_amount".to_string(),
            },
        ],
        view_to_purchase,
    )
    .with_group_by(vec!["product_id".to_string()])
    .execute()
    .await?;

    // Create a query to emit results
    let sink = Arc::new(CallbackSink::new(|rows| {
        for row in rows {
            let product_id = row.get("product_id");
            let count = row.get("count");
            let revenue = row.get("sum_purchase_amount");
            println!("Funnel: product={:?}, count={:?}, revenue={:?}",
                product_id, count, revenue);
        }
    }));

    let query = ContinuousQuery::new(
        "hourly_funnel".to_string(),
        hourly_funnel,
    ).with_sink(sink);

    engine.create_continuous_query(query).await?;

    // Run forever
    std::future::pending().await
}
```

Example 2: IoT Sensor Anomaly Detection
Monitor temperature/humidity sensors, detect anomalies:
```rust
#[tokio::main]
async fn main() -> Result<()> {
    let engine = StreamingEngine::new().await?;

    let sensor_stream = Stream::new(StreamConfig {
        name: "sensor_readings".to_string(),
        time_semantics: TimeSemantics::EventTime,
        ..Default::default()
    });

    // Detect: temperature > 100°F for 3+ consecutive readings
    let pattern = Pattern {
        name: "overheat".to_string(),
        variables: vec![
            PatternVariable {
                name: "high_temp".to_string(),
                condition: PatternCondition::GreaterThan {
                    field: "temperature_f".to_string(),
                    value: 100.0,
                },
            },
        ],
        sequence: PatternSequence::Sequence {
            patterns: vec![
                PatternSequence::Single { var: "high_temp".to_string() },
                PatternSequence::Single { var: "high_temp".to_string() },
                PatternSequence::Single { var: "high_temp".to_string() },
            ],
        },
        within: Some(Duration::from_secs(300)), // 5-minute window
        measures: vec![
            Measure {
                name: "max_temp".to_string(),
                expression: MeasureExpression::Max {
                    field: "temperature_f".to_string(),
                },
            },
        ],
    };

    let matcher = PatternMatcher::new(pattern);
    let anomalies = matcher.match_stream(sensor_stream).await?;

    // Enrich with location data
    let locations = vec![
        // Load sensor locations from DB
    ];

    let enriched = StreamTableJoin::new(
        "sensor_locations".to_string(),
        "sensor_id".to_string(),
    )
    .with_join_type(JoinType::Inner);
    enriched.load_table(locations)?;

    let enriched_anomalies = enriched.join(anomalies).await?;

    // Alert via callback
    let sink = Arc::new(CallbackSink::new(|rows| {
        for row in rows {
            if let (Some(Value::String(location)), Some(Value::Float(temp))) =
                (row.get("location"), row.get("max_temp"))
            {
                eprintln!("ALERT: Overheat at {} - {:.1}°F", location, temp);
                // Send email, SMS, etc.
            }
        }
    }));

    engine.create_continuous_query(
        ContinuousQuery::new("overheat_detector".to_string(), enriched_anomalies)
            .with_sink(sink),
    ).await?;

    std::future::pending().await
}
```

Example 3: Financial Transaction Fraud Detection
Detect payment fraud with sliding window heuristics:
```rust
#[tokio::main]
async fn main() -> Result<()> {
    let engine = StreamingEngine::new().await?;

    let transactions = Stream::new(StreamConfig {
        name: "transactions".to_string(),
        time_semantics: TimeSemantics::EventTime,
        allowed_lateness: Duration::from_secs(30),
        watermark_interval: Duration::from_secs(1),
    });

    // Detect: 3+ transactions from different locations in 10 minutes
    let pattern = Pattern {
        name: "velocity_fraud".to_string(),
        variables: vec![
            PatternVariable {
                name: "txn".to_string(),
                condition: PatternCondition::Always,
            },
        ],
        sequence: PatternSequence::Sequence {
            patterns: vec![
                PatternSequence::Single { var: "txn".to_string() },
                PatternSequence::Single { var: "txn".to_string() },
                PatternSequence::Single { var: "txn".to_string() },
            ],
        },
        within: Some(Duration::from_secs(600)),
        measures: vec![
            Measure {
                name: "location_count".to_string(),
                expression: MeasureExpression::Distinct {
                    field: "location".to_string(),
                },
            },
            Measure {
                name: "total_amount".to_string(),
                expression: MeasureExpression::Sum {
                    field: "amount".to_string(),
                },
            },
        ],
    };

    let matcher = PatternMatcher::new(pattern);
    let fraud_signals = matcher.match_stream(transactions.clone()).await?;

    // Enrich with customer info
    let customer_profiles = vec![
        // Load from DB
    ];

    let enriched = StreamTableJoin::new(
        "customers".to_string(),
        "customer_id".to_string(),
    );
    enriched.load_table(customer_profiles)?;

    let enriched_fraud = enriched.join(fraud_signals).await?;

    // Score and alert
    let sink = Arc::new(CallbackSink::new(|rows| {
        for row in rows {
            if let (Some(Value::Integer(customer_id)), Some(Value::Float(amount))) =
                (row.get("customer_id"), row.get("total_amount"))
            {
                let risk_score = calculate_risk_score(&row);

                if risk_score > 0.8 {
                    // Block transaction
                    println!("BLOCK: Customer {} suspicious ${:.2}", customer_id, amount);
                } else if risk_score > 0.5 {
                    // Flag for review
                    println!("FLAG: Customer {} suspicious ${:.2}", customer_id, amount);
                }
            }
        }
    }));

    engine.create_continuous_query(
        ContinuousQuery::new("fraud_detector".to_string(), enriched_fraud)
            .with_sink(sink),
    ).await?;

    std::future::pending().await
}

fn calculate_risk_score(row: &Row) -> f64 {
    // Implement scoring logic
    0.7
}
```

Troubleshooting
High Latency
Symptom: Results arrive slowly, watermarks lag behind real-time.
Diagnosis:
```rust
let lag_monitor = ConsumerLagMonitor::new();
let lag = lag_monitor.estimate_lag().await;
if lag > Duration::from_secs(30) {
    println!("Consumer lag: {:?}", lag);
}
```

Solutions:
- Increase parallelism: process partitions in parallel
- Reduce batch size: lower latency per batch
- Check backpressure: monitor `BackpressureMetrics`
- Profile operators: identify slow joins/aggregations
Data Loss
Symptom: Output row count doesn’t match input.
Diagnosis:
- Check the backpressure strategy: `DropOldest`/`DropNewest` drop data
- Verify the sink is writing successfully: `sink.flush().await?`
- Check offset commits: the process may have crashed before committing
Solutions:
```rust
// Use exactly-once with offset tracking
let coordinator = TransactionCoordinator::new(Duration::from_secs(30))?;

// Commit only after a successful write
offset_mgr.save_checkpoint(&checkpoint).await?;
sink.write(rows).await?;
coordinator.commit(txn.id).await?; // Atomic
```

Duplicate Results
Symptom: Same row appears multiple times.
Solutions:
- Enable message deduplication: `MessageDeduplicator`
- Implement idempotent sinks: writing the same row twice is safe
- Check the exactly-once configuration
Memory Leak in Session Windows
Symptom: Memory grows unbounded with session window.
Solutions:
```rust
let window = SessionWindow::new(Duration::from_secs(1800))
    .with_max_sessions(10_000)                       // Limit concurrent sessions
    .with_cleanup_interval(Duration::from_secs(60)); // Periodic cleanup
```

Watermark Stuck
Symptom: Windows don’t complete, data piles up in memory.
Diagnosis:
```rust
let current_wm = wm_gen.current_watermark();
let max_ts = wm_gen.max_timestamp();
println!("Watermark: {}, Max Event: {}", current_wm, max_ts);
```

Solutions:
- Check for idle sources (no events for a long time)
- Lower `allowed_lateness` to enable faster window closure
- Monitor the `watermark_interval` configuration
Join Explosion (Many Cartesian Products)
Symptom: Output rows >> input rows, memory bloats.
Cause: Stream-stream join matches all events in window.
Solutions:
```rust
// Add explicit join keys
let joiner = StreamStreamJoin::new(Duration::from_secs(10))
    .with_left_key("user_id".to_string())
    .with_right_key("user_id".to_string());

// Narrow the window
StreamStreamJoin::new(Duration::from_secs(1)) // Shorter window
```

Backpressure Causing Delays
Symptom: Consistent 1-2 second latency even with fast sink.
Solutions:
```rust
// Increase the buffer
let controller = BackpressureController::new(
    BackpressureStrategy::Block,
    50_000, // Larger buffer
);

// Or use adaptive backpressure
let adaptive = AdaptiveBackpressureController::new(10_000, 100_000);
```

Related Documentation
- Continuous Queries Specification
- Stream Processing Performance Tuning
- Kafka Integration Guide
- Time-Windowed Analytics Tutorial
- Exactly-Once Semantics Implementation
- Backpressure & Flow Control Design
Summary
HeliosDB’s streaming analytics capabilities enable:
- Real-time processing with sub-second latency
- Event-time semantics for accurate time-windowed analytics
- Multiple join types (stream-stream, stream-table)
- Complex event processing for pattern detection
- Exactly-once guarantees with transactional semantics
- Flow control with adaptive backpressure
- Flexible integration (Kafka, databases, REST APIs)
- Production-grade monitoring and observability
Start with simple tumbling windows and aggregations, gradually add joins and CEP patterns as your needs evolve. Monitor closely during the first deployments to tune window sizes, batch parameters, and parallelism for your workload.