Project: AI Newsletter Pipeline
Project: AI Newsletter Pipeline
This project ties together Flows, multiple crews, and shared Pydantic state to automate a weekly AI newsletter: one crew researches trends, another writes article sections, and a final crew assembles and writes the finished issue to disk. You will use @start, @listen, and @router to orchestrate the stages and branch on a simple quality gate.
What You'll Build
A Flow that orchestrates three crews end to end:
- Research crew — discovers what is trending in AI for the week.
- Writing crew — turns that research into three newsletter sections (sequential tasks with task context).
- Assembly crew — combines sections into one newsletter document and saves it with
output_file.
The Flow sets the topic, runs research, then writing, then uses a router to choose publish (go straight to assembly) or revise (polish the draft first, then assemble). Along the way you will use structured output where it helps, task context inside crews, and memory on the crews so agents can retain useful context across steps.
Features Used
- Flows —
@start,@listen,@router - Crews — three specialized crews (research, writing, assembly)
- Agents — researcher, analyst, writer, editor
- Tasks — chained descriptions and context between tasks
- Structured output — Pydantic models on selected tasks (
output_pydantic) - Task context — downstream tasks consume prior task outputs in the same crew
- Memory —
memory=Trueon crews for conversational continuity within a run
Step 1: Define the state model
Use a single Pydantic model as Flow[NewsletterState] state. Every step reads and writes these fields so you do not have to thread long strings through every return value.
from pydantic import BaseModel
class NewsletterState(BaseModel):
topic: str = "AI Agents"
research: str = ""
articles: list[str] = []
newsletter: str = ""
quality_score: int = 0Step 2: Create the Research Crew (researcher + analyst, 2 tasks)
- Researcher — first task: gather trends and sources for
{topic}(and{research}placeholder if you inject prior notes from inputs). - Analyst — second task: synthesize the researcher’s output. Give this task
context=[research_task]so it runs after the first task and sees its result.
Optionally set output_pydantic on the analyst task with a small model (for example a ResearchBrief with summary and key_points) so the crew returns structured fields you can log or pass into the next crew via kickoff(inputs=...).
Enable memory=True on the Crew so the analyst can use crew memory where the runtime supports it.
Step 3: Create the Writing Crew (writer agent, 3 section tasks)
One writer agent runs three sequential tasks:
- Section 1 — intro and top story from the research briefing (use
{topic}and{research}in the description). - Section 2 —
context=[section_1_task]— deeper dive or second story, building on section 1. - Section 3 —
context=[section_2_task]— roundup, links, or “what to watch” using prior sections as context.
After kickoff(inputs={"topic": ..., "research": ...}), collect the three bodies (for example from result.tasks_output or your structured models) into self.state.articles. Derive a simple quality_score (for example from total length or from a structured “score” field) for the router in the next step.
Step 4: Create the Assembly Crew (editor agent, output_file)
A single editor agent with one assembly task:
- Description — merge the three sections into a cohesive newsletter (Markdown): title, date line, sections, sign-off.
kickoff(inputs={"topic": ..., "combined": ...})— pass the three sections joined ascombinedso the editor does not reuse task objects from the writing crew (cleaner than cross-crewcontext).output_file— e.g."newsletter.md"so the final artifact is written to disk.
Keep memory=True if you want the editor to align tone with earlier crew memory in the same process.
Step 5: Build the Flow with @start, @listen, and @router
Subclass Flow[NewsletterState] and wire the graph:
@start()— setself.state.topic(and optionally override fromkickoffinputs if your version supports passing initial state).@listen(set_topic)— run the research crew, assignself.state.researchfrom the crew output (raw or structured).@listen(run_research_crew)— run the writing crew, fillself.state.articlesandself.state.quality_score.@router(run_writing_crew)— return"publish"if quality is good enough, else"revise".@listen("publish")— run the assembly crew, setself.state.newsletterfrom the result.@listen("revise")— tighten the draft (for example append a short editorial pass or re-invoke a tiny revision task), updateself.state.articles/quality_score, then run the same assembly crew so both branches producenewsletter.md. (In production you might loop back to the router instead; here one revision keeps the tutorial linear.)
Import decorators from crewai.flow.flow (same pattern as other Agent Foundry CrewAI lessons):
from crewai.flow.flow import Flow, listen, router, startStep 6: Run the flow
Instantiate the flow and call kickoff(). If your CrewAI version accepts initial inputs, pass inputs={"topic": "Your Theme"} so the first step can read them from self.state or method arguments per your docs.
flow = NewsletterFlow()
result = flow.kickoff()
print(result)
print(flow.state.model_dump())Open newsletter.md (or the path you set) to verify the assembled output.
Full Code
The following is a single, runnable sketch. Adjust model names, API keys, and output_pydantic models to match your installed CrewAI version.
from typing import Optional
from pydantic import BaseModel, Field
from crewai import Agent, Crew, Process, Task
from crewai.flow.flow import Flow, listen, router, start
# --- State -----------------------------------------------------------------
class NewsletterState(BaseModel):
topic: str = "AI Agents"
research: str = ""
articles: list[str] = []
newsletter: str = ""
quality_score: int = 0
class ResearchBrief(BaseModel):
summary: str = Field(description="Executive summary of the research")
key_points: list[str] = Field(description="Bullet-level trends")
class QualityRating(BaseModel):
score: int = Field(ge=1, le=10, description="Editorial readiness 1-10")
note: str = ""
# --- Research crew ---------------------------------------------------------
researcher = Agent(
role="AI Trends Researcher",
goal="Find timely AI trends and credible angles for a weekly newsletter",
backstory="You scan news, papers, and product launches for what practitioners care about this week.",
verbose=True,
)
analyst = Agent(
role="Research Analyst",
goal="Turn raw research into a tight briefing the writer can use",
backstory="You compress noise into clear themes and bullets.",
verbose=True,
)
research_task = Task(
description=(
"Research the weekly AI landscape for the topic: {topic}. "
"Return notable trends, tools, and one-line evidence for each."
),
expected_output="Structured bullet research suitable for an analyst.",
agent=researcher,
)
analysis_task = Task(
description=(
"Using the research above, produce a briefing: summary + key_points "
"for a newsletter writer. Topic: {topic}."
),
expected_output="A ResearchBrief JSON-matching object.",
agent=analyst,
context=[research_task],
output_pydantic=ResearchBrief,
)
research_crew = Crew(
agents=[researcher, analyst],
tasks=[research_task, analysis_task],
process=Process.sequential,
memory=True,
verbose=True,
)
# --- Writing crew ----------------------------------------------------------
writer = Agent(
role="Newsletter Writer",
goal="Write three engaging Markdown sections for busy AI readers",
backstory="You write clear, scannable sections with strong ledes.",
verbose=True,
)
section_1 = Task(
description=(
"Write **Section 1** (intro + lead story) for topic {topic}. "
"Base it on this research briefing:\n{research}"
),
expected_output="Markdown section 1 only.",
agent=writer,
)
section_2 = Task(
description=(
"Write **Section 2** (second story or trend) continuing the newsletter. "
"Stay consistent with Section 1."
),
expected_output="Markdown section 2 only.",
agent=writer,
context=[section_1],
)
section_3 = Task(
description=(
"Write **Section 3** (roundup / links / what to watch). "
"Align tone with prior sections."
),
expected_output="Markdown section 3 only.",
agent=writer,
context=[section_2],
)
rate_task = Task(
description="Rate editorial readiness of the three sections combined (1-10) and one short note.",
expected_output="QualityRating",
agent=writer,
context=[section_1, section_2, section_3],
output_pydantic=QualityRating,
)
writing_crew = Crew(
agents=[writer],
tasks=[section_1, section_2, section_3, rate_task],
process=Process.sequential,
memory=True,
verbose=True,
)
# --- Assembly crew ---------------------------------------------------------
editor = Agent(
role="Newsletter Editor",
goal="Assemble sections into one polished Markdown newsletter file",
backstory="You unify voice, fix transitions, and enforce a simple house style.",
verbose=True,
)
assemble_task = Task(
description=(
"Combine the following draft sections into one newsletter in Markdown. "
"Add a title line and a date placeholder. Unify voice and transitions.\n\n"
"{combined}\n\nTopic: {topic}."
),
expected_output="Full newsletter Markdown",
agent=editor,
output_file="newsletter.md",
)
assembly_crew = Crew(
agents=[editor],
tasks=[assemble_task],
process=Process.sequential,
memory=True,
verbose=True,
)
# --- Flow ------------------------------------------------------------------
class NewsletterFlow(Flow[NewsletterState]):
@start()
def set_topic(self):
self.state.topic = self.state.topic or "AI Agents"
return self.state.topic
@listen(set_topic)
def run_research_crew(self, topic: str):
out = research_crew.kickoff(inputs={"topic": topic})
brief: Optional[ResearchBrief] = None
for o in out.tasks_output:
p = getattr(o, "pydantic", None)
if isinstance(p, ResearchBrief):
brief = p
break
if brief is not None:
self.state.research = brief.summary + "\n" + "\n".join(f"- {x}" for x in brief.key_points)
else:
self.state.research = out.raw
return self.state.research
@listen(run_research_crew)
def run_writing_crew(self, research: str):
wout = writing_crew.kickoff(inputs={"topic": self.state.topic, "research": research})
bodies = []
rating: Optional[QualityRating] = None
for o in wout.tasks_output:
p = getattr(o, "pydantic", None)
if isinstance(p, QualityRating):
rating = p
else:
raw = getattr(o, "raw", None) or str(o)
bodies.append(raw)
if len(bodies) >= 3:
self.state.articles = bodies[:3]
else:
self.state.articles = bodies
if rating is not None:
self.state.quality_score = rating.score
else:
self.state.quality_score = min(10, sum(len(a) for a in self.state.articles) // 400)
return wout.raw
@router(run_writing_crew)
def quality_gate(self, _):
if self.state.quality_score >= 7:
return "publish"
return "revise"
@listen("publish")
def publish_path(self):
combined = "\n\n---\n\n".join(self.state.articles)
aout = assembly_crew.kickoff(inputs={"topic": self.state.topic, "combined": combined})
self.state.newsletter = aout.raw
return self.state.newsletter
@listen("revise")
def revise_path(self):
self.state.articles = [
(a + "\n\n*( lightly edited for clarity )*") for a in self.state.articles
]
self.state.quality_score = max(self.state.quality_score, 8)
combined = "\n\n---\n\n".join(self.state.articles)
aout = assembly_crew.kickoff(inputs={"topic": self.state.topic, "combined": combined})
self.state.newsletter = aout.raw
return self.state.newsletter
if __name__ == "__main__":
flow = NewsletterFlow()
print(flow.kickoff())
print(flow.state.model_dump())What You Learned
- Flows let you sequence multiple crews and share typed state with
Flow[NewsletterState]. @startkicks off the graph;@listenchains steps;@routerchooses@listen("label")branches from runtime data (here, quality).- Research → writing → assembly mirrors real editorial pipelines; task
contextkeeps each crew’s internal DAG explicit. - Structured outputs (
output_pydantic) make it easier to fillresearch,articles, andquality_scorewithout fragile string parsing. output_fileon a task is ideal for final artifacts likenewsletter.md.memory=Trueon crews is appropriate when you want agents to benefit from crew-level memory during a longer run.
You now have a template for multi-crew Flows with branching and file output—extend it with scheduling, human review steps, or a loop from revise back into run_writing_crew when you need stricter quality gates.