Project: Production RAG Pipeline

In this project, you'll build a production-ready RAG pipeline with query enhancement, retrieval, answer validation, PII protection, conversation memory, and structured output with citations. This combines multiple advanced patterns into a single cohesive system.

What You'll Build

A production RAG system that can:

  • Enhance user queries for better retrieval
  • Retrieve relevant documents from a vector store
  • Validate answers against source documents
  • Strip PII from inputs and outputs
  • Maintain conversation history across turns
  • Return structured answers with citations

Step 1: Prepare the Knowledge Base

Create documents and index them into a vector store:

from langchain_core.documents import Document
from langchain_core.vectorstores import InMemoryVectorStore
from langchain_openai import OpenAIEmbeddings
 
documents = [
    Document(
        page_content="LangChain agents use a ReAct loop: the model reasons about what to do, takes an action using a tool, and observes the result before deciding the next step.",
        metadata={"source": "langchain-docs", "section": "agents"},
    ),
    Document(
        page_content="RAG (Retrieval-Augmented Generation) grounds LLM responses in external data by retrieving relevant documents before generating an answer.",
        metadata={"source": "langchain-docs", "section": "rag"},
    ),
    Document(
        page_content="LangGraph provides a graph-based framework for building stateful, multi-step agent workflows with built-in persistence and streaming.",
        metadata={"source": "langgraph-docs", "section": "overview"},
    ),
    Document(
        page_content="Guardrails protect agents from processing sensitive data. PIIMiddleware detects and redacts emails, credit cards, phone numbers, and SSNs.",
        metadata={"source": "langchain-docs", "section": "guardrails"},
    ),
    Document(
        page_content="Structured output uses Pydantic models with response_format to ensure the agent returns data in a predictable schema.",
        metadata={"source": "langchain-docs", "section": "structured-output"},
    ),
    Document(
        page_content="Memory in LangGraph agents uses checkpointers like InMemorySaver to persist conversation state across multiple turns.",
        metadata={"source": "langgraph-docs", "section": "memory"},
    ),
]
 
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
vector_store = InMemoryVectorStore(embedding=embeddings)
vector_store.add_documents(documents)
retriever = vector_store.as_retriever(search_kwargs={"k": 3})
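To see what a retriever with k set does conceptually, here is a minimal sketch that ranks texts by cosine similarity and keeps the top k. It assumes a toy bag-of-words embedding in place of OpenAIEmbeddings; the names embed, cosine, and top_k are illustrative and not part of LangChain.

```python
import math
from collections import Counter

def embed(text: str) -> Counter:
    """Toy embedding: lowercase bag-of-words counts (stand-in for a real model)."""
    return Counter(text.lower().split())

def cosine(a: Counter, b: Counter) -> float:
    """Cosine similarity between two sparse count vectors."""
    dot = sum(a[t] * b[t] for t in a)
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0

def top_k(query: str, texts: list[str], k: int = 3) -> list[str]:
    """Rank texts by similarity to the query and keep the best k."""
    q = embed(query)
    ranked = sorted(texts, key=lambda t: cosine(q, embed(t)), reverse=True)
    return ranked[:k]

corpus = [
    "agents use a react loop with tools",
    "rag retrieves relevant documents before generating",
    "checkpointers persist conversation state",
    "pydantic models define structured output",
]
hits = top_k("how do agents use tools", corpus, k=2)
print(hits[0])
```

A real embedding model captures meaning rather than exact word overlap, but the ranking and truncation steps follow the same shape.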

Step 2: Build the Query Enhancement Tool

Enhance user queries for better retrieval results:

from langchain_core.tools import tool
 
@tool
def enhance_query(query: str) -> str:
    """Enhance a user query for better retrieval results."""
    expansions = {
        "agent": "LangChain agent ReAct loop reasoning tools",
        "rag": "RAG retrieval augmented generation documents embeddings",
        "memory": "memory persistence conversation state checkpointer saver",
        "guardrails": "guardrails PII safety middleware protection",
    }
    enhanced = query
    for keyword, expansion in expansions.items():
        if keyword.lower() in query.lower():
            enhanced = f"{query} {expansion}"
            break
    return enhanced
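One thing to watch with the substring test above: a keyword such as "rag" also matches inside unrelated words like "storage". The sketch below contrasts the same substring rule with a hypothetical whole-word variant (expand_word is an illustrative name, and the optional plural "s" is an assumption about what you would want to match).

```python
import re

EXPANSIONS = {
    "agent": "LangChain agent ReAct loop reasoning tools",
    "rag": "RAG retrieval augmented generation documents embeddings",
}

def expand_substring(query: str) -> str:
    """Same matching rule as enhance_query: plain substring test."""
    for keyword, expansion in EXPANSIONS.items():
        if keyword in query.lower():
            return f"{query} {expansion}"
    return query

def expand_word(query: str) -> str:
    """Hypothetical variant: match whole words, allowing a plural 's'."""
    for keyword, expansion in EXPANSIONS.items():
        if re.search(rf"\b{re.escape(keyword)}s?\b", query, flags=re.IGNORECASE):
            return f"{query} {expansion}"
    return query

q = "Where is storage configured?"
print(expand_substring(q))  # expanded, because "storage" contains "rag"
print(expand_word(q))       # unchanged
```

Whether the stricter match is worth it depends on how noisy your queries are; extra expansion terms only dilute the search, they do not break it.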

Step 3: Build the Retriever Tool

Wrap the retriever in a tool with source citations:

@tool
def search_docs(query: str) -> str:
    """Search the documentation knowledge base."""
    docs = retriever.invoke(query)
    if not docs:
        return "No relevant documents found."
 
    results = []
    for doc in docs:
        source = doc.metadata.get("source", "unknown")
        section = doc.metadata.get("section", "general")
        results.append(f"[{source} > {section}]\n{doc.page_content}")
    return "\n\n---\n\n".join(results)
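The formatting logic can be tried without a vector store or API key. This sketch assumes a small dataclass, Doc, as a hypothetical stand-in for Document, and pulls the formatting into a plain function (format_results is an illustrative name).

```python
from dataclasses import dataclass, field

@dataclass
class Doc:
    """Hypothetical stand-in for langchain_core.documents.Document."""
    page_content: str
    metadata: dict = field(default_factory=dict)

def format_results(docs: list[Doc]) -> str:
    """Render each document as a '[source > section]' tag followed by its text."""
    if not docs:
        return "No relevant documents found."
    blocks = []
    for doc in docs:
        source = doc.metadata.get("source", "unknown")
        section = doc.metadata.get("section", "general")
        blocks.append(f"[{source} > {section}]\n{doc.page_content}")
    return "\n\n---\n\n".join(blocks)

docs = [
    Doc("Agents use a ReAct loop.", {"source": "langchain-docs", "section": "agents"}),
    Doc("No metadata here."),
]
formatted = format_results(docs)
print(formatted)
```

Documents without metadata fall back to the "unknown" and "general" defaults, so the model always sees a tag it can cite.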

Step 4: Answer Validation Guardrail

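Create middleware that flags long answers containing none of the expected citation phrases. This is a lightweight heuristic for groundedness, not a full fact check:

from langchain.agents.middleware import AgentMiddleware

class AnswerValidationMiddleware(AgentMiddleware):
    """Append a warning to long answers that contain no citation phrase."""

    def __init__(self, required_phrases=None):
        self.required_phrases = required_phrases or ["source", "according to", "based on"]

    def after_agent(self, state, runtime):
        # Runs once after the agent finishes; assumes the final message content is a plain string.
        response = state["messages"][-1].content.lower()
        has_citation = any(phrase in response for phrase in self.required_phrases)
        # Short answers are exempt; long answers without a citation phrase get a note.
        if not has_citation and len(response) > 100:
            state["messages"][-1].content += (
                "\n\n_Note: This response may not be fully grounded in source documents._"
            )
        return state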
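The phrase check looks for words such as "source", while the search tool in Step 3 emits tags in the form [source > section], for example [langchain-docs > agents], which contain none of those phrases. A stricter check could parse the tags out of the answer and compare them with what was actually retrieved. The sketch below is one way to do that under stated assumptions: the regex, extract_citations, and unsupported_citations are illustrative names and not part of the middleware API.

```python
import re

# Matches tags like "[langchain-docs > agents]" and captures source and section.
CITATION = re.compile(r"\[([\w-]+) > ([\w-]+)\]")

def extract_citations(answer: str) -> set[tuple[str, str]]:
    """Return every (source, section) pair cited in the answer."""
    return set(CITATION.findall(answer))

def unsupported_citations(answer: str, retrieved: set[tuple[str, str]]) -> set[tuple[str, str]]:
    """Return cited pairs that were not among the retrieved documents."""
    return extract_citations(answer) - retrieved

retrieved = {("langchain-docs", "agents"), ("langchain-docs", "rag")}
answer = (
    "Agents run a ReAct loop [langchain-docs > agents] "
    "and stream results [langgraph-docs > overview]."
)
print(unsupported_citations(answer, retrieved))
```

An answer with no tags at all, or with tags outside the retrieved set, is a reasonable trigger for the warning note.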

Step 5: PII Middleware

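Add PII protection to redact sensitive data from user input. Each PIIMiddleware instance handles one PII type; this one covers email addresses:

from langchain.agents.middleware import PIIMiddleware

# The first argument names the PII type to detect; "redact" replaces each match with a placeholder.
pii_middleware = PIIMiddleware("email", strategy="redact")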
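To make the redact strategy concrete, here is a minimal regex sketch of email redaction. It illustrates the idea only and is not how PIIMiddleware is implemented; the pattern, the redact_emails name, and the placeholder text are assumptions.

```python
import re

# Simplified email pattern; real detectors handle more edge cases.
EMAIL = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")

def redact_emails(text: str, placeholder: str = "[REDACTED_EMAIL]") -> str:
    """Replace every email-like substring with a fixed placeholder."""
    return EMAIL.sub(placeholder, text)

redacted = redact_emails("My email is alice@example.com, tell me about guardrails")
print(redacted)
```

The rest of the message passes through untouched, so the agent can still answer the question about guardrails.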

Step 6: Conversation Memory

Use InMemorySaver to maintain conversation history:

from langgraph.checkpoint.memory import InMemorySaver
 
memory = InMemorySaver()
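Conceptually, a checkpointer keeps a separate history per conversation, keyed by a thread identifier. The sketch below shows that idea with a plain dictionary. ThreadMemory is a hypothetical toy class and does not mirror the InMemorySaver interface, which stores full graph state rather than bare messages.

```python
from collections import defaultdict

class ThreadMemory:
    """Toy per-thread message store (hypothetical; not the InMemorySaver API)."""

    def __init__(self) -> None:
        self._threads: dict[str, list[tuple[str, str]]] = defaultdict(list)

    def append(self, thread_id: str, role: str, content: str) -> None:
        """Add one message to the given thread."""
        self._threads[thread_id].append((role, content))

    def history(self, thread_id: str) -> list[tuple[str, str]]:
        # Return a copy so callers cannot mutate stored history.
        return list(self._threads.get(thread_id, []))

store = ThreadMemory()
store.append("user-session-1", "human", "How do LangChain agents work?")
store.append("user-session-1", "human", "How do I add memory to them?")
store.append("user-session-2", "human", "What is RAG?")
print(store.history("user-session-1"))
```

Because state lives only in process memory, it is lost on restart; that is fine for a demo but is worth keeping in mind for anything longer-lived.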

Step 7: Structured Output

Define a CitedAnswer schema for structured responses:

from pydantic import BaseModel, Field
 
class CitedAnswer(BaseModel):
    answer: str = Field(description="The answer to the user's question")
    citations: list[str] = Field(description="List of source documents cited")
    confidence: str = Field(description="Confidence level: high, medium, or low")
    follow_up_questions: list[str] = Field(description="Suggested follow-up questions")
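Because confidence is declared as str, the schema itself accepts any string; only the field description asks for high, medium, or low. In Pydantic, one option is to type the field as Literal["high", "medium", "low"]. The standard-library sketch below shows the same constraint enforced on a parsed JSON payload. CitedAnswerRecord and parse_cited_answer are hypothetical names, and the dataclass is a stand-in for the Pydantic model, not a replacement for it.

```python
import json
from dataclasses import dataclass

ALLOWED_CONFIDENCE = {"high", "medium", "low"}

@dataclass
class CitedAnswerRecord:
    """Stdlib stand-in mirroring the CitedAnswer fields."""
    answer: str
    citations: list[str]
    confidence: str
    follow_up_questions: list[str]

    def __post_init__(self) -> None:
        # Reject values outside the three documented confidence levels.
        if self.confidence not in ALLOWED_CONFIDENCE:
            raise ValueError(f"confidence must be one of {sorted(ALLOWED_CONFIDENCE)}")

def parse_cited_answer(payload: str) -> CitedAnswerRecord:
    """Parse a JSON object into a validated record."""
    return CitedAnswerRecord(**json.loads(payload))

payload = json.dumps({
    "answer": "Agents use a ReAct loop.",
    "citations": ["langchain-docs > agents"],
    "confidence": "high",
    "follow_up_questions": ["How do I add memory?"],
})
record = parse_cited_answer(payload)
print(record.confidence)
```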

Step 8: Assemble the Production Agent

Combine all components into the final agent:

from langchain.agents import create_agent
from langchain.chat_models import init_chat_model

model = init_chat_model("gpt-4o-mini", model_provider="openai")

validation_middleware = AnswerValidationMiddleware()

# create_agent wires the model, tools, middleware, checkpointer, and output schema together.
agent = create_agent(
    model=model,
    tools=[enhance_query, search_docs],
    system_prompt=(
        "You are a documentation assistant. For every question:\n"
        "1. Use enhance_query to improve the search terms\n"
        "2. Use search_docs to find relevant documentation\n"
        "3. Answer based ONLY on retrieved documents\n"
        "4. Always cite your sources with [source > section] format\n"
        "If no relevant documents are found, say so clearly."
    ),
    checkpointer=memory,
    middleware=[pii_middleware, validation_middleware],
    response_format=CitedAnswer,
)

Step 9: Run the Pipeline

Query the production RAG system with conversation memory:

from langchain_core.messages import HumanMessage
 
config = {"configurable": {"thread_id": "user-session-1"}}
 
result = agent.invoke(
    {"messages": [HumanMessage(content="How do LangChain agents work?")]},
    config=config,
)
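print(result["messages"][-1].content)
# With response_format set, the parsed CitedAnswer is also returned under "structured_response".
print(result["structured_response"])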

Follow up in the same conversation:

result = agent.invoke(
    {"messages": [HumanMessage(content="How do I add memory to them?")]},
    config=config,
)
print(result["messages"][-1].content)

Test PII redaction:

result = agent.invoke(
    {"messages": [HumanMessage(content="My email is alice@example.com, tell me about guardrails")]},
    config=config,
)
print(result["messages"][-1].content)

Full Code

from langchain.agents import create_agent
from langchain.agents.middleware import AgentMiddleware, PIIMiddleware
from langchain.chat_models import init_chat_model
from langchain_core.documents import Document
from langchain_core.messages import HumanMessage
from langchain_core.tools import tool
from langchain_core.vectorstores import InMemoryVectorStore
from langchain_openai import OpenAIEmbeddings
from langgraph.checkpoint.memory import InMemorySaver
from pydantic import BaseModel, Field

# Knowledge base: short documents tagged with source and section metadata.
documents = [
    Document(page_content="LangChain agents use a ReAct loop: reason, act, observe.", metadata={"source": "langchain-docs", "section": "agents"}),
    Document(page_content="RAG grounds LLM responses in external data by retrieving relevant documents.", metadata={"source": "langchain-docs", "section": "rag"}),
    Document(page_content="LangGraph provides graph-based stateful agent workflows.", metadata={"source": "langgraph-docs", "section": "overview"}),
    Document(page_content="Guardrails protect agents from processing sensitive data.", metadata={"source": "langchain-docs", "section": "guardrails"}),
    Document(page_content="Structured output uses Pydantic models with response_format.", metadata={"source": "langchain-docs", "section": "structured-output"}),
    Document(page_content="Memory uses checkpointers like InMemorySaver for conversation persistence.", metadata={"source": "langgraph-docs", "section": "memory"}),
]

embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
vector_store = InMemoryVectorStore(embedding=embeddings)
vector_store.add_documents(documents)
retriever = vector_store.as_retriever(search_kwargs={"k": 3})

@tool
def enhance_query(query: str) -> str:
    """Enhance a user query for better retrieval."""
    expansions = {"agent": "ReAct loop tools", "rag": "retrieval augmented generation", "memory": "checkpointer persistence"}
    # Append the expansion for the first keyword found in the query.
    for kw, exp in expansions.items():
        if kw in query.lower():
            return f"{query} {exp}"
    return query

@tool
def search_docs(query: str) -> str:
    """Search the documentation knowledge base."""
    docs = retriever.invoke(query)
    if not docs:
        return "No relevant documents found."
    return "\n\n".join(f"[{d.metadata['source']} > {d.metadata['section']}] {d.page_content}" for d in docs)

class AnswerValidationMiddleware(AgentMiddleware):
    def after_agent(self, state, runtime):
        # Flag long answers that contain no citation phrase; assumes string content.
        response = state["messages"][-1].content.lower()
        if not any(p in response for p in ["source", "according", "based on"]) and len(response) > 100:
            state["messages"][-1].content += "\n\n_Note: Response may not be fully grounded._"
        return state

class CitedAnswer(BaseModel):
    answer: str = Field(description="The answer")
    citations: list[str] = Field(description="Sources cited")
    confidence: str = Field(description="high, medium, or low")
    follow_up_questions: list[str] = Field(description="Suggested follow-ups")

model = init_chat_model("gpt-4o-mini", model_provider="openai")

agent = create_agent(
    model=model,
    tools=[enhance_query, search_docs],
    system_prompt="You are a docs assistant. Enhance queries, search docs, answer with citations.",
    checkpointer=InMemorySaver(),
    middleware=[PIIMiddleware("email", strategy="redact"), AnswerValidationMiddleware()],
    response_format=CitedAnswer,
)

config = {"configurable": {"thread_id": "demo"}}
result = agent.invoke({"messages": [HumanMessage(content="How do agents work?")]}, config=config)
print(result["messages"][-1].content)

Key Takeaways

  • Production RAG combines query enhancement, retrieval, validation, PII protection, memory, and structured output
  • The enhance_query tool expands user queries with domain-specific terms for better retrieval
  • AnswerValidationMiddleware flags long responses that contain no citation phrases
  • PIIMiddleware redacts sensitive data before the agent processes it
  • InMemorySaver maintains conversation context across multiple turns via thread_id
  • CitedAnswer schema ensures responses include citations, confidence, and follow-up suggestions