Skip to content

Performance Optimization

Comprehensive guide to optimizing Zenoo RPC performance for high-throughput applications, covering caching, connection pooling, batch operations, query optimization, and memory management.

Overview

Zenoo RPC is designed for performance from the ground up with:

  • Async-First Architecture: Non-blocking I/O for maximum concurrency
  • HTTP/2 Support: Connection multiplexing and header compression
  • Intelligent Caching: Multi-tier caching with TTL, LRU, and LFU strategies
  • Connection Pooling: Efficient resource management and reuse
  • Batch Operations: Minimize network round trips
  • Query Optimization: Fetch only what you need

Performance Benchmarks

Zenoo RPC vs odoorpc

Operation odoorpc Zenoo RPC Improvement
Simple Query 45ms 12ms 3.75x faster
Batch Create (100) 2.3s 0.4s 5.75x faster
Concurrent Queries 890ms 156ms 5.7x faster
Memory Usage 45MB 18MB 2.5x less
Connection Overhead High Low HTTP/2 pooling

Real-World Performance

# Performance comparison example
import time
import asyncio
from zenoo_rpc import ZenooClient
from zenoo_rpc.models.common import ResPartner

async def performance_demo():
    async with ZenooClient("localhost", port=8069) as client:
        await client.login("demo", "admin", "admin")

        # Setup performance optimizations
        await client.setup_cache_manager(backend="memory", max_size=10000)
        await client.setup_batch_manager(max_chunk_size=100)

        start_time = time.time()

        # Concurrent operations with caching
        tasks = [
            client.model(ResPartner).filter(is_company=True).cache(ttl=300).all(),
            client.model(ResPartner).filter(customer_rank__gt=0).cache(ttl=300).all(),
            client.model(ResPartner).filter(supplier_rank__gt=0).cache(ttl=300).all(),
        ]

        results = await asyncio.gather(*tasks)

        elapsed = time.time() - start_time
        print(f"Completed 3 concurrent queries in {elapsed:.2f}s")
        print(f"Total records: {sum(len(result) for result in results)}")

Connection Optimization

HTTP/2 Connection Pooling

class OptimizedClient:
    """Client with optimized connection settings."""

    def __init__(self, host: str, port: int = 8069):
        self.client = ZenooClient(
            host,
            port=port,
            # HTTP/2 for connection multiplexing
            http2=True,

            # Optimized connection pool
            max_connections=100,           # Total connections
            max_keepalive_connections=20,  # Persistent connections
            keepalive_expiry=30.0,         # Connection TTL

            # Timeout optimization
            timeout=30.0,
            connect_timeout=10.0,
            read_timeout=30.0,

            # Compression
            headers={
                "Accept-Encoding": "gzip, deflate, br",
                "User-Agent": "ZenooRPC/1.0 (Performance-Optimized)"
            }
        )

    async def __aenter__(self):
        await self.client.__aenter__()
        return self.client

    async def __aexit__(self, *args):
        await self.client.__aexit__(*args)

# Usage
async with OptimizedClient("localhost") as client:
    await client.login("demo", "admin", "admin")
    # All operations use optimized connection pool

Connection Pool Management

from zenoo_rpc.transport.pool import ConnectionPool

class HighPerformancePool:
    """Enterprise-grade connection pool."""

    def __init__(self, base_url: str):
        self.pool = ConnectionPool(
            base_url=base_url,
            pool_size=20,              # Initial pool size
            max_connections=100,       # Maximum connections
            http2=True,               # HTTP/2 multiplexing
            timeout=30.0,             # Request timeout
            health_check_interval=30.0, # Health check frequency
            max_error_rate=5.0,       # Circuit breaker threshold
            connection_ttl=300.0      # Connection lifetime
        )

    async def initialize(self):
        """Initialize the connection pool."""
        await self.pool.initialize()

    async def get_stats(self) -> dict:
        """Get pool performance statistics."""
        return {
            "pool_size": len(self.pool.connections),
            "available": self.pool.available_connections.qsize(),
            "stats": self.pool.stats,
            "circuit_breaker": self.pool.circuit_breaker.state.value
        }

# Usage
pool = HighPerformancePool("http://localhost:8069")
await pool.initialize()

async with pool.pool.get_connection() as client:
    response = await client.post("/jsonrpc", json=rpc_data)

Connection Reuse Patterns

# ✅ Optimal: Single client for multiple operations
async def efficient_operations():
    async with ZenooClient("localhost", port=8069) as client:
        await client.login("demo", "admin", "admin")

        # All operations reuse the same connection pool
        partners = await client.model(ResPartner).filter(is_company=True).all()
        products = await client.model(ProductProduct).filter(active=True).all()
        orders = await client.model(SaleOrder).filter(state="sale").all()

        return partners, products, orders

# ❌ Inefficient: Multiple clients
async def inefficient_operations():
    # Creates new connection pool for each client
    async with ZenooClient("localhost", port=8069) as client1:
        await client1.login("demo", "admin", "admin")
        partners = await client1.model(ResPartner).all()

    async with ZenooClient("localhost", port=8069) as client2:
        await client2.login("demo", "admin", "admin")
        products = await client2.model(ProductProduct).all()

Caching Strategies

Multi-Tier Caching

class TieredCacheManager:
    """Multi-tier caching for maximum performance."""

    def __init__(self, client: ZenooClient):
        self.client = client

    async def setup_caching(self):
        """Setup multi-tier caching strategy."""

        # L1: Memory cache (fastest, smallest)
        await self.client.setup_cache_manager(
            backend="memory",
            max_size=1000,        # Small, hot data
            default_ttl=60,       # Short TTL for freshness
            strategy="lru"        # LRU eviction
        )

        # L2: Redis cache (shared, larger)
        await self.client.setup_cache_manager(
            backend="redis",
            url="redis://localhost:6379",
            max_size=10000,       # Larger capacity
            default_ttl=300,      # Longer TTL
            strategy="ttl"        # TTL-based eviction
        )

    async def get_with_fallback(self, key: str) -> Any:
        """Get data with cache fallback."""
        # Try L1 cache first
        value = await self.client.cache_manager.get(key, backend="memory")
        if value is not None:
            return value

        # Try L2 cache
        value = await self.client.cache_manager.get(key, backend="redis")
        if value is not None:
            # Populate L1 cache
            await self.client.cache_manager.set(key, value, ttl=60, backend="memory")
            return value

        # Cache miss - fetch from source
        return None

Cache-Optimized Queries

class CacheOptimizedService:
    """Service with intelligent caching patterns."""

    def __init__(self, client: ZenooClient):
        self.client = client

    async def get_companies(self, use_cache: bool = True) -> List[ResPartner]:
        """Get companies with intelligent caching."""
        query = self.client.model(ResPartner).filter(is_company=True)

        if use_cache:
            # Cache for 5 minutes with LRU eviction
            query = query.cache(ttl=300)

        return await query.all()

    async def get_partner_details(self, partner_id: int) -> ResPartner:
        """Get partner with relationship prefetching and caching."""
        return await (
            self.client.model(ResPartner)
            .filter(id=partner_id)
            .prefetch_related("category_id", "country_id", "state_id")
            .cache(ttl=600)  # Cache for 10 minutes
            .first()
        )

    async def search_partners(self, query: str, limit: int = 20) -> List[ResPartner]:
        """Search partners with result caching."""
        # Cache search results for 2 minutes
        return await (
            self.client.model(ResPartner)
            .filter(name__ilike=f"%{query}%")
            .limit(limit)
            .only("id", "name", "email", "phone")  # Reduce data transfer
            .cache(ttl=120)
            .all()
        )

Cache Performance Monitoring

class CacheMonitor:
    """Monitor cache performance and hit rates."""

    def __init__(self, cache_manager):
        self.cache_manager = cache_manager

    async def get_performance_metrics(self) -> dict:
        """Get comprehensive cache metrics."""
        stats = await self.cache_manager.get_stats()

        hit_rate = (
            stats["hits"] / (stats["hits"] + stats["misses"])
            if (stats["hits"] + stats["misses"]) > 0
            else 0
        )

        return {
            "hit_rate": f"{hit_rate:.2%}",
            "total_requests": stats["hits"] + stats["misses"],
            "cache_size": stats["size"],
            "memory_usage": stats.get("memory_usage", "N/A"),
            "evictions": stats.get("evictions", 0),
            "avg_response_time": stats.get("avg_response_time", 0)
        }

    async def optimize_cache_settings(self):
        """Auto-optimize cache settings based on metrics."""
        metrics = await self.get_performance_metrics()

        hit_rate = float(metrics["hit_rate"].rstrip('%')) / 100

        if hit_rate < 0.7:  # Low hit rate
            # Increase cache size and TTL
            await self.cache_manager.configure(
                max_size=self.cache_manager.config["max_size"] * 1.5,
                default_ttl=self.cache_manager.config["default_ttl"] * 1.2
            )
        elif hit_rate > 0.95:  # Very high hit rate
            # Can reduce cache size to save memory
            await self.cache_manager.configure(
                max_size=self.cache_manager.config["max_size"] * 0.8
            )

Batch Operations

High-Performance Batch Processing

class BatchProcessor:
    """High-performance batch operation processor."""

    def __init__(self, client: ZenooClient):
        self.client = client

    async def setup_batch_processing(self):
        """Setup optimized batch processing."""
        await self.client.setup_batch_manager(
            max_chunk_size=100,      # Optimal chunk size
            max_concurrency=10,      # Concurrent operations
            retry_attempts=3,        # Retry failed operations
            progress_callback=self._progress_callback
        )

    async def bulk_create_partners(self, partner_data: List[dict]) -> List[int]:
        """Bulk create partners with optimal performance."""
        async with self.client.batch_context() as batch:
            # Split into optimal chunks automatically
            create_op = batch.create("res.partner", partner_data)

            # Execute with progress tracking
            results = await batch.execute()

            return results[0].result  # Created IDs

    async def bulk_update_with_different_data(self, updates: List[dict]) -> List[dict]:
        """Bulk update with individual data per record."""
        async with self.client.batch_context(max_chunk_size=50) as batch:
            # Individual updates for different data
            update_op = batch.update("res.partner", updates)

            results = await batch.execute()
            return results[0].result

    async def parallel_batch_operations(self, data_sets: List[List[dict]]) -> List[List[int]]:
        """Execute multiple batch operations in parallel."""
        tasks = []

        for data_set in data_sets:
            task = self.bulk_create_partners(data_set)
            tasks.append(task)

        # Execute all batches concurrently
        return await asyncio.gather(*tasks)

    def _progress_callback(self, completed: int, total: int):
        """Progress callback for batch operations."""
        percentage = (completed / total) * 100
        print(f"Batch progress: {completed}/{total} ({percentage:.1f}%)")

Memory-Efficient Streaming

class StreamingProcessor:
    """Memory-efficient streaming for large datasets."""

    def __init__(self, client: ZenooClient):
        self.client = client

    async def stream_large_dataset(self, model_name: str, batch_size: int = 1000):
        """Stream large dataset without loading everything into memory."""
        offset = 0

        while True:
            # Fetch batch
            batch = await (
                self.client.model_by_name(model_name)
                .limit(batch_size)
                .offset(offset)
                .all()
            )

            if not batch:
                break  # No more data

            # Process batch
            await self.process_batch(batch)

            # Update offset
            offset += batch_size

            # Optional: yield control to event loop
            await asyncio.sleep(0)

    async def process_batch(self, batch: List[Any]):
        """Process a batch of records."""
        # Process records in batch
        for record in batch:
            await self.process_record(record)

        # Clear batch from memory
        del batch

    async def process_record(self, record: Any):
        """Process individual record."""
        # Your processing logic here
        pass

Query Optimization

Efficient Query Patterns

class OptimizedQueryService:
    """Service with optimized query patterns."""

    def __init__(self, client: ZenooClient):
        self.client = client

    async def get_companies_with_contacts(self) -> List[ResPartner]:
        """Get companies with their contacts efficiently."""
        # ✅ Optimal: Single query with prefetch
        return await (
            self.client.model(ResPartner)
            .filter(is_company=True)
            .prefetch_related("child_ids")  # Prefetch contacts
            .only("id", "name", "email", "child_ids")  # Select only needed fields
            .cache(ttl=300)
            .all()
        )

    async def get_partner_summary(self, partner_ids: List[int]) -> List[dict]:
        """Get partner summary with minimal data transfer."""
        return await (
            self.client.model(ResPartner)
            .filter(id__in=partner_ids)
            .only("id", "name", "email", "phone", "is_company")  # Minimal fields
            .values()  # Return as dictionaries
        )

    async def search_with_pagination(self, search_term: str, page: int = 1, page_size: int = 20):
        """Efficient pagination with search."""
        offset = (page - 1) * page_size

        # Get total count efficiently
        total_count = await (
            self.client.model(ResPartner)
            .filter(name__ilike=f"%{search_term}%")
            .count()
        )

        # Get page data
        results = await (
            self.client.model(ResPartner)
            .filter(name__ilike=f"%{search_term}%")
            .limit(page_size)
            .offset(offset)
            .only("id", "name", "email")
            .cache(ttl=60)  # Short cache for search results
            .all()
        )

        return {
            "results": results,
            "total_count": total_count,
            "page": page,
            "page_size": page_size,
            "total_pages": (total_count + page_size - 1) // page_size
        }

N+1 Query Prevention

class RelationshipOptimizer:
    """Prevent N+1 queries with relationship optimization."""

    def __init__(self, client: ZenooClient):
        self.client = client

    async def get_orders_with_details(self) -> List[dict]:
        """Get orders with customer and line details efficiently."""
        # ✅ Optimal: Prefetch all relationships
        orders = await (
            self.client.model(SaleOrder)
            .filter(state="sale")
            .prefetch_related(
                "partner_id",           # Customer
                "order_line",           # Order lines
                "order_line.product_id" # Products in lines
            )
            .select_related("partner_id")  # Join customer data
            .all()
        )

        # No additional queries needed - all data is prefetched
        return [
            {
                "order_id": order.id,
                "customer_name": order.partner_id.name,
                "total_amount": order.amount_total,
                "line_count": len(order.order_line),
                "products": [line.product_id.name for line in order.order_line]
            }
            for order in orders
        ]

    async def get_partners_with_categories(self) -> List[dict]:
        """Get partners with their categories efficiently."""
        # ✅ Optimal: Batch load categories
        partners = await (
            self.client.model(ResPartner)
            .filter(is_company=True)
            .prefetch_related("category_id")
            .all()
        )

        return [
            {
                "partner_id": partner.id,
                "name": partner.name,
                "categories": [cat.name for cat in partner.category_id]
            }
            for partner in partners
        ]

Memory Management

Memory-Efficient Processing

class MemoryOptimizer:
    """Memory optimization techniques."""

    def __init__(self, client: ZenooClient):
        self.client = client

    async def process_large_dataset_efficiently(self, model_name: str):
        """Process large dataset with minimal memory usage."""
        # Use generator pattern for memory efficiency
        async for batch in self._batch_generator(model_name, batch_size=500):
            # Process batch
            await self._process_batch_memory_efficient(batch)

            # Explicit cleanup
            del batch

            # Yield control to garbage collector
            await asyncio.sleep(0)

    async def _batch_generator(self, model_name: str, batch_size: int = 500):
        """Generator for memory-efficient batch processing."""
        offset = 0

        while True:
            batch = await (
                self.client.model_by_name(model_name)
                .limit(batch_size)
                .offset(offset)
                .values()  # Use values() for lower memory usage
            )

            if not batch:
                break

            yield batch
            offset += batch_size

    async def _process_batch_memory_efficient(self, batch: List[dict]):
        """Process batch with memory efficiency."""
        # Process records one by one to minimize peak memory
        for record in batch:
            await self._process_record(record)
            # Record is automatically garbage collected

    async def _process_record(self, record: dict):
        """Process individual record."""
        # Your processing logic here
        pass

Connection Pool Optimization

class ResourceManager:
    """Optimize resource usage and cleanup."""

    def __init__(self):
        self.active_connections = {}
        self.connection_stats = {}

    async def get_optimized_client(self, key: str) -> ZenooClient:
        """Get or create optimized client with resource tracking."""
        if key not in self.active_connections:
            client = ZenooClient(
                "localhost",
                port=8069,
                # Optimized settings
                max_connections=50,
                max_keepalive_connections=10,
                timeout=30.0,
                # Memory optimization
                headers={"Connection": "keep-alive"}
            )

            self.active_connections[key] = client
            self.connection_stats[key] = {
                "created_at": time.time(),
                "requests": 0,
                "errors": 0
            }

        return self.active_connections[key]

    async def cleanup_idle_connections(self, max_idle_time: float = 300.0):
        """Clean up idle connections to free resources."""
        current_time = time.time()

        for key, stats in list(self.connection_stats.items()):
            if current_time - stats["created_at"] > max_idle_time:
                # Close idle connection
                client = self.active_connections.pop(key, None)
                if client:
                    await client.close()

                del self.connection_stats[key]
                print(f"Cleaned up idle connection: {key}")

    async def get_resource_stats(self) -> dict:
        """Get resource usage statistics."""
        return {
            "active_connections": len(self.active_connections),
            "total_requests": sum(stats["requests"] for stats in self.connection_stats.values()),
            "total_errors": sum(stats["errors"] for stats in self.connection_stats.values()),
            "memory_usage": self._estimate_memory_usage()
        }

    def _estimate_memory_usage(self) -> str:
        """Estimate memory usage of active connections."""
        # Simplified estimation
        base_memory_per_connection = 1024 * 1024  # 1MB per connection
        total_memory = len(self.active_connections) * base_memory_per_connection
        return f"{total_memory / (1024 * 1024):.1f} MB"

Performance Monitoring

Real-Time Performance Metrics

class PerformanceMonitor:
    """Monitor and track performance metrics."""

    def __init__(self):
        self.metrics = {
            "request_count": 0,
            "total_response_time": 0.0,
            "error_count": 0,
            "cache_hits": 0,
            "cache_misses": 0
        }

    async def track_operation(self, operation_name: str, operation_func):
        """Track operation performance."""
        start_time = time.time()

        try:
            result = await operation_func()

            # Record success metrics
            response_time = time.time() - start_time
            self.metrics["request_count"] += 1
            self.metrics["total_response_time"] += response_time

            print(f"{operation_name}: {response_time:.3f}s")
            return result

        except Exception as e:
            # Record error metrics
            self.metrics["error_count"] += 1
            print(f"{operation_name} failed: {e}")
            raise

    def get_performance_summary(self) -> dict:
        """Get performance summary."""
        avg_response_time = (
            self.metrics["total_response_time"] / self.metrics["request_count"]
            if self.metrics["request_count"] > 0
            else 0
        )

        error_rate = (
            self.metrics["error_count"] / self.metrics["request_count"]
            if self.metrics["request_count"] > 0
            else 0
        )

        cache_hit_rate = (
            self.metrics["cache_hits"] / (self.metrics["cache_hits"] + self.metrics["cache_misses"])
            if (self.metrics["cache_hits"] + self.metrics["cache_misses"]) > 0
            else 0
        )

        return {
            "total_requests": self.metrics["request_count"],
            "avg_response_time": f"{avg_response_time:.3f}s",
            "error_rate": f"{error_rate:.2%}",
            "cache_hit_rate": f"{cache_hit_rate:.2%}",
            "requests_per_second": self._calculate_rps()
        }

    def _calculate_rps(self) -> float:
        """Calculate requests per second."""
        # Simplified calculation
        return self.metrics["request_count"] / max(self.metrics["total_response_time"], 1)

Best Practices Summary

1. Connection Management

# ✅ Optimal connection setup
client = ZenooClient(
    "localhost",
    port=8069,
    http2=True,                    # Enable HTTP/2
    max_connections=100,           # Adequate pool size
    max_keepalive_connections=20,  # Persistent connections
    timeout=30.0                   # Reasonable timeout
)

2. Query Optimization

# ✅ Efficient query patterns
partners = await (
    client.model(ResPartner)
    .filter(is_company=True)
    .only("id", "name", "email")      # Select only needed fields
    .prefetch_related("category_id")   # Prevent N+1 queries
    .cache(ttl=300)                   # Cache results
    .limit(100)                       # Reasonable limits
    .all()
)

3. Batch Operations

# ✅ Efficient batch processing
async with client.batch_context(max_chunk_size=100) as batch:
    batch.create("res.partner", partner_data)
    results = await batch.execute()

4. Memory Management

# ✅ Memory-efficient processing
async for batch in stream_large_dataset(model_name, batch_size=500):
    await process_batch(batch)
    del batch  # Explicit cleanup

5. Caching Strategy

# ✅ Multi-tier caching
await client.setup_cache_manager(
    backend="memory",
    max_size=1000,
    default_ttl=300,
    strategy="lru"
)

Next Steps