Skip to content

Data Visualization and Interactive Dashboards

A comprehensive example showing how to create interactive data visualizations and dashboards using Zenoo RPC data with popular visualization libraries like Plotly, Matplotlib, and Chart.js.

Overview

This example demonstrates:

  • Real-time data visualization with Plotly
  • Interactive dashboards with Dash
  • Chart generation for web applications
  • Performance monitoring visualizations
  • Export capabilities for presentations
  • Integration with business intelligence tools

Complete Implementation

Data Visualization Service

import asyncio
import json
from typing import Dict, List, Any, Optional, Tuple
from datetime import datetime, date, timedelta
from dataclasses import dataclass
import base64
from io import BytesIO

# Visualization libraries
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots
import matplotlib.pyplot as plt
import seaborn as sns

from zenoo_rpc import ZenooClient
from zenoo_rpc.models.common import ResPartner, SaleOrder
from zenoo_rpc.query.filters import Q

@dataclass
class ChartConfig:
    """Chart configuration settings."""
    chart_type: str  # line, bar, pie, scatter, heatmap
    title: str
    x_axis_label: str = ""
    y_axis_label: str = ""
    color_scheme: str = "viridis"
    width: int = 800
    height: int = 600
    interactive: bool = True

class DataVisualizationService:
    """Service for creating data visualizations from Zenoo RPC data."""

    def __init__(self, client: ZenooClient):
        self.client = client
        self.color_schemes = {
            "business": ["#1f77b4", "#ff7f0e", "#2ca02c", "#d62728", "#9467bd"],
            "modern": ["#3498db", "#e74c3c", "#2ecc71", "#f39c12", "#9b59b6"],
            "pastel": ["#a8e6cf", "#dcedc1", "#ffd3a5", "#ffa8a8", "#c7ceea"]
        }

    async def create_sales_dashboard(
        self, 
        start_date: date, 
        end_date: date
    ) -> Dict[str, Any]:
        """Create comprehensive sales dashboard with multiple visualizations."""

        # Get sales data
        sales_data = await self._get_sales_data(start_date, end_date)
        customer_data = await self._get_customer_data()

        # Create multiple charts
        charts = {}

        # 1. Sales trend over time
        charts["sales_trend"] = await self._create_sales_trend_chart(sales_data)

        # 2. Top customers bar chart
        charts["top_customers"] = await self._create_top_customers_chart(sales_data)

        # 3. Sales by customer type pie chart
        charts["customer_type_pie"] = await self._create_customer_type_pie(customer_data)

        # 4. Monthly revenue heatmap
        charts["revenue_heatmap"] = await self._create_revenue_heatmap(sales_data)

        # 5. Performance metrics gauge
        charts["performance_gauge"] = await self._create_performance_gauge(sales_data)

        return {
            "dashboard_title": f"Sales Dashboard ({start_date} to {end_date})",
            "generated_at": datetime.now().isoformat(),
            "charts": charts,
            "summary_stats": self._calculate_summary_stats(sales_data)
        }

    async def create_customer_analytics_charts(self) -> Dict[str, Any]:
        """Create customer analytics visualizations."""

        # Get customer data with sales information
        customers = await (
            self.client.model(ResPartner)
            .filter(Q(customer_rank__gt=0))
            .only("name", "email", "is_company", "create_date", "country_id")
            .all()
        )

        # Get sales data for customers
        customer_sales = await self._get_customer_sales_data([c.id for c in customers])

        charts = {}

        # 1. Customer acquisition over time
        charts["acquisition_trend"] = self._create_customer_acquisition_chart(customers)

        # 2. Customer value distribution
        charts["value_distribution"] = self._create_customer_value_distribution(customer_sales)

        # 3. Geographic distribution
        charts["geographic_map"] = self._create_geographic_distribution(customers)

        # 4. Customer segmentation scatter plot
        charts["segmentation_scatter"] = self._create_customer_segmentation_scatter(
            customers, customer_sales
        )

        return {
            "dashboard_title": "Customer Analytics",
            "generated_at": datetime.now().isoformat(),
            "charts": charts,
            "total_customers": len(customers)
        }

    async def create_performance_monitoring_dashboard(self) -> Dict[str, Any]:
        """Create performance monitoring visualizations."""

        # Simulate performance metrics (in real implementation, get from monitoring system)
        performance_data = await self._get_performance_metrics()

        charts = {}

        # 1. Response time trend
        charts["response_time"] = self._create_response_time_chart(performance_data)

        # 2. Error rate monitoring
        charts["error_rate"] = self._create_error_rate_chart(performance_data)

        # 3. Cache hit ratio gauge
        charts["cache_performance"] = self._create_cache_performance_gauge(performance_data)

        # 4. Database query performance
        charts["query_performance"] = self._create_query_performance_chart(performance_data)

        return {
            "dashboard_title": "Performance Monitoring",
            "generated_at": datetime.now().isoformat(),
            "charts": charts,
            "system_health": self._assess_system_health(performance_data)
        }

    def create_interactive_web_dashboard(self, dashboard_data: Dict[str, Any]) -> str:
        """Create interactive web dashboard using Dash."""

        # This would create a Dash application
        # For this example, we'll return HTML with embedded Plotly charts

        html_template = """
        <!DOCTYPE html>
        <html>
        <head>
            <title>{title}</title>
            <script src="https://cdn.plot.ly/plotly-latest.min.js"></script>
            <style>
                body {{ font-family: Arial, sans-serif; margin: 20px; }}
                .chart-container {{ margin: 20px 0; }}
                .summary-stats {{ 
                    background: #f8f9fa; 
                    padding: 20px; 
                    border-radius: 8px; 
                    margin: 20px 0; 
                }}
            </style>
        </head>
        <body>
            <h1>{title}</h1>
            <div class="summary-stats">
                <h3>Summary Statistics</h3>
                {summary_html}
            </div>
            {charts_html}
        </body>
        </html>
        """

        # Generate summary HTML
        summary_html = ""
        if "summary_stats" in dashboard_data:
            stats = dashboard_data["summary_stats"]
            summary_html = "<ul>"
            for key, value in stats.items():
                summary_html += f"<li><strong>{key.replace('_', ' ').title()}:</strong> {value}</li>"
            summary_html += "</ul>"

        # Generate charts HTML
        charts_html = ""
        for chart_name, chart_data in dashboard_data.get("charts", {}).items():
            charts_html += f"""
            <div class="chart-container">
                <div id="{chart_name}"></div>
                <script>
                    Plotly.newPlot('{chart_name}', {json.dumps(chart_data['data'])}, 
                                   {json.dumps(chart_data['layout'])});
                </script>
            </div>
            """

        return html_template.format(
            title=dashboard_data.get("dashboard_title", "Dashboard"),
            summary_html=summary_html,
            charts_html=charts_html
        )

    async def export_charts_to_images(
        self, 
        charts: Dict[str, Any], 
        output_dir: str = "/tmp"
    ) -> Dict[str, str]:
        """Export charts as image files for presentations."""

        exported_files = {}

        for chart_name, chart_data in charts.items():
            try:
                # Create Plotly figure
                fig = go.Figure(data=chart_data["data"], layout=chart_data["layout"])

                # Export as PNG
                img_bytes = fig.to_image(format="png", width=800, height=600)

                # Save to file
                filename = f"{output_dir}/{chart_name}.png"
                with open(filename, "wb") as f:
                    f.write(img_bytes)

                exported_files[chart_name] = filename

            except Exception as e:
                print(f"Error exporting chart {chart_name}: {e}")

        return exported_files

    async def _get_sales_data(self, start_date: date, end_date: date) -> List[Dict[str, Any]]:
        """Get sales data for the specified period."""

        orders = await (
            self.client.model(SaleOrder)
            .filter(
                Q(date_order__gte=start_date) & 
                Q(date_order__lte=end_date) &
                Q(state__in=["sale", "done"])
            )
            .only("date_order", "amount_total", "partner_id", "state")
            .order_by("date_order")
            .all()
        )

        # Convert to list of dictionaries for easier processing
        sales_data = []
        for order in orders:
            sales_data.append({
                "date": order.date_order.date() if isinstance(order.date_order, datetime) else order.date_order,
                "amount": float(order.amount_total),
                "partner_id": order.partner_id.id if hasattr(order.partner_id, 'id') else order.partner_id,
                "state": order.state
            })

        return sales_data

    async def _get_customer_data(self) -> List[Dict[str, Any]]:
        """Get customer data for analysis."""

        customers = await (
            self.client.model(ResPartner)
            .filter(Q(customer_rank__gt=0))
            .only("name", "is_company", "create_date", "country_id")
            .all()
        )

        customer_data = []
        for customer in customers:
            customer_data.append({
                "id": customer.id,
                "name": customer.name,
                "is_company": customer.is_company,
                "create_date": customer.create_date.date() if isinstance(customer.create_date, datetime) else customer.create_date,
                "country_id": getattr(customer, 'country_id', None)
            })

        return customer_data

    async def _create_sales_trend_chart(self, sales_data: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Create sales trend line chart."""

        # Group by date
        daily_sales = {}
        for sale in sales_data:
            date_str = sale["date"].strftime("%Y-%m-%d")
            if date_str not in daily_sales:
                daily_sales[date_str] = 0
            daily_sales[date_str] += sale["amount"]

        # Sort by date
        sorted_dates = sorted(daily_sales.keys())
        amounts = [daily_sales[date] for date in sorted_dates]

        # Create Plotly chart
        trace = go.Scatter(
            x=sorted_dates,
            y=amounts,
            mode='lines+markers',
            name='Daily Sales',
            line=dict(color='#3498db', width=3),
            marker=dict(size=6)
        )

        layout = go.Layout(
            title="Sales Trend Over Time",
            xaxis=dict(title="Date"),
            yaxis=dict(title="Sales Amount ($)"),
            hovermode='x unified'
        )

        return {
            "data": [trace],
            "layout": layout
        }

    async def _create_top_customers_chart(self, sales_data: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Create top customers bar chart."""

        # Group by customer
        customer_sales = {}
        for sale in sales_data:
            partner_id = sale["partner_id"]
            if partner_id not in customer_sales:
                customer_sales[partner_id] = 0
            customer_sales[partner_id] += sale["amount"]

        # Get top 10 customers
        top_customers = sorted(customer_sales.items(), key=lambda x: x[1], reverse=True)[:10]

        # Get customer names (simplified - in real implementation, fetch from database)
        customer_names = [f"Customer {cid}" for cid, _ in top_customers]
        amounts = [amount for _, amount in top_customers]

        trace = go.Bar(
            x=customer_names,
            y=amounts,
            marker=dict(color='#e74c3c'),
            name='Revenue'
        )

        layout = go.Layout(
            title="Top 10 Customers by Revenue",
            xaxis=dict(title="Customer"),
            yaxis=dict(title="Revenue ($)"),
            xaxis_tickangle=-45
        )

        return {
            "data": [trace],
            "layout": layout
        }

    def _create_customer_type_pie(self, customer_data: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Create customer type pie chart."""

        companies = sum(1 for c in customer_data if c["is_company"])
        individuals = len(customer_data) - companies

        trace = go.Pie(
            labels=["Companies", "Individuals"],
            values=[companies, individuals],
            marker=dict(colors=["#2ecc71", "#f39c12"])
        )

        layout = go.Layout(
            title="Customer Distribution by Type"
        )

        return {
            "data": [trace],
            "layout": layout
        }

    async def _create_revenue_heatmap(self, sales_data: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Create monthly revenue heatmap."""

        # Group by month and year
        monthly_data = {}
        for sale in sales_data:
            year = sale["date"].year
            month = sale["date"].month

            if year not in monthly_data:
                monthly_data[year] = {}
            if month not in monthly_data[year]:
                monthly_data[year][month] = 0

            monthly_data[year][month] += sale["amount"]

        # Prepare data for heatmap
        years = sorted(monthly_data.keys())
        months = list(range(1, 13))

        z_data = []
        for year in years:
            row = []
            for month in months:
                amount = monthly_data.get(year, {}).get(month, 0)
                row.append(amount)
            z_data.append(row)

        trace = go.Heatmap(
            z=z_data,
            x=[f"Month {m}" for m in months],
            y=[str(y) for y in years],
            colorscale="Viridis"
        )

        layout = go.Layout(
            title="Revenue Heatmap by Month and Year",
            xaxis=dict(title="Month"),
            yaxis=dict(title="Year")
        )

        return {
            "data": [trace],
            "layout": layout
        }

    async def _create_performance_gauge(self, sales_data: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Create performance gauge chart."""

        total_revenue = sum(sale["amount"] for sale in sales_data)
        target_revenue = 100000  # Example target

        performance_percentage = min((total_revenue / target_revenue) * 100, 100)

        trace = go.Indicator(
            mode="gauge+number+delta",
            value=performance_percentage,
            domain={'x': [0, 1], 'y': [0, 1]},
            title={'text': "Revenue Target Achievement (%)"},
            delta={'reference': 100},
            gauge={
                'axis': {'range': [None, 100]},
                'bar': {'color': "darkblue"},
                'steps': [
                    {'range': [0, 50], 'color': "lightgray"},
                    {'range': [50, 80], 'color': "gray"}
                ],
                'threshold': {
                    'line': {'color': "red", 'width': 4},
                    'thickness': 0.75,
                    'value': 90
                }
            }
        )

        layout = go.Layout(
            title="Performance Gauge"
        )

        return {
            "data": [trace],
            "layout": layout
        }

    def _calculate_summary_stats(self, sales_data: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Calculate summary statistics for the dashboard."""

        if not sales_data:
            return {}

        total_revenue = sum(sale["amount"] for sale in sales_data)
        total_orders = len(sales_data)
        avg_order_value = total_revenue / total_orders if total_orders > 0 else 0

        return {
            "total_revenue": f"${total_revenue:,.2f}",
            "total_orders": f"{total_orders:,}",
            "average_order_value": f"${avg_order_value:,.2f}",
            "period_days": len(set(sale["date"] for sale in sales_data))
        }

    # Additional helper methods would be implemented here...
    async def _get_customer_sales_data(self, customer_ids: List[int]) -> Dict[int, Dict[str, Any]]:
        """Get sales data for specific customers."""
        # Implementation would fetch sales data for each customer
        return {}

    async def _get_performance_metrics(self) -> Dict[str, Any]:
        """Get performance metrics for monitoring dashboard."""
        # Implementation would fetch real performance data
        return {
            "response_times": [0.1, 0.2, 0.15, 0.3, 0.25],
            "error_rates": [0.01, 0.02, 0.01, 0.03, 0.02],
            "cache_hit_ratio": 0.85
        }

# Usage Example
async def main():
    """Demonstrate data visualization capabilities."""

    async with ZenooClient("localhost", port=8069) as client:
        await client.login("demo", "admin", "admin")

        # Initialize visualization service
        viz_service = DataVisualizationService(client)

        print("📊 Creating data visualizations...")

        # Create sales dashboard
        end_date = date.today()
        start_date = end_date - timedelta(days=30)

        sales_dashboard = await viz_service.create_sales_dashboard(start_date, end_date)
        print(f"Sales Dashboard: {len(sales_dashboard['charts'])} charts created")

        # Create customer analytics
        customer_analytics = await viz_service.create_customer_analytics_charts()
        print(f"Customer Analytics: {len(customer_analytics['charts'])} charts created")

        # Create interactive web dashboard
        web_dashboard = viz_service.create_interactive_web_dashboard(sales_dashboard)

        # Save dashboard to file
        with open("/tmp/sales_dashboard.html", "w") as f:
            f.write(web_dashboard)
        print("Interactive dashboard saved to /tmp/sales_dashboard.html")

        # Export charts as images
        exported_files = await viz_service.export_charts_to_images(
            sales_dashboard["charts"]
        )
        print(f"Exported {len(exported_files)} chart images")

if __name__ == "__main__":
    asyncio.run(main())

Key Features Demonstrated

1. Multiple Chart Types

  • Line charts for trends
  • Bar charts for comparisons
  • Pie charts for distributions
  • Heatmaps for patterns
  • Gauge charts for KPIs

2. Interactive Dashboards

  • Real-time data updates
  • Hover interactions
  • Zoom and pan capabilities
  • Responsive design

3. Export Capabilities

  • PNG/JPEG image export
  • HTML dashboard generation
  • PDF report integration
  • Presentation-ready formats

4. Performance Optimization

  • Efficient data aggregation
  • Cached visualization data
  • Optimized chart rendering

5. Business Intelligence

  • Customer segmentation visuals
  • Sales trend analysis
  • Performance monitoring
  • Geographic distribution

Integration Examples

FastAPI Web Dashboard

from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse

app = FastAPI()

@app.get("/dashboard", response_class=HTMLResponse)
async def get_dashboard(request: Request):
    viz_service = DataVisualizationService(client)
    dashboard_data = await viz_service.create_sales_dashboard(
        start_date=date.today() - timedelta(days=30),
        end_date=date.today()
    )

    html_content = viz_service.create_interactive_web_dashboard(dashboard_data)
    return HTMLResponse(content=html_content)

Real-time Dashboard with WebSockets

import websockets
import json

async def dashboard_websocket(websocket, path):
    """WebSocket endpoint for real-time dashboard updates."""
    while True:
        # Get latest data
        dashboard_data = await viz_service.create_sales_dashboard(
            start_date=date.today(),
            end_date=date.today()
        )

        # Send to client
        await websocket.send(json.dumps(dashboard_data))
        await asyncio.sleep(60)  # Update every minute

Next Steps