# ClickHouse Examples
Practical examples for HeliosDB’s ClickHouse protocol support.
## Connection Examples
### Python Client
```python
from clickhouse_driver import Client

# Basic connection
client = Client(
    host='localhost',
    port=9000,
    user='default',
    password='',
    database='default'
)

# Execute query
result = client.execute('SELECT count() FROM events')
print(f"Total events: {result[0][0]}")
```
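When query values come from user input, building SQL strings by hand invites injection bugs. clickhouse_driver can substitute parameters itself via `%(name)s` placeholders and a params dict. A minimal sketch, reusing the `client` and `events` table from above:

```python
# Pass values through the params argument; the driver escapes them safely.
rows = client.execute(
    'SELECT event_type, count() FROM events '
    'WHERE user_id = %(user_id)s AND timestamp >= %(since)s '
    'GROUP BY event_type',
    {'user_id': 123, 'since': '2025-01-01 00:00:00'}
)
for event_type, cnt in rows:
    print(event_type, cnt)
```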
import "github.com/ClickHouse/clickhouse-go/v2"
conn, err := clickhouse.Open(&clickhouse.Options{ Addr: []string{"localhost:9000"}, Auth: clickhouse.Auth{ Database: "default", Username: "default", Password: "", }, Compression: &clickhouse.Compression{ Method: clickhouse.CompressionLZ4, }, MaxOpenConns: 5,})
ctx := context.Background()rows, _ := conn.Query(ctx, "SELECT count() FROM events")Node.js Client
### Node.js Client

```javascript
const { createClient } = require('@clickhouse/client');

// Note: @clickhouse/client speaks the HTTP interface (default port 8123),
// not the native TCP port 9000.
const client = createClient({
  url: 'http://localhost:8123',
  database: 'default',
  username: 'default',
  password: '',
  compression: { request: true, response: true },
});

const result = await client.query({
  query: 'SELECT count() FROM events',
});
console.log(await result.json());
```
## Table Creation

### MergeTree Table
```sql
CREATE TABLE events (
    timestamp DateTime,
    event_type String,
    user_id UInt32,
    duration Float32
) ENGINE = MergeTree()
ORDER BY (timestamp, user_id)
PRIMARY KEY (timestamp)
PARTITION BY toYYYYMM(timestamp);
```

### ReplacingMergeTree (Upsert)
```sql
CREATE TABLE user_profiles (
    user_id UInt32,
    version UInt32,
    name String,
    email String,
    updated_at DateTime
) ENGINE = ReplacingMergeTree(version)
ORDER BY user_id
PARTITION BY toYYYYMM(updated_at);
```

ReplacingMergeTree deduplicates rows with the same sort key, keeping the row with the highest `version`. Deduplication happens only within a partition and only when parts merge in the background, so reads that need a guaranteed-latest view should use FINAL:

```sql
-- Query latest version
SELECT user_id, name, email
FROM user_profiles
FINAL
WHERE user_id = 123;
```
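To see the upsert behaviour end to end, a small Python sketch (assuming the `client` from the connection section; the profile rows are hypothetical demo data):

```python
import datetime

# Two versions of the same user; ReplacingMergeTree keeps the higher version
client.execute(
    'INSERT INTO user_profiles (user_id, version, name, email, updated_at) VALUES',
    [
        (123, 1, 'Ada', 'ada@old.example', datetime.datetime(2025, 1, 1)),
        (123, 2, 'Ada', 'ada@new.example', datetime.datetime(2025, 1, 2)),
    ]
)

# FINAL merges on read, so the row with the highest version wins
rows = client.execute(
    'SELECT user_id, name, email FROM user_profiles FINAL WHERE user_id = 123'
)
print(rows)  # [(123, 'Ada', 'ada@new.example')]
```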
### AggregatingMergeTree

```sql
CREATE TABLE metrics_aggregated (
    timestamp DateTime,
    metric_name String,
    metric_value AggregateFunction(sum, Float64),
    metric_count AggregateFunction(count)
) ENGINE = AggregatingMergeTree()
ORDER BY (timestamp, metric_name)
PARTITION BY toYYYYMMDD(timestamp);
```

```sql
-- Insert aggregated values
INSERT INTO metrics_aggregated
SELECT
    toStartOfMinute(timestamp) as timestamp,
    metric_name,
    sumState(value) as metric_value,
    countState() as metric_count
FROM raw_metrics
GROUP BY timestamp, metric_name;
```

```sql
-- Query merged values
SELECT
    timestamp,
    metric_name,
    sumMerge(metric_value) as total,
    countMerge(metric_count) as count
FROM metrics_aggregated
GROUP BY timestamp, metric_name;
```
## Query Optimization

### PREWHERE for Early Filtering
PREWHERE reads only the filter columns first, then fetches the remaining columns only for the rows that matched:

```sql
-- FAST: Filters before reading heavy columns
SELECT user_id, event_type, timestamp
FROM events
PREWHERE timestamp > '2025-01-01'
    AND event_type = 'purchase'
WHERE user_id > 0;
```

### Sampling for Large Datasets
Note that SAMPLE requires the table to declare a sampling key (e.g. `SAMPLE BY intHash32(user_id)` in the CREATE TABLE), and counts must be scaled up by the sampling factor to estimate totals.

```sql
-- Analyze 10% of data
SELECT
    event_type,
    count() as cnt
FROM events
SAMPLE 0.1
GROUP BY event_type;
```

### Efficient Aggregations
```sql
-- Basic aggregations
SELECT
    event_type,
    count() as events,
    count(DISTINCT user_id) as unique_users,
    sum(value) as total_value,
    avg(duration) as avg_duration,
    min(timestamp) as first_event,
    max(timestamp) as last_event
FROM events
GROUP BY event_type;
```

## Quantile Functions
### Percentile Calculations
```sql
-- Quick approximate quantiles
SELECT
    event_type,
    quantile(0.5)(duration) as p50,
    quantile(0.95)(duration) as p95,
    quantile(0.99)(duration) as p99
FROM events
GROUP BY event_type;
```

```sql
-- Multiple quantiles at once
SELECT
    event_type,
    quantiles(0.25, 0.5, 0.75, 0.95, 0.99)(duration) as percentiles
FROM events
GROUP BY event_type;
```

### Top-K Queries
```sql
-- Get top 10 event types
SELECT
    event_type,
    count() as cnt
FROM events
GROUP BY event_type
ORDER BY cnt DESC
LIMIT 10;
```

```sql
-- topK: approximate top values per group, cheaper than a full sort
SELECT
    user_id,
    topK(10)(event_type) as top_events
FROM events
GROUP BY user_id;
```

### Unique Counting
```sql
-- Approximate distinct count (very fast)
SELECT
    event_type,
    uniq(user_id) as unique_users,
    uniqHLL12(user_id) as uniq_hll
FROM events
GROUP BY event_type;
```

```sql
-- Exact distinct count
SELECT
    event_type,
    uniqExact(user_id) as unique_users
FROM events
GROUP BY event_type;
```
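To quantify how much accuracy `uniq` gives up on your own data, a quick sketch (assuming the `client` from the connection section) that runs both variants and prints the relative error:

```python
# Compare approximate vs exact distinct counts per event_type.
approx = dict(client.execute(
    'SELECT event_type, uniq(user_id) FROM events GROUP BY event_type'))
exact = dict(client.execute(
    'SELECT event_type, uniqExact(user_id) FROM events GROUP BY event_type'))

for event_type, exact_count in exact.items():
    err = abs(approx[event_type] - exact_count) / max(exact_count, 1)
    print(f"{event_type}: uniq={approx[event_type]} "
          f"exact={exact_count} relative error={err:.2%}")
```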
## Window Functions

### Running Totals
```sql
SELECT
    timestamp,
    event_type,
    value,
    sum(value) OVER (
        PARTITION BY event_type
        ORDER BY timestamp
        ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ) as running_total
FROM events
ORDER BY event_type, timestamp;
```

### Row Number and Ranking
```sql
SELECT
    timestamp,
    user_id,
    value,
    row_number() OVER (
        PARTITION BY user_id
        ORDER BY timestamp
    ) as event_seq,
    rank() OVER (
        PARTITION BY user_id
        ORDER BY value DESC
    ) as value_rank
FROM events
ORDER BY user_id, timestamp;
```

### Lead/Lag for Time-Series
```sql
SELECT
    timestamp,
    value,
    lag(value, 1) OVER (ORDER BY timestamp) as prev_value,
    lead(value, 1) OVER (ORDER BY timestamp) as next_value,
    value - lag(value, 1) OVER (ORDER BY timestamp) as value_change
FROM metrics
ORDER BY timestamp;
```

## Materialized Views
```sql
-- Base table
CREATE TABLE events (
    timestamp DateTime,
    event_type String,
    user_id UInt32,
    value Float32
) ENGINE = MergeTree()
ORDER BY timestamp;
```

```sql
-- Aggregated view: store aggregate states so that
-- AggregatingMergeTree can combine them when parts merge
CREATE MATERIALIZED VIEW events_per_minute
ENGINE = AggregatingMergeTree()
ORDER BY (timestamp, event_type)
AS
SELECT
    toStartOfMinute(timestamp) as timestamp,
    event_type,
    countState() as event_count,
    sumState(value) as total_value,
    avgState(value) as avg_value
FROM events
GROUP BY toStartOfMinute(timestamp), event_type;
```

A materialized view only processes rows inserted after it is created, and the stored states are opaque, so query it with the -Merge combinators rather than SELECT *:

```sql
-- Query the view with finalization
SELECT
    timestamp,
    event_type,
    countMerge(event_count) as event_count,
    sumMerge(total_value) as total_value,
    avgMerge(avg_value) as avg_value
FROM events_per_minute
WHERE timestamp > '2025-01-01'
GROUP BY timestamp, event_type
ORDER BY timestamp DESC;
```
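Inserts into the base table flow through the view automatically; a small Python check (assuming the `client` from the connection section and the two tables above):

```python
import datetime

# Write to the base table; the materialized view aggregates on the way in.
client.execute(
    'INSERT INTO events (timestamp, event_type, user_id, value) VALUES',
    [(datetime.datetime.now(), 'click', 1, 2.5),
     (datetime.datetime.now(), 'click', 2, 1.5)]
)

# Finalize the stored states with -Merge to read the per-minute rollup.
rows = client.execute('''
    SELECT timestamp, event_type,
           countMerge(event_count), sumMerge(total_value)
    FROM events_per_minute
    GROUP BY timestamp, event_type
''')
print(rows)  # e.g. [(minute-aligned datetime, 'click', 2, 4.0)]
```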
## External Data Integration

### S3 Integration
```sql
-- Direct query from S3
SELECT *
FROM s3(
    'https://my-bucket.s3.amazonaws.com/data/events/*.parquet',
    'PARQUET'
)
LIMIT 100;
```

```sql
-- Insert from S3
INSERT INTO events
SELECT *
FROM s3(
    'https://my-bucket.s3.amazonaws.com/data/events/*.csv',
    'CSV',
    'timestamp DateTime, event_type String, user_id UInt32, value Float32'
);
```

### Kafka Integration
```sql
-- Create Kafka source table
CREATE TABLE events_kafka (
    timestamp DateTime,
    event_type String,
    user_id UInt32,
    value Float32
) ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'kafka-1:9092,kafka-2:9092',
    kafka_topic_list = 'events',
    kafka_group_id = 'clickhouse_consumer',
    kafka_format = 'JSONEachRow';
```

```sql
-- Create materialized view to consume
CREATE MATERIALIZED VIEW events_mv
TO events
AS SELECT * FROM events_kafka;
```

## Bulk Insert
```python
import datetime
import random

def bulk_insert_events(client, events, batch_size=10000):
    """Insert events efficiently in batches."""
    for i in range(0, len(events), batch_size):
        batch = events[i:i+batch_size]
        client.execute(
            'INSERT INTO events (timestamp, event_type, user_id, value) VALUES',
            batch,
            types_check=True
        )
        print(f"Inserted {i+len(batch)} events")

# Generate sample data
events = [
    (
        datetime.datetime.now() - datetime.timedelta(hours=random.randint(0, 24)),
        random.choice(['click', 'view', 'purchase']),
        random.randint(1, 10000),
        random.uniform(0, 100)
    )
    for _ in range(1000000)
]

bulk_insert_events(client, events, batch_size=50000)
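For datasets too large to build as an in-memory list, clickhouse_driver also accepts a generator for INSERT, so rows stream to the server in blocks instead of being materialized all at once. A sketch under that assumption:

```python
import datetime
import random

def generate_events(n):
    """Yield rows one at a time instead of building a huge list."""
    for _ in range(n):
        yield (
            datetime.datetime.now() - datetime.timedelta(hours=random.randint(0, 24)),
            random.choice(['click', 'view', 'purchase']),
            random.randint(1, 10000),
            random.uniform(0, 100),
        )

# The driver consumes the generator in blocks, keeping peak memory bounded.
client.execute(
    'INSERT INTO events (timestamp, event_type, user_id, value) VALUES',
    generate_events(1_000_000)
)
```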
## Real-World Examples

### Web Analytics Dashboard
```python
# Create tables
client.execute('''
CREATE TABLE IF NOT EXISTS page_views (
    timestamp DateTime,
    session_id UUID,
    user_id UInt32,
    page_path String,
    duration_ms UInt32,
    status_code UInt16,
    referrer String
) ENGINE = MergeTree()
ORDER BY (timestamp, user_id)
PARTITION BY toYYYYMMDD(timestamp)
''')

# Query for dashboard
dashboard_data = client.execute('''
SELECT
    page_path,
    count() as views,
    count(DISTINCT user_id) as unique_users,
    round(avg(duration_ms), 0) as avg_duration_ms,
    round(quantile(0.95)(duration_ms), 0) as p95_duration_ms
FROM page_views
WHERE timestamp >= now() - INTERVAL 24 HOUR
GROUP BY page_path
ORDER BY views DESC
LIMIT 20
''')
```

### Log Analysis
```sql
CREATE TABLE application_logs (
    timestamp DateTime,
    level Enum8('DEBUG' = 1, 'INFO' = 2, 'WARN' = 3, 'ERROR' = 4),
    service String,
    message String,
    trace_id String,
    duration_ms UInt32
) ENGINE = MergeTree()
ORDER BY (timestamp, service, level)
PARTITION BY toYYYYMMDD(timestamp)
TTL timestamp + INTERVAL 30 DAY;
```

```sql
-- Error pattern analysis
SELECT
    service,
    level,
    substring(message, 1, 100) as error_pattern,
    count() as occurrences,
    max(timestamp) as last_seen,
    avg(duration_ms) as avg_duration
FROM application_logs
WHERE timestamp > now() - INTERVAL 24 HOUR
    AND level IN ('WARN', 'ERROR')
GROUP BY service, level, error_pattern
ORDER BY occurrences DESC
LIMIT 20;
```

### Time-Series Metrics
```python
# High-cardinality time-series storage
client.execute('''
CREATE TABLE metrics (
    timestamp DateTime,
    metric_name String,
    metric_value Float64,
    tags Array(String)
) ENGINE = MergeTree()
ORDER BY (timestamp, metric_name)
PARTITION BY toYYYYMMDD(timestamp)
''')

# Pre-aggregated hourly table
client.execute('''
CREATE MATERIALIZED VIEW metrics_1h
ENGINE = AggregatingMergeTree()
ORDER BY (hour, metric_name)
AS
SELECT
    toStartOfHour(timestamp) as hour,
    metric_name,
    sumState(metric_value) as total,
    countState() as count,
    minState(metric_value) as min_value,
    maxState(metric_value) as max_value
FROM metrics
GROUP BY hour, metric_name
''')

# Query with finalization
result = client.execute('''
SELECT
    metric_name,
    sumMerge(total) as total_value,
    countMerge(count) as data_points,
    minMerge(min_value) as min_value,
    maxMerge(max_value) as max_value
FROM metrics_1h
WHERE hour >= now() - INTERVAL 7 DAY
GROUP BY metric_name
ORDER BY metric_name
''')
```

## Troubleshooting Queries
### Check Slow Queries
```sql
SELECT
    query_duration_ms,
    query,
    read_rows,
    result_rows
FROM system.query_log
WHERE query_duration_ms > 1000
ORDER BY query_duration_ms DESC
LIMIT 10;
```
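The same check can run from application code; a small helper sketch (assuming the `client` from the connection section and that query logging is enabled):

```python
# Report queries slower than a threshold, worst first.
def report_slow_queries(client, threshold_ms=1000, limit=10):
    rows = client.execute(
        'SELECT query_duration_ms, read_rows, query '
        'FROM system.query_log '
        'WHERE query_duration_ms > %(threshold)s '
        'ORDER BY query_duration_ms DESC '
        'LIMIT %(limit)s',
        {'threshold': threshold_ms, 'limit': limit}
    )
    for duration_ms, read_rows, query in rows:
        print(f"{duration_ms} ms, {read_rows} rows read: {query[:80]}")

report_slow_queries(client, threshold_ms=500)
```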
### Analyze Index Usage

```sql
SELECT
    table,
    name,
    type,
    data_compressed_bytes,
    data_uncompressed_bytes
FROM system.data_skipping_indices;
```

### Check Table Size
```sql
SELECT
    database,
    table,
    formatReadableSize(sum(bytes)) as size,
    sum(rows) as rows
FROM system.parts
WHERE active
GROUP BY database, table
ORDER BY sum(bytes) DESC;
```

Related: README.md | CONFIGURATION.md | COMPATIBILITY.md
Last Updated: December 2025