Testing Guide¶
Comprehensive testing guide for Zenoo RPC covering unit tests, integration tests, performance tests, and testing best practices.
Testing Philosophy¶
Zenoo RPC follows a comprehensive testing strategy:
- Unit Tests: Test individual components in isolation
- Integration Tests: Test component interactions and real Odoo connectivity
- Performance Tests: Validate performance characteristics and benchmarks
- End-to-End Tests: Test complete workflows and user scenarios
- Property-Based Tests: Test with generated data to find edge cases
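Property-based testing is the one category in the list above that has no example elsewhere in this guide. The following is a minimal sketch using only the standard library. `parse_filter` is a hypothetical helper written for this illustration (it mimics the `field__operator` lookups used later in the query builder tests) and is not part of Zenoo RPC. In practice a library such as Hypothesis would generate and shrink the inputs.

```python
import random
import string

# Hypothetical helper, for illustration only: turns a lookup such as
# "name__ilike" into an Odoo-style domain triple.
OPERATORS = {"ilike": "ilike", "in": "in", "gt": ">", "lt": "<"}


def parse_filter(lookup: str, value):
    field, sep, suffix = lookup.rpartition("__")
    if sep and suffix in OPERATORS:
        return (field, OPERATORS[suffix], value)
    return (lookup, "=", value)


def random_field_name(rng: random.Random) -> str:
    """Generate a lowercase identifier that contains single underscores only."""
    parts = [
        "".join(rng.choices(string.ascii_lowercase, k=rng.randint(1, 6)))
        for _ in range(rng.randint(1, 3))
    ]
    return "_".join(parts)


def check_parse_filter_properties(seed: int = 0, cases: int = 500) -> int:
    """Check invariants over many generated inputs; return the number checked."""
    rng = random.Random(seed)  # a fixed seed keeps any failure reproducible
    for _ in range(cases):
        field = random_field_name(rng)
        value = rng.randint(-1000, 1000)
        # Property 1: a bare field name always maps to an equality triple.
        assert parse_filter(field, value) == (field, "=", value)
        # Property 2: every known suffix maps to its operator, field intact.
        suffix = rng.choice(sorted(OPERATORS))
        expected = (field, OPERATORS[suffix], value)
        assert parse_filter(f"{field}__{suffix}", value) == expected
    return cases
```

The properties are stated over all generated inputs rather than a handful of hand-picked cases, which is what lets this style of test surface edge cases.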
Test Structure¶
Test Organization¶
tests/
├── __init__.py
├── conftest.py                  # Pytest configuration and fixtures
├── unit/                        # Unit tests
│   ├── __init__.py
│   ├── test_client.py           # Client tests
│   ├── test_models.py           # Model tests
│   ├── test_query.py            # Query builder tests
│   ├── test_cache.py            # Cache tests
│   ├── test_batch.py            # Batch operation tests
│   └── test_retry.py            # Retry mechanism tests
├── integration/                 # Integration tests
│   ├── __init__.py
│   ├── test_odoo_integration.py
│   ├── test_cache_backends.py
│   └── test_real_workflows.py
├── performance/                 # Performance tests
│   ├── __init__.py
│   ├── test_benchmarks.py
│   └── test_memory_usage.py
├── fixtures/                    # Test data and fixtures
│   ├── __init__.py
│   ├── sample_data.py
│   └── mock_responses.py
└── utils/                       # Test utilities
    ├── __init__.py
    ├── helpers.py
    └── assertions.py
Test Configuration¶
# tests/conftest.py
import pytest
import asyncio
import os
from unittest.mock import AsyncMock, MagicMock
from typing import AsyncGenerator, Generator

from zenoo_rpc import ZenooClient
from zenoo_rpc.models.common import ResPartner
from zenoo_rpc.transport.httpx_transport import AsyncTransport

# Pytest configuration
# The async tests in this guide carry no @pytest.mark.asyncio marker, so they
# assume pytest-asyncio runs in auto mode (asyncio_mode = auto).
pytest_plugins = ["pytest_asyncio"]


@pytest.fixture(scope="session")
def event_loop() -> Generator[asyncio.AbstractEventLoop, None, None]:
    """Create a session-wide event loop so session-scoped async fixtures work.

    Newer pytest-asyncio releases deprecate overriding this fixture; check the
    version you have installed.
    """
    loop = asyncio.new_event_loop()
    yield loop
    loop.close()


@pytest.fixture
def mock_transport() -> AsyncMock:
    """Create mock transport for testing."""
    transport = AsyncMock(spec=AsyncTransport)
    transport.json_rpc_call.return_value = {"result": "success"}
    return transport


@pytest.fixture
async def mock_client(mock_transport) -> AsyncGenerator[ZenooClient, None]:
    """Create a client whose transport is mocked, pre-populated as if logged in."""
    client = ZenooClient("localhost", port=8069)
    client.transport = mock_transport
    client.uid = 1
    client.database = "test_db"
    client.password = "test_password"
    yield client
    await client.close()


@pytest.fixture
def sample_partner_data() -> dict:
    """Sample partner data for testing."""
    return {
        "id": 1,
        "name": "Test Company",
        "is_company": True,
        "email": "test@example.com",
        "phone": "+1-555-0100",
        "active": True
    }


# Integration test fixtures
@pytest.fixture(scope="session")
async def real_client() -> AsyncGenerator[ZenooClient, None]:
    """Create real client for integration tests."""
    if not os.getenv("ODOO_TEST_URL"):
        pytest.skip("No Odoo test server configured")

    client = ZenooClient(
        host=os.getenv("ODOO_TEST_URL", "localhost"),
        port=int(os.getenv("ODOO_TEST_PORT", "8069"))
    )
    try:
        await client.login(
            database=os.getenv("ODOO_TEST_DB", "demo"),
            username=os.getenv("ODOO_TEST_USER", "admin"),
            password=os.getenv("ODOO_TEST_PASSWORD", "admin")
        )
        yield client
    finally:
        await client.close()
Unit Testing¶
Client Testing¶
# tests/unit/test_client.py
import pytest
from unittest.mock import AsyncMock, patch

from zenoo_rpc import ZenooClient
from zenoo_rpc.exceptions import AuthenticationError, NetworkError


class TestZenooClient:
    """Test suite for ZenooClient."""

    async def test_login_success(self, mock_client, mock_transport):
        """Test successful login."""
        mock_transport.json_rpc_call.return_value = 1  # User ID

        await mock_client.login("demo", "admin", "admin")

        assert mock_client.uid == 1
        assert mock_client.database == "demo"
        mock_transport.json_rpc_call.assert_called_once_with(
            "common", "authenticate", ["demo", "admin", "admin", {}]
        )

    async def test_login_failure(self, mock_client, mock_transport):
        """Test login failure."""
        mock_transport.json_rpc_call.return_value = False

        with pytest.raises(AuthenticationError, match="Authentication failed"):
            await mock_client.login("demo", "admin", "wrong_password")

    @pytest.mark.parametrize("domain,expected_call", [
        ([], []),
        ([("name", "=", "Test")], [("name", "=", "Test")]),
        ([("id", "in", [1, 2, 3])], [("id", "in", [1, 2, 3])]),
    ])
    async def test_search_domains(self, mock_client, mock_transport, domain, expected_call):
        """Test search with different domains."""
        mock_transport.json_rpc_call.return_value = [1, 2, 3]

        result = await mock_client.search("res.partner", domain)

        assert result == [1, 2, 3]
        mock_transport.json_rpc_call.assert_called_with(
            "object", "execute_kw", [
                mock_client.database, mock_client.uid, mock_client.password,
                "res.partner", "search", [expected_call]
            ]
        )

    async def test_create_record(self, mock_client, mock_transport, sample_partner_data):
        """Test record creation."""
        mock_transport.json_rpc_call.return_value = 1  # Created ID

        result = await mock_client.create("res.partner", sample_partner_data)

        assert result == 1
        mock_transport.json_rpc_call.assert_called_with(
            "object", "execute_kw", [
                mock_client.database, mock_client.uid, mock_client.password,
                "res.partner", "create", [sample_partner_data]
            ]
        )

    async def test_network_error_handling(self, mock_client, mock_transport):
        """Test network error handling."""
        mock_transport.json_rpc_call.side_effect = NetworkError("Connection failed")

        with pytest.raises(NetworkError):
            await mock_client.search("res.partner", [])
Model Testing¶
# tests/unit/test_models.py
import pytest
from pydantic import ValidationError

from zenoo_rpc.models.common import ResPartner
from zenoo_rpc.models.fields import CharField, EmailField


class TestOdooModels:
    """Test suite for Odoo models."""

    def test_model_creation(self, sample_partner_data):
        """Test model instance creation."""
        partner = ResPartner(**sample_partner_data)

        assert partner.id == 1
        assert partner.name == "Test Company"
        assert partner.is_company is True
        assert partner.email == "test@example.com"

    def test_model_validation(self):
        """Test model field validation."""
        # Valid data
        partner = ResPartner(name="Valid Company", is_company=True)
        assert partner.name == "Valid Company"

        # Invalid data - missing required field
        with pytest.raises(ValidationError):
            ResPartner(is_company=True)  # Missing name

    def test_model_serialization(self, sample_partner_data):
        """Test model serialization."""
        partner = ResPartner(**sample_partner_data)

        # Test dict conversion
        partner_dict = partner.dict()
        assert partner_dict["name"] == "Test Company"
        assert partner_dict["is_company"] is True

        # Test JSON serialization
        partner_json = partner.json()
        assert "Test Company" in partner_json
        assert "true" in partner_json.lower()

    def test_custom_field_types(self):
        """Test custom field type validation."""
        # Test email field
        with pytest.raises(ValidationError):
            ResPartner(name="Test", email="invalid-email")

        # Valid email
        partner = ResPartner(name="Test", email="valid@example.com")
        assert partner.email == "valid@example.com"

    @pytest.mark.parametrize("field_name,valid_value,invalid_value", [
        ("name", "Valid Name", ""),
        ("email", "test@example.com", "invalid-email"),
        ("phone", "+1-555-0100", None),  # Phone can be None
    ])
    def test_field_validation_parametrized(self, field_name, valid_value, invalid_value):
        """Test field validation with parameters."""
        # Test valid value
        data = {"name": "Test Company"}
        if valid_value is not None:
            data[field_name] = valid_value
        partner = ResPartner(**data)
        if valid_value is not None:
            assert getattr(partner, field_name) == valid_value

        # Test invalid value (if applicable)
        if invalid_value is not None and field_name == "name":
            with pytest.raises(ValidationError):
                ResPartner(**{field_name: invalid_value})
Query Builder Testing¶
# tests/unit/test_query.py
import pytest
from unittest.mock import AsyncMock

from zenoo_rpc.query.builder import QueryBuilder
from zenoo_rpc.models.common import ResPartner


class TestQueryBuilder:
    """Test suite for QueryBuilder."""

    @pytest.fixture
    def query_builder(self, mock_client):
        """Create query builder for testing."""
        return QueryBuilder(ResPartner, mock_client)

    def test_filter_building(self, query_builder):
        """Test filter building."""
        # Simple filter
        query_builder.filter(name="Test")
        assert ("name", "=", "Test") in query_builder._domain

        # Multiple filters
        query_builder.filter(is_company=True, active=True)
        assert ("is_company", "=", True) in query_builder._domain
        assert ("active", "=", True) in query_builder._domain

    def test_complex_filters(self, query_builder):
        """Test complex filter expressions."""
        # ilike filter
        query_builder.filter(name__ilike="Test%")
        assert ("name", "ilike", "Test%") in query_builder._domain

        # in filter
        query_builder.filter(id__in=[1, 2, 3])
        assert ("id", "in", [1, 2, 3]) in query_builder._domain

        # gt filter
        query_builder.filter(id__gt=10)
        assert ("id", ">", 10) in query_builder._domain

    def test_limit_and_offset(self, query_builder):
        """Test limit and offset."""
        query_builder.limit(10).offset(20)
        assert query_builder._limit == 10
        assert query_builder._offset == 20

    def test_field_selection(self, query_builder):
        """Test field selection."""
        query_builder.only("name", "email")
        assert query_builder._fields == ["name", "email"]

    async def test_query_execution(self, query_builder, mock_client):
        """Test query execution."""
        # mock_client is a real ZenooClient with a mocked transport, so
        # search_read has no return_value attribute; replace it with a mock.
        mock_client.search_read = AsyncMock(return_value=[
            {"id": 1, "name": "Test", "is_company": True}
        ])

        results = await query_builder.filter(is_company=True).all()

        assert len(results) == 1
        assert isinstance(results[0], ResPartner)
        assert results[0].name == "Test"
        mock_client.search_read.assert_called_once()
Cache Testing¶
# tests/unit/test_cache.py
import pytest
import asyncio
from unittest.mock import AsyncMock, patch

from zenoo_rpc.cache.backends import MemoryCache, RedisCache
from zenoo_rpc.cache.manager import CacheManager


class TestMemoryCache:
    """Test suite for MemoryCache."""

    @pytest.fixture
    def memory_cache(self):
        """Create memory cache for testing."""
        return MemoryCache(max_size=100, default_ttl=60)

    async def test_set_and_get(self, memory_cache):
        """Test basic set and get operations."""
        await memory_cache.set("key1", "value1")
        result = await memory_cache.get("key1")
        assert result == "value1"

    async def test_ttl_expiration(self, memory_cache):
        """Test TTL expiration."""
        await memory_cache.set("key1", "value1", ttl=1)

        # Should exist immediately
        result = await memory_cache.get("key1")
        assert result == "value1"

        # Should expire after TTL
        await asyncio.sleep(1.1)
        result = await memory_cache.get("key1")
        assert result is None

    async def test_lru_eviction(self, memory_cache):
        """Test LRU eviction."""
        # Fill cache to capacity
        for i in range(100):
            await memory_cache.set(f"key{i}", f"value{i}")

        # Add one more item (should evict oldest)
        await memory_cache.set("key100", "value100")

        # First item should be evicted
        result = await memory_cache.get("key0")
        assert result is None

        # Last item should exist
        result = await memory_cache.get("key100")
        assert result == "value100"


class TestCacheManager:
    """Test suite for CacheManager."""

    @pytest.fixture
    def cache_manager(self):
        """Create cache manager for testing."""
        return CacheManager()

    async def test_backend_registration(self, cache_manager):
        """Test cache backend registration."""
        memory_cache = MemoryCache()
        await cache_manager.register_backend("memory", memory_cache)

        assert "memory" in cache_manager.backends
        assert cache_manager.backends["memory"] == memory_cache

    async def test_cache_operations(self, cache_manager):
        """Test cache operations through manager."""
        memory_cache = MemoryCache()
        await cache_manager.register_backend("memory", memory_cache)
        await cache_manager.set_default_backend("memory")

        # Test set and get
        await cache_manager.set("test_key", "test_value")
        result = await cache_manager.get("test_key")
        assert result == "test_value"

    async def test_cache_stats(self, cache_manager):
        """Test cache statistics."""
        memory_cache = MemoryCache()
        await cache_manager.register_backend("memory", memory_cache)
        await cache_manager.set_default_backend("memory")

        # Perform operations
        await cache_manager.set("key1", "value1")
        await cache_manager.get("key1")  # Hit
        await cache_manager.get("key2")  # Miss

        stats = await cache_manager.get_stats()
        assert stats["hits"] >= 1
        assert stats["misses"] >= 1
Integration Testing¶
Real Odoo Integration¶
# tests/integration/test_odoo_integration.py
import asyncio
import os

import pytest

from zenoo_rpc import ZenooClient
from zenoo_rpc.models.common import ResPartner

# Skip if no test server
pytestmark = pytest.mark.skipif(
    not os.getenv("ODOO_TEST_URL"),
    reason="No Odoo test server configured"
)


class TestOdooIntegration:
    """Integration tests with real Odoo server."""

    async def test_authentication(self, real_client):
        """Test authentication with real server."""
        assert real_client.uid is not None
        assert real_client.database is not None

        # Test version call
        version = await real_client.version()
        assert "server_version" in version

    async def test_basic_crud_operations(self, real_client):
        """Test basic CRUD operations."""
        # Create (the loop timestamp keeps the name unique across runs)
        partner_data = {
            "name": f"Test Partner {asyncio.get_event_loop().time()}",
            "is_company": True,
            "email": "test@example.com"
        }
        partner_id = await real_client.create("res.partner", partner_data)
        assert isinstance(partner_id, int)

        # Read
        partner = await real_client.read("res.partner", [partner_id])
        assert len(partner) == 1
        assert partner[0]["name"] == partner_data["name"]

        # Update
        await real_client.write("res.partner", [partner_id], {"phone": "+1-555-0100"})
        updated_partner = await real_client.read("res.partner", [partner_id])
        assert updated_partner[0]["phone"] == "+1-555-0100"

        # Delete
        await real_client.unlink("res.partner", [partner_id])

        # Verify deletion with search, which returns an empty list for a
        # deleted ID (reading a deleted record may raise on the server side)
        remaining = await real_client.search("res.partner", [("id", "=", partner_id)])
        assert len(remaining) == 0

    async def test_model_query_builder(self, real_client):
        """Test model query builder with real data."""
        partners = await (
            real_client.model(ResPartner)
            .filter(is_company=True)
            .limit(5)
            .only("id", "name", "email")
            .all()
        )

        assert isinstance(partners, list)
        assert len(partners) <= 5
        assert all(isinstance(p, ResPartner) for p in partners)
        assert all(hasattr(p, "name") for p in partners)

    async def test_batch_operations(self, real_client):
        """Test batch operations with real server."""
        # Prepare test data
        partner_data = [
            {"name": f"Batch Partner {i}", "is_company": True}
            for i in range(10)
        ]

        # Batch create
        async with real_client.batch_context() as batch:
            batch.create("res.partner", partner_data)
            results = await batch.execute()

        created_ids = results[0].result
        assert len(created_ids) == 10
        assert all(isinstance(id_, int) for id_ in created_ids)

        # Cleanup
        await real_client.unlink("res.partner", created_ids)
Performance Testing¶
Benchmark Tests¶
# tests/performance/test_benchmarks.py
import pytest
import time
import asyncio

from zenoo_rpc import ZenooClient
from zenoo_rpc.models.common import ResPartner  # used by test_cache_performance


class TestPerformanceBenchmarks:
    """Performance benchmark tests."""

    @pytest.mark.performance
    async def test_search_performance(self, real_client):
        """Benchmark search operations."""
        iterations = 100

        # perf_counter is monotonic and has higher resolution than time.time
        start_time = time.perf_counter()
        for _ in range(iterations):
            await real_client.search("res.partner", [], limit=10)
        end_time = time.perf_counter()

        total_time = end_time - start_time
        avg_time = total_time / iterations
        print(f"Search performance: {avg_time:.3f}s per operation")
        assert avg_time < 0.1  # Should be under 100ms per search

    @pytest.mark.performance
    async def test_concurrent_operations(self, real_client):
        """Test concurrent operation performance."""
        async def search_operation():
            return await real_client.search("res.partner", [], limit=5)

        # Run 10 concurrent searches
        start_time = time.perf_counter()
        tasks = [search_operation() for _ in range(10)]
        results = await asyncio.gather(*tasks)
        end_time = time.perf_counter()

        total_time = end_time - start_time
        print(f"Concurrent operations: {total_time:.3f}s for 10 operations")
        assert len(results) == 10
        assert total_time < 2.0  # Should complete within 2 seconds

    @pytest.mark.performance
    async def test_cache_performance(self, real_client):
        """Test cache performance impact."""
        # Setup cache
        await real_client.setup_cache_manager(backend="memory")

        # First query (no cache)
        start_time = time.perf_counter()
        result1 = await (
            real_client.model(ResPartner)
            .filter(is_company=True)
            .limit(10)
            .cache(ttl=300)
            .all()
        )
        first_query_time = time.perf_counter() - start_time

        # Second query (cached)
        start_time = time.perf_counter()
        result2 = await (
            real_client.model(ResPartner)
            .filter(is_company=True)
            .limit(10)
            .cache(ttl=300)
            .all()
        )
        cached_query_time = time.perf_counter() - start_time

        print(f"First query: {first_query_time:.3f}s")
        print(f"Cached query: {cached_query_time:.3f}s")
        if cached_query_time > 0:  # avoid dividing by zero on a very fast hit
            print(f"Cache speedup: {first_query_time / cached_query_time:.1f}x")

        assert len(result1) == len(result2)
        assert cached_query_time < first_query_time  # Cache should be faster
Memory Usage Tests¶
# tests/performance/test_memory_usage.py
import pytest
import tracemalloc
import gc

from zenoo_rpc import ZenooClient


class TestMemoryUsage:
    """Memory usage tests."""

    @pytest.mark.performance
    async def test_memory_leak_detection(self, real_client):
        """Test for memory leaks in repeated operations."""
        tracemalloc.start()

        # Take initial snapshot
        snapshot1 = tracemalloc.take_snapshot()

        # Perform many operations
        for _ in range(100):
            await real_client.search("res.partner", [], limit=1)

        # Force garbage collection
        gc.collect()

        # Take final snapshot, then stop tracing so later tests are unaffected
        snapshot2 = tracemalloc.take_snapshot()
        tracemalloc.stop()

        # Compare snapshots
        top_stats = snapshot2.compare_to(snapshot1, 'lineno')

        # Check for significant memory growth
        total_growth = sum(stat.size_diff for stat in top_stats)
        print(f"Total memory growth: {total_growth / 1024 / 1024:.2f} MB")

        # Should not grow more than 10MB for 100 operations
        assert total_growth < 10 * 1024 * 1024

    @pytest.mark.performance
    async def test_connection_pool_memory(self):
        """Test connection pool memory usage."""
        tracemalloc.start()

        # Create multiple clients
        clients = []
        for _ in range(10):
            client = ZenooClient("localhost", port=8069)
            clients.append(client)

        snapshot1 = tracemalloc.take_snapshot()

        # Close all clients and drop the references so they can be collected
        for client in clients:
            await client.close()
        del client
        clients.clear()
        gc.collect()

        snapshot2 = tracemalloc.take_snapshot()
        tracemalloc.stop()

        # Memory should be released
        top_stats = snapshot2.compare_to(snapshot1, 'lineno')
        total_change = sum(stat.size_diff for stat in top_stats)
        print(f"Memory change after cleanup: {total_change / 1024:.2f} KB")

        # Should release most memory
        assert abs(total_change) < 1024 * 1024  # Less than 1MB difference
Test Utilities¶
Custom Assertions¶
# tests/utils/assertions.py
import pytest
from typing import Any, List, Dict


def assert_valid_partner(partner_data: Dict[str, Any]):
    """Assert that partner data is valid."""
    assert "id" in partner_data
    assert "name" in partner_data
    assert isinstance(partner_data["id"], int)
    assert isinstance(partner_data["name"], str)
    assert len(partner_data["name"]) > 0


def assert_rpc_call_made(mock_transport, service: str, method: str, *args):
    """Assert that specific RPC call was made."""
    mock_transport.json_rpc_call.assert_called_with(service, method, list(args))


def assert_performance_threshold(duration: float, threshold: float, operation: str):
    """Assert that operation completed within performance threshold."""
    assert duration < threshold, f"{operation} took {duration:.3f}s, expected < {threshold}s"


class AsyncContextManager:
    """Helper for testing async context managers."""

    def __init__(self, async_cm):
        self.async_cm = async_cm
        self.result = None

    async def __aenter__(self):
        self.result = await self.async_cm.__aenter__()
        return self.result

    async def __aexit__(self, *args):
        return await self.async_cm.__aexit__(*args)
Test Data Generators¶
# tests/fixtures/sample_data.py
import random
import string
from typing import Dict, List, Any


def generate_partner_data(count: int = 1) -> List[Dict[str, Any]]:
    """Generate sample partner data for testing."""
    partners = []
    for i in range(count):
        partner = {
            "name": f"Test Company {i}",
            "is_company": random.choice([True, False]),
            "email": f"test{i}@example.com",
            "phone": f"+1-555-{random.randint(1000, 9999)}",
            "active": True
        }
        if partner["is_company"]:
            partner["website"] = f"https://company{i}.example.com"
        partners.append(partner)
    return partners


def generate_random_string(length: int = 10) -> str:
    """Generate random string for testing."""
    return ''.join(random.choices(string.ascii_letters + string.digits, k=length))


def generate_large_dataset(size: int = 1000) -> List[Dict[str, Any]]:
    """Generate large dataset for performance testing."""
    return [
        {
            "name": f"Partner {i}",
            "ref": generate_random_string(8),
            "is_company": i % 3 == 0,
            "active": True
        }
        for i in range(size)
    ]
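The generators above draw from the module-level `random` state, so a failing test cannot be replayed with the same data. One possible variation, sketched here under the assumption that reproducibility matters more than variety, passes a seed to a private `random.Random` instance. The function name `generate_partner_data_seeded` is hypothetical and only mirrors `generate_partner_data`.

```python
import random
from typing import Any, Dict, List, Optional


def generate_partner_data_seeded(
    count: int = 1, seed: Optional[int] = 0
) -> List[Dict[str, Any]]:
    """Like generate_partner_data, but deterministic for a given seed."""
    rng = random.Random(seed)  # private generator; global state is untouched
    partners = []
    for i in range(count):
        partner = {
            "name": f"Test Company {i}",
            "is_company": rng.choice([True, False]),
            "email": f"test{i}@example.com",
            "phone": f"+1-555-{rng.randint(1000, 9999)}",
            "active": True,
        }
        if partner["is_company"]:
            partner["website"] = f"https://company{i}.example.com"
        partners.append(partner)
    return partners
```

Calling the function twice with the same seed yields identical lists, so the seed can be logged on failure and reused to reproduce the run. Passing `seed=None` falls back to a system-provided seed.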
Running Tests¶
Test Commands¶
# Run all tests
pytest
# Run specific test categories
pytest tests/unit/ # Unit tests only
pytest tests/integration/ # Integration tests only
pytest tests/performance/ # Performance tests only
# Run with coverage (requires the pytest-cov plugin)
pytest --cov=zenoo_rpc --cov-report=html --cov-report=term-missing
# Run with performance markers
pytest -m performance
# Run tests in parallel (requires the pytest-xdist plugin)
pytest -n auto
# Run with verbose output
pytest -v -s
# Run specific test file
pytest tests/unit/test_client.py
# Run specific test method
pytest tests/unit/test_client.py::TestZenooClient::test_login_success
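The `pytest -m performance` command relies on the custom `performance` marker used in the performance tests. pytest warns about unregistered markers, and rejects them when `--strict-markers` is set, so the marker is usually registered once. A minimal sketch of doing that from `tests/conftest.py` follows; the description string is an assumption, while `pytest_configure` and `config.addinivalue_line` are standard pytest hooks.

```python
# tests/conftest.py (possible addition)
def pytest_configure(config):
    """Register the custom marker so `pytest -m performance` runs cleanly."""
    config.addinivalue_line(
        "markers",
        "performance: slow benchmark and memory tests (select with -m performance)",
    )
```

With the marker registered, `pytest -m "not performance"` is a convenient way to keep slow benchmarks out of a quick local run.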
Environment Variables for Testing¶
# Integration test configuration
export ODOO_TEST_URL="http://localhost:8069"
export ODOO_TEST_PORT="8069"  # read by tests/conftest.py; defaults to 8069 if unset
export ODOO_TEST_DB="demo"
export ODOO_TEST_USER="admin"
export ODOO_TEST_PASSWORD="admin"
# Performance test configuration
export PERFORMANCE_TESTS=true
export BENCHMARK_ITERATIONS=100
# Cache test configuration
export REDIS_TEST_URL="redis://localhost:6379/1"
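The test code in this guide does not show how `PERFORMANCE_TESTS` and `BENCHMARK_ITERATIONS` are consumed. One way they might be read is sketched below; both helper names are hypothetical and the accepted truthy spellings are an assumption.

```python
import os


def performance_tests_enabled() -> bool:
    """Return True when PERFORMANCE_TESTS is set to a truthy value."""
    raw = os.getenv("PERFORMANCE_TESTS", "false")
    return raw.strip().lower() in {"1", "true", "yes"}


def benchmark_iterations(default: int = 100) -> int:
    """Return BENCHMARK_ITERATIONS as a positive int, or the default."""
    raw = os.getenv("BENCHMARK_ITERATIONS")
    if raw is None:
        return default
    try:
        value = int(raw)
    except ValueError:
        return default  # ignore malformed values instead of failing the run
    return value if value > 0 else default
```

A benchmark could then use `iterations = benchmark_iterations()` in place of a hard-coded `100`, and a `skipif` condition could call `performance_tests_enabled()`.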
Continuous Integration¶
# .github/workflows/test.yml
name: Tests

on: [push, pull_request]

jobs:
  test:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        # Quote every version so YAML does not parse them as numbers
        python-version: ["3.8", "3.9", "3.10", "3.11"]
    services:
      redis:
        image: redis:7
        ports:
          - 6379:6379
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v4
        with:
          python-version: ${{ matrix.python-version }}
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -e ".[dev,redis]"
      - name: Run tests
        run: |
          pytest --cov=zenoo_rpc --cov-report=xml
      - name: Upload coverage
        uses: codecov/codecov-action@v3
        with:
          file: ./coverage.xml
Best Practices¶
1. Test Organization¶
- Keep tests focused and independent
- Use descriptive test names
- Group related tests in classes
- Use fixtures for common setup
2. Mocking Strategy¶
- Mock external dependencies (HTTP calls, databases)
- Use real objects for unit tests when possible
- Mock at the right level (transport, not client methods)
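Mocking at the transport level means the client's own logic still runs and only the network boundary is faked. The sketch below illustrates the idea with a deliberately tiny, hypothetical `TinyClient`; it is not the ZenooClient implementation, and the payload shape is simplified.

```python
import asyncio
from unittest.mock import AsyncMock


class TinyClient:
    """Hypothetical stand-in for a client; for illustration only."""

    def __init__(self, transport):
        self.transport = transport

    async def search_count(self, model: str, domain: list) -> int:
        # Real client logic: build the payload, call the transport, post-process.
        ids = await self.transport.json_rpc_call(
            "object", "execute_kw", [model, "search", [domain]]
        )
        return len(ids)


async def demo() -> int:
    transport = AsyncMock()  # fake only the network boundary
    transport.json_rpc_call.return_value = [1, 2, 3]
    client = TinyClient(transport)

    count = await client.search_count("res.partner", [("active", "=", True)])

    # The assertion checks what the client actually sent over the wire.
    transport.json_rpc_call.assert_awaited_once_with(
        "object", "execute_kw",
        ["res.partner", "search", [[("active", "=", True)]]],
    )
    return count


result = asyncio.run(demo())
```

Had `search_count` itself been replaced with a mock, the test would pass even if the payload construction were broken, which is the failure mode the guideline above is meant to avoid.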
3. Performance Testing¶
- Set realistic performance thresholds
- Test with representative data sizes
- Monitor memory usage and leaks
- Use profiling for optimization
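A single timing is noisy, so a threshold is easier to keep realistic when it is compared against a summary of several runs. The following sketch, using only the standard library, collects repeated timings and reports the median and a nearest-rank 95th percentile. The helper names `measure` and `summarize` are hypothetical.

```python
import math
import statistics
import time
from typing import Callable, Dict, List


def measure(
    operation: Callable[[], object], repeats: int = 20, warmup: int = 3
) -> List[float]:
    """Time a synchronous callable several times after a short warm-up."""
    for _ in range(warmup):
        operation()  # warm caches and lazy imports before measuring
    samples = []
    for _ in range(repeats):
        start = time.perf_counter()
        operation()
        samples.append(time.perf_counter() - start)
    return samples


def summarize(samples: List[float]) -> Dict[str, float]:
    """Return the median, nearest-rank p95, and maximum of the samples."""
    ordered = sorted(samples)
    p95_index = max(0, math.ceil(0.95 * len(ordered)) - 1)
    return {
        "median": statistics.median(ordered),
        "p95": ordered[p95_index],
        "max": ordered[-1],
    }
```

Asserting on the median (for example `summarize(samples)["median"] < 0.1`) is less sensitive to a single slow outlier than asserting on one run or on the mean.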
4. Integration Testing¶
- Test with real Odoo instances when possible
- Use test databases for integration tests
- Clean up test data after tests
- Test error conditions and edge cases
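Cleanup is most reliable when it runs even if the test body fails. One way to arrange that is an async context manager that records created IDs and unlinks them on exit, sketched below. `temporary_records` is a hypothetical helper, and `FakeClient` is an in-memory stand-in so the example runs without a server; its `create(model, values)` and `unlink(model, ids)` methods only mirror the call shapes used in the integration tests above.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator, List


class FakeClient:
    """In-memory stand-in so the sketch runs without an Odoo server."""

    def __init__(self):
        self.records = {}
        self._next_id = 1

    async def create(self, model: str, values: dict) -> int:
        record_id = self._next_id
        self._next_id += 1
        self.records[record_id] = (model, values)
        return record_id

    async def unlink(self, model: str, ids: List[int]) -> bool:
        for record_id in ids:
            self.records.pop(record_id, None)
        return True


@asynccontextmanager
async def temporary_records(client, model: str) -> AsyncIterator[List[int]]:
    """Yield a list for created IDs; unlink them on exit, even after errors."""
    created: List[int] = []
    try:
        yield created
    finally:
        if created:
            await client.unlink(model, created)


async def demo() -> dict:
    client = FakeClient()
    try:
        async with temporary_records(client, "res.partner") as created:
            created.append(await client.create("res.partner", {"name": "Temp"}))
            raise AssertionError("simulated test failure")
    except AssertionError:
        pass  # the failure is expected; cleanup has already run
    return client.records


leftover = asyncio.run(demo())
```

In a real test the same helper would wrap `real_client`, so records are removed whether the assertions pass or not.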
Next Steps¶
- Review Documentation Guide for documentation testing
- Check Development Guide for development setup
- Explore Release Process for release testing
- Learn about CI/CD pipeline configuration