Retry Policies API Reference¶
Comprehensive retry policies with circuit breakers, timeout management, exception filtering, and graceful degradation for production-ready resilience patterns.
Overview¶
Retry policies provide:
- Exception Classification: Automatic retryable vs non-retryable detection
- Timeout Management: Total operation timeout with circuit breaker integration
- Circuit Breaker Support: Fault tolerance patterns for external dependencies
- Graceful Degradation: Fallback mechanisms when retries are exhausted
- Custom Conditions: Flexible retry decision logic
RetryPolicy Base Class¶
Comprehensive retry policy configuration with advanced features.
Constructor¶
@dataclass
class RetryPolicy:
"""Enhanced retry policy configuration."""
strategy: RetryStrategy = field(default_factory=lambda: ExponentialBackoffStrategy())
retryable_exceptions: Set[Type[Exception]] = field(default_factory=set)
non_retryable_exceptions: Set[Type[Exception]] = field(default_factory=set)
timeout: Optional[float] = None
retry_condition: Optional[Callable[[Exception], bool]] = None
# Enhanced features
circuit_breaker_hook: Optional[Callable[[RetryContext], bool]] = None
idempotency_check: Optional[Callable[[RetryContext], bool]] = None
graceful_degradation: Optional[Callable[[RetryContext], Any]] = None
max_total_delay: Optional[float] = None
backoff_multiplier_on_failure: float = 1.0
success_callback: Optional[Callable[[RetryContext], None]] = None
failure_callback: Optional[Callable[[RetryContext], None]] = None
Parameters:
strategy(RetryStrategy): Retry strategy to useretryable_exceptions(Set[Type[Exception]]): Exceptions that should trigger retrynon_retryable_exceptions(Set[Type[Exception]]): Exceptions that should not retrytimeout(Optional[float]): Total operation timeout in secondsretry_condition(Optional[Callable]): Custom retry condition functioncircuit_breaker_hook(Optional[Callable]): Circuit breaker integrationgraceful_degradation(Optional[Callable]): Fallback function when retries exhausted
Decision Making¶
make_retry_decision(context)¶
Make comprehensive retry decision with enhanced context.
Parameters:
context(RetryContext): Retry context with comprehensive information
Returns: RetryDecision - Decision indicating what action to take
Example:
from zenoo_rpc.retry.policies import RetryPolicy, RetryContext, RetryDecision
policy = RetryPolicy()
# Create retry context
context = RetryContext(
attempt_number=2,
exception=ConnectionError("Network error"),
start_time=time.time()
)
# Make decision
decision = policy.make_retry_decision(context)
if decision == RetryDecision.RETRY:
print("Should retry")
elif decision == RetryDecision.STOP:
print("Should stop retrying")
elif decision == RetryDecision.CIRCUIT_OPEN:
print("Circuit breaker is open")
Built-in Policies¶
DefaultRetryPolicy¶
General-purpose retry policy for common scenarios.
class DefaultRetryPolicy(RetryPolicy):
"""Default retry policy for common scenarios."""
def __init__(self):
# Retryable: ConnectionError, TimeoutError, OSError
# Non-retryable: ValueError, TypeError, AttributeError, KeyError, IndexError
# Strategy: ExponentialBackoff (3 attempts, 1s base, 2x multiplier, 30s max)
# Timeout: 60 seconds
super().__init__(
strategy=ExponentialBackoffStrategy(max_attempts=3),
timeout=60.0
)
Configuration:
- Max Attempts: 3
- Base Delay: 1.0 seconds
- Multiplier: 2.0
- Max Delay: 30.0 seconds
- Timeout: 60 seconds
- Jitter: Enabled
Example:
from zenoo_rpc.retry.policies import DefaultRetryPolicy
from zenoo_rpc.retry.decorators import async_retry
from zenoo_rpc.models.common import ResPartner
# Use default policy
@async_retry(policy=DefaultRetryPolicy())
async def reliable_operation():
return await client.model(ResPartner).all()
# Delays: ~1s, ~2s, ~4s (with jitter)
NetworkRetryPolicy¶
Specialized policy for network operations with HTTP-specific handling.
class NetworkRetryPolicy(RetryPolicy):
"""Network-specific retry policy."""
def __init__(self):
# Retryable: ConnectionError, TimeoutError, OSError, httpx errors
# Strategy: ExponentialBackoff (5 attempts, 0.5s base, 1.5x multiplier, 10s max)
# Timeout: 30 seconds
# Custom condition: HTTP 5xx errors are retryable
super().__init__(
strategy=ExponentialBackoffStrategy(max_attempts=5),
timeout=30.0
)
Configuration:
- Max Attempts: 5
- Base Delay: 0.5 seconds
- Multiplier: 1.5
- Max Delay: 10.0 seconds
- Timeout: 30 seconds
- HTTP Handling: 5xx errors are retryable, 4xx are not
Example:
from zenoo_rpc.retry.policies import NetworkRetryPolicy
@async_retry(policy=NetworkRetryPolicy())
async def api_call():
return await client.search("res.partner", [])
# Optimized for network operations
# Handles HTTP errors appropriately
DatabaseRetryPolicy¶
Specialized policy for database operations with database-specific error handling.
class DatabaseRetryPolicy(RetryPolicy):
"""Database-specific retry policy."""
def __init__(self):
# Retryable: ConnectionError, TimeoutError, OSError, psycopg2 errors
# Strategy: ExponentialBackoff (3 attempts, 2s base, 2x multiplier, 60s max)
# Timeout: 120 seconds
Configuration:
- Max Attempts: 3
- Base Delay: 2.0 seconds
- Multiplier: 2.0
- Max Delay: 60.0 seconds
- Timeout: 120 seconds
- Database Errors: Handles psycopg2 OperationalError, InterfaceError
Example:
from zenoo_rpc.retry.policies import DatabaseRetryPolicy
@async_retry(policy=DatabaseRetryPolicy())
async def database_operation():
return await client.create("res.partner", partner_data)
# Longer delays suitable for database recovery
# Handles database-specific exceptions
QuickRetryPolicy¶
Fast retry policy for quick operations with minimal delay.
class QuickRetryPolicy(RetryPolicy):
"""Quick retry policy for fast operations."""
def __init__(self):
# Strategy: ExponentialBackoff (2 attempts, 0.1s base, 2x multiplier, 1s max)
# Timeout: 5 seconds
# Jitter: Disabled for predictable timing
super().__init__(
strategy=ExponentialBackoffStrategy(max_attempts=2),
timeout=5.0
)
Configuration:
- Max Attempts: 2
- Base Delay: 0.1 seconds
- Multiplier: 2.0
- Max Delay: 1.0 seconds
- Timeout: 5 seconds
- Jitter: Disabled
Example:
from zenoo_rpc.retry.policies import QuickRetryPolicy
@async_retry(policy=QuickRetryPolicy())
async def fast_operation():
return await client.search_count("res.partner", [])
# Delays: 0.1s, 0.2s
# Total time: ~0.3s + operation time
AggressiveRetryPolicy¶
Aggressive retry policy for critical operations with many attempts.
class AggressiveRetryPolicy(RetryPolicy):
"""Aggressive retry policy for critical operations."""
def __init__(self):
# Strategy: ExponentialBackoff (10 attempts, 0.5s base, 1.2x multiplier, 30s max)
# Timeout: 300 seconds (5 minutes)
Configuration:
- Max Attempts: 10
- Base Delay: 0.5 seconds
- Multiplier: 1.2 (slower growth)
- Max Delay: 30.0 seconds
- Timeout: 300 seconds (5 minutes)
Example:
from zenoo_rpc.retry.policies import AggressiveRetryPolicy
@async_retry(policy=AggressiveRetryPolicy())
async def critical_operation():
return await client.create("res.partner", critical_data)
# Many attempts with gradual backoff
# Suitable for critical business operations
Advanced Policies¶
CircuitBreakerRetryPolicy¶
Retry policy with integrated circuit breaker for fault tolerance.
class CircuitBreakerRetryPolicy(RetryPolicy):
"""Retry policy with circuit breaker integration."""
def __init__(
self,
failure_threshold: int = 5,
recovery_timeout: float = 60.0,
half_open_max_calls: int = 3
):
"""Initialize circuit breaker retry policy."""
Parameters:
failure_threshold(int): Failures before circuit opens (default: 5)recovery_timeout(float): Time before testing recovery (default: 60.0)half_open_max_calls(int): Max calls in half-open state (default: 3)
Example:
from zenoo_rpc.retry.policies import CircuitBreakerRetryPolicy
# Circuit breaker for external service
circuit_policy = CircuitBreakerRetryPolicy(
failure_threshold=3,
recovery_timeout=30.0,
half_open_max_calls=2
)
@async_retry(policy=circuit_policy)
async def external_service_call():
return await client.external_api()
# Circuit states: CLOSED -> OPEN -> HALF_OPEN -> CLOSED
IdempotentRetryPolicy¶
Retry policy for idempotent operations with duplicate detection.
class IdempotentRetryPolicy(RetryPolicy):
"""Retry policy for idempotent operations."""
def __init__(
self,
idempotency_key_generator: Optional[Callable[[], str]] = None,
idempotency_store: Optional[Dict[str, Any]] = None
):
"""Initialize idempotent retry policy."""
Features:
- Idempotency key generation
- Duplicate operation detection
- Safe retry for non-idempotent operations
Example:
from zenoo_rpc.retry.policies import IdempotentRetryPolicy
import uuid
def generate_key():
return str(uuid.uuid4())
idempotent_policy = IdempotentRetryPolicy(
idempotency_key_generator=generate_key
)
@async_retry(policy=idempotent_policy)
async def idempotent_create():
return await client.create("res.partner", data)
# Safe to retry without duplicates
GracefulDegradationRetryPolicy¶
Retry policy with fallback mechanisms when retries are exhausted.
class GracefulDegradationRetryPolicy(RetryPolicy):
"""Retry policy with graceful degradation."""
def __init__(
self,
fallback_function: Optional[Callable[[RetryContext], Any]] = None,
fallback_timeout: float = 10.0
):
"""Initialize graceful degradation retry policy."""
Example:
from zenoo_rpc.retry.policies import GracefulDegradationRetryPolicy
def fallback_data(context):
"""Provide fallback data when retries fail."""
return {"id": -1, "name": "Fallback Partner", "active": False}
fallback_policy = GracefulDegradationRetryPolicy(
fallback_function=fallback_data
)
@async_retry(policy=fallback_policy)
async def get_partner_with_fallback():
return await client.model(ResPartner).filter(id=123).first()
# Returns fallback data if all retries fail
Custom Retry Conditions¶
HTTP Status Code Handling¶
def http_retry_condition(exception):
"""Custom retry condition for HTTP errors."""
if hasattr(exception, 'response'):
status_code = exception.response.status_code
# Retry on 5xx server errors and 429 rate limit
return status_code >= 500 or status_code == 429
return True # Retry other exceptions
policy = RetryPolicy(
retry_condition=http_retry_condition,
timeout=60.0
)
Database Error Handling¶
def database_retry_condition(exception):
"""Custom retry condition for database errors."""
error_message = str(exception).lower()
# Retry on connection issues
if any(keyword in error_message for keyword in
['connection', 'timeout', 'deadlock', 'lock']):
return True
# Don't retry on constraint violations
if any(keyword in error_message for keyword in
['unique', 'foreign key', 'check constraint']):
return False
return True # Default to retry
db_policy = RetryPolicy(
retry_condition=database_retry_condition,
timeout=120.0
)
Business Logic Conditions¶
def business_retry_condition(exception):
"""Custom retry condition for business logic."""
# Retry on temporary business errors
if isinstance(exception, TemporaryBusinessError):
return True
# Don't retry on validation errors
if isinstance(exception, ValidationError):
return False
# Don't retry on permission errors
if isinstance(exception, PermissionError):
return False
return True
business_policy = RetryPolicy(
retry_condition=business_retry_condition,
timeout=30.0
)
Policy Factory Functions¶
Convenience functions for creating common policy configurations.
create_network_policy()¶
def create_network_policy(
max_attempts: int = 5,
base_delay: float = 0.5,
timeout: float = 30.0
) -> NetworkRetryPolicy:
"""Create a network retry policy with custom parameters."""
create_database_policy()¶
def create_database_policy(
max_attempts: int = 3,
base_delay: float = 2.0,
timeout: float = 120.0
) -> DatabaseRetryPolicy:
"""Create a database retry policy with custom parameters."""
create_circuit_breaker_policy()¶
def create_circuit_breaker_policy(
failure_threshold: int = 5,
recovery_timeout: float = 60.0
) -> CircuitBreakerRetryPolicy:
"""Create a circuit breaker retry policy with custom parameters."""
Example:
from zenoo_rpc.retry.policies import (
create_network_policy,
create_database_policy,
create_circuit_breaker_policy
)
# Custom network policy
network_policy = create_network_policy(
max_attempts=7,
base_delay=1.0,
timeout=45.0
)
# Custom database policy
db_policy = create_database_policy(
max_attempts=5,
base_delay=1.5,
timeout=180.0
)
# Custom circuit breaker
circuit_policy = create_circuit_breaker_policy(
failure_threshold=3,
recovery_timeout=30.0
)
Policy Composition¶
Combining Policies¶
class CompositeRetryPolicy(RetryPolicy):
"""Composite policy combining multiple policies."""
def __init__(self, *policies: RetryPolicy):
self.policies = policies
# Use most restrictive settings
super().__init__(
timeout=min(p.timeout for p in policies if p.timeout),
strategy=policies[0].strategy # Use first strategy
)
def make_retry_decision(self, context):
# All policies must agree to retry
for policy in self.policies:
decision = policy.make_retry_decision(context)
if decision != RetryDecision.RETRY:
return decision
return RetryDecision.RETRY
# Combine network and circuit breaker policies
composite = CompositeRetryPolicy(
NetworkRetryPolicy(),
CircuitBreakerRetryPolicy()
)
Conditional Policy Selection¶
class ConditionalRetryPolicy(RetryPolicy):
"""Policy that selects strategy based on conditions."""
def __init__(self):
self.network_policy = NetworkRetryPolicy()
self.database_policy = DatabaseRetryPolicy()
self.default_policy = DefaultRetryPolicy()
def make_retry_decision(self, context):
# Select policy based on exception type
if isinstance(context.exception, (ConnectionError, TimeoutError)):
return self.network_policy.make_retry_decision(context)
elif 'database' in str(context.exception).lower():
return self.database_policy.make_retry_decision(context)
else:
return self.default_policy.make_retry_decision(context)
conditional_policy = ConditionalRetryPolicy()
Best Practices¶
1. Choose Appropriate Policies¶
# ✅ Good: Match policy to operation type
@async_retry(policy=NetworkRetryPolicy())
async def api_call():
return await client.search("res.partner", [])
@async_retry(policy=DatabaseRetryPolicy())
async def database_write():
return await client.create("res.partner", data)
@async_retry(policy=QuickRetryPolicy())
async def fast_check():
return await client.search_count("res.partner", [])
2. Configure Timeouts Appropriately¶
# ✅ Good: Set reasonable timeouts
quick_policy = QuickRetryPolicy() # 5 seconds
network_policy = NetworkRetryPolicy() # 30 seconds
database_policy = DatabaseRetryPolicy() # 120 seconds
aggressive_policy = AggressiveRetryPolicy() # 300 seconds
# ❌ Avoid: Infinite or very long timeouts without justification
3. Use Circuit Breakers for External Dependencies¶
# ✅ Good: Circuit breaker for external services
external_policy = CircuitBreakerRetryPolicy(
failure_threshold=5,
recovery_timeout=60.0
)
@async_retry(policy=external_policy)
async def external_service():
return await client.external_api()
4. Implement Graceful Degradation¶
# ✅ Good: Provide fallback when retries fail
def fallback_function(context):
logger.warning(f"Using fallback after {context.attempt_number} attempts")
return default_data
graceful_policy = GracefulDegradationRetryPolicy(
fallback_function=fallback_function
)
5. Monitor Policy Performance¶
# ✅ Good: Monitor retry patterns
def success_callback(context):
metrics.increment('retry.success', tags=[
f'attempts:{context.attempt_number}',
f'duration:{context.elapsed_time:.2f}'
])
def failure_callback(context):
metrics.increment('retry.failure', tags=[
f'attempts:{context.attempt_number}',
f'exception:{type(context.exception).__name__}'
])
monitored_policy = RetryPolicy(
success_callback=success_callback,
failure_callback=failure_callback
)
Next Steps¶
- Learn about Retry Strategies for backoff algorithms
- Explore Retry Decorators for function-level retry logic
- Check Circuit Breakers for fault tolerance patterns