Cache API Reference¶
The cache module provides intelligent caching with TTL and LRU strategies, supporting both in-memory and Redis backends for high-performance data caching.
Overview¶
The cache system consists of:
- CacheManager: Main interface for cache operations
- Backends: MemoryCache and RedisCache implementations
- Strategies: TTL, LRU, and LFU caching strategies
- Decorators: Function-level caching decorators
- Key Management: Cache key generation and validation
CacheManager¶
Main cache management interface coordinating between backends and strategies.
Constructor¶
class CacheManager:
"""Main cache manager for Zenoo RPC."""
def __init__(self):
self.backends: Dict[str, CacheBackend] = {}
self.strategies: Dict[str, CacheStrategy] = {}
self.default_backend = "memory"
self.default_strategy = "ttl"
Setup Methods¶
async setup_memory_cache(name="memory", max_size=1000, default_ttl=None, strategy="ttl")¶
Setup in-memory cache backend.
Parameters:
name(str): Backend name (default: "memory")max_size(int): Maximum cache size (default: 1000)default_ttl(int, optional): Default TTL in secondsstrategy(str): Cache strategy ("ttl", "lru", "lfu") (default: "ttl")
Example:
# Basic memory cache
await client.cache_manager.setup_memory_cache()
# Custom configuration
await client.cache_manager.setup_memory_cache(
name="custom_memory",
max_size=5000,
default_ttl=600,
strategy="lru"
)
async setup_redis_cache(name="redis", url="redis://localhost:6379/0", namespace="zenoo_rpc", strategy="ttl", enable_fallback=True, **kwargs)¶
Setup Redis cache backend with production features.
Parameters:
name(str): Backend name (default: "redis")url(str): Redis connection URLnamespace(str): Cache namespace (default: "zenoo_rpc")strategy(str): Cache strategy (default: "ttl")enable_fallback(bool): Enable fallback to memory on Redis failuremax_connections(int): Maximum Redis connections (default: 20)retry_attempts(int): Retry attempts for failed operations (default: 3)circuit_breaker_threshold(int): Circuit breaker failure threshold (default: 5)
Example:
# Basic Redis cache
await client.cache_manager.setup_redis_cache(
url="redis://localhost:6379/0"
)
# Production Redis cache
await client.cache_manager.setup_redis_cache(
name="production_redis",
url="redis://redis-cluster:6379/0",
namespace="myapp",
max_connections=50,
retry_attempts=5,
enable_fallback=True,
circuit_breaker_threshold=10
)
Cache Operations¶
async set(key, value, ttl=None, backend=None)¶
Set a value in cache.
Parameters:
key(str | CacheKey): Cache keyvalue(Any): Value to cachettl(int, optional): Time to live in secondsbackend(str, optional): Backend name (uses default if None)
Returns: bool - True if successful
Example:
# Simple set
await cache_manager.set("user:123", user_data, ttl=300)
# With specific backend
await cache_manager.set("session:abc", session_data, backend="redis")
# Using CacheKey
from zenoo_rpc.cache.keys import make_cache_key
key = make_cache_key("res.partner", "search", {"domain": [("is_company", "=", True)]})
await cache_manager.set(key, partners_data, ttl=600)
async get(key, backend=None)¶
Get a value from cache.
Parameters:
key(str | CacheKey): Cache keybackend(str, optional): Backend name (uses default if None)
Returns: Any - Cached value or None if not found
Example:
# Simple get
user_data = await cache_manager.get("user:123")
# With specific backend
session_data = await cache_manager.get("session:abc", backend="redis")
# Check if value exists
if user_data is not None:
print(f"Found cached user: {user_data['name']}")
async delete(key, backend=None)¶
Delete a value from cache.
Parameters:
key(str | CacheKey): Cache keybackend(str, optional): Backend name (uses default if None)
Returns: bool - True if successful
Example:
# Delete specific key
await cache_manager.delete("user:123")
# Delete from specific backend
await cache_manager.delete("session:abc", backend="redis")
async exists(key, backend=None)¶
Check if a key exists in cache.
Parameters:
key(str | CacheKey): Cache keybackend(str, optional): Backend name (uses default if None)
Returns: bool - True if key exists
Example:
async clear(backend=None)¶
Clear cache.
Parameters:
backend(str, optional): Backend name (clears all if None)
Returns: bool - True if successful
Example:
# Clear specific backend
await cache_manager.clear("memory")
# Clear all backends
await cache_manager.clear()
Statistics and Monitoring¶
async get_stats(backend=None)¶
Get cache statistics.
Parameters:
backend(str, optional): Backend name (all backends if None)
Returns: Dict[str, Any] - Statistics data
Example:
# Get all stats
stats = await cache_manager.get_stats()
print(f"Total hits: {stats['total_hits']}")
print(f"Total misses: {stats['total_misses']}")
print(f"Hit rate: {stats['total_hits'] / (stats['total_hits'] + stats['total_misses']):.2%}")
# Get specific backend stats
redis_stats = await cache_manager.get_stats("redis")
Cache Backends¶
MemoryCache¶
In-memory cache backend with TTL and LRU support.
class MemoryCache(CacheBackend):
"""In-memory cache backend."""
def __init__(
self,
max_size: int = 1000,
default_ttl: Optional[int] = None,
cleanup_interval: int = 60
):
"""Initialize memory cache."""
Features:
- TTL (Time To Live) support
- LRU (Least Recently Used) eviction
- Thread-safe operations
- Memory usage tracking
- Automatic cleanup of expired items
Example:
# Direct usage (usually not needed)
from zenoo_rpc.cache.backends import MemoryCache
cache = MemoryCache(max_size=5000, default_ttl=300)
await cache.set("key1", "value1", ttl=60)
value = await cache.get("key1")
RedisCache¶
Enterprise-grade Redis cache backend with advanced features.
class RedisCache(CacheBackend):
"""Enhanced Redis cache backend."""
def __init__(
self,
url: str = "redis://localhost:6379/0",
namespace: str = "zenoo_rpc",
serializer: str = "json",
max_connections: int = 20,
retry_attempts: int = 3,
circuit_breaker_threshold: int = 5,
enable_fallback: bool = True,
**kwargs
):
"""Initialize Redis cache."""
Enhanced Features:
- Connection pool management
- Circuit breaker pattern for fault tolerance
- Exponential backoff retry with jitter
- Health checking and automatic reconnection
- Comprehensive metrics and observability
- Graceful shutdown and resource cleanup
- Fallback mechanisms for high availability
Example:
# Direct usage (usually not needed)
from zenoo_rpc.cache.backends import RedisCache
cache = RedisCache(
url="redis://localhost:6379/0",
namespace="myapp",
max_connections=50,
enable_fallback=True
)
await cache.connect()
await cache.set("key1", {"data": "value"}, ttl=300)
value = await cache.get("key1")
await cache.close()
Cache Strategies¶
TTLCache¶
Time To Live cache strategy with automatic expiration.
class TTLCache(CacheStrategy):
"""TTL cache strategy."""
def __init__(
self,
backend: CacheBackend,
default_ttl: int = 300,
cleanup_interval: int = 60
):
"""Initialize TTL cache strategy."""
Features:
- Automatic expiration based on TTL
- Configurable default TTL
- Per-item TTL override
- Lazy expiration on access
Example:
# Used automatically by CacheManager
await cache_manager.setup_memory_cache(strategy="ttl", default_ttl=300)
LRUCache¶
Least Recently Used cache strategy with size-based eviction.
class LRUCache(CacheStrategy):
"""LRU cache strategy."""
def __init__(
self,
backend: CacheBackend,
max_size: int = 1000
):
"""Initialize LRU cache strategy."""
Features:
- LRU eviction policy
- Configurable maximum size
- Access order tracking
- Efficient O(1) operations
Example:
# Used automatically by CacheManager
await cache_manager.setup_memory_cache(strategy="lru", max_size=1000)
LFUCache¶
Least Frequently Used cache strategy.
class LFUCache(CacheStrategy):
"""LFU cache strategy."""
def __init__(
self,
backend: CacheBackend,
max_size: int = 1000
):
"""Initialize LFU cache strategy."""
Features:
- LFU eviction policy
- Frequency tracking
- Configurable maximum size
- Efficient operations
Cache Decorators¶
@async_cached¶
Enhanced async cache decorator with advanced features.
def async_cached(
ttl: Optional[int] = None,
key_prefix: Optional[str] = None,
backend: Optional[str] = None,
cache_manager: Optional[CacheManager] = None,
skip_cache: Optional[Callable] = None,
key_builder: Optional[Callable] = None,
prevent_stampede: bool = True,
enable_metrics: bool = True,
stale_while_revalidate: bool = False,
stale_ttl: Optional[int] = None
):
"""Enhanced async cache decorator."""
Features:
- Cache stampede prevention
- Comprehensive metrics and monitoring
- Stale-while-revalidate pattern support
- Circuit breaker integration hooks
- Thread-safe async operations
Example:
from zenoo_rpc.cache.decorators import async_cached
@async_cached(ttl=300, prevent_stampede=True, enable_metrics=True)
async def get_partner_data(client, partner_id):
"""Get partner data with caching."""
return await client.model(ResPartner).filter(id=partner_id).first()
# Usage
partner = await get_partner_data(client, 123)
@cache_result¶
Decorator for caching Odoo operation results.
def cache_result(
model: str,
operation: str,
ttl: Optional[int] = None,
backend: Optional[str] = None,
invalidate_on: Optional[List[str]] = None
):
"""Cache Odoo operation results."""
Example:
from zenoo_rpc.cache.decorators import cache_result
@cache_result("res.partner", "search", ttl=300)
async def search_partners(client, domain, **kwargs):
"""Search partners with caching."""
return await client.search_read("res.partner", domain, **kwargs)
# Usage
partners = await search_partners(client, [("is_company", "=", True)])
Cache Key Management¶
CacheKey¶
Structured cache key with metadata.
@dataclass
class CacheKey:
"""Structured cache key."""
key: str
namespace: str
model: Optional[str] = None
operation: Optional[str] = None
params: Optional[Dict[str, Any]] = None
make_cache_key()¶
Generate cache key for Odoo operations.
def make_cache_key(
model: str,
operation: str,
params: Optional[Dict[str, Any]] = None,
namespace: str = "zenoo_rpc",
include_hash: bool = True
) -> CacheKey:
"""Generate cache key for Odoo operations."""
Example:
from zenoo_rpc.cache.keys import make_cache_key
# Generate key for search operation
key = make_cache_key(
model="res.partner",
operation="search",
params={"domain": [("is_company", "=", True)], "limit": 10}
)
# Use key for caching
await cache_manager.set(key, search_results, ttl=300)
make_model_cache_key()¶
Generate cache key for model records.
def make_model_cache_key(
model: str,
record_id: Union[int, List[int]],
fields: Optional[List[str]] = None,
namespace: str = "zenoo_rpc"
) -> CacheKey:
"""Generate cache key for model records."""
Example:
from zenoo_rpc.cache.keys import make_model_cache_key
# Single record
key = make_model_cache_key("res.partner", 123, ["name", "email"])
# Multiple records
key = make_model_cache_key("res.partner", [1, 2, 3], ["name", "email"])
Cache Integration with QuerySet¶
Query-Level Caching¶
# Cache query results
partners = await client.model(ResPartner).filter(
is_company=True
).cache(
key="all_companies",
ttl=600,
backend="redis"
).all()
# Cache with auto-generated key
partners = await client.model(ResPartner).filter(
customer_rank__gt=0
).cache(ttl=300).all()
# Cache count queries
count = await client.model(ResPartner).filter(
is_company=True
).cache(ttl=600).count()
Cache Invalidation¶
# Manual invalidation
await cache_manager.delete("all_companies")
# Pattern-based invalidation (Redis only)
await cache_manager.delete_pattern("res.partner:*")
# Model-based invalidation
await cache_manager.invalidate_model("res.partner")
Error Handling¶
Cache Exceptions¶
from zenoo_rpc.cache.exceptions import CacheError, CacheBackendError
try:
await cache_manager.set("key", "value")
except CacheBackendError as e:
print(f"Cache backend error: {e}")
except CacheError as e:
print(f"Cache error: {e}")
Graceful Degradation¶
async def get_data_with_fallback(key, fetch_func):
"""Get data with cache fallback."""
try:
# Try cache first
cached_data = await cache_manager.get(key)
if cached_data is not None:
return cached_data
except CacheError:
# Cache error, continue to fetch
pass
# Fetch fresh data
data = await fetch_func()
try:
# Try to cache result
await cache_manager.set(key, data, ttl=300)
except CacheError:
# Cache error, but we have data
pass
return data
Performance Considerations¶
Cache Sizing¶
# Memory cache sizing
await cache_manager.setup_memory_cache(
max_size=10000, # Adjust based on available memory
strategy="lru" # Use LRU for memory management
)
# Redis cache with connection pooling
await cache_manager.setup_redis_cache(
max_connections=50, # Adjust based on load
retry_attempts=3,
circuit_breaker_threshold=10
)
TTL Strategy¶
# Different TTL for different data types
await cache_manager.set("user:123", user_data, ttl=300) # 5 minutes
await cache_manager.set("config:app", config_data, ttl=3600) # 1 hour
await cache_manager.set("static:countries", countries, ttl=86400) # 1 day
Cache Warming¶
async def warm_cache(client):
"""Warm up cache with frequently accessed data."""
# Cache all countries
countries = await client.model(ResCountry).all()
await cache_manager.set("all_countries", countries, ttl=86400)
# Cache active partners
partners = await client.model(ResPartner).filter(active=True).all()
await cache_manager.set("active_partners", partners, ttl=3600)
Next Steps¶
- Learn about Cache Strategies in detail
- Explore Cache Backends configuration
- Check Cache Decorators for function-level caching
- Understand Cache Keys management