Project: Production RAG Pipeline
In this project, you'll build a production-ready RAG pipeline with query enhancement, retrieval, answer validation, PII protection, conversation memory, and structured output with citations. This combines multiple advanced patterns into a single cohesive system.
What You'll Build
A production RAG system that can:
- Enhance user queries for better retrieval
- Retrieve relevant documents from a vector store
- Validate answers against source documents
- Strip PII from inputs and outputs
- Maintain conversation history across turns
- Return structured answers with citations
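Before wiring in any LangChain components, the stages above can be sketched end to end as plain functions. Every name here is illustrative; the real implementations are built step by step below.

```python
import re

def redact_pii(text: str) -> str:
    """Stand-in for PII redaction: mask anything that looks like an email."""
    return re.sub(r"\S+@\S+\.\S+", "[REDACTED_EMAIL]", text)

def enhance(query: str) -> str:
    """Stand-in for query enhancement: append domain keywords."""
    return query + " retrieval documents" if "rag" in query.lower() else query

def retrieve(query: str) -> list[str]:
    """Stand-in for vector retrieval: naive keyword match over a tiny corpus."""
    corpus = [
        "RAG grounds answers in retrieved documents.",
        "Agents use a ReAct loop.",
    ]
    return [d for d in corpus if any(w in d.lower() for w in query.lower().split())]

def answer(query: str) -> dict:
    """Run the pipeline: redact -> enhance -> retrieve -> answer with citations."""
    clean = redact_pii(query)
    docs = retrieve(enhance(clean))
    return {
        "answer": " ".join(docs) or "No relevant documents found.",
        "citations": docs,
    }
```

The real system swaps each stand-in for a proper component: an embedding-backed retriever, middleware for PII and validation, and a structured output schema.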
Step 1: Prepare the Knowledge Base
Create documents and index them into a vector store:
```python
from langchain_core.documents import Document
from langchain_core.vectorstores import InMemoryVectorStore
from langchain_openai import OpenAIEmbeddings

documents = [
    Document(
        page_content="LangChain agents use a ReAct loop: the model reasons about what to do, takes an action using a tool, and observes the result before deciding the next step.",
        metadata={"source": "langchain-docs", "section": "agents"},
    ),
    Document(
        page_content="RAG (Retrieval-Augmented Generation) grounds LLM responses in external data by retrieving relevant documents before generating an answer.",
        metadata={"source": "langchain-docs", "section": "rag"},
    ),
    Document(
        page_content="LangGraph provides a graph-based framework for building stateful, multi-step agent workflows with built-in persistence and streaming.",
        metadata={"source": "langgraph-docs", "section": "overview"},
    ),
    Document(
        page_content="Guardrails protect agents from processing sensitive data. PIIMiddleware detects and redacts emails, credit cards, phone numbers, and SSNs.",
        metadata={"source": "langchain-docs", "section": "guardrails"},
    ),
    Document(
        page_content="Structured output uses Pydantic models with response_format to ensure the agent returns data in a predictable schema.",
        metadata={"source": "langchain-docs", "section": "structured-output"},
    ),
    Document(
        page_content="Memory in LangGraph agents uses checkpointers like InMemorySaver to persist conversation state across multiple turns.",
        metadata={"source": "langgraph-docs", "section": "memory"},
    ),
]

embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
vector_store = InMemoryVectorStore(embedding=embeddings)
vector_store.add_documents(documents)

# Return the 3 most similar documents per query
retriever = vector_store.as_retriever(search_kwargs={"k": 3})
```

Step 2: Build the Query Enhancement Tool
Enhance user queries for better retrieval results:
```python
from langchain_core.tools import tool

@tool
def enhance_query(query: str) -> str:
    """Enhance a user query for better retrieval results."""
    expansions = {
        "agent": "LangChain agent ReAct loop reasoning tools",
        "rag": "RAG retrieval augmented generation documents embeddings",
        "memory": "memory persistence conversation state checkpointer saver",
        "guardrails": "guardrails PII safety middleware protection",
    }
    enhanced = query
    for keyword, expansion in expansions.items():
        if keyword.lower() in query.lower():
            enhanced = f"{query} {expansion}"
            break
    return enhanced
```

Step 3: Build the Retriever Tool
Wrap the retriever in a tool with source citations:
```python
@tool
def search_docs(query: str) -> str:
    """Search the documentation knowledge base."""
    docs = retriever.invoke(query)
    if not docs:
        return "No relevant documents found."
    results = []
    for doc in docs:
        source = doc.metadata.get("source", "unknown")
        section = doc.metadata.get("section", "general")
        results.append(f"[{source} > {section}]\n{doc.page_content}")
    return "\n\n---\n\n".join(results)
```

Step 4: Answer Validation Guardrail
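At its core, the validation here is a lightweight heuristic: check whether a substantial response ever mentions its sources. Isolated as a standalone function (name and thresholds illustrative, mirroring the middleware in this step):

```python
# Phrases that suggest the response references its sources.
CITATION_MARKERS = ("source", "according to", "based on")

def needs_grounding_note(response: str, min_length: int = 100) -> bool:
    """Return True when a long response never references its sources.

    Short responses are exempt: a one-line answer rarely needs a citation
    disclaimer. The marker list is a heuristic, not a guarantee of grounding.
    """
    text = response.lower()
    return len(text) > min_length and not any(m in text for m in CITATION_MARKERS)
```

This is deliberately cheap; a stricter check could verify that each cited `[source > section]` tag actually appeared in the retrieved documents.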
Create middleware that validates answers are grounded in retrieved documents:
```python
from langchain.agents.middleware import AgentMiddleware

class AnswerValidationMiddleware(AgentMiddleware):
    def __init__(self, required_phrases=None):
        super().__init__()
        self.required_phrases = required_phrases or ["source", "according to", "based on"]

    def after_agent(self, state, runtime):
        response = state["messages"][-1].content.lower()
        has_citation = any(phrase in response for phrase in self.required_phrases)
        if not has_citation and len(response) > 100:
            state["messages"][-1].content += (
                "\n\n_Note: This response may not be fully grounded in source documents._"
            )
        return state
```

Step 5: PII Middleware
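PII redaction boils down to pattern matching: find spans that look like sensitive data and replace them with typed placeholders. A minimal sketch (patterns simplified for illustration and not production-grade; `PIIMiddleware` ships its own detectors):

```python
import re

# Simplified detectors -- real PII detection needs far more robust rules.
PATTERNS = {
    "email": re.compile(r"[\w.+-]+@[\w-]+\.[\w.-]+"),
    "phone": re.compile(r"\b\d{3}[-.]\d{3}[-.]\d{4}\b"),
}

def redact(text: str) -> str:
    """Replace each match with a typed placeholder, e.g. [REDACTED_EMAIL]."""
    for name, pattern in PATTERNS.items():
        text = pattern.sub(f"[REDACTED_{name.upper()}]", text)
    return text
```

Typed placeholders keep the message readable for the model ("the user gave an email") without exposing the value itself.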
Add PII protection to strip sensitive data from inputs:
```python
from langchain.agents.middleware import PIIMiddleware

# One middleware instance per PII type; "redact" replaces matches with
# [REDACTED_EMAIL]-style placeholders before the model sees them.
pii_middleware = PIIMiddleware("email", strategy="redact")
```

Step 6: Conversation Memory
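Conceptually, checkpointer-based memory is a store of saved state keyed by thread ID: each conversation gets its own history, and invoking with the same `thread_id` resumes where it left off. A toy illustration of that idea (not the real `InMemorySaver` API):

```python
from collections import defaultdict

class ToyMemory:
    """Toy thread-scoped message store illustrating checkpointer behavior."""

    def __init__(self):
        # One independent message log per thread_id.
        self._threads: dict[str, list[str]] = defaultdict(list)

    def append(self, thread_id: str, message: str) -> None:
        self._threads[thread_id].append(message)

    def history(self, thread_id: str) -> list[str]:
        return list(self._threads[thread_id])

memory = ToyMemory()
memory.append("user-session-1", "How do LangChain agents work?")
memory.append("user-session-1", "How do I add memory to them?")
memory.append("user-session-2", "Unrelated question")  # isolated thread
```

The real checkpointer persists full graph state (not just messages), but the keying-by-thread idea is the same.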
Use InMemorySaver to maintain conversation history:
```python
from langgraph.checkpoint.memory import InMemorySaver

memory = InMemorySaver()
```

Step 7: Structured Output
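A fixed output schema lets downstream code rely on known keys instead of parsing free text. A sketch of consuming such a payload (a plain dict standing in for the validated model, with illustrative values):

```python
# Mimics the shape of the structured answer defined below.
payload = {
    "answer": "Agents follow a ReAct loop of reasoning, acting, and observing.",
    "citations": ["langchain-docs > agents"],
    "confidence": "high",
    "follow_up_questions": ["How do tools plug into the loop?"],
}

def render(payload: dict) -> str:
    """Render an answer with a citation footer and confidence tag."""
    sources = "; ".join(payload["citations"]) or "none"
    return f"{payload['answer']}\n\nSources: {sources} (confidence: {payload['confidence']})"
```

Because the keys are guaranteed by the schema, `render` needs no defensive parsing.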
Define a CitedAnswer schema for structured responses:
```python
from pydantic import BaseModel, Field

class CitedAnswer(BaseModel):
    answer: str = Field(description="The answer to the user's question")
    citations: list[str] = Field(description="List of source documents cited")
    confidence: str = Field(description="Confidence level: high, medium, or low")
    follow_up_questions: list[str] = Field(description="Suggested follow-up questions")
```

Step 8: Assemble the Production Agent
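Before assembling, it helps to see what middleware does conceptually: each layer wraps the agent call and may transform the input on the way in and the output on the way out. A toy illustration using plain function wrappers (the real `AgentMiddleware` API is hook-based, and the redacted email below is hardcoded purely for demonstration):

```python
def core_agent(text: str) -> str:
    """Stand-in for the model call."""
    return f"answer to: {text}"

def pii_layer(handler):
    """Redact a known email before the inner handler sees the input."""
    def wrapped(text: str) -> str:
        return handler(text.replace("alice@example.com", "[REDACTED_EMAIL]"))
    return wrapped

def validation_layer(handler):
    """Flag outputs that never mention a source."""
    def wrapped(text: str) -> str:
        out = handler(text)
        return out if "source" in out.lower() else out + " [unvalidated]"
    return wrapped

# Stack order matters: PII runs closest to the input,
# validation sees the final output.
agent_call = validation_layer(pii_layer(core_agent))
```

The `middleware=[...]` list below plays the same role: each entry gets a chance to act before and after the agent's work.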
Combine all components into the final agent:
```python
from langchain.agents import create_agent
from langchain.chat_models import init_chat_model

model = init_chat_model("gpt-4o-mini", model_provider="openai")
validation_middleware = AnswerValidationMiddleware()

agent = create_agent(
    model=model,
    tools=[enhance_query, search_docs],
    system_prompt=(
        "You are a documentation assistant. For every question:\n"
        "1. Use enhance_query to improve the search terms\n"
        "2. Use search_docs to find relevant documentation\n"
        "3. Answer based ONLY on retrieved documents\n"
        "4. Always cite your sources with [source > section] format\n"
        "If no relevant documents are found, say so clearly."
    ),
    checkpointer=memory,
    middleware=[pii_middleware, validation_middleware],
    response_format=CitedAnswer,
)
```

Step 9: Run the Pipeline
Query the production RAG system with conversation memory:
```python
from langchain_core.messages import HumanMessage

config = {"configurable": {"thread_id": "user-session-1"}}

result = agent.invoke(
    {"messages": [HumanMessage(content="How do LangChain agents work?")]},
    config=config,
)
print(result["messages"][-1].content)
```

Follow up in the same conversation:

```python
result = agent.invoke(
    {"messages": [HumanMessage(content="How do I add memory to them?")]},
    config=config,
)
print(result["messages"][-1].content)
```

Test PII redaction:

```python
result = agent.invoke(
    {"messages": [HumanMessage(content="My email is alice@example.com, tell me about guardrails")]},
    config=config,
)
print(result["messages"][-1].content)
```

Full Code
```python
from langchain.agents import create_agent
from langchain.agents.middleware import AgentMiddleware, PIIMiddleware
from langchain.chat_models import init_chat_model
from langchain_core.documents import Document
from langchain_core.messages import HumanMessage
from langchain_core.tools import tool
from langchain_core.vectorstores import InMemoryVectorStore
from langchain_openai import OpenAIEmbeddings
from langgraph.checkpoint.memory import InMemorySaver
from pydantic import BaseModel, Field

documents = [
    Document(page_content="LangChain agents use a ReAct loop: reason, act, observe.", metadata={"source": "langchain-docs", "section": "agents"}),
    Document(page_content="RAG grounds LLM responses in external data by retrieving relevant documents.", metadata={"source": "langchain-docs", "section": "rag"}),
    Document(page_content="LangGraph provides graph-based stateful agent workflows.", metadata={"source": "langgraph-docs", "section": "overview"}),
    Document(page_content="Guardrails protect agents from processing sensitive data.", metadata={"source": "langchain-docs", "section": "guardrails"}),
    Document(page_content="Structured output uses Pydantic models with response_format.", metadata={"source": "langchain-docs", "section": "structured-output"}),
    Document(page_content="Memory uses checkpointers like InMemorySaver for conversation persistence.", metadata={"source": "langgraph-docs", "section": "memory"}),
]

embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
vector_store = InMemoryVectorStore(embedding=embeddings)
vector_store.add_documents(documents)
retriever = vector_store.as_retriever(search_kwargs={"k": 3})

@tool
def enhance_query(query: str) -> str:
    """Enhance a user query for better retrieval."""
    expansions = {
        "agent": "ReAct loop tools",
        "rag": "retrieval augmented generation",
        "memory": "checkpointer persistence",
    }
    for kw, exp in expansions.items():
        if kw in query.lower():
            return f"{query} {exp}"
    return query

@tool
def search_docs(query: str) -> str:
    """Search the documentation knowledge base."""
    docs = retriever.invoke(query)
    if not docs:
        return "No relevant documents found."
    return "\n\n".join(
        f"[{d.metadata['source']} > {d.metadata['section']}] {d.page_content}"
        for d in docs
    )

class AnswerValidationMiddleware(AgentMiddleware):
    def after_agent(self, state, runtime):
        response = state["messages"][-1].content.lower()
        if not any(p in response for p in ["source", "according", "based on"]) and len(response) > 100:
            state["messages"][-1].content += "\n\n_Note: Response may not be fully grounded._"
        return state

class CitedAnswer(BaseModel):
    answer: str = Field(description="The answer")
    citations: list[str] = Field(description="Sources cited")
    confidence: str = Field(description="high, medium, or low")
    follow_up_questions: list[str] = Field(description="Suggested follow-ups")

model = init_chat_model("gpt-4o-mini", model_provider="openai")

agent = create_agent(
    model=model,
    tools=[enhance_query, search_docs],
    system_prompt="You are a docs assistant. Enhance queries, search docs, answer with citations.",
    checkpointer=InMemorySaver(),
    middleware=[PIIMiddleware("email", strategy="redact"), AnswerValidationMiddleware()],
    response_format=CitedAnswer,
)

config = {"configurable": {"thread_id": "demo"}}
result = agent.invoke({"messages": [HumanMessage(content="How do agents work?")]}, config=config)
print(result["messages"][-1].content)
```

Key Takeaways
- Production RAG combines query enhancement, retrieval, validation, PII protection, memory, and structured output
- `enhance_query` tool expands user queries with domain-specific terms for better retrieval
- `AnswerValidationMiddleware` checks that responses cite source documents
- `PIIMiddleware` strips sensitive data before the agent processes it
- `InMemorySaver` maintains conversation context across multiple turns via `thread_id`
- `CitedAnswer` schema ensures responses include citations, confidence, and follow-up suggestions