My App
User guide › Tutorials

Your First Workflow

Your First Workflow

Your First Workflow

Learn to build a complete AI workflow from scratch using Duragraph.

What You'll Build

A customer support chatbot that:

  1. Receives customer queries
  2. Analyzes sentiment
  3. Routes to appropriate response strategy
  4. Generates personalized responses

Prerequisites

  • Duragraph running locally (see the Quickstart Guide)
  • Python 3.8+ installed
  • Basic understanding of AI/LLM concepts

Step 1: Define the Workflow

Create customer_support.py:

from duragraph import Workflow, Step

# Build the customer-support workflow: one sentiment-analysis step, a
# conditional router, and one response step per branch.
workflow = Workflow(name="Customer Support Agent")

# Step 1: Analyze sentiment.
# The prompt asks the model for a one-word label; output_key names the state
# entry ("sentiment") that the router below reads.
workflow.add_step("analyze_sentiment", Step.llm_call(
    model="gpt-3.5-turbo",
    messages=[
        {"role": "system", "content": "Analyze the sentiment of this customer message. Respond with just: positive, negative, or neutral."},
        {"role": "user", "content": "{customer_message}"}
    ],
    output_key="sentiment"
))

# Step 2: Route based on sentiment.
# LLM replies are not guaranteed to be the bare label ("Negative." or
# "negative\n" are common), so strip surrounding whitespace and a trailing
# period before comparing. Without this, such replies would silently fall
# through to the standard branch.
workflow.add_conditional(
    "route_by_sentiment",
    condition=lambda state: (
        state["sentiment"].strip().rstrip(".").lower() == "negative"
    ),
    if_true="escalation_response",
    if_false="standard_response"
)

# Step 3a: Handle negative sentiment (escalation).
# Uses a different model ("gpt-4") than the other steps.
workflow.add_step("escalation_response", Step.llm_call(
    model="gpt-4",
    messages=[
        {"role": "system", "content": "You are an empathetic customer service manager. Address this concern with care and offer to escalate."},
        {"role": "user", "content": "{customer_message}"}
    ],
    output_key="response"
))

# Step 3b: Handle positive/neutral sentiment.
# Both branches use output_key="response", so downstream code reads the reply
# from the same key whichever branch ran.
workflow.add_step("standard_response", Step.llm_call(
    model="gpt-3.5-turbo",
    messages=[
        {"role": "system", "content": "You are a helpful customer service agent. Provide a friendly, helpful response."},
        {"role": "user", "content": "{customer_message}"}
    ],
    output_key="response"
))

Step 2: Execute the Workflow

from duragraph import DuragraphClient

# Connect to the Duragraph server at the local address.
client = DuragraphClient(base_url="http://localhost:8080")

# Register the workflow as an assistant.
assistant = client.create_assistant(name="Customer Support Bot", workflow=workflow)

# A single conversation thread is shared by every test run below.
thread_metadata = {"customer_id": "12345", "channel": "email"}
thread = client.create_thread(metadata=thread_metadata)

# One sample message per sentiment class.
test_messages = [
    "Hi, I love your product! How can I upgrade my plan?",  # Positive
    "Your service is terrible and I want a refund NOW!",    # Negative
    "Can you help me reset my password please?"             # Neutral
]

for message in test_messages:
    print(f"\n🎯 Testing: {message}")

    # Start a run of the assistant on this message.
    run_inputs = {"customer_message": message}
    run = client.create_run(
        assistant_id=assistant.id,
        thread_id=thread.id,
        inputs=run_inputs,
    )

    # Print each step's output as it arrives, then the final response.
    for event in client.stream_events(run.id):
        kind = event.type
        if kind == "workflow_complete":
            print(f"🎉 Final response: {event.outputs['response']}")
        elif kind == "step_complete":
            print(f"✅ {event.step_name}: {event.output}")

Step 3: Add Human-in-the-Loop

Enhance the workflow with human oversight for escalations:

# Add human approval step for escalations.
# NOTE(review): input_schema is assumed to be JSON Schema. There, properties
# are optional unless listed under "required", and "optional" is not a
# recognized keyword, so the mandatory field is declared via "required".
workflow.add_step("human_review", Step.human_input(
    prompt="Customer escalation requires review:",
    input_schema={
        "type": "object",
        "properties": {
            "approved": {"type": "boolean"},
            "override_response": {"type": "string"}
        },
        "required": ["approved"]
    },
    depends_on=["escalation_response"]
))

# Final response step.
# This step also depends on "standard_response", and on that path no human
# review has run, so the review entry is looked up with .get() instead of
# indexing (which would raise KeyError). A missing or empty override falls
# back to the LLM-generated "response".
# Assumes `state` is dict-like and supports .get() -- TODO confirm.
# NOTE(review): "approved" is collected above but not consulted here; decide
# what should happen when a reviewer rejects without supplying an override.
workflow.add_step("final_response", Step.function_call(
    function=lambda state: {
        "final_response": (
            (state.get("human_review") or {}).get("override_response")
            or state["response"]
        )
    },
    depends_on=["human_review", "standard_response"]
))

Step 4: Add Observability

Monitor your workflow performance:

import time

# Record when the run was started; sent along as run metadata.
start_time = time.time()

run = client.create_run(
    assistant_id=assistant.id,
    thread_id=thread.id,
    inputs={"customer_message": message},
    metadata={
        "channel": "email",
        "priority": "normal",
        "timestamp": start_time
    }
)

# Track metrics, accumulated from the per-step event metadata below.
metrics = {
    "total_time": 0,
    "sentiment_analysis_time": 0,
    "response_generation_time": 0,
    "tokens_used": 0
}

# Steps whose duration counts as response generation. Matching by exact name
# (rather than the substring "response") keeps the "final_response" function
# step from overwriting the LLM generation time.
RESPONSE_STEPS = ("escalation_response", "standard_response")

for event in client.stream_events(run.id):
    if event.type == "step_complete":
        # Missing metadata entries count as zero.
        step_time = event.metadata.get("duration", 0)
        tokens = event.metadata.get("tokens_used", 0)

        metrics["total_time"] += step_time
        metrics["tokens_used"] += tokens

        if event.step_name == "analyze_sentiment":
            metrics["sentiment_analysis_time"] = step_time
        elif event.step_name in RESPONSE_STEPS:
            metrics["response_generation_time"] = step_time

        print(f"📊 {event.step_name}: {step_time:.2f}s, {tokens} tokens")

print(f"\n📈 Total metrics: {metrics}")

Step 5: Error Handling

Add robust error handling:

from duragraph import WorkflowError, RetryPolicy

# Configure retry policy.
# NOTE(review): presumably up to 3 attempts with a 2.0 backoff multiplier,
# applied only to the listed error names -- confirm against the RetryPolicy
# reference.
workflow.set_retry_policy(RetryPolicy(
    max_attempts=3,
    backoff_multiplier=2.0,
    retry_on_errors=["LLMTimeoutError", "LLMRateLimitError"]
))

# Add error handling step.
# Its output is stored under "fallback_response"; triggers_on_error lists the
# steps it is attached to.
workflow.add_step("error_fallback", Step.llm_call(
    model="gpt-3.5-turbo",
    messages=[
        {"role": "system", "content": "Generate a polite fallback response for when the system encounters an error."},
        {"role": "user", "content": "Original message: {customer_message}"}
    ],
    output_key="fallback_response",
    triggers_on_error=["analyze_sentiment", "escalation_response", "standard_response"]
))

# Execute with error handling.
# Bind `run` before the try block: if create_run() itself raises, the handler
# must not touch an unbound (or stale, from an earlier snippet) `run`.
run = None
try:
    run = client.create_run(
        assistant_id=assistant.id,
        thread_id=thread.id,
        inputs={"customer_message": message}
    )

    result = run.wait_for_completion(timeout=60)

except WorkflowError as e:
    # Exceptions have no standard `.message` attribute; use it when the class
    # provides one, otherwise fall back to the exception's own string form.
    print(f"Workflow failed: {getattr(e, 'message', e)}")
    # Use fallback response (only available when a run was actually created).
    if run is not None:
        fallback = client.get_step_output(run.id, "error_fallback")
        print(f"Fallback response: {fallback}")

Complete Example

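Here's a full working example that combines the core workflow from Steps 1 and 2 with a basic retry policy (the human-review, metrics, and fallback steps are left out for brevity):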

from duragraph import DuragraphClient, Workflow, Step, RetryPolicy
import time

def create_customer_support_workflow() -> "Workflow":
    """Build the customer-support workflow.

    The workflow has a sentiment-analysis step, a conditional router, and one
    response step per branch; a retry policy with ``max_attempts=3`` is set
    on the workflow.

    Returns:
        The configured ``Workflow`` instance.
    """
    workflow = Workflow(name="Customer Support Agent v1.0")

    # Sentiment analysis: output_key names the state entry the router reads.
    workflow.add_step("analyze_sentiment", Step.llm_call(
        model="gpt-3.5-turbo",
        messages=[
            {"role": "system", "content": "Analyze sentiment: positive, negative, or neutral."},
            {"role": "user", "content": "{customer_message}"}
        ],
        output_key="sentiment"
    ))

    # Conditional routing.
    # LLM replies are not guaranteed to be the bare label ("Negative." or
    # "negative\n" are common), so strip surrounding whitespace and a trailing
    # period before comparing; otherwise such replies would be misrouted to
    # the standard branch.
    workflow.add_conditional(
        "route_by_sentiment",
        condition=lambda state: (
            state["sentiment"].strip().rstrip(".").lower() == "negative"
        ),
        if_true="escalation_response",
        if_false="standard_response"
    )

    # Response generation: both branches use output_key="response", so
    # callers read the reply from one key whichever branch ran.
    workflow.add_step("escalation_response", Step.llm_call(
        model="gpt-4",
        messages=[
            {"role": "system", "content": "Empathetic customer service manager response."},
            {"role": "user", "content": "{customer_message}"}
        ],
        output_key="response"
    ))

    workflow.add_step("standard_response", Step.llm_call(
        model="gpt-3.5-turbo",
        messages=[
            {"role": "system", "content": "Helpful customer service agent."},
            {"role": "user", "content": "{customer_message}"}
        ],
        output_key="response"
    ))

    # Error handling
    workflow.set_retry_policy(RetryPolicy(max_attempts=3))

    return workflow

def main():
    """Run one sample complaint through the support workflow and print events."""
    # Connect, then register the freshly built workflow as an assistant.
    client = DuragraphClient(base_url="http://localhost:8080")
    assistant = client.create_assistant(
        name="Customer Support Bot v1.0",
        workflow=create_customer_support_workflow(),
    )

    # Sample input used to exercise the workflow.
    test_message = "Your app keeps crashing and I'm losing my work!"

    # Start a run on a new thread.
    thread = client.create_thread()
    run_inputs = {"customer_message": test_message}
    run = client.create_run(
        assistant_id=assistant.id,
        thread_id=thread.id,
        inputs=run_inputs,
    )

    print(f"🚀 Processing: {test_message}")

    # Print each step's output as it arrives, then the final response.
    for event in client.stream_events(run.id):
        kind = event.type
        if kind == "workflow_complete":
            print(f"🎉 Final response: {event.outputs['response']}")
        elif kind == "step_complete":
            print(f"✅ {event.step_name}: {event.output}")

if __name__ == "__main__":
    main()

Next Steps

🎯 Enhance your workflow:

🔍 Explore advanced features:

🛠️ Production readiness:

Need help? Check our troubleshooting guide or join the community!