HTTP/REST API Examples
HTTP/REST API Examples
Comprehensive code examples for using HeliosDB’s HTTP/REST API.
Authentication
API Key Authentication
# Using Authorization header
curl -X GET https://localhost:443/api/v1/tables \
  -H "Authorization: Bearer YOUR_API_KEY"
# Using X-API-Key header
curl -X GET https://localhost:443/api/v1/tables \
  -H "X-API-Key: YOUR_API_KEY"

JWT Authentication
import requests
import jwt
from datetime import datetime, timedelta
# Generate JWT token
def generate_token(secret, user_id):
    """Create a signed JWT for ``user_id`` that expires one hour after issue.

    Args:
        secret: Key passed to ``jwt.encode`` to sign the token (HS256).
        user_id: Value stored in the ``sub`` claim.

    Returns:
        Whatever ``jwt.encode`` returns for the payload (the encoded token).
    """
    # Local import: the surrounding example only imports datetime/timedelta.
    from datetime import timezone

    # Take one timezone-aware timestamp so "iat" and "exp" are exactly one
    # hour apart; datetime.utcnow() is deprecated and returns a naive value.
    issued_at = datetime.now(timezone.utc)
    payload = {
        "sub": user_id,
        "iat": issued_at,
        "exp": issued_at + timedelta(hours=1),
        "iss": "heliosdb",
    }
    return jwt.encode(payload, secret, algorithm="HS256")
token = generate_token("your-secret", "user123")
# Use token
response = requests.get(
    "https://localhost:443/api/v1/tables",
    headers={"Authorization": f"Bearer {token}"}
)

Query Operations
Execute SQL Query
import requests
def execute_query(sql: str, params: dict = None):
    """POST a SQL statement to the query endpoint and return the JSON reply.

    Args:
        sql: SQL text sent under the ``"sql"`` key.
        params: Optional bind parameters; an empty dict is sent when omitted.

    Returns:
        The response body decoded with ``response.json()``.
    """
    endpoint = "https://localhost:443/api/v1/query"
    request_headers = {
        "Authorization": "Bearer YOUR_API_KEY",
        "Content-Type": "application/json",
    }
    body = {"sql": sql, "params": params if params else {}}

    reply = requests.post(endpoint, headers=request_headers, json=body)
    return reply.json()
# Simple query
result = execute_query("SELECT * FROM users WHERE status = 'active' LIMIT 10")
print(result["data"])
# Parameterized query
result = execute_query(
    "SELECT * FROM orders WHERE user_id = :user_id AND created_at > :date",
    {"user_id": 123, "date": "2024-01-01"}
)

Async Query Execution
import requests
import time
def execute_async_query(sql: str, poll_interval: float = 1.0, max_wait: float = None):
    """Start an asynchronous query and poll until it completes or fails.

    Args:
        sql: SQL text submitted to the async query endpoint.
        poll_interval: Seconds to sleep between status checks (default 1).
        max_wait: Optional upper bound, in seconds, on total polling time.
            ``None`` (the default) polls indefinitely.

    Returns:
        The ``"data"`` field of the status response once ``"status"`` is
        ``"completed"``.

    Raises:
        Exception: With the response's ``"error"`` value when ``"status"``
            is ``"failed"``.
        TimeoutError: If ``max_wait`` is set and elapses before the query
            reaches a terminal status.
    """
    # Start async query
    response = requests.post(
        "https://localhost:443/api/v1/query/async",
        headers={
            "Authorization": "Bearer YOUR_API_KEY",
            "Content-Type": "application/json",
        },
        json={"sql": sql},
    )
    query_id = response.json()["query_id"]

    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = None if max_wait is None else time.monotonic() + max_wait

    # Poll for results
    while True:
        status_response = requests.get(
            f"https://localhost:443/api/v1/query/{query_id}",
            headers={"Authorization": "Bearer YOUR_API_KEY"},
        )
        result = status_response.json()

        if result["status"] == "completed":
            return result["data"]
        if result["status"] == "failed":
            raise Exception(result["error"])

        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                f"Query {query_id} did not finish within {max_wait} seconds"
            )

        time.sleep(poll_interval)
# Long-running query
result = execute_async_query("""
    SELECT customer_id, SUM(amount) as total
    FROM orders
    WHERE created_at > '2023-01-01'
    GROUP BY customer_id
    ORDER BY total DESC
""")

Streaming Results
import requests
def stream_query(sql: str):
    """Run a query against the streaming endpoint and yield rows one by one.

    The request asks for ``application/x-ndjson`` and each non-empty line of
    the response is decoded as one JSON document.

    Args:
        sql: SQL text sent under the ``"sql"`` key.

    Yields:
        One decoded JSON value per non-empty response line.
    """
    # Local import: this example only imports requests, so json.loads would
    # otherwise fail with NameError on the first row.
    import json

    # The context manager closes the streamed response even when the caller
    # stops iterating before the result set is exhausted.
    with requests.post(
        "https://localhost:443/api/v1/query/stream",
        headers={
            "Authorization": "Bearer YOUR_API_KEY",
            "Content-Type": "application/json",
            "Accept": "application/x-ndjson",
        },
        json={"sql": sql},
        stream=True,
    ) as response:
        for line in response.iter_lines():
            if line:  # skip blank lines
                yield json.loads(line)
# Process large result set
for row in stream_query("SELECT * FROM large_table"):
    process_row(row)

CRUD Operations
Create (Insert)
// JavaScript/Node.js example
const axios = require('axios');
/**
 * POST one or more rows to a table's rows endpoint.
 *
 * @param {string} tableName - Table name interpolated into the URL path.
 * @param {Array<Object>} rows - Row objects sent as `{ rows }` in the body.
 * @returns {Promise<*>} The `data` property of the axios response.
 */
async function insertRows(tableName, rows) {
  const url = `https://localhost:443/api/v1/tables/${tableName}/rows`;
  const config = {
    headers: {
      'Authorization': 'Bearer YOUR_API_KEY',
      'Content-Type': 'application/json'
    }
  };

  const { data } = await axios.post(url, { rows }, config);
  return data;
}
// Single insert
await insertRows('users', [{
  name: 'Alice',
  email: 'alice@example.com',
  created_at: new Date().toISOString()
}]);
// Bulk insert
await insertRows('users', [
  { name: 'Bob', email: 'bob@example.com' },
  { name: 'Charlie', email: 'charlie@example.com' },
  { name: 'Diana', email: 'diana@example.com' }
]);

Read (Query)
import requests
def get_rows(table_name: str, filters: dict = None, limit: int = 100):
    """Fetch rows from a table, optionally restricted by equality filters.

    Args:
        table_name: Table name interpolated into the URL path.
        filters: Optional mapping of column name to required value. Each
            pair becomes ``column='value'`` and the pairs are joined with
            ``AND`` into the ``filter`` query parameter.
        limit: Value sent as the ``limit`` query parameter.

    Returns:
        The response body decoded with ``response.json()``.
    """
    params = {"limit": limit}
    if filters:
        # Double embedded single quotes so a value such as "O'Brien" cannot
        # terminate the quoted literal early and change the filter.
        # NOTE(review): assumes the server uses SQL-style '' escaping - confirm.
        # Column names are inserted verbatim and must come from trusted code.
        params["filter"] = " AND ".join(
            "{}='{}'".format(column, str(value).replace("'", "''"))
            for column, value in filters.items()
        )

    response = requests.get(
        f"https://localhost:443/api/v1/tables/{table_name}/rows",
        headers={"Authorization": "Bearer YOUR_API_KEY"},
        params=params,
    )
    return response.json()
# Get all active users
users = get_rows("users", filters={"status": "active"}, limit=50)

Update
import requests
def update_rows(table_name: str, filter_expr: str, updates: dict):
    """Send a PUT request that applies ``updates`` to rows matching a filter.

    Args:
        table_name: Table name interpolated into the URL path.
        filter_expr: Filter expression sent under the ``"filter"`` key.
        updates: Mapping sent under the ``"updates"`` key.

    Returns:
        The response body decoded with ``response.json()``.
    """
    endpoint = f"https://localhost:443/api/v1/tables/{table_name}/rows"
    request_headers = {
        "Authorization": "Bearer YOUR_API_KEY",
        "Content-Type": "application/json",
    }
    body = {"filter": filter_expr, "updates": updates}

    reply = requests.put(endpoint, headers=request_headers, json=body)
    return reply.json()
# Update user status
result = update_rows(
    "users",
    "id = 123",
    {"status": "inactive", "updated_at": "2024-01-15T10:00:00Z"}
)
print(f"Updated {result['affected_rows']} rows")

Delete
import requests
def delete_rows(table_name: str, filter_expr: str):
    """Send a DELETE request for the rows matching ``filter_expr``.

    Args:
        table_name: Table name interpolated into the URL path.
        filter_expr: Filter expression sent under the ``"filter"`` key.

    Returns:
        The response body decoded with ``response.json()``.
    """
    endpoint = f"https://localhost:443/api/v1/tables/{table_name}/rows"
    request_headers = {
        "Authorization": "Bearer YOUR_API_KEY",
        "Content-Type": "application/json",
    }

    reply = requests.delete(
        endpoint,
        headers=request_headers,
        json={"filter": filter_expr},
    )
    return reply.json()
# Delete old sessions
result = delete_rows("sessions", "expires_at < '2024-01-01'")
print(f"Deleted {result['affected_rows']} rows")

Vector Operations
Vector Search
import requests
import numpy as np
def vector_search(vector: list, top_k: int = 10, filter_expr: str = None):
    """POST a similarity-search request and return the decoded JSON reply.

    Args:
        vector: Query vector sent under the ``"vector"`` key.
        top_k: Value sent under the ``"top_k"`` key.
        filter_expr: Optional filter; the ``"filter"`` key is only included
            when this value is truthy.

    Returns:
        The response body decoded with ``response.json()``.
    """
    body = {"vector": vector, "top_k": top_k, "include_metadata": True}
    if filter_expr:
        body.update({"filter": filter_expr})

    request_headers = {
        "Authorization": "Bearer YOUR_API_KEY",
        "Content-Type": "application/json",
    }
    reply = requests.post(
        "https://localhost:443/api/v1/vectors/search",
        headers=request_headers,
        json=body,
    )
    return reply.json()
# Search for similar documents
query_embedding = [0.1, 0.2, 0.3, ...]  # Your embedding
results = vector_search(query_embedding, top_k=5)
for match in results["matches"]:
    print(f"ID: {match['id']}, Score: {match['score']}")

Upsert Vectors
import requests
def upsert_vectors(vectors: list):
    """POST a batch of vectors to the upsert endpoint.

    Args:
        vectors: List sent under the ``"vectors"`` key of the request body.

    Returns:
        The response body decoded with ``response.json()``.
    """
    request_headers = {
        "Authorization": "Bearer YOUR_API_KEY",
        "Content-Type": "application/json",
    }
    body = {"vectors": vectors}

    reply = requests.post(
        "https://localhost:443/api/v1/vectors/upsert",
        headers=request_headers,
        json=body,
    )
    return reply.json()
# Upsert with metadata
vectors = [
    {
        "id": "doc1",
        "values": [0.1, 0.2, 0.3, ...],
        "metadata": {"title": "Document 1", "category": "tech"}
    },
    {
        "id": "doc2",
        "values": [0.4, 0.5, 0.6, ...],
        "metadata": {"title": "Document 2", "category": "science"}
    }
]
result = upsert_vectors(vectors)
print(f"Upserted {result['upserted_count']} vectors")

Schema Management
Create Table
import requests
def create_table(name: str, schema: dict):
    """POST a table definition to the tables endpoint.

    Args:
        name: Table name sent under the ``"name"`` key.
        schema: Mapping that must contain ``"columns"``; ``"primary_key"``
            (default ``None``) and ``"indexes"`` (default ``[]``) are
            optional.

    Returns:
        The response body decoded with ``response.json()``.

    Raises:
        KeyError: If ``schema`` has no ``"columns"`` entry.
    """
    definition = dict(
        name=name,
        columns=schema["columns"],
        primary_key=schema.get("primary_key"),
        indexes=schema.get("indexes", []),
    )
    request_headers = {
        "Authorization": "Bearer YOUR_API_KEY",
        "Content-Type": "application/json",
    }

    reply = requests.post(
        "https://localhost:443/api/v1/tables",
        headers=request_headers,
        json=definition,
    )
    return reply.json()
# Create users table
create_table("users", {
    "columns": [
        {"name": "id", "type": "BIGINT", "nullable": False},
        {"name": "name", "type": "VARCHAR(255)", "nullable": False},
        {"name": "email", "type": "VARCHAR(255)", "nullable": False},
        {"name": "created_at", "type": "TIMESTAMP", "default": "NOW()"}
    ],
    "primary_key": ["id"],
    "indexes": [
        {"name": "idx_email", "columns": ["email"], "unique": True}
    ]
})

Create Index
import requests
def create_index(table_name: str, index_name: str, columns: list, index_type: str = "btree"):
    """POST an index definition to the indexes endpoint.

    Args:
        table_name: Sent under the ``"table"`` key.
        index_name: Sent under the ``"name"`` key.
        columns: Sent under the ``"columns"`` key.
        index_type: Sent under the ``"type"`` key; defaults to ``"btree"``.

    Returns:
        The response body decoded with ``response.json()``.
    """
    definition = dict(
        table=table_name,
        name=index_name,
        columns=columns,
        type=index_type,
    )
    request_headers = {
        "Authorization": "Bearer YOUR_API_KEY",
        "Content-Type": "application/json",
    }

    reply = requests.post(
        "https://localhost:443/api/v1/indexes",
        headers=request_headers,
        json=definition,
    )
    return reply.json()
# Create vector index
create_index("documents", "idx_embedding", ["embedding"], "hnsw")

Error Handling
import requests
def safe_query(sql: str): try: response = requests.post( "https://localhost:443/api/v1/query", headers={ "Authorization": "Bearer YOUR_API_KEY", "Content-Type": "application/json" }, json={"sql": sql}, timeout=30 ) response.raise_for_status() return response.json()
except requests.exceptions.HTTPError as e: if e.response.status_code == 400: error = e.response.json()["error"] print(f"Query error: {error['message']}") elif e.response.status_code == 429: retry_after = e.response.headers.get("Retry-After", 60) print(f"Rate limited. Retry after {retry_after} seconds") elif e.response.status_code == 401: print("Authentication failed") else: print(f"HTTP error: {e}") return None
except requests.exceptions.Timeout: print("Request timed out") return None
except requests.exceptions.RequestException as e: print(f"Request failed: {e}") return None

Pagination
import requests
def paginate_query(sql: str, page_size: int = 100):
    """Yield every row of a query by requesting it one page at a time.

    Each request sends ``limit=page_size`` and an ``offset`` that advances
    by one page per iteration. Iteration stops on the first empty page or
    the first page shorter than ``page_size``.

    Args:
        sql: SQL text sent under the ``"sql"`` key.
        page_size: Number of rows requested per page.

    Yields:
        Each element of the ``"data"`` list from successive responses.
    """
    request_headers = {
        "Authorization": "Bearer YOUR_API_KEY",
        "Content-Type": "application/json",
    }
    pages_fetched = 0

    while True:
        reply = requests.post(
            "https://localhost:443/api/v1/query",
            headers=request_headers,
            json={
                "sql": sql,
                "limit": page_size,
                "offset": pages_fetched * page_size,
            },
        )
        rows = reply.json()["data"]

        if not rows:
            return

        yield from rows

        # A short page means the server has no further rows to return.
        if len(rows) < page_size:
            return

        pages_fetched += 1
# Iterate through all orders
for order in paginate_query("SELECT * FROM orders ORDER BY id"):
    process_order(order)

Last Updated: January 2026