Project: Production Chat System
In this project, you'll build a production-ready chat system that combines session persistence with SQLiteSession, input/output guardrails for safety, human-in-the-loop for sensitive operations, streaming events for real-time display, custom tracing for observability, and RunConfig with error handlers for graceful degradation.
Architecture Overview
User Message → Input Guardrails → Chat Agent
                                  ├── SQLiteSession (persistence)
                                  ├── Tools (lookup, actions)
                                  ├── Human Approval (sensitive ops)
                                  ├── Streaming Events (real-time UI)
                                  └── Custom Trace Processor (observability)
                                  ↓
                                  Output Guardrails → Response
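The flow in the diagram can be sketched in plain Python to show the order of operations: input checks run first, then the agent, then output checks. This is a minimal illustration under stated assumptions, not the SDK's implementation; `run_pipeline`, `GuardrailTripped`, `echo_agent`, and the two check functions are hypothetical names introduced only for this example.

```python
class GuardrailTripped(Exception):
    """Raised when a check rejects the text (hypothetical name)."""


def run_pipeline(message, input_checks, agent, output_checks):
    """Apply input checks, call the agent, then apply output checks."""
    for check in input_checks:
        if check(message):
            raise GuardrailTripped(f"input blocked by {check.__name__}")
    reply = agent(message)
    for check in output_checks:
        if check(reply):
            raise GuardrailTripped(f"output blocked by {check.__name__}")
    return reply


def looks_like_injection(text):
    # Each check returns True when the text should be blocked.
    return "ignore previous instructions" in text.lower()


def leaks_internals(text):
    return "stack trace" in text.lower()


def echo_agent(message):
    # Stand-in for the real agent: echoes the message back.
    return f"You said: {message}"


reply = run_pipeline("hello", [looks_like_injection], echo_agent, [leaks_internals])
print(reply)
```

Keeping the checks as plain functions that return a boolean makes each one easy to unit test before it is wrapped in a guardrail.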
Step 1: Session Persistence with SQLiteSession
Use SQLiteSession to persist conversation history across restarts:
import asyncio

from agents import Agent, Runner, SQLiteSession

# The session ID selects the conversation; the file path makes it survive restarts.
session = SQLiteSession("user-alice-001", "chat_history.db")

chat_agent = Agent(
    name="Production Chat",
    instructions=(
        "You are a helpful customer service agent. "
        "You have access to account lookup and order management tools. "
        "Be concise and professional in your responses."
    ),
)


async def main():
    result = await Runner.run(
        chat_agent,
        "Hi, I need help with my recent order.",
        session=session,
    )
    print(result.final_output)

    # Same session object, so the agent sees the previous turn.
    result = await Runner.run(
        chat_agent,
        "Can you check order ORD-5678?",
        session=session,
    )
    print(result.final_output)


asyncio.run(main())

The second call automatically includes the conversation history from the first call.
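To make the idea of persisted history concrete, the sketch below stores and reloads messages per session with the standard `sqlite3` module. It is an illustration only and does not claim to mirror how SQLiteSession works internally; the table layout and the helper names `open_store`, `add_message`, and `get_history` are assumptions made for this example.

```python
import sqlite3


def open_store(path=":memory:"):
    """Open (or create) a message store; use a file path to survive restarts."""
    db = sqlite3.connect(path)
    db.execute(
        "CREATE TABLE IF NOT EXISTS messages ("
        "id INTEGER PRIMARY KEY AUTOINCREMENT, "
        "session_id TEXT NOT NULL, "
        "role TEXT NOT NULL, "
        "content TEXT NOT NULL)"
    )
    return db


def add_message(db, session_id, role, content):
    """Append one message to a session."""
    with db:  # commits on success, rolls back on error
        db.execute(
            "INSERT INTO messages (session_id, role, content) VALUES (?, ?, ?)",
            (session_id, role, content),
        )


def get_history(db, session_id):
    """Return the session's messages in insertion order."""
    rows = db.execute(
        "SELECT role, content FROM messages WHERE session_id = ? ORDER BY id",
        (session_id,),
    )
    return [{"role": role, "content": content} for role, content in rows]


db = open_store()
add_message(db, "user-alice-001", "user", "Hi, I need help with my recent order.")
add_message(db, "user-alice-001", "assistant", "Sure, which order?")
add_message(db, "user-bob-002", "user", "Hello")
history = get_history(db, "user-alice-001")
```

Because every row carries a session ID, several conversations can share one database file without seeing each other's history.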
Step 2: Input and Output Guardrails
Add safety guardrails to filter harmful inputs and outputs:
from agents import GuardrailFunctionOutput, input_guardrail, output_guardrail


def user_text(input) -> str:
    """Flatten guardrail input to text; it may be a string or a list of message items."""
    if isinstance(input, str):
        return input
    return "\n".join(
        item["content"]
        for item in input
        if isinstance(item, dict)
        and item.get("role") == "user"
        and isinstance(item.get("content"), str)
    )


@input_guardrail
async def block_injection(ctx, agent, input):
    """Block prompt injection attempts."""
    injection_patterns = ["ignore previous instructions", "system prompt", "you are now"]
    text = user_text(input).lower()
    is_injection = any(pattern in text for pattern in injection_patterns)
    return GuardrailFunctionOutput(
        output_info={"injection_detected": is_injection},
        tripwire_triggered=is_injection,
    )


@input_guardrail
async def rate_limit_check(ctx, agent, input):
    """Check if user has exceeded rate limits."""
    # Placeholder: always allows the request. Replace with a real limiter.
    return GuardrailFunctionOutput(
        output_info={"rate_limited": False},
        tripwire_triggered=False,
    )


@output_guardrail
async def block_internal_data(ctx, agent, output):
    """Block responses that leak internal system details."""
    leak_patterns = ["internal error", "stack trace", "database connection", "api_secret"]
    # str() keeps the check working if the agent returns a non-string output type.
    has_leak = any(pattern in str(output).lower() for pattern in leak_patterns)
    return GuardrailFunctionOutput(
        output_info={"internal_leak": has_leak},
        tripwire_triggered=has_leak,
    )

Step 3: Tools with Human-in-the-Loop
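As a mental model for this step, an approval gate can be pictured without the SDK: a tool flagged as sensitive refuses to run until an operator has approved the call. The sketch below is an illustration under stated assumptions, not the library's mechanism; `ToolSpec`, `PendingApproval`, `call_tool`, and the two sample tools are hypothetical names.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class ToolSpec:
    """A callable plus a flag saying whether a human must approve each call."""
    func: Callable[..., str]
    needs_approval: bool = False


class PendingApproval(Exception):
    """Signals that a call is waiting for an operator decision."""

    def __init__(self, tool_name, arguments):
        super().__init__(f"{tool_name} needs approval")
        self.tool_name = tool_name
        self.arguments = arguments


def call_tool(registry, name, arguments, approved=False):
    """Run a tool, unless it is gated and has not been approved yet."""
    spec = registry[name]
    if spec.needs_approval and not approved:
        raise PendingApproval(name, arguments)
    return spec.func(**arguments)


def lookup(order_id):
    return f"{order_id}: shipped"


def refund(order_id, amount):
    return f"Refunded {amount:.2f} for {order_id}"


REGISTRY = {
    "lookup": ToolSpec(lookup),
    "refund": ToolSpec(refund, needs_approval=True),
}
```

Read-only tools run immediately, while the gated tool surfaces its name and arguments so an operator can decide before anything irreversible happens.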
Define tools — some execute immediately, others require human approval:
from agents import function_tool


@function_tool
def lookup_order(order_id: str) -> str:
    """Look up an order by ID."""
    orders = {
        "ORD-5678": "Widget Pro x2 - $99.98 - Shipped (tracking: TRK-111)",
        "ORD-9012": "Gadget Plus x1 - $149.99 - Processing",
        "ORD-3456": "Cable Kit x5 - $24.95 - Delivered",
    }
    return orders.get(order_id, "Order not found")


@function_tool
def lookup_account(email: str) -> str:
    """Look up a customer account by email."""
    accounts = {
        "alice@example.com": "Alice Johnson - Premium Tier - Member since 2022",
        "bob@example.com": "Bob Smith - Free Tier - Member since 2024",
    }
    return accounts.get(email, "Account not found")


@function_tool(needs_approval=True)
def issue_refund(order_id: str, amount: float, reason: str) -> str:
    """Issue a refund for an order. Requires human approval."""
    # Format the amount with two decimals so 99.9 prints as $99.90.
    return f"Refund of ${amount:.2f} issued for {order_id}. Reason: {reason}"


@function_tool(needs_approval=True)
def cancel_order(order_id: str, reason: str) -> str:
    """Cancel an order. Requires human approval."""
    return f"Order {order_id} cancelled. Reason: {reason}"


@function_tool
def search_faq(query: str) -> str:
    """Search the FAQ knowledge base."""
    faqs = {
        "shipping": "Standard shipping: 5-7 business days. Express: 2-3 days.",
        "returns": "Returns accepted within 30 days. Free return shipping on Premium.",
        "warranty": "All products come with a 1-year manufacturer warranty.",
    }
    # Return every FAQ entry whose topic keyword appears in the query.
    results = [v for k, v in faqs.items() if k in query.lower()]
    return "; ".join(results) if results else "No matching FAQ found"

Step 4: Custom Trace Processor
Build a trace processor to log agent activity to a file for observability:
import json
import time

from agents.tracing import Span, Trace, TracingProcessor, add_trace_processor


class FileTraceProcessor(TracingProcessor):
    """Append trace and span events to a JSON Lines file."""

    def __init__(self, log_file: str = "agent_traces.jsonl"):
        self.log_file = log_file

    def _write(self, event: dict):
        event["timestamp"] = time.time()
        with open(self.log_file, "a", encoding="utf-8") as f:
            f.write(json.dumps(event) + "\n")

    def on_trace_start(self, trace: Trace) -> None:
        self._write({"event": "trace_start", "trace_id": trace.trace_id, "name": trace.name})

    def on_trace_end(self, trace: Trace) -> None:
        self._write({"event": "trace_end", "trace_id": trace.trace_id})

    def on_span_start(self, span: Span) -> None:
        self._write({"event": "span_start", "span_id": span.span_id, "data": str(span.span_data)})

    def on_span_end(self, span: Span) -> None:
        self._write({"event": "span_end", "span_id": span.span_id})

    # The processor interface also declares these hooks. Nothing is buffered
    # here (each event is written immediately), so both are no-ops.
    def shutdown(self) -> None:
        pass

    def force_flush(self) -> None:
        pass


add_trace_processor(FileTraceProcessor())

Step 5: RunConfig with Error Handlers
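The pattern behind this step, retrying a failing call a few times and then falling back to a friendly message, can be sketched in plain Python. This is a minimal illustration and not how the SDK handles errors; `run_with_fallback` and its parameters are hypothetical, with `max_retries` and `retry_delay` named only to echo the settings discussed in this step.

```python
import time


def run_with_fallback(operation, fallback, max_retries=3, retry_delay=1.0, sleep=time.sleep):
    """Call operation(); retry on failure; return fallback(error) once retries run out.

    The sleep function is injectable so tests can skip real waiting.
    """
    last_error = None
    # One initial attempt plus up to max_retries further attempts.
    for attempt in range(1 + max_retries):
        try:
            return operation()
        except Exception as error:
            last_error = error
            if attempt < max_retries:
                sleep(retry_delay)
    return fallback(last_error)


def always_down():
    raise RuntimeError("service unavailable")


message = run_with_fallback(
    always_down,
    lambda error: "Sorry, something went wrong. Please try again shortly.",
    max_retries=2,
    retry_delay=0.0,
)
```

Returning a fallback string instead of raising keeps the chat loop alive, at the cost of hiding the underlying error from the user; logging `last_error` before falling back is a sensible addition in practice.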
Configure the run for production with error handling and graceful degradation:
from agents import RunConfig


def handle_max_turns(error):
    return (
        "I apologize, but your request requires more steps than I can handle in a single interaction. "
        "Could you break it down into smaller questions?"
    )


production_config = RunConfig(
    model="gpt-4o",
    max_turns=15,
    trace_include_sensitive_data=False,
    error_handlers={"max_turns": handle_max_turns},
    max_retries=3,
    retry_delay=1.0,
)

Step 6: Assemble the Production Agent
Bring everything together into a single production-ready agent:
chat_agent = Agent(
    name="Production Chat",
    instructions=(
        "You are a professional customer service agent. "
        "Use lookup tools to find order and account information. "
        "For refunds and cancellations, the system will request human approval. "
        "Search the FAQ for common questions. "
        "Be concise, helpful, and empathetic."
    ),
    tools=[lookup_order, lookup_account, issue_refund, cancel_order, search_faq],
    input_guardrails=[block_injection, rate_limit_check],
    output_guardrails=[block_internal_data],
)

Step 7: Streaming Events Display
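The consumption pattern used in this step, iterating an asynchronous stream and reacting to each event type as it arrives, can be tried without a model call. The sketch below uses a hand-written async generator; the event names and the `(kind, payload)` tuple shape are assumptions for illustration and are not the SDK's event classes.

```python
import asyncio


async def fake_event_stream():
    """Yield (kind, payload) tuples in the order a streamed run might produce them."""
    events = [
        ("text_delta", "Your order "),
        ("tool_start", "lookup_order"),
        ("tool_end", None),
        ("text_delta", "has shipped."),
    ]
    for event in events:
        await asyncio.sleep(0)  # hand control back to the event loop between events
        yield event


async def render(stream):
    """Collect text deltas into one reply and tool activity into notices."""
    text_parts = []
    notices = []
    async for kind, payload in stream:
        if kind == "text_delta":
            text_parts.append(payload)
        elif kind == "tool_start":
            notices.append(f"[Calling tool: {payload}]")
        elif kind == "tool_end":
            notices.append("[Tool result received]")
    return "".join(text_parts), notices


text, notices = asyncio.run(render(fake_event_stream()))
```

Separating rendering from the event source makes the display logic testable with canned events before it is pointed at a live stream.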
Use streaming to display agent activity in real time:
from agents import Runner


async def chat_with_streaming(agent, message, session, config):
    """Run agent with streaming and display events in real time."""
    result = Runner.run_streamed(
        agent,
        message,
        session=session,
        run_config=config,
    )
    async for event in result.stream_events():
        if event.type == "raw_response_event":
            # Print only text deltas, not tool-call argument deltas.
            if event.data.type == "response.output_text.delta":
                print(event.data.delta, end="", flush=True)
        elif event.type == "run_item_stream_event":
            if event.item.type == "tool_call_item":
                tool_name = getattr(event.item.raw_item, "name", "unknown")
                print(f"\n[Calling tool: {tool_name}]")
            elif event.item.type == "tool_call_output_item":
                print("[Tool result received]")
    print()
    # final_output is an attribute, populated once the stream is fully consumed.
    return result.final_output

Step 8: Handling Approvals in Chat
Handle human-in-the-loop approvals in the chat flow:
from agents.exceptions import InputGuardrailTripwireTriggered, OutputGuardrailTripwireTriggered


async def chat_loop(agent, session, config):
    """Main chat loop with guardrails and approval handling."""
    print("Production Chat System (type 'quit' to exit)")
    print("=" * 50)
    while True:
        user_input = input("\nYou: ")
        if user_input.lower() == "quit":
            break
        try:
            result = await Runner.run(
                agent,
                user_input,
                session=session,
                run_config=config,
            )
            # A resumed run can pause again, so loop until nothing is pending.
            while result.interruptions:
                # Record every decision on one state object before resuming.
                state = result.to_state()
                for interruption in result.interruptions:
                    print(f"\n[APPROVAL REQUIRED] Tool: {interruption.tool_name}")
                    print(f"Arguments: {interruption.tool_arguments}")
                    decision = input("Approve? (y/n): ")
                    if decision.lower() == "y":
                        state.approve(interruption_id=interruption.id)
                    else:
                        state.reject(
                            interruption_id=interruption.id,
                            message="Rejected by operator",
                        )
                result = await Runner.run(agent, state, run_config=config)
            print(f"\nAgent: {result.final_output}")
        except InputGuardrailTripwireTriggered:
            print("\nAgent: I'm sorry, I can't process that request.")
        except OutputGuardrailTripwireTriggered:
            print("\nAgent: I encountered an issue generating a safe response. Please try again.")

Step 9: Running the System
import asyncio

from agents import SQLiteSession


async def main():
    # The session ID is fixed when the session object is created.
    session = SQLiteSession("demo-session-001", "chat_history.db")
    await chat_loop(chat_agent, session, production_config)


asyncio.run(main())

Key Takeaways
- Use SQLiteSession to persist conversation history across sessions and restarts
- Combine @input_guardrail and @output_guardrail for layered safety protection
- Use function_tool(needs_approval=True) for sensitive operations like refunds and cancellations
- Handle result.interruptions with state.approve() / state.reject() in a chat loop
- Stream events with Runner.run_streamed() for real-time UI feedback
- Build custom TracingProcessor implementations for production observability
- Configure RunConfig with error_handlers, max_retries, and trace_include_sensitive_data=False for graceful degradation
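A trace log is only useful for observability if something reads it. As a minimal sketch, the helper below computes per-trace durations from lines in the format FileTraceProcessor writes in Step 4 (JSON objects with `event`, `trace_id`, and `timestamp` keys). `summarize_traces` is a hypothetical helper, and the sample lines are made-up illustrations, not real run output.

```python
import json


def summarize_traces(lines):
    """Return {trace_id: duration_seconds} for traces with both a start and an end event."""
    started = {}
    durations = {}
    for line in lines:
        line = line.strip()
        if not line:
            continue
        event = json.loads(line)
        trace_id = event.get("trace_id")
        if event.get("event") == "trace_start":
            started[trace_id] = event["timestamp"]
        elif event.get("event") == "trace_end" and trace_id in started:
            durations[trace_id] = event["timestamp"] - started.pop(trace_id)
    return durations


# Made-up sample lines in the assumed log format.
sample_lines = [
    '{"event": "trace_start", "trace_id": "t1", "name": "chat", "timestamp": 100.0}',
    '{"event": "span_start", "span_id": "s1", "data": "...", "timestamp": 100.5}',
    '{"event": "span_end", "span_id": "s1", "timestamp": 101.0}',
    '{"event": "trace_end", "trace_id": "t1", "timestamp": 102.5}',
    '{"event": "trace_start", "trace_id": "t2", "name": "chat", "timestamp": 200.0}',
]
durations = summarize_traces(sample_lines)
```

To run it against a real log, pass an open file object, for example `summarize_traces(open("agent_traces.jsonl", encoding="utf-8"))`; traces that never ended are simply left out of the result.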