Redis RESP3 Examples
Practical examples for HeliosDB’s Redis RESP3 protocol support.
Connection Examples
Python (redis-py)
import redis
# Basic connection
client = redis.Redis(
    host='localhost',
    port=6379,
    protocol=3,
    decode_responses=True,
)

# Test connection
print(client.ping())  # True

Node.js (node-redis)
const { createClient } = require('redis');
const client = createClient({ url: 'redis://localhost:6379'});
await client.connect();console.log(await client.ping()); // PONGGo (go-redis)
import "github.com/redis/go-redis/v9"
client := redis.NewClient(&redis.Options{ Addr: "localhost:6379", Protocol: 3,})
ctx := context.Background()pong, _ := client.Ping(ctx).Result()fmt.Println(pong) // PONGString Operations
Basic CRUD
# SET and GETclient.set('user:1:name', 'Alice')name = client.get('user:1:name') # 'Alice'
# SET with optionsclient.set('session:xyz', 'token123', ex=3600) # Expire in 1 hourclient.set('lock:resource', 'owner1', nx=True) # Set if not exists
# Multi-set operationsclient.mset({ 'user:1:name': 'Alice', 'user:1:email': 'alice@example.com', 'user:1:age': '28'})
values = client.mget('user:1:name', 'user:1:email', 'user:1:age')# ['Alice', 'alice@example.com', '28']Counters
# Increment/Decrementclient.set('counter', '10')client.incr('counter') # 11client.incrby('counter', 5) # 16client.decr('counter') # 15client.incrbyfloat('score', 0.5)List Operations
Queue Implementation
# Push operationsclient.rpush('queue:jobs', 'job1', 'job2', 'job3')
# Pop operationsjob = client.rpop('queue:jobs') # FIFO
# Blocking pop (wait for item)job = client.brpop('queue:jobs', timeout=10)
# Multiple queue blockingresult = client.brpop(['queue:1', 'queue:2', 'queue:3'], timeout=10)List Range Queries
# Get rangeitems = client.lrange('queue:jobs', 0, -1) # All itemsfirst_5 = client.lrange('queue:jobs', 0, 4) # First 5
# Trimclient.ltrim('queue:jobs', 0, 99) # Keep first 100Set Operations
Membership Testing
# Add to setclient.sadd('users:online', 'alice', 'bob', 'charlie')
# Check membershipis_online = client.sismember('users:online', 'alice') # True
# Get all membersmembers = client.smembers('users:online')Set Algebra
client.sadd('users:premium', 'alice', 'dave')client.sadd('users:active', 'bob', 'charlie', 'dave')
# Unioncombined = client.sunion('users:premium', 'users:active')# {'alice', 'bob', 'charlie', 'dave'}
# Intersectionboth = client.sinter('users:premium', 'users:active')# {'dave'}
# Differencediff = client.sdiff('users:premium', 'users:active')# {'alice'}Sorted Set Operations
Leaderboard
# Add with scoresclient.zadd('leaderboard:monthly', { 'alice': 1500, 'bob': 1200, 'charlie': 1800})
# Increment scoreclient.zincrby('leaderboard:monthly', 100, 'alice')
# Get top 3top_3 = client.zrevrange('leaderboard:monthly', 0, 2, withscores=True)# [(b'charlie', 1800.0), (b'alice', 1600.0), (b'bob', 1200.0)]
# Get rankrank = client.zrevrank('leaderboard:monthly', 'alice') # 1 (0-indexed)Hash Operations
Object Storage
# Set hash fieldsclient.hset('user:1', mapping={ 'name': 'Alice', 'email': 'alice@example.com', 'age': '28', 'city': 'NYC'})
# Get single fieldname = client.hget('user:1', 'name')
# Get all fieldsuser_data = client.hgetall('user:1')
# Increment numeric fieldclient.hincrby('user:1', 'visits', 1)Pub/Sub Messaging
Publisher
def publisher(client):
    """Publish five score updates to the 'news:sports' channel, one per second."""
    import time
    for i in range(5):
        client.publish('news:sports', f'Score update {i}')
        time.sleep(1)

# Subscriber
def subscriber(client):
    """Block on the 'news:sports' channel, printing each message as it arrives."""
    pubsub = client.pubsub()
    pubsub.subscribe('news:sports')

    # listen() also yields subscribe confirmations; only print real messages.
    for message in pubsub.listen():
        if message['type'] == 'message':
            print(f"Received: {message['data']}")

# Pattern Subscription
pubsub = client.pubsub()pubsub.psubscribe('news:*') # All news channels
for message in pubsub.listen(): if message['type'] == 'pmessage': print(f"Channel: {message['channel']}, Data: {message['data']}")Streams
Basic Stream Operations
# Add to streamstream_id = client.xadd('events', { 'type': 'purchase', 'user': 'alice', 'amount': '99.99'})
# Read from streamevents = client.xread({'events': '0'}, count=10)Consumer Groups
# Create consumer groupclient.xgroup_create('events', 'mygroup', id='0', mkstream=True)
# Read as consumerevents = client.xreadgroup( groupname='mygroup', consumername='consumer1', streams={'events': '>'}, count=10)
# Acknowledge processingfor stream, messages in events: for msg_id, data in messages: # Process message client.xack('events', 'mygroup', msg_id)Transactions
Basic Transaction
# Start transactionpipe = client.pipeline()pipe.incr('counter')pipe.hset('stats', 'last_update', time.time())pipe.lpush('history', 'update')results = pipe.execute()Optimistic Locking
with client.pipeline() as pipe: while True: try: pipe.watch('balance') balance = int(client.get('balance') or 0)
if balance >= 100: pipe.multi() pipe.decrby('balance', 100) pipe.execute() break else: pipe.unwatch() raise Exception('Insufficient balance') except redis.WatchError: continue # RetryLua Scripting
Basic Script
# Rate limiter script
rate_limit_script = """
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local current = tonumber(redis.call('GET', key) or 0)

if current < limit then
    redis.call('INCR', key)
    redis.call('EXPIRE', key, window)
    return 1
else
    return 0
end
"""
script = client.register_script(rate_limit_script)result = script(keys=['rate:user:123'], args=[100, 60])Real-World Patterns
Rate Limiter
class RateLimiter:
    """Fixed-window rate limiter backed by one Redis counter per user."""

    def __init__(self, client, limit=100, window=60):
        self.client = client
        self.limit = limit      # max requests allowed per window
        self.window = window    # window length in seconds

    def is_allowed(self, user_id):
        """Count one request for *user_id*; True while within the window's limit."""
        key = f'rate:{user_id}'
        current = self.client.incr(key)
        # The first hit creates the key, so its expiry starts here.
        # NOTE(review): if the process dies between INCR and EXPIRE the key
        # never expires — the Lua script version above avoids that race.
        if current == 1:
            self.client.expire(key, self.window)
        return current <= self.limit
limiter = RateLimiter(client, limit=100, window=60)
if limiter.is_allowed('user:123'):
    pass  # process the request

Distributed Lock
import uuid
class DistributedLock:
    """Best-effort distributed mutex built on SET NX EX with a random owner token."""

    def __init__(self, client, name, timeout=10):
        self.client = client
        self.name = f'lock:{name}'
        self.token = str(uuid.uuid4())  # unique owner token for safe release
        self.timeout = timeout          # auto-expiry in seconds

    def acquire(self):
        """Try to take the lock; truthy on success, None if someone else holds it."""
        return self.client.set(self.name, self.token, nx=True, ex=self.timeout)

    def release(self):
        """Delete the lock only if we still own it.

        NOTE(review): GET-then-DELETE is not atomic; a Lua compare-and-delete
        script would close the window where the lock expires and is re-acquired
        between the two calls — confirm whether that matters for callers.
        """
        if self.client.get(self.name) == self.token:
            self.client.delete(self.name)

    def __enter__(self):
        if not self.acquire():
            raise Exception('Could not acquire lock')
        return self

    def __exit__(self, *args):
        self.release()
with DistributedLock(client, 'critical_resource'):
    pass  # critical section

Session Manager
import json
import uuid
import time
class SessionManager:
    """Redis-backed session store: one hash per session plus a per-user index set."""

    def __init__(self, client, ttl=3600):
        self.client = client
        self.ttl = ttl  # session lifetime in seconds

    def create(self, user_id, data):
        """Create a session for *user_id* and return its id."""
        session_id = str(uuid.uuid4())
        session_key = f'session:{session_id}'

        self.client.hset(session_key, mapping={
            'user_id': user_id,
            'created': time.time(),
            'data': json.dumps(data),
        })
        self.client.expire(session_key, self.ttl)

        # Secondary index so all of a user's sessions can be found later.
        self.client.sadd(f'user_sessions:{user_id}', session_id)

        return session_id

    def get(self, session_id):
        """Return the session hash with 'data' decoded, or None if missing/expired."""
        payload = self.client.hgetall(f'session:{session_id}')
        if payload:
            payload['data'] = json.loads(payload.get('data', '{}'))
            return payload

    def destroy(self, session_id):
        """Delete a session and remove it from its owner's index."""
        session = self.get(session_id)
        if session:
            user_id = session['user_id']
            self.client.delete(f'session:{session_id}')
            self.client.srem(f'user_sessions:{user_id}', session_id)

# Job Queue
import json
import time
import uuid  # fix: enqueue() calls uuid.uuid4() but uuid was not imported here

class JobQueue:
    """Priority job queue: a sorted set of pending jobs (score = priority),
    a hash of in-flight jobs keyed by job id, and a list of permanently
    failed jobs."""

    def __init__(self, client, name):
        self.client = client
        self.name = name
        self.pending = f'queue:{name}:pending'
        self.processing = f'queue:{name}:processing'
        self.failed = f'queue:{name}:failed'

    def enqueue(self, job_data, priority=0):
        """Add a job (lower priority score pops first); returns the new job id."""
        job_id = str(uuid.uuid4())
        job = {
            'id': job_id,
            'data': job_data,
            'created': time.time(),
            'attempts': 0,
        }
        self.client.zadd(self.pending, {json.dumps(job): priority})
        return job_id

    def dequeue(self, timeout=0):
        """Pop the lowest-scored pending job into the processing hash, or None."""
        result = self.client.bzpopmin(self.pending, timeout=timeout)
        if result:
            job = json.loads(result[1])  # result = (key, member, score)
            job['attempts'] += 1
            self.client.hset(self.processing, job['id'], json.dumps(job))
            return job
        return None

    def complete(self, job_id):
        """Mark an in-flight job as done."""
        self.client.hdel(self.processing, job_id)

    def fail(self, job_id, max_retries=3):
        """Retry a failed job, or park it in the failed list after max_retries."""
        job_json = self.client.hget(self.processing, job_id)
        if job_json:
            job = json.loads(job_json)
            self.client.hdel(self.processing, job_id)

            if job['attempts'] < max_retries:
                # Re-queue with a future-timestamp score as a crude delay.
                # NOTE(review): BZPOPMIN does not treat scores as times, so
                # this only orders the job after lower-scored entries.
                self.client.zadd(
                    self.pending,
                    {json.dumps(job): time.time() + 60},
                )
            else:
                self.client.lpush(self.failed, json.dumps(job))

# Geospatial Examples
Location-Based Services
# Add locationsclient.geoadd('restaurants', [ (-122.4194, 37.7749, 'sf_diner'), (-122.4094, 37.7849, 'sf_cafe'), (-122.4294, 37.7649, 'sf_bistro')])
# Find nearbynearby = client.georadius( 'restaurants', longitude=-122.4194, latitude=37.7749, radius=5, unit='km', withdist=True)
# Get distancedistance = client.geodist('restaurants', 'sf_diner', 'sf_cafe', unit='km')HyperLogLog Examples
Unique Visitors
# Add visitorsfor user_id in ['user:1', 'user:2', 'user:3', 'user:1']: # user:1 duplicate client.pfadd('unique_visitors:today', user_id)
# Count uniquecount = client.pfcount('unique_visitors:today') # 3
# Merge multiple daysclient.pfmerge( 'unique_visitors:week', 'unique_visitors:mon', 'unique_visitors:tue', 'unique_visitors:wed')Pipelining for Performance
# Batch operationspipe = client.pipeline(transaction=False)for i in range(1000): pipe.set(f'key:{i}', f'value:{i}')results = pipe.execute()
# Mixed operationspipe = client.pipeline()pipe.incr('counter')pipe.lpush('log', 'entry')pipe.hset('stats', 'last', time.time())pipe.expire('temp_key', 300)results = pipe.execute()Related: README.md | CONFIGURATION.md | COMPATIBILITY.md
Last Updated: December 2025