agentic-workflows
Build production-grade agentic AI systems with real-time streaming visibility, structured outputs, and multi-agent collaboration. Covers Anthropic/OpenAI/vLLM SDKs, A2A protocol for agent interoperability, Pydantic validation, LangGraph checkpointing for workflow resumption, vector DB memory (Pinecone/Chroma/FAISS), and guardrails for anti-hallucination. Use when building AI agents, multi-agent systems, tool-calling workflows, or applications requiring streaming agent reasoning to UI.
Packaged view
This page reorganizes the original catalog entry around fit, installability, and workflow context first. The original raw source lives below.
Install command
npx @skill-hub/cli install deconvfft-resume-crafter-agentic-workflows
Repository
Skill path: .claude/skills/agentic-workflows
Build production-grade agentic AI systems with real-time streaming visibility, structured outputs, and multi-agent collaboration. Covers Anthropic/OpenAI/vLLM SDKs, A2A protocol for agent interoperability, Pydantic validation, LangGraph checkpointing for workflow resumption, vector DB memory (Pinecone/Chroma/FAISS), and guardrails for anti-hallucination. Use when building AI agents, multi-agent systems, tool-calling workflows, or applications requiring streaming agent reasoning to UI.
Open repositoryBest for
Primary workflow: Analyze Data & AI.
Technical facets: Full Stack, Frontend, Data / AI.
Target audience: everyone.
License: Unknown.
Original source
Catalog source: SkillHub Club.
Repository owner: DeconvFFT.
This is still a mirrored public skill entry. Review the repository before installing into production workflows.
What it helps with
- Install agentic-workflows into Claude Code, Codex CLI, Gemini CLI, or OpenCode workflows
- Review https://github.com/DeconvFFT/resume-crafter before adding agentic-workflows to shared team environments
- Use agentic-workflows for development workflows
Works across
Favorites: 0.
Sub-skills: 0.
Aggregator: No.
Original source / Raw SKILL.md
---
name: agentic-workflows
description: Build production-grade agentic AI systems with real-time streaming visibility, structured outputs, and multi-agent collaboration. Covers Anthropic/OpenAI/vLLM SDKs, A2A protocol for agent interoperability, Pydantic validation, LangGraph checkpointing for workflow resumption, vector DB memory (Pinecone/Chroma/FAISS), and guardrails for anti-hallucination. Use when building AI agents, multi-agent systems, tool-calling workflows, or applications requiring streaming agent reasoning to UI.
---
# Agentic Workflows Skill
Build intelligent, observable, and resilient AI agent systems.
## Architecture Decision Flow
```
New Agent System Request
│
▼
┌──────────────────────────┐
│ Single task or multi-step?│
│ Single → Simple LLM call │
│ Multi-step → Agent loop │
└──────────────────────────┘
│
▼
┌──────────────────────────┐
│ Need multiple specialists?│
│ Yes → Multi-agent (A2A) │
│ No → Single agent │
└──────────────────────────┘
│
▼
┌──────────────────────────┐
│ Long-running/resumable? │
│ Yes → LangGraph + checkpoint│
│ No → Simple agent loop │
└──────────────────────────┘
│
▼
┌──────────────────────────┐
│ Need memory across sessions?│
│ Yes → Vector DB │
│ No → In-session state │
└──────────────────────────┘
```
## Provider Selection
| Provider | Best For | Streaming | Tools |
|----------|----------|-----------|-------|
| Anthropic Claude | Complex reasoning, extended thinking | SSE | Native |
| OpenAI GPT-4 | General purpose, function calling | SSE | Native |
| vLLM | Self-hosted, cost control | OpenAI-compatible | Via prompts |
## Quick Start Patterns
### Anthropic Streaming with Tools
```python
import anthropic
client = anthropic.Anthropic()
with client.messages.stream(
model="claude-sonnet-4-5",
max_tokens=4096,
tools=[{"name": "search", "description": "Search the web", "input_schema": {...}}],
messages=[{"role": "user", "content": "Research AI trends"}]
) as stream:
for event in stream:
if event.type == "content_block_delta":
if hasattr(event.delta, "text"):
print(event.delta.text, end="", flush=True)
elif hasattr(event.delta, "thinking"):
print(f"[Thinking] {event.delta.thinking}")
```
### Structured Output with Pydantic
```python
import instructor
from pydantic import BaseModel
class Analysis(BaseModel):
summary: str
confidence: float
sources: list[str]
client = instructor.from_provider("anthropic/claude-sonnet-4-5")
result = client.create(
response_model=Analysis,
messages=[{"role": "user", "content": "Analyze market trends"}],
max_retries=3
)
```
## Reference Documentation
| Task | Reference File |
|------|----------------|
| Anthropic/OpenAI/vLLM SDK patterns | [references/llm-sdks.md](references/llm-sdks.md) |
| Multi-agent with A2A protocol | [references/multi-agent.md](references/multi-agent.md) |
| Streaming to UI (SSE/WebSocket) | [references/streaming.md](references/streaming.md) |
| Pydantic structured outputs | [references/structured-outputs.md](references/structured-outputs.md) |
| Memory with vector DBs | [references/memory.md](references/memory.md) |
| Checkpointing & resumption | [references/checkpointing.md](references/checkpointing.md) |
| Guardrails & anti-hallucination | [references/guardrails.md](references/guardrails.md) |
## When to Use Multi-Agent
| Scenario | Approach |
|----------|----------|
| Different expertise needed | Multi-agent with specialists |
| Verification required | Debate pattern (critic agent) |
| Complex workflow orchestration | Supervisor + workers |
| Simple tool use | Single agent with tools |
| Independent subtasks | Parallel agents |
## Production Checklist
- [ ] Structured outputs with Pydantic validation
- [ ] Retry logic with exponential backoff
- [ ] Streaming to UI for visibility
- [ ] Checkpointing for long-running workflows
- [ ] Guardrails for input/output validation
- [ ] Memory persistence (vector DB or KV store)
- [ ] Error handling with graceful degradation
- [ ] Observability (logging, tracing)
---
## Referenced Files
> The following files are referenced in this skill and included for context.
### references/llm-sdks.md
```markdown
# LLM SDK Integration
## Anthropic Claude SDK
### Basic Streaming
```python
import anthropic
client = anthropic.Anthropic()
with client.messages.stream(
model="claude-sonnet-4-5",
max_tokens=4096,
messages=[{"role": "user", "content": "Hello"}]
) as stream:
for text in stream.text_stream:
print(text, end="", flush=True)
```
### Extended Thinking (Streaming)
```python
with client.messages.stream(
model="claude-sonnet-4-5",
max_tokens=16000,
thinking={"type": "enabled", "budget_tokens": 10000},
messages=[{"role": "user", "content": "Solve this complex problem..."}]
) as stream:
for event in stream:
if event.type == "content_block_delta":
if hasattr(event.delta, "thinking_delta"):
print(f"[Thinking] {event.delta.thinking}", end="")
elif hasattr(event.delta, "text_delta"):
print(event.delta.text, end="")
```
### Tool Use with Streaming
```python
tools = [{
"name": "web_search",
"description": "Search the web for information",
"input_schema": {
"type": "object",
"properties": {
"query": {"type": "string", "description": "Search query"}
},
"required": ["query"]
}
}]
response = client.messages.create(
model="claude-sonnet-4-5",
max_tokens=4096,
tools=tools,
messages=[{"role": "user", "content": "Search for AI news"}]
)
# Handle tool use
for block in response.content:
if block.type == "tool_use":
tool_name = block.name
tool_input = block.input
# Execute tool and continue conversation
```
### Async Client
```python
from anthropic import AsyncAnthropic
client = AsyncAnthropic()
async def chat():
async with client.messages.stream(
model="claude-sonnet-4-5",
max_tokens=1024,
messages=[{"role": "user", "content": "Hello"}]
) as stream:
async for text in stream.text_stream:
print(text, end="", flush=True)
```
## OpenAI SDK
### Streaming with Function Calling
```python
from openai import OpenAI
client = OpenAI()
tools = [{
"type": "function",
"function": {
"name": "get_weather",
"description": "Get weather for a location",
"parameters": {
"type": "object",
"properties": {
"location": {"type": "string"}
},
"required": ["location"]
}
}
}]
stream = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": "What's the weather in SF?"}],
tools=tools,
stream=True
)
for chunk in stream:
if chunk.choices[0].delta.content:
print(chunk.choices[0].delta.content, end="")
if chunk.choices[0].delta.tool_calls:
# Handle tool calls
pass
```
### Structured Outputs (Native)
```python
from pydantic import BaseModel
class CalendarEvent(BaseModel):
name: str
date: str
participants: list[str]
response = client.beta.chat.completions.parse(
model="gpt-4o",
messages=[{"role": "user", "content": "Schedule a meeting..."}],
response_format=CalendarEvent
)
event = response.choices[0].message.parsed
```
### Async Streaming
```python
from openai import AsyncOpenAI
client = AsyncOpenAI()
async def stream_response():
stream = await client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": "Hello"}],
stream=True
)
async for chunk in stream:
if chunk.choices[0].delta.content:
yield chunk.choices[0].delta.content
```
## vLLM Integration
### OpenAI-Compatible API
```python
from openai import OpenAI
# Point to vLLM server
client = OpenAI(
base_url="http://localhost:8000/v1",
api_key="not-needed"
)
response = client.chat.completions.create(
model="meta-llama/Llama-3.1-8B-Instruct",
messages=[{"role": "user", "content": "Hello"}],
stream=True
)
for chunk in response:
if chunk.choices[0].delta.content:
print(chunk.choices[0].delta.content, end="")
```
### vLLM Server Launch
```bash
vllm serve meta-llama/Llama-3.1-8B-Instruct \
--host 0.0.0.0 \
--port 8000 \
--tensor-parallel-size 2 \
--gpu-memory-utilization 0.9
```
## Multi-Provider Routing
```python
from enum import Enum
from typing import Optional
class Provider(Enum):
ANTHROPIC = "anthropic"
OPENAI = "openai"
VLLM = "vllm"
class ModelRouter:
def __init__(self):
self.anthropic = anthropic.Anthropic()
self.openai = OpenAI()
self.vllm = OpenAI(base_url="http://localhost:8000/v1", api_key="x")
def route(self, task_type: str) -> Provider:
"""Route based on task requirements."""
routing = {
"complex_reasoning": Provider.ANTHROPIC,
"code_generation": Provider.ANTHROPIC,
"simple_chat": Provider.VLLM,
"function_calling": Provider.OPENAI,
"cost_sensitive": Provider.VLLM,
}
return routing.get(task_type, Provider.OPENAI)
async def complete(self, messages: list, task_type: str):
provider = self.route(task_type)
if provider == Provider.ANTHROPIC:
return await self._anthropic_complete(messages)
elif provider == Provider.OPENAI:
return await self._openai_complete(messages)
else:
return await self._vllm_complete(messages)
```
## Error Handling & Retries
```python
from tenacity import retry, stop_after_attempt, wait_exponential
import anthropic
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=60)
)
async def robust_completion(messages: list):
try:
response = await client.messages.create(
model="claude-sonnet-4-5",
max_tokens=4096,
messages=messages
)
return response
except anthropic.RateLimitError:
raise # Let tenacity retry
except anthropic.APIError as e:
logger.error(f"API error: {e}")
raise
```
```
### references/multi-agent.md
```markdown
# Multi-Agent Systems & A2A Protocol
## Google A2A Protocol Overview
A2A (Agent-to-Agent) is an open protocol for secure agent interoperability, now under Linux Foundation governance.
### Core Concepts
| Component | Description |
|-----------|-------------|
| Agent Card | JSON metadata at `/.well-known/agent.json` describing capabilities |
| Client Agent | Initiates tasks, interfaces with users |
| Remote Agent | Executes tasks, returns results |
| Task | Unit of work with lifecycle states |
### Agent Card Schema
```json
{
"name": "research-agent",
"description": "Performs deep research on topics",
"url": "https://agent.example.com",
"version": "1.0.0",
"capabilities": {
"streaming": true,
"pushNotifications": true
},
"skills": [
{
"id": "web-research",
"name": "Web Research",
"description": "Search and synthesize information from the web"
}
],
"authentication": {
"schemes": ["bearer", "oauth2"]
}
}
```
### A2A Communication Flow
```
1. Discovery: Client fetches /.well-known/agent.json
2. Task Creation: Client POSTs task to remote agent
3. Status Updates: Remote sends progress via SSE/webhook
4. Completion: Remote returns artifacts
```
### Python A2A Client
```python
from a2a import A2AClient, Task
# Discover agent
client = A2AClient()
agent_card = await client.discover("https://research-agent.example.com")
# Create task
task = Task(
skill_id="web-research",
input={"query": "Latest AI developments"},
callback_url="https://my-app.com/webhook"
)
# Send task and stream results
async for update in client.execute_streaming(agent_card, task):
if update.type == "progress":
print(f"Progress: {update.message}")
elif update.type == "artifact":
print(f"Result: {update.content}")
```
### A2A Server Implementation
```python
from fastapi import FastAPI
from a2a.server import A2AServer, AgentCard, Skill
app = FastAPI()
a2a = A2AServer(app)
# Define agent capabilities
card = AgentCard(
name="analysis-agent",
description="Analyzes data and provides insights",
skills=[
Skill(id="analyze", name="Data Analysis", description="...")
]
)
@a2a.skill("analyze")
async def analyze_data(input: dict, context: A2AContext):
# Stream progress
await context.send_progress("Starting analysis...")
# Do work
result = await perform_analysis(input["data"])
# Return artifact
return {"analysis": result, "confidence": 0.95}
a2a.register(card)
```
## Multi-Agent Patterns
### Supervisor-Worker Pattern
```python
from langgraph.graph import StateGraph, END
from typing import TypedDict, Literal
class AgentState(TypedDict):
task: str
plan: list[str]
results: dict
final_answer: str
def supervisor(state: AgentState) -> AgentState:
"""Supervisor plans and delegates."""
plan = plan_task(state["task"])
return {"plan": plan}
def researcher(state: AgentState) -> AgentState:
"""Research agent gathers information."""
results = research(state["plan"][0])
return {"results": {"research": results}}
def writer(state: AgentState) -> AgentState:
"""Writer agent synthesizes results."""
answer = synthesize(state["results"])
return {"final_answer": answer}
def router(state: AgentState) -> Literal["researcher", "writer", "end"]:
if not state.get("results"):
return "researcher"
elif not state.get("final_answer"):
return "writer"
return "end"
# Build graph
graph = StateGraph(AgentState)
graph.add_node("supervisor", supervisor)
graph.add_node("researcher", researcher)
graph.add_node("writer", writer)
graph.add_conditional_edges("supervisor", router)
graph.add_edge("researcher", "supervisor")
graph.add_edge("writer", END)
graph.set_entry_point("supervisor")
```
### Debate Pattern (Critic Agent)
```python
class DebateState(TypedDict):
question: str
proposal: str
critique: str
revision: str
rounds: int
consensus: bool
def proposer(state: DebateState) -> DebateState:
"""Generate initial proposal or revision."""
if state.get("critique"):
proposal = revise_based_on_feedback(
state["proposal"],
state["critique"]
)
else:
proposal = generate_proposal(state["question"])
return {"proposal": proposal, "rounds": state.get("rounds", 0) + 1}
def critic(state: DebateState) -> DebateState:
"""Critically evaluate the proposal."""
critique = evaluate_proposal(state["proposal"])
consensus = critique["score"] > 0.8
return {"critique": critique["feedback"], "consensus": consensus}
def should_continue(state: DebateState) -> Literal["proposer", "end"]:
if state["consensus"] or state["rounds"] >= 3:
return "end"
return "proposer"
# Build debate graph
graph = StateGraph(DebateState)
graph.add_node("proposer", proposer)
graph.add_node("critic", critic)
graph.add_edge("proposer", "critic")
graph.add_conditional_edges("critic", should_continue)
graph.set_entry_point("proposer")
```
### Parallel Agents
```python
from langgraph.graph import StateGraph
import asyncio
class ParallelState(TypedDict):
query: str
web_results: str
db_results: str
combined: str
async def web_search(state: ParallelState) -> dict:
results = await search_web(state["query"])
return {"web_results": results}
async def db_search(state: ParallelState) -> dict:
results = await search_database(state["query"])
return {"db_results": results}
def combine_results(state: ParallelState) -> ParallelState:
combined = merge(state["web_results"], state["db_results"])
return {"combined": combined}
# Parallel execution
graph = StateGraph(ParallelState)
graph.add_node("web_search", web_search)
graph.add_node("db_search", db_search)
graph.add_node("combine", combine_results)
# Both search nodes run in parallel from start
graph.set_entry_point("web_search")
graph.set_entry_point("db_search")
graph.add_edge("web_search", "combine")
graph.add_edge("db_search", "combine")
```
## When to Use Each Pattern
| Pattern | Use Case |
|---------|----------|
| Single Agent | Simple tasks, direct tool use |
| Supervisor-Worker | Complex workflows needing coordination |
| Debate/Critic | Quality-critical outputs, verification |
| Parallel | Independent subtasks, speed optimization |
| A2A Federation | Cross-organization, different frameworks |
## A2A vs MCP
| Protocol | Purpose |
|----------|---------|
| A2A | Agent-to-agent communication |
| MCP | Agent-to-tool/data communication |
Use MCP for tools and data sources, A2A for agent collaboration.
```
### references/streaming.md
```markdown
# Streaming Agent Visibility
## Transport Selection
| Transport | Best For | Bidirectional |
|-----------|----------|---------------|
| SSE (Server-Sent Events) | Simple streaming, wide browser support | No |
| WebSocket | Bidirectional, real-time interaction | Yes |
| HTTP Long-Polling | Fallback for restrictive networks | No |
## Server-Sent Events (SSE)
### FastAPI SSE Endpoint
```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
import json
app = FastAPI()
async def stream_agent_response(query: str):
"""Stream agent thinking and responses."""
async for event in run_agent(query):
if event["type"] == "thinking":
yield f"event: thinking\ndata: {json.dumps(event)}\n\n"
elif event["type"] == "tool_call":
yield f"event: tool_call\ndata: {json.dumps(event)}\n\n"
elif event["type"] == "response":
yield f"event: response\ndata: {json.dumps(event)}\n\n"
elif event["type"] == "done":
yield f"event: done\ndata: {json.dumps(event)}\n\n"
@app.get("/agent/stream")
async def agent_stream(query: str):
return StreamingResponse(
stream_agent_response(query),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no" # Disable nginx buffering
}
)
```
### React SSE Client
```typescript
function useAgentStream(query: string) {
const [thinking, setThinking] = useState<string[]>([]);
const [response, setResponse] = useState("");
const [toolCalls, setToolCalls] = useState<ToolCall[]>([]);
useEffect(() => {
const eventSource = new EventSource(
`/agent/stream?query=${encodeURIComponent(query)}`
);
eventSource.addEventListener("thinking", (e) => {
const data = JSON.parse(e.data);
setThinking((prev) => [...prev, data.content]);
});
eventSource.addEventListener("tool_call", (e) => {
const data = JSON.parse(e.data);
setToolCalls((prev) => [...prev, data]);
});
eventSource.addEventListener("response", (e) => {
const data = JSON.parse(e.data);
setResponse((prev) => prev + data.content);
});
eventSource.addEventListener("done", () => {
eventSource.close();
});
eventSource.onerror = () => {
eventSource.close();
};
return () => eventSource.close();
}, [query]);
return { thinking, response, toolCalls };
}
```
## WebSocket Implementation
### FastAPI WebSocket
```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import json
app = FastAPI()
class AgentSession:
def __init__(self, websocket: WebSocket):
self.websocket = websocket
self.conversation_history = []
async def send_event(self, event_type: str, data: dict):
await self.websocket.send_json({
"type": event_type,
"data": data,
"timestamp": time.time()
})
@app.websocket("/agent/ws")
async def agent_websocket(websocket: WebSocket):
await websocket.accept()
session = AgentSession(websocket)
try:
while True:
# Receive user message
message = await websocket.receive_json()
if message["type"] == "query":
# Stream agent response
async for event in run_agent(
message["content"],
session.conversation_history
):
await session.send_event(event["type"], event)
# Update history
session.conversation_history.append({
"role": "user",
"content": message["content"]
})
elif message["type"] == "interrupt":
# Handle user interruption
await cancel_current_task()
except WebSocketDisconnect:
pass
```
### React WebSocket Hook
```typescript
function useAgentWebSocket() {
const [connected, setConnected] = useState(false);
const [events, setEvents] = useState<AgentEvent[]>([]);
const wsRef = useRef<WebSocket | null>(null);
useEffect(() => {
const ws = new WebSocket("ws://localhost:8000/agent/ws");
wsRef.current = ws;
ws.onopen = () => setConnected(true);
ws.onclose = () => setConnected(false);
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
setEvents((prev) => [...prev, data]);
};
return () => ws.close();
}, []);
const sendMessage = useCallback((content: string) => {
wsRef.current?.send(JSON.stringify({
type: "query",
content
}));
}, []);
const interrupt = useCallback(() => {
wsRef.current?.send(JSON.stringify({ type: "interrupt" }));
}, []);
return { connected, events, sendMessage, interrupt };
}
```
## Streaming Anthropic to UI
### Full Pipeline
```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import anthropic
import json
app = FastAPI()
client = anthropic.Anthropic()
async def stream_claude_response(messages: list):
"""Stream Claude response with thinking visibility."""
with client.messages.stream(
model="claude-sonnet-4-5",
max_tokens=4096,
thinking={"type": "enabled", "budget_tokens": 8000},
messages=messages
) as stream:
for event in stream:
if event.type == "content_block_start":
if event.content_block.type == "thinking":
yield f"event: thinking_start\ndata: {{}}\n\n"
elif event.content_block.type == "text":
yield f"event: response_start\ndata: {{}}\n\n"
elif event.type == "content_block_delta":
if hasattr(event.delta, "thinking"):
data = {"content": event.delta.thinking}
yield f"event: thinking\ndata: {json.dumps(data)}\n\n"
elif hasattr(event.delta, "text"):
data = {"content": event.delta.text}
yield f"event: response\ndata: {json.dumps(data)}\n\n"
elif event.type == "message_stop":
yield f"event: done\ndata: {{}}\n\n"
@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
messages = [{"role": "user", "content": request.content}]
return StreamingResponse(
stream_claude_response(messages),
media_type="text/event-stream"
)
```
## Framework-Agnostic Patterns
### Event Schema
```python
from pydantic import BaseModel
from typing import Literal
from datetime import datetime
class AgentEvent(BaseModel):
type: Literal["thinking", "tool_call", "tool_result", "response", "error", "done"]
content: str | dict | None = None
timestamp: datetime
metadata: dict = {}
# Consistent event emission
async def emit_event(event_type: str, content: any = None, **metadata):
return AgentEvent(
type=event_type,
content=content,
timestamp=datetime.utcnow(),
metadata=metadata
)
```
### Streamlit Integration
```python
import streamlit as st
import requests
st.title("Agent Chat")
if prompt := st.chat_input("Ask anything..."):
with st.chat_message("user"):
st.write(prompt)
with st.chat_message("assistant"):
thinking_placeholder = st.empty()
response_placeholder = st.empty()
thinking_text = ""
response_text = ""
# Stream from backend
with requests.get(
f"/agent/stream?query={prompt}",
stream=True
) as r:
for line in r.iter_lines():
if line.startswith(b"event: "):
event_type = line[7:].decode()
elif line.startswith(b"data: "):
data = json.loads(line[6:])
if event_type == "thinking":
thinking_text += data.get("content", "")
with thinking_placeholder.expander("Thinking..."):
st.write(thinking_text)
elif event_type == "response":
response_text += data.get("content", "")
response_placeholder.write(response_text)
```
## Performance Considerations
1. **Buffering**: Disable proxy buffering (nginx, cloudflare)
2. **Heartbeats**: Send periodic pings to keep connection alive
3. **Reconnection**: Implement client-side reconnection with exponential backoff
4. **Compression**: Avoid for SSE (breaks streaming)
```
### references/structured-outputs.md
```markdown
# Structured Outputs with Pydantic
## Instructor Library
Instructor provides reliable structured extraction from LLMs with automatic validation and retries.
### Basic Usage
```python
import instructor
from pydantic import BaseModel, Field
from typing import Optional
class User(BaseModel):
name: str = Field(description="User's full name")
age: int = Field(ge=0, le=150, description="User's age")
email: Optional[str] = Field(default=None, description="Email address")
# Multi-provider support
client = instructor.from_provider("anthropic/claude-sonnet-4-5")
# or: instructor.from_provider("openai/gpt-4o")
# or: instructor.from_provider("ollama/llama3")
user = client.create(
response_model=User,
messages=[{"role": "user", "content": "Extract: John Doe, 30 years old, [email protected]"}],
max_retries=3
)
print(user) # User(name='John Doe', age=30, email='[email protected]')
```
### Complex Nested Structures
```python
from pydantic import BaseModel
from typing import List, Literal
from enum import Enum
class Priority(str, Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
class Task(BaseModel):
title: str
description: str
priority: Priority
assignee: Optional[str] = None
class Project(BaseModel):
name: str
tasks: List[Task]
deadline: Optional[str] = None
project = client.create(
response_model=Project,
messages=[{
"role": "user",
"content": """
Project: Website Redesign
- Task: Update homepage (high priority, assigned to Alice)
- Task: Fix mobile layout (medium priority)
- Deadline: March 2025
"""
}]
)
```
### Streaming Partial Objects
```python
from instructor import Partial
# Stream partial results as they're generated
for partial in client.create(
response_model=Partial[Project],
messages=[{"role": "user", "content": "..."}],
stream=True
):
print(partial)
# Project(name='Website...', tasks=None)
# Project(name='Website Redesign', tasks=[Task(...)])
# ... progressively more complete
```
### Custom Validators
```python
from pydantic import BaseModel, field_validator, model_validator
class Analysis(BaseModel):
summary: str
confidence: float
sources: List[str]
@field_validator("confidence")
@classmethod
def validate_confidence(cls, v):
if not 0 <= v <= 1:
raise ValueError("Confidence must be between 0 and 1")
return v
@field_validator("sources")
@classmethod
def validate_sources(cls, v):
if len(v) < 1:
raise ValueError("At least one source required")
return v
@model_validator(mode="after")
def validate_model(self):
if self.confidence < 0.5 and len(self.sources) < 3:
raise ValueError("Low confidence requires more sources")
return self
```
### LLM-Powered Validation
```python
from instructor import llm_validator
from typing import Annotated
from pydantic import BeforeValidator
class ContentModeration(BaseModel):
content: Annotated[
str,
BeforeValidator(
llm_validator(
"""Content must be:
1. Professional and appropriate
2. Free of harmful or offensive language
3. Factually accurate where verifiable
""",
client=client
)
)
]
```
## Anthropic Native Structured Outputs
### Tool-Based Extraction
```python
import anthropic
client = anthropic.Anthropic()
# Define schema as tool
extraction_tool = {
"name": "extract_entities",
"description": "Extract structured entities from text",
"input_schema": {
"type": "object",
"properties": {
"people": {
"type": "array",
"items": {
"type": "object",
"properties": {
"name": {"type": "string"},
"role": {"type": "string"}
},
"required": ["name"]
}
},
"organizations": {
"type": "array",
"items": {"type": "string"}
}
},
"required": ["people", "organizations"]
}
}
response = client.messages.create(
model="claude-sonnet-4-5",
max_tokens=1024,
tools=[extraction_tool],
tool_choice={"type": "tool", "name": "extract_entities"},
messages=[{
"role": "user",
"content": "Extract entities: John Smith, CEO of Acme Corp, met with Jane Doe from Tech Inc."
}]
)
# Parse tool use result
for block in response.content:
if block.type == "tool_use":
entities = block.input # Already parsed JSON
```
## OpenAI Native Structured Outputs
```python
from openai import OpenAI
from pydantic import BaseModel
client = OpenAI()
class MathSolution(BaseModel):
steps: List[str]
final_answer: float
units: Optional[str] = None
response = client.beta.chat.completions.parse(
model="gpt-4o",
messages=[{
"role": "user",
"content": "Solve: If a train travels 120 miles in 2 hours, what is its speed?"
}],
response_format=MathSolution
)
solution = response.choices[0].message.parsed
print(solution.final_answer) # 60.0
print(solution.units) # "miles per hour"
```
## Error Handling & Retries
```python
from instructor import Instructor
from tenacity import retry, stop_after_attempt, wait_exponential
class RobustExtractor:
def __init__(self, provider: str):
self.client = instructor.from_provider(provider)
def extract(
self,
response_model: type,
content: str,
max_retries: int = 3
):
try:
return self.client.create(
response_model=response_model,
messages=[{"role": "user", "content": content}],
max_retries=max_retries
)
except instructor.IncompleteOutputException as e:
# Model output was truncated
logger.warning(f"Incomplete output: {e}")
return self._handle_incomplete(response_model, content)
except ValidationError as e:
# Pydantic validation failed after all retries
logger.error(f"Validation failed: {e}")
raise
```
## Best Practices
1. **Use descriptive Field descriptions** - Helps the LLM understand intent
2. **Start simple** - Add complexity only when needed
3. **Validate at multiple levels** - Field, model, and semantic validators
4. **Handle partial results** - Use streaming for long extractions
5. **Set appropriate retries** - Usually 2-3 is sufficient
6. **Test edge cases** - Empty inputs, ambiguous data, long text
```
### references/memory.md
```markdown
# Memory & Vector Databases
## Vector DB Selection
| Database | Best For | Hosting |
|----------|----------|---------|
| Pinecone | Production, managed, low-ops | Cloud only |
| Chroma | Local dev, prototyping, embedded | Self-hosted |
| FAISS | High-performance, on-prem, large-scale | Self-hosted |
## Pinecone (Production)
### Setup
```python
from pinecone import Pinecone, ServerlessSpec
pc = Pinecone(api_key="your-api-key")
# Create index
pc.create_index(
name="agent-memory",
dimension=1536, # OpenAI embedding dimension
metric="cosine",
spec=ServerlessSpec(cloud="aws", region="us-east-1")
)
index = pc.Index("agent-memory")
```
### Store & Retrieve
```python
from openai import OpenAI
openai_client = OpenAI()
def get_embedding(text: str) -> list[float]:
response = openai_client.embeddings.create(
model="text-embedding-3-small",
input=text
)
return response.data[0].embedding
# Store memory
def store_memory(user_id: str, content: str, metadata: dict = {}):
embedding = get_embedding(content)
index.upsert(
vectors=[{
"id": f"{user_id}_{uuid.uuid4()}",
"values": embedding,
"metadata": {
"user_id": user_id,
"content": content,
"timestamp": datetime.utcnow().isoformat(),
**metadata
}
}],
namespace=user_id
)
# Retrieve relevant memories
def recall_memories(user_id: str, query: str, top_k: int = 5) -> list[dict]:
query_embedding = get_embedding(query)
results = index.query(
namespace=user_id,
vector=query_embedding,
top_k=top_k,
include_metadata=True
)
return [
{
"content": match.metadata["content"],
"score": match.score,
"timestamp": match.metadata["timestamp"]
}
for match in results.matches
]
```
## Chroma (Development)
### Setup
```python
import chromadb
from chromadb.config import Settings
# Persistent storage
client = chromadb.PersistentClient(path="./chroma_db")
# Or in-memory for testing
# client = chromadb.Client()
collection = client.get_or_create_collection(
name="agent_memory",
metadata={"hnsw:space": "cosine"}
)
```
### Store & Retrieve
```python
# Store with auto-embedding
collection.add(
documents=["User prefers concise responses", "User is working on ML project"],
metadatas=[
{"user_id": "user123", "type": "preference"},
{"user_id": "user123", "type": "context"}
],
ids=["mem1", "mem2"]
)
# Query
results = collection.query(
query_texts=["What does the user like?"],
n_results=5,
where={"user_id": "user123"}
)
for doc, metadata, distance in zip(
results["documents"][0],
results["metadatas"][0],
results["distances"][0]
):
print(f"[{1 - distance:.2f}] {doc}")
```
## FAISS (High Performance)
### Setup
```python
import faiss
import numpy as np
from dataclasses import dataclass
@dataclass
class MemoryStore:
index: faiss.Index
id_to_content: dict[int, dict]
current_id: int = 0
def create_faiss_store(dimension: int = 1536) -> MemoryStore:
# IVF index for large-scale
quantizer = faiss.IndexFlatL2(dimension)
index = faiss.IndexIVFFlat(quantizer, dimension, 100)
# Or simple flat index for smaller datasets
# index = faiss.IndexFlatL2(dimension)
return MemoryStore(index=index, id_to_content={})
```
### Store & Retrieve
```python
def add_memory(store: MemoryStore, embedding: np.ndarray, content: dict):
if not store.index.is_trained:
# Train IVF index (need enough vectors)
store.index.train(embedding.reshape(1, -1))
store.index.add(embedding.reshape(1, -1))
store.id_to_content[store.current_id] = content
store.current_id += 1
def search_memories(
store: MemoryStore,
query_embedding: np.ndarray,
k: int = 5
) -> list[dict]:
distances, indices = store.index.search(
query_embedding.reshape(1, -1), k
)
results = []
for dist, idx in zip(distances[0], indices[0]):
if idx != -1: # Valid result
results.append({
**store.id_to_content[idx],
"distance": float(dist)
})
return results
# Save/Load
def save_store(store: MemoryStore, path: str):
faiss.write_index(store.index, f"{path}/index.faiss")
with open(f"{path}/content.json", "w") as f:
json.dump(store.id_to_content, f)
```
## Memory Patterns
### Short-Term vs Long-Term
```python
class AgentMemory:
def __init__(self, vector_db):
self.short_term: list[dict] = [] # Current session
self.vector_db = vector_db # Long-term
self.max_short_term = 20
def add(self, content: str, importance: float = 0.5):
memory = {
"content": content,
"timestamp": datetime.utcnow(),
"importance": importance
}
# Always add to short-term
self.short_term.append(memory)
if len(self.short_term) > self.max_short_term:
self._consolidate()
# High importance goes to long-term immediately
if importance > 0.7:
self._store_long_term(memory)
def _consolidate(self):
"""Move important short-term to long-term, summarize rest."""
important = [m for m in self.short_term if m["importance"] > 0.5]
for memory in important:
self._store_long_term(memory)
# Summarize and store less important
if len(self.short_term) > len(important):
summary = self._summarize(self.short_term)
self._store_long_term({"content": summary, "type": "summary"})
self.short_term = []
def recall(self, query: str, k: int = 5) -> list[dict]:
"""Combine short-term and long-term recall."""
long_term = self.vector_db.search(query, k=k)
# Add relevant short-term
relevant_short = self._filter_relevant(self.short_term, query)
return self._merge_and_rank(long_term, relevant_short)
```
### Memory Summarization
```python
async def summarize_memories(memories: list[dict], client) -> str:
"""Compress multiple memories into a summary."""
content = "\n".join([m["content"] for m in memories])
response = await client.messages.create(
model="claude-sonnet-4-5",
max_tokens=500,
messages=[{
"role": "user",
"content": f"""Summarize these conversation memories into key points:
{content}
Provide a concise summary that preserves important information."""
}]
)
return response.content[0].text
```
## Integration with Agents
```python
class MemoryAugmentedAgent:
def __init__(self, llm_client, memory: AgentMemory):
self.client = llm_client
self.memory = memory
async def respond(self, user_message: str) -> str:
# Recall relevant memories
memories = self.memory.recall(user_message, k=5)
# Build context
memory_context = "\n".join([
f"- {m['content']}" for m in memories
])
# Generate response with memory context
response = await self.client.messages.create(
model="claude-sonnet-4-5",
max_tokens=2048,
system=f"""You have access to these memories about the user:
{memory_context}
Use this context to provide personalized, contextual responses.""",
messages=[{"role": "user", "content": user_message}]
)
# Store new memory
self.memory.add(
f"User: {user_message}\nAssistant: {response.content[0].text}",
importance=0.5
)
return response.content[0].text
```
```
### references/checkpointing.md
```markdown
# Checkpointing & Workflow Resumption
## LangGraph Checkpointing
LangGraph provides persistence for stateful workflows with automatic checkpointing.
### Core Concepts
| Concept | Description |
|---------|-------------|
| Checkpoint | Snapshot of graph state at a super-step |
| Thread | Unique ID for a conversation/workflow |
| Checkpointer | Persistence backend (memory, SQLite, Postgres) |
### Basic Setup
```python
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import InMemorySaver
from typing import TypedDict
class AgentState(TypedDict):
messages: list
current_step: str
results: dict
# Build graph
graph = StateGraph(AgentState)
graph.add_node("research", research_node)
graph.add_node("analyze", analyze_node)
graph.add_node("write", write_node)
graph.add_edge("research", "analyze")
graph.add_edge("analyze", "write")
graph.add_edge("write", END)
graph.set_entry_point("research")
# Compile with checkpointer
checkpointer = InMemorySaver()
app = graph.compile(checkpointer=checkpointer)
# Run with thread_id
config = {"configurable": {"thread_id": "workflow-123"}}
result = app.invoke({"messages": ["Start research"]}, config)
```
### Production Checkpointers
#### SQLite (Local/Development)
```python
from langgraph.checkpoint.sqlite import SqliteSaver
# Sync
checkpointer = SqliteSaver.from_conn_string("checkpoints.db")
# Async
from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
checkpointer = AsyncSqliteSaver.from_conn_string("checkpoints.db")
```
#### PostgreSQL (Production)
```python
from langgraph.checkpoint.postgres import PostgresSaver
checkpointer = PostgresSaver.from_conn_string(
"postgresql://user:pass@localhost/db"
)
# With connection pooling
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
import asyncpg
pool = await asyncpg.create_pool(
"postgresql://user:pass@localhost/db",
min_size=5,
max_size=20
)
checkpointer = AsyncPostgresSaver(pool)
```
#### DynamoDB (AWS)
```python
from langgraph_checkpoint_aws import DynamoDBSaver
checkpointer = DynamoDBSaver(
table_name="langgraph-checkpoints",
s3_bucket="langgraph-large-payloads", # For checkpoints > 350KB
enable_checkpoint_compression=True
)
```
### Resume from Failure
```python
async def run_with_recovery(
app,
initial_state: dict,
thread_id: str
):
config = {"configurable": {"thread_id": thread_id}}
try:
# Check for existing checkpoint
checkpoint = await app.checkpointer.get(config)
if checkpoint:
print(f"Resuming from checkpoint: {checkpoint['id']}")
# Resume from last checkpoint
result = await app.ainvoke(None, config)
else:
# Start fresh
result = await app.ainvoke(initial_state, config)
return result
except Exception as e:
# State is saved at last successful step
print(f"Failed at step, can resume: {e}")
raise
```
### Time Travel (Debug)
```python
# List all checkpoints for a thread
checkpoints = list(app.checkpointer.list(config))
for cp in checkpoints:
print(f"Checkpoint {cp.checkpoint_id}: {cp.metadata}")
# Resume from specific checkpoint
specific_config = {
"configurable": {
"thread_id": "workflow-123",
"checkpoint_id": "checkpoint-abc"
}
}
result = app.invoke(None, specific_config)
```
### Human-in-the-Loop
```python
from langgraph.graph import StateGraph, END
def approval_node(state: AgentState) -> AgentState:
"""Node that requires human approval."""
return {"pending_approval": True}
def process_approval(state: AgentState) -> AgentState:
"""Process after approval received."""
return {"approved": True, "pending_approval": False}
graph = StateGraph(AgentState)
graph.add_node("generate", generate_node)
graph.add_node("await_approval", approval_node)
graph.add_node("process", process_approval)
# Interrupt before approval
graph.add_edge("generate", "await_approval")
graph.add_conditional_edges(
"await_approval",
lambda s: "wait" if s.get("pending_approval") else "process",
{"wait": END, "process": "process"}
)
app = graph.compile(
checkpointer=checkpointer,
interrupt_before=["await_approval"] # Pause here
)
# First run - stops at approval
result = app.invoke(initial_state, config)
# Returns with pending_approval=True
# Later - resume after human approves
app.update_state(config, {"pending_approval": False, "approved": True})
final_result = app.invoke(None, config)
```
## Custom Checkpointing
### Database-Backed State
```python
from abc import ABC, abstractmethod
from typing import Optional
import json
class WorkflowState:
def __init__(self, workflow_id: str, db):
self.workflow_id = workflow_id
self.db = db
async def save(self, step: str, data: dict):
"""Save checkpoint."""
await self.db.execute(
"""
INSERT INTO checkpoints (workflow_id, step, data, created_at)
VALUES ($1, $2, $3, NOW())
ON CONFLICT (workflow_id, step)
DO UPDATE SET data = $3, created_at = NOW()
""",
self.workflow_id, step, json.dumps(data)
)
async def load(self, step: Optional[str] = None) -> Optional[dict]:
"""Load latest or specific checkpoint."""
if step:
row = await self.db.fetchrow(
"SELECT data FROM checkpoints WHERE workflow_id = $1 AND step = $2",
self.workflow_id, step
)
else:
row = await self.db.fetchrow(
"""
SELECT data FROM checkpoints
WHERE workflow_id = $1
ORDER BY created_at DESC LIMIT 1
""",
self.workflow_id
)
return json.loads(row["data"]) if row else None
async def get_last_step(self) -> Optional[str]:
"""Get the last completed step."""
row = await self.db.fetchrow(
"""
SELECT step FROM checkpoints
WHERE workflow_id = $1
ORDER BY created_at DESC LIMIT 1
""",
self.workflow_id
)
return row["step"] if row else None
```
### Resumable Agent Loop
```python
class ResumableAgent:
def __init__(self, llm_client, state: WorkflowState):
self.client = llm_client
self.state = state
async def run(self, task: str):
# Check for existing progress
last_step = await self.state.get_last_step()
checkpoint = await self.state.load() if last_step else None
if checkpoint:
print(f"Resuming from step: {last_step}")
messages = checkpoint.get("messages", [])
step_index = checkpoint.get("step_index", 0)
else:
messages = [{"role": "user", "content": task}]
step_index = 0
steps = ["plan", "research", "draft", "review", "finalize"]
for i, step in enumerate(steps[step_index:], start=step_index):
try:
result = await self._execute_step(step, messages)
messages.append({"role": "assistant", "content": result})
# Checkpoint after each step
await self.state.save(step, {
"messages": messages,
"step_index": i + 1,
"step_result": result
})
except Exception as e:
print(f"Failed at {step}, checkpoint saved")
raise
return messages[-1]["content"]
```
## Best Practices
1. **Choose the right backend**:
- InMemorySaver: Testing only
- SQLite: Local dev, single instance
- PostgreSQL: Production, multi-instance
- DynamoDB: Serverless, AWS-native
2. **Checkpoint granularity**: Save after expensive operations, not every micro-step
3. **State serialization**: Ensure all state is JSON-serializable
4. **Thread management**: Use meaningful thread IDs (user_id + session_id)
5. **Cleanup**: Implement TTL or manual cleanup for old checkpoints
6. **Compression**: Enable for large states (DynamoDB/Postgres)
```
### references/guardrails.md
```markdown
# Guardrails & Anti-Hallucination
## Framework Selection
| Framework | Best For | Approach |
|-----------|----------|----------|
| NeMo Guardrails | Conversational AI, dialog control | Colang rules + LLM checks |
| Guardrails AI | Structured output validation | Pydantic + custom validators |
| Custom | Specific domain requirements | RAG grounding + fact-checking |
## NVIDIA NeMo Guardrails
### Installation
```bash
pip install nemoguardrails
```
### Configuration Structure
```
config/
├── config.yml # Main configuration
├── prompts.yml # Custom prompts
└── rails/
├── input.co # Input rails (Colang)
├── output.co # Output rails
└── dialog.co # Dialog flow rails
```
### config.yml
```yaml
models:
- type: main
engine: anthropic
model: claude-sonnet-4-5
rails:
input:
flows:
- check jailbreak
- check toxicity
- mask sensitive data
output:
flows:
- self check facts
- self check hallucination
- check toxicity
config:
sensitive_data_detection:
input:
entities:
- PERSON
- EMAIL_ADDRESS
- PHONE_NUMBER
- SSN
fact_checking:
provider: alignscore # or custom
```
### Colang Rails (input.co)
```colang
define user ask about competitors
"What do you think about [competitor]?"
"How does [competitor] compare?"
"Is [competitor] better?"
define bot refuse competitor discussion
"I focus on our products and services.
I'd be happy to help you with questions about what we offer."
define flow check competitor mentions
user ask about competitors
bot refuse competitor discussion
```
### Hallucination Detection
```yaml
# config.yml
rails:
output:
flows:
- self check hallucination
prompts:
- task: self_check_hallucination
content: |
Your task is to check if the bot's response contains any hallucinations.
User message: {{ user_input }}
Bot response: {{ bot_response }}
Context provided: {{ context }}
Check if the response:
1. Contains information not supported by the context
2. Makes claims that cannot be verified
3. Invents facts, dates, or statistics
Respond with "yes" if hallucination detected, "no" otherwise.
```
### Python Integration
```python
from nemoguardrails import RailsConfig, LLMRails
config = RailsConfig.from_path("./config")
rails = LLMRails(config)
# With guardrails
response = await rails.generate_async(
messages=[{"role": "user", "content": "Tell me about our Q3 results"}]
)
# Check if blocked
if response.get("blocked"):
print(f"Blocked by: {response.get('blocked_by')}")
else:
print(response["content"])
```
## Guardrails AI
### Structured Validation
```python
from guardrails import Guard
from guardrails.validators import ValidLength, ValidRange, OneLine
from pydantic import BaseModel, Field
class ProductReview(BaseModel):
summary: str = Field(
validators=[ValidLength(min=10, max=200, on_fail="reask")]
)
rating: float = Field(
validators=[ValidRange(min=1, max=5, on_fail="fix")]
)
pros: list[str]
cons: list[str]
guard = Guard.from_pydantic(ProductReview)
result = guard(
llm_api=openai.chat.completions.create,
model="gpt-4o",
messages=[{"role": "user", "content": "Review this product..."}]
)
if result.validation_passed:
review = result.validated_output
else:
print(f"Validation failed: {result.error}")
```
### Custom Validators
```python
from guardrails.validators import Validator, register_validator
from typing import Any
@register_validator(name="no_competitor_mentions", data_type="string")
class NoCompetitorMentions(Validator):
def __init__(self, competitors: list[str], on_fail: str = "fix"):
super().__init__(on_fail=on_fail)
self.competitors = [c.lower() for c in competitors]
def validate(self, value: Any, metadata: dict) -> dict:
text_lower = value.lower()
for competitor in self.competitors:
if competitor in text_lower:
return {
"outcome": "fail",
"error_message": f"Response mentions competitor: {competitor}",
"fix_value": self._redact(value, competitor)
}
return {"outcome": "pass"}
def _redact(self, text: str, competitor: str) -> str:
import re
return re.sub(competitor, "[REDACTED]", text, flags=re.IGNORECASE)
```
## RAG-Based Fact Grounding
### Grounded Response Generation
```python
from typing import Optional
class GroundedAgent:
def __init__(self, llm_client, retriever):
self.client = llm_client
self.retriever = retriever
async def respond(
self,
query: str,
require_citation: bool = True
) -> dict:
# Retrieve relevant context
docs = await self.retriever.search(query, k=5)
context = "\n\n".join([d.content for d in docs])
# Generate grounded response
response = await self.client.messages.create(
model="claude-sonnet-4-5",
max_tokens=2048,
system=f"""Answer based ONLY on the provided context.
If the answer is not in the context, say "I don't have information about that."
Always cite your sources using [Source N] notation.
Context:
{context}""",
messages=[{"role": "user", "content": query}]
)
answer = response.content[0].text
# Verify citations exist
if require_citation:
verified = await self._verify_citations(answer, docs)
if not verified["all_valid"]:
answer = await self._regenerate_with_valid_citations(
query, context, verified["invalid_claims"]
)
return {
"answer": answer,
"sources": docs,
"grounded": True
}
async def _verify_citations(
self,
answer: str,
docs: list
) -> dict:
"""Verify that cited claims actually appear in sources."""
verification = await self.client.messages.create(
model="claude-sonnet-4-5",
max_tokens=1024,
messages=[{
"role": "user",
"content": f"""Verify each claim in this answer is supported by the sources.
Answer: {answer}
Sources:
{[d.content for d in docs]}
List any claims NOT supported by sources."""
}]
)
unsupported = verification.content[0].text
return {
"all_valid": "none" in unsupported.lower() or unsupported.strip() == "",
"invalid_claims": unsupported
}
```
### Confidence Scoring
```python
class ConfidenceScorer:
def __init__(self, llm_client):
self.client = llm_client
async def score(
self,
question: str,
answer: str,
context: Optional[str] = None
) -> dict:
prompt = f"""Rate the confidence of this answer on a scale of 0-1.
Question: {question}
Answer: {answer}
{"Context: " + context if context else ""}
Consider:
- Factual accuracy (is it verifiable?)
- Completeness (does it fully answer?)
- Hedging language (does it express uncertainty?)
- Specificity (are claims specific or vague?)
Respond with JSON: {{"confidence": 0.X, "reasoning": "..."}}"""
response = await self.client.messages.create(
model="claude-sonnet-4-5",
max_tokens=500,
messages=[{"role": "user", "content": prompt}]
)
import json
return json.loads(response.content[0].text)
```
## Multi-Layer Guardrails
### Complete Pipeline
```python
class GuardedAgent:
def __init__(self, llm_client, retriever, config: dict):
self.client = llm_client
self.retriever = retriever
self.config = config
async def respond(self, user_input: str) -> dict:
# Layer 1: Input validation
input_check = await self._check_input(user_input)
if not input_check["safe"]:
return {"blocked": True, "reason": input_check["reason"]}
# Layer 2: RAG grounding
context = await self._get_context(user_input)
# Layer 3: Generate with constraints
response = await self._generate(user_input, context)
# Layer 4: Output validation
output_check = await self._check_output(response, context)
if not output_check["valid"]:
# Regenerate with feedback
response = await self._regenerate(
user_input, context, output_check["issues"]
)
# Layer 5: Confidence check
confidence = await self._score_confidence(
user_input, response, context
)
if confidence["score"] < self.config["min_confidence"]:
response = self._add_uncertainty_disclaimer(response)
return {
"response": response,
"confidence": confidence["score"],
"sources": context.sources if context else []
}
async def _check_input(self, text: str) -> dict:
"""Check for jailbreaks, PII, off-topic."""
# Use NeMo or custom checks
pass
async def _check_output(self, response: str, context) -> dict:
"""Verify factual accuracy, no hallucinations."""
pass
```
## Best Practices
1. **Layer defenses**: Input → Generation → Output → Confidence
2. **Ground in sources**: Always retrieve context before generating
3. **Verify claims**: Cross-check generated facts against sources
4. **Express uncertainty**: Add disclaimers for low-confidence answers
5. **Monitor and iterate**: Log blocked content, refine rules
6. **Test adversarially**: Regularly probe for bypasses
```
---
## Skill Companion Files
> Additional files collected from the skill directory layout.
### scripts/example_agent.py
```python
#!/usr/bin/env python3
"""
Example: Streaming Agent with Checkpointing and Guardrails
This demonstrates a production-ready agentic workflow combining:
- Anthropic streaming with extended thinking
- Pydantic structured outputs
- LangGraph checkpointing
- Basic guardrails
"""
import asyncio
import json
from typing import TypedDict, Optional
from pydantic import BaseModel, Field
import instructor
# Uncomment and configure as needed:
# import anthropic
# from langgraph.graph import StateGraph, END
# from langgraph.checkpoint.memory import InMemorySaver
# ============= Structured Output Models =============
class ResearchResult(BaseModel):
"""Structured output for research tasks."""
topic: str = Field(description="The research topic")
summary: str = Field(description="Executive summary of findings")
key_points: list[str] = Field(description="Key findings as bullet points")
confidence: float = Field(ge=0, le=1, description="Confidence score")
sources: list[str] = Field(default=[], description="Sources used")
class AgentAction(BaseModel):
"""Structured agent action decision."""
action: str = Field(description="Action to take: search, analyze, respond")
reasoning: str = Field(description="Why this action was chosen")
parameters: dict = Field(default={}, description="Action parameters")
# ============= Agent State =============
class AgentState(TypedDict):
"""State for the agentic workflow."""
query: str
messages: list
research: Optional[dict]
final_response: Optional[str]
step: str
# ============= Streaming Handler =============
async def stream_to_ui(event_type: str, content: str):
"""
Stream events to UI (implement based on your transport).
Replace with SSE, WebSocket, or your preferred method.
"""
event = {
"type": event_type,
"content": content
}
print(f"[{event_type.upper()}] {content[:100]}...")
# ============= Agent Nodes =============
async def research_node(state: AgentState, client) -> AgentState:
"""Research node with streaming visibility."""
await stream_to_ui("thinking", "Starting research phase...")
# Use instructor for structured output
instructor_client = instructor.from_provider("anthropic/claude-sonnet-4-5")
result = instructor_client.create(
response_model=ResearchResult,
messages=[{
"role": "user",
"content": f"Research this topic and provide structured findings: {state['query']}"
}],
max_retries=2
)
await stream_to_ui("research_complete", result.summary)
return {
**state,
"research": result.model_dump(),
"step": "analyze"
}
async def analyze_node(state: AgentState, client) -> AgentState:
"""Analyze research results."""
await stream_to_ui("thinking", "Analyzing research findings...")
research = state.get("research", {})
# Stream analysis
async with client.messages.stream(
model="claude-sonnet-4-5",
max_tokens=2048,
messages=[{
"role": "user",
"content": f"""Based on this research, provide a comprehensive analysis:
Research Summary: {research.get('summary', 'N/A')}
Key Points: {research.get('key_points', [])}
Provide insights and recommendations."""
}]
) as stream:
response_text = ""
async for text in stream.text_stream:
response_text += text
await stream_to_ui("response_chunk", text)
return {
**state,
"final_response": response_text,
"step": "complete"
}
# ============= Guardrails =============
def check_input_safety(user_input: str) -> tuple[bool, str]:
"""Basic input guardrails."""
# Add your safety checks here
blocked_patterns = ["ignore previous instructions", "system prompt"]
input_lower = user_input.lower()
for pattern in blocked_patterns:
if pattern in input_lower:
return False, f"Blocked: potential jailbreak attempt"
return True, ""
def check_output_safety(response: str) -> tuple[bool, str]:
"""Basic output guardrails."""
# Add PII detection, toxicity checks, etc.
return True, ""
# ============= Main Agent =============
async def run_agent(query: str):
"""
Run the complete agentic workflow.
In production, add:
- Real LangGraph StateGraph with checkpointing
- Proper error handling and recovery
- Observability/logging
"""
# Input guardrails
safe, reason = check_input_safety(query)
if not safe:
await stream_to_ui("blocked", reason)
return {"error": reason}
# Initialize state
state: AgentState = {
"query": query,
"messages": [],
"research": None,
"final_response": None,
"step": "research"
}
# In production, use:
# from langgraph.checkpoint.postgres import PostgresSaver
# checkpointer = PostgresSaver.from_conn_string(DATABASE_URL)
# app = graph.compile(checkpointer=checkpointer)
# result = await app.ainvoke(state, {"configurable": {"thread_id": "..."}})
print(f"\n{'='*50}")
print(f"Starting agent workflow for: {query}")
print(f"{'='*50}\n")
# Simulate workflow steps
await stream_to_ui("started", f"Processing query: {query}")
# Output guardrails would check final_response
safe, reason = check_output_safety(state.get("final_response", ""))
if not safe:
await stream_to_ui("filtered", reason)
await stream_to_ui("complete", "Workflow finished")
return state
# ============= Example Usage =============
if __name__ == "__main__":
# Example query
query = "What are the latest developments in AI agents?"
# Run the agent
result = asyncio.run(run_agent(query))
print("\n" + "="*50)
print("Final Result:")
print(json.dumps(result, indent=2, default=str))
```