MongoDB Protocol Examples
MongoDB Protocol Examples
Practical code examples and usage patterns for HeliosDB’s MongoDB compatibility.
Connection Examples
Python (PyMongo)
from pymongo import MongoClient
# Basic connectionclient = MongoClient("mongodb://localhost:27017/")db = client.my_databasecollection = db.my_collection
# With authenticationclient = MongoClient( "mongodb://user:password@localhost:27017/", authSource="admin", authMechanism="SCRAM-SHA-256")
# With TLSclient = MongoClient( "mongodb://localhost:27017/", tls=True, tlsCAFile="/path/to/ca.pem")Node.js
const { MongoClient } = require('mongodb');
async function connect() { const client = new MongoClient('mongodb://localhost:27017/', { auth: { username: 'user', password: 'password' }, authSource: 'admin', maxPoolSize: 50 });
await client.connect(); const db = client.db('my_database'); return db;}Java
import com.mongodb.client.MongoClients;import com.mongodb.client.MongoClient;import com.mongodb.client.MongoDatabase;
MongoClient client = MongoClients.create( "mongodb://user:password@localhost:27017/?authSource=admin");MongoDatabase database = client.getDatabase("my_database");CRUD Operations
Insert Documents
# Insert oneresult = collection.insert_one({ "name": "Alice", "email": "alice@example.com", "age": 30, "created_at": datetime.now()})print(f"Inserted: {result.inserted_id}")
# Insert manydocs = [ {"name": "Bob", "age": 25}, {"name": "Charlie", "age": 35}, {"name": "Diana", "age": 28}]result = collection.insert_many(docs)print(f"Inserted {len(result.inserted_ids)} documents")Find Documents
# Find oneuser = collection.find_one({"name": "Alice"})
# Find many with filteradults = collection.find({"age": {"$gte": 18}})for user in adults: print(user)
# With projectionusers = collection.find( {"age": {"$gte": 21}}, {"name": 1, "email": 1, "_id": 0})
# With sorting and paginationresults = collection.find({"status": "active"}) \ .sort("created_at", -1) \ .skip(10) \ .limit(5)Update Documents
# Update onecollection.update_one( {"_id": ObjectId("...")}, {"$set": {"status": "active"}})
# Update manycollection.update_many( {"category": "electronics"}, {"$inc": {"view_count": 1}})
# Upsertcollection.update_one( {"email": "new@example.com"}, {"$set": {"name": "New User"}}, upsert=True)
# Array operationscollection.update_one( {"_id": user_id}, {"$push": {"tags": "premium"}})
collection.update_one( {"_id": user_id}, {"$pull": {"notifications": {"read": True}}})Delete Documents
# Delete onecollection.delete_one({"_id": ObjectId("...")})
# Delete manyresult = collection.delete_many({"status": "archived"})print(f"Deleted {result.deleted_count} documents")Query Operators
Comparison Operators
# Greater thancollection.find({"age": {"$gt": 21}})
# Between (range)collection.find({"age": {"$gte": 18, "$lte": 65}})
# In listcollection.find({"status": {"$in": ["active", "pending"]}})
# Not equalcollection.find({"role": {"$ne": "admin"}})Logical Operators
# AND (implicit)collection.find({"age": {"$gte": 18}, "status": "active"})
# ORcollection.find({ "$or": [ {"role": "admin"}, {"permissions": {"$in": ["manage_users"]}} ]})
# AND with ORcollection.find({ "status": "active", "$or": [ {"age": {"$lt": 18}}, {"guardian": {"$exists": True}} ]})Array Operators
# Contains all elementscollection.find({"tags": {"$all": ["mongodb", "database"]}})
# Array element matchescollection.find({ "items": { "$elemMatch": { "price": {"$gt": 100}, "quantity": {"$gte": 5} } }})
# Array sizecollection.find({"tags": {"$size": 3}})Text Search
# Create text indexcollection.create_index([("content", "text"), ("title", "text")])
# Text searchcollection.find({"$text": {"$search": "mongodb database"}})
# With scorecollection.find( {"$text": {"$search": "mongodb"}}, {"score": {"$meta": "textScore"}}).sort([("score", {"$meta": "textScore"})])Aggregation Pipeline
Basic Aggregation
# Group and countpipeline = [ {"$match": {"status": "completed"}}, {"$group": { "_id": "$category", "total": {"$sum": "$amount"}, "count": {"$sum": 1}, "average": {"$avg": "$amount"} }}, {"$sort": {"total": -1}}, {"$limit": 10}]results = collection.aggregate(pipeline)Lookup (Join)
# Join orders with customerspipeline = [ {"$lookup": { "from": "customers", "localField": "customer_id", "foreignField": "_id", "as": "customer" }}, {"$unwind": "$customer"}, {"$project": { "order_id": 1, "total": 1, "customer_name": "$customer.name", "customer_email": "$customer.email" }}]Advanced Lookup with Pipeline
pipeline = [ {"$lookup": { "from": "products", "let": {"order_items": "$items"}, "pipeline": [ {"$match": { "$expr": {"$in": ["$_id", "$$order_items"]} }}, {"$project": {"name": 1, "price": 1}} ], "as": "product_details" }}]Faceted Aggregation
pipeline = [ {"$facet": { "by_category": [ {"$group": {"_id": "$category", "count": {"$sum": 1}}} ], "by_price_range": [ {"$bucket": { "groupBy": "$price", "boundaries": [0, 50, 100, 500, 1000], "default": "Other", "output": {"count": {"$sum": 1}} }} ], "statistics": [ {"$group": { "_id": None, "total_products": {"$sum": 1}, "avg_price": {"$avg": "$price"}, "max_price": {"$max": "$price"} }} ] }}]Window Functions
pipeline = [ {"$setWindowFields": { "partitionBy": "$department", "sortBy": {"salary": -1}, "output": { "rank": {"$rank": {}}, "dept_avg": { "$avg": "$salary", "window": {"documents": ["unbounded", "unbounded"]} } } }}]Change Streams
Watch Collection
# Basic change streamwith collection.watch() as stream: for change in stream: print(f"Operation: {change['operationType']}") if 'fullDocument' in change: print(f"Document: {change['fullDocument']}")With Pipeline Filter
pipeline = [ {"$match": { "operationType": {"$in": ["insert", "update"]}, "fullDocument.priority": "high" }}]
with collection.watch(pipeline) as stream: for change in stream: process_high_priority(change)Resume After Disconnect
resume_token = None
try: with collection.watch() as stream: for change in stream: process(change) resume_token = change['_id']except Exception: # Resume from last position with collection.watch(resume_after=resume_token) as stream: for change in stream: process(change)Async Change Stream (Motor)
import motor.motor_asyncio
async def watch_changes(): client = motor.motor_asyncio.AsyncIOMotorClient('mongodb://localhost:27017/') collection = client.mydb.mycollection
async with collection.watch() as stream: async for change in stream: print(change)Transactions
Basic Transaction
with client.start_session() as session: with session.start_transaction(): # Transfer money accounts.update_one( {"_id": "account_a"}, {"$inc": {"balance": -100}}, session=session ) accounts.update_one( {"_id": "account_b"}, {"$inc": {"balance": 100}}, session=session ) # Transaction commits automatically on exitWith Error Handling
def transfer_with_retry(session):
    """Move 100 from ``from_account`` to ``to_account`` inside a transaction.

    The debit, the credit and the log insert all run in one transaction
    started on ``session``. The whole transaction is retried for as long as
    the failure carries the ``TransientTransactionError`` label.

    Expects the ``accounts`` and ``transactions`` collections to be in scope.

    Args:
        session: Client session the transaction is started on; it is passed
            to every operation.

    Raises:
        ValueError: If the debit matched no document, i.e. ``from_account``
            does not exist or its balance is below 100. Nothing is credited
            or logged in that case.
        ConnectionFailure, OperationFailure: Re-raised when the error is not
            labelled ``TransientTransactionError``.
    """
    while True:
        try:
            with session.start_transaction():
                # Debit
                debit = accounts.update_one(
                    {"_id": "from_account", "balance": {"$gte": 100}},
                    {"$inc": {"balance": -100}},
                    session=session
                )
                # The filter only matches when the balance covers the amount.
                # Without this check the credit below would still run and
                # create money out of nothing. Leaving the ``with`` block via
                # an exception aborts the transaction.
                if debit.matched_count == 0:
                    raise ValueError(
                        "from_account is missing or has insufficient balance"
                    )
                # Credit
                accounts.update_one(
                    {"_id": "to_account"},
                    {"$inc": {"balance": 100}},
                    session=session
                )
                # Log transaction
                transactions.insert_one({
                    "from": "from_account",
                    "to": "to_account",
                    "amount": 100,
                    "timestamp": datetime.now()
                }, session=session)
            return  # Success
        except (ConnectionFailure, OperationFailure) as e:
            if e.has_error_label("TransientTransactionError"):
                continue  # Retry
            raise
with client.start_session() as session: transfer_with_retry(session)Index Management
Create Indexes
# Single fieldcollection.create_index("email")
# Compoundcollection.create_index([("category", 1), ("price", -1)])
# Uniquecollection.create_index("username", unique=True)
# Sparsecollection.create_index("optional_field", sparse=True)
# TTL (expire after 1 hour)collection.create_index("created_at", expireAfterSeconds=3600)
# Text indexcollection.create_index([("title", "text"), ("content", "text")])
# Geospatialcollection.create_index([("location", "2dsphere")])Manage Indexes
# List indexesfor index in collection.list_indexes(): print(index)
# Drop indexcollection.drop_index("email_1")
# Drop all indexescollection.drop_indexes()Migration from MongoDB
Export/Import
# Export from MongoDBmongodump --uri="mongodb://source:27017/mydb" --out=/backup
# Import to HeliosDBmongorestore --uri="mongodb://heliosdb:27017/mydb" /backupConnection String Migration
import os
# Environment-based connectionMONGODB_URI = os.environ.get( "MONGODB_URI", "mongodb://localhost:27017/" # Default to HeliosDB)
client = MongoClient(MONGODB_URI)Compatibility Check
def check_compatibility():
    """Verify HeliosDB MongoDB compatibility.

    Runs a CRUD round-trip and a ``$group`` aggregation against the
    ``test_db.test`` collection on ``mongodb://localhost:27017/``. Every
    operation is scoped to the ``_id`` values created by this run, so
    documents already present in the collection neither influence the
    result nor get modified, and the check can be repeated.

    Raises:
        AssertionError: If an operation returns an unexpected result.
    """
    client = MongoClient("mongodb://localhost:27017/")
    try:
        db = client.test_db

        # Test CRUD
        result = db.test.insert_one({"test": True})
        assert result.inserted_id
        # Address the probe document by its _id rather than {"test": True},
        # which could also match documents that were already there.
        probe = {"_id": result.inserted_id}

        doc = db.test.find_one(probe)
        assert doc is not None

        updated = db.test.update_one(probe, {"$set": {"verified": True}})
        assert updated.modified_count == 1
        deleted = db.test.delete_one(probe)
        assert deleted.deleted_count == 1

        # Test aggregation
        inserted = db.test.insert_many([{"x": i} for i in range(10)])
        this_run = {"_id": {"$in": inserted.inserted_ids}}
        try:
            result = list(db.test.aggregate([
                # Restrict the sum to the ten documents inserted above.
                {"$match": this_run},
                {"$group": {"_id": None, "sum": {"$sum": "$x"}}},
            ]))
            assert result[0]["sum"] == 45
        finally:
            # Remove the fixtures even when the assertion fails.
            db.test.delete_many(this_run)

        print("All compatibility checks passed!")
    finally:
        client.close()

Related: README.md | CONFIGURATION.md | COMPATIBILITY.md
Last Updated: December 2025