Skip to content

Integration Examples

This section provides comprehensive examples of integrating Zenoo RPC with popular frameworks, databases, and external services. Each integration includes complete setup, configuration, and best practices.

Overview

Integration examples cover:

  • FastAPI Integration: REST API development with Zenoo RPC backend
  • Django Integration: Using Zenoo RPC within Django applications
  • Celery Integration: Asynchronous task processing with Odoo data
  • Database Integration: PostgreSQL, Redis, and other database connections
  • Message Queue Integration: RabbitMQ, Apache Kafka integration patterns
  • Monitoring Integration: Prometheus, Grafana, and logging systems

FastAPI Integration

Complete REST API with Zenoo RPC Backend

import asyncio
import logging
from contextlib import asynccontextmanager
from typing import List, Optional, Dict, Any
from datetime import datetime

from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pydantic import BaseModel, EmailStr, Field
import uvicorn

from zenoo_rpc import ZenooClient
from zenoo_rpc.models.common import ResPartner
from zenoo_rpc.exceptions import ValidationError, ZenooError
from zenoo_rpc.batch.manager import BatchManager
from zenoo_rpc.transaction.manager import TransactionManager

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Pydantic models for API
class CustomerCreate(BaseModel):
    name: str = Field(..., min_length=2, max_length=100)
    email: EmailStr
    phone: Optional[str] = Field(None, max_length=20)
    website: Optional[str] = None
    is_company: bool = True
    country_code: Optional[str] = Field(None, max_length=2)
    categories: Optional[List[str]] = None

class CustomerResponse(BaseModel):
    id: int
    name: str
    email: str
    phone: Optional[str]
    website: Optional[str]
    is_company: bool
    customer_rank: int
    country: Optional[Dict[str, Any]] = None
    created_at: Optional[datetime] = None

class CustomerUpdate(BaseModel):
    name: Optional[str] = Field(None, min_length=2, max_length=100)
    email: Optional[EmailStr] = None
    phone: Optional[str] = Field(None, max_length=20)
    website: Optional[str] = None
    categories: Optional[List[str]] = None

class BulkCreateRequest(BaseModel):
    customers: List[CustomerCreate]

class BulkCreateResponse(BaseModel):
    successful: List[Dict[str, Any]]
    failed: List[Dict[str, Any]]
    total: int
    success_count: int
    failure_count: int

# Global client instance
zenoo_client: Optional[ZenooClient] = None
batch_manager: Optional[BatchManager] = None
transaction_manager: Optional[TransactionManager] = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan management."""
    global zenoo_client, batch_manager, transaction_manager

    try:
        # Startup
        logger.info("Starting FastAPI application with Zenoo RPC")

        # Initialize Zenoo RPC client
        zenoo_client = ZenooClient("localhost", port=8069, protocol="http")
        await zenoo_client.__aenter__()

        # Authenticate
        await zenoo_client.login("demo", "admin", "admin")

        # Setup cache
        await zenoo_client.cache_manager.setup_memory_cache(
            name="api_cache",
            max_size=1000,
            strategy="ttl"
        )

        # Setup managers
        batch_manager = BatchManager(client=zenoo_client, max_chunk_size=100)
        transaction_manager = TransactionManager(zenoo_client)

        logger.info("Zenoo RPC client initialized successfully")

        yield

    except Exception as e:
        logger.error(f"Failed to initialize Zenoo RPC: {e}")
        raise
    finally:
        # Shutdown
        if zenoo_client:
            await zenoo_client.__aexit__(None, None, None)
        logger.info("FastAPI application shutdown complete")

# Create FastAPI app
app = FastAPI(
    title="Customer Management API",
    description="REST API for customer management using Zenoo RPC",
    version="1.0.0",
    lifespan=lifespan
)

# Add CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Dependency to get Zenoo client
async def get_zenoo_client() -> ZenooClient:
    """Dependency to get the Zenoo RPC client."""
    if not zenoo_client:
        raise HTTPException(status_code=503, detail="Zenoo RPC client not available")
    return zenoo_client

# Exception handlers
@app.exception_handler(ValidationError)
async def validation_exception_handler(request, exc: ValidationError):
    return JSONResponse(
        status_code=400,
        content={"error": "Validation Error", "detail": str(exc)}
    )

@app.exception_handler(ZenooError)
async def zenoo_exception_handler(request, exc: ZenooError):
    return JSONResponse(
        status_code=500,
        content={"error": "Zenoo RPC Error", "detail": str(exc)}
    )

# API Routes
@app.get("/health")
async def health_check():
    """Health check endpoint."""
    try:
        if zenoo_client and zenoo_client.is_authenticated:
            # Test connection with a simple query
            count = await zenoo_client.search_count("res.users", [])
            return {
                "status": "healthy",
                "zenoo_connected": True,
                "user_count": count,
                "timestamp": datetime.utcnow()
            }
        else:
            return {
                "status": "unhealthy",
                "zenoo_connected": False,
                "timestamp": datetime.utcnow()
            }
    except Exception as e:
        return JSONResponse(
            status_code=503,
            content={
                "status": "unhealthy",
                "error": str(e),
                "timestamp": datetime.utcnow().isoformat()
            }
        )

@app.post("/customers", response_model=CustomerResponse)
async def create_customer(
    customer: CustomerCreate,
    client: ZenooClient = Depends(get_zenoo_client)
):
    """Create a new customer."""
    try:
        async with transaction_manager.transaction() as tx:
            # Check for duplicates
            existing = await client.model(ResPartner).filter(
                email=customer.email
            ).first()

            if existing:
                raise HTTPException(
                    status_code=409,
                    detail=f"Customer with email {customer.email} already exists"
                )

            # Prepare customer data
            customer_data = {
                "name": customer.name,
                "email": customer.email,
                "is_company": customer.is_company,
                "customer_rank": 1,
                "supplier_rank": 0
            }

            # Add optional fields
            if customer.phone:
                customer_data["phone"] = customer.phone
            if customer.website:
                customer_data["website"] = customer.website

            # Handle country
            if customer.country_code:
                from zenoo_rpc.models.common import ResCountry
                country = await client.model(ResCountry).filter(
                    code=customer.country_code.upper()
                ).first()
                if country:
                    customer_data["country_id"] = country.id

            # Create customer
            customer_id = await client.create("res.partner", customer_data)

            # Handle categories
            if customer.categories:
                await _assign_categories(client, customer_id, customer.categories)

            # Fetch created customer for response
            created_customer = await client.model(ResPartner).filter(
                id=customer_id
            ).first()

            return await _format_customer_response(created_customer)

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Failed to create customer: {e}")
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/customers", response_model=List[CustomerResponse])
async def list_customers(
    limit: int = 100,
    offset: int = 0,
    search: Optional[str] = None,
    is_company: Optional[bool] = None,
    country_code: Optional[str] = None,
    client: ZenooClient = Depends(get_zenoo_client)
):
    """List customers with filtering and pagination."""
    try:
        query = client.model(ResPartner)

        # Build filters
        filters = {}

        if search:
            from zenoo_rpc.query.filters import Q
            search_filter = Q(name__ilike=f"%{search}%") | Q(email__ilike=f"%{search}%")
            query = query.filter(search_filter)

        if is_company is not None:
            filters["is_company"] = is_company

        if country_code:
            filters["country_id.code"] = country_code.upper()

        # Apply filters
        if filters:
            query = query.filter(**filters)

        # Apply pagination
        partners = await query.offset(offset).limit(limit).all()

        # Format response
        customers = []
        for partner in partners:
            customer = await _format_customer_response(partner)
            customers.append(customer)

        return customers

    except Exception as e:
        logger.error(f"Failed to list customers: {e}")
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/customers/{customer_id}", response_model=CustomerResponse)
async def get_customer(
    customer_id: int,
    client: ZenooClient = Depends(get_zenoo_client)
):
    """Get a specific customer by ID."""
    try:
        partner = await client.model(ResPartner).filter(id=customer_id).first()

        if not partner:
            raise HTTPException(status_code=404, detail="Customer not found")

        return await _format_customer_response(partner)

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Failed to get customer {customer_id}: {e}")
        raise HTTPException(status_code=500, detail=str(e))

@app.put("/customers/{customer_id}", response_model=CustomerResponse)
async def update_customer(
    customer_id: int,
    customer_update: CustomerUpdate,
    client: ZenooClient = Depends(get_zenoo_client)
):
    """Update a customer."""
    try:
        async with transaction_manager.transaction() as tx:
            # Check if customer exists
            existing = await client.model(ResPartner).filter(id=customer_id).first()
            if not existing:
                raise HTTPException(status_code=404, detail="Customer not found")

            # Prepare update data
            update_data = {}

            if customer_update.name is not None:
                update_data["name"] = customer_update.name
            if customer_update.email is not None:
                # Check for email conflicts
                email_conflict = await client.model(ResPartner).filter(
                    email=customer_update.email,
                    id__ne=customer_id
                ).first()
                if email_conflict:
                    raise HTTPException(
                        status_code=409,
                        detail=f"Email {customer_update.email} is already in use"
                    )
                update_data["email"] = customer_update.email

            if customer_update.phone is not None:
                update_data["phone"] = customer_update.phone
            if customer_update.website is not None:
                update_data["website"] = customer_update.website

            # Update customer
            if update_data:
                await client.write("res.partner", [customer_id], update_data)

            # Handle categories
            if customer_update.categories is not None:
                await _assign_categories(client, customer_id, customer_update.categories)

            # Fetch updated customer
            updated_customer = await client.model(ResPartner).filter(
                id=customer_id
            ).first()

            return await _format_customer_response(updated_customer)

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Failed to update customer {customer_id}: {e}")
        raise HTTPException(status_code=500, detail=str(e))

@app.delete("/customers/{customer_id}")
async def delete_customer(
    customer_id: int,
    client: ZenooClient = Depends(get_zenoo_client)
):
    """Delete a customer."""
    try:
        # Check if customer exists
        existing = await client.model(ResPartner).filter(id=customer_id).first()
        if not existing:
            raise HTTPException(status_code=404, detail="Customer not found")

        # Delete customer
        success = await client.unlink("res.partner", [customer_id])

        if success:
            return {"message": f"Customer {customer_id} deleted successfully"}
        else:
            raise HTTPException(status_code=500, detail="Failed to delete customer")

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Failed to delete customer {customer_id}: {e}")
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/customers/bulk", response_model=BulkCreateResponse)
async def bulk_create_customers(
    request: BulkCreateRequest,
    background_tasks: BackgroundTasks,
    client: ZenooClient = Depends(get_zenoo_client)
):
    """Bulk create customers."""
    try:
        results = {
            "successful": [],
            "failed": [],
            "total": len(request.customers)
        }

        async with batch_manager.batch() as batch_context:
            # Validate and prepare data
            valid_customers = []

            for i, customer in enumerate(request.customers):
                try:
                    # Check for duplicates
                    existing = await client.model(ResPartner).filter(
                        email=customer.email
                    ).first()

                    if existing:
                        results["failed"].append({
                            "index": i,
                            "customer": customer.dict(),
                            "error": f"Customer with email {customer.email} already exists"
                        })
                        continue

                    # Prepare customer data
                    customer_data = {
                        "name": customer.name,
                        "email": customer.email,
                        "is_company": customer.is_company,
                        "customer_rank": 1,
                        "supplier_rank": 0
                    }

                    if customer.phone:
                        customer_data["phone"] = customer.phone
                    if customer.website:
                        customer_data["website"] = customer.website

                    valid_customers.append({
                        "index": i,
                        "original": customer,
                        "data": customer_data
                    })

                except Exception as e:
                    results["failed"].append({
                        "index": i,
                        "customer": customer.dict(),
                        "error": str(e)
                    })

            # Bulk create
            if valid_customers:
                customer_data_list = [item["data"] for item in valid_customers]

                created_ids = await batch_manager.bulk_create(
                    model="res.partner",
                    records=customer_data_list,
                    chunk_size=50
                )

                # Map results
                for i, item in enumerate(valid_customers):
                    if i < len(created_ids):
                        results["successful"].append({
                            "index": item["index"],
                            "customer": item["original"].dict(),
                            "customer_id": created_ids[i]
                        })
                    else:
                        results["failed"].append({
                            "index": item["index"],
                            "customer": item["original"].dict(),
                            "error": "Bulk creation failed"
                        })

        # Add background task for category assignment
        if results["successful"]:
            background_tasks.add_task(
                _assign_bulk_categories,
                client,
                results["successful"],
                request.customers
            )

        return BulkCreateResponse(
            successful=results["successful"],
            failed=results["failed"],
            total=results["total"],
            success_count=len(results["successful"]),
            failure_count=len(results["failed"])
        )

    except Exception as e:
        logger.error(f"Bulk create failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))

# Helper functions
async def _format_customer_response(partner: ResPartner) -> CustomerResponse:
    """Format partner object as CustomerResponse."""
    response_data = {
        "id": partner.id,
        "name": partner.name,
        "email": partner.email,
        "phone": partner.phone,
        "website": partner.website,
        "is_company": partner.is_company,
        "customer_rank": partner.customer_rank,
        "created_at": partner.create_date
    }

    # Add country info if available
    if partner.country_id:
        country = await partner.country_id
        if country:
            response_data["country"] = {
                "id": country.id,
                "name": country.name,
                "code": country.code
            }

    return CustomerResponse(**response_data)

async def _assign_categories(client: ZenooClient, partner_id: int, category_names: List[str]):
    """Assign categories to a partner."""
    try:
        from zenoo_rpc.models.common import ResPartnerCategory

        category_ids = []
        for name in category_names:
            category = await client.model(ResPartnerCategory).filter(name=name).first()

            if not category:
                category_id = await client.create(
                    "res.partner.category",
                    {"name": name}
                )
                category_ids.append(category_id)
            else:
                category_ids.append(category.id)

        if category_ids:
            await client.write(
                "res.partner",
                [partner_id],
                {"category_id": [(6, 0, category_ids)]}
            )

    except Exception as e:
        logger.error(f"Failed to assign categories: {e}")

async def _assign_bulk_categories(
    client: ZenooClient,
    successful_customers: List[Dict[str, Any]],
    original_customers: List[CustomerCreate]
):
    """Background task to assign categories for bulk created customers."""
    try:
        for success_item in successful_customers:
            index = success_item["index"]
            customer_id = success_item["customer_id"]
            original_customer = original_customers[index]

            if original_customer.categories:
                await _assign_categories(client, customer_id, original_customer.categories)

    except Exception as e:
        logger.error(f"Background category assignment failed: {e}")

# Run the application
if __name__ == "__main__":
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8000,
        reload=True,
        log_level="info"
    )

Key Integration Features

1. Lifecycle Management

  • Proper startup/shutdown with lifespan events
  • Resource cleanup and connection management
  • Health check endpoints

2. Error Handling

  • Custom exception handlers for Zenoo RPC errors
  • Proper HTTP status codes
  • Detailed error responses

3. Performance Optimization

  • Connection pooling and caching
  • Background tasks for non-critical operations
  • Batch processing for bulk operations

4. Production Features

  • CORS middleware
  • Request validation with Pydantic
  • Comprehensive logging
  • Transaction management

Next Steps