
agentic-workflows

Build production-grade agentic AI systems with real-time streaming visibility, structured outputs, and multi-agent collaboration. Covers Anthropic/OpenAI/vLLM SDKs, A2A protocol for agent interoperability, Pydantic validation, LangGraph checkpointing for workflow resumption, vector DB memory (Pinecone/Chroma/FAISS), and guardrails for anti-hallucination. Use when building AI agents, multi-agent systems, tool-calling workflows, or applications requiring streaming agent reasoning to UI.

Packaged view

This page reorganizes the original catalog entry around fit, installability, and workflow context first. The original raw source lives below.

Stars: 0.

Hot score: 74.

Updated: March 20, 2026.

Overall rating: C (0.0).

Composite score: 0.0.

Best-practice grade: F (39.6).

Install command

npx @skill-hub/cli install deconvfft-resume-crafter-agentic-workflows

Repository

DeconvFFT/resume-crafter

Skill path: .claude/skills/agentic-workflows

Best for

Primary workflow: Analyze Data & AI.

Technical facets: Full Stack, Frontend, Data / AI.

Target audience: everyone.

License: Unknown.

Original source

Catalog source: SkillHub Club.

Repository owner: DeconvFFT.

This is still a mirrored public skill entry. Review the repository before installing into production workflows.

What it helps with

  • Install agentic-workflows into Claude Code, Codex CLI, Gemini CLI, or OpenCode workflows
  • Review https://github.com/DeconvFFT/resume-crafter before adding agentic-workflows to shared team environments
  • Use agentic-workflows for development workflows

Works across

Claude Code, Codex CLI, Gemini CLI, OpenCode

Favorites: 0.

Sub-skills: 0.

Aggregator: No.

Original source / Raw SKILL.md

---
name: agentic-workflows
description: Build production-grade agentic AI systems with real-time streaming visibility, structured outputs, and multi-agent collaboration. Covers Anthropic/OpenAI/vLLM SDKs, A2A protocol for agent interoperability, Pydantic validation, LangGraph checkpointing for workflow resumption, vector DB memory (Pinecone/Chroma/FAISS), and guardrails for anti-hallucination. Use when building AI agents, multi-agent systems, tool-calling workflows, or applications requiring streaming agent reasoning to UI.
---

# Agentic Workflows Skill

Build intelligent, observable, and resilient AI agent systems.

## Architecture Decision Flow

```
    New Agent System Request
               │
               ▼
┌──────────────────────────────┐
│ Single task or multi-step?   │
│ Single → Simple LLM call     │
│ Multi-step → Agent loop      │
└──────────────────────────────┘
               │
               ▼
┌──────────────────────────────┐
│ Need multiple specialists?   │
│ Yes → Multi-agent (A2A)      │
│ No → Single agent            │
└──────────────────────────────┘
               │
               ▼
┌──────────────────────────────┐
│ Long-running/resumable?      │
│ Yes → LangGraph + checkpoint │
│ No → Simple agent loop       │
└──────────────────────────────┘
               │
               ▼
┌──────────────────────────────┐
│ Need memory across sessions? │
│ Yes → Vector DB              │
│ No → In-session state        │
└──────────────────────────────┘
```
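
The decision flow above amounts to four independent yes/no questions. A minimal sketch of that reading follows; the `Requirements` dataclass, the `choose_architecture` helper, and the label strings are hypothetical names used for illustration, not part of the skill.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Requirements:
    """Answers to the four questions in the decision flow (hypothetical type)."""
    multi_step: bool
    needs_specialists: bool
    long_running: bool
    cross_session_memory: bool


def choose_architecture(req: Requirements) -> list[str]:
    """Return one choice per box in the flow, top to bottom."""
    return [
        "agent loop" if req.multi_step else "simple LLM call",
        "multi-agent (A2A)" if req.needs_specialists else "single agent",
        "LangGraph + checkpoint" if req.long_running else "simple agent loop",
        "vector DB" if req.cross_session_memory else "in-session state",
    ]


# Example: a resumable, single-agent workflow without long-term memory
print(choose_architecture(Requirements(True, False, True, False)))
```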

## Provider Selection

| Provider | Best For | Streaming | Tools |
|----------|----------|-----------|-------|
| Anthropic Claude | Complex reasoning, extended thinking | SSE | Native |
| OpenAI GPT-4 | General purpose, function calling | SSE | Native |
| vLLM | Self-hosted, cost control | OpenAI-compatible | Via prompts |

## Quick Start Patterns

### Anthropic Streaming with Tools
```python
import anthropic

client = anthropic.Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-5",
    max_tokens=4096,
    tools=[{"name": "search", "description": "Search the web", "input_schema": {...}}],
    messages=[{"role": "user", "content": "Research AI trends"}]
) as stream:
    for event in stream:
        if event.type == "content_block_delta":
            if hasattr(event.delta, "text"):
                print(event.delta.text, end="", flush=True)
            elif hasattr(event.delta, "thinking"):
                print(f"[Thinking] {event.delta.thinking}")
```

### Structured Output with Pydantic
```python
import instructor
from pydantic import BaseModel

class Analysis(BaseModel):
    summary: str
    confidence: float
    sources: list[str]

client = instructor.from_provider("anthropic/claude-sonnet-4-5")
result = client.create(
    response_model=Analysis,
    messages=[{"role": "user", "content": "Analyze market trends"}],
    max_retries=3
)
```

## Reference Documentation

| Task | Reference File |
|------|----------------|
| Anthropic/OpenAI/vLLM SDK patterns | [references/llm-sdks.md](references/llm-sdks.md) |
| Multi-agent with A2A protocol | [references/multi-agent.md](references/multi-agent.md) |
| Streaming to UI (SSE/WebSocket) | [references/streaming.md](references/streaming.md) |
| Pydantic structured outputs | [references/structured-outputs.md](references/structured-outputs.md) |
| Memory with vector DBs | [references/memory.md](references/memory.md) |
| Checkpointing & resumption | [references/checkpointing.md](references/checkpointing.md) |
| Guardrails & anti-hallucination | [references/guardrails.md](references/guardrails.md) |

## When to Use Multi-Agent

| Scenario | Approach |
|----------|----------|
| Different expertise needed | Multi-agent with specialists |
| Verification required | Debate pattern (critic agent) |
| Complex workflow orchestration | Supervisor + workers |
| Simple tool use | Single agent with tools |
| Independent subtasks | Parallel agents |

## Production Checklist

- [ ] Structured outputs with Pydantic validation
- [ ] Retry logic with exponential backoff
- [ ] Streaming to UI for visibility
- [ ] Checkpointing for long-running workflows
- [ ] Guardrails for input/output validation
- [ ] Memory persistence (vector DB or KV store)
- [ ] Error handling with graceful degradation
- [ ] Observability (logging, tracing)
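
For the "retry logic with exponential backoff" item, the following is a standard-library sketch rather than the skill's own implementation (the reference files use `tenacity`). The helper names `backoff_delays` and `retry_with_backoff` and the choice of full jitter are assumptions made for illustration.

```python
import random
import time


def backoff_delays(attempts: int, base: float = 1.0, cap: float = 60.0) -> list[float]:
    """Upper bound for each wait: base * 2**n, never more than `cap`."""
    return [min(cap, base * (2 ** n)) for n in range(attempts)]


def retry_with_backoff(fn, attempts=3, base=1.0, cap=60.0,
                       retry_on=(Exception,), sleep=time.sleep):
    """Call fn() until it succeeds or `attempts` calls have failed."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    delays = backoff_delays(attempts, base, cap)
    for n in range(attempts):
        try:
            return fn()
        except retry_on:
            if n == attempts - 1:
                raise  # out of attempts: surface the last error
            # Full jitter: wait a random time up to the current bound
            sleep(random.uniform(0, delays[n]))
```

Passing `sleep` as a parameter keeps the helper easy to test without real waiting.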


---

## Referenced Files

> The following files are referenced in this skill and included for context.

### references/llm-sdks.md

```markdown
# LLM SDK Integration

## Anthropic Claude SDK

### Basic Streaming
```python
import anthropic

client = anthropic.Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-5",
    max_tokens=4096,
    messages=[{"role": "user", "content": "Hello"}]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)
```

### Extended Thinking (Streaming)
```python
with client.messages.stream(
    model="claude-sonnet-4-5",
    max_tokens=16000,
    thinking={"type": "enabled", "budget_tokens": 10000},
    messages=[{"role": "user", "content": "Solve this complex problem..."}]
) as stream:
    for event in stream:
        if event.type == "content_block_delta":
            # Deltas are distinguished by their `type` field, not by attribute name
            if event.delta.type == "thinking_delta":
                print(f"[Thinking] {event.delta.thinking}", end="")
            elif event.delta.type == "text_delta":
                print(event.delta.text, end="")
```

### Tool Use
```python
tools = [{
    "name": "web_search",
    "description": "Search the web for information",
    "input_schema": {
        "type": "object",
        "properties": {
            "query": {"type": "string", "description": "Search query"}
        },
        "required": ["query"]
    }
}]

response = client.messages.create(
    model="claude-sonnet-4-5",
    max_tokens=4096,
    tools=tools,
    messages=[{"role": "user", "content": "Search for AI news"}]
)

# Handle tool use
for block in response.content:
    if block.type == "tool_use":
        tool_name = block.name
        tool_input = block.input
        # Execute tool and continue conversation
```
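
The "execute tool and continue conversation" step above can be sketched as follows. This is a minimal illustration, assuming content blocks are handled as plain dicts (with the SDK the same fields are attributes: `block.id`, `block.name`, `block.input`); `run_tools` and the `handlers` registry are hypothetical names.

```python
import json
from typing import Any, Callable


def run_tools(content_blocks: list[dict],
              handlers: dict[str, Callable[..., Any]]) -> list[dict]:
    """Execute every tool_use block and build matching tool_result blocks."""
    results = []
    for block in content_blocks:
        if block.get("type") != "tool_use":
            continue  # skip text and other block types
        result = {"type": "tool_result", "tool_use_id": block["id"]}
        handler = handlers.get(block["name"])
        if handler is None:
            result.update(content=f"Unknown tool: {block['name']}", is_error=True)
        else:
            try:
                result["content"] = json.dumps(handler(**block["input"]))
            except Exception as exc:  # report the failure back to the model
                result.update(content=str(exc), is_error=True)
        results.append(result)
    return results
```

To continue the conversation, append the assistant turn (`{"role": "assistant", "content": response.content}`) and then a user turn whose `content` is the returned list, and call `client.messages.create` again.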

### Async Client
```python
from anthropic import AsyncAnthropic

client = AsyncAnthropic()

async def chat():
    async with client.messages.stream(
        model="claude-sonnet-4-5",
        max_tokens=1024,
        messages=[{"role": "user", "content": "Hello"}]
    ) as stream:
        async for text in stream.text_stream:
            print(text, end="", flush=True)
```

## OpenAI SDK

### Streaming with Function Calling
```python
from openai import OpenAI

client = OpenAI()

tools = [{
    "type": "function",
    "function": {
        "name": "get_weather",
        "description": "Get weather for a location",
        "parameters": {
            "type": "object",
            "properties": {
                "location": {"type": "string"}
            },
            "required": ["location"]
        }
    }
}]

stream = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "What's the weather in SF?"}],
    tools=tools,
    stream=True
)

for chunk in stream:
    if chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="")
    if chunk.choices[0].delta.tool_calls:
        # Handle tool calls
        pass
```
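
The `# Handle tool calls` placeholder above hides one detail: in a streamed response, each entry in `delta.tool_calls` carries an `index` plus a fragment of the call, and the JSON arguments arrive split across chunks. A minimal accumulator sketch, where `accumulate_tool_calls` is a hypothetical helper and `SimpleNamespace` objects stand in for SDK delta objects:

```python
import json
from types import SimpleNamespace


def accumulate_tool_calls(deltas) -> list[dict]:
    """Merge streamed tool-call fragments into complete calls, keyed by index."""
    calls: dict[int, dict] = {}
    for tc in deltas:
        call = calls.setdefault(tc.index, {"id": None, "name": "", "arguments": ""})
        if tc.id:
            call["id"] = tc.id
        if tc.function is not None:
            if tc.function.name:
                call["name"] += tc.function.name
            if tc.function.arguments:
                call["arguments"] += tc.function.arguments
    # Parse the argument strings only once every fragment has arrived
    return [
        {**call, "arguments": json.loads(call["arguments"] or "{}")}
        for _, call in sorted(calls.items())
    ]


# Stand-in fragments shaped like entries of chunk.choices[0].delta.tool_calls
fragments = [
    SimpleNamespace(index=0, id="call_1",
                    function=SimpleNamespace(name="get_weather", arguments="")),
    SimpleNamespace(index=0, id=None,
                    function=SimpleNamespace(name=None, arguments='{"loc')),
    SimpleNamespace(index=0, id=None,
                    function=SimpleNamespace(name=None, arguments='ation": "SF"}')),
]
print(accumulate_tool_calls(fragments))
```

In the loop above, collect every item of `chunk.choices[0].delta.tool_calls` into a list and pass it to the helper after the stream ends.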

### Structured Outputs (Native)
```python
from pydantic import BaseModel

class CalendarEvent(BaseModel):
    name: str
    date: str
    participants: list[str]

response = client.beta.chat.completions.parse(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Schedule a meeting..."}],
    response_format=CalendarEvent
)

event = response.choices[0].message.parsed
```

### Async Streaming
```python
from openai import AsyncOpenAI

client = AsyncOpenAI()

async def stream_response():
    stream = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": "Hello"}],
        stream=True
    )
    async for chunk in stream:
        if chunk.choices[0].delta.content:
            yield chunk.choices[0].delta.content
```

## vLLM Integration

### OpenAI-Compatible API
```python
from openai import OpenAI

# Point to vLLM server
client = OpenAI(
    base_url="http://localhost:8000/v1",
    api_key="not-needed"
)

response = client.chat.completions.create(
    model="meta-llama/Llama-3.1-8B-Instruct",
    messages=[{"role": "user", "content": "Hello"}],
    stream=True
)

for chunk in response:
    if chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="")
```

### vLLM Server Launch
```bash
vllm serve meta-llama/Llama-3.1-8B-Instruct \
    --host 0.0.0.0 \
    --port 8000 \
    --tensor-parallel-size 2 \
    --gpu-memory-utilization 0.9
```

## Multi-Provider Routing

```python
from enum import Enum

import anthropic
from openai import AsyncOpenAI

class Provider(Enum):
    ANTHROPIC = "anthropic"
    OPENAI = "openai"
    VLLM = "vllm"

class ModelRouter:
    def __init__(self):
        # Async clients, because complete() awaits the provider calls
        self.anthropic = anthropic.AsyncAnthropic()
        self.openai = AsyncOpenAI()
        self.vllm = AsyncOpenAI(base_url="http://localhost:8000/v1", api_key="x")

    def route(self, task_type: str) -> Provider:
        """Route based on task requirements."""
        routing = {
            "complex_reasoning": Provider.ANTHROPIC,
            "code_generation": Provider.ANTHROPIC,
            "simple_chat": Provider.VLLM,
            "function_calling": Provider.OPENAI,
            "cost_sensitive": Provider.VLLM,
        }
        # Unknown task types fall back to OpenAI
        return routing.get(task_type, Provider.OPENAI)

    async def complete(self, messages: list, task_type: str):
        provider = self.route(task_type)

        if provider == Provider.ANTHROPIC:
            return await self.anthropic.messages.create(
                model="claude-sonnet-4-5", max_tokens=4096, messages=messages
            )
        elif provider == Provider.OPENAI:
            return await self.openai.chat.completions.create(
                model="gpt-4o", messages=messages
            )
        else:
            return await self.vllm.chat.completions.create(
                model="meta-llama/Llama-3.1-8B-Instruct", messages=messages
            )
```

## Error Handling & Retries

```python
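import logging

import anthropic
from tenacity import retry, stop_after_attempt, wait_exponential

logger = logging.getLogger(__name__)
# Async client, because the call below is awaited
client = anthropic.AsyncAnthropic()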

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=60)
)
async def robust_completion(messages: list):
    try:
        response = await client.messages.create(
            model="claude-sonnet-4-5",
            max_tokens=4096,
            messages=messages
        )
        return response
    except anthropic.RateLimitError:
        raise  # Let tenacity retry
    except anthropic.APIError as e:
        logger.error(f"API error: {e}")
        raise
```

```

### references/multi-agent.md

```markdown
# Multi-Agent Systems & A2A Protocol

## Google A2A Protocol Overview

A2A (Agent-to-Agent) is an open protocol for secure agent interoperability, now under Linux Foundation governance.

### Core Concepts

| Component | Description |
|-----------|-------------|
| Agent Card | JSON metadata at `/.well-known/agent.json` describing capabilities |
| Client Agent | Initiates tasks, interfaces with users |
| Remote Agent | Executes tasks, returns results |
| Task | Unit of work with lifecycle states |

### Agent Card Schema
```json
{
  "name": "research-agent",
  "description": "Performs deep research on topics",
  "url": "https://agent.example.com",
  "version": "1.0.0",
  "capabilities": {
    "streaming": true,
    "pushNotifications": true
  },
  "skills": [
    {
      "id": "web-research",
      "name": "Web Research",
      "description": "Search and synthesize information from the web"
    }
  ],
  "authentication": {
    "schemes": ["bearer", "oauth2"]
  }
}
```
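
A client typically checks a fetched card before sending tasks. The sketch below parses the JSON shown above with the standard library only; the `AgentSkill` and `AgentCard` dataclasses, the `parse_agent_card` helper, and the set of fields treated as required are assumptions for illustration, not the A2A specification.

```python
import json
from dataclasses import dataclass, field


@dataclass
class AgentSkill:
    id: str
    name: str
    description: str = ""


@dataclass
class AgentCard:
    name: str
    url: str
    version: str = ""
    streaming: bool = False
    skills: list[AgentSkill] = field(default_factory=list)

    def find_skill(self, skill_id: str):
        """Return the skill with this id, or None if the agent lacks it."""
        return next((s for s in self.skills if s.id == skill_id), None)


def parse_agent_card(raw: str) -> AgentCard:
    data = json.loads(raw)
    # Assumption: treat name, url, and skills as the minimum usable card
    missing = [key for key in ("name", "url", "skills") if key not in data]
    if missing:
        raise ValueError(f"Agent card missing fields: {missing}")
    return AgentCard(
        name=data["name"],
        url=data["url"],
        version=data.get("version", ""),
        streaming=bool(data.get("capabilities", {}).get("streaming", False)),
        skills=[
            AgentSkill(s["id"], s["name"], s.get("description", ""))
            for s in data["skills"]
        ],
    )
```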

### A2A Communication Flow

```
1. Discovery: Client fetches /.well-known/agent.json
2. Task Creation: Client POSTs task to remote agent
3. Status Updates: Remote sends progress via SSE/webhook
4. Completion: Remote returns artifacts
```

### Python A2A Client
```python
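import asyncio

# NOTE: A2AClient, Task, discover(), and execute_streaming() are shown as an
# illustrative interface; check the A2A SDK you install for the exact names.
from a2a import A2AClient, Task

async def main():
    # Discover agent
    client = A2AClient()
    agent_card = await client.discover("https://research-agent.example.com")

    # Create task
    task = Task(
        skill_id="web-research",
        input={"query": "Latest AI developments"},
        callback_url="https://my-app.com/webhook"
    )

    # Send task and stream results
    async for update in client.execute_streaming(agent_card, task):
        if update.type == "progress":
            print(f"Progress: {update.message}")
        elif update.type == "artifact":
            print(f"Result: {update.content}")

# `await` and `async for` must run inside a coroutine
asyncio.run(main())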

### A2A Server Implementation
```python
from fastapi import FastAPI
# Assumption: A2AContext is exported next to A2AServer; adjust to your A2A library
from a2a.server import A2AContext, A2AServer, AgentCard, Skill

app = FastAPI()
a2a = A2AServer(app)

# Define agent capabilities
card = AgentCard(
    name="analysis-agent",
    description="Analyzes data and provides insights",
    skills=[
        Skill(id="analyze", name="Data Analysis", description="...")
    ]
)

@a2a.skill("analyze")
async def analyze_data(input: dict, context: A2AContext):
    # Stream progress
    await context.send_progress("Starting analysis...")
    
    # Do work
    result = await perform_analysis(input["data"])
    
    # Return artifact
    return {"analysis": result, "confidence": 0.95}

a2a.register(card)
```

## Multi-Agent Patterns

### Supervisor-Worker Pattern
```python
from langgraph.graph import StateGraph, END
from typing import TypedDict, Literal

class AgentState(TypedDict):
    task: str
    plan: list[str]
    results: dict
    final_answer: str

def supervisor(state: AgentState) -> AgentState:
    """Supervisor plans and delegates."""
    plan = plan_task(state["task"])
    return {"plan": plan}

def researcher(state: AgentState) -> AgentState:
    """Research agent gathers information."""
    results = research(state["plan"][0])
    return {"results": {"research": results}}

def writer(state: AgentState) -> AgentState:
    """Writer agent synthesizes results."""
    answer = synthesize(state["results"])
    return {"final_answer": answer}

def router(state: AgentState) -> Literal["researcher", "writer", "end"]:
    if not state.get("results"):
        return "researcher"
    elif not state.get("final_answer"):
        return "writer"
    return "end"

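# Build graph (plan_task, research, and synthesize are application-defined)
graph = StateGraph(AgentState)
graph.add_node("supervisor", supervisor)
graph.add_node("researcher", researcher)
graph.add_node("writer", writer)
graph.set_entry_point("supervisor")
# Map the router's return values to nodes; "end" must resolve to END
graph.add_conditional_edges(
    "supervisor",
    router,
    {"researcher": "researcher", "writer": "writer", "end": END},
)
graph.add_edge("researcher", "supervisor")
graph.add_edge("writer", END)
workflow = graph.compile()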
```

### Debate Pattern (Critic Agent)
```python
class DebateState(TypedDict):
    question: str
    proposal: str
    critique: str
    revision: str
    rounds: int
    consensus: bool

def proposer(state: DebateState) -> DebateState:
    """Generate initial proposal or revision."""
    if state.get("critique"):
        proposal = revise_based_on_feedback(
            state["proposal"], 
            state["critique"]
        )
    else:
        proposal = generate_proposal(state["question"])
    return {"proposal": proposal, "rounds": state.get("rounds", 0) + 1}

def critic(state: DebateState) -> DebateState:
    """Critically evaluate the proposal."""
    critique = evaluate_proposal(state["proposal"])
    consensus = critique["score"] > 0.8
    return {"critique": critique["feedback"], "consensus": consensus}

def should_continue(state: DebateState) -> Literal["proposer", "end"]:
    if state["consensus"] or state["rounds"] >= 3:
        return "end"
    return "proposer"

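# Build debate graph (StateGraph and END are imported from langgraph.graph)
graph = StateGraph(DebateState)
graph.add_node("proposer", proposer)
graph.add_node("critic", critic)
graph.set_entry_point("proposer")
graph.add_edge("proposer", "critic")
# "end" is not a node name, so map it to END explicitly
graph.add_conditional_edges(
    "critic", should_continue, {"proposer": "proposer", "end": END}
)
debate = graph.compile()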
```
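
The control flow of the debate graph can be traced without LangGraph. The sketch below is a plain-Python restatement under the same stopping rule (consensus when the score exceeds 0.8, or after 3 rounds); `run_debate` and its callback signatures are hypothetical and stand in for the LLM-backed `proposer` and `critic` nodes.

```python
from typing import Callable, Optional


def run_debate(
    question: str,
    propose: Callable[[str, str, Optional[str]], str],
    critique: Callable[[str], tuple[float, str]],
    max_rounds: int = 3,
    threshold: float = 0.8,
) -> dict:
    """Alternate proposer and critic until consensus or the round limit."""
    proposal, feedback = "", None
    for rounds in range(1, max_rounds + 1):
        # First round: fresh proposal; later rounds: revise using the feedback
        proposal = propose(question, proposal, feedback)
        score, feedback = critique(proposal)
        if score > threshold:
            return {"proposal": proposal, "rounds": rounds, "consensus": True}
    return {"proposal": proposal, "rounds": max_rounds, "consensus": False}


# Stub agents: each revision appends "+", and the critic scores by revision count
result = run_debate(
    "q",
    propose=lambda q, prev, fb: (prev or q) + "+",
    critique=lambda p: (0.3 * p.count("+"), "add more detail"),
)
print(result)
```

The round cap matters: without it, a critic that never scores above the threshold would loop forever.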

### Parallel Agents
```python
from typing import TypedDict

from langgraph.graph import StateGraph, START, END

class ParallelState(TypedDict):
    query: str
    web_results: str
    db_results: str
    combined: str

# search_web, search_database, and merge are application-defined helpers
async def web_search(state: ParallelState) -> dict:
    results = await search_web(state["query"])
    return {"web_results": results}

async def db_search(state: ParallelState) -> dict:
    results = await search_database(state["query"])
    return {"db_results": results}

def combine_results(state: ParallelState) -> dict:
    combined = merge(state["web_results"], state["db_results"])
    return {"combined": combined}

# Parallel execution
graph = StateGraph(ParallelState)
graph.add_node("web_search", web_search)
graph.add_node("db_search", db_search)
graph.add_node("combine", combine_results)

# Fan out: both search nodes start from START and run in the same step
graph.add_edge(START, "web_search")
graph.add_edge(START, "db_search")
# Fan in: combine runs after both searches have written their results
graph.add_edge("web_search", "combine")
graph.add_edge("db_search", "combine")
graph.add_edge("combine", END)
```
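
For independent subtasks that do not need a graph, the same fan-out and fan-in shape can be sketched with `asyncio.gather`. The `search_web` and `search_database` coroutines below are stubs that only simulate I/O; real implementations are assumed to live elsewhere.

```python
import asyncio


async def search_web(query: str) -> str:
    await asyncio.sleep(0.01)  # simulated network call
    return f"web:{query}"


async def search_database(query: str) -> str:
    await asyncio.sleep(0.01)  # simulated database call
    return f"db:{query}"


async def parallel_search(query: str) -> dict:
    # Both coroutines run concurrently; gather returns results in call order
    web, db = await asyncio.gather(search_web(query), search_database(query))
    return {"web_results": web, "db_results": db, "combined": f"{web} | {db}"}


print(asyncio.run(parallel_search("agents")))
```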

## When to Use Each Pattern

| Pattern | Use Case |
|---------|----------|
| Single Agent | Simple tasks, direct tool use |
| Supervisor-Worker | Complex workflows needing coordination |
| Debate/Critic | Quality-critical outputs, verification |
| Parallel | Independent subtasks, speed optimization |
| A2A Federation | Cross-organization, different frameworks |

## A2A vs MCP

| Protocol | Purpose |
|----------|---------|
| A2A | Agent-to-agent communication |
| MCP | Agent-to-tool/data communication |

Use MCP for tools and data sources, A2A for agent collaboration.

```

### references/streaming.md

```markdown
# Streaming Agent Visibility

## Transport Selection

| Transport | Best For | Bidirectional |
|-----------|----------|---------------|
| SSE (Server-Sent Events) | Simple streaming, wide browser support | No |
| WebSocket | Bidirectional, real-time interaction | Yes |
| HTTP Long-Polling | Fallback for restrictive networks | No |

## Server-Sent Events (SSE)

### FastAPI SSE Endpoint
```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
import json

app = FastAPI()

async def stream_agent_response(query: str):
    """Stream agent thinking and responses as SSE frames."""
    # run_agent is the application's own async generator of event dicts
    async for event in run_agent(query):
        if event["type"] in {"thinking", "tool_call", "response", "done"}:
            yield f"event: {event['type']}\ndata: {json.dumps(event)}\n\n"

@app.get("/agent/stream")
async def agent_stream(query: str):
    return StreamingResponse(
        stream_agent_response(query),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no"  # Disable nginx buffering
        }
    )
```
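
The endpoint above builds each SSE frame inline. A small helper keeps the wire format in one place and also handles string payloads that contain newlines, which must be sent as one `data:` line per line of text. This is a sketch; `sse_frame` is a hypothetical name.

```python
import json
from typing import Any, Optional


def sse_frame(event: str, data: Any, event_id: Optional[str] = None) -> str:
    """Format one Server-Sent Events frame, terminated by a blank line."""
    payload = data if isinstance(data, str) else json.dumps(data)
    lines = []
    if event_id is not None:
        lines.append(f"id: {event_id}")
    lines.append(f"event: {event}")
    # A multi-line payload becomes several data: lines within the same frame
    lines.extend(f"data: {line}" for line in payload.split("\n"))
    return "\n".join(lines) + "\n\n"


print(sse_frame("thinking", {"content": "Considering options"}), end="")
```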

### React SSE Client
```typescript
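import { useEffect, useState } from "react";

// ToolCall is an application-defined type describing one tool invocation
function useAgentStream(query: string) {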
  const [thinking, setThinking] = useState<string[]>([]);
  const [response, setResponse] = useState("");
  const [toolCalls, setToolCalls] = useState<ToolCall[]>([]);

  useEffect(() => {
    const eventSource = new EventSource(
      `/agent/stream?query=${encodeURIComponent(query)}`
    );

    eventSource.addEventListener("thinking", (e) => {
      const data = JSON.parse(e.data);
      setThinking((prev) => [...prev, data.content]);
    });

    eventSource.addEventListener("tool_call", (e) => {
      const data = JSON.parse(e.data);
      setToolCalls((prev) => [...prev, data]);
    });

    eventSource.addEventListener("response", (e) => {
      const data = JSON.parse(e.data);
      setResponse((prev) => prev + data.content);
    });

    eventSource.addEventListener("done", () => {
      eventSource.close();
    });

    eventSource.onerror = () => {
      eventSource.close();
    };

    return () => eventSource.close();
  }, [query]);

  return { thinking, response, toolCalls };
}
```

## WebSocket Implementation

### FastAPI WebSocket
```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import json
import time

app = FastAPI()

class AgentSession:
    def __init__(self, websocket: WebSocket):
        self.websocket = websocket
        self.conversation_history = []
    
    async def send_event(self, event_type: str, data: dict):
        await self.websocket.send_json({
            "type": event_type,
            "data": data,
            "timestamp": time.time()
        })

@app.websocket("/agent/ws")
async def agent_websocket(websocket: WebSocket):
    await websocket.accept()
    session = AgentSession(websocket)
    
    try:
        while True:
            # Receive user message
            message = await websocket.receive_json()
            
            if message["type"] == "query":
                # Stream agent response
                async for event in run_agent(
                    message["content"],
                    session.conversation_history
                ):
                    await session.send_event(event["type"], event)
                
                # Update history
                session.conversation_history.append({
                    "role": "user",
                    "content": message["content"]
                })
                
            elif message["type"] == "interrupt":
                # Handle user interruption
                await cancel_current_task()
                
    except WebSocketDisconnect:
        pass
```
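
The handler above calls `cancel_current_task()`, which the source does not define. One way to support interruption is to run each agent turn as an `asyncio.Task` and cancel it on request. The `AgentTaskRunner` class below is a hypothetical sketch of that idea, not the skill's implementation.

```python
import asyncio
from typing import Coroutine, Optional


class AgentTaskRunner:
    """Tracks at most one running agent task and allows cancelling it."""

    def __init__(self) -> None:
        self._task: Optional[asyncio.Task] = None

    async def start(self, coro: Coroutine) -> asyncio.Task:
        await self.cancel()  # a new query replaces any turn still running
        self._task = asyncio.create_task(coro)
        return self._task

    async def cancel(self) -> bool:
        """Cancel the current task; return True if something was cancelled."""
        task = self._task
        if task is None or task.done():
            return False
        task.cancel()
        try:
            await task  # wait until the task has finished unwinding
        except asyncio.CancelledError:
            pass
        return True
```

Because the receive loop must stay free to read the `interrupt` message, the agent turn has to run as a background task rather than being awaited inline.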

### React WebSocket Hook
```typescript
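import { useCallback, useEffect, useRef, useState } from "react";

// AgentEvent is an application-defined type matching the server's event payload
function useAgentWebSocket() {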
  const [connected, setConnected] = useState(false);
  const [events, setEvents] = useState<AgentEvent[]>([]);
  const wsRef = useRef<WebSocket | null>(null);

  useEffect(() => {
    const ws = new WebSocket("ws://localhost:8000/agent/ws");
    wsRef.current = ws;

    ws.onopen = () => setConnected(true);
    ws.onclose = () => setConnected(false);
    
    ws.onmessage = (event) => {
      const data = JSON.parse(event.data);
      setEvents((prev) => [...prev, data]);
    };

    return () => ws.close();
  }, []);

  const sendMessage = useCallback((content: string) => {
    wsRef.current?.send(JSON.stringify({
      type: "query",
      content
    }));
  }, []);

  const interrupt = useCallback(() => {
    wsRef.current?.send(JSON.stringify({ type: "interrupt" }));
  }, []);

  return { connected, events, sendMessage, interrupt };
}
```

## Streaming Anthropic to UI

### Full Pipeline
```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import anthropic
import json

app = FastAPI()
# Async client, so streaming does not block the event loop
client = anthropic.AsyncAnthropic()

class ChatRequest(BaseModel):
    content: str

async def stream_claude_response(messages: list):
    """Stream Claude response with thinking visibility."""

    async with client.messages.stream(
        model="claude-sonnet-4-5",
        max_tokens=16000,  # must be larger than the thinking budget
        thinking={"type": "enabled", "budget_tokens": 8000},
        messages=messages
    ) as stream:
        async for event in stream:
            if event.type == "content_block_start":
                if event.content_block.type == "thinking":
                    yield "event: thinking_start\ndata: {}\n\n"
                elif event.content_block.type == "text":
                    yield "event: response_start\ndata: {}\n\n"

            elif event.type == "content_block_delta":
                if event.delta.type == "thinking_delta":
                    data = {"content": event.delta.thinking}
                    yield f"event: thinking\ndata: {json.dumps(data)}\n\n"
                elif event.delta.type == "text_delta":
                    data = {"content": event.delta.text}
                    yield f"event: response\ndata: {json.dumps(data)}\n\n"

            elif event.type == "message_stop":
                yield "event: done\ndata: {}\n\n"

@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
    messages = [{"role": "user", "content": request.content}]
    return StreamingResponse(
        stream_claude_response(messages),
        media_type="text/event-stream"
    )
```

## Framework-Agnostic Patterns

### Event Schema
```python
from pydantic import BaseModel, Field
from typing import Any, Literal
from datetime import datetime, timezone

class AgentEvent(BaseModel):
    type: Literal["thinking", "tool_call", "tool_result", "response", "error", "done"]
    content: str | dict | None = None
    timestamp: datetime
    metadata: dict = Field(default_factory=dict)

# Consistent event emission
async def emit_event(event_type: str, content: Any = None, **metadata):
    return AgentEvent(
        type=event_type,
        content=content,
        # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated)
        timestamp=datetime.now(timezone.utc),
        metadata=metadata
    )
```

### Streamlit Integration
```python
import json

import requests
import streamlit as st

# Assumed backend address; requests needs an absolute URL
BACKEND_URL = "http://localhost:8000"

st.title("Agent Chat")

if prompt := st.chat_input("Ask anything..."):
    with st.chat_message("user"):
        st.write(prompt)
    
    with st.chat_message("assistant"):
        thinking_placeholder = st.empty()
        response_placeholder = st.empty()
        
        thinking_text = ""
        response_text = ""
        event_type = None  # set by the most recent "event:" line
        
        # Stream from backend; params= URL-encodes the query
        with requests.get(
            f"{BACKEND_URL}/agent/stream",
            params={"query": prompt},
            stream=True
        ) as r:
            for line in r.iter_lines():
                if line.startswith(b"event: "):
                    event_type = line[7:].decode()
                elif line.startswith(b"data: "):
                    data = json.loads(line[6:])
                    
                    if event_type == "thinking":
                        thinking_text += data.get("content", "")
                        with thinking_placeholder.expander("Thinking..."):
                            st.write(thinking_text)
                    elif event_type == "response":
                        response_text += data.get("content", "")
                        response_placeholder.write(response_text)

## Performance Considerations

1. **Buffering**: Disable proxy buffering (nginx, Cloudflare) so events are flushed as they are produced
2. **Heartbeats**: Send periodic pings to keep idle connections alive
3. **Reconnection**: Implement client-side reconnection with exponential backoff
4. **Compression**: Avoid response compression on SSE routes; it can buffer output and delay events
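
The heartbeat item can be sketched as a wrapper around any async generator of SSE frames. It emits an SSE comment line (a line starting with `:`), which clients ignore, whenever the source has been idle for `interval` seconds. The name `with_heartbeat` and the 15-second default are assumptions for illustration.

```python
import asyncio
from typing import AsyncIterator


async def with_heartbeat(source: AsyncIterator[str],
                         interval: float = 15.0) -> AsyncIterator[str]:
    """Yield frames from `source`, inserting keep-alive comments when idle."""
    queue: asyncio.Queue = asyncio.Queue()
    done = object()  # sentinel marking the end of the source

    async def pump() -> None:
        try:
            async for item in source:
                await queue.put(item)
        finally:
            await queue.put(done)

    task = asyncio.create_task(pump())
    try:
        while True:
            try:
                # Timing out here cancels only the queue read, not the source
                item = await asyncio.wait_for(queue.get(), timeout=interval)
            except asyncio.TimeoutError:
                yield ": keep-alive\n\n"
                continue
            if item is done:
                break
            yield item
    finally:
        task.cancel()  # stop reading the source if the client disconnects
```

Reading through a queue avoids wrapping the source generator's own `__anext__` in a timeout, which would cancel the generator mid-step. In the FastAPI endpoint, the wrapped generator would be passed to `StreamingResponse` in place of the original.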

```

### references/structured-outputs.md

```markdown
# Structured Outputs with Pydantic

## Instructor Library

Instructor provides reliable structured extraction from LLMs with automatic validation and retries.

### Basic Usage
```python
import instructor
from pydantic import BaseModel, Field
from typing import Optional

class User(BaseModel):
    name: str = Field(description="User's full name")
    age: int = Field(ge=0, le=150, description="User's age")
    email: Optional[str] = Field(default=None, description="Email address")

# Multi-provider support
client = instructor.from_provider("anthropic/claude-sonnet-4-5")
# or: instructor.from_provider("openai/gpt-4o")
# or: instructor.from_provider("ollama/llama3")

user = client.create(
    response_model=User,
    messages=[{"role": "user", "content": "Extract: John Doe, 30 years old, [email protected]"}],
    max_retries=3
)

print(user)  # User(name='John Doe', age=30, email='[email protected]')
```

### Complex Nested Structures
```python
from pydantic import BaseModel
from typing import List, Optional
from enum import Enum

class Priority(str, Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"

class Task(BaseModel):
    title: str
    description: str
    priority: Priority
    assignee: Optional[str] = None

class Project(BaseModel):
    name: str
    tasks: List[Task]
    deadline: Optional[str] = None

project = client.create(
    response_model=Project,
    messages=[{
        "role": "user",
        "content": """
        Project: Website Redesign
        - Task: Update homepage (high priority, assigned to Alice)
        - Task: Fix mobile layout (medium priority)
        - Deadline: March 2025
        """
    }]
)
```

### Streaming Partial Objects
```python
from instructor import Partial

# Stream partial results as they're generated
for partial in client.create(
    response_model=Partial[Project],
    messages=[{"role": "user", "content": "..."}],
    stream=True
):
    print(partial)
    # Project(name='Website...', tasks=None)
    # Project(name='Website Redesign', tasks=[Task(...)])
    # ... progressively more complete
```

### Custom Validators
```python
from typing import List

from pydantic import BaseModel, field_validator, model_validator

class Analysis(BaseModel):
    summary: str
    confidence: float
    sources: List[str]
    
    @field_validator("confidence")
    @classmethod
    def validate_confidence(cls, v):
        if not 0 <= v <= 1:
            raise ValueError("Confidence must be between 0 and 1")
        return v
    
    @field_validator("sources")
    @classmethod
    def validate_sources(cls, v):
        if len(v) < 1:
            raise ValueError("At least one source required")
        return v
    
    @model_validator(mode="after")
    def validate_model(self):
        if self.confidence < 0.5 and len(self.sources) < 3:
            raise ValueError("Low confidence requires more sources")
        return self
```

### LLM-Powered Validation
```python
from instructor import llm_validator
from typing import Annotated
from pydantic import BeforeValidator

class ContentModeration(BaseModel):
    content: Annotated[
        str,
        BeforeValidator(
            llm_validator(
                """Content must be:
                1. Professional and appropriate
                2. Free of harmful or offensive language
                3. Factually accurate where verifiable
                """,
                client=client
            )
        )
    ]
```

## Anthropic Native Structured Outputs

### Tool-Based Extraction
```python
import anthropic

client = anthropic.Anthropic()

# Define schema as tool
extraction_tool = {
    "name": "extract_entities",
    "description": "Extract structured entities from text",
    "input_schema": {
        "type": "object",
        "properties": {
            "people": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "name": {"type": "string"},
                        "role": {"type": "string"}
                    },
                    "required": ["name"]
                }
            },
            "organizations": {
                "type": "array",
                "items": {"type": "string"}
            }
        },
        "required": ["people", "organizations"]
    }
}

response = client.messages.create(
    model="claude-sonnet-4-5",
    max_tokens=1024,
    tools=[extraction_tool],
    tool_choice={"type": "tool", "name": "extract_entities"},
    messages=[{
        "role": "user",
        "content": "Extract entities: John Smith, CEO of Acme Corp, met with Jane Doe from Tech Inc."
    }]
)

# Parse tool use result
for block in response.content:
    if block.type == "tool_use":
        entities = block.input  # Already parsed JSON
```

## OpenAI Native Structured Outputs

```python
from typing import List, Optional

from openai import OpenAI
from pydantic import BaseModel

client = OpenAI()

class MathSolution(BaseModel):
    steps: List[str]
    final_answer: float
    units: Optional[str] = None

response = client.beta.chat.completions.parse(
    model="gpt-4o",
    messages=[{
        "role": "user",
        "content": "Solve: If a train travels 120 miles in 2 hours, what is its speed?"
    }],
    response_format=MathSolution
)

solution = response.choices[0].message.parsed
print(solution.final_answer)  # 60.0
print(solution.units)  # "miles per hour"
```

## Error Handling & Retries

```python
import logging

import instructor
from instructor.exceptions import IncompleteOutputException
from pydantic import ValidationError

logger = logging.getLogger(__name__)

class RobustExtractor:
    def __init__(self, provider: str):
        self.client = instructor.from_provider(provider)
    
    def extract(
        self,
        response_model: type,
        content: str,
        max_retries: int = 3
    ):
        try:
            return self.client.create(
                response_model=response_model,
                messages=[{"role": "user", "content": content}],
                max_retries=max_retries
            )
        except IncompleteOutputException as e:
            # Model output was truncated (for example, max_tokens was reached)
            logger.warning(f"Incomplete output: {e}")
            # _handle_incomplete is a fallback you supply (not shown here)
            return self._handle_incomplete(response_model, content)
        except ValidationError as e:
            # Pydantic validation failed after all retries
            logger.error(f"Validation failed: {e}")
            raise

## Best Practices

1. **Use descriptive Field descriptions** - Helps the LLM understand intent
2. **Start simple** - Add complexity only when needed
3. **Validate at multiple levels** - Field, model, and semantic validators
4. **Handle partial results** - Use streaming for long extractions
5. **Set appropriate retries** - Usually 2-3 is sufficient
6. **Test edge cases** - Empty inputs, ambiguous data, long text

```

### references/memory.md

```markdown
# Memory & Vector Databases

## Vector DB Selection

| Database | Best For | Hosting |
|----------|----------|---------|
| Pinecone | Production, managed, low-ops | Cloud only |
| Chroma | Local dev, prototyping, embedded | Self-hosted |
| FAISS | High-performance, on-prem, large-scale | Self-hosted |

## Pinecone (Production)

### Setup
```python
from pinecone import Pinecone, ServerlessSpec

pc = Pinecone(api_key="your-api-key")

# Create index
pc.create_index(
    name="agent-memory",
    dimension=1536,  # OpenAI embedding dimension
    metric="cosine",
    spec=ServerlessSpec(cloud="aws", region="us-east-1")
)

index = pc.Index("agent-memory")
```

### Store & Retrieve
```python
import uuid
from datetime import datetime, timezone
from typing import Optional

from openai import OpenAI

openai_client = OpenAI()

def get_embedding(text: str) -> list[float]:
    response = openai_client.embeddings.create(
        model="text-embedding-3-small",
        input=text
    )
    return response.data[0].embedding

# Store memory
def store_memory(user_id: str, content: str, metadata: Optional[dict] = None):
    # Default to None: a mutable {} default would be shared across calls
    embedding = get_embedding(content)
    index.upsert(
        vectors=[{
            "id": f"{user_id}_{uuid.uuid4()}",
            "values": embedding,
            "metadata": {
                "user_id": user_id,
                "content": content,
                "timestamp": datetime.now(timezone.utc).isoformat(),
                **(metadata or {})
            }
        }],
        namespace=user_id
    )

# Retrieve relevant memories
def recall_memories(user_id: str, query: str, top_k: int = 5) -> list[dict]:
    query_embedding = get_embedding(query)
    results = index.query(
        namespace=user_id,
        vector=query_embedding,
        top_k=top_k,
        include_metadata=True
    )
    return [
        {
            "content": match.metadata["content"],
            "score": match.score,
            "timestamp": match.metadata["timestamp"]
        }
        for match in results.matches
    ]
```
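
The `metric="cosine"` setting above means matches are ranked by cosine similarity. As a rough illustration of what that score measures, and as a stand-in for unit tests that should not call a hosted index, here is a minimal pure-Python sketch. `InMemoryIndex` and its methods are hypothetical names, not part of the Pinecone SDK, and its `upsert`/`query` signatures are simplified.

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two vectors; 0.0 if either is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


class InMemoryIndex:
    """Hypothetical test double: brute-force search over stored vectors."""

    def __init__(self) -> None:
        self._items: dict[str, tuple[list[float], dict]] = {}

    def upsert(self, item_id: str, values: list[float], metadata: dict) -> None:
        # Re-using an ID overwrites the previous vector, like an upsert
        self._items[item_id] = (values, metadata)

    def query(self, vector: list[float], top_k: int = 5) -> list[dict]:
        scored = [
            {"id": item_id, "score": cosine_similarity(vector, values), "metadata": meta}
            for item_id, (values, meta) in self._items.items()
        ]
        scored.sort(key=lambda match: match["score"], reverse=True)
        return scored[:top_k]
```

Brute-force search like this is only practical for small test fixtures; it is meant to make the ranking behavior easy to inspect.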

## Chroma (Development)

### Setup
```python
import chromadb

# Persistent storage
client = chromadb.PersistentClient(path="./chroma_db")

# Or in-memory for testing
# client = chromadb.Client()

collection = client.get_or_create_collection(
    name="agent_memory",
    metadata={"hnsw:space": "cosine"}
)
```

### Store & Retrieve
```python
# Store with auto-embedding
collection.add(
    documents=["User prefers concise responses", "User is working on ML project"],
    metadatas=[
        {"user_id": "user123", "type": "preference"},
        {"user_id": "user123", "type": "context"}
    ],
    ids=["mem1", "mem2"]
)

# Query
results = collection.query(
    query_texts=["What does the user like?"],
    n_results=5,
    where={"user_id": "user123"}
)

for doc, metadata, distance in zip(
    results["documents"][0],
    results["metadatas"][0],
    results["distances"][0]
):
    print(f"[{1 - distance:.2f}] {doc}")
```

## FAISS (High Performance)

### Setup
```python
import faiss
import numpy as np
from dataclasses import dataclass

@dataclass
class MemoryStore:
    index: faiss.Index
    id_to_content: dict[int, dict]
    current_id: int = 0

def create_faiss_store(dimension: int = 1536) -> MemoryStore:
    # IVF index for large-scale
    quantizer = faiss.IndexFlatL2(dimension)
    index = faiss.IndexIVFFlat(quantizer, dimension, 100)
    
    # Or simple flat index for smaller datasets
    # index = faiss.IndexFlatL2(dimension)
    
    return MemoryStore(index=index, id_to_content={})
```

### Store & Retrieve
```python
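import json

def add_memory(store: MemoryStore, embedding: np.ndarray, content: dict):
    # FAISS expects contiguous float32 row vectors
    vector = np.ascontiguousarray(embedding, dtype=np.float32).reshape(1, -1)
    
    if not store.index.is_trained:
        # An IVF index must be trained once on a representative sample
        # (at least as many vectors as clusters, 100 here); one vector is not enough
        raise RuntimeError("Train the index on a sample of vectors before adding")
    
    store.index.add(vector)
    store.id_to_content[store.current_id] = content
    store.current_id += 1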

def search_memories(
    store: MemoryStore, 
    query_embedding: np.ndarray, 
    k: int = 5
) -> list[dict]:
    distances, indices = store.index.search(
        query_embedding.reshape(1, -1), k
    )
    
    results = []
    for dist, idx in zip(distances[0], indices[0]):
        if idx != -1:  # Valid result
            results.append({
                **store.id_to_content[idx],
                "distance": float(dist)
            })
    return results

# Save/Load
def save_store(store: MemoryStore, path: str):
    faiss.write_index(store.index, f"{path}/index.faiss")
    with open(f"{path}/content.json", "w") as f:
        json.dump(store.id_to_content, f)
```
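
`save_store` has no loading counterpart above. One detail matters when writing one: `json.dump` turns the integer keys of `id_to_content` into strings, so they need converting back on load. The sketch below shows only the content side of the round trip, using the standard library. `save_content` and `load_content` are hypothetical helper names; the index itself would be restored separately with `faiss.read_index`.

```python
import json
import os
import tempfile


def save_content(id_to_content: dict[int, dict], path: str) -> None:
    """Write the id -> content mapping as JSON (keys become strings on disk)."""
    with open(os.path.join(path, "content.json"), "w") as f:
        json.dump(id_to_content, f)


def load_content(path: str) -> tuple[dict[int, dict], int]:
    """Read the mapping back, restoring integer keys and the next free id."""
    with open(os.path.join(path, "content.json")) as f:
        raw = json.load(f)
    id_to_content = {int(key): value for key, value in raw.items()}
    next_id = max(id_to_content, default=-1) + 1
    return id_to_content, next_id


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        save_content({0: {"content": "prefers concise answers"}}, tmp)
        restored, next_id = load_content(tmp)
        print(restored, next_id)
```

Returning the next free id alongside the mapping lets a reloaded store continue numbering where it left off instead of overwriting entry 0.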

## Memory Patterns

### Short-Term vs Long-Term
```python
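from datetime import datetime, timezone

class AgentMemory:
    # _store_long_term, _summarize, _filter_relevant, and _merge_and_rank
    # are placeholders to implement for your vector DB and LLM client
    def __init__(self, vector_db):
        self.short_term: list[dict] = []  # Current session
        self.vector_db = vector_db  # Long-term
        self.max_short_term = 20
    
    def add(self, content: str, importance: float = 0.5):
        memory = {
            "content": content,
            "timestamp": datetime.now(timezone.utc),
            "importance": importance
        }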
        
        # Always add to short-term
        self.short_term.append(memory)
        if len(self.short_term) > self.max_short_term:
            self._consolidate()
        
        # High importance goes to long-term immediately
        if importance > 0.7:
            self._store_long_term(memory)
    
    def _consolidate(self):
        """Move important short-term memories to long-term and summarize the rest."""
        # Memories above 0.7 were already stored long-term in add()
        important = [m for m in self.short_term if 0.5 < m["importance"] <= 0.7]
        for memory in important:
            self._store_long_term(memory)
        
        # Summarize and store only the less important memories
        rest = [m for m in self.short_term if m["importance"] <= 0.5]
        if rest:
            summary = self._summarize(rest)
            self._store_long_term({"content": summary, "type": "summary"})
        
        self.short_term = []
    
    def recall(self, query: str, k: int = 5) -> list[dict]:
        """Combine short-term and long-term recall."""
        long_term = self.vector_db.search(query, k=k)
        
        # Add relevant short-term
        relevant_short = self._filter_relevant(self.short_term, query)
        
        return self._merge_and_rank(long_term, relevant_short)
```
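
`AgentMemory.recall` relies on `_filter_relevant` and `_merge_and_rank`, which are not defined above. A minimal sketch of one possible approach follows, assuming simple keyword overlap for short-term relevance and a `score` field (higher is better) on long-term results. Both scoring choices and the function names are assumptions for illustration; an embedding-based comparison would likely work better in practice.

```python
def keyword_overlap(query: str, text: str) -> float:
    """Fraction of query words that also appear in the text (case-insensitive)."""
    query_words = set(query.lower().split())
    if not query_words:
        return 0.0
    text_words = set(text.lower().split())
    return len(query_words & text_words) / len(query_words)


def filter_relevant(memories: list[dict], query: str, threshold: float = 0.3) -> list[dict]:
    """Keep short-term memories whose overlap with the query meets the threshold."""
    relevant = []
    for memory in memories:
        score = keyword_overlap(query, memory["content"])
        if score >= threshold:
            relevant.append({**memory, "score": score})
    return relevant


def merge_and_rank(long_term: list[dict], short_term: list[dict], k: int = 5) -> list[dict]:
    """Merge both lists, drop duplicate content, and return the top-k by score."""
    best: dict[str, dict] = {}
    for memory in long_term + short_term:
        existing = best.get(memory["content"])
        # Keep whichever copy of the same content scored higher
        if existing is None or memory.get("score", 0.0) > existing.get("score", 0.0):
            best[memory["content"]] = memory
    ranked = sorted(best.values(), key=lambda m: m.get("score", 0.0), reverse=True)
    return ranked[:k]
```

Note that keyword-overlap scores and vector-similarity scores are not on the same scale, so mixing them in one ranking is a simplification.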

### Memory Summarization
```python
async def summarize_memories(memories: list[dict], client) -> str:
    """Compress multiple memories into a summary."""
    content = "\n".join([m["content"] for m in memories])
    
    response = await client.messages.create(
        model="claude-sonnet-4-5",
        max_tokens=500,
        messages=[{
            "role": "user",
            "content": f"""Summarize these conversation memories into key points:

{content}

Provide a concise summary that preserves important information."""
        }]
    )
    return response.content[0].text
```

## Integration with Agents

```python
class MemoryAugmentedAgent:
    def __init__(self, llm_client, memory: AgentMemory):
        self.client = llm_client
        self.memory = memory
    
    async def respond(self, user_message: str) -> str:
        # Recall relevant memories
        memories = self.memory.recall(user_message, k=5)
        
        # Build context
        memory_context = "\n".join([
            f"- {m['content']}" for m in memories
        ])
        
        # Generate response with memory context
        response = await self.client.messages.create(
            model="claude-sonnet-4-5",
            max_tokens=2048,
            system=f"""You have access to these memories about the user:
{memory_context}

Use this context to provide personalized, contextual responses.""",
            messages=[{"role": "user", "content": user_message}]
        )
        
        # Store new memory
        self.memory.add(
            f"User: {user_message}\nAssistant: {response.content[0].text}",
            importance=0.5
        )
        
        return response.content[0].text
```

```

### references/checkpointing.md

```markdown
# Checkpointing & Workflow Resumption

## LangGraph Checkpointing

LangGraph provides persistence for stateful workflows with automatic checkpointing.

### Core Concepts

| Concept | Description |
|---------|-------------|
| Checkpoint | Snapshot of graph state at a super-step |
| Thread | Unique ID for a conversation/workflow |
| Checkpointer | Persistence backend (memory, SQLite, Postgres) |

### Basic Setup
```python
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import InMemorySaver
from typing import TypedDict

class AgentState(TypedDict):
    messages: list
    current_step: str
    results: dict

# Build graph
graph = StateGraph(AgentState)
graph.add_node("research", research_node)
graph.add_node("analyze", analyze_node)
graph.add_node("write", write_node)
graph.add_edge("research", "analyze")
graph.add_edge("analyze", "write")
graph.add_edge("write", END)
graph.set_entry_point("research")

# Compile with checkpointer
checkpointer = InMemorySaver()
app = graph.compile(checkpointer=checkpointer)

# Run with thread_id
config = {"configurable": {"thread_id": "workflow-123"}}
result = app.invoke({"messages": ["Start research"]}, config)
```

### Production Checkpointers

#### SQLite (Local/Development)
```python
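import sqlite3
from langgraph.checkpoint.sqlite import SqliteSaver

# Sync: pass a connection directly
conn = sqlite3.connect("checkpoints.db", check_same_thread=False)
checkpointer = SqliteSaver(conn)

# Async: from_conn_string is a context manager in recent releases
from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver

async with AsyncSqliteSaver.from_conn_string("checkpoints.db") as checkpointer:
    app = graph.compile(checkpointer=checkpointer)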

#### PostgreSQL (Production)
```python
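from langgraph.checkpoint.postgres import PostgresSaver

DB_URI = "postgresql://user:pass@localhost/db"

# from_conn_string is a context manager in recent releases
with PostgresSaver.from_conn_string(DB_URI) as checkpointer:
    checkpointer.setup()  # Create the checkpoint tables on first use

# With connection pooling (the Postgres savers use psycopg 3, not asyncpg)
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from psycopg.rows import dict_row
from psycopg_pool import AsyncConnectionPool

async with AsyncConnectionPool(
    conninfo=DB_URI,
    min_size=5,
    max_size=20,
    kwargs={"autocommit": True, "prepare_threshold": 0, "row_factory": dict_row},
) as pool:
    checkpointer = AsyncPostgresSaver(pool)
    await checkpointer.setup()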

#### DynamoDB (AWS)
```python
from langgraph_checkpoint_aws import DynamoDBSaver

checkpointer = DynamoDBSaver(
    table_name="langgraph-checkpoints",
    s3_bucket="langgraph-large-payloads",  # For checkpoints > 350KB
    enable_checkpoint_compression=True
)
```

### Resume from Failure
```python
async def run_with_recovery(
    app, 
    initial_state: dict, 
    thread_id: str
):
    config = {"configurable": {"thread_id": thread_id}}
    
    try:
        # Check for existing checkpoint (aget is the async counterpart of get)
        checkpoint = await app.checkpointer.aget(config)
        
        if checkpoint:
            print(f"Resuming from checkpoint: {checkpoint['id']}")
            # Resume from last checkpoint
            result = await app.ainvoke(None, config)
        else:
            # Start fresh
            result = await app.ainvoke(initial_state, config)
        
        return result
        
    except Exception as e:
        # State is saved at last successful step
        print(f"Failed at step, can resume: {e}")
        raise
```

### Time Travel (Debug)
```python
# List all checkpoints for a thread
checkpoints = list(app.checkpointer.list(config))

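for cp in checkpoints:
    # Each item is a CheckpointTuple; the ID lives in its config
    checkpoint_id = cp.config["configurable"]["checkpoint_id"]
    print(f"Checkpoint {checkpoint_id}: {cp.metadata}")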

# Resume from specific checkpoint
specific_config = {
    "configurable": {
        "thread_id": "workflow-123",
        "checkpoint_id": "checkpoint-abc"
    }
}
result = app.invoke(None, specific_config)
```

### Human-in-the-Loop
```python
from langgraph.graph import StateGraph, END

# Assumes AgentState also declares `pending_approval: bool` and `approved: bool`

def approval_node(state: AgentState) -> dict:
    """Gate node: stays pending until a human sets approved=True."""
    return {"pending_approval": not state.get("approved", False)}

def process_approval(state: AgentState) -> dict:
    """Process after approval received."""
    return {"approved": True, "pending_approval": False}

graph = StateGraph(AgentState)
graph.add_node("generate", generate_node)
graph.add_node("await_approval", approval_node)
graph.add_node("process", process_approval)
graph.set_entry_point("generate")

# Route on the approval flag once the gate node has run
graph.add_edge("generate", "await_approval")
graph.add_conditional_edges(
    "await_approval",
    lambda s: "wait" if s.get("pending_approval") else "process",
    {"wait": END, "process": "process"}
)
graph.add_edge("process", END)

app = graph.compile(
    checkpointer=checkpointer,
    interrupt_before=["await_approval"]  # Pause here
)

# First run - pauses before await_approval and returns the state so far
result = app.invoke(initial_state, config)

# Later - record the human decision, then resume from the checkpoint
app.update_state(config, {"approved": True})
final_result = app.invoke(None, config)

## Custom Checkpointing

### Database-Backed State
```python
import json
from typing import Optional

class WorkflowState:
    def __init__(self, workflow_id: str, db):
        self.workflow_id = workflow_id
        self.db = db
    
    async def save(self, step: str, data: dict):
        """Save checkpoint."""
        await self.db.execute(
            """
            INSERT INTO checkpoints (workflow_id, step, data, created_at)
            VALUES ($1, $2, $3, NOW())
            ON CONFLICT (workflow_id, step) 
            DO UPDATE SET data = $3, created_at = NOW()
            """,
            self.workflow_id, step, json.dumps(data)
        )
    
    async def load(self, step: Optional[str] = None) -> Optional[dict]:
        """Load latest or specific checkpoint."""
        if step:
            row = await self.db.fetchrow(
                "SELECT data FROM checkpoints WHERE workflow_id = $1 AND step = $2",
                self.workflow_id, step
            )
        else:
            row = await self.db.fetchrow(
                """
                SELECT data FROM checkpoints 
                WHERE workflow_id = $1 
                ORDER BY created_at DESC LIMIT 1
                """,
                self.workflow_id
            )
        
        return json.loads(row["data"]) if row else None
    
    async def get_last_step(self) -> Optional[str]:
        """Get the last completed step."""
        row = await self.db.fetchrow(
            """
            SELECT step FROM checkpoints 
            WHERE workflow_id = $1 
            ORDER BY created_at DESC LIMIT 1
            """,
            self.workflow_id
        )
        return row["step"] if row else None
```
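
The `ON CONFLICT (workflow_id, step)` clause in `save` only works if the table has a unique constraint on those two columns. The table definition is not shown above, so the schema below is an assumption. To stay runnable without a database server, this sketch uses the standard-library `sqlite3` module, which accepts the same upsert syntax from SQLite 3.24 onward; placeholders are `?` instead of `$1`, and the helper names are hypothetical.

```python
import json
import sqlite3
from typing import Optional

# Assumed schema: the UNIQUE constraint is what makes the upsert legal.
SCHEMA = """
CREATE TABLE IF NOT EXISTS checkpoints (
    workflow_id TEXT NOT NULL,
    step        TEXT NOT NULL,
    data        TEXT NOT NULL,
    created_at  TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
    UNIQUE (workflow_id, step)
)
"""


def save_checkpoint(conn: sqlite3.Connection, workflow_id: str, step: str, data: dict) -> None:
    """Insert a checkpoint, or overwrite the existing row for the same step."""
    conn.execute(
        """
        INSERT INTO checkpoints (workflow_id, step, data)
        VALUES (?, ?, ?)
        ON CONFLICT (workflow_id, step)
        DO UPDATE SET data = excluded.data, created_at = CURRENT_TIMESTAMP
        """,
        (workflow_id, step, json.dumps(data)),
    )
    conn.commit()


def load_checkpoint(conn: sqlite3.Connection, workflow_id: str, step: str) -> Optional[dict]:
    """Return the stored data for one step, or None if it was never saved."""
    row = conn.execute(
        "SELECT data FROM checkpoints WHERE workflow_id = ? AND step = ?",
        (workflow_id, step),
    ).fetchone()
    return json.loads(row[0]) if row else None


conn = sqlite3.connect(":memory:")
conn.execute(SCHEMA)
save_checkpoint(conn, "wf-1", "plan", {"step_index": 1})
save_checkpoint(conn, "wf-1", "plan", {"step_index": 1, "retry": True})  # overwrites
```

Saving the same step twice leaves a single row, which is the behavior the upsert in `WorkflowState.save` depends on.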

### Resumable Agent Loop
```python
class ResumableAgent:
    def __init__(self, llm_client, state: WorkflowState):
        self.client = llm_client
        self.state = state
        
    async def run(self, task: str):
        # Check for existing progress
        last_step = await self.state.get_last_step()
        checkpoint = await self.state.load() if last_step else None
        
        if checkpoint:
            print(f"Resuming from step: {last_step}")
            messages = checkpoint.get("messages", [])
            step_index = checkpoint.get("step_index", 0)
        else:
            messages = [{"role": "user", "content": task}]
            step_index = 0
        
        steps = ["plan", "research", "draft", "review", "finalize"]
        
        for i, step in enumerate(steps[step_index:], start=step_index):
            try:
                result = await self._execute_step(step, messages)
                messages.append({"role": "assistant", "content": result})
                
                # Checkpoint after each step
                await self.state.save(step, {
                    "messages": messages,
                    "step_index": i + 1,
                    "step_result": result
                })
                
            except Exception as e:
                # The failed step was not checkpointed; a rerun resumes from it
                print(f"Failed at {step}: {e}. Progress up to the previous step is saved.")
                raise
        
        return messages[-1]["content"]
```
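
To see the resume behavior without a database or an LLM, the same loop can be exercised against an in-memory checkpoint store. This is a simplified synchronous sketch: `InMemoryState`, `run_steps`, and the `fail_on` parameter are hypothetical names used only to simulate a crash.

```python
from typing import Optional


class InMemoryState:
    """Hypothetical stand-in for WorkflowState that keeps the latest checkpoint in memory."""

    def __init__(self) -> None:
        self.latest: Optional[dict] = None

    def save(self, data: dict) -> None:
        self.latest = dict(data)

    def load(self) -> Optional[dict]:
        return self.latest


STEPS = ["plan", "research", "draft", "review", "finalize"]


def run_steps(state: InMemoryState, fail_on: Optional[str] = None) -> list[str]:
    """Run remaining steps, checkpointing after each; raise at `fail_on` to simulate a crash."""
    checkpoint = state.load()
    completed = list(checkpoint["completed"]) if checkpoint else []
    start = checkpoint["step_index"] if checkpoint else 0

    for i, step in enumerate(STEPS[start:], start=start):
        if step == fail_on:
            raise RuntimeError(f"simulated failure at {step}")
        completed.append(step)
        # Only reached when the step succeeded, so a failed step is never recorded
        state.save({"completed": list(completed), "step_index": i + 1})
    return completed


state = InMemoryState()
try:
    run_steps(state, fail_on="draft")
except RuntimeError:
    pass  # "plan" and "research" are checkpointed; "draft" is not

resumed = run_steps(state)  # picks up at "draft"
```

Because the checkpoint is written only after a step succeeds, the second call re-executes the failed step rather than skipping it.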

## Best Practices

1. **Choose the right backend**:
   - InMemorySaver: Testing only
   - SQLite: Local dev, single instance
   - PostgreSQL: Production, multi-instance
   - DynamoDB: Serverless, AWS-native

2. **Checkpoint granularity**: Save after expensive operations, not every micro-step

3. **State serialization**: Ensure all state is JSON-serializable

4. **Thread management**: Use meaningful thread IDs (user_id + session_id)

5. **Cleanup**: Implement TTL or manual cleanup for old checkpoints

6. **Compression**: Enable for large states (DynamoDB/Postgres)
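
Two of the points above, JSON-serializable state and meaningful thread IDs, are easy to enforce with small helpers. The sketch below is one way to do it; `make_thread_id` and `ensure_json_serializable` are hypothetical names, and the `user:session` format is an assumption rather than a LangGraph requirement.

```python
import json


def make_thread_id(user_id: str, session_id: str) -> str:
    """Build a readable thread ID from a user and session identifier."""
    if not user_id or not session_id:
        raise ValueError("user_id and session_id must be non-empty")
    return f"{user_id}:{session_id}"


def ensure_json_serializable(state: dict) -> dict:
    """Fail fast if the state cannot be serialized to JSON."""
    try:
        json.dumps(state)
    except (TypeError, ValueError) as exc:
        raise ValueError(f"State is not JSON-serializable: {exc}") from exc
    return state


config = {"configurable": {"thread_id": make_thread_id("user123", "session-42")}}
```

Calling `ensure_json_serializable` before a node returns surfaces problems such as sets or open file handles at the point they are introduced, not later when a checkpoint is written.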

```

### references/guardrails.md

```markdown
# Guardrails & Anti-Hallucination

## Framework Selection

| Framework | Best For | Approach |
|-----------|----------|----------|
| NeMo Guardrails | Conversational AI, dialog control | Colang rules + LLM checks |
| Guardrails AI | Structured output validation | Pydantic + custom validators |
| Custom | Specific domain requirements | RAG grounding + fact-checking |

## NVIDIA NeMo Guardrails

### Installation
```bash
pip install nemoguardrails
```

### Configuration Structure
```
config/
├── config.yml          # Main configuration
├── prompts.yml         # Custom prompts
└── rails/
    ├── input.co        # Input rails (Colang)
    ├── output.co       # Output rails
    └── dialog.co       # Dialog flow rails
```

### config.yml
```yaml
models:
  - type: main
    engine: anthropic
    model: claude-sonnet-4-5

rails:
  input:
    flows:
      - check jailbreak
      - check toxicity
      - mask sensitive data
  
  output:
    flows:
      - self check facts
      - self check hallucination
      - check toxicity

  config:
    sensitive_data_detection:
      input:
        entities:
          - PERSON
          - EMAIL_ADDRESS
          - PHONE_NUMBER
          - SSN
    
    fact_checking:
      provider: alignscore  # or custom
```

### Colang Rails (input.co)
```colang
define user ask about competitors
  "What do you think about [competitor]?"
  "How does [competitor] compare?"
  "Is [competitor] better?"

define bot refuse competitor discussion
  "I focus on our products and services. 
   I'd be happy to help you with questions about what we offer."

define flow check competitor mentions
  user ask about competitors
  bot refuse competitor discussion
```

### Hallucination Detection
```yaml
# config.yml
rails:
  output:
    flows:
      - self check hallucination

prompts:
  - task: self_check_hallucination
    content: |
      Your task is to check if the bot's response contains any hallucinations.
      
      User message: {{ user_input }}
      Bot response: {{ bot_response }}
      Context provided: {{ context }}
      
      Check if the response:
      1. Contains information not supported by the context
      2. Makes claims that cannot be verified
      3. Invents facts, dates, or statistics
      
      Respond with "yes" if hallucination detected, "no" otherwise.
```

### Python Integration
```python
from nemoguardrails import RailsConfig, LLMRails

config = RailsConfig.from_path("./config")
rails = LLMRails(config)

# With guardrails
response = await rails.generate_async(
    messages=[{"role": "user", "content": "Tell me about our Q3 results"}]
)

# Check if blocked
if response.get("blocked"):
    print(f"Blocked by: {response.get('blocked_by')}")
else:
    print(response["content"])
```

## Guardrails AI

### Structured Validation
```python
import openai
from guardrails import Guard
from guardrails.validators import ValidLength, ValidRange, OneLine
from pydantic import BaseModel, Field

class ProductReview(BaseModel):
    summary: str = Field(
        validators=[ValidLength(min=10, max=200, on_fail="reask")]
    )
    rating: float = Field(
        validators=[ValidRange(min=1, max=5, on_fail="fix")]
    )
    pros: list[str]
    cons: list[str]

guard = Guard.from_pydantic(ProductReview)

result = guard(
    llm_api=openai.chat.completions.create,
    model="gpt-4o",
    messages=[{"role": "user", "content": "Review this product..."}]
)

if result.validation_passed:
    review = result.validated_output
else:
    print(f"Validation failed: {result.error}")
```

### Custom Validators
```python
from guardrails.validators import Validator, register_validator
from typing import Any

@register_validator(name="no_competitor_mentions", data_type="string")
class NoCompetitorMentions(Validator):
    def __init__(self, competitors: list[str], on_fail: str = "fix"):
        super().__init__(on_fail=on_fail)
        self.competitors = [c.lower() for c in competitors]
    
    def validate(self, value: Any, metadata: dict) -> dict:
        text_lower = value.lower()
        
        for competitor in self.competitors:
            if competitor in text_lower:
                return {
                    "outcome": "fail",
                    "error_message": f"Response mentions competitor: {competitor}",
                    "fix_value": self._redact(value, competitor)
                }
        
        return {"outcome": "pass"}
    
    def _redact(self, text: str, competitor: str) -> str:
        import re
        # Escape the name so characters like "." or "+" are matched literally
        return re.sub(re.escape(competitor), "[REDACTED]", text, flags=re.IGNORECASE)
```

## RAG-Based Fact Grounding

### Grounded Response Generation
```python
from typing import Optional

class GroundedAgent:
    def __init__(self, llm_client, retriever):
        self.client = llm_client
        self.retriever = retriever
    
    async def respond(
        self, 
        query: str, 
        require_citation: bool = True
    ) -> dict:
        # Retrieve relevant context and label each document so it can be cited
        docs = await self.retriever.search(query, k=5)
        context = "\n\n".join(
            f"[Source {i}] {d.content}" for i, d in enumerate(docs, start=1)
        )
        
        # Generate grounded response
        response = await self.client.messages.create(
            model="claude-sonnet-4-5",
            max_tokens=2048,
            system=f"""Answer based ONLY on the provided context.
If the answer is not in the context, say "I don't have information about that."
Always cite your sources using [Source N] notation.

Context:
{context}""",
            messages=[{"role": "user", "content": query}]
        )
        
        answer = response.content[0].text
        
        # Verify citations exist
        if require_citation:
            verified = await self._verify_citations(answer, docs)
            if not verified["all_valid"]:
                answer = await self._regenerate_with_valid_citations(
                    query, context, verified["invalid_claims"]
                )
        
        return {
            "answer": answer,
            "sources": docs,
            "grounded": True
        }
    
    async def _verify_citations(
        self, 
        answer: str, 
        docs: list
    ) -> dict:
        """Verify that cited claims actually appear in sources."""
        verification = await self.client.messages.create(
            model="claude-sonnet-4-5",
            max_tokens=1024,
            messages=[{
                "role": "user",
                "content": f"""Verify each claim in this answer is supported by the sources.

Answer: {answer}

Sources:
{[d.content for d in docs]}

List any claims NOT supported by sources.
If every claim is supported, reply with exactly: NONE"""
            }]
        )
        
        unsupported = verification.content[0].text.strip()
        return {
            # Exact match avoids false positives from words like "nonetheless"
            "all_valid": unsupported.upper() == "NONE",
            "invalid_claims": unsupported
        }
```
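
Before spending an LLM call on verification, a cheap structural check can catch citations that point at sources that were never retrieved. The sketch below assumes the `[Source N]` notation from the system prompt with 1-based numbering; `check_citation_indices` is a hypothetical helper, not part of `GroundedAgent` as written.

```python
import re

CITATION_PATTERN = re.compile(r"\[Source (\d+)\]")


def check_citation_indices(answer: str, num_sources: int) -> dict:
    """Find [Source N] markers and flag any N outside 1..num_sources."""
    cited = sorted({int(n) for n in CITATION_PATTERN.findall(answer)})
    out_of_range = [n for n in cited if not 1 <= n <= num_sources]
    return {
        "cited": cited,
        "out_of_range": out_of_range,
        "has_citations": bool(cited),
        # Valid only if at least one citation exists and none are out of range
        "all_valid": bool(cited) and not out_of_range,
    }
```

This only confirms that each cited source exists. Whether the source actually supports the claim still needs the LLM-based check or a human review.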

### Confidence Scoring
```python
class ConfidenceScorer:
    def __init__(self, llm_client):
        self.client = llm_client
    
    async def score(
        self, 
        question: str, 
        answer: str, 
        context: Optional[str] = None
    ) -> dict:
        prompt = f"""Rate the confidence of this answer on a scale of 0-1.

Question: {question}
Answer: {answer}
{"Context: " + context if context else ""}

Consider:
- Factual accuracy (is it verifiable?)
- Completeness (does it fully answer?)
- Hedging language (does it express uncertainty?)
- Specificity (are claims specific or vague?)

Respond with JSON: {{"confidence": 0.X, "reasoning": "..."}}"""

        response = await self.client.messages.create(
            model="claude-sonnet-4-5",
            max_tokens=500,
            messages=[{"role": "user", "content": prompt}]
        )
        
        import json
        return json.loads(response.content[0].text)
```
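
`json.loads` on raw model text fails if the model wraps the JSON in a code fence or adds a sentence around it. A more forgiving parser is sketched below. `parse_confidence` is a hypothetical helper, and falling back to a confidence of 0.0 on unparseable output is an assumption (a conservative default), not something the scorer above specifies.

```python
import json
import re


def parse_confidence(text: str) -> dict:
    """Extract the outermost {...} span from model output and clamp confidence to [0, 1]."""
    fallback = {"confidence": 0.0, "reasoning": "Could not parse model output"}
    match = re.search(r"\{.*\}", text, flags=re.DOTALL)
    if not match:
        return fallback
    try:
        data = json.loads(match.group(0))
        confidence = float(data["confidence"])
    except (json.JSONDecodeError, KeyError, TypeError, ValueError):
        return fallback
    return {
        "confidence": min(1.0, max(0.0, confidence)),
        "reasoning": str(data.get("reasoning", "")),
    }
```

Clamping guards against a model returning values such as 1.7 or -0.2, which would otherwise slip past a simple threshold comparison.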

## Multi-Layer Guardrails

### Complete Pipeline
```python
class GuardedAgent:
    def __init__(self, llm_client, retriever, config: dict):
        self.client = llm_client
        self.retriever = retriever
        self.config = config
    
    async def respond(self, user_input: str) -> dict:
        # Layer 1: Input validation
        input_check = await self._check_input(user_input)
        if not input_check["safe"]:
            return {"blocked": True, "reason": input_check["reason"]}
        
        # Layer 2: RAG grounding
        context = await self._get_context(user_input)
        
        # Layer 3: Generate with constraints
        response = await self._generate(user_input, context)
        
        # Layer 4: Output validation
        output_check = await self._check_output(response, context)
        if not output_check["valid"]:
            # Regenerate with feedback
            response = await self._regenerate(
                user_input, context, output_check["issues"]
            )
        
        # Layer 5: Confidence check
        confidence = await self._score_confidence(
            user_input, response, context
        )
        
        if confidence["score"] < self.config["min_confidence"]:
            response = self._add_uncertainty_disclaimer(response)
        
        return {
            "response": response,
            "confidence": confidence["score"],
            "sources": context.sources if context else []
        }
    
    async def _check_input(self, text: str) -> dict:
        """Check for jailbreaks, PII, off-topic."""
        # Use NeMo or custom checks
        pass
    
    async def _check_output(self, response: str, context) -> dict:
        """Verify factual accuracy, no hallucinations."""
        pass
```
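
`_check_input` is left as a stub above. One minimal, rule-based way to fill it in is sketched below: a phrase list for obvious prompt-injection attempts plus regular expressions for a few PII shapes. The phrase list, the patterns, and the name `check_input` are illustrative assumptions. Pattern matching like this is easy to bypass and would normally sit in front of a stronger classifier or NeMo Guardrails, not replace it.

```python
import re

# Illustrative phrase list; real deployments need a maintained, broader set.
JAILBREAK_PHRASES = ("ignore previous instructions", "reveal your system prompt")

# Rough PII shapes (US-style SSN and a simple email pattern).
PII_PATTERNS = {
    "ssn": re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),
    "email": re.compile(r"\b[\w.+-]+@[\w-]+\.[\w.-]+\b"),
}


def check_input(text: str) -> dict:
    """Return {"safe": bool, "reason": str}, the shape GuardedAgent.respond reads."""
    lowered = text.lower()
    for phrase in JAILBREAK_PHRASES:
        if phrase in lowered:
            return {"safe": False, "reason": "possible prompt injection"}
    for label, pattern in PII_PATTERNS.items():
        if pattern.search(text):
            return {"safe": False, "reason": f"contains {label}"}
    return {"safe": True, "reason": ""}
```

Whether PII should block a request or be masked before the request continues is a policy decision; this sketch blocks for simplicity.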

## Best Practices

1. **Layer defenses**: Input → Grounding → Generation → Output → Confidence
2. **Ground in sources**: Always retrieve context before generating
3. **Verify claims**: Cross-check generated facts against sources
4. **Express uncertainty**: Add disclaimers for low-confidence answers
5. **Monitor and iterate**: Log blocked content, refine rules
6. **Test adversarially**: Regularly probe for bypasses

```



---

## Skill Companion Files

> Additional files collected from the skill directory layout.

### scripts/example_agent.py

```python
#!/usr/bin/env python3
"""
Example: Streaming Agent with Checkpointing and Guardrails

This demonstrates a production-ready agentic workflow combining:
- Anthropic streaming with extended thinking
- Pydantic structured outputs
- LangGraph checkpointing
- Basic guardrails
"""

import asyncio
import json
from typing import TypedDict, Optional
from pydantic import BaseModel, Field
import instructor

# Uncomment and configure as needed:
# import anthropic
# from langgraph.graph import StateGraph, END
# from langgraph.checkpoint.memory import InMemorySaver


# ============= Structured Output Models =============

class ResearchResult(BaseModel):
    """Structured output for research tasks."""
    topic: str = Field(description="The research topic")
    summary: str = Field(description="Executive summary of findings")
    key_points: list[str] = Field(description="Key findings as bullet points")
    confidence: float = Field(ge=0, le=1, description="Confidence score")
    sources: list[str] = Field(default=[], description="Sources used")


class AgentAction(BaseModel):
    """Structured agent action decision."""
    action: str = Field(description="Action to take: search, analyze, respond")
    reasoning: str = Field(description="Why this action was chosen")
    parameters: dict = Field(default={}, description="Action parameters")


# ============= Agent State =============

class AgentState(TypedDict):
    """State for the agentic workflow."""
    query: str
    messages: list
    research: Optional[dict]
    final_response: Optional[str]
    step: str


# ============= Streaming Handler =============

async def stream_to_ui(event_type: str, content: str):
    """
    Stream events to UI (implement based on your transport).
    Replace with SSE, WebSocket, or your preferred method.
    """
    event = {
        "type": event_type,
        "content": content
    }
    # Placeholder transport: print the serialized event
    print(json.dumps(event))


# ============= Agent Nodes =============

async def research_node(state: AgentState, client) -> AgentState:
    """Research node with streaming visibility."""
    await stream_to_ui("thinking", "Starting research phase...")
    
    # Use instructor for structured output (async client avoids blocking the event loop)
    instructor_client = instructor.from_provider(
        "anthropic/claude-sonnet-4-5", async_client=True
    )
    
    result = await instructor_client.create(
        response_model=ResearchResult,
        messages=[{
            "role": "user",
            "content": f"Research this topic and provide structured findings: {state['query']}"
        }],
        max_retries=2
    )
    
    await stream_to_ui("research_complete", result.summary)
    
    return {
        **state,
        "research": result.model_dump(),
        "step": "analyze"
    }


async def analyze_node(state: AgentState, client) -> AgentState:
    """Analyze research results."""
    await stream_to_ui("thinking", "Analyzing research findings...")
    
    # "research" may be present but None, so fall back to an empty dict explicitly
    research = state.get("research") or {}
    
    # Stream analysis
    async with client.messages.stream(
        model="claude-sonnet-4-5",
        max_tokens=2048,
        messages=[{
            "role": "user",
            "content": f"""Based on this research, provide a comprehensive analysis:

Research Summary: {research.get('summary', 'N/A')}
Key Points: {research.get('key_points', [])}

Provide insights and recommendations."""
        }]
    ) as stream:
        response_text = ""
        async for text in stream.text_stream:
            response_text += text
            await stream_to_ui("response_chunk", text)
    
    return {
        **state,
        "final_response": response_text,
        "step": "complete"
    }


# ============= Guardrails =============

def check_input_safety(user_input: str) -> tuple[bool, str]:
    """Basic input guardrails."""
    # Add your safety checks here
    blocked_patterns = ["ignore previous instructions", "system prompt"]
    
    input_lower = user_input.lower()
    for pattern in blocked_patterns:
        if pattern in input_lower:
            return False, "Blocked: potential jailbreak attempt"
    
    return True, ""


def check_output_safety(response: str) -> tuple[bool, str]:
    """Basic output guardrails."""
    # Add PII detection, toxicity checks, etc.
    return True, ""


# ============= Main Agent =============

async def run_agent(query: str):
    """
    Run the complete agentic workflow.
    
    In production, add:
    - Real LangGraph StateGraph with checkpointing
    - Proper error handling and recovery
    - Observability/logging
    """
    # Input guardrails
    safe, reason = check_input_safety(query)
    if not safe:
        await stream_to_ui("blocked", reason)
        return {"error": reason}
    
    # Initialize state
    state: AgentState = {
        "query": query,
        "messages": [],
        "research": None,
        "final_response": None,
        "step": "research"
    }
    
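    # In production, use an async checkpointer so ainvoke can persist state:
    # from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
    # async with AsyncPostgresSaver.from_conn_string(DATABASE_URL) as checkpointer:
    #     await checkpointer.setup()  # create the checkpoint tables on first run
    #     app = graph.compile(checkpointer=checkpointer)
    #     result = await app.ainvoke(state, {"configurable": {"thread_id": "..."}})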
    
    print(f"\n{'='*50}")
    print(f"Starting agent workflow for: {query}")
    print(f"{'='*50}\n")
    
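    # Simulate workflow steps (research_node and analyze_node are not invoked in this skeleton)
    await stream_to_ui("started", f"Processing query: {query}")
    
    # Output guardrails check final_response, which stays None until a node sets it
    safe, reason = check_output_safety(state.get("final_response") or "")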
    if not safe:
        await stream_to_ui("filtered", reason)
    
    await stream_to_ui("complete", "Workflow finished")
    
    return state


# ============= Example Usage =============

if __name__ == "__main__":
    # Example query
    query = "What are the latest developments in AI agents?"
    
    # Run the agent
    result = asyncio.run(run_agent(query))
    
    print("\n" + "="*50)
    print("Final Result:")
    print(json.dumps(result, indent=2, default=str))

```
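
The `run_agent` skeleton above never calls `research_node` or `analyze_node`. Before adopting a full LangGraph `StateGraph`, one way to drive them is a small loop that dispatches on the `step` field. The sketch below is a minimal illustration under assumptions: `run_steps`, `fake_research`, and `fake_analyze` are hypothetical names, and the stub nodes take only `state` and make no model calls (the real nodes also take a `client`, which could be bound with `functools.partial`).

```python
import asyncio
from typing import Awaitable, Callable

State = dict
Node = Callable[[State], Awaitable[State]]


async def fake_research(state: State) -> State:
    """Stand-in for research_node; makes no model call."""
    research = {"summary": f"notes on {state['query']}"}
    return {**state, "research": research, "step": "analyze"}


async def fake_analyze(state: State) -> State:
    """Stand-in for analyze_node; makes no model call."""
    summary = (state.get("research") or {}).get("summary", "N/A")
    return {**state, "final_response": f"analysis of {summary}", "step": "complete"}


async def run_steps(state: State, nodes: dict[str, Node], max_steps: int = 10) -> State:
    """Run the node named by state['step'] until the step is 'complete'."""
    steps = 0
    while state["step"] != "complete":
        # Bound the loop so a bad transition cannot spin forever
        if steps >= max_steps:
            raise RuntimeError("workflow did not reach 'complete'")
        state = await nodes[state["step"]](state)
        steps += 1
    return state


if __name__ == "__main__":
    initial = {"query": "AI agents", "research": None,
               "final_response": None, "step": "research"}
    nodes = {"research": fake_research, "analyze": fake_analyze}
    final = asyncio.run(run_steps(initial, nodes))
    print(final["final_response"])
```

An unknown step name raises `KeyError`, which surfaces routing mistakes early. A checkpointed graph remains the better fit once runs need to resume after a failure.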
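
The `stream_to_ui` placeholder above only prints events. As one possible transport, the sketch below serializes an event as a Server-Sent Events frame using only the standard library. `format_sse` is a hypothetical helper, not part of the original skill, and how the frame is written to the HTTP response depends on your web framework.

```python
import json


def format_sse(event_type: str, content: str) -> str:
    """Serialize one UI event as a Server-Sent Events frame (hypothetical helper)."""
    # JSON-encode the payload so newlines in `content` cannot break the frame
    payload = json.dumps({"type": event_type, "content": content})
    # An SSE frame is an "event:" field and a "data:" field, ended by a blank line
    return f"event: {event_type}\ndata: {payload}\n\n"


if __name__ == "__main__":
    print(format_sse("thinking", "Starting research phase..."), end="")
```

Inside `stream_to_ui`, the returned string would be written to the open response stream instead of being printed. The sketch assumes `event_type` is a short identifier without newlines.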
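
`check_output_safety` above is a stub that always passes. A minimal sketch of the PII check its comment mentions could use regular expressions, as below. The name `check_output_pii` and both patterns are illustrative assumptions; they are deliberately simple, will miss many real cases, and are no substitute for a dedicated PII detection tool.

```python
import re

# Deliberately simple patterns for illustration only
PII_PATTERNS = {
    "email": re.compile(r"[\w.+-]+@[\w-]+\.[\w.-]+"),
    "us_ssn": re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),
}


def check_output_pii(response: str) -> tuple[bool, str]:
    """Return (is_safe, reason); flag responses that match a PII pattern."""
    for label, pattern in PII_PATTERNS.items():
        if pattern.search(response):
            return False, f"Blocked: output appears to contain {label}"
    return True, ""


if __name__ == "__main__":
    print(check_output_pii("Contact me at jane@example.com"))
    print(check_output_pii("No sensitive data here."))
```

The function returns the same `(bool, str)` shape as the guardrails above, so it could be called from `check_output_safety` without changing `run_agent`.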