CrewAI Flows
CrewAI Flows
Flows are CrewAI’s event-driven orchestration layer. They sit above crews: you still define agents, tasks, and crews as usual, but a Flow decides when each step runs, how state moves between steps, and how branches and parallel entry points compose into a larger workflow. Use Flows when you need to chain multiple crews, hold structured state across steps, route conditionally, or model multi-stage pipelines that go beyond a single Crew.kickoff().
Core decorators
| Decorator | Role |
|---|---|
@start() | Marks entry points when the flow kicks off. You can define multiple @start methods so several branches begin in parallel. |
@listen(method) | Runs when the referenced method (or a string label from a router) completes; the listener receives that step’s return value. |
@router(method) | After method finishes, CrewAI calls your router with that output; the string you return selects which @listen("label") handlers run next. |
Together, these turn a class into a graph of steps: starts fire first, then listeners react to completions, and routers choose paths.
Basic Flow
A minimal linear pipeline: one start, then a chain of listeners. Each method’s return value is passed to the next listener as an argument.
from crewai.flow.flow import Flow, listen, start
class ResearchFlow(Flow):
@start()
def gather_topic(self):
return "AI Agents in 2025"
@listen(gather_topic)
def research(self, topic):
# Could call a crew here
return f"Research results for: {topic}"
@listen(research)
def write_report(self, results):
return f"Final report based on: {results}"
flow = ResearchFlow()
result = flow.kickoff()kickoff() runs the flow from all @start methods through the listener graph and returns the final outcome according to your flow’s configuration.
Structured state with Pydantic
Flows can carry a typed state object so steps read and write shared fields instead of threading many return values by hand. Subclass Flow with a Pydantic model as the generic parameter and assign to self.state inside steps.
from pydantic import BaseModel
from crewai.flow.flow import Flow, listen, start
class ResearchState(BaseModel):
topic: str = ""
findings: list[str] = []
report: str = ""
class ResearchFlow(Flow[ResearchState]):
@start()
def set_topic(self):
self.state.topic = "AI Agents"Listeners can still return values for downstream steps, or you can keep mutating self.state for a single source of truth across the run.
Conditional branching with @router
When the next step depends on runtime data, use @router on a method that runs after the step you want to branch on. Return a string label; methods decorated with @listen("that_label") run for that branch.
from crewai.flow.flow import Flow, listen, router, start
class BranchingFlow(Flow):
@start()
def research(self):
return "x" * 500 # example payload
@router(research)
def decide_next(self, results):
if len(results) > 1000:
return "detailed_analysis"
return "quick_summary"
@listen("detailed_analysis")
def analyze(self):
...
@listen("quick_summary")
def summarize(self):
...Ensure every path your router can return has a matching @listen("label") (or you will get routing errors at runtime).
Calling a Crew inside a Flow step
A Flow step is plain Python: instantiate or reuse a Crew, then call kickoff() (or kickoff_for_each) and return whatever you need for the next listener—raw text, structured output, or a summary written into self.state.
from crewai import Agent, Crew, Process, Task
from crewai.flow.flow import Flow, listen, start
researcher = Agent(
role="Researcher",
goal="Summarize the topic briefly",
backstory="You are concise and factual.",
verbose=True,
)
research_task = Task(
description="Research: {topic}. Output 3 short bullets.",
expected_output="Three bullets.",
agent=researcher,
)
crew = Crew(
agents=[researcher],
tasks=[research_task],
process=Process.sequential,
verbose=True,
)
class ReportFlow(Flow):
@start()
def run_research_crew(self):
out = crew.kickoff(inputs={"topic": "AI agents"})
return out.raw
@listen(run_research_crew)
def next_step(self, crew_output):
return f"Done. Crew said: {crew_output[:200]}..."You can also set self.state.findings = out.raw (with a Pydantic state model) before returning, so later steps and routers see crew results without passing huge strings through every return.
or_() and and_() combinators
When a step should run only after several predecessors finish, use the combinators from the flow module (names may be or_ and and_ depending on your CrewAI version):
or_(a, b, ...)— fire when any of the listed methods completes (first completion wins for that trigger; consult the docs for your version if you need strict “wait for all” semantics on OR paths).and_(a, b, ...)— fire when all listed methods have completed, so you can merge parallel@startor parallel branches before a single downstream step.
Use these when you split work with multiple @start methods or parallel listeners and need one consolidation step.
When to use Flows vs plain Crews
| Situation | Prefer |
|---|---|
One crew, linear tasks, single kickoff | Crew only |
| Multiple crews in sequence or with shared state | Flow |
| If / else routing based on outputs or state | Flow with @router |
| Long-running or multi-phase apps with clear stages | Flow |
| Simple agent + task DAG with no extra orchestration | Crew |
Crews excel at “this team runs these tasks once.” Flows excel when orchestration itself is a program: events, branches, parallel starts, and stateful workflows.
Key takeaways
- Flows add an event-driven layer on top of crews:
@start→@listenchains, optional@routerlabels, and optional Pydantic state onFlow[YourState]. - Multiple
@startmethods enable parallel entry points;or_/and_help express “any of” or “all of” triggers for downstream steps. - Crews are invoked inside flow methods like normal Python; pass
kickoffinputs, then forwardCrewOutputor state fields to the next step. - Choose Crew-only for a single run; choose Flows for multi-step orchestration, conditionals, and durable-feeling state across the run.