Agent Foundry
OpenAI Agents SDK

Project: Production Chat System

Advanced Topic 22 of 22

In this project, you'll build a production-ready chat system that combines session persistence with SQLiteSession, input/output guardrails for safety, human-in-the-loop for sensitive operations, streaming events for real-time display, custom tracing for observability, and RunConfig with error handlers for graceful degradation.

Architecture Overview

User Message → Input Guardrails → Chat Agent
                                      ├── SQLiteSession (persistence)
                                      ├── Tools (lookup, actions)
                                      ├── Human Approval (sensitive ops)
                                      ├── Streaming Events (real-time UI)
                                      └── Custom Trace Processor (observability)
                                              ↓
                                   Output Guardrails → Response

Step 1: Session Persistence with SQLiteSession

Use SQLiteSession to persist conversation history across restarts:

from agents import Agent, Runner, SQLiteSession

# The session id identifies the conversation; history is persisted in chat_history.db
session = SQLiteSession("user-alice-001", "chat_history.db")
 
chat_agent = Agent(
    name="Production Chat",
    instructions=(
        "You are a helpful customer service agent. "
        "You have access to account lookup and order management tools. "
        "Be concise and professional in your responses."
    ),
)
 
result = await Runner.run(
    chat_agent,
    "Hi, I need help with my recent order.",
    session=session,
)
print(result.final_output)
 
result = await Runner.run(
    chat_agent,
    "Can you check order ORD-5678?",
    session=session,
)
print(result.final_output)

The second call automatically includes the conversation history from the first call.
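Conceptually, SQLiteSession keys stored conversation items by session id and replays them on each run. As a mental model only (this is not the SDK's actual schema), a minimal stand-in looks like this:

```python
import json
import sqlite3

class MiniSession:
    """Toy sketch of session persistence: items keyed by a session id."""

    def __init__(self, session_id: str, path: str = ":memory:"):
        self.session_id = session_id
        self.db = sqlite3.connect(path)
        self.db.execute(
            "CREATE TABLE IF NOT EXISTS items (session_id TEXT, item TEXT)"
        )

    def add_items(self, items: list) -> None:
        # Serialize each item as JSON under this session's id
        self.db.executemany(
            "INSERT INTO items VALUES (?, ?)",
            [(self.session_id, json.dumps(i)) for i in items],
        )
        self.db.commit()

    def get_items(self) -> list:
        rows = self.db.execute(
            "SELECT item FROM items WHERE session_id = ?", (self.session_id,)
        )
        return [json.loads(r[0]) for r in rows]

s = MiniSession("user-alice-001")
s.add_items([{"role": "user", "content": "Hi"}])
s.add_items([{"role": "assistant", "content": "Hello!"}])
print(len(s.get_items()))  # → 2
```

Because every run appends to the same keyed history, the agent sees the full conversation on each turn, which is exactly why the second call above "remembers" the first.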

Step 2: Input and Output Guardrails

Add safety guardrails to filter harmful inputs and outputs:

from agents import input_guardrail, output_guardrail, GuardrailFunctionOutput
 
@input_guardrail
async def block_injection(ctx, agent, input):
    """Block prompt injection attempts."""
    # `input` may be a string or a list of input items; normalize to text first
    text = input if isinstance(input, str) else str(input)
    injection_patterns = ["ignore previous instructions", "system prompt", "you are now"]
    is_injection = any(pattern in text.lower() for pattern in injection_patterns)
    return GuardrailFunctionOutput(
        output_info={"injection_detected": is_injection},
        tripwire_triggered=is_injection,
    )
 
@input_guardrail
async def rate_limit_check(ctx, agent, input):
    """Check if user has exceeded rate limits."""
    return GuardrailFunctionOutput(
        output_info={"rate_limited": False},
        tripwire_triggered=False,
    )
 
@output_guardrail
async def block_internal_data(ctx, agent, output):
    """Block responses that leak internal system details."""
    # `output` is the agent's final output; coerce to text before matching
    leak_patterns = ["internal error", "stack trace", "database connection", "api_secret"]
    has_leak = any(pattern in str(output).lower() for pattern in leak_patterns)
    return GuardrailFunctionOutput(
        output_info={"internal_leak": has_leak},
        tripwire_triggered=has_leak,
    )
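The rate_limit_check above is a stub. A real implementation needs per-user request accounting; one option is a sliding-window counter, sketched here in plain Python (SlidingWindowLimiter is illustrative, not part of the SDK):

```python
import time
from collections import defaultdict, deque

class SlidingWindowLimiter:
    """Allow at most `limit` requests per `window` seconds, per user."""

    def __init__(self, limit: int = 10, window: float = 60.0):
        self.limit = limit
        self.window = window
        self.hits = defaultdict(deque)  # user_id -> timestamps of recent requests

    def allow(self, user_id: str, now=None) -> bool:
        now = time.monotonic() if now is None else now
        q = self.hits[user_id]
        # Drop timestamps that have fallen outside the window
        while q and now - q[0] > self.window:
            q.popleft()
        if len(q) >= self.limit:
            return False
        q.append(now)
        return True

limiter = SlidingWindowLimiter(limit=3, window=60.0)
print([limiter.allow("alice", now=t) for t in (0, 1, 2, 3)])
# → [True, True, True, False]
```

The guardrail would call limiter.allow(user_id) and set tripwire_triggered=True whenever it returns False.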

Step 3: Tools with Human-in-the-Loop

Define tools — some execute immediately, others require human approval:

from agents import function_tool
 
@function_tool
def lookup_order(order_id: str) -> str:
    """Look up an order by ID."""
    orders = {
        "ORD-5678": "Widget Pro x2 - $99.98 - Shipped (tracking: TRK-111)",
        "ORD-9012": "Gadget Plus x1 - $149.99 - Processing",
        "ORD-3456": "Cable Kit x5 - $24.95 - Delivered",
    }
    return orders.get(order_id, "Order not found")
 
@function_tool
def lookup_account(email: str) -> str:
    """Look up a customer account by email."""
    accounts = {
        "alice@example.com": "Alice Johnson - Premium Tier - Member since 2022",
        "bob@example.com": "Bob Smith - Free Tier - Member since 2024",
    }
    return accounts.get(email, "Account not found")
 
@function_tool(needs_approval=True)
def issue_refund(order_id: str, amount: float, reason: str) -> str:
    """Issue a refund for an order. Requires human approval."""
    return f"Refund of ${amount} issued for {order_id}. Reason: {reason}"
 
@function_tool(needs_approval=True)
def cancel_order(order_id: str, reason: str) -> str:
    """Cancel an order. Requires human approval."""
    return f"Order {order_id} cancelled. Reason: {reason}"
 
@function_tool
def search_faq(query: str) -> str:
    """Search the FAQ knowledge base."""
    faqs = {
        "shipping": "Standard shipping: 5-7 business days. Express: 2-3 days.",
        "returns": "Returns accepted within 30 days. Free return shipping on Premium.",
        "warranty": "All products come with a 1-year manufacturer warranty.",
    }
    results = [v for k, v in faqs.items() if k in query.lower()]
    return "; ".join(results) if results else "No matching FAQ found"
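Note that search_faq only matches exact keyword substrings, so misspellings like "returnss" or "warrenty" would miss. A lightweight improvement is fuzzy matching against the FAQ keys with the standard library (search_faq_fuzzy is an illustrative helper, not SDK API):

```python
import difflib

FAQS = {
    "shipping": "Standard shipping: 5-7 business days. Express: 2-3 days.",
    "returns": "Returns accepted within 30 days. Free return shipping on Premium.",
    "warranty": "All products come with a 1-year manufacturer warranty.",
}

def search_faq_fuzzy(query: str) -> str:
    """Match each query word fuzzily against the FAQ topic keys."""
    hits = []
    for word in query.lower().split():
        # cutoff=0.75 tolerates small typos while rejecting unrelated words
        for key in difflib.get_close_matches(word, list(FAQS), n=1, cutoff=0.75):
            if FAQS[key] not in hits:
                hits.append(FAQS[key])
    return "; ".join(hits) if hits else "No matching FAQ found"

print(search_faq_fuzzy("what about returnss?"))
```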

Step 4: Custom Trace Processor

Build a trace processor to log agent activity to a file for observability:

import json
import time
from agents.tracing import add_trace_processor, TracingProcessor, Trace, Span
 
class FileTraceProcessor(TracingProcessor):
    def __init__(self, log_file: str = "agent_traces.jsonl"):
        self.log_file = log_file
 
    def _write(self, event: dict):
        event["timestamp"] = time.time()
        with open(self.log_file, "a") as f:
            f.write(json.dumps(event) + "\n")
 
    def on_trace_start(self, trace: Trace) -> None:
        self._write({"event": "trace_start", "trace_id": trace.trace_id, "name": trace.name})
 
    def on_trace_end(self, trace: Trace) -> None:
        self._write({"event": "trace_end", "trace_id": trace.trace_id})
 
    def on_span_start(self, span: Span) -> None:
        self._write({"event": "span_start", "span_id": span.span_id, "data": str(span.span_data)})
 
    def on_span_end(self, span: Span) -> None:
        self._write({"event": "span_end", "span_id": span.span_id})
 
    def shutdown(self) -> None:
        # Required by the TracingProcessor interface; nothing to clean up here
        pass
 
    def force_flush(self) -> None:
        # Required by the TracingProcessor interface; each event is flushed on write
        pass
 
add_trace_processor(FileTraceProcessor())
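Because the processor writes JSON Lines, the log is easy to analyze offline. A small summarizer (stdlib only; summarize_traces is an illustrative helper):

```python
import json
import tempfile
from collections import Counter

def summarize_traces(log_file: str) -> Counter:
    """Count events by type in a JSONL trace log."""
    counts = Counter()
    with open(log_file) as f:
        for line in f:
            if line.strip():
                counts[json.loads(line)["event"]] += 1
    return counts

# Demo against a synthetic log written to a temp file
with tempfile.NamedTemporaryFile("w", suffix=".jsonl", delete=False) as f:
    for ev in ("trace_start", "span_start", "span_end", "trace_end"):
        f.write(json.dumps({"event": ev}) + "\n")
    log_path = f.name

print(summarize_traces(log_path))
```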

Step 5: RunConfig with Error Handlers

Configure the run for production. In the SDK, max_turns is an argument to Runner.run rather than a RunConfig field; exceeding it raises MaxTurnsExceeded, which you catch to degrade gracefully:

from agents import RunConfig
from agents.exceptions import MaxTurnsExceeded
 
production_config = RunConfig(
    model="gpt-4o",
    workflow_name="production-chat",
    trace_include_sensitive_data=False,  # keep message contents out of traces
)
 
MAX_TURNS = 15
 
MAX_TURNS_MESSAGE = (
    "I apologize, but your request requires more steps than I can handle in a single interaction. "
    "Could you break it down into smaller questions?"
)

Wrap Runner.run(..., max_turns=MAX_TURNS) in a try/except MaxTurnsExceeded and reply with MAX_TURNS_MESSAGE when it fires.
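If you also want retry behavior beyond what the underlying OpenAI client already does for transient API errors, a generic backoff wrapper is easy to add (with_retries and its parameters are illustrative, not SDK API):

```python
import asyncio

async def with_retries(fn, *args, max_retries=3, base_delay=1.0, **kwargs):
    """Call an async fn, retrying with exponential backoff on any exception."""
    for attempt in range(max_retries + 1):
        try:
            return await fn(*args, **kwargs)
        except Exception:
            if attempt == max_retries:
                raise  # out of retries; surface the original error
            await asyncio.sleep(base_delay * (2 ** attempt))

# Demo: a flaky coroutine that fails twice, then succeeds
calls = {"n": 0}

async def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient failure")
    return "ok"

print(asyncio.run(with_retries(flaky, base_delay=0.01)))  # → ok
```

In the chat system, the wrapped call would be with_retries(Runner.run, agent, user_input, session=session, run_config=production_config).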

Step 6: Assemble the Production Agent

Bring everything together into a single production-ready agent:

chat_agent = Agent(
    name="Production Chat",
    instructions=(
        "You are a professional customer service agent. "
        "Use lookup tools to find order and account information. "
        "For refunds and cancellations, the system will request human approval. "
        "Search the FAQ for common questions. "
        "Be concise, helpful, and empathetic."
    ),
    tools=[lookup_order, lookup_account, issue_refund, cancel_order, search_faq],
    input_guardrails=[block_injection, rate_limit_check],
    output_guardrails=[block_internal_data],
)

Step 7: Streaming Events Display

Use streaming to display agent activity in real time:

from agents import Runner
from openai.types.responses import ResponseTextDeltaEvent
 
async def chat_with_streaming(agent, message, session, config):
    """Run agent with streaming and display events in real time."""
    # run_streamed returns immediately; events arrive via stream_events()
    result = Runner.run_streamed(
        agent,
        message,
        session=session,
        run_config=config,
    )
 
    async for event in result.stream_events():
        if event.type == "raw_response_event":
            # Token-by-token text deltas from the model
            if isinstance(event.data, ResponseTextDeltaEvent):
                print(event.data.delta, end="", flush=True)
        elif event.type == "run_item_stream_event":
            if event.item.type == "tool_call_item":
                print("\n[Calling tool]")
            elif event.item.type == "tool_call_output_item":
                print("[Tool result received]")
 
    print()
 
    # Once the stream is exhausted, final_output is a plain attribute, not a coroutine
    return result.final_output
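The consumption pattern (iterate events, branch on type, read the final value once the stream ends) can be exercised without the SDK; fake_stream and consume below are illustrative stand-ins:

```python
import asyncio

async def fake_stream():
    """Toy event stream: text deltas, then a terminal event."""
    for chunk in ("Hel", "lo", "!"):
        yield {"type": "delta", "text": chunk}
    yield {"type": "done"}

async def consume():
    parts = []
    async for event in fake_stream():
        if event["type"] == "delta":
            parts.append(event["text"])  # a real UI would render this incrementally
    return "".join(parts)

print(asyncio.run(consume()))  # → Hello!
```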

Step 8: Handling Approvals in Chat

Handle human-in-the-loop approvals in the chat flow:

from agents.exceptions import (
    InputGuardrailTripwireTriggered,
    MaxTurnsExceeded,
    OutputGuardrailTripwireTriggered,
)
 
async def chat_loop(agent, session, session_id, config, max_turns=15):
    """Main chat loop with guardrails and approval handling."""
    # session_id is informational here; the SQLiteSession itself carries the conversation id
    print(f"Production Chat System, session {session_id} (type 'quit' to exit)")
    print("=" * 50)
 
    while True:
        user_input = input("\nYou: ")
        if user_input.lower() == "quit":
            break
 
        try:
            result = await Runner.run(
                agent,
                user_input,
                session=session,
                run_config=config,
                max_turns=max_turns,
            )
 
            if result.interruptions:
                for interruption in result.interruptions:
                    print(f"\n[APPROVAL REQUIRED] Tool: {interruption.tool_name}")
                    print(f"Arguments: {interruption.tool_arguments}")
                    decision = input("Approve? (y/n): ")
 
                    state = result.to_state()
                    if decision.lower() == "y":
                        state.approve(interruption_id=interruption.id)
                    else:
                        state.reject(
                            interruption_id=interruption.id,
                            message="Rejected by operator",
                        )
 
                    # Resume the run from the saved state after the decision
                    result = await Runner.run(agent, state, run_config=config)
 
            print(f"\nAgent: {result.final_output}")
 
        except MaxTurnsExceeded:
            print(
                "\nAgent: I apologize, but your request requires more steps than I can "
                "handle in a single interaction. Could you break it down into smaller questions?"
            )
        except InputGuardrailTripwireTriggered:
            print("\nAgent: I'm sorry, I can't process that request.")
        except OutputGuardrailTripwireTriggered:
            print("\nAgent: I encountered an issue generating a safe response. Please try again.")
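
Prompting an operator for every interruption does not scale. A common refinement is an auto-approval policy that only escalates above a risk threshold; should_auto_approve below is an illustrative sketch, not SDK API, and its thresholds are examples only:

```python
AUTO_APPROVE_LIMITS = {"issue_refund": 50.0}  # max refund auto-approved, in dollars

def should_auto_approve(tool_name: str, tool_arguments: dict) -> bool:
    """Auto-approve low-risk calls; escalate everything else to a human."""
    if tool_name == "issue_refund":
        limit = AUTO_APPROVE_LIMITS["issue_refund"]
        # Missing amount defaults to infinity, which always escalates
        return tool_arguments.get("amount", float("inf")) <= limit
    return False  # cancel_order and unknown tools always need a human

print(should_auto_approve("issue_refund", {"order_id": "ORD-5678", "amount": 20.0}))  # → True
print(should_auto_approve("cancel_order", {"order_id": "ORD-5678"}))  # → False
```

In the loop above, you would check should_auto_approve(interruption.tool_name, interruption.tool_arguments) before falling back to input().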

Step 9: Running the System

session = SQLiteSession("demo-session-001", "chat_history.db")
 
await chat_loop(chat_agent, session, "demo-session-001", production_config)

Key Takeaways

  • Use SQLiteSession to persist conversation history across sessions and restarts
  • Combine @input_guardrail and @output_guardrail for layered safety protection
  • Use function_tool(needs_approval=True) for sensitive operations like refunds and cancellations
  • Handle result.interruptions with state.approve() / state.reject() in a chat loop
  • Stream events with Runner.run_streamed() for real-time UI feedback
  • Build custom TracingProcessor implementations for production observability
  • Configure RunConfig with trace_include_sensitive_data=False, pass max_turns to Runner.run, and catch MaxTurnsExceeded for graceful degradation