Frequently Asked Questions (FAQ)¶
Common questions and answers about Zenoo RPC usage, troubleshooting, and best practices.
General Questions¶
Q: What is Zenoo RPC and how does it differ from odoorpc?¶
A: Zenoo RPC is a modern, async-first Python library for interacting with Odoo servers. Key differences from odoorpc:
- Async/Await Support: Built for modern async Python applications
- Type Safety: Full type hints and Pydantic model validation
- Performance: HTTP/2 support, connection pooling, intelligent caching
- Developer Experience: Fluent query API, IDE autocompletion
- Enterprise Features: Batch operations, retry mechanisms, circuit breakers
# odoorpc (synchronous)
import odoorpc
odoo = odoorpc.ODOO('localhost', port=8069)
odoo.login('demo', 'admin', 'admin')
partners = odoo.env['res.partner'].search_read([('is_company', '=', True)])
# Zenoo RPC (asynchronous, type-safe)
from zenoo_rpc import ZenooClient
from zenoo_rpc.models.common import ResPartner
async with ZenooClient("localhost", port=8069) as client:
await client.login("demo", "admin", "admin")
partners = await client.model(ResPartner).filter(is_company=True).all()
Q: Is Zenoo RPC compatible with all Odoo versions?¶
A: Zenoo RPC is currently tested and verified with Odoo 18.0 only. While it uses standard JSON-RPC protocols that should be consistent across Odoo versions, compatibility with other versions is not yet verified:
- Odoo 18.0: ✅ Tested and supported (latest version)
- Odoo 12.0-17.0: ⚠️ Compatibility unknown (testing planned for v0.2.0)
We plan to add comprehensive version testing for older Odoo versions in upcoming releases.
Q: Can I use Zenoo RPC in synchronous code?¶
A: Zenoo RPC is designed for async/await patterns, but you can use it in synchronous code with asyncio.run():
import asyncio
from zenoo_rpc import ZenooClient
def sync_function():
"""Use Zenoo RPC in synchronous code."""
async def async_operation():
async with ZenooClient("localhost", port=8069) as client:
await client.login("demo", "admin", "admin")
return await client.search("res.partner", [])
# Run async code from sync function
return asyncio.run(async_operation())
# Usage
partners = sync_function()
Installation & Setup¶
Q: How do I install Zenoo RPC with all optional dependencies?¶
A: Install with optional dependencies for full functionality:
# Basic installation
pip install zenoo-rpc
# With Redis support
pip install zenoo-rpc[redis]
# With development tools
pip install zenoo-rpc[dev]
# With all optional dependencies
pip install zenoo-rpc[redis,dev]
# From source (latest development version)
pip install git+https://github.com/tuanle96/zenoo-rpc.git
Q: What are the minimum system requirements?¶
A: Zenoo RPC requirements:
- Python: 3.8 or later (3.11+ recommended for best performance)
- Memory: 64MB minimum (more for caching and batch operations)
- Network: HTTP/HTTPS access to Odoo server
- Optional: Redis server for distributed caching
Q: How do I configure Zenoo RPC for production use?¶
A: Production configuration example:
from zenoo_rpc import ZenooClient
# Production client configuration
async def create_production_client():
client = ZenooClient(
host="odoo.company.com",
port=443, # HTTPS
use_ssl=True,
verify_ssl=True,
# Connection optimization
max_connections=50,
max_keepalive_connections=20,
timeout=30.0,
# Performance features
http2=True,
# Security headers
headers={
"User-Agent": "CompanyApp/1.0 (ZenooRPC)",
"X-Forwarded-For": "10.0.0.1"
}
)
# Setup caching
await client.setup_cache_manager(
backend="redis",
url="redis://redis.company.com:6379/0",
default_ttl=300,
max_size=10000
)
# Setup batch processing
await client.setup_batch_manager(
max_chunk_size=100,
max_concurrency=10
)
return client
Authentication & Security¶
Q: How do I handle authentication errors?¶
A: Common authentication issues and solutions:
from zenoo_rpc.exceptions import AuthenticationError
async def handle_auth_errors():
try:
async with ZenooClient("localhost", port=8069) as client:
await client.login("demo", "admin", "wrong_password")
except AuthenticationError as e:
if "Access Denied" in str(e):
print("Invalid username or password")
elif "database" in str(e).lower():
print("Database does not exist")
else:
print(f"Authentication failed: {e}")
# Debug steps:
# 1. Check available databases
databases = await client.list_databases()
print(f"Available databases: {databases}")
# 2. Verify Odoo server is accessible
version = await client.version()
print(f"Odoo version: {version}")
Q: How do I implement secure authentication in production?¶
A: Production authentication best practices:
import os
from cryptography.fernet import Fernet
class SecureAuthManager:
def __init__(self):
# Use environment variables for credentials
self.host = os.getenv("ODOO_HOST")
self.database = os.getenv("ODOO_DATABASE")
self.username = os.getenv("ODOO_USERNAME")
# Encrypt stored passwords
key = os.getenv("ENCRYPTION_KEY").encode()
self.cipher = Fernet(key)
encrypted_password = os.getenv("ODOO_PASSWORD_ENCRYPTED")
self.password = self.cipher.decrypt(encrypted_password.encode()).decode()
async def get_authenticated_client(self):
client = ZenooClient(self.host, port=443, use_ssl=True)
await client.login(self.database, self.username, self.password)
return client
# Environment variables:
# ODOO_HOST=odoo.company.com
# ODOO_DATABASE=production
# ODOO_USERNAME=api_user
# ODOO_PASSWORD_ENCRYPTED=gAAAAABh...
# ENCRYPTION_KEY=your-encryption-key
Q: How do I handle session timeouts?¶
A: Implement automatic session refresh:
import time
from zenoo_rpc.exceptions import AuthenticationError
class SessionManager:
def __init__(self, client, database, username, password):
self.client = client
self.database = database
self.username = username
self.password = password
self.last_login = 0
self.session_timeout = 3600 # 1 hour
async def ensure_authenticated(self):
"""Ensure client is authenticated, refresh if needed."""
current_time = time.time()
if current_time - self.last_login > self.session_timeout:
await self.refresh_session()
async def refresh_session(self):
"""Refresh authentication session."""
try:
await self.client.login(self.database, self.username, self.password)
self.last_login = time.time()
except AuthenticationError:
# Handle re-authentication failure
raise
async def call_with_retry(self, method, *args, **kwargs):
"""Call method with automatic session refresh on auth failure."""
try:
await self.ensure_authenticated()
return await method(*args, **kwargs)
except AuthenticationError:
# Try refreshing session once
await self.refresh_session()
return await method(*args, **kwargs)
Performance & Optimization¶
Q: How can I improve query performance?¶
A: Performance optimization strategies:
# ❌ Inefficient: N+1 queries
async def inefficient_queries():
partner_ids = await client.search("res.partner", [], limit=100)
partners = []
for partner_id in partner_ids:
partner = await client.read("res.partner", [partner_id])
partners.append(partner[0])
return partners
# ✅ Efficient: Single query
async def efficient_queries():
return await client.search_read("res.partner", [], limit=100)
# ✅ Even better: With caching and field selection
async def optimized_queries():
return await (
client.model(ResPartner)
.filter(active=True)
.only("id", "name", "email") # Select only needed fields
.limit(100)
.cache(ttl=300) # Cache for 5 minutes
.all()
)
Q: How do I optimize batch operations?¶
A: Batch operation optimization:
# ✅ Optimal batch size (usually 50-200 records)
async def optimized_batch_create():
large_dataset = [{"name": f"Partner {i}"} for i in range(1000)]
async with client.batch_context(max_chunk_size=100) as batch:
batch.create("res.partner", large_dataset)
results = await batch.execute()
return results
# ✅ Concurrent batch processing
async def concurrent_batches():
datasets = [
[{"name": f"Batch1-{i}"} for i in range(100)],
[{"name": f"Batch2-{i}"} for i in range(100)],
[{"name": f"Batch3-{i}"} for i in range(100)]
]
tasks = []
for dataset in datasets:
task = create_batch(dataset)
tasks.append(task)
results = await asyncio.gather(*tasks)
return results
async def create_batch(data):
async with client.batch_context() as batch:
batch.create("res.partner", data)
return await batch.execute()
Q: When should I use caching and what are the best practices?¶
A: Caching best practices:
# ✅ Cache read-heavy, rarely changing data
async def cache_static_data():
# Cache countries (rarely change)
countries = await (
client.model(ResCountry)
.cache(ttl=3600) # 1 hour
.all()
)
# Cache user preferences (user-specific)
user_prefs = await (
client.model(ResUsers)
.filter(id=client.uid)
.cache(ttl=300, key_prefix=f"user_{client.uid}")
.first()
)
# ❌ Don't cache frequently changing data
async def avoid_caching():
# Don't cache real-time data
current_stock = await client.model(StockQuant).all() # No cache
# Don't cache user-specific data without proper keys
my_orders = await client.model(SaleOrder).filter(
user_id=client.uid
).all() # No cache or use user-specific key
# ✅ Cache invalidation strategy
async def cache_with_invalidation():
# Cache with tags for selective invalidation
partners = await (
client.model(ResPartner)
.filter(is_company=True)
.cache(ttl=600, tags=["partners", "companies"])
.all()
)
# Invalidate cache when data changes
await client.create("res.partner", {"name": "New Company"})
await client.cache_manager.invalidate_tags(["partners", "companies"])
Error Handling¶
Q: How do I handle network timeouts and connection errors?¶
A: Robust error handling with retries:
from zenoo_rpc.retry import async_retry, ExponentialBackoffStrategy
from zenoo_rpc.exceptions import NetworkError, TimeoutError
@async_retry(
max_attempts=3,
strategy=ExponentialBackoffStrategy(base_delay=1.0, multiplier=2.0)
)
async def robust_operation():
"""Operation with automatic retry on network errors."""
try:
return await client.search("res.partner", [])
except (NetworkError, TimeoutError) as e:
print(f"Network error, will retry: {e}")
raise # Re-raise to trigger retry
except Exception as e:
print(f"Non-retryable error: {e}")
raise
# Manual retry with custom logic
async def manual_retry():
max_attempts = 3
for attempt in range(max_attempts):
try:
return await client.search("res.partner", [])
except (NetworkError, TimeoutError) as e:
if attempt == max_attempts - 1:
raise # Last attempt, give up
wait_time = 2 ** attempt # Exponential backoff
print(f"Attempt {attempt + 1} failed, retrying in {wait_time}s...")
await asyncio.sleep(wait_time)
Q: How do I handle Odoo-specific errors (ValidationError, AccessError, etc.)?¶
A: Handle Odoo business logic errors:
from zenoo_rpc.exceptions import ValidationError, AccessError, UserError
async def handle_odoo_errors():
try:
# This might fail due to validation rules
partner = await client.create("res.partner", {
"name": "", # Empty name might be invalid
"email": "invalid-email" # Invalid email format
})
except ValidationError as e:
print(f"Validation failed: {e}")
# Handle validation errors (fix data and retry)
except AccessError as e:
print(f"Access denied: {e}")
# Handle permission errors (check user rights)
except UserError as e:
print(f"Business rule violation: {e}")
# Handle business logic errors
except Exception as e:
print(f"Unexpected error: {e}")
# Handle other errors
# Graceful error handling with fallbacks
async def create_partner_with_fallback(partner_data):
"""Create partner with graceful error handling."""
try:
return await client.create("res.partner", partner_data)
except ValidationError as e:
# Try with minimal required data
minimal_data = {"name": partner_data.get("name", "Unknown")}
return await client.create("res.partner", minimal_data)
except AccessError:
# Log the attempt and return None
print(f"Access denied for partner creation: {partner_data}")
return None
Models & Data Types¶
Q: How do I work with custom Odoo models?¶
A: Define custom models for type safety:
from zenoo_rpc.models.base import OdooModel
from zenoo_rpc.models.fields import CharField, DateTimeField, Many2OneField
from typing import ClassVar, Optional
from datetime import datetime
class CustomModel(OdooModel):
"""Custom Odoo model definition."""
_odoo_name: ClassVar[str] = "custom.model"
# Define fields with proper types
name: str = CharField(description="Model name")
description: Optional[str] = CharField(description="Description")
created_date: Optional[datetime] = DateTimeField(description="Creation date")
partner_id: Optional[int] = Many2OneField("res.partner", description="Related partner")
# Usage with type safety
async def work_with_custom_model():
# Type-safe queries
records = await client.model(CustomModel).filter(
name__ilike="test"
).all()
# IDE autocompletion works
for record in records:
print(f"Name: {record.name}")
print(f"Created: {record.created_date}")
# Type checking catches errors
# record.invalid_field # This would be caught by type checker
# Dynamic model for unknown structures
async def work_with_dynamic_model():
# Use generic model for unknown structures
records = await client.search_read("unknown.model", [])
# Access as dictionaries
for record in records:
print(f"ID: {record['id']}")
print(f"Fields: {record.keys()}")
Q: How do I handle Many2one and One2many relationships?¶
A: Relationship handling patterns:
# Many2one relationships
async def work_with_many2one():
# Get partner with country information
partner = await (
client.model(ResPartner)
.filter(id=1)
.prefetch_related("country_id") # Avoid N+1 queries
.first()
)
if partner.country_id:
print(f"Country: {partner.country_id.name}")
# One2many relationships
async def work_with_one2many():
# Get partner with all contacts
company = await (
client.model(ResPartner)
.filter(id=1, is_company=True)
.prefetch_related("child_ids") # Load contacts
.first()
)
for contact in company.child_ids:
print(f"Contact: {contact.name}")
# Many2many relationships
async def work_with_many2many():
# Get partner with categories
partner = await (
client.model(ResPartner)
.filter(id=1)
.prefetch_related("category_id") # Load categories
.first()
)
for category in partner.category_id:
print(f"Category: {category.name}")
# Manual relationship loading
async def manual_relationship_loading():
# Get partner
partner = await client.model(ResPartner).filter(id=1).first()
# Manually load country
if partner.country_id:
country = await client.model(ResCountry).filter(
id=partner.country_id
).first()
print(f"Country: {country.name}")
Migration & Integration¶
Q: How do I migrate from odoorpc to Zenoo RPC?¶
A: Migration guide with examples:
# Before (odoorpc)
import odoorpc
odoo = odoorpc.ODOO('localhost', port=8069)
odoo.login('demo', 'admin', 'admin')
# Search and read
partner_ids = odoo.env['res.partner'].search([('is_company', '=', True)])
partners = odoo.env['res.partner'].browse(partner_ids)
# Create
new_partner = odoo.env['res.partner'].create({
'name': 'Test Company',
'is_company': True
})
# After (Zenoo RPC)
from zenoo_rpc import ZenooClient
from zenoo_rpc.models.common import ResPartner
async def migrated_code():
async with ZenooClient('localhost', port=8069) as client:
await client.login('demo', 'admin', 'admin')
# Search and read (single query)
partners = await client.model(ResPartner).filter(
is_company=True
).all()
# Create
new_partner = await client.create('res.partner', {
'name': 'Test Company',
'is_company': True
})
# Migration helper function
def migrate_odoorpc_code(odoorpc_function):
"""Helper to wrap odoorpc code for gradual migration."""
async def async_wrapper(*args, **kwargs):
# Convert sync odoorpc call to async Zenoo RPC
# This is a simplified example
result = await asyncio.get_event_loop().run_in_executor(
None, odoorpc_function, *args, **kwargs
)
return result
return async_wrapper
Q: How do I integrate Zenoo RPC with web frameworks (FastAPI, Django, Flask)?¶
A: Web framework integration examples:
# FastAPI integration
from fastapi import FastAPI, Depends
from zenoo_rpc import ZenooClient
app = FastAPI()
# Dependency for client
async def get_odoo_client():
client = ZenooClient("localhost", port=8069)
await client.login("demo", "admin", "admin")
try:
yield client
finally:
await client.close()
@app.get("/partners")
async def get_partners(client: ZenooClient = Depends(get_odoo_client)):
return await client.model(ResPartner).filter(is_company=True).all()
# Django integration (using async views in Django 4.1+)
from django.http import JsonResponse
from asgiref.sync import sync_to_async
class PartnerViewSet:
def __init__(self):
self.client = None
async def get_client(self):
if not self.client:
self.client = ZenooClient("localhost", port=8069)
await self.client.login("demo", "admin", "admin")
return self.client
async def list_partners(self, request):
client = await self.get_client()
partners = await client.model(ResPartner).filter(is_company=True).all()
return JsonResponse({"partners": [p.dict() for p in partners]})
# Flask integration (using asyncio)
from flask import Flask, jsonify
import asyncio
app = Flask(__name__)
@app.route('/partners')
def get_partners():
async def async_get_partners():
async with ZenooClient("localhost", port=8069) as client:
await client.login("demo", "admin", "admin")
return await client.model(ResPartner).filter(is_company=True).all()
partners = asyncio.run(async_get_partners())
return jsonify([p.dict() for p in partners])
Best Practices¶
Q: What are the recommended patterns for production applications?¶
A: Production-ready patterns:
# 1. Connection management
class OdooService:
def __init__(self):
self.client = None
self._lock = asyncio.Lock()
async def get_client(self):
async with self._lock:
if not self.client:
self.client = ZenooClient("localhost", port=8069)
await self.client.login("demo", "admin", "admin")
# Setup production features
await self.client.setup_cache_manager(backend="redis")
await self.client.setup_batch_manager()
return self.client
async def close(self):
if self.client:
await self.client.close()
# 2. Error handling and logging
import logging
logger = logging.getLogger(__name__)
async def robust_operation(service: OdooService):
try:
client = await service.get_client()
result = await client.search("res.partner", [])
logger.info(f"Successfully retrieved {len(result)} partners")
return result
except Exception as e:
logger.error(f"Operation failed: {e}", exc_info=True)
# Handle error appropriately
raise
# 3. Configuration management
from dataclasses import dataclass
import os
@dataclass
class OdooConfig:
host: str = os.getenv("ODOO_HOST", "localhost")
port: int = int(os.getenv("ODOO_PORT", "8069"))
database: str = os.getenv("ODOO_DATABASE", "demo")
username: str = os.getenv("ODOO_USERNAME", "admin")
password: str = os.getenv("ODOO_PASSWORD", "admin")
use_ssl: bool = os.getenv("ODOO_SSL", "false").lower() == "true"
# Performance settings
max_connections: int = int(os.getenv("ODOO_MAX_CONNECTIONS", "20"))
timeout: float = float(os.getenv("ODOO_TIMEOUT", "30.0"))
# Cache settings
cache_backend: str = os.getenv("CACHE_BACKEND", "memory")
cache_url: str = os.getenv("CACHE_URL", "redis://localhost:6379/0")
cache_ttl: int = int(os.getenv("CACHE_TTL", "300"))
async def create_configured_client(config: OdooConfig):
client = ZenooClient(
host=config.host,
port=config.port,
use_ssl=config.use_ssl,
max_connections=config.max_connections,
timeout=config.timeout
)
await client.login(config.database, config.username, config.password)
if config.cache_backend == "redis":
await client.setup_cache_manager(
backend="redis",
url=config.cache_url,
default_ttl=config.cache_ttl
)
return client
Q: How do I monitor and debug production issues?¶
A: Production monitoring and debugging:
# 1. Metrics collection
import time
from collections import defaultdict
class MetricsCollector:
def __init__(self):
self.metrics = defaultdict(list)
self.counters = defaultdict(int)
def record_operation(self, operation: str, duration: float, success: bool):
self.metrics[f"{operation}_duration"].append(duration)
self.counters[f"{operation}_total"] += 1
if success:
self.counters[f"{operation}_success"] += 1
else:
self.counters[f"{operation}_error"] += 1
def get_stats(self):
stats = {}
for operation, durations in self.metrics.items():
if durations:
stats[operation] = {
"avg": sum(durations) / len(durations),
"min": min(durations),
"max": max(durations),
"count": len(durations)
}
stats.update(dict(self.counters))
return stats
# 2. Health checks
async def health_check(client: ZenooClient) -> dict:
"""Comprehensive health check."""
health = {"status": "healthy", "checks": {}}
try:
# Test basic connectivity
start = time.time()
version = await client.version()
health["checks"]["connectivity"] = {
"status": "ok",
"response_time": time.time() - start,
"version": version
}
except Exception as e:
health["status"] = "unhealthy"
health["checks"]["connectivity"] = {
"status": "error",
"error": str(e)
}
try:
# Test authentication
start = time.time()
await client.call("common", "authenticate",
client.database, client.username, client.password, {})
health["checks"]["authentication"] = {
"status": "ok",
"response_time": time.time() - start
}
except Exception as e:
health["status"] = "unhealthy"
health["checks"]["authentication"] = {
"status": "error",
"error": str(e)
}
# Test cache if available
if hasattr(client, 'cache_manager') and client.cache_manager:
try:
await client.cache_manager.set("health_check", "ok", ttl=10)
cached = await client.cache_manager.get("health_check")
health["checks"]["cache"] = {
"status": "ok" if cached == "ok" else "error"
}
except Exception as e:
health["checks"]["cache"] = {
"status": "error",
"error": str(e)
}
return health
# 3. Alerting
async def check_and_alert():
"""Monitor system and send alerts."""
health = await health_check(client)
if health["status"] != "healthy":
# Send alert (email, Slack, etc.)
await send_alert(f"Odoo health check failed: {health}")
# Check performance metrics
stats = metrics_collector.get_stats()
avg_response_time = stats.get("search_duration", {}).get("avg", 0)
if avg_response_time > 5.0: # 5 second threshold
await send_alert(f"High response time detected: {avg_response_time:.2f}s")
Next Steps¶
For more detailed information, check out:
- Debugging Guide for detailed troubleshooting techniques
- API Reference for complete API documentation
- Performance Guide for optimization strategies
- Security Guide for production security
- Examples for practical usage examples