Performance Optimization¶
Comprehensive guide to optimizing Zenoo RPC performance for high-throughput applications, covering caching, connection pooling, batch operations, query optimization, and memory management.
Overview¶
Zenoo RPC is designed for performance from the ground up with:
- Async-First Architecture: Non-blocking I/O for maximum concurrency
- HTTP/2 Support: Connection multiplexing and header compression
- Intelligent Caching: Multi-tier caching with TTL, LRU, and LFU strategies
- Connection Pooling: Efficient resource management and reuse
- Batch Operations: Minimize network round trips
- Query Optimization: Fetch only what you need
Performance Benchmarks¶
Zenoo RPC vs odoorpc¶
| Operation | odoorpc | Zenoo RPC | Improvement |
|---|---|---|---|
| Simple Query | 45ms | 12ms | 3.75x faster |
| Batch Create (100) | 2.3s | 0.4s | 5.75x faster |
| Concurrent Queries | 890ms | 156ms | 5.7x faster |
| Memory Usage | 45MB | 18MB | 2.5x less |
| Connection Overhead | High | Low | HTTP/2 pooling |
Real-World Performance¶
# Performance comparison example
import time
import asyncio
from zenoo_rpc import ZenooClient
from zenoo_rpc.models.common import ResPartner
async def performance_demo():
async with ZenooClient("localhost", port=8069) as client:
await client.login("demo", "admin", "admin")
# Setup performance optimizations
await client.setup_cache_manager(backend="memory", max_size=10000)
await client.setup_batch_manager(max_chunk_size=100)
start_time = time.time()
# Concurrent operations with caching
tasks = [
client.model(ResPartner).filter(is_company=True).cache(ttl=300).all(),
client.model(ResPartner).filter(customer_rank__gt=0).cache(ttl=300).all(),
client.model(ResPartner).filter(supplier_rank__gt=0).cache(ttl=300).all(),
]
results = await asyncio.gather(*tasks)
elapsed = time.time() - start_time
print(f"Completed 3 concurrent queries in {elapsed:.2f}s")
print(f"Total records: {sum(len(result) for result in results)}")
Connection Optimization¶
HTTP/2 Connection Pooling¶
class OptimizedClient:
"""Client with optimized connection settings."""
def __init__(self, host: str, port: int = 8069):
self.client = ZenooClient(
host,
port=port,
# HTTP/2 for connection multiplexing
http2=True,
# Optimized connection pool
max_connections=100, # Total connections
max_keepalive_connections=20, # Persistent connections
keepalive_expiry=30.0, # Connection TTL
# Timeout optimization
timeout=30.0,
connect_timeout=10.0,
read_timeout=30.0,
# Compression
headers={
"Accept-Encoding": "gzip, deflate, br",
"User-Agent": "ZenooRPC/1.0 (Performance-Optimized)"
}
)
async def __aenter__(self):
await self.client.__aenter__()
return self.client
async def __aexit__(self, *args):
await self.client.__aexit__(*args)
# Usage
async with OptimizedClient("localhost") as client:
await client.login("demo", "admin", "admin")
# All operations use optimized connection pool
Connection Pool Management¶
from zenoo_rpc.transport.pool import ConnectionPool
class HighPerformancePool:
"""Enterprise-grade connection pool."""
def __init__(self, base_url: str):
self.pool = ConnectionPool(
base_url=base_url,
pool_size=20, # Initial pool size
max_connections=100, # Maximum connections
http2=True, # HTTP/2 multiplexing
timeout=30.0, # Request timeout
health_check_interval=30.0, # Health check frequency
max_error_rate=5.0, # Circuit breaker threshold
connection_ttl=300.0 # Connection lifetime
)
async def initialize(self):
"""Initialize the connection pool."""
await self.pool.initialize()
async def get_stats(self) -> dict:
"""Get pool performance statistics."""
return {
"pool_size": len(self.pool.connections),
"available": self.pool.available_connections.qsize(),
"stats": self.pool.stats,
"circuit_breaker": self.pool.circuit_breaker.state.value
}
# Usage
pool = HighPerformancePool("http://localhost:8069")
await pool.initialize()
async with pool.pool.get_connection() as client:
response = await client.post("/jsonrpc", json=rpc_data)
Connection Reuse Patterns¶
# ✅ Optimal: Single client for multiple operations
async def efficient_operations():
async with ZenooClient("localhost", port=8069) as client:
await client.login("demo", "admin", "admin")
# All operations reuse the same connection pool
partners = await client.model(ResPartner).filter(is_company=True).all()
products = await client.model(ProductProduct).filter(active=True).all()
orders = await client.model(SaleOrder).filter(state="sale").all()
return partners, products, orders
# ❌ Inefficient: Multiple clients
async def inefficient_operations():
# Creates new connection pool for each client
async with ZenooClient("localhost", port=8069) as client1:
await client1.login("demo", "admin", "admin")
partners = await client1.model(ResPartner).all()
async with ZenooClient("localhost", port=8069) as client2:
await client2.login("demo", "admin", "admin")
products = await client2.model(ProductProduct).all()
Caching Strategies¶
Multi-Tier Caching¶
class TieredCacheManager:
"""Multi-tier caching for maximum performance."""
def __init__(self, client: ZenooClient):
self.client = client
async def setup_caching(self):
"""Setup multi-tier caching strategy."""
# L1: Memory cache (fastest, smallest)
await self.client.setup_cache_manager(
backend="memory",
max_size=1000, # Small, hot data
default_ttl=60, # Short TTL for freshness
strategy="lru" # LRU eviction
)
# L2: Redis cache (shared, larger)
await self.client.setup_cache_manager(
backend="redis",
url="redis://localhost:6379",
max_size=10000, # Larger capacity
default_ttl=300, # Longer TTL
strategy="ttl" # TTL-based eviction
)
async def get_with_fallback(self, key: str) -> Any:
"""Get data with cache fallback."""
# Try L1 cache first
value = await self.client.cache_manager.get(key, backend="memory")
if value is not None:
return value
# Try L2 cache
value = await self.client.cache_manager.get(key, backend="redis")
if value is not None:
# Populate L1 cache
await self.client.cache_manager.set(key, value, ttl=60, backend="memory")
return value
# Cache miss - fetch from source
return None
Cache-Optimized Queries¶
class CacheOptimizedService:
"""Service with intelligent caching patterns."""
def __init__(self, client: ZenooClient):
self.client = client
async def get_companies(self, use_cache: bool = True) -> List[ResPartner]:
"""Get companies with intelligent caching."""
query = self.client.model(ResPartner).filter(is_company=True)
if use_cache:
# Cache for 5 minutes with LRU eviction
query = query.cache(ttl=300)
return await query.all()
async def get_partner_details(self, partner_id: int) -> ResPartner:
"""Get partner with relationship prefetching and caching."""
return await (
self.client.model(ResPartner)
.filter(id=partner_id)
.prefetch_related("category_id", "country_id", "state_id")
.cache(ttl=600) # Cache for 10 minutes
.first()
)
async def search_partners(self, query: str, limit: int = 20) -> List[ResPartner]:
"""Search partners with result caching."""
# Cache search results for 2 minutes
return await (
self.client.model(ResPartner)
.filter(name__ilike=f"%{query}%")
.limit(limit)
.only("id", "name", "email", "phone") # Reduce data transfer
.cache(ttl=120)
.all()
)
Cache Performance Monitoring¶
class CacheMonitor:
"""Monitor cache performance and hit rates."""
def __init__(self, cache_manager):
self.cache_manager = cache_manager
async def get_performance_metrics(self) -> dict:
"""Get comprehensive cache metrics."""
stats = await self.cache_manager.get_stats()
hit_rate = (
stats["hits"] / (stats["hits"] + stats["misses"])
if (stats["hits"] + stats["misses"]) > 0
else 0
)
return {
"hit_rate": f"{hit_rate:.2%}",
"total_requests": stats["hits"] + stats["misses"],
"cache_size": stats["size"],
"memory_usage": stats.get("memory_usage", "N/A"),
"evictions": stats.get("evictions", 0),
"avg_response_time": stats.get("avg_response_time", 0)
}
async def optimize_cache_settings(self):
"""Auto-optimize cache settings based on metrics."""
metrics = await self.get_performance_metrics()
hit_rate = float(metrics["hit_rate"].rstrip('%')) / 100
if hit_rate < 0.7: # Low hit rate
# Increase cache size and TTL
await self.cache_manager.configure(
max_size=self.cache_manager.config["max_size"] * 1.5,
default_ttl=self.cache_manager.config["default_ttl"] * 1.2
)
elif hit_rate > 0.95: # Very high hit rate
# Can reduce cache size to save memory
await self.cache_manager.configure(
max_size=self.cache_manager.config["max_size"] * 0.8
)
Batch Operations¶
High-Performance Batch Processing¶
class BatchProcessor:
"""High-performance batch operation processor."""
def __init__(self, client: ZenooClient):
self.client = client
async def setup_batch_processing(self):
"""Setup optimized batch processing."""
await self.client.setup_batch_manager(
max_chunk_size=100, # Optimal chunk size
max_concurrency=10, # Concurrent operations
retry_attempts=3, # Retry failed operations
progress_callback=self._progress_callback
)
async def bulk_create_partners(self, partner_data: List[dict]) -> List[int]:
"""Bulk create partners with optimal performance."""
async with self.client.batch_context() as batch:
# Split into optimal chunks automatically
create_op = batch.create("res.partner", partner_data)
# Execute with progress tracking
results = await batch.execute()
return results[0].result # Created IDs
async def bulk_update_with_different_data(self, updates: List[dict]) -> List[dict]:
"""Bulk update with individual data per record."""
async with self.client.batch_context(max_chunk_size=50) as batch:
# Individual updates for different data
update_op = batch.update("res.partner", updates)
results = await batch.execute()
return results[0].result
async def parallel_batch_operations(self, data_sets: List[List[dict]]) -> List[List[int]]:
"""Execute multiple batch operations in parallel."""
tasks = []
for data_set in data_sets:
task = self.bulk_create_partners(data_set)
tasks.append(task)
# Execute all batches concurrently
return await asyncio.gather(*tasks)
def _progress_callback(self, completed: int, total: int):
"""Progress callback for batch operations."""
percentage = (completed / total) * 100
print(f"Batch progress: {completed}/{total} ({percentage:.1f}%)")
Memory-Efficient Streaming¶
class StreamingProcessor:
"""Memory-efficient streaming for large datasets."""
def __init__(self, client: ZenooClient):
self.client = client
async def stream_large_dataset(self, model_name: str, batch_size: int = 1000):
"""Stream large dataset without loading everything into memory."""
offset = 0
while True:
# Fetch batch
batch = await (
self.client.model_by_name(model_name)
.limit(batch_size)
.offset(offset)
.all()
)
if not batch:
break # No more data
# Process batch
await self.process_batch(batch)
# Update offset
offset += batch_size
# Optional: yield control to event loop
await asyncio.sleep(0)
async def process_batch(self, batch: List[Any]):
"""Process a batch of records."""
# Process records in batch
for record in batch:
await self.process_record(record)
# Clear batch from memory
del batch
async def process_record(self, record: Any):
"""Process individual record."""
# Your processing logic here
pass
Query Optimization¶
Efficient Query Patterns¶
class OptimizedQueryService:
"""Service with optimized query patterns."""
def __init__(self, client: ZenooClient):
self.client = client
async def get_companies_with_contacts(self) -> List[ResPartner]:
"""Get companies with their contacts efficiently."""
# ✅ Optimal: Single query with prefetch
return await (
self.client.model(ResPartner)
.filter(is_company=True)
.prefetch_related("child_ids") # Prefetch contacts
.only("id", "name", "email", "child_ids") # Select only needed fields
.cache(ttl=300)
.all()
)
async def get_partner_summary(self, partner_ids: List[int]) -> List[dict]:
"""Get partner summary with minimal data transfer."""
return await (
self.client.model(ResPartner)
.filter(id__in=partner_ids)
.only("id", "name", "email", "phone", "is_company") # Minimal fields
.values() # Return as dictionaries
)
async def search_with_pagination(self, search_term: str, page: int = 1, page_size: int = 20):
"""Efficient pagination with search."""
offset = (page - 1) * page_size
# Get total count efficiently
total_count = await (
self.client.model(ResPartner)
.filter(name__ilike=f"%{search_term}%")
.count()
)
# Get page data
results = await (
self.client.model(ResPartner)
.filter(name__ilike=f"%{search_term}%")
.limit(page_size)
.offset(offset)
.only("id", "name", "email")
.cache(ttl=60) # Short cache for search results
.all()
)
return {
"results": results,
"total_count": total_count,
"page": page,
"page_size": page_size,
"total_pages": (total_count + page_size - 1) // page_size
}
N+1 Query Prevention¶
class RelationshipOptimizer:
"""Prevent N+1 queries with relationship optimization."""
def __init__(self, client: ZenooClient):
self.client = client
async def get_orders_with_details(self) -> List[dict]:
"""Get orders with customer and line details efficiently."""
# ✅ Optimal: Prefetch all relationships
orders = await (
self.client.model(SaleOrder)
.filter(state="sale")
.prefetch_related(
"partner_id", # Customer
"order_line", # Order lines
"order_line.product_id" # Products in lines
)
.select_related("partner_id") # Join customer data
.all()
)
# No additional queries needed - all data is prefetched
return [
{
"order_id": order.id,
"customer_name": order.partner_id.name,
"total_amount": order.amount_total,
"line_count": len(order.order_line),
"products": [line.product_id.name for line in order.order_line]
}
for order in orders
]
async def get_partners_with_categories(self) -> List[dict]:
"""Get partners with their categories efficiently."""
# ✅ Optimal: Batch load categories
partners = await (
self.client.model(ResPartner)
.filter(is_company=True)
.prefetch_related("category_id")
.all()
)
return [
{
"partner_id": partner.id,
"name": partner.name,
"categories": [cat.name for cat in partner.category_id]
}
for partner in partners
]
Memory Management¶
Memory-Efficient Processing¶
class MemoryOptimizer:
"""Memory optimization techniques."""
def __init__(self, client: ZenooClient):
self.client = client
async def process_large_dataset_efficiently(self, model_name: str):
"""Process large dataset with minimal memory usage."""
# Use generator pattern for memory efficiency
async for batch in self._batch_generator(model_name, batch_size=500):
# Process batch
await self._process_batch_memory_efficient(batch)
# Explicit cleanup
del batch
# Yield control to garbage collector
await asyncio.sleep(0)
async def _batch_generator(self, model_name: str, batch_size: int = 500):
"""Generator for memory-efficient batch processing."""
offset = 0
while True:
batch = await (
self.client.model_by_name(model_name)
.limit(batch_size)
.offset(offset)
.values() # Use values() for lower memory usage
)
if not batch:
break
yield batch
offset += batch_size
async def _process_batch_memory_efficient(self, batch: List[dict]):
"""Process batch with memory efficiency."""
# Process records one by one to minimize peak memory
for record in batch:
await self._process_record(record)
# Record is automatically garbage collected
async def _process_record(self, record: dict):
"""Process individual record."""
# Your processing logic here
pass
Connection Pool Optimization¶
class ResourceManager:
"""Optimize resource usage and cleanup."""
def __init__(self):
self.active_connections = {}
self.connection_stats = {}
async def get_optimized_client(self, key: str) -> ZenooClient:
"""Get or create optimized client with resource tracking."""
if key not in self.active_connections:
client = ZenooClient(
"localhost",
port=8069,
# Optimized settings
max_connections=50,
max_keepalive_connections=10,
timeout=30.0,
# Memory optimization
headers={"Connection": "keep-alive"}
)
self.active_connections[key] = client
self.connection_stats[key] = {
"created_at": time.time(),
"requests": 0,
"errors": 0
}
return self.active_connections[key]
async def cleanup_idle_connections(self, max_idle_time: float = 300.0):
"""Clean up idle connections to free resources."""
current_time = time.time()
for key, stats in list(self.connection_stats.items()):
if current_time - stats["created_at"] > max_idle_time:
# Close idle connection
client = self.active_connections.pop(key, None)
if client:
await client.close()
del self.connection_stats[key]
print(f"Cleaned up idle connection: {key}")
async def get_resource_stats(self) -> dict:
"""Get resource usage statistics."""
return {
"active_connections": len(self.active_connections),
"total_requests": sum(stats["requests"] for stats in self.connection_stats.values()),
"total_errors": sum(stats["errors"] for stats in self.connection_stats.values()),
"memory_usage": self._estimate_memory_usage()
}
def _estimate_memory_usage(self) -> str:
"""Estimate memory usage of active connections."""
# Simplified estimation
base_memory_per_connection = 1024 * 1024 # 1MB per connection
total_memory = len(self.active_connections) * base_memory_per_connection
return f"{total_memory / (1024 * 1024):.1f} MB"
Performance Monitoring¶
Real-Time Performance Metrics¶
class PerformanceMonitor:
"""Monitor and track performance metrics."""
def __init__(self):
self.metrics = {
"request_count": 0,
"total_response_time": 0.0,
"error_count": 0,
"cache_hits": 0,
"cache_misses": 0
}
async def track_operation(self, operation_name: str, operation_func):
"""Track operation performance."""
start_time = time.time()
try:
result = await operation_func()
# Record success metrics
response_time = time.time() - start_time
self.metrics["request_count"] += 1
self.metrics["total_response_time"] += response_time
print(f"{operation_name}: {response_time:.3f}s")
return result
except Exception as e:
# Record error metrics
self.metrics["error_count"] += 1
print(f"{operation_name} failed: {e}")
raise
def get_performance_summary(self) -> dict:
"""Get performance summary."""
avg_response_time = (
self.metrics["total_response_time"] / self.metrics["request_count"]
if self.metrics["request_count"] > 0
else 0
)
error_rate = (
self.metrics["error_count"] / self.metrics["request_count"]
if self.metrics["request_count"] > 0
else 0
)
cache_hit_rate = (
self.metrics["cache_hits"] / (self.metrics["cache_hits"] + self.metrics["cache_misses"])
if (self.metrics["cache_hits"] + self.metrics["cache_misses"]) > 0
else 0
)
return {
"total_requests": self.metrics["request_count"],
"avg_response_time": f"{avg_response_time:.3f}s",
"error_rate": f"{error_rate:.2%}",
"cache_hit_rate": f"{cache_hit_rate:.2%}",
"requests_per_second": self._calculate_rps()
}
def _calculate_rps(self) -> float:
"""Calculate requests per second."""
# Simplified calculation
return self.metrics["request_count"] / max(self.metrics["total_response_time"], 1)
Best Practices Summary¶
1. Connection Management¶
# ✅ Optimal connection setup
client = ZenooClient(
"localhost",
port=8069,
http2=True, # Enable HTTP/2
max_connections=100, # Adequate pool size
max_keepalive_connections=20, # Persistent connections
timeout=30.0 # Reasonable timeout
)
2. Query Optimization¶
# ✅ Efficient query patterns
partners = await (
client.model(ResPartner)
.filter(is_company=True)
.only("id", "name", "email") # Select only needed fields
.prefetch_related("category_id") # Prevent N+1 queries
.cache(ttl=300) # Cache results
.limit(100) # Reasonable limits
.all()
)
3. Batch Operations¶
# ✅ Efficient batch processing
async with client.batch_context(max_chunk_size=100) as batch:
batch.create("res.partner", partner_data)
results = await batch.execute()
4. Memory Management¶
# ✅ Memory-efficient processing
async for batch in stream_large_dataset(model_name, batch_size=500):
await process_batch(batch)
del batch # Explicit cleanup
5. Caching Strategy¶
# ✅ Multi-tier caching
await client.setup_cache_manager(
backend="memory",
max_size=1000,
default_ttl=300,
strategy="lru"
)
Next Steps¶
- Explore Security Considerations for production deployments
- Learn about Extension Points for custom optimizations
- Check Monitoring Setup for performance tracking
- Review Architecture Overview for system design patterns