Task Dependencies & Context
In CrewAI, the context parameter on a Task lets you declare explicitly which other tasks' outputs are injected as background for the current task. The agent sees those prior results when it runs, so you control the dependency graph in code instead of relying only on implicit ordering.
What context does
Pass a list of Task instances whose completed outputs should feed into this task. CrewAI gathers their results and supplies them as context for the agent working on the dependent task.
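To make the idea concrete, here is a minimal sketch of gathering upstream outputs into one context string. It does not use CrewAI: StubTask, build_context, and the divider are hypothetical stand-ins chosen for illustration, and the way CrewAI itself formats context may differ.

```python
from dataclasses import dataclass, field


@dataclass
class StubTask:
    """Stand-in for a task: a name, its finished output, and its context list."""
    name: str
    output: str = ""
    context: list = field(default_factory=list)


def build_context(task: StubTask, divider: str = "\n---\n") -> str:
    """Join the outputs of every task listed in task.context, in list order."""
    return divider.join(upstream.output for upstream in task.context)


# Placeholder outputs, as if the two upstream tasks had already completed.
research = StubTask("research", output="research notes")
metrics = StubTask("metrics", output="metric table")
summary = StubTask("summary", context=[research, metrics])

context_text = build_context(summary)
print(context_text)
```

Only the tasks named in context contribute here, which is the property the context parameter is meant to give you.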
Sequential process: automatic vs explicit wiring
With Process.sequential, tasks run in list order, and by default the framework already passes earlier output forward to each step (exactly how much is forwarded can depend on your CrewAI version). context still matters when you want finer control: you can name exactly which tasks count as inputs (including non-adjacent tasks), combine multiple upstream branches (fan-in), or pair it with async_execution so that concurrent tasks finish before a downstream merge.
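Because sequential tasks run in list order, an explicit context entry is only useful if the upstream task appears earlier in the list. The following is a hypothetical pre-flight check, written against a stand-in StubTask class rather than crewai.Task; the helper name find_forward_references is an assumption for illustration, and CrewAI may perform its own validation.

```python
from dataclasses import dataclass, field


@dataclass(eq=False)
class StubTask:
    """Stand-in for a task with a name and an explicit context list."""
    name: str
    context: list = field(default_factory=list)


def find_forward_references(tasks):
    """Return (task, upstream) name pairs where upstream does not run before task."""
    seen = set()
    problems = []
    for task in tasks:
        for upstream in task.context:
            # An upstream task must already have run when this task starts.
            if id(upstream) not in seen:
                problems.append((task.name, upstream.name))
        seen.add(id(task))
    return problems


research = StubTask("research")
metrics = StubTask("metrics")
# Non-adjacent dependency: summary skips metrics and reads research only.
summary = StubTask("summary", context=[research])

good_order = find_forward_references([research, metrics, summary])
bad_order = find_forward_references([summary, research, metrics])
print(good_order, bad_order)
```

A non-adjacent dependency passes the check as long as the upstream task is listed first; moving summary to the front of the list is reported as a problem.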
Explicit context (single predecessor)
Here, analysis_task only treats research_task as its input context—not an ambiguous “whatever came before”:
research_task = Task(description="Research AI trends", expected_output="...", agent=researcher)
analysis_task = Task(
    description="Analyze the research",
    expected_output="...",
    agent=analyst,
    context=[research_task],
)
Multiple context sources (fan-in)
A summary (or merge) task can depend on two or more upstream tasks by listing them all in context. That is a fan-in: several parallel or sequential branches join into one consumer task.
research_task = Task(description="Research market trends", expected_output="...", agent=researcher)
data_task = Task(description="Extract metrics from the provided figures", expected_output="...", agent=analyst)
summary_task = Task(
    description="Write an executive summary using both the research narrative and the metrics.",
    expected_output="One page: bullets for insights plus a small table of numbers.",
    agent=writer,
    context=[research_task, data_task],
)
async_execution with context
Set async_execution=True on tasks that should run concurrently with other async tasks (subject to the crew and process rules of your CrewAI version). A downstream task that lists those async tasks in context waits until all of them have finished, then receives all of their outputs together.
Pattern:
- task_a and task_b: async_execution=True
- merge_task: context=[task_a, task_b], runs after both complete
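The pattern above is an ordinary fan-in. As a rough analogy only, the sketch below reproduces it with Python's standard concurrent.futures; it is not how CrewAI schedules tasks internally, and run_task and run_fan_in are hypothetical names used for illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def run_task(name: str, seconds: float) -> str:
    """Pretend to do some work, then return a placeholder output."""
    time.sleep(seconds)
    return f"{name} output"


def run_fan_in() -> str:
    with ThreadPoolExecutor(max_workers=2) as pool:
        # Both upstream tasks start immediately and run concurrently.
        future_a = pool.submit(run_task, "task_a", 0.2)
        future_b = pool.submit(run_task, "task_b", 0.1)
        # result() blocks, so the merge step cannot start until both are done.
        upstream_outputs = [future_a.result(), future_b.result()]
    return "merge_task saw: " + " | ".join(upstream_outputs)


merged = run_fan_in()
print(merged)
```

The outputs are collected in the order the tasks were listed, not the order they finished, which mirrors listing them explicitly in context.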
Complete example: two parallel research tasks → one synthesis
Two research tasks run in parallel (both assigned to the same researcher agent); a single synthesizer agent merges both outputs.
from crewai import Agent, Crew, Process, Task
# Agents: one researcher handles both research tasks, one synthesizer merges them.
researcher = Agent(
    role="Research Analyst",
    goal="Gather concise facts from the prompt only",
    backstory="You stay brief and avoid inventing sources.",
    verbose=True,
)
synthesizer = Agent(
    role="Synthesis Lead",
    goal="Merge multiple inputs into one coherent brief",
    backstory="You reconcile angles without losing detail.",
    verbose=True,
)
# Two upstream tasks marked async so they can run concurrently.
research_llms = Task(
    description="List three current themes in LLM product adoption (no web search; reasoning only).",
    expected_output="Three short bullet points.",
    agent=researcher,
    async_execution=True,
)
research_agents = Task(
    description="List three themes in multi-agent orchestration (no web search; reasoning only).",
    expected_output="Three short bullet points.",
    agent=researcher,
    async_execution=True,
)
# Fan-in: the synthesis task names both research tasks as its context.
synthesis_task = Task(
    description="Combine the two research outputs into six non-redundant bullets grouped under two headings.",
    expected_output="Markdown with two ## headings and six bullets total.",
    agent=synthesizer,
    context=[research_llms, research_agents],
)
crew = Crew(
    agents=[researcher, synthesizer],
    tasks=[research_llms, research_agents, synthesis_task],
    process=Process.sequential,
    verbose=True,
)
result = crew.kickoff()
print(result)
Task callbacks
A callback runs when a task completes, and receives the task’s output object (commonly typed as TaskOutput). Use it for logging, metrics, or handing results to another system.
from crewai.tasks.task_output import TaskOutput
def on_complete(output: TaskOutput):
    print(f"Task done: {output.description}")
task = Task(..., callback=on_complete)
Wire the same way for any task, including merge tasks that depend on context.
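A callback can also collect results instead of printing them. The sketch below is a minimal illustration that runs without CrewAI: StubOutput is a hypothetical stand-in exposing only the description and raw attributes mentioned in this guide, and record_completion is an assumed name. With CrewAI installed, the same function could be passed as callback=record_completion.

```python
from dataclasses import dataclass


@dataclass
class StubOutput:
    """Stand-in for a task output with the two attributes used below."""
    description: str
    raw: str


# Simple in-memory log; a real system might write to a file or a metrics service.
completed = []


def record_completion(output) -> None:
    """Store the task description and the size of its raw output."""
    completed.append({"task": output.description, "chars": len(output.raw)})


# Simulate a task finishing and the framework invoking the callback.
record_completion(StubOutput(description="Research AI trends", raw="Three bullets"))
print(completed)
```

Keeping the callback free of side effects beyond appending to a list makes it easy to test in isolation, as done here.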
Key takeaways
- context=[...] declares which upstream Task outputs are passed into the current task.
- Sequential crews still benefit from explicit context when you need precise or multi-source dependencies.
- Fan-in = one task with context=[task_a, task_b, ...].
- async_execution=True on several tasks lets them run in parallel; a later task with those tasks in context blocks until all are done, then sees combined context.
- callback hooks completion and receives a TaskOutput (for example to log output.description or inspect output.raw).