Your First Workflow
Learn to build a complete AI workflow from scratch using Duragraph.
What You'll Build
A customer support chatbot that:
- Receives customer queries
- Analyzes sentiment
- Routes to appropriate response strategy
- Generates personalized responses
Prerequisites
- Duragraph running locally (see the Quickstart Guide)
- Python 3.8+ installed
- Basic understanding of AI/LLM concepts
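Before you start, it's worth sanity-checking your environment. A quick sketch (this assumes your LLM provider key is exposed as the OPENAI_API_KEY environment variable; adjust for your provider):
```python
# Sanity-check the local environment before starting the tutorial.
import os
import sys

assert sys.version_info >= (3, 8), "This tutorial assumes Python 3.8+"

# Assumption: the LLM steps below read a provider key such as OPENAI_API_KEY.
if not os.environ.get("OPENAI_API_KEY"):
    print("Warning: OPENAI_API_KEY is not set; LLM steps may fail.")
```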
Step 1: Define the Workflow
Create customer_support.py:
```python
from duragraph import Workflow, Step

# Create workflow
workflow = Workflow(name="Customer Support Agent")

# Step 1: Analyze sentiment
workflow.add_step("analyze_sentiment", Step.llm_call(
    model="gpt-3.5-turbo",
    messages=[
        {"role": "system", "content": "Analyze the sentiment of this customer message. Respond with just: positive, negative, or neutral."},
        {"role": "user", "content": "{customer_message}"}
    ],
    output_key="sentiment"
))

# Step 2: Route based on sentiment
workflow.add_conditional(
    "route_by_sentiment",
    condition=lambda state: state["sentiment"].lower() == "negative",
    if_true="escalation_response",
    if_false="standard_response"
)

# Step 3a: Handle negative sentiment (escalation)
workflow.add_step("escalation_response", Step.llm_call(
    model="gpt-4",
    messages=[
        {"role": "system", "content": "You are an empathetic customer service manager. Address this concern with care and offer to escalate."},
        {"role": "user", "content": "{customer_message}"}
    ],
    output_key="response"
))

# Step 3b: Handle positive/neutral sentiment
workflow.add_step("standard_response", Step.llm_call(
    model="gpt-3.5-turbo",
    messages=[
        {"role": "system", "content": "You are a helpful customer service agent. Provide a friendly, helpful response."},
        {"role": "user", "content": "{customer_message}"}
    ],
    output_key="response"
))
```
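Note the {customer_message} placeholders in the messages: Duragraph fills them from the run inputs when a step executes. Conceptually, the substitution works like Python's own str.format over the input dict (a simplified illustration, not the engine's actual code):
```python
# Simplified illustration of placeholder filling (not Duragraph internals).
template = "{customer_message}"
inputs = {"customer_message": "Hi, I love your product!"}

print(template.format(**inputs))  # -> Hi, I love your product!
```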
Step 2: Execute the Workflow
Register the workflow as an assistant and run it against a few test messages:
```python
from duragraph import DuragraphClient

# Initialize client
client = DuragraphClient(base_url="http://localhost:8080")

# Create assistant
assistant = client.create_assistant(
    name="Customer Support Bot",
    workflow=workflow
)

# Create conversation thread
thread = client.create_thread(
    metadata={"customer_id": "12345", "channel": "email"}
)

# Test with different sentiments
test_messages = [
    "Hi, I love your product! How can I upgrade my plan?",  # Positive
    "Your service is terrible and I want a refund NOW!",    # Negative
    "Can you help me reset my password please?"             # Neutral
]

for message in test_messages:
    print(f"\n🎯 Testing: {message}")

    # Create run
    run = client.create_run(
        assistant_id=assistant.id,
        thread_id=thread.id,
        inputs={"customer_message": message}
    )

    # Stream results
    for event in client.stream_events(run.id):
        if event.type == "step_complete":
            print(f"✅ {event.step_name}: {event.output}")
        elif event.type == "workflow_complete":
            print(f"🎉 Final response: {event.outputs['response']}")
```
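If you'd rather not repeat the streaming loop everywhere, you can wrap it in a small helper. This sketch uses only the client calls shown above and returns the final outputs:
```python
def run_and_collect(client, assistant_id, thread_id, message):
    """Run the workflow for one message and return its final outputs."""
    run = client.create_run(
        assistant_id=assistant_id,
        thread_id=thread_id,
        inputs={"customer_message": message}
    )
    for event in client.stream_events(run.id):
        if event.type == "workflow_complete":
            return event.outputs
    return None  # stream ended without a completion event


outputs = run_and_collect(client, assistant.id, thread.id, "Where is my order?")
print(outputs["response"] if outputs else "No response")
```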
Step 3: Add Human-in-the-Loop
Enhance the workflow with human oversight for escalations:
```python
# Add human approval step for escalations
workflow.add_step("human_review", Step.human_input(
    prompt="Customer escalation requires review:",
    input_schema={
        "type": "object",
        "properties": {
            "approved": {"type": "boolean"},
            "override_response": {"type": "string"}
        },
        "required": ["approved"]  # override_response is optional
    },
    depends_on=["escalation_response"]
))

# Final response step: use the reviewer's override when one is provided,
# otherwise fall back to the generated response. On the standard path
# human_review never runs, so read it defensively with .get().
workflow.add_step("final_response", Step.function_call(
    function=lambda state: {
        "final_response": (
            state.get("human_review", {}).get("override_response")
            or state["response"]
        )
    },
    depends_on=["human_review", "standard_response"]
))
```
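The input_schema above is plain JSON Schema, so you can validate a reviewer's payload before feeding it back to the run. A minimal sketch using the third-party jsonschema package (an assumption on our part; Duragraph may also validate this for you):
```python
# Requires: pip install jsonschema (an assumption; not bundled with Duragraph).
from jsonschema import validate

review_schema = {
    "type": "object",
    "properties": {
        "approved": {"type": "boolean"},
        "override_response": {"type": "string"}
    },
    "required": ["approved"]
}

payload = {"approved": True, "override_response": "A manager will contact you shortly."}
validate(instance=payload, schema=review_schema)  # raises ValidationError if invalid
```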
Step 4: Add Observability
Monitor your workflow performance:
```python
import time

# Add timing and logging
start_time = time.time()

run = client.create_run(
    assistant_id=assistant.id,
    thread_id=thread.id,
    inputs={"customer_message": message},
    metadata={
        "channel": "email",
        "priority": "normal",
        "timestamp": start_time
    }
)

# Track metrics
metrics = {
    "total_time": 0,
    "sentiment_analysis_time": 0,
    "response_generation_time": 0,
    "tokens_used": 0
}

for event in client.stream_events(run.id):
    if event.type == "step_complete":
        step_time = event.metadata.get("duration", 0)
        tokens = event.metadata.get("tokens_used", 0)

        metrics["total_time"] += step_time
        metrics["tokens_used"] += tokens

        if event.step_name == "analyze_sentiment":
            metrics["sentiment_analysis_time"] = step_time
        elif "response" in event.step_name:
            metrics["response_generation_time"] = step_time

        print(f"📊 {event.step_name}: {step_time:.2f}s, {tokens} tokens")

print(f"\n📈 Total metrics: {metrics}")
```
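Once you collect a metrics dict per run, you can aggregate across a batch of test messages to spot slow steps. A Duragraph-agnostic sketch using the standard library (the sample numbers are placeholders; in practice, append each run's metrics dict):
```python
from statistics import mean, median

# Placeholder values for illustration; append real per-run `metrics` dicts here.
runs = [
    {"total_time": 1.8, "tokens_used": 420},
    {"total_time": 2.4, "tokens_used": 510},
    {"total_time": 1.6, "tokens_used": 390},
]

print(f"mean latency:   {mean(r['total_time'] for r in runs):.2f}s")
print(f"median latency: {median(r['total_time'] for r in runs):.2f}s")
print(f"total tokens:   {sum(r['tokens_used'] for r in runs)}")
```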
Step 5: Error Handling
Add robust error handling:
```python
from duragraph import WorkflowError, RetryPolicy

# Configure retry policy
workflow.set_retry_policy(RetryPolicy(
    max_attempts=3,
    backoff_multiplier=2.0,
    retry_on_errors=["LLMTimeoutError", "LLMRateLimitError"]
))

# Add error handling step
workflow.add_step("error_fallback", Step.llm_call(
    model="gpt-3.5-turbo",
    messages=[
        {"role": "system", "content": "Generate a polite fallback response for when the system encounters an error."},
        {"role": "user", "content": "Original message: {customer_message}"}
    ],
    output_key="fallback_response",
    triggers_on_error=["analyze_sentiment", "escalation_response", "standard_response"]
))

# Execute with error handling
try:
    run = client.create_run(
        assistant_id=assistant.id,
        thread_id=thread.id,
        inputs={"customer_message": message}
    )
    result = run.wait_for_completion(timeout=60)
except WorkflowError as e:
    print(f"Workflow failed: {e.message}")
    # Use fallback response
    fallback = client.get_step_output(run.id, "error_fallback")
    print(f"Fallback response: {fallback}")
```
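For intuition on the retry settings above: with max_attempts=3 and backoff_multiplier=2.0, each retry waits roughly twice as long as the one before it. A quick illustration of the delay schedule (the 1-second base delay is an assumption; Duragraph's actual default may differ):
```python
# Illustrative exponential backoff schedule (1s base delay is assumed).
base_delay = 1.0
backoff_multiplier = 2.0
max_attempts = 3

for attempt in range(1, max_attempts):  # retries after the first attempt
    delay = base_delay * backoff_multiplier ** (attempt - 1)
    print(f"retry {attempt} after {delay:.1f}s")
# -> retry 1 after 1.0s
# -> retry 2 after 2.0s
```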
Complete Example
Here's the full working example:
```python
from duragraph import DuragraphClient, Workflow, Step, RetryPolicy


def create_customer_support_workflow():
    workflow = Workflow(name="Customer Support Agent v1.0")

    # Sentiment analysis
    workflow.add_step("analyze_sentiment", Step.llm_call(
        model="gpt-3.5-turbo",
        messages=[
            {"role": "system", "content": "Analyze sentiment: positive, negative, or neutral."},
            {"role": "user", "content": "{customer_message}"}
        ],
        output_key="sentiment"
    ))

    # Conditional routing
    workflow.add_conditional(
        "route_by_sentiment",
        condition=lambda state: state["sentiment"].lower() == "negative",
        if_true="escalation_response",
        if_false="standard_response"
    )

    # Response generation
    workflow.add_step("escalation_response", Step.llm_call(
        model="gpt-4",
        messages=[
            {"role": "system", "content": "Empathetic customer service manager response."},
            {"role": "user", "content": "{customer_message}"}
        ],
        output_key="response"
    ))

    workflow.add_step("standard_response", Step.llm_call(
        model="gpt-3.5-turbo",
        messages=[
            {"role": "system", "content": "Helpful customer service agent."},
            {"role": "user", "content": "{customer_message}"}
        ],
        output_key="response"
    ))

    # Error handling
    workflow.set_retry_policy(RetryPolicy(max_attempts=3))

    return workflow


def main():
    # Initialize
    client = DuragraphClient(base_url="http://localhost:8080")
    workflow = create_customer_support_workflow()

    # Create assistant
    assistant = client.create_assistant(
        name="Customer Support Bot v1.0",
        workflow=workflow
    )

    # Test the workflow
    test_message = "Your app keeps crashing and I'm losing my work!"
    thread = client.create_thread()

    run = client.create_run(
        assistant_id=assistant.id,
        thread_id=thread.id,
        inputs={"customer_message": test_message}
    )

    print(f"🚀 Processing: {test_message}")

    for event in client.stream_events(run.id):
        if event.type == "step_complete":
            print(f"✅ {event.step_name}: {event.output}")
        elif event.type == "workflow_complete":
            print(f"🎉 Final response: {event.outputs['response']}")


if __name__ == "__main__":
    main()
```
Next Steps
🎯 Enhance your workflow: add more sentiment categories, route by topic as well as tone, or require human review before any escalation goes out.
🔍 Explore advanced features: event streaming, human-in-the-loop steps, retry policies, and run metadata, all introduced above.
🛠️ Production readiness: wire in the observability metrics and error fallbacks from Steps 4 and 5 before putting the bot in front of real customers.
Need help? Check our troubleshooting guide or join the community!