Sales Dashboard with Real-Time Analytics¶
A comprehensive example showing how to build a real-time sales dashboard using Zenoo RPC with advanced querying, caching, and data aggregation.
Overview¶
This example demonstrates building a sales dashboard that provides:
- Real-time sales metrics and KPIs
- Customer analytics and segmentation
- Performance tracking with caching
- Automated report generation
- Interactive data visualization support
Complete Implementation¶
Sales Analytics Service¶
import asyncio
from typing import Dict, List, Any, Optional, Tuple
from datetime import datetime, date, timedelta
from decimal import Decimal
from dataclasses import dataclass
from zenoo_rpc import ZenooClient
from zenoo_rpc.models.common import ResPartner, SaleOrder
from zenoo_rpc.query.filters import Q
from zenoo_rpc.cache.strategies import TTLCache
@dataclass
class SalesMetrics:
"""Sales metrics data structure."""
total_revenue: Decimal
total_orders: int
average_order_value: Decimal
new_customers: int
conversion_rate: float
period_start: date
period_end: date
@dataclass
class CustomerSegment:
"""Customer segment analytics."""
segment_name: str
customer_count: int
total_revenue: Decimal
average_revenue_per_customer: Decimal
top_customers: List[Dict[str, Any]]
class SalesDashboard:
"""Sales dashboard service with real-time analytics."""
def __init__(self, client: ZenooClient):
self.client = client
# Setup caching for performance
self.cache = TTLCache(max_size=1000, default_ttl=300) # 5 minutes
async def get_sales_overview(
self,
start_date: date,
end_date: date
) -> SalesMetrics:
"""Get comprehensive sales overview for the period."""
# Cache key for this query
cache_key = f"sales_overview_{start_date}_{end_date}"
# Try to get from cache first
cached_result = await self.cache.get(cache_key)
if cached_result:
return SalesMetrics(**cached_result)
# Build date filter
date_filter = Q(date_order__gte=start_date) & Q(date_order__lte=end_date)
# Get all sales orders for the period
orders = await (
self.client.model(SaleOrder)
.filter(date_filter & Q(state__in=["sale", "done"]))
.only("amount_total", "partner_id", "date_order", "state")
.all()
)
# Calculate metrics
total_revenue = sum(order.amount_total for order in orders)
total_orders = len(orders)
average_order_value = total_revenue / total_orders if total_orders > 0 else Decimal('0')
# Get new customers in this period
new_customers_count = await self._get_new_customers_count(start_date, end_date)
# Calculate conversion rate (simplified)
conversion_rate = await self._calculate_conversion_rate(start_date, end_date)
metrics = SalesMetrics(
total_revenue=total_revenue,
total_orders=total_orders,
average_order_value=average_order_value,
new_customers=new_customers_count,
conversion_rate=conversion_rate,
period_start=start_date,
period_end=end_date
)
# Cache the result
await self.cache.set(cache_key, metrics.__dict__)
return metrics
async def get_top_customers(
self,
start_date: date,
end_date: date,
limit: int = 10
) -> List[Dict[str, Any]]:
"""Get top customers by revenue for the period."""
cache_key = f"top_customers_{start_date}_{end_date}_{limit}"
cached_result = await self.cache.get(cache_key)
if cached_result:
return cached_result
# Get sales orders with customer info
orders = await (
self.client.model(SaleOrder)
.filter(
Q(date_order__gte=start_date) &
Q(date_order__lte=end_date) &
Q(state__in=["sale", "done"])
)
.only("partner_id", "amount_total")
.all()
)
# Aggregate by customer
customer_revenue = {}
for order in orders:
partner_id = order.partner_id.id if hasattr(order.partner_id, 'id') else order.partner_id
if partner_id not in customer_revenue:
customer_revenue[partner_id] = Decimal('0')
customer_revenue[partner_id] += order.amount_total
# Get top customers
top_customer_ids = sorted(
customer_revenue.keys(),
key=lambda x: customer_revenue[x],
reverse=True
)[:limit]
# Get customer details
top_customers = []
if top_customer_ids:
customers = await (
self.client.model(ResPartner)
.filter(Q(id__in=top_customer_ids))
.only("name", "email", "is_company")
.all()
)
customer_dict = {c.id: c for c in customers}
for customer_id in top_customer_ids:
customer = customer_dict.get(customer_id)
if customer:
top_customers.append({
"id": customer_id,
"name": customer.name,
"email": customer.email,
"is_company": customer.is_company,
"total_revenue": float(customer_revenue[customer_id]),
"order_count": len([o for o in orders if (
o.partner_id.id if hasattr(o.partner_id, 'id') else o.partner_id
) == customer_id])
})
# Cache the result
await self.cache.set(cache_key, top_customers)
return top_customers
async def get_sales_by_period(
self,
start_date: date,
end_date: date,
period_type: str = "daily"
) -> List[Dict[str, Any]]:
"""Get sales data grouped by time period."""
cache_key = f"sales_by_period_{start_date}_{end_date}_{period_type}"
cached_result = await self.cache.get(cache_key)
if cached_result:
return cached_result
# Get all orders for the period
orders = await (
self.client.model(SaleOrder)
.filter(
Q(date_order__gte=start_date) &
Q(date_order__lte=end_date) &
Q(state__in=["sale", "done"])
)
.only("date_order", "amount_total")
.order_by("date_order")
.all()
)
# Group by period
period_data = {}
for order in orders:
order_date = order.date_order.date() if isinstance(order.date_order, datetime) else order.date_order
if period_type == "daily":
period_key = order_date.strftime("%Y-%m-%d")
elif period_type == "weekly":
# Get Monday of the week
monday = order_date - timedelta(days=order_date.weekday())
period_key = monday.strftime("%Y-%m-%d")
elif period_type == "monthly":
period_key = order_date.strftime("%Y-%m")
else:
period_key = order_date.strftime("%Y-%m-%d")
if period_key not in period_data:
period_data[period_key] = {
"period": period_key,
"revenue": Decimal('0'),
"order_count": 0
}
period_data[period_key]["revenue"] += order.amount_total
period_data[period_key]["order_count"] += 1
# Convert to list and sort
result = list(period_data.values())
result.sort(key=lambda x: x["period"])
# Convert Decimal to float for JSON serialization
for item in result:
item["revenue"] = float(item["revenue"])
# Cache the result
await self.cache.set(cache_key, result)
return result
async def get_customer_segments(
self,
start_date: date,
end_date: date
) -> List[CustomerSegment]:
"""Analyze customer segments based on purchase behavior."""
cache_key = f"customer_segments_{start_date}_{end_date}"
cached_result = await self.cache.get(cache_key)
if cached_result:
return [CustomerSegment(**segment) for segment in cached_result]
# Get customer revenue data
top_customers = await self.get_top_customers(start_date, end_date, limit=1000)
if not top_customers:
return []
# Define segments based on revenue
revenues = [c["total_revenue"] for c in top_customers]
revenues.sort(reverse=True)
# Calculate percentiles
total_customers = len(revenues)
high_value_threshold = revenues[int(total_customers * 0.1)] if total_customers > 10 else revenues[0]
medium_value_threshold = revenues[int(total_customers * 0.3)] if total_customers > 3 else revenues[-1]
# Segment customers
segments = {
"High Value": [],
"Medium Value": [],
"Low Value": []
}
for customer in top_customers:
revenue = customer["total_revenue"]
if revenue >= high_value_threshold:
segments["High Value"].append(customer)
elif revenue >= medium_value_threshold:
segments["Medium Value"].append(customer)
else:
segments["Low Value"].append(customer)
# Create segment objects
segment_objects = []
for segment_name, customers in segments.items():
if customers:
total_revenue = sum(c["total_revenue"] for c in customers)
avg_revenue = total_revenue / len(customers)
top_5 = customers[:5] # Top 5 customers in segment
segment_objects.append(CustomerSegment(
segment_name=segment_name,
customer_count=len(customers),
total_revenue=Decimal(str(total_revenue)),
average_revenue_per_customer=Decimal(str(avg_revenue)),
top_customers=top_5
))
# Cache the result
cache_data = [
{
"segment_name": s.segment_name,
"customer_count": s.customer_count,
"total_revenue": float(s.total_revenue),
"average_revenue_per_customer": float(s.average_revenue_per_customer),
"top_customers": s.top_customers
}
for s in segment_objects
]
await self.cache.set(cache_key, cache_data)
return segment_objects
async def generate_dashboard_data(
self,
start_date: date,
end_date: date
) -> Dict[str, Any]:
"""Generate complete dashboard data efficiently using batch operations."""
# Use asyncio.gather for concurrent execution
results = await asyncio.gather(
self.get_sales_overview(start_date, end_date),
self.get_top_customers(start_date, end_date, limit=10),
self.get_sales_by_period(start_date, end_date, "daily"),
self.get_customer_segments(start_date, end_date),
return_exceptions=True
)
# Handle any exceptions
sales_overview = results[0] if not isinstance(results[0], Exception) else None
top_customers = results[1] if not isinstance(results[1], Exception) else []
sales_by_period = results[2] if not isinstance(results[2], Exception) else []
customer_segments = results[3] if not isinstance(results[3], Exception) else []
return {
"overview": sales_overview.__dict__ if sales_overview else None,
"top_customers": top_customers,
"sales_by_period": sales_by_period,
"customer_segments": [
{
"segment_name": s.segment_name,
"customer_count": s.customer_count,
"total_revenue": float(s.total_revenue),
"average_revenue_per_customer": float(s.average_revenue_per_customer),
"top_customers": s.top_customers[:3] # Limit for dashboard
}
for s in customer_segments
],
"generated_at": datetime.now().isoformat(),
"cache_info": {
"hit_ratio": getattr(self.cache, 'hit_ratio', 0),
"size": getattr(self.cache, 'size', 0)
}
}
async def _get_new_customers_count(self, start_date: date, end_date: date) -> int:
"""Get count of new customers in the period."""
new_customers = await (
self.client.model(ResPartner)
.filter(
Q(create_date__gte=start_date) &
Q(create_date__lte=end_date) &
Q(customer_rank__gt=0)
)
.count()
)
return new_customers
async def _calculate_conversion_rate(self, start_date: date, end_date: date) -> float:
"""Calculate conversion rate (simplified)."""
# This is a simplified calculation
# In reality, you'd track leads, opportunities, etc.
total_orders = await (
self.client.model(SaleOrder)
.filter(
Q(date_order__gte=start_date) &
Q(date_order__lte=end_date)
)
.count()
)
confirmed_orders = await (
self.client.model(SaleOrder)
.filter(
Q(date_order__gte=start_date) &
Q(date_order__lte=end_date) &
Q(state__in=["sale", "done"])
)
.count()
)
return (confirmed_orders / total_orders * 100) if total_orders > 0 else 0.0
# Usage Example
async def main():
"""Demonstrate sales dashboard functionality."""
async with ZenooClient("localhost", port=8069) as client:
await client.login("demo", "admin", "admin")
# Initialize dashboard
dashboard = SalesDashboard(client)
# Set date range (last 30 days)
end_date = date.today()
start_date = end_date - timedelta(days=30)
print(f"Generating sales dashboard for {start_date} to {end_date}")
# Generate complete dashboard data
dashboard_data = await dashboard.generate_dashboard_data(start_date, end_date)
# Display results
if dashboard_data["overview"]:
overview = dashboard_data["overview"]
print(f"\n📊 Sales Overview:")
print(f" Total Revenue: ${overview['total_revenue']:,.2f}")
print(f" Total Orders: {overview['total_orders']:,}")
print(f" Average Order Value: ${overview['average_order_value']:,.2f}")
print(f" New Customers: {overview['new_customers']:,}")
print(f" Conversion Rate: {overview['conversion_rate']:.1f}%")
# Top customers
print(f"\n🏆 Top Customers:")
for i, customer in enumerate(dashboard_data["top_customers"][:5], 1):
print(f" {i}. {customer['name']} - ${customer['total_revenue']:,.2f}")
# Customer segments
print(f"\n👥 Customer Segments:")
for segment in dashboard_data["customer_segments"]:
print(f" {segment['segment_name']}: {segment['customer_count']} customers, "
f"${segment['total_revenue']:,.2f} revenue")
# Cache performance
cache_info = dashboard_data["cache_info"]
print(f"\n⚡ Cache Performance:")
print(f" Hit Ratio: {cache_info['hit_ratio']:.1%}")
print(f" Cache Size: {cache_info['size']} items")
if __name__ == "__main__":
asyncio.run(main())
Key Features Demonstrated¶
1. Real-Time Analytics¶
- Concurrent data fetching with
asyncio.gather - Efficient caching with TTL strategies
- Performance monitoring and optimization
2. Advanced Querying¶
- Complex date range filtering
- Aggregation and grouping
- Customer segmentation logic
3. Caching Strategy¶
- TTL-based caching for performance
- Cache hit ratio monitoring
- Strategic cache key design
4. Data Structures¶
- Type-safe data classes
- Structured metrics and segments
- JSON-serializable outputs
5. Business Intelligence¶
- Customer lifetime value analysis
- Sales trend analysis
- Performance KPIs
Integration Examples¶
Web Dashboard (FastAPI)¶
from fastapi import FastAPI, Depends
from fastapi.responses import JSONResponse
app = FastAPI()
@app.get("/api/dashboard")
async def get_dashboard(
start_date: date = Query(...),
end_date: date = Query(...),
dashboard: SalesDashboard = Depends(get_dashboard_service)
):
data = await dashboard.generate_dashboard_data(start_date, end_date)
return JSONResponse(data)
Scheduled Reports¶
import schedule
import time
async def generate_daily_report():
"""Generate and email daily sales report."""
dashboard = SalesDashboard(client)
yesterday = date.today() - timedelta(days=1)
data = await dashboard.generate_dashboard_data(yesterday, yesterday)
# Send email with data
schedule.every().day.at("08:00").do(generate_daily_report)
Next Steps¶
- Performance Metrics - Monitor dashboard performance
- Custom Reports - Build custom report generators
- Data Visualization - Create interactive charts