Federated Learning User Guide
Table of Contents
- Introduction
- Quick Start
- HIPAA Compliance
- GDPR Compliance
- 100+ Node Deployment
- Best Practices
- Troubleshooting
- API Reference
Introduction
HeliosDB Federated Learning enables distributed machine learning training across multiple nodes while preserving data privacy and ensuring regulatory compliance (HIPAA, GDPR).
Key Features
- Privacy-Preserving: Differential privacy, secure aggregation
- Compliant: Built-in HIPAA and GDPR compliance
- Scalable: Tested with 150+ nodes
- Robust: Automatic failure recovery
- Transparent: Comprehensive audit logging
When to Use Federated Learning
✅ Good Use Cases:
- Healthcare data across multiple hospitals
- Financial data across bank branches
- User data across geographic regions
- Sensitive data that cannot be centralized
❌ Not Recommended:
- Small datasets (<1000 samples)
- Real-time predictions (use centralized ML)
- When data can be safely centralized
Quick Start
1. Basic Setup
use heliosdb_federated_learning::*;
#[tokio::main]async fn main() -> Result<()> { // Create federated learning engine let config = Config { aggregation_strategy: AggregationStrategy::FedAvg, round_config: RoundConfig { max_rounds: 100, min_nodes_per_round: 10, selection_strategy: NodeSelectionStrategy::All, node_timeout_secs: 60, target_accuracy: Some(0.95), learning_rate: 0.01, convergence_threshold: 0.001, }, enable_privacy: true, privacy_config: Some(DifferentialPrivacyConfig { epsilon: 0.1, delta: 1e-5, mechanism: NoiseMechanism::Gaussian, }), };
let engine = FederatedLearningEngine::new(config).await?;
// Register model let initial_params = vec![0.0; 1000]; engine.register_model( "my_model".to_string(), "neural_network".to_string(), initial_params, ).await?;
// Create coordinator let coordinator = engine.create_coordinator("my_model".to_string()).await?;
// Create worker nodes for i in 0..10 { let worker_config = WorkerConfig { node_id: format!("node_{}", i), local_epochs: 1, batch_size: 32, learning_rate: 0.01, };
engine.create_worker(worker_config.clone()).await?; coordinator.register_node(worker_config.node_id, 1000).await?; }
// Start training engine.start_training("my_model".to_string()).await?;
println!("Federated learning started!");
Ok(())}

2. Training Loop
use heliosdb_federated_learning::*;
async fn training_loop( coordinator: &Coordinator, workers: Vec<Arc<FederatedWorker>>,) -> Result<()> { for round in 0..100 { println!("Round {}/100", round + 1);
// Select nodes let selected_nodes = coordinator.select_nodes_for_round().await?;
// Collect updates let mut updates = Vec::new(); for worker in &workers { if selected_nodes.contains(&worker.node_id()) { let update = worker.train_local_model().await?; updates.push(update); } }
// Aggregate coordinator.execute_round(updates).await?;
// Check convergence if coordinator.has_converged().await { println!("Converged at round {}", round + 1); break; } }
Ok(())}

HIPAA Compliance
Setup HIPAA Compliance
use heliosdb_federated_learning::*;
async fn setup_hipaa_compliance() -> Result<()> { // Configure HIPAA compliance let hipaa_config = HipaaConfig { encrypt_phi_at_rest: true, encrypt_phi_in_transit: true, enable_audit_logging: true, data_residency: "US".to_string(), enable_access_controls: true, enable_integrity_checks: true, min_key_size_bits: 256, auto_detect_phi: true, };
let mut hipaa_manager = HipaaComplianceManager::new(hipaa_config)?;
// Encrypt PHI data let patient_data = b"Patient: John Doe, MRN: 12345"; let encrypted_phi = hipaa_manager.encrypt_phi( patient_data, "patient_12345".to_string(), )?;
println!("PHI encrypted with AES-256-GCM");
// Verify gradients don't leak PHI let gradients = vec![0.1, 0.2, 0.3, 0.4, 0.5]; let is_safe = hipaa_manager.verify_gradient_privacy(&gradients)?;
if !is_safe { return Err(FederatedLearningError::ComplianceError( "Gradients may leak PHI".to_string(), )); }
// Generate compliance report let report = hipaa_manager.verify_compliance(); report.print();
if !report.overall_compliant { return Err(FederatedLearningError::ComplianceError( "HIPAA compliance failed".to_string(), )); }
Ok(())}

HIPAA Audit Logging
use heliosdb_federated_learning::*;use std::collections::HashMap;
fn audit_phi_access( hipaa_manager: &mut HipaaComplianceManager, user_id: String, resource_id: String,) { hipaa_manager.log_audit_event( user_id, AuditAction::PhiAccess, resource_id, true, // success HashMap::new(), );}
fn export_audit_log( hipaa_manager: &HipaaComplianceManager,) -> Vec<AuditLogEntry> { let start_date = chrono::Utc::now() - chrono::Duration::days(30); let end_date = chrono::Utc::now();
hipaa_manager.export_audit_log(start_date, end_date)}

HIPAA Best Practices
- Encrypt All PHI: Never store PHI in plaintext
- Audit Everything: Log all PHI access and modifications
- Verify Gradients: Always check gradients before aggregation
- Regular Reports: Generate compliance reports monthly
- Data Residency: Ensure US-only storage for HIPAA data
GDPR Compliance
Setup GDPR Compliance
use heliosdb_federated_learning::*;
async fn setup_gdpr_compliance() -> Result<()> { // Configure GDPR compliance let gdpr_config = GdprConfig { enable_right_to_be_forgotten: true, enable_data_portability: true, enable_consent_management: true, retention_period_days: 365, enable_data_minimization: true, enable_purpose_limitation: true, enable_explainability: true, };
let mut gdpr_manager = GdprComplianceManager::new(gdpr_config);
// Record user consent gdpr_manager.record_consent( "user123".to_string(), vec![ "model_training".to_string(), "data_analytics".to_string(), ], "v1.0".to_string(), )?;
println!("User consent recorded");
// Add user data gdpr_manager.add_user_data( "user123".to_string(), vec![1, 2, 3, 4, 5], "model_training".to_string(), DataCategory::TrainingData, )?;
println!("User data added");
Ok(())}

Right to be Forgotten
use heliosdb_federated_learning::*;
async fn delete_user_data( gdpr_manager: &mut GdprComplianceManager, user_id: &str, node_ids: Vec<String>,) -> Result<()> { println!("Processing deletion request for {}", user_id);
// Delete user data from all nodes let deletion = gdpr_manager.delete_user_data( user_id, node_ids, ).await?;
println!("Deleted from {} nodes", deletion.deleted_from_nodes.len());
// Verify deletion let verified = gdpr_manager.verify_deletion(user_id).await?;
if !verified { return Err(FederatedLearningError::ComplianceError( format!("Failed to verify deletion for {}", user_id), )); }
println!("Deletion verified for {}", user_id);
Ok(())}

Data Portability
use heliosdb_federated_learning::*;
async fn export_user_data( gdpr_manager: &GdprComplianceManager, user_id: &str,) -> Result<String> { // Export user data let export = gdpr_manager.export_user_data(user_id)?;
println!("Exported data for {}", user_id); println!(" User data records: {}", export.user_data.len()); println!(" Model contributions: {}", export.model_contributions.len());
// Convert to JSON let json = serde_json::to_string_pretty(&export)?;
Ok(json)}

GDPR Best Practices
- Get Consent First: Always record consent before using data
- Purpose Limitation: Only use data for stated purposes
- Data Minimization: Collect only necessary data
- Respond Quickly: Process deletion requests within 30 days
- Audit Regularly: Review consent and data usage quarterly
100+ Node Deployment
Production Deployment
use heliosdb_federated_learning::*;
async fn deploy_100_node_cluster() -> Result<()> { // Configure for production let config = Config { aggregation_strategy: AggregationStrategy::FedAvg, round_config: RoundConfig { max_rounds: 1000, min_nodes_per_round: 80, // 80% quorum selection_strategy: NodeSelectionStrategy::All, node_timeout_secs: 30, target_accuracy: Some(0.95), learning_rate: 0.01, convergence_threshold: 0.001, }, enable_privacy: true, privacy_config: Some(DifferentialPrivacyConfig { epsilon: 0.1, delta: 1e-5, mechanism: NoiseMechanism::Gaussian, }), };
let engine = FederatedLearningEngine::new(config).await?;
// Register production model let initial_params = vec![0.0; 10000]; // Large model engine.register_model( "production_model".to_string(), "deep_neural_network".to_string(), initial_params, ).await?;
// Create coordinator let coordinator = engine.create_coordinator( "production_model".to_string() ).await?;
// Deploy 100 worker nodes println!("Deploying 100 worker nodes...");
for i in 0..100 { let worker_config = WorkerConfig { node_id: format!("prod_node_{:03}", i), local_epochs: 1, batch_size: 32, learning_rate: 0.01, };
engine.create_worker(worker_config.clone()).await?; coordinator.register_node(worker_config.node_id, 10000).await?;
if (i + 1) % 10 == 0 { println!(" Deployed {}/100 nodes", i + 1); } }
println!("All nodes deployed successfully");
Ok(())}

Monitoring and Health Checks
use heliosdb_federated_learning::*;
async fn monitor_cluster_health( coordinator: &Coordinator,) -> Result<()> { // Get cluster state let state = coordinator.get_state().await;
println!("Cluster Health:"); println!(" Current round: {}/{}", state.current_round, state.total_rounds); println!(" Is training: {}", state.is_training); println!(" Converged: {}", state.converged);
if let Some(accuracy) = state.best_accuracy { println!(" Best accuracy: {:.2}%", accuracy * 100.0); }
if let Some(loss) = state.best_loss { println!(" Best loss: {:.4}", loss); }
// Get node information let nodes = coordinator.list_nodes().await; let active_nodes = nodes.iter().filter(|n| n.available).count();
println!("\nNode Status:"); println!(" Total nodes: {}", nodes.len()); println!(" Active nodes: {}", active_nodes); println!(" Uptime: {:.1}%", (active_nodes as f64 / nodes.len() as f64) * 100.0 );
// Alert if uptime is low let uptime_pct = (active_nodes as f64 / nodes.len() as f64) * 100.0; if uptime_pct < 95.0 { eprintln!("WARNING: Cluster uptime below 95% ({:.1}%)", uptime_pct); }
Ok(())}

Failure Recovery
use heliosdb_federated_learning::*;
async fn handle_node_failure( coordinator: &Coordinator, failed_node_id: &str,) -> Result<()> { println!("Node {} failed", failed_node_id);
// Unregister failed node coordinator.unregister_node(failed_node_id).await?;
// Check if we still have quorum let nodes = coordinator.list_nodes().await; let active_count = nodes.iter().filter(|n| n.available).count();
if active_count < 80 { // Assuming 100 nodes, 80% quorum eprintln!("CRITICAL: Lost quorum ({} active nodes)", active_count); return Err(FederatedLearningError::InsufficientNodes { required: 80, available: active_count, }); }
println!("Continuing with {} active nodes", active_count);
Ok(())}
async fn recover_failed_node( engine: &FederatedLearningEngine, coordinator: &Coordinator, node_id: String,) -> Result<()> { println!("Recovering node {}", node_id);
// Re-create worker let worker_config = WorkerConfig { node_id: node_id.clone(), local_epochs: 1, batch_size: 32, learning_rate: 0.01, };
engine.create_worker(worker_config.clone()).await?;
// Re-register with coordinator coordinator.register_node(node_id.clone(), 10000).await?;
println!("Node {} recovered", node_id);
Ok(())}

Best Practices
1. Privacy Configuration
// For sensitive data (healthcare, financial)let privacy_config = DifferentialPrivacyConfig { epsilon: 0.1, // Strong privacy delta: 1e-5, // Very small failure probability mechanism: NoiseMechanism::Gaussian,};
// For less sensitive datalet privacy_config = DifferentialPrivacyConfig { epsilon: 1.0, // Weaker privacy, better accuracy delta: 1e-3, // Larger failure probability mechanism: NoiseMechanism::Gaussian,};

2. Node Selection Strategies
// Use all nodes (best for convergence)NodeSelectionStrategy::All
// Random sampling (faster rounds)NodeSelectionStrategy::Random { k: 50 }
// Data-weighted sampling (better accuracy)NodeSelectionStrategy::MostData { k: 50 }
// Performance-based sampling (faster convergence)NodeSelectionStrategy::BestPerformance { k: 50 }

3. Convergence Configuration
let round_config = RoundConfig { max_rounds: 100, min_nodes_per_round: 10, selection_strategy: NodeSelectionStrategy::All, node_timeout_secs: 60, target_accuracy: Some(0.95), // Stop when accuracy reached learning_rate: 0.01, convergence_threshold: 0.001, // Stop when loss change < 0.001};

4. Error Handling
async fn robust_training( coordinator: &Coordinator, workers: Vec<Arc<FederatedWorker>>,) -> Result<()> { for round in 0..100 { // Select nodes let selected_nodes = match coordinator.select_nodes_for_round().await { Ok(nodes) => nodes, Err(e) => { eprintln!("Failed to select nodes: {}", e); continue; // Skip this round } };
// Collect updates with timeout let mut updates = Vec::new(); for worker in &workers { if selected_nodes.contains(&worker.node_id()) { match tokio::time::timeout( Duration::from_secs(60), worker.train_local_model(), ).await { Ok(Ok(update)) => updates.push(update), Ok(Err(e)) => { eprintln!("Worker {} failed: {}", worker.node_id(), e); } Err(_) => { eprintln!("Worker {} timed out", worker.node_id()); } } } }
// Only aggregate if we have enough updates if updates.len() < selected_nodes.len() / 2 { eprintln!("Not enough updates, skipping round {}", round); continue; }
// Aggregate match coordinator.execute_round(updates).await { Ok(_) => println!("Round {} completed", round), Err(e) => eprintln!("Round {} failed: {}", round, e), } }
Ok(())}

Troubleshooting
Common Issues
Issue: “Insufficient nodes”
Cause: Not enough active nodes for training round
Solution:
// Reduce minimum nodes per roundlet round_config = RoundConfig { min_nodes_per_round: 5, // Reduced from 10 ..Default::default()};
// Or wait for nodes to recovertokio::time::sleep(Duration::from_secs(30)).await;

Issue: “Privacy budget exhausted”
Cause: Too many training rounds with tight privacy
Solution:
// Increase epsilon (weaker privacy)let privacy_config = DifferentialPrivacyConfig { epsilon: 0.5, // Increased from 0.1 delta: 1e-5, mechanism: NoiseMechanism::Gaussian,};
// Or reduce number of roundslet round_config = RoundConfig { max_rounds: 50, // Reduced from 100 ..Default::default()};

Issue: “Convergence not reached”
Cause: Model not converging within max rounds
Solution:
// Increase max roundslet round_config = RoundConfig { max_rounds: 200, // Increased from 100 ..Default::default()};
// Or adjust learning ratelet round_config = RoundConfig { learning_rate: 0.001, // Reduced from 0.01 ..Default::default()};
// Or relax convergence thresholdlet round_config = RoundConfig { convergence_threshold: 0.01, // Increased from 0.001 ..Default::default()};

Debugging Tools
// Enable verbose logginguse tracing::Level;use tracing_subscriber;
tracing_subscriber::fmt() .with_max_level(Level::DEBUG) .init();
// Monitor round historylet history = coordinator.get_round_history().await;for round in history { println!("Round {}: {} nodes, loss: {:?}", round.round_number, round.participating_nodes.len(), round.aggregated_loss, );}
// Check node healthfor node in coordinator.list_nodes().await { println!("Node {}: available={}, rounds={}", node.node_id, node.available, node.rounds_participated, );}

API Reference
Core Types
// Federated learning enginepub struct FederatedLearningEngine { /* ... */ }
// Training coordinatorpub struct Coordinator { /* ... */ }
// Worker nodepub struct FederatedWorker { /* ... */ }
// HIPAA compliance managerpub struct HipaaComplianceManager { /* ... */ }
// GDPR compliance managerpub struct GdprComplianceManager { /* ... */ }

Configuration Types
pub struct Config { pub aggregation_strategy: AggregationStrategy, pub round_config: RoundConfig, pub enable_privacy: bool, pub privacy_config: Option<DifferentialPrivacyConfig>,}
pub struct RoundConfig { pub max_rounds: u64, pub min_nodes_per_round: usize, pub selection_strategy: NodeSelectionStrategy, pub node_timeout_secs: u64, pub target_accuracy: Option<f64>, pub learning_rate: f32, pub convergence_threshold: f64,}
pub struct DifferentialPrivacyConfig { pub epsilon: f64, pub delta: f64, pub mechanism: NoiseMechanism,}

Enums
pub enum AggregationStrategy { FedAvg, // Federated averaging FedProx, // Proximal term regularization FedOpt, // Adaptive optimization}
pub enum NodeSelectionStrategy { All, Random { k: usize }, MostData { k: usize }, BestPerformance { k: usize },}
pub enum NoiseMechanism { Gaussian, Laplace,}

For complete API documentation, see API.md.
Version: 1.0.0 Last Updated: 2025-11-15 Module: heliosdb-federated-learning