Automated Workflows and Business Process Automation¶
A comprehensive example demonstrating how to build automated workflows and business process automation using Zenoo RPC with event-driven architecture, task scheduling, and integration patterns.
Overview¶
This example shows how to:
- Create event-driven automated workflows
- Implement business process automation
- Schedule recurring tasks and jobs
- Handle workflow state management
- Integrate with external systems
- Monitor and track workflow execution
Complete Implementation¶
Workflow Engine Service¶
import asyncio
import json
from typing import Dict, List, Any, Optional, Callable, Union
from datetime import datetime, timedelta
from dataclasses import dataclass, field
from enum import Enum
import uuid
from contextlib import asynccontextmanager
from zenoo_rpc import ZenooClient
from zenoo_rpc.models.common import ResPartner, SaleOrder
from zenoo_rpc.query.filters import Q
from zenoo_rpc.batch.manager import BatchManager
class WorkflowStatus(Enum):
"""Workflow execution status."""
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
CANCELLED = "cancelled"
class TaskStatus(Enum):
"""Individual task status."""
WAITING = "waiting"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
SKIPPED = "skipped"
@dataclass
class WorkflowTask:
"""Individual workflow task definition."""
task_id: str
name: str
task_type: str # action, condition, delay, notification
parameters: Dict[str, Any] = field(default_factory=dict)
dependencies: List[str] = field(default_factory=list)
retry_count: int = 0
max_retries: int = 3
timeout_seconds: int = 300
status: TaskStatus = TaskStatus.WAITING
result: Optional[Any] = None
error_message: Optional[str] = None
started_at: Optional[datetime] = None
completed_at: Optional[datetime] = None
@dataclass
class WorkflowDefinition:
"""Workflow definition with tasks and configuration."""
workflow_id: str
name: str
description: str
tasks: List[WorkflowTask]
triggers: List[Dict[str, Any]] = field(default_factory=list)
schedule: Optional[str] = None # cron expression
enabled: bool = True
version: str = "1.0"
@dataclass
class WorkflowExecution:
"""Workflow execution instance."""
execution_id: str
workflow_id: str
status: WorkflowStatus = WorkflowStatus.PENDING
started_at: Optional[datetime] = None
completed_at: Optional[datetime] = None
context: Dict[str, Any] = field(default_factory=dict)
task_results: Dict[str, Any] = field(default_factory=dict)
error_message: Optional[str] = None
class WorkflowEngine:
"""Automated workflow engine for business process automation."""
def __init__(self, client: ZenooClient):
self.client = client
self.workflows: Dict[str, WorkflowDefinition] = {}
self.executions: Dict[str, WorkflowExecution] = {}
self.task_handlers: Dict[str, Callable] = {}
self.running = False
self._register_default_handlers()
def register_workflow(self, workflow: WorkflowDefinition):
"""Register a new workflow definition."""
self.workflows[workflow.workflow_id] = workflow
print(f"Registered workflow: {workflow.name} ({workflow.workflow_id})")
def register_task_handler(self, task_type: str, handler: Callable):
"""Register a task handler function."""
self.task_handlers[task_type] = handler
async def start_workflow(
self,
workflow_id: str,
context: Optional[Dict[str, Any]] = None
) -> str:
"""Start a workflow execution."""
if workflow_id not in self.workflows:
raise ValueError(f"Workflow {workflow_id} not found")
workflow = self.workflows[workflow_id]
if not workflow.enabled:
raise ValueError(f"Workflow {workflow_id} is disabled")
# Create execution instance
execution_id = str(uuid.uuid4())
execution = WorkflowExecution(
execution_id=execution_id,
workflow_id=workflow_id,
started_at=datetime.now(),
context=context or {}
)
self.executions[execution_id] = execution
# Start execution in background
asyncio.create_task(self._execute_workflow(execution))
return execution_id
async def _execute_workflow(self, execution: WorkflowExecution):
"""Execute a workflow instance."""
try:
execution.status = WorkflowStatus.RUNNING
workflow = self.workflows[execution.workflow_id]
print(f"Starting workflow execution: {execution.execution_id}")
# Execute tasks based on dependencies
completed_tasks = set()
while len(completed_tasks) < len(workflow.tasks):
# Find tasks ready to execute
ready_tasks = []
for task in workflow.tasks:
if (task.task_id not in completed_tasks and
task.status in [TaskStatus.WAITING, TaskStatus.FAILED] and
all(dep in completed_tasks for dep in task.dependencies)):
ready_tasks.append(task)
if not ready_tasks:
# Check if we're stuck
remaining_tasks = [t for t in workflow.tasks if t.task_id not in completed_tasks]
if remaining_tasks:
raise Exception(f"Workflow stuck - no ready tasks. Remaining: {[t.task_id for t in remaining_tasks]}")
break
# Execute ready tasks concurrently
await asyncio.gather(*[
self._execute_task(task, execution)
for task in ready_tasks
])
# Update completed tasks
for task in ready_tasks:
if task.status == TaskStatus.COMPLETED:
completed_tasks.add(task.task_id)
elif task.status == TaskStatus.FAILED and task.retry_count >= task.max_retries:
# Task failed permanently
execution.status = WorkflowStatus.FAILED
execution.error_message = f"Task {task.task_id} failed: {task.error_message}"
execution.completed_at = datetime.now()
return
# All tasks completed successfully
execution.status = WorkflowStatus.COMPLETED
execution.completed_at = datetime.now()
print(f"Workflow execution completed: {execution.execution_id}")
except Exception as e:
execution.status = WorkflowStatus.FAILED
execution.error_message = str(e)
execution.completed_at = datetime.now()
print(f"Workflow execution failed: {execution.execution_id} - {e}")
async def _execute_task(self, task: WorkflowTask, execution: WorkflowExecution):
"""Execute an individual task."""
try:
task.status = TaskStatus.RUNNING
task.started_at = datetime.now()
print(f"Executing task: {task.name} ({task.task_id})")
# Get task handler
if task.task_type not in self.task_handlers:
raise Exception(f"No handler registered for task type: {task.task_type}")
handler = self.task_handlers[task.task_type]
# Execute task with timeout
try:
result = await asyncio.wait_for(
handler(task, execution, self.client),
timeout=task.timeout_seconds
)
task.result = result
task.status = TaskStatus.COMPLETED
task.completed_at = datetime.now()
# Store result in execution context
execution.task_results[task.task_id] = result
print(f"Task completed: {task.name}")
except asyncio.TimeoutError:
raise Exception(f"Task timed out after {task.timeout_seconds} seconds")
except Exception as e:
task.status = TaskStatus.FAILED
task.error_message = str(e)
task.retry_count += 1
print(f"Task failed: {task.name} - {e}")
# Retry if possible
if task.retry_count < task.max_retries:
print(f"Retrying task: {task.name} (attempt {task.retry_count + 1})")
await asyncio.sleep(2 ** task.retry_count) # Exponential backoff
task.status = TaskStatus.WAITING
def _register_default_handlers(self):
"""Register default task handlers."""
self.register_task_handler("create_record", self._handle_create_record)
self.register_task_handler("update_record", self._handle_update_record)
self.register_task_handler("send_email", self._handle_send_email)
self.register_task_handler("condition", self._handle_condition)
self.register_task_handler("delay", self._handle_delay)
self.register_task_handler("batch_operation", self._handle_batch_operation)
self.register_task_handler("notification", self._handle_notification)
async def _handle_create_record(self, task: WorkflowTask, execution: WorkflowExecution, client: ZenooClient):
"""Handle record creation task."""
params = task.parameters
model_name = params.get("model")
data = params.get("data", {})
# Substitute context variables
data = self._substitute_context_variables(data, execution.context)
if model_name == "res.partner":
record = await client.model(ResPartner).create(data)
elif model_name == "sale.order":
record = await client.model(SaleOrder).create(data)
else:
# Generic model creation
record = await client.model(model_name).create(data)
return {"record_id": record.id, "model": model_name}
async def _handle_update_record(self, task: WorkflowTask, execution: WorkflowExecution, client: ZenooClient):
"""Handle record update task."""
params = task.parameters
model_name = params.get("model")
record_id = params.get("record_id")
data = params.get("data", {})
# Substitute context variables
data = self._substitute_context_variables(data, execution.context)
record_id = self._substitute_context_variables(record_id, execution.context)
if model_name == "res.partner":
await client.model(ResPartner).update(record_id, data)
elif model_name == "sale.order":
await client.model(SaleOrder).update(record_id, data)
else:
# Generic model update
await client.model(model_name).update(record_id, data)
return {"updated": True, "record_id": record_id}
async def _handle_send_email(self, task: WorkflowTask, execution: WorkflowExecution, client: ZenooClient):
"""Handle email sending task."""
params = task.parameters
# In a real implementation, this would integrate with an email service
email_data = {
"to": params.get("to"),
"subject": params.get("subject"),
"body": params.get("body"),
"template": params.get("template")
}
# Substitute context variables
email_data = self._substitute_context_variables(email_data, execution.context)
print(f"Sending email to {email_data['to']}: {email_data['subject']}")
# Simulate email sending
await asyncio.sleep(1)
return {"email_sent": True, "recipient": email_data["to"]}
async def _handle_condition(self, task: WorkflowTask, execution: WorkflowExecution, client: ZenooClient):
"""Handle conditional logic task."""
params = task.parameters
condition_type = params.get("type")
if condition_type == "record_exists":
model_name = params.get("model")
filters = params.get("filters", {})
# Substitute context variables
filters = self._substitute_context_variables(filters, execution.context)
# Check if record exists
if model_name == "res.partner":
record = await client.model(ResPartner).filter(**filters).first()
else:
record = await client.model(model_name).filter(**filters).first()
result = record is not None
elif condition_type == "value_comparison":
left_value = self._substitute_context_variables(params.get("left"), execution.context)
right_value = self._substitute_context_variables(params.get("right"), execution.context)
operator = params.get("operator", "eq")
if operator == "eq":
result = left_value == right_value
elif operator == "gt":
result = left_value > right_value
elif operator == "lt":
result = left_value < right_value
else:
result = False
else:
result = False
return {"condition_result": result}
async def _handle_delay(self, task: WorkflowTask, execution: WorkflowExecution, client: ZenooClient):
"""Handle delay task."""
delay_seconds = task.parameters.get("seconds", 1)
await asyncio.sleep(delay_seconds)
return {"delayed": delay_seconds}
async def _handle_batch_operation(self, task: WorkflowTask, execution: WorkflowExecution, client: ZenooClient):
"""Handle batch operations task."""
params = task.parameters
operations = params.get("operations", [])
async with client.batch() as batch:
for operation in operations:
op_type = operation.get("type")
model = operation.get("model")
data = self._substitute_context_variables(operation.get("data", {}), execution.context)
if op_type == "create":
batch.create(model, data)
elif op_type == "update":
record_ids = operation.get("record_ids", [])
batch.update(model, data, record_ids)
results = await batch.execute()
return {"batch_results": results}
async def _handle_notification(self, task: WorkflowTask, execution: WorkflowExecution, client: ZenooClient):
"""Handle notification task."""
params = task.parameters
message = self._substitute_context_variables(params.get("message"), execution.context)
print(f"Notification: {message}")
# In a real implementation, this could send to Slack, Teams, etc.
return {"notification_sent": True, "message": message}
def _substitute_context_variables(self, value: Any, context: Dict[str, Any]) -> Any:
"""Substitute context variables in values."""
if isinstance(value, str):
# Simple variable substitution
for key, val in context.items():
value = value.replace(f"{{{key}}}", str(val))
return value
elif isinstance(value, dict):
return {k: self._substitute_context_variables(v, context) for k, v in value.items()}
elif isinstance(value, list):
return [self._substitute_context_variables(item, context) for item in value]
else:
return value
def get_execution_status(self, execution_id: str) -> Optional[Dict[str, Any]]:
"""Get workflow execution status."""
if execution_id not in self.executions:
return None
execution = self.executions[execution_id]
workflow = self.workflows[execution.workflow_id]
return {
"execution_id": execution_id,
"workflow_name": workflow.name,
"status": execution.status.value,
"started_at": execution.started_at.isoformat() if execution.started_at else None,
"completed_at": execution.completed_at.isoformat() if execution.completed_at else None,
"task_status": [
{
"task_id": task.task_id,
"name": task.name,
"status": task.status.value,
"retry_count": task.retry_count
}
for task in workflow.tasks
],
"error_message": execution.error_message
}
# Predefined Workflow Examples
def create_customer_onboarding_workflow() -> WorkflowDefinition:
"""Create a customer onboarding workflow."""
tasks = [
WorkflowTask(
task_id="validate_customer",
name="Validate Customer Data",
task_type="condition",
parameters={
"type": "record_exists",
"model": "res.partner",
"filters": {"email": "{customer_email}"}
}
),
WorkflowTask(
task_id="create_customer",
name="Create Customer Record",
task_type="create_record",
dependencies=["validate_customer"],
parameters={
"model": "res.partner",
"data": {
"name": "{customer_name}",
"email": "{customer_email}",
"customer_rank": 1
}
}
),
WorkflowTask(
task_id="send_welcome_email",
name="Send Welcome Email",
task_type="send_email",
dependencies=["create_customer"],
parameters={
"to": "{customer_email}",
"subject": "Welcome to Our Platform!",
"body": "Dear {customer_name}, welcome to our platform!"
}
),
WorkflowTask(
task_id="notify_sales_team",
name="Notify Sales Team",
task_type="notification",
dependencies=["create_customer"],
parameters={
"message": "New customer onboarded: {customer_name} ({customer_email})"
}
)
]
return WorkflowDefinition(
workflow_id="customer_onboarding",
name="Customer Onboarding Process",
description="Automated workflow for onboarding new customers",
tasks=tasks
)
def create_order_processing_workflow() -> WorkflowDefinition:
"""Create an order processing workflow."""
tasks = [
WorkflowTask(
task_id="validate_order",
name="Validate Order Data",
task_type="condition",
parameters={
"type": "value_comparison",
"left": "{order_amount}",
"operator": "gt",
"right": 0
}
),
WorkflowTask(
task_id="create_order",
name="Create Sales Order",
task_type="create_record",
dependencies=["validate_order"],
parameters={
"model": "sale.order",
"data": {
"partner_id": "{customer_id}",
"amount_total": "{order_amount}"
}
}
),
WorkflowTask(
task_id="send_confirmation",
name="Send Order Confirmation",
task_type="send_email",
dependencies=["create_order"],
parameters={
"to": "{customer_email}",
"subject": "Order Confirmation",
"body": "Your order has been confirmed. Amount: ${order_amount}"
}
)
]
return WorkflowDefinition(
workflow_id="order_processing",
name="Order Processing Workflow",
description="Automated workflow for processing new orders",
tasks=tasks
)
# Usage Example
async def main():
"""Demonstrate automated workflow capabilities."""
async with ZenooClient("localhost", port=8069) as client:
await client.login("demo", "admin", "admin")
# Initialize workflow engine
engine = WorkflowEngine(client)
# Register workflows
engine.register_workflow(create_customer_onboarding_workflow())
engine.register_workflow(create_order_processing_workflow())
print("🔄 Starting automated workflows...")
# Start customer onboarding workflow
onboarding_execution = await engine.start_workflow(
"customer_onboarding",
context={
"customer_name": "John Doe",
"customer_email": "john.doe@example.com"
}
)
# Start order processing workflow
order_execution = await engine.start_workflow(
"order_processing",
context={
"customer_id": 1,
"customer_email": "customer@example.com",
"order_amount": 1500.00
}
)
# Wait for workflows to complete
await asyncio.sleep(10)
# Check execution status
onboarding_status = engine.get_execution_status(onboarding_execution)
order_status = engine.get_execution_status(order_execution)
print(f"\n📊 Workflow Results:")
print(f"Customer Onboarding: {onboarding_status['status']}")
print(f"Order Processing: {order_status['status']}")
if __name__ == "__main__":
asyncio.run(main())
Key Features Demonstrated¶
1. Event-Driven Architecture¶
- Task dependency management
- Asynchronous execution
- Event-based triggers
2. Business Process Automation¶
- Customer onboarding workflows
- Order processing automation
- Notification systems
3. Workflow Management¶
- State tracking and monitoring
- Error handling and retries
- Conditional logic support
4. Integration Capabilities¶
- Email automation
- External system notifications
- Batch operations
5. Scalability Features¶
- Concurrent task execution
- Resource management
- Performance monitoring
Integration Examples¶
Webhook Triggers¶
from fastapi import FastAPI, Request
app = FastAPI()
@app.post("/webhook/customer-signup")
async def customer_signup_webhook(request: Request):
"""Webhook endpoint for customer signup events."""
data = await request.json()
# Start customer onboarding workflow
execution_id = await engine.start_workflow(
"customer_onboarding",
context={
"customer_name": data["name"],
"customer_email": data["email"]
}
)
return {"workflow_started": execution_id}
Scheduled Workflows¶
import schedule
import time
def schedule_daily_reports():
"""Schedule daily report generation workflow."""
asyncio.create_task(engine.start_workflow(
"daily_reports",
context={"date": datetime.now().strftime("%Y-%m-%d")}
))
schedule.every().day.at("08:00").do(schedule_daily_reports)
Next Steps¶
- Scheduled Tasks - Advanced task scheduling
- Email Automation - Email workflow integration
- Document Processing - Document workflow automation