Time-Series User Guide
Version: 6.0
Last Updated: January 4, 2026
Audience: Developers and Database Administrators
Table of Contents
- Time-Series Data Types
- Compression Options
- Retention Policies
- Continuous Aggregates
- Gap Filling
- Time-Zone Handling
- Integration with MVCC
- Query Engine
- Ingestion Pipeline
- Partition Management
- Best Practices
Time-Series Data Types
Core Data Structures
TimeSeriesPoint
The fundamental data structure for time-series data:
```rust
use std::collections::HashMap;

pub struct TimeSeriesPoint {
    /// Metric identifier (e.g., "cpu.usage", "sensor.temperature")
    pub metric: String,
    /// Timestamp in milliseconds since Unix epoch (UTC)
    pub timestamp: u64,
    /// Numeric value
    pub value: f64,
    /// Optional dimensional tags for filtering
    pub tags: HashMap<String, String>,
}
```

TSValue Enum
For mixed-type time-series:
```rust
pub enum TSValue {
    Null,          // Missing value
    Integer(i64),  // Integer values
    Float(f64),    // Floating-point values
    Boolean(bool), // Boolean flags
}
```

Creating Data Points
```rust
use std::collections::HashMap;

// Simple point
let point = TimeSeriesPoint::new(
    "server.cpu_percent",
    1704067200000, // Timestamp in ms
    75.5,
);

// Point with tags (dimensions)
let mut tags = HashMap::new();
tags.insert("host".to_string(), "web-server-01".to_string());
tags.insert("datacenter".to_string(), "us-east-1".to_string());
tags.insert("environment".to_string(), "production".to_string());

let tagged_point = TimeSeriesPoint::with_tags(
    "server.cpu_percent",
    1704067200000,
    75.5,
    tags,
);
```

Timestamp Conventions
| Format | Example | Usage |
|---|---|---|
| Unix milliseconds | 1704067200000 | Primary format in Rust API |
| Unix seconds | 1704067200 | Multiply by 1,000 to convert |
| ISO 8601 | 2024-01-01T00:00:00Z | SQL queries |
| RFC 3339 | 2024-01-01T00:00:00.000Z | API responses |
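For example, converting between these formats with chrono (the crate used later in this guide); the values match the 2024-01-01 examples above:

```rust
use chrono::{DateTime, Utc};

// Unix seconds -> Unix milliseconds (the primary Rust API format)
let unix_seconds: i64 = 1704067200;
let unix_millis = unix_seconds * 1000; // 1704067200000

// ISO 8601 / RFC 3339 -> Unix milliseconds
let parsed: DateTime<Utc> = "2024-01-01T00:00:00Z".parse().unwrap();
assert_eq!(parsed.timestamp_millis(), 1704067200000);
```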
Compression Options
HeliosDB provides multiple compression strategies optimized for different data patterns.
Gorilla Compression
Facebook’s Gorilla algorithm is the default for time-series data:
```rust
use heliosdb_storage::timeseries::compression::{
    GorillaCompressor, CompressionConfig,
};

let config = CompressionConfig {
    compress_timestamps: true, // Delta-of-delta encoding
    compress_values: true,     // XOR + bit-packing
    block_size: 1024,          // Points per block
    min_ratio: 1.5,            // Minimum compression ratio
};

let compressor = GorillaCompressor::new(config);
```

Algorithm Details
Timestamp Compression (Delta-of-Delta):
- First timestamp: stored in full (64 bits)
- Second timestamp: delta from first (variable bits)
- Subsequent timestamps: delta-of-delta (1-32 bits)
```
Timestamp sequence: [1000, 1060, 1120, 1180, 1240]
Delta sequence:     [  60,   60,   60,   60]
Delta-of-delta:     [   0,    0,    0]
```

Value Compression (XOR + Bit-packing):
- XOR with previous value
- Leading/trailing zero compression
- Typical 4-8 bits per value for slowly changing data
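To make both steps concrete, here is a minimal sketch of the arithmetic; it is an illustration only, not the library's bit-packed implementation:

```rust
// Two difference passes over timestamps: regular intervals collapse to zeros.
fn delta_of_delta(timestamps: &[i64]) -> Vec<i64> {
    let deltas: Vec<i64> = timestamps.windows(2).map(|w| w[1] - w[0]).collect();
    deltas.windows(2).map(|w| w[1] - w[0]).collect()
}

// XOR residual for values: unchanged values XOR to zero, and slowly
// changing values leave long runs of leading/trailing zero bits to elide.
fn xor_residual(prev: f64, next: f64) -> u64 {
    prev.to_bits() ^ next.to_bits()
}

assert_eq!(delta_of_delta(&[1000, 1060, 1120, 1180, 1240]), vec![0, 0, 0]);
assert_eq!(xor_residual(75.5, 75.5), 0); // a repeated value costs ~1 bit
```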
Batch Compression
For high-throughput scenarios:
```rust
use heliosdb_storage::timeseries::compression::{
    BatchCompressor, BatchCompressionConfig,
};

let config = BatchCompressionConfig {
    block_size: 4096,           // Larger blocks for better ratio
    compress_timestamps: true,
    compress_values: true,
    compress_metrics: true,     // Dictionary compression for metric names
    enable_delta_encoding: true,
    target_ratio: 10.0,         // Target 10x compression
};

let compressor = BatchCompressor::with_config(config);

// Compress a batch of points
let compressed = compressor.compress_batch(&points)?;

// Decompress
let decompressed = compressor.decompress_batch(&compressed)?;
```

Dictionary Compression
For repeated strings (metric names, tags):
```rust
use heliosdb_storage::timeseries::compression::DictionaryCompressor;

let mut dict = DictionaryCompressor::new();

// Encode metrics to compact IDs
let id1 = dict.encode("server.cpu_percent")?; // Returns 0
let id2 = dict.encode("server.memory_used")?; // Returns 1
let id3 = dict.encode("server.cpu_percent")?; // Returns 0 (reused)

// Decode back
let metric = dict.decode(0)?; // "server.cpu_percent"
```

Compression Configuration Guide
| Use Case | Block Size | Recommended Settings |
|---|---|---|
| Real-time streaming | 256-512 | Low latency, moderate ratio |
| Batch ingestion | 4096+ | High ratio, higher latency |
| Long-term storage | 8192+ | Maximum compression |
| High-frequency data | 1024 | Balance of ratio and latency |
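As one concrete reading of this table, a long-term-storage profile built on the `BatchCompressionConfig` shown above might look like the following (the values are illustrative, not tuned defaults):

```rust
// Long-term storage profile from the table above (illustrative values).
let archival = BatchCompressionConfig {
    block_size: 8192,           // Maximum compression
    compress_timestamps: true,
    compress_values: true,
    compress_metrics: true,     // Dictionary-compress metric names too
    enable_delta_encoding: true,
    target_ratio: 10.0,
};
let compressor = BatchCompressor::with_config(archival);
```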
Retention Policies
Policy Configuration
```rust
use heliosdb_storage::timeseries::retention::{
    RetentionPolicy, RetentionEngine,
};
use std::time::Duration;

// Time-based retention
let policy = RetentionPolicy::new(Duration::from_secs(30 * 24 * 3600)) // 30 days
    .with_cleanup_interval(3600) // Check every hour
    .with_auto_cleanup(true);

// Size-capped retention: a one-year time limit plus a storage cap
let policy = RetentionPolicy::new(Duration::from_secs(365 * 24 * 3600)) // 1 year
    .with_max_size(100_000_000_000) // 100 GB limit
    .with_cleanup_interval(3600);
```

Per-Metric Policies
```rust
let mut retention_engine = RetentionEngine::new(storage);

// Global policy: 30 days
retention_engine.set_policy(
    RetentionPolicy::new(Duration::from_secs(30 * 24 * 3600)),
);

// High-frequency metrics: 7 days
retention_engine.set_metric_policy(
    "metrics.high_freq.*",
    RetentionPolicy::new(Duration::from_secs(7 * 24 * 3600)),
);

// Critical metrics: 90 days
retention_engine.set_metric_policy(
    "metrics.critical.*",
    RetentionPolicy::new(Duration::from_secs(90 * 24 * 3600)),
);
```

Manual Cleanup
```rust
// Run immediate cleanup
retention_engine.cleanup_expired().await?;

// Cleanup by size constraint
retention_engine.cleanup_by_size(50_000_000_000).await?; // Keep under 50 GB

// Delete specific metric data
let deleted = retention_engine.delete_range(
    "old.metric",
    start_timestamp,
    end_timestamp,
).await?;

println!("Deleted {} records", deleted);
```

Retention Statistics
```rust
let stats = retention_engine.stats();
println!("Total deleted records: {}", stats.total_deleted_records);
println!("Total freed bytes: {}", stats.total_deleted_bytes);
println!("Cleanup operations: {}", stats.cleanup_count);
```

Estimate Savings
```rust
let estimate = retention_engine.estimate_savings(&policy).await?;
println!("Expired records: {}", estimate.expired_records);
println!("Bytes to free: {}", estimate.expired_bytes);
println!("Retention rate: {:.1}%", estimate.retention_rate * 100.0);
```

Continuous Aggregates
Continuous aggregates pre-compute rollups for fast dashboard queries.
Configuration
```rust
use heliosdb_storage::timeseries::downsampling::{
    DownsamplingEngine, DownsamplingConfig, DownsamplingTier, AggregationFunction,
};

let mut engine = DownsamplingEngine::new(storage);

// Configure multi-tier downsampling
let config = DownsamplingConfig::new(Duration::from_secs(60)) // 1-minute base
    .with_aggregation(AggregationFunction::Average)
    .add_tier(
        DownsamplingTier::new(Duration::from_secs(300), AggregationFunction::Average)
            .with_age_threshold(Duration::from_secs(3600))        // After 1 hour
            .with_retention(Duration::from_secs(7 * 24 * 3600)),  // Keep 7 days
    )
    .add_tier(
        DownsamplingTier::new(Duration::from_secs(3600), AggregationFunction::Average)
            .with_age_threshold(Duration::from_secs(24 * 3600))   // After 1 day
            .with_retention(Duration::from_secs(30 * 24 * 3600)), // Keep 30 days
    )
    .add_tier(
        DownsamplingTier::new(Duration::from_secs(86400), AggregationFunction::Average)
            .with_age_threshold(Duration::from_secs(7 * 24 * 3600)), // After 7 days
    );

engine.configure_metric("cpu.usage", config).await?;
```
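With this configuration, a metric recorded at the 1-minute base resolution shrinks as it ages: 1,440 points per day become 288 five-minute averages after one hour, 24 hourly averages after one day, and a single daily average after seven days.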
Aggregation Functions

```rust
pub enum AggregationFunction {
    Average, // Mean value
    Min,     // Minimum value
    Max,     // Maximum value
    Sum,     // Sum of values
    Count,   // Number of points
    First,   // First value in bucket
    Last,    // Last value in bucket
    StdDev,  // Standard deviation
}
```

Process Downsampling
```rust
// Manual processing
engine.process_pending().await?;

// On-demand downsampling for a range
let downsampled = engine.downsample_range(
    "cpu.usage",
    start_time,
    end_time,
    AggregationFunction::Average,
    Duration::from_secs(3600), // 1-hour buckets
).await?;
```

Gap Filling
Handle missing data points with various interpolation strategies.
Fill Strategies
```rust
pub enum FillMethod {
    Null,     // Leave gaps as null
    Zero,     // Fill with zero
    Forward,  // Use previous value (LOCF)
    Backward, // Use next value
    Linear,   // Linear interpolation
}
```

Using Gap Filling
```rust
use heliosdb_storage::timeseries::TimeSeriesOps;

let filled = TimeSeriesOps::fill_missing(
    &time_series,
    TimeInterval::Minute(1), // Expected interval
    FillMethod::Linear,      // Interpolation strategy
)?;
```
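For intuition, `FillMethod::Linear` fills each missing point by interpolating between its nearest neighbors. A minimal sketch of that arithmetic (not the library's implementation):

```rust
// Given neighbors (t0, v0) and (t1, v1), a missing point at t fills as
// v0 + (v1 - v0) * (t - t0) / (t1 - t0).
fn lerp(t0: f64, v0: f64, t1: f64, v1: f64, t: f64) -> f64 {
    v0 + (v1 - v0) * (t - t0) / (t1 - t0)
}

// A point missing at t=1 between (t=0, 10.0) and (t=2, 14.0) fills as 12.0.
assert_eq!(lerp(0.0, 10.0, 2.0, 14.0, 1.0), 12.0);
```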
Gap Filling in SQL

```sql
-- Fill gaps with linear interpolation
SELECT
    time_bucket_gapfill('5 minutes', timestamp) AS bucket,
    sensor_id,
    interpolate(avg(temperature)) AS temperature
FROM sensor_readings
WHERE timestamp BETWEEN '2025-01-01' AND '2025-01-02'
GROUP BY bucket, sensor_id
ORDER BY bucket;

-- Forward fill (Last Observation Carried Forward)
SELECT
    time_bucket_gapfill('1 hour', timestamp) AS bucket,
    locf(avg(value)) AS filled_value
FROM metrics
WHERE timestamp > NOW() - INTERVAL '1 day'
GROUP BY bucket;
```

Time-Zone Handling
Storage Convention
All timestamps are stored in UTC. Time-zone conversion happens at query time.
Rust API
```rust
use chrono::{DateTime, Utc, TimeZone};
use chrono_tz::America::New_York;

// Store timestamp in UTC
let utc_time = Utc::now();
let timestamp_ms = utc_time.timestamp_millis() as u64;

// Convert from local time to UTC for storage
let local_time = New_York.with_ymd_and_hms(2025, 1, 1, 9, 0, 0).unwrap();
let utc_for_storage = local_time.with_timezone(&Utc);
```
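In January, America/New_York observes Eastern Standard Time (UTC-5), so the 09:00 local example above is stored as 14:00 UTC.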
SQL Queries

```sql
-- Query with timezone conversion
SELECT
    timestamp AT TIME ZONE 'America/New_York' AS local_time,
    value
FROM metrics
WHERE timestamp > NOW() - INTERVAL '24 hours'
ORDER BY timestamp;

-- Aggregate by local day (accounting for DST)
SELECT
    date_trunc('day', timestamp AT TIME ZONE 'America/New_York') AS local_day,
    AVG(value) AS daily_avg
FROM metrics
WHERE timestamp > NOW() - INTERVAL '30 days'
GROUP BY local_day;
```

Time Interval Truncation
```rust
use heliosdb_storage::timeseries::TimeInterval;

let interval = TimeInterval::Hour(1);
let truncated = interval.truncate(datetime)?;
// 2025-01-01 14:37:42 -> 2025-01-01 14:00:00
```

Integration with MVCC
HeliosDB’s time-series storage integrates with MVCC for transactional guarantees.
Point-in-Time Queries
```rust
// Query data as it existed at a specific time
let snapshot_time = 1704067200000; // 2024-01-01 00:00:00 UTC

let historical_points = engine.query_at_snapshot(
    "cpu.usage",
    start_time,
    end_time,
    snapshot_time, // MVCC snapshot
).await?;
```

Transactional Writes
```rust
// Write multiple metrics atomically
engine.begin_transaction().await?;

engine.write_point("cpu.usage", 75.5, None).await?;
engine.write_point("memory.usage", 62.3, None).await?;
engine.write_point("disk.usage", 45.0, None).await?;

engine.commit().await?;
```

Consistent Reads
```rust
// Read consistent snapshot across multiple metrics
let snapshot = engine.get_snapshot().await?;

let cpu = snapshot.query_range("cpu.usage", start, end).await?;
let memory = snapshot.query_range("memory.usage", start, end).await?;
// Both queries see the same consistent state
```

Query Engine
Time Range Queries
```rust
use heliosdb_storage::timeseries::query_engine::{
    TimeRangeQuery, TimeSeriesQueryEngine, AggregationFunction,
};

let mut engine = TimeSeriesQueryEngine::new();

let query = TimeRangeQuery::new("cpu.usage", start_time, end_time)
    .with_tag("host", "server-01")
    .with_limit(1000)
    .with_aggregation(Duration::from_secs(300), AggregationFunction::Average);

let results = engine.execute_query(&query, &data_points).await?;
```

Window Functions
```rust
use heliosdb_storage::timeseries::query_engine::WindowType;

// Tumbling windows (non-overlapping)
let tumbling = engine.execute_windowed_query(
    &points,
    WindowType::Tumbling { size: Duration::from_secs(300) },
    AggregationFunction::Average,
)?;

// Sliding windows (overlapping)
let sliding = engine.execute_windowed_query(
    &points,
    WindowType::Sliding {
        size: Duration::from_secs(300),
        slide: Duration::from_secs(60),
    },
    AggregationFunction::Average,
)?;

// Session windows (gap-based)
let sessions = engine.execute_windowed_query(
    &points,
    WindowType::Session { gap: Duration::from_secs(1800) },
    AggregationFunction::Count,
)?;
```
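For scale: over one hour of data, 5-minute tumbling windows yield 12 non-overlapping buckets, 5-minute windows sliding every 60 seconds yield 60 overlapping buckets, and session windows emit one bucket per burst of activity separated by gaps of at least 30 minutes.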
Time-Based Joins

```rust
use heliosdb_storage::timeseries::query_engine::{
    TimeJoinConfig, TimeJoinType, FillStrategy,
};

let config = TimeJoinConfig::new(TimeJoinType::AsOf, 5000) // 5-second tolerance
    .with_fill_strategy(FillStrategy::Forward);

let joined = engine.execute_time_join(
    &left_series,
    &right_series,
    &config,
)?;
```

Query Result Caching
```rust
// Caching is automatic with a 5-minute TTL.
// Clear the cache if needed:
engine.clear_cache();
```

Ingestion Pipeline
Configuration
```rust
use heliosdb_storage::timeseries::ingestion::{
    IngestionPipeline, IngestionConfig,
};

let config = IngestionConfig {
    batch_size: 10000,                         // Points per batch
    batch_timeout: Duration::from_millis(100), // Max wait time
    write_workers: 4,                          // Parallel write threads
    buffer_capacity: 100000,                   // Out-of-order buffer size
    handle_out_of_order: true,                 // Handle late-arriving data
    max_time_skew: 60000,                      // 1 minute tolerance
    backfill_mode: false,                      // Normal mode
};

let pipeline = IngestionPipeline::new(config, storage, compressor);
```

Ingesting Data
```rust
// Single point
pipeline.ingest(point).await?;

// Batch ingestion (preferred)
pipeline.ingest_batch(&points).await?;

// Historical data backfill
pipeline.ingest_backfill(&historical_points).await?;

// Force flush pending batches
pipeline.flush().await?;
```

Ingestion Statistics
```rust
let stats = pipeline.stats().await;

println!("Total points: {}", stats.total_points);
println!("Throughput: {:.0} pts/sec", stats.points_per_second);
println!("Batches written: {}", stats.batches_written);
println!("Avg batch size: {:.0}", stats.avg_batch_size);
println!("Out-of-order: {}", stats.out_of_order_count);
println!("Avg latency: {:.2}ms", stats.avg_write_latency_ms);
println!("Buffer utilization: {:.1}%", stats.buffer_utilization * 100.0);
```

Graceful Shutdown
```rust
pipeline.shutdown().await?;
```

Partition Management
Partition Strategies
```rust
use heliosdb_storage::timeseries::partitioning::{
    PartitionManager, PartitionStrategy,
};

// Choose strategy based on data volume
let strategy = PartitionStrategy::Daily; // Most common

// Available strategies:
// - Hourly: For very high volume (1M+ points/hour)
// - Daily: Standard metrics (recommended)
// - Weekly: Lower volume data
// - Monthly: Long-term storage
// - Yearly: Archive data
// - Custom(seconds): Flexible intervals
```
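As a rough illustration of what a strategy does, a Daily strategy can map a millisecond timestamp to a partition id by integer division; this sketch is hypothetical and the real `PartitionManager` scheme may differ:

```rust
// Hypothetical sketch: map a millisecond timestamp to a daily partition id.
fn daily_partition_id(timestamp_ms: u64) -> u64 {
    const MS_PER_DAY: u64 = 24 * 3600 * 1000;
    timestamp_ms / MS_PER_DAY // days since the Unix epoch
}

// 2024-01-01T00:00:00Z -> day 19723
assert_eq!(daily_partition_id(1_704_067_200_000), 19723);
```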
Partition Operations

```rust
let manager = PartitionManager::new(
    "/data/partitions",
    PartitionStrategy::Daily,
).await?;

// Compute partition for timestamp
let partition_id = manager.compute_partition_id(timestamp).await?;

// Get or create partition
let partition = manager.get_or_create_partition(timestamp).await?;

// Query partitions for time range
let partitions = manager.get_partitions_for_range(start, end).await?;

// List all partitions
let all_partitions = manager.list_partitions().await;

// Archive old partition
manager.archive_partition(partition_id).await?;

// Delete partition
manager.delete_partition(partition_id).await?;
```

Partition Metadata
```rust
let partition = manager.get_partition(partition_id).await?;

if let Some(metadata) = partition {
    println!("Partition ID: {}", metadata.partition_id);
    println!("Path: {:?}", metadata.path);
    println!("Records: {}", metadata.record_count);
    println!("Size: {} bytes", metadata.size_bytes);
    println!("Time range: {} - {}", metadata.min_timestamp, metadata.max_timestamp);
    println!("Compressed: {}", metadata.compressed);
    println!("Archived: {}", metadata.archived);
}
```

Storage Statistics
```rust
let total_size = manager.total_size().await;
let total_records = manager.total_records().await;

println!("Total storage: {} GB", total_size / 1_000_000_000);
println!("Total records: {}", total_records);
```

Best Practices
Naming Conventions
```
# Hierarchical metric names
service.component.metric_name

# Examples
web.api.request_count
database.queries.latency_ms
cache.redis.hit_rate
```

Tag Design
```rust
// Good: Low cardinality, useful for filtering
tags.insert("environment", "production");
tags.insert("region", "us-east-1");
tags.insert("service", "api-gateway");

// Avoid: High cardinality tags (creates too many series)
// tags.insert("user_id", user_id);       // Don't do this
// tags.insert("request_id", request_id); // Don't do this
```
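Cardinality multiplies across tags: with, say, 3 environments, 20 regions, and 50 services, the tags above produce at most 3 × 20 × 50 = 3,000 series per metric, while a single user_id tag with a million distinct users would create a million series on its own.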
Batch Size Tuning

| Scenario | Recommended Batch Size |
|---|---|
| Real-time streaming | 1,000 - 5,000 |
| Batch processing | 10,000 - 50,000 |
| Historical backfill | 50,000 - 100,000 |
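For instance, a historical-backfill profile based on the `IngestionConfig` shown earlier might look like this (illustrative values drawn from the table above):

```rust
// Historical-backfill profile (illustrative values from the table above).
let backfill = IngestionConfig {
    batch_size: 50000,                         // Large batches for throughput
    batch_timeout: Duration::from_millis(500), // Latency matters less here
    write_workers: 4,
    buffer_capacity: 100000,
    handle_out_of_order: true,
    max_time_skew: 60000,
    backfill_mode: true,                       // Historical data mode
};
```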
Compression vs. Query Speed
| Priority | Configuration |
|---|---|
| Maximum compression | Large blocks (8192+), all compression enabled |
| Fast queries | Smaller blocks (512-1024), skip dictionary compression |
| Balanced | 1024-2048 block size, all compression enabled |
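As a sketch of the fast-queries row, again using the `BatchCompressionConfig` fields from earlier (values illustrative):

```rust
// "Fast queries" row from the table above (illustrative values).
let fast_queries = BatchCompressionConfig {
    block_size: 1024,           // Smaller blocks decompress less per read
    compress_timestamps: true,
    compress_values: true,
    compress_metrics: false,    // Skip dictionary compression
    enable_delta_encoding: true,
    target_ratio: 3.0,
};
```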
See Also: README | Quick Start | Examples