Map-Reduce with Send API

LangGraph's Send API enables dynamic parallel execution where the number of workers is not known at graph definition time. Instead of hardcoding parallel branches, you use conditional edges that return Send objects — each one spawns a worker with its own state. This is the foundation for orchestrator-worker patterns and fan-out/fan-in workflows.

The Send Object

A Send object tells LangGraph to invoke a specific node with a specific state. You return a list of Send objects from a conditional edge function, and LangGraph executes them all in parallel:

from langgraph.types import Send
 
def route_to_workers(state: dict) -> list[Send]:
    return [
        Send("worker", {"topic": topic})
        for topic in state["topics"]
    ]

Each Send carries its own isolated state: the worker node receives only {"topic": "..."}, not the full parent state. Whatever the worker returns is then written back to the parent graph's state.
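Because a worker sees only its payload, any shared context it needs has to be copied into that payload. The sketch below builds such payloads in plain Python, without LangGraph; build_payloads and the audience field are hypothetical names used only for illustration.

```python
def build_payloads(state: dict) -> list[dict]:
    """Return one self-contained payload per topic.

    Shared fields are copied in explicitly, because a worker sees only
    its own payload and not the parent state.
    """
    return [
        {"topic": topic, "audience": state["audience"]}
        for topic in state["topics"]
    ]

# Hypothetical parent state with one shared field ("audience").
parent_state = {"topics": ["AI", "Robotics"], "audience": "beginners"}
payloads = build_payloads(parent_state)
print(payloads)
```

In a graph, each of these dicts would be wrapped as Send("worker", payload) inside the conditional edge function.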

Basic Map-Reduce Example

import operator
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.types import Send
 
class OverallState(TypedDict):
    topics: list[str]
    results: Annotated[list[str], operator.add]
 
class WorkerState(TypedDict):
    topic: str
 
def orchestrator(state: OverallState) -> dict:
    return {"topics": state["topics"]}
 
def route_work(state: OverallState) -> list[Send]:
    return [Send("worker", {"topic": t}) for t in state["topics"]]
 
def worker(state: WorkerState) -> dict:
    result = f"Report on: {state['topic']}"
    return {"results": [result]}
 
graph = StateGraph(OverallState)
graph.add_node("orchestrator", orchestrator)
graph.add_node("worker", worker)
graph.add_conditional_edges("orchestrator", route_work, path_map=["worker"])
graph.add_edge(START, "orchestrator")
graph.add_edge("worker", END)
 
app = graph.compile()
 
result = app.invoke({"topics": ["AI", "Quantum Computing", "Robotics"]})
print(result["results"])

Three workers run in parallel, one per topic. The Annotated[list[str], operator.add] reducer aggregates all worker outputs into a single list.
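To make the aggregation step concrete, the following plain-Python sketch imitates what the reducer does with the three worker updates. It does not use LangGraph; merge_updates is a hypothetical helper written for illustration, and LangGraph's internal implementation may differ.

```python
import operator
from functools import reduce

def merge_updates(initial: list[str], updates: list[dict]) -> list[str]:
    """Fold each worker's 'results' update into the running list with operator.add."""
    return reduce(operator.add, (u["results"] for u in updates), initial)

# One update per worker, in the same shape the worker node returns.
worker_updates = [
    {"results": [f"Report on: {topic}"]}
    for topic in ["AI", "Quantum Computing", "Robotics"]
]

merged = merge_updates([], worker_updates)
print(merged)
```

Each worker returns a one-item list rather than a bare string so that operator.add can concatenate it onto the list collected so far.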

Dynamic Worker Count

The power of Send is that the number of workers is determined at runtime, not at graph definition time:

import operator
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.types import Send
from langchain_openai import ChatOpenAI
from pydantic import BaseModel
 
class Section(BaseModel):
    title: str
    description: str
 
class Plan(BaseModel):
    sections: list[Section]
 
class OverallState(TypedDict):
    topic: str
    # Plan produced by the planner node and consumed by route_sections
    plan: list[Section]
    sections: Annotated[list[str], operator.add]

class SectionState(TypedDict):
    title: str
    description: str

llm = ChatOpenAI(model="gpt-4o-mini")

def planner(state: OverallState) -> dict:
    # Call the LLM once and store the resulting plan in state
    plan = llm.with_structured_output(Plan).invoke(
        f"Create 3-5 sections (title and short description) for an article about: {state['topic']}"
    )
    return {"plan": plan.sections}

def route_sections(state: OverallState) -> list[Send]:
    # Reuse the stored plan instead of calling the LLM a second time
    return [
        Send("writer", {"title": s.title, "description": s.description})
        for s in state["plan"]
    ]
 
def writer(state: SectionState) -> dict:
    content = llm.invoke(
        f"Write a paragraph about: {state['title']} - {state['description']}"
    )
    return {"sections": [f"## {state['title']}\n{content.content}"]}
 
graph = StateGraph(OverallState)
graph.add_node("planner", planner)
graph.add_node("writer", writer)
graph.add_conditional_edges("planner", route_sections, path_map=["writer"])
graph.add_edge(START, "planner")
graph.add_edge("writer", END)
 
app = graph.compile()

The LLM decides how many sections to create, so the number of parallel workers varies per invocation.
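The same fan-out/fan-in shape can be sketched with the standard library alone, which may help when reasoning about the pattern away from LangGraph. This is an analogy under stated assumptions, not how LangGraph schedules work: plan_sections and write_section are hypothetical stand-ins for the LLM calls, and a thread pool plays the role of the parallel workers.

```python
import operator
from concurrent.futures import ThreadPoolExecutor
from functools import reduce

def plan_sections(topic: str, count: int) -> list[str]:
    """Stand-in for the LLM planner: the section count is only known at call time."""
    return [f"{topic}, part {i + 1}" for i in range(count)]

def write_section(title: str) -> list[str]:
    """Stand-in for the writer node: returns a one-item list, like a reducer update."""
    return [f"## {title}"]

def run(topic: str, count: int) -> list[str]:
    titles = plan_sections(topic, count)  # fan-out size is decided here, at runtime
    with ThreadPoolExecutor() as pool:
        # Map step: one task per title; pool.map keeps the input order.
        updates = list(pool.map(write_section, titles))
    # Reduce step: concatenate the per-worker lists into one.
    return reduce(operator.add, updates, [])

print(run("LangGraph", 3))
print(len(run("LangGraph", 5)))
```

Nothing in run fixes the number of tasks in advance; it follows from the length of the list the planning step returns, which is the property Send provides inside a graph.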

Aggregation with List Reducers

The operator.add reducer is the standard way to aggregate results from parallel workers:

import operator
from typing import TypedDict, Annotated
 
class State(TypedDict):
    inputs: list[str]
    # Each worker appends to this list
    outputs: Annotated[list[dict], operator.add]
    # Scores are concatenated into one list too; sum or average them in a later node
    scores: Annotated[list[float], operator.add]

When multiple workers return values for the same key in the same step, the reducer combines them. Without a reducer, a key accepts only one write per step, so parallel workers writing to it cause LangGraph to raise an InvalidUpdateError.
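A reducer is a two-argument function attached to a key through Annotated, so it is not limited to list concatenation. The sketch below reads the reducers back out of the type hints and applies them by hand, using only the standard library. apply_update and the total_score key are hypothetical and exist only to illustrate the mechanism; this is not LangGraph's actual merge code.

```python
import operator
from typing import Annotated, TypedDict, get_type_hints

class State(TypedDict):
    outputs: Annotated[list[dict], operator.add]  # lists are concatenated
    total_score: Annotated[float, operator.add]   # floats are summed

def apply_update(state: dict, update: dict) -> dict:
    """Merge one worker update into state using the reducer attached to each key."""
    hints = get_type_hints(State, include_extras=True)
    merged = dict(state)
    for key, value in update.items():
        reducer = hints[key].__metadata__[0]  # first Annotated argument after the type
        merged[key] = reducer(merged[key], value)
    return merged

state = {"outputs": [], "total_score": 0.0}
for update in [
    {"outputs": [{"id": 1}], "total_score": 0.5},
    {"outputs": [{"id": 2}], "total_score": 0.25},
]:
    state = apply_update(state, update)
print(state)
```

The same operator.add concatenates for the list key and sums for the float key, because the reducer simply receives the current value and the new value.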

Orchestrator-Worker Pattern

The full orchestrator-worker pattern has three stages: plan, execute in parallel, and synthesize. The synthesize node runs once, after every execute worker has finished:

import operator
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.types import Send
 
class OrchestratorState(TypedDict):
    query: str
    tasks: list[str]
    results: Annotated[list[str], operator.add]
    final_answer: str
 
class TaskState(TypedDict):
    task: str
 
def plan(state: OrchestratorState) -> dict:
    tasks = [
        f"Research: {state['query']}",
        f"Find examples: {state['query']}",
        f"Check alternatives: {state['query']}",
    ]
    return {"tasks": tasks}
 
def distribute(state: OrchestratorState) -> list[Send]:
    return [Send("execute", {"task": t}) for t in state["tasks"]]
 
def execute(state: TaskState) -> dict:
    return {"results": [f"Completed: {state['task']}"]}
 
def synthesize(state: OrchestratorState) -> dict:
    combined = "\n".join(state["results"])
    return {"final_answer": f"Synthesis of {len(state['results'])} results:\n{combined}"}
 
graph = StateGraph(OrchestratorState)
graph.add_node("plan", plan)
graph.add_node("execute", execute)
graph.add_node("synthesize", synthesize)
graph.add_conditional_edges("plan", distribute, path_map=["execute"])
graph.add_edge(START, "plan")
graph.add_edge("execute", "synthesize")
graph.add_edge("synthesize", END)
 
app = graph.compile()
 
result = app.invoke({"query": "LangGraph patterns"})
print(result["final_answer"])
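Since each node is an ordinary function from a dict to a dict, the three stages can also be exercised without compiling a graph, which is convenient for unit tests. The sketch below repeats the node functions with plain dict annotations and wires them together by hand; the manual loop is an assumption standing in for LangGraph's scheduling and reducer.

```python
def plan(state: dict) -> dict:
    tasks = [
        f"Research: {state['query']}",
        f"Find examples: {state['query']}",
        f"Check alternatives: {state['query']}",
    ]
    return {"tasks": tasks}

def execute(state: dict) -> dict:
    return {"results": [f"Completed: {state['task']}"]}

def synthesize(state: dict) -> dict:
    combined = "\n".join(state["results"])
    return {"final_answer": f"Synthesis of {len(state['results'])} results:\n{combined}"}

# Drive the stages manually: plan, then one execute call per task, then synthesize.
state = {"query": "LangGraph patterns", "results": []}
state.update(plan(state))
for task in state["tasks"]:
    # Each worker sees only its own payload; its output is concatenated by hand.
    state["results"] = state["results"] + execute({"task": task})["results"]
state.update(synthesize(state))
print(state["final_answer"])
```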

Key Takeaways

  • Send objects from conditional edges create dynamic parallel workers at runtime
  • The number of workers is determined by data, not by graph structure — enabling flexible fan-out
  • Each Send receives its own isolated state, separate from the parent graph's state
  • Use Annotated[list, operator.add] reducers to aggregate results from parallel workers
  • The orchestrator-worker pattern combines planning, parallel execution via Send, and synthesis
  • Without a reducer on the aggregation field, parallel writes to that field fail with an InvalidUpdateError instead of being merged