Agent Foundry
CrewAI

Project: AI Newsletter Pipeline

AdvancedTopic 23 of 24Open in Colab

Project: AI Newsletter Pipeline

This project ties together Flows, multiple crews, and shared Pydantic state to automate a weekly AI newsletter: one crew researches trends, another writes article sections, and a final crew assembles and writes the finished issue to disk. You will use @start, @listen, and @router to orchestrate the stages and branch on a simple quality gate.

What You'll Build

A Flow that orchestrates three crews end to end:

  1. Research crew — discovers what is trending in AI for the week.
  2. Writing crew — turns that research into three newsletter sections (sequential tasks with task context).
  3. Assembly crew — combines sections into one newsletter document and saves it with output_file.

The Flow sets the topic, runs research, then writing, then uses a router to choose publish (go straight to assembly) or revise (polish the draft first, then assemble). Along the way you will use structured output where it helps, task context inside crews, and memory on the crews so agents can retain useful context across steps.

Features Used

  • Flows@start, @listen, @router
  • Crews — three specialized crews (research, writing, assembly)
  • Agents — researcher, analyst, writer, editor
  • Tasks — chained descriptions and context between tasks
  • Structured output — Pydantic models on selected tasks (output_pydantic)
  • Task context — downstream tasks consume prior task outputs in the same crew
  • Memorymemory=True on crews for conversational continuity within a run

Step 1: Define the state model

Use a single Pydantic model as Flow[NewsletterState] state. Every step reads and writes these fields so you do not have to thread long strings through every return value.

from pydantic import BaseModel
 
 
class NewsletterState(BaseModel):
    topic: str = "AI Agents"
    research: str = ""
    articles: list[str] = []
    newsletter: str = ""
    quality_score: int = 0

Step 2: Create the Research Crew (researcher + analyst, 2 tasks)

  • Researcher — first task: gather trends and sources for {topic} (and {research} placeholder if you inject prior notes from inputs).
  • Analyst — second task: synthesize the researcher’s output. Give this task context=[research_task] so it runs after the first task and sees its result.

Optionally set output_pydantic on the analyst task with a small model (for example a ResearchBrief with summary and key_points) so the crew returns structured fields you can log or pass into the next crew via kickoff(inputs=...).

Enable memory=True on the Crew so the analyst can use crew memory where the runtime supports it.


Step 3: Create the Writing Crew (writer agent, 3 section tasks)

One writer agent runs three sequential tasks:

  1. Section 1 — intro and top story from the research briefing (use {topic} and {research} in the description).
  2. Section 2context=[section_1_task] — deeper dive or second story, building on section 1.
  3. Section 3context=[section_2_task] — roundup, links, or “what to watch” using prior sections as context.

After kickoff(inputs={"topic": ..., "research": ...}), collect the three bodies (for example from result.tasks_output or your structured models) into self.state.articles. Derive a simple quality_score (for example from total length or from a structured “score” field) for the router in the next step.


Step 4: Create the Assembly Crew (editor agent, output_file)

A single editor agent with one assembly task:

  • Description — merge the three sections into a cohesive newsletter (Markdown): title, date line, sections, sign-off.
  • kickoff(inputs={"topic": ..., "combined": ...}) — pass the three sections joined as combined so the editor does not reuse task objects from the writing crew (cleaner than cross-crew context).
  • output_file — e.g. "newsletter.md" so the final artifact is written to disk.

Keep memory=True if you want the editor to align tone with earlier crew memory in the same process.


Step 5: Build the Flow with @start, @listen, and @router

Subclass Flow[NewsletterState] and wire the graph:

  1. @start() — set self.state.topic (and optionally override from kickoff inputs if your version supports passing initial state).
  2. @listen(set_topic) — run the research crew, assign self.state.research from the crew output (raw or structured).
  3. @listen(run_research_crew) — run the writing crew, fill self.state.articles and self.state.quality_score.
  4. @router(run_writing_crew) — return "publish" if quality is good enough, else "revise".
  5. @listen("publish") — run the assembly crew, set self.state.newsletter from the result.
  6. @listen("revise") — tighten the draft (for example append a short editorial pass or re-invoke a tiny revision task), update self.state.articles / quality_score, then run the same assembly crew so both branches produce newsletter.md. (In production you might loop back to the router instead; here one revision keeps the tutorial linear.)

Import decorators from crewai.flow.flow (same pattern as other Agent Foundry CrewAI lessons):

from crewai.flow.flow import Flow, listen, router, start

Step 6: Run the flow

Instantiate the flow and call kickoff(). If your CrewAI version accepts initial inputs, pass inputs={"topic": "Your Theme"} so the first step can read them from self.state or method arguments per your docs.

flow = NewsletterFlow()
result = flow.kickoff()
print(result)
print(flow.state.model_dump())

Open newsletter.md (or the path you set) to verify the assembled output.


Full Code

The following is a single, runnable sketch. Adjust model names, API keys, and output_pydantic models to match your installed CrewAI version.

from typing import Optional
 
from pydantic import BaseModel, Field
 
from crewai import Agent, Crew, Process, Task
from crewai.flow.flow import Flow, listen, router, start
 
 
# --- State -----------------------------------------------------------------
 
class NewsletterState(BaseModel):
    topic: str = "AI Agents"
    research: str = ""
    articles: list[str] = []
    newsletter: str = ""
    quality_score: int = 0
 
 
class ResearchBrief(BaseModel):
    summary: str = Field(description="Executive summary of the research")
    key_points: list[str] = Field(description="Bullet-level trends")
 
 
class QualityRating(BaseModel):
    score: int = Field(ge=1, le=10, description="Editorial readiness 1-10")
    note: str = ""
 
 
# --- Research crew ---------------------------------------------------------
 
researcher = Agent(
    role="AI Trends Researcher",
    goal="Find timely AI trends and credible angles for a weekly newsletter",
    backstory="You scan news, papers, and product launches for what practitioners care about this week.",
    verbose=True,
)
 
analyst = Agent(
    role="Research Analyst",
    goal="Turn raw research into a tight briefing the writer can use",
    backstory="You compress noise into clear themes and bullets.",
    verbose=True,
)
 
research_task = Task(
    description=(
        "Research the weekly AI landscape for the topic: {topic}. "
        "Return notable trends, tools, and one-line evidence for each."
    ),
    expected_output="Structured bullet research suitable for an analyst.",
    agent=researcher,
)
 
analysis_task = Task(
    description=(
        "Using the research above, produce a briefing: summary + key_points "
        "for a newsletter writer. Topic: {topic}."
    ),
    expected_output="A ResearchBrief JSON-matching object.",
    agent=analyst,
    context=[research_task],
    output_pydantic=ResearchBrief,
)
 
research_crew = Crew(
    agents=[researcher, analyst],
    tasks=[research_task, analysis_task],
    process=Process.sequential,
    memory=True,
    verbose=True,
)
 
# --- Writing crew ----------------------------------------------------------
 
writer = Agent(
    role="Newsletter Writer",
    goal="Write three engaging Markdown sections for busy AI readers",
    backstory="You write clear, scannable sections with strong ledes.",
    verbose=True,
)
 
section_1 = Task(
    description=(
        "Write **Section 1** (intro + lead story) for topic {topic}. "
        "Base it on this research briefing:\n{research}"
    ),
    expected_output="Markdown section 1 only.",
    agent=writer,
)
 
section_2 = Task(
    description=(
        "Write **Section 2** (second story or trend) continuing the newsletter. "
        "Stay consistent with Section 1."
    ),
    expected_output="Markdown section 2 only.",
    agent=writer,
    context=[section_1],
)
 
section_3 = Task(
    description=(
        "Write **Section 3** (roundup / links / what to watch). "
        "Align tone with prior sections."
    ),
    expected_output="Markdown section 3 only.",
    agent=writer,
    context=[section_2],
)
 
rate_task = Task(
    description="Rate editorial readiness of the three sections combined (1-10) and one short note.",
    expected_output="QualityRating",
    agent=writer,
    context=[section_1, section_2, section_3],
    output_pydantic=QualityRating,
)
 
writing_crew = Crew(
    agents=[writer],
    tasks=[section_1, section_2, section_3, rate_task],
    process=Process.sequential,
    memory=True,
    verbose=True,
)
 
# --- Assembly crew ---------------------------------------------------------
 
editor = Agent(
    role="Newsletter Editor",
    goal="Assemble sections into one polished Markdown newsletter file",
    backstory="You unify voice, fix transitions, and enforce a simple house style.",
    verbose=True,
)
 
assemble_task = Task(
    description=(
        "Combine the following draft sections into one newsletter in Markdown. "
        "Add a title line and a date placeholder. Unify voice and transitions.\n\n"
        "{combined}\n\nTopic: {topic}."
    ),
    expected_output="Full newsletter Markdown",
    agent=editor,
    output_file="newsletter.md",
)
 
assembly_crew = Crew(
    agents=[editor],
    tasks=[assemble_task],
    process=Process.sequential,
    memory=True,
    verbose=True,
)
 
 
# --- Flow ------------------------------------------------------------------
 
class NewsletterFlow(Flow[NewsletterState]):
    @start()
    def set_topic(self):
        self.state.topic = self.state.topic or "AI Agents"
        return self.state.topic
 
    @listen(set_topic)
    def run_research_crew(self, topic: str):
        out = research_crew.kickoff(inputs={"topic": topic})
        brief: Optional[ResearchBrief] = None
        for o in out.tasks_output:
            p = getattr(o, "pydantic", None)
            if isinstance(p, ResearchBrief):
                brief = p
                break
        if brief is not None:
            self.state.research = brief.summary + "\n" + "\n".join(f"- {x}" for x in brief.key_points)
        else:
            self.state.research = out.raw
        return self.state.research
 
    @listen(run_research_crew)
    def run_writing_crew(self, research: str):
        wout = writing_crew.kickoff(inputs={"topic": self.state.topic, "research": research})
        bodies = []
        rating: Optional[QualityRating] = None
        for o in wout.tasks_output:
            p = getattr(o, "pydantic", None)
            if isinstance(p, QualityRating):
                rating = p
            else:
                raw = getattr(o, "raw", None) or str(o)
                bodies.append(raw)
        if len(bodies) >= 3:
            self.state.articles = bodies[:3]
        else:
            self.state.articles = bodies
        if rating is not None:
            self.state.quality_score = rating.score
        else:
            self.state.quality_score = min(10, sum(len(a) for a in self.state.articles) // 400)
        return wout.raw
 
    @router(run_writing_crew)
    def quality_gate(self, _):
        if self.state.quality_score >= 7:
            return "publish"
        return "revise"
 
    @listen("publish")
    def publish_path(self):
        combined = "\n\n---\n\n".join(self.state.articles)
        aout = assembly_crew.kickoff(inputs={"topic": self.state.topic, "combined": combined})
        self.state.newsletter = aout.raw
        return self.state.newsletter
 
    @listen("revise")
    def revise_path(self):
        self.state.articles = [
            (a + "\n\n*( lightly edited for clarity )*") for a in self.state.articles
        ]
        self.state.quality_score = max(self.state.quality_score, 8)
        combined = "\n\n---\n\n".join(self.state.articles)
        aout = assembly_crew.kickoff(inputs={"topic": self.state.topic, "combined": combined})
        self.state.newsletter = aout.raw
        return self.state.newsletter
 
 
if __name__ == "__main__":
    flow = NewsletterFlow()
    print(flow.kickoff())
    print(flow.state.model_dump())

What You Learned

  • Flows let you sequence multiple crews and share typed state with Flow[NewsletterState].
  • @start kicks off the graph; @listen chains steps; @router chooses @listen("label") branches from runtime data (here, quality).
  • Research → writing → assembly mirrors real editorial pipelines; task context keeps each crew’s internal DAG explicit.
  • Structured outputs (output_pydantic) make it easier to fill research, articles, and quality_score without fragile string parsing.
  • output_file on a task is ideal for final artifacts like newsletter.md.
  • memory=True on crews is appropriate when you want agents to benefit from crew-level memory during a longer run.

You now have a template for multi-crew Flows with branching and file output—extend it with scheduling, human review steps, or a loop from revise back into run_writing_crew when you need stricter quality gates.