HeliosDB Streaming - Usage Examples
HeliosDB Streaming - Usage Examples
Practical examples for common streaming use cases
Table of Contents
- Real-Time Analytics Dashboard
- User Session Analysis
- Fraud Detection
- IoT Sensor Monitoring
- E-Commerce Conversion Funnel
- Financial Trading Alerts
- Log Aggregation
- Anomaly Detection
Real-Time Analytics Dashboard
Use Case: Real-time metrics for application monitoring.
use heliosdb_streaming::*;use std::time::Duration;
#[tokio::main]async fn main() -> Result<()> { // Create stream from logs let stream = Stream::new(StreamConfig { name: "app_metrics".to_string(), buffer_size: 10000, ..Default::default() });
// Apply 1-minute tumbling window let window = TumblingWindow::new(Duration::from_secs(60)); let windowed = window.apply(stream).await?;
// Process windows tokio::spawn(async move { while let Some(event) = windowed.recv().await { if let StreamEvent::Data(window_result) = event { let rows = &window_result.rows;
// Calculate metrics let total_requests: usize = rows.len(); let errors = rows.iter() .filter(|r| r.get("status").and_then(|v| v.as_integer()) == Some(500)) .count();
let avg_latency: f64 = rows.iter() .filter_map(|r| r.get("latency_ms").and_then(|v| v.as_float())) .sum::<f64>() / total_requests as f64;
println!("Window [{} - {}]:", window_result.window.start, window_result.window.end ); println!(" Total Requests: {}", total_requests); println!(" Errors: {} ({:.2}%)", errors, (errors as f64 / total_requests as f64) * 100.0); println!(" Avg Latency: {:.2}ms", avg_latency); } } });
Ok(())}User Session Analysis
Use Case: Track user sessions with session windows.
use heliosdb_streaming::*;use std::time::Duration;
#[tokio::main]async fn main() -> Result<()> { // Create stream let stream = Stream::new(StreamConfig::default());
// Session window: 30-minute inactivity gap let window = SessionWindow::new(Duration::from_secs(1800)); let sessions = window.apply(stream).await?;
// Analyze sessions tokio::spawn(async move { while let Some(event) = sessions.recv().await { if let StreamEvent::Data(session) = event { let user_id = session.rows[0] .get("user_id") .and_then(|v| v.as_string()) .unwrap();
let page_views = session.rows.len(); let duration = (session.window.end - session.window.start) .num_seconds();
let pages_visited: std::collections::HashSet<_> = session.rows .iter() .filter_map(|r| r.get("page").and_then(|v| v.as_string())) .collect();
println!("User {} Session:", user_id); println!(" Duration: {} minutes", duration / 60); println!(" Page Views: {}", page_views); println!(" Unique Pages: {}", pages_visited.len()); } } });
Ok(())}Fraud Detection
Use Case: Detect suspicious transaction patterns.
use heliosdb_streaming::*;use std::time::Duration;
#[tokio::main]async fn main() -> Result<()> { let stream = Stream::new(StreamConfig::default());
// Pattern: Multiple high-value transactions in short time let pattern = PatternBuilder::new("fraud_pattern".to_string()) .define_variable( "HighValue".to_string(), PatternCondition::GreaterThan { field: "amount".to_string(), value: 1000.0, }, ) .sequence(PatternSequence::Exactly { var: "HighValue".to_string(), count: 3, }) .within(Duration::from_secs(300)) // Within 5 minutes .measure( "total_amount".to_string(), MeasureExpression::Aggregate { var: "HighValue".to_string(), field: "amount".to_string(), func: "SUM".to_string(), }, ) .measure( "transaction_count".to_string(), MeasureExpression::Count { var: "HighValue".to_string(), }, ) .build()?;
let matcher = PatternMatcher::new(pattern); let alerts = matcher.apply(stream).await?;
// Handle fraud alerts tokio::spawn(async move { while let Some(event) = alerts.recv().await { if let StreamEvent::Data(alert) = event { let user = alert.get("user_id").unwrap(); let total = alert.get("total_amount").unwrap(); let count = alert.get("transaction_count").unwrap();
println!("⚠ FRAUD ALERT:"); println!(" User: {}", user); println!(" {} transactions totaling {}", count, total); println!(" Action: Account flagged for review");
// Trigger alert system // send_alert_to_ops(user, total, count); } } });
Ok(())}IoT Sensor Monitoring
Use Case: Monitor IoT sensors for anomalies.
use heliosdb_streaming::*;use std::time::Duration;
#[tokio::main]async fn main() -> Result<()> { let stream = Stream::new(StreamConfig::default());
// Sliding window: 5-minute window, 1-minute slide let window = SlidingWindow::new( Duration::from_secs(300), Duration::from_secs(60) ); let windowed = window.apply(stream).await?;
// Detect anomalies tokio::spawn(async move { while let Some(event) = windowed.recv().await { if let StreamEvent::Data(window_result) = event { // Calculate statistics let temperatures: Vec<f64> = window_result.rows .iter() .filter_map(|r| r.get("temperature").and_then(|v| v.as_float())) .collect();
if temperatures.is_empty() { continue; }
let mean = temperatures.iter().sum::<f64>() / temperatures.len() as f64; let variance = temperatures.iter() .map(|t| (t - mean).powi(2)) .sum::<f64>() / temperatures.len() as f64; let std_dev = variance.sqrt();
// Check for anomalies (3-sigma rule) for temp in &temperatures { if (temp - mean).abs() > 3.0 * std_dev { println!("🚨 ANOMALY DETECTED:"); println!(" Temperature: {:.2}°C", temp); println!(" Mean: {:.2}°C, Std Dev: {:.2}", mean, std_dev); println!(" Deviation: {:.2} sigma", (temp - mean).abs() / std_dev); } } } } });
Ok(())}E-Commerce Conversion Funnel
Use Case: Track user journey from view to purchase.
use heliosdb_streaming::*;use std::time::Duration;
#[tokio::main]async fn main() -> Result<()> { let stream = Stream::new(StreamConfig::default());
// Pattern: View -> Add to Cart -> Purchase let pattern = PatternBuilder::new("conversion_funnel".to_string()) .define_variable( "View".to_string(), PatternCondition::Equals { field: "event_type".to_string(), value: Value::String("product_view".to_string()), }, ) .define_variable( "AddToCart".to_string(), PatternCondition::Equals { field: "event_type".to_string(), value: Value::String("add_to_cart".to_string()), }, ) .define_variable( "Purchase".to_string(), PatternCondition::Equals { field: "event_type".to_string(), value: Value::String("purchase".to_string()), }, ) .sequence(PatternSequence::Sequence { patterns: vec![ PatternSequence::Single { var: "View".to_string() }, PatternSequence::Single { var: "AddToCart".to_string() }, PatternSequence::Single { var: "Purchase".to_string() }, ], }) .within(Duration::from_secs(7200)) // 2-hour window .measure( "product_id".to_string(), MeasureExpression::First { var: "View".to_string(), field: "product_id".to_string(), }, ) .measure( "revenue".to_string(), MeasureExpression::Last { var: "Purchase".to_string(), field: "amount".to_string(), }, ) .build()?;
let matcher = PatternMatcher::new(pattern); let conversions = matcher.apply(stream).await?;
// Track conversions tokio::spawn(async move { let mut total_revenue = 0.0; let mut conversion_count = 0;
while let Some(event) = conversions.recv().await { if let StreamEvent::Data(conversion) = event { conversion_count += 1; let revenue = conversion.get("revenue") .and_then(|v| v.as_float()) .unwrap_or(0.0); total_revenue += revenue;
println!(" Conversion #{}", conversion_count); println!(" Product: {}", conversion.get("product_id").unwrap()); println!(" Revenue: ${:.2}", revenue); println!(" Total Revenue: ${:.2}", total_revenue); } } });
Ok(())}Financial Trading Alerts
Use Case: Alert on rapid price movements.
use heliosdb_streaming::*;use std::time::Duration;
#[tokio::main]async fn main() -> Result<()> { let stream = Stream::new(StreamConfig::default());
// Pattern: Price spike (3+ consecutive increases) let pattern = PatternBuilder::new("price_spike".to_string()) .define_variable( "Increase".to_string(), PatternCondition::GreaterThan { field: "price_change_pct".to_string(), value: 2.0, // > 2% increase }, ) .sequence(PatternSequence::Exactly { var: "Increase".to_string(), count: 3, }) .within(Duration::from_secs(60)) // Within 1 minute .measure( "start_price".to_string(), MeasureExpression::First { var: "Increase".to_string(), field: "price".to_string(), }, ) .measure( "end_price".to_string(), MeasureExpression::Last { var: "Increase".to_string(), field: "price".to_string(), }, ) .build()?;
let matcher = PatternMatcher::new(pattern); let spikes = matcher.apply(stream).await?;
// Handle alerts tokio::spawn(async move { while let Some(event) = spikes.recv().await { if let StreamEvent::Data(spike) = event { let symbol = spike.get("symbol").unwrap(); let start = spike.get("start_price").and_then(|v| v.as_float()).unwrap(); let end = spike.get("end_price").and_then(|v| v.as_float()).unwrap(); let change_pct = ((end - start) / start) * 100.0;
println!(" PRICE SPIKE DETECTED:"); println!(" Symbol: {}", symbol); println!(" Price: ${:.2} -> ${:.2}", start, end); println!(" Change: {:.2}%", change_pct); println!(" Time: < 1 minute"); } } });
Ok(())}Log Aggregation
Use Case: Aggregate logs with SQL streaming.
use heliosdb_streaming::*;use std::time::Duration;
#[tokio::main]async fn main() -> Result<()> { // Create SQL table environment let config = TableConfig { idle_state_retention: Duration::from_secs(3600), max_parallelism: 64, event_time_characteristic: TimeCharacteristic::EventTime, };
let env = StreamTableEnvironment::new(config);
// Create stream let stream = Stream::new(StreamConfig::default());
// Define schema let schema = TableSchema { columns: vec![ ColumnDef { name: "timestamp".to_string(), data_type: DataType::Timestamp, nullable: false, }, ColumnDef { name: "level".to_string(), data_type: DataType::String, nullable: false, }, ColumnDef { name: "service".to_string(), data_type: DataType::String, nullable: false, }, ColumnDef { name: "message".to_string(), data_type: DataType::String, nullable: false, }, ], primary_key: None, watermark_strategy: Some(WatermarkStrategy::BoundedOutOfOrderness { max_out_of_orderness: Duration::from_secs(5), }), };
// Register stream as table env.register_stream("logs", stream, schema);
// Query for errors let errors = env.sql_query( "SELECT service, COUNT(*) as error_count \ FROM logs \ WHERE level = 'ERROR' \ GROUP BY service" ).await?;
// Process results tokio::spawn(async move { while let Some(event) = errors.recv().await { if let StreamEvent::Data(row) = event { println!("Service: {}, Errors: {}", row.get("service").unwrap(), row.get("error_count").unwrap() ); } } });
Ok(())}Anomaly Detection
Use Case: Detect anomalies with stream-stream join and pattern matching.
use heliosdb_streaming::*;use std::time::Duration;
#[tokio::main]async fn main() -> Result<()> { // Two streams: current metrics and historical baseline let current_stream = Stream::new(StreamConfig::default()); let baseline_stream = Stream::new(StreamConfig::default());
// Join current with baseline let join = StreamStreamJoin::new( Duration::from_secs(60), JoinType::Inner, "metric_id", "metric_id" );
let joined = join.apply(current_stream, baseline_stream).await?;
// Detect anomalies where current > 3x baseline let pattern = PatternBuilder::new("anomaly".to_string()) .define_variable( "Spike".to_string(), PatternCondition::And { conditions: vec![ PatternCondition::GreaterThan { field: "current_value".to_string(), value: 0.0, }, // Custom condition: current > 3 * baseline // (simplified - would need custom condition type) ], }, ) .sequence(PatternSequence::Single { var: "Spike".to_string(), }) .build()?;
let matcher = PatternMatcher::new(pattern); let anomalies = matcher.apply(joined).await?;
// Handle anomalies tokio::spawn(async move { while let Some(event) = anomalies.recv().await { if let StreamEvent::Data(anomaly) = event { println!("🔴 ANOMALY:"); println!(" Metric: {}", anomaly.get("metric_id").unwrap()); println!(" Current: {}", anomaly.get("current_value").unwrap()); println!(" Baseline: {}", anomaly.get("baseline_value").unwrap()); } } });
Ok(())}Production Deployment Tips
1. Error Handling
use heliosdb_streaming::*;
async fn robust_pipeline() -> Result<()> { let stream = Stream::new(StreamConfig::default());
// Configure restart strategy let config = JobConfig { restart_strategy: RestartStrategy::FixedDelay { max_attempts: 3, delay: Duration::from_secs(10), }, ..Default::default() };
// Proper error handling match process_stream(stream).await { Ok(_) => println!("Pipeline completed successfully"), Err(e) => { eprintln!("Pipeline error: {}", e); // Trigger alerting // Fallback to backup pipeline } }
Ok(())}2. Monitoring
use heliosdb_streaming::*;
async fn monitored_pipeline() -> Result<()> { let job_manager = JobManager::new(JobManagerConfig::default())?;
// Submit with monitoring let job_id = job_manager.submit_job(config, graph).await?;
// Periodic health checks tokio::spawn(async move { loop { tokio::time::sleep(Duration::from_secs(30)).await;
let metrics = job_manager.get_metrics(job_id).await.unwrap(); println!("Job {}: {} records/sec, {:.2}ms latency", job_id, metrics.throughput, metrics.avg_latency_ms );
// Alert if degraded if metrics.avg_latency_ms > 100.0 { println!("⚠ High latency detected!"); } } });
Ok(())}3. Checkpointing
use heliosdb_streaming::*;
async fn fault_tolerant_pipeline() -> Result<()> { // Enable checkpointing let backend = StateBackend::new(StateBackendConfig { storage_type: StorageType::RocksDB, checkpoint_dir: "/var/heliosdb/checkpoints".to_string(), enable_encryption: true, ..Default::default() })?;
let coordinator = CheckpointCoordinator::new( backend, Duration::from_secs(60) // Checkpoint every minute )?;
// Checkpoint on shutdown tokio::signal::ctrl_c().await?; println!("Shutting down, creating savepoint..."); let savepoint = coordinator.create_savepoint().await?; println!("Savepoint created: {}", savepoint);
Ok(())}Performance Optimization
1. Buffer Tuning
// High throughput: larger bufferslet stream = Stream::new(StreamConfig { buffer_size: 10000, ..Default::default()});
// Low latency: smaller bufferslet stream = Stream::new(StreamConfig { buffer_size: 100, ..Default::default()});2. Parallelism
let config = JobConfig { parallelism: num_cpus::get() * 2, ..Default::default()};3. Batch Processing
let sink = DatabaseSink::new(DatabaseConfig { batch_size: 5000, // Batch writes for efficiency batch_timeout: Duration::from_millis(100), ..Default::default()})?;More examples available in test files: /heliosdb-streaming/tests/
API Documentation: HELIOSDB_STREAMING_API.md
Generated: October 28, 2025