Project: Customer Support System

What You'll Build

You will build a customer support pipeline with CrewAI. A triage agent reads each message and classifies it into a structured SupportTicket; ConditionalTask steps route billing and technical work to specialists, with a general fallback; and a QA reviewer produces a validated SupportResponse, optionally after human input. Shared FAQ context comes from a StringKnowledgeSource, and the crew enables memory so later tasks can recall what earlier tasks established.

Features Used

  • Flows: a small Flow wraps the crew kickoff so the support pipeline has an explicit orchestration entrypoint
  • Conditional Tasks: billing and technical paths run only when triage indicates the matching category; general runs when triage says general
  • Knowledge Sources: crew-level StringKnowledgeSource with company FAQ text
  • Custom Tools: a @tool that simulates CRM ticket lookup by id
  • Structured Output: output_pydantic=SupportTicket on triage and output_pydantic=SupportResponse on QA
  • Human Input: human_input=True on the QA task for a human gate before the final structured review
  • Memory: memory=True on the crew for short-term recall across tasks

Step 1: Define data models

from pydantic import BaseModel
 
class SupportTicket(BaseModel):
    category: str  # "billing", "technical", "general"
    priority: str  # "low", "medium", "high"
    summary: str
 
class SupportResponse(BaseModel):
    ticket_summary: str
    resolution: str
    follow_up_needed: bool
    satisfaction_prediction: str
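
The comments on category and priority list the allowed values, but plain str fields accept anything. As a hedged sketch (StrictSupportTicket is a hypothetical variant, not part of the tutorial code), typing.Literal lets pydantic reject values outside the allowed set:

```python
from typing import Literal

from pydantic import BaseModel, ValidationError


class StrictSupportTicket(BaseModel):
    # Hypothetical stricter variant of SupportTicket (illustration only).
    category: Literal["billing", "technical", "general"]
    priority: Literal["low", "medium", "high"]
    summary: str


# A value from the allowed set validates normally.
ok = StrictSupportTicket(category="billing", priority="high", summary="Charged twice.")

# A value outside the allowed set raises ValidationError.
try:
    StrictSupportTicket(category="refunds", priority="high", summary="Charged twice.")
    rejected = False
except ValidationError:
    rejected = True
```

Whether to adopt the stricter model is a design choice: it catches routing labels the predicates in Step 5 would never match, at the cost of a validation failure when the model invents a new category.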

Step 2: Set up Knowledge source with company FAQ

from crewai.knowledge.source.string_knowledge_source import StringKnowledgeSource
 
faq = StringKnowledgeSource(
    content=(
        "Q: How do I reset my password? A: Go to Settings > Security > Reset password and follow the email link. "
        "Q: I was charged twice. A: Open Billing > Invoices, note the duplicate charge ids, and request a refund review. "
        "Q: The app returns error 503. A: Check status.example.com; if green, try clearing cache or reinstalling the app."
    ),
)

Attach this (and any other sources) to the Crew via knowledge_sources=[faq] so every agent can retrieve FAQ passages during tasks.
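
The FAQ string above is written by hand. If the FAQ lives in structured form, a small helper can produce the same "Q: ... A: ..." shape. This is a minimal sketch; build_faq_text and faq_pairs are hypothetical names, and the resulting string would be passed as content= to StringKnowledgeSource:

```python
def build_faq_text(pairs: list[tuple[str, str]]) -> str:
    """Join (question, answer) pairs into one 'Q: ... A: ...' string."""
    return " ".join(f"Q: {q.strip()} A: {a.strip()}" for q, a in pairs)


# Two of the FAQ entries from this step, expressed as pairs.
faq_pairs = [
    ("How do I reset my password?",
     "Go to Settings > Security > Reset password and follow the email link."),
    ("I was charged twice.",
     "Open Billing > Invoices, note the duplicate charge ids, and request a refund review."),
]

faq_text = build_faq_text(faq_pairs)
```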

Step 3: Create a custom tool for ticket lookup

from crewai.tools import tool
 
@tool("Ticket Lookup")
def lookup_ticket(ticket_id: str) -> str:
    """Look up a support ticket by id in the CRM (simulated)."""
    crm = {
        "T-1001": "Status: open. Issue: duplicate subscription charge from 2026-03-10.",
        "T-2044": "Status: waiting on customer. Issue: SMTP errors when sending reports.",
    }
    return crm.get(ticket_id.strip().upper(), "No ticket found for that id.")

Specialists can call lookup_ticket when the customer mentions an id such as T-1001.
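
In the tutorial the agent decides when to call the tool. If you want to pre-extract ids in plain Python, for example to log them, a regular expression is enough. This sketch assumes ids always look like "T-" followed by digits, as in the two sample ids; extract_ticket_ids is a hypothetical helper:

```python
import re

# Assumed id format: "T-" followed by digits (matches the sample ids T-1001 and T-2044).
TICKET_ID_PATTERN = re.compile(r"\bT-\d+\b", re.IGNORECASE)


def extract_ticket_ids(message: str) -> list[str]:
    """Return the distinct ticket ids in a message, upper-cased, in order of appearance."""
    seen: list[str] = []
    for match in TICKET_ID_PATTERN.findall(message):
        ticket_id = match.upper()  # same normalization lookup_ticket applies
        if ticket_id not in seen:
            seen.append(ticket_id)
    return seen


ids = extract_ticket_ids("I was charged twice, my ticket is t-1001. See also T-2044 and T-1001.")
```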

Step 4: Define agents — Triage, Billing, Technical, QA

Give the triage agent a policy-focused brief; give billing and technical agents permission to use lookup_ticket; keep QA separate with a review-oriented goal.

from crewai import Agent
 
triage_agent = Agent(
    role="Support Triage",
    goal="Classify incoming messages into category, priority, and a short internal summary",
    backstory="You scan customer messages quickly and assign accurate routing labels without solving the full case.",
    verbose=True,
    allow_delegation=False,
)
 
billing_specialist = Agent(
    role="Billing Specialist",
    goal="Resolve billing, invoices, refunds, and plan questions using policy and tools",
    backstory="You know subscription billing inside out and escalate edge cases clearly.",
    tools=[lookup_ticket],
    verbose=True,
    allow_delegation=False,
)
 
technical_specialist = Agent(
    role="Technical Specialist",
    goal="Diagnose product and integration issues with concrete steps and expectations",
    backstory="You reproduce issues mentally, cite known error patterns, and avoid guessing when data is missing.",
    tools=[lookup_ticket],
    verbose=True,
    allow_delegation=False,
)
 
qa_reviewer = Agent(
    role="Quality Assurance Reviewer",
    goal="Ensure the proposed resolution is accurate, on-brand, and complete before it reaches the customer",
    backstory="You edit for clarity, policy alignment, and risk; you catch contradictions with the FAQ.",
    verbose=True,
    allow_delegation=False,
)

Step 5: Define tasks — triage, conditionals, general, QA

Set output_pydantic=SupportTicket on the triage task. Each ConditionalTask condition receives the previous task's output. After billing is skipped, the last executed task is still triage, so a small helper that reads category from the triage output keeps routing stable. Once a specialist has run, its plain-text output contains no category field, so the helper returns None and the remaining conditionals are skipped.

from crewai import Crew, Process, Task
from crewai.tasks.conditional_task import ConditionalTask
from crewai.tasks.task_output import TaskOutput
 
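def _category_from_previous(output: TaskOutput) -> str | None:
    # Prefer the parsed model when the previous task produced a SupportTicket.
    if isinstance(output.pydantic, SupportTicket):
        return output.pydantic.category.strip().lower()
    # Otherwise scan the raw text for a JSON- or dict-style category field.
    text = output.raw.lower()
    for cat in ("billing", "technical", "general"):
        if f'"category": "{cat}"' in text or f"'category': '{cat}'" in text or f'category":"{cat}"' in text:
            return cat
    return None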
 
def is_billing(output: TaskOutput) -> bool:
    return _category_from_previous(output) == "billing"
 
def is_technical(output: TaskOutput) -> bool:
    return _category_from_previous(output) == "technical"
 
def is_general(output: TaskOutput) -> bool:
    return _category_from_previous(output) == "general"
 
triage_task = Task(
    description=(
        "Customer message: {customer_query}\n"
        "Return a SupportTicket: category must be exactly one of billing, technical, general; "
        "priority one of low, medium, high; summary in one short paragraph."
    ),
    expected_output="A validated SupportTicket",
    agent=triage_agent,
    output_pydantic=SupportTicket,
)
 
billing_task = ConditionalTask(
    description=(
        "Resolve this billing case. Message: {customer_query}. "
        "Use FAQ knowledge. If a ticket id like T-1001 appears, call lookup_ticket. "
        "Write the resolution you would send to the customer (plain text)."
    ),
    expected_output="Customer-ready billing resolution",
    agent=billing_specialist,
    condition=is_billing,
)
 
technical_task = ConditionalTask(
    description=(
        "Resolve this technical case. Message: {customer_query}. "
        "Use FAQ knowledge and lookup_ticket if an id is present. "
        "Write numbered troubleshooting steps and what to do if they fail."
    ),
    expected_output="Customer-ready technical resolution",
    agent=technical_specialist,
    condition=is_technical,
)
 
general_task = ConditionalTask(
    description=(
        "Handle this general inquiry. Message: {customer_query}. "
        "Use FAQ knowledge. Answer helpfully and state any limits clearly."
    ),
    expected_output="Customer-ready general answer",
    agent=triage_agent,
    condition=is_general,
)
 
qa_task = Task(
    description=(
        "Review the specialist output (from billing, technical, or general handling). "
        "Produce SupportResponse: ticket_summary (brief), resolution (final customer-facing text), "
        "follow_up_needed (bool), satisfaction_prediction (low|medium|high)."
    ),
    expected_output="A validated SupportResponse",
    agent=qa_reviewer,
    context=[billing_task, technical_task, general_task],
    output_pydantic=SupportResponse,
    human_input=True,
)

Task order is triage → billing (conditional) → technical (conditional) → general (conditional) → QA. With sequential process, when a conditional is skipped, the next task still sees the last executed task’s output, so _category_from_previous continues to read triage until a specialist runs.
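
The routing order described above can be modeled in plain Python, without CrewAI. This is a sketch of the behavior as the text describes it, not the library's implementation: category_from_raw and simulate_routing are hypothetical helpers, and the specialist output is a placeholder string.

```python
import json
from typing import Optional

CATEGORIES = ("billing", "technical", "general")


def category_from_raw(raw: str) -> Optional[str]:
    """Parse a category from JSON text; return None for prose or unknown values."""
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        return None
    if not isinstance(data, dict):
        return None
    category = str(data.get("category", "")).strip().lower()
    return category if category in CATEGORIES else None


def simulate_routing(triage_raw: str) -> list[str]:
    """Return the names of the tasks that would run, in order."""
    executed = ["triage"]
    last_output = triage_raw  # output of the last task that actually ran
    for name in CATEGORIES:
        if category_from_raw(last_output) == name:
            executed.append(name)
            # Placeholder for the specialist's plain-text answer.
            last_output = f"Plain-text {name} resolution for the customer."
        # A skipped conditional leaves last_output unchanged.
    executed.append("qa")
    return executed


billing_run = simulate_routing('{"category": "billing", "priority": "high", "summary": "Charged twice."}')
general_run = simulate_routing('{"category": "general", "priority": "low", "summary": "Opening hours?"}')
```

In this model at most one specialist runs, because its prose output no longer parses to a category.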

Step 6: Build a Flow that orchestrates the support pipeline

Reuse the BaseModel import from Step 1 to define the flow state, then assemble the crew and wrap its kickoff in a Flow.

from crewai.flow.flow import Flow, start
 
class SupportFlowState(BaseModel):
    customer_query: str = ""
 
support_crew = Crew(
    agents=[triage_agent, billing_specialist, technical_specialist, qa_reviewer],
    tasks=[triage_task, billing_task, technical_task, general_task, qa_task],
    process=Process.sequential,
    knowledge_sources=[faq],
    memory=True,
    verbose=True,
)
 
class CustomerSupportFlow(Flow[SupportFlowState]):
    @start()
    def run_support_crew(self):
        return support_crew.kickoff(inputs={"customer_query": self.state.customer_query})

CustomerSupportFlow gives you a named workflow around kickoff: you can later add @listen or @router steps, or extra stages such as notifications, logging, or a second crew, without changing the crew definition.

Step 7: Run with a sample customer query

flow = CustomerSupportFlow()
final = flow.kickoff(inputs={"customer_query": "I was charged twice—my ticket is T-1001. Please fix it."})
 
resp = final.pydantic
print(resp.ticket_summary)
print(resp.resolution)
print(resp.follow_up_needed, resp.satisfaction_prediction)

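When human_input=True triggers, the runtime prompts for your feedback before finalizing SupportResponse; this works wherever input() works, such as a terminal or many notebooks. final.raw holds the raw text of the final task, and final.tasks_output lists each task's output if you want to trace the run.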
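
If the structured output is ever missing, printing resp.ticket_summary would fail on None. A defensive formatter can fall back to the raw text. This is a sketch: describe_result is a hypothetical helper, and the SimpleNamespace objects are stand-ins for the object returned by flow.kickoff(), assumed to expose pydantic and raw attributes.

```python
from types import SimpleNamespace


def describe_result(result) -> str:
    """Format the structured response if present, else fall back to raw text."""
    resp = getattr(result, "pydantic", None)
    if resp is None:
        return f"Unstructured output: {getattr(result, 'raw', '')}"
    follow_up = "yes" if resp.follow_up_needed else "no"
    return (
        f"Summary: {resp.ticket_summary}\n"
        f"Resolution: {resp.resolution}\n"
        f"Follow-up needed: {follow_up} "
        f"(predicted satisfaction: {resp.satisfaction_prediction})"
    )


# Stand-ins for a crew result (illustration only).
structured = SimpleNamespace(
    raw="Refund review opened.",
    pydantic=SimpleNamespace(
        ticket_summary="Duplicate charge",
        resolution="Refund review opened.",
        follow_up_needed=True,
        satisfaction_prediction="medium",
    ),
)
unstructured = SimpleNamespace(raw="Refund review opened.", pydantic=None)

report = describe_result(structured)
```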

Full Code

import os
from getpass import getpass
 
from pydantic import BaseModel
 
from crewai import Agent, Crew, Process, Task
from crewai.flow.flow import Flow, start
from crewai.knowledge.source.string_knowledge_source import StringKnowledgeSource
from crewai.tasks.conditional_task import ConditionalTask
from crewai.tasks.task_output import TaskOutput
from crewai.tools import tool
 
os.environ["OPENAI_API_KEY"] = getpass("Enter your OpenAI API key: ")
 
class SupportTicket(BaseModel):
    category: str
    priority: str
    summary: str
 
class SupportResponse(BaseModel):
    ticket_summary: str
    resolution: str
    follow_up_needed: bool
    satisfaction_prediction: str
 
faq = StringKnowledgeSource(
    content=(
        "Q: How do I reset my password? A: Go to Settings > Security > Reset password and follow the email link. "
        "Q: I was charged twice. A: Open Billing > Invoices, note the duplicate charge ids, and request a refund review. "
        "Q: The app returns error 503. A: Check status.example.com; if green, try clearing cache or reinstalling the app."
    ),
)
 
@tool("Ticket Lookup")
def lookup_ticket(ticket_id: str) -> str:
    """Look up a support ticket by id in the CRM (simulated)."""
    crm = {
        "T-1001": "Status: open. Issue: duplicate subscription charge from 2026-03-10.",
        "T-2044": "Status: waiting on customer. Issue: SMTP errors when sending reports.",
    }
    return crm.get(ticket_id.strip().upper(), "No ticket found for that id.")
 
triage_agent = Agent(
    role="Support Triage",
    goal="Classify incoming messages into category, priority, and a short internal summary",
    backstory="You scan customer messages quickly and assign accurate routing labels without solving the full case.",
    verbose=True,
    allow_delegation=False,
)
 
billing_specialist = Agent(
    role="Billing Specialist",
    goal="Resolve billing, invoices, refunds, and plan questions using policy and tools",
    backstory="You know subscription billing inside out and escalate edge cases clearly.",
    tools=[lookup_ticket],
    verbose=True,
    allow_delegation=False,
)
 
technical_specialist = Agent(
    role="Technical Specialist",
    goal="Diagnose product and integration issues with concrete steps and expectations",
    backstory="You reproduce issues mentally, cite known error patterns, and avoid guessing when data is missing.",
    tools=[lookup_ticket],
    verbose=True,
    allow_delegation=False,
)
 
qa_reviewer = Agent(
    role="Quality Assurance Reviewer",
    goal="Ensure the proposed resolution is accurate, on-brand, and complete before it reaches the customer",
    backstory="You edit for clarity, policy alignment, and risk; you catch contradictions with the FAQ.",
    verbose=True,
    allow_delegation=False,
)
 
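def _category_from_previous(output: TaskOutput) -> str | None:
    # Prefer the parsed model when the previous task produced a SupportTicket.
    if isinstance(output.pydantic, SupportTicket):
        return output.pydantic.category.strip().lower()
    # Otherwise scan the raw text for a JSON- or dict-style category field.
    text = output.raw.lower()
    for cat in ("billing", "technical", "general"):
        if f'"category": "{cat}"' in text or f"'category': '{cat}'" in text or f'category":"{cat}"' in text:
            return cat
    return None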
 
def is_billing(output: TaskOutput) -> bool:
    return _category_from_previous(output) == "billing"
 
def is_technical(output: TaskOutput) -> bool:
    return _category_from_previous(output) == "technical"
 
def is_general(output: TaskOutput) -> bool:
    return _category_from_previous(output) == "general"
 
triage_task = Task(
    description=(
        "Customer message: {customer_query}\n"
        "Return a SupportTicket: category must be exactly one of billing, technical, general; "
        "priority one of low, medium, high; summary in one short paragraph."
    ),
    expected_output="A validated SupportTicket",
    agent=triage_agent,
    output_pydantic=SupportTicket,
)
 
billing_task = ConditionalTask(
    description=(
        "Resolve this billing case. Message: {customer_query}. "
        "Use FAQ knowledge. If a ticket id like T-1001 appears, call lookup_ticket. "
        "Write the resolution you would send to the customer (plain text)."
    ),
    expected_output="Customer-ready billing resolution",
    agent=billing_specialist,
    condition=is_billing,
)
 
technical_task = ConditionalTask(
    description=(
        "Resolve this technical case. Message: {customer_query}. "
        "Use FAQ knowledge and lookup_ticket if an id is present. "
        "Write numbered troubleshooting steps and what to do if they fail."
    ),
    expected_output="Customer-ready technical resolution",
    agent=technical_specialist,
    condition=is_technical,
)
 
general_task = ConditionalTask(
    description=(
        "Handle this general inquiry. Message: {customer_query}. "
        "Use FAQ knowledge. Answer helpfully and state any limits clearly."
    ),
    expected_output="Customer-ready general answer",
    agent=triage_agent,
    condition=is_general,
)
 
qa_task = Task(
    description=(
        "Review the specialist output (from billing, technical, or general handling). "
        "Produce SupportResponse: ticket_summary (brief), resolution (final customer-facing text), "
        "follow_up_needed (bool), satisfaction_prediction (low|medium|high)."
    ),
    expected_output="A validated SupportResponse",
    agent=qa_reviewer,
    context=[billing_task, technical_task, general_task],
    output_pydantic=SupportResponse,
    human_input=True,
)
 
support_crew = Crew(
    agents=[triage_agent, billing_specialist, technical_specialist, qa_reviewer],
    tasks=[triage_task, billing_task, technical_task, general_task, qa_task],
    process=Process.sequential,
    knowledge_sources=[faq],
    memory=True,
    verbose=True,
)
 
class SupportFlowState(BaseModel):
    customer_query: str = ""
 
class CustomerSupportFlow(Flow[SupportFlowState]):
    @start()
    def run_support_crew(self):
        return support_crew.kickoff(inputs={"customer_query": self.state.customer_query})
 
flow = CustomerSupportFlow()
final = flow.kickoff(inputs={"customer_query": "I was charged twice—my ticket is T-1001. Please fix it."})
print(final.pydantic)

What You Learned

You built an end-to-end support pipeline: structured triage with SupportTicket, FAQ-backed answers via StringKnowledgeSource, branching with ConditionalTask and small category predicates, a custom lookup tool, crew memory for cross-task coherence, and a QA step that emits SupportResponse with human_input=True. Wrapping kickoff in a Flow gives you a clear place to grow the system (routers, side effects, or multiple crews) while keeping the core crew definition readable.