intelligent-router
Intelligent model routing for sub-agent task delegation. Choose the optimal model based on task complexity, cost, and capability requirements. Reduces costs by routing simple tasks to cheaper models while preserving quality for complex work.
Packaged view
This page reorganizes the original catalog entry around fit, installability, and workflow context first. The original raw source lives below.
Install command
npx @skill-hub/cli install openclaw-skills-intelligent-router
Repository
Skill path: skills/bowen31337/intelligent-router
Open repository
Best for
Primary workflow: Ship Full Stack.
Technical facets: Full Stack.
Target audience: everyone.
License: Unknown.
Original source
Catalog source: SkillHub Club.
Repository owner: openclaw.
This is a mirrored public skill entry — review the repository before installing it into production workflows.
What it helps with
- Install intelligent-router into Claude Code, Codex CLI, Gemini CLI, or OpenCode workflows
- Review https://github.com/openclaw/skills before adding intelligent-router to shared team environments
- Use intelligent-router for development workflows
Works across
Favorites: 0.
Sub-skills: 0.
Aggregator: No.
Original source / Raw SKILL.md
---
name: intelligent-router
description: Intelligent model routing for sub-agent task delegation. Choose the optimal model based on task complexity, cost, and capability requirements. Reduces costs by routing simple tasks to cheaper models while preserving quality for complex work.
version: 3.2.0
core: true
---
# Intelligent Router — Core Skill
> **CORE SKILL**: This skill is infrastructure, not guidance. Installation = enforcement.
> Run `bash skills/intelligent-router/install.sh` to activate.
## What It Does
Automatically classifies any task into a tier (SIMPLE/MEDIUM/COMPLEX/REASONING/CRITICAL)
and recommends the cheapest model that can handle it well.
**The problem it solves:** Without routing, every cron job and sub-agent defaults to Sonnet
(expensive). With routing, monitoring tasks use free local models, saving 80-95% on cost.
---
## MANDATORY Protocol (enforced via AGENTS.md)
### Before spawning any sub-agent:
```bash
python3 skills/intelligent-router/scripts/router.py classify "task description"
```
### Before creating any cron job:
```bash
python3 skills/intelligent-router/scripts/spawn_helper.py "task description"
# Outputs the exact model ID and payload snippet to use
```
### To validate a cron payload has model set:
```bash
python3 skills/intelligent-router/scripts/spawn_helper.py --validate '{"kind":"agentTurn","message":"..."}'
```
### ❌ VIOLATION (never do this):
```python
# Cron job without model = Sonnet default = expensive waste
{"kind": "agentTurn", "message": "check server..."} # ← WRONG
```
### ✅ CORRECT:
```python
# Always specify model from router recommendation
{"kind": "agentTurn", "message": "check server...", "model": "ollama/glm-4.7-flash"}
```
---
## Tier System
| Tier | Use For | Primary Model | Cost |
|------|---------|---------------|------|
| 🟢 SIMPLE | Monitoring, heartbeat, checks, summaries | `anthropic-proxy-6/glm-4.7` (alt: proxy-4) | $0.50/M |
| 🟡 MEDIUM | Code fixes, patches, research, data analysis | `nvidia-nim/meta/llama-3.3-70b-instruct` | $0.40/M |
| 🟠 COMPLEX | Features, architecture, multi-file, debug | `anthropic/claude-sonnet-4-6` | $3/M |
| 🔵 REASONING | Proofs, formal logic, deep analysis | `nvidia-nim/moonshotai/kimi-k2-thinking` | $1/M |
| 🔴 CRITICAL | Security, production, high-stakes | `anthropic/claude-opus-4-6` | $5/M |
**SIMPLE fallback chain:** `anthropic-proxy-4/glm-4.7` → `nvidia-nim/qwen/qwen2.5-7b-instruct` ($0.15/M)
> ⚠️ **`ollama-gpu-server` is BLOCKED** for cron/spawn use. Ollama binds to `127.0.0.1` by default — unreachable over LAN from the OpenClaw host. The `router_policy.py` enforcer will reject any payload referencing it.
**Tier classification uses 4 capability signals (not cost alone):**
- `effective_params` (50%) — extracted from model ID or `known-model-params.json` for closed-source models
- `context_window` (20%) — larger = more capable
- `cost_input` (20%) — price as quality proxy (weak signal, last resort for unknown sizes)
- `reasoning_flag` (10%) — bonus for dedicated thinking specialists (R1, QwQ, Kimi-K2)
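As a sketch, the weighted combination of those four signals might look like the following. The weights come from the list above; the normalization caps (400B params, 1M context, $15/M) are assumptions for illustration, not the actual `tier_classifier.py` logic:

```python
def capability_score(effective_params_b: float, context_window: int,
                     cost_input: float, reasoning_flag: bool) -> float:
    """Combine the four capability signals into a 0-1 score.

    Normalization caps are illustrative assumptions:
    400B+ params, 1M-token context, and $15/M input cost each count as 1.0.
    """
    params = min(effective_params_b / 400.0, 1.0)
    context = min(context_window / 1_000_000, 1.0)
    cost = min(cost_input / 15.0, 1.0)   # weak proxy, per the list above
    reasoning = 1.0 if reasoning_flag else 0.0
    return 0.50 * params + 0.20 * context + 0.20 * cost + 0.10 * reasoning

# A 70B, 128k-context, $0.40/M, non-reasoning model:
score = capability_score(70, 128_000, 0.40, False)
```

The score would then be bucketed into tiers by thresholds defined elsewhere in the classifier.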
---
## Policy Enforcer (NEW in v3.2.0)
`router_policy.py` catches bad model assignments **before they are created**, not after they fail.
### Validate a cron payload before submitting
```bash
python3 skills/intelligent-router/scripts/router_policy.py check \
'{"kind":"agentTurn","model":"ollama-gpu-server/glm-4.7-flash","message":"check server"}'
# Output: VIOLATION: Blocked model 'ollama-gpu-server/glm-4.7-flash'. Recommended: anthropic-proxy-6/glm-4.7
```
### Get enforced model recommendation for a task
```bash
python3 skills/intelligent-router/scripts/router_policy.py recommend "monitor alphastrike service"
# Output: Tier: SIMPLE Model: anthropic-proxy-6/glm-4.7
python3 skills/intelligent-router/scripts/router_policy.py recommend "monitor alphastrike service" --alt
# Output: Tier: SIMPLE Model: anthropic-proxy-4/glm-4.7 ← alternate key for load distribution
```
### Audit all existing cron jobs
```bash
python3 skills/intelligent-router/scripts/router_policy.py audit
# Scans all crons, reports any with blocked or missing models
```
### Show blocklist
```bash
python3 skills/intelligent-router/scripts/router_policy.py blocklist
```
### Policy rules enforced
1. **Model must be set** — no model field = Sonnet default = expensive waste
2. **No blocked models** — `ollama-gpu-server/*` and bare `ollama/*` are rejected for cron use
3. **CRITICAL tasks** — warns if using a non-Opus model for classified-critical work
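The first two rules are simple payload checks. A minimal re-implementation, for illustration only (the real logic lives in `router_policy.py`, and rule 3 additionally requires tier classification, which is omitted here):

```python
import json

# Rule 2 blocklist from the docs: ollama-gpu-server/* and bare ollama/*
BLOCKED_PREFIXES = ("ollama-gpu-server/", "ollama/")

def check_payload(payload_json: str) -> list[str]:
    """Return a list of policy violations for a cron payload (rules 1 and 2)."""
    payload = json.loads(payload_json)
    violations = []
    model = payload.get("model")
    if not model:
        # Rule 1: missing model falls through to the expensive Sonnet default
        violations.append("Model must be set (missing field defaults to Sonnet)")
    elif model.startswith(BLOCKED_PREFIXES):
        # Rule 2: blocked providers are unreachable/unsuitable for cron use
        violations.append(f"Blocked model for cron use: {model}")
    return violations

check_payload('{"kind":"agentTurn","message":"check server"}')
```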
---
## Installation (Core Skill Setup)
Run once to self-integrate into AGENTS.md:
```bash
bash skills/intelligent-router/install.sh
```
This patches AGENTS.md with the mandatory protocol so it's always in context.
---
## CLI Reference
```bash
# ── Policy enforcer (run before creating any cron/spawn) ──
python3 skills/intelligent-router/scripts/router_policy.py check '{"kind":"agentTurn","model":"...","message":"..."}'
python3 skills/intelligent-router/scripts/router_policy.py recommend "task description"
python3 skills/intelligent-router/scripts/router_policy.py recommend "task" --alt # alternate proxy key
python3 skills/intelligent-router/scripts/router_policy.py audit # scan all crons
python3 skills/intelligent-router/scripts/router_policy.py blocklist
# ── Core router ──
# Classify + recommend model
python3 skills/intelligent-router/scripts/router.py classify "task"
# Get model id only (for scripting)
python3 skills/intelligent-router/scripts/spawn_helper.py --model-only "task"
# Show spawn command
python3 skills/intelligent-router/scripts/spawn_helper.py "task"
# Validate cron payload has model set
python3 skills/intelligent-router/scripts/spawn_helper.py --validate '{"kind":"agentTurn","message":"..."}'
# List all models by tier
python3 skills/intelligent-router/scripts/router.py models
# Detailed scoring breakdown
python3 skills/intelligent-router/scripts/router.py score "task"
# Config health check
python3 skills/intelligent-router/scripts/router.py health
# Auto-discover working models (NEW)
python3 skills/intelligent-router/scripts/discover_models.py
# Auto-discover + update config
python3 skills/intelligent-router/scripts/discover_models.py --auto-update
# Test specific tier only
python3 skills/intelligent-router/scripts/discover_models.py --tier COMPLEX
```
---
## Scoring System
15-dimension weighted scoring (not just keywords):
1. **Reasoning markers** (0.18) — prove, theorem, derive
2. **Code presence** (0.15) — code blocks, file extensions
3. **Multi-step patterns** (0.12) — first...then, numbered lists
4. **Agentic task** (0.10) — run, fix, deploy, build
5. **Technical terms** (0.10) — architecture, security, protocol
6. **Token count** (0.08) — complexity from length
7. **Creative markers** (0.05) — story, compose, brainstorm
8. **Question complexity** (0.05) — multiple who/what/how
9. **Constraint count** (0.04) — must, require, exactly
10. **Imperative verbs** (0.03) — analyze, evaluate, audit
11. **Output format** (0.03) — json, table, markdown
12. **Simple indicators** (0.02) — check, get, show (inverted)
13. **Domain specificity** (0.02) — acronyms, dotted notation
14. **Reference complexity** (0.02) — "mentioned above"
15. **Negation complexity** (0.01) — not, never, except
Confidence: `1 / (1 + exp(-8 × (score - 0.5)))`
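That confidence expression is a logistic squash centered at a score of 0.5 with steepness 8:

```python
import math

def confidence(score: float) -> float:
    """Map a 0-1 complexity score to a confidence value via a logistic curve."""
    return 1.0 / (1.0 + math.exp(-8.0 * (score - 0.5)))

confidence(0.5)  # → 0.5 (exactly at the decision boundary)
```

Scores well above 0.5 saturate toward 1 (e.g. 0.9 maps to about 0.96), and scores well below saturate toward 0, so borderline classifications are flagged with mid-range confidence.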
---
## Config
Models defined in `config.json`. Add new models there, router picks them up automatically.
Local Ollama models have zero cost — prefer them for SIMPLE tasks where they are reachable (note that the cron/spawn blocklist above still applies).
---
## Auto-Discovery (Self-Healing)
The intelligent-router can **automatically discover working models** from all configured providers via **real live inference tests** (not config-existence checks).
### How It Works
1. **Provider Scanning:** Reads `~/.openclaw/openclaw.json` → finds all models
2. **Live Inference Test:** Sends `"hi"` to each model, checks it actually responds (catches auth failures, quota exhaustion, 404s, timeouts)
3. **OAuth Bypass:** Providers with `sk-ant-oat01-*` tokens (Anthropic OAuth) are skipped in raw HTTP — OpenClaw refreshes these transparently, so they're always marked available
4. **Thinking Model Support:** Models that return `content=None` + `reasoning_content` (GLM-4.7, Kimi-K2, Qwen3-thinking) are correctly detected as available
5. **Auto-Classification:** Tiers assigned via `tier_classifier.py` using 4 capability signals
6. **Config Update:** Removes unavailable models, rebuilds tier primaries from working set
7. **Cron:** Hourly refresh (cron id: `a8992c1f`) keeps model list current, alerts if availability changes by >2
### Usage
```bash
# One-time discovery
python3 skills/intelligent-router/scripts/discover_models.py
# Auto-update config with working models only
python3 skills/intelligent-router/scripts/discover_models.py --auto-update
# Set up hourly refresh cron
openclaw cron add --job '{
"name": "Model Discovery Refresh",
"schedule": {"kind": "every", "everyMs": 3600000},
"payload": {
"kind": "systemEvent",
"text": "Run: bash skills/intelligent-router/scripts/auto_refresh_models.sh",
"model": "ollama/glm-4.7-flash"
}
}'
```
### Benefits
✅ **Self-healing:** Automatically removes broken models (e.g., expired OAuth)
✅ **Zero maintenance:** No manual model list updates
✅ **New models:** Auto-adds newly released models
✅ **Cost optimization:** Always uses cheapest working model per tier
### Discovery Output
Results saved to `skills/intelligent-router/discovered-models.json`:
```json
{
"scan_timestamp": "2026-02-19T21:00:00",
"total_models": 25,
"available_models": 23,
"unavailable_models": 2,
"providers": {
"anthropic": {
"available": 2,
"unavailable": 0,
"models": [...]
}
}
}
```
### Pinning Models
To preserve a model even if it fails discovery:
```json
{
"id": "special-model",
"tier": "COMPLEX",
"pinned": true // Never remove during auto-update
}
```
## ⚠️ Known Gap — Proactive Health-Based Routing (2026-03-04)
Current router is **reactive** not **proactive**:
- Fallback only fires AFTER a 429 is received
- No awareness of concurrent sessions on same proxy
- No cooldown tracking after rate-limit events
**Needed improvements:**
1. Track last-429 timestamp per provider → skip if within cooldown window
2. Track active concurrent spawns per provider → if >1 active, route to OAuth
3. Before spawning N parallel agents, check if single provider can handle N concurrent
4. Expose `router.get_best_available(n_concurrent=2)` API
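Improvement 1 could be sketched as a small per-provider cooldown tracker. Every name below is hypothetical — none of this exists in the router yet:

```python
import time

COOLDOWN_SECONDS = 60  # assumed cooldown window after a 429

_last_429: dict[str, float] = {}

def record_rate_limit(provider: str) -> None:
    """Call when a provider returns HTTP 429."""
    _last_429[provider] = time.monotonic()

def is_cooling_down(provider: str) -> bool:
    """True if the provider saw a 429 within the cooldown window."""
    ts = _last_429.get(provider)
    return ts is not None and (time.monotonic() - ts) < COOLDOWN_SECONDS

record_rate_limit("anthropic-proxy-6")
```

A proactive router would consult `is_cooling_down()` before assigning a provider, rather than waiting for the fallback chain to fire after the 429 arrives.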
---
## Skill Companion Files
> Additional files collected from the skill directory layout.
### README.md
```markdown
# Intelligent Router
**Version:** 2.0.0
**License:** MIT
An intelligent model routing system for AI agent task delegation. Optimize costs by routing simple tasks to cheaper models while preserving quality for complex work.
## Overview
The Intelligent Router helps AI agents make smart decisions about which LLM model to use for different tasks. By classifying tasks into four tiers (SIMPLE, MEDIUM, COMPLEX, CRITICAL) and routing them to appropriate models, you can reduce costs by 80-95% without sacrificing quality on important work.
**Key benefits:**
- 📉 **Massive cost savings** — Route simple tasks to cheap models
- 🎯 **Quality where it matters** — Use premium models for critical work
- 🚀 **Simple to use** — Clear tier system and CLI tools
- 🔧 **Fully customizable** — Bring your own models and pricing
- 📊 **Cost estimation** — Know before you spend
## Quick Start
### 1. Installation
```bash
# Via ClawHub (recommended)
clawhub install intelligent-router
# Or manually
cd skills/
git clone <this-repo> intelligent-router
```
### 2. Configuration
Edit `config.json` to define your available models:
```json
{
"models": [
{
"id": "openai/gpt-4o-mini",
"alias": "GPT-4o Mini",
"tier": "MEDIUM",
"provider": "openai",
"input_cost_per_m": 0.15,
"output_cost_per_m": 0.60,
"context_window": 128000,
"capabilities": ["text", "code", "vision"],
"notes": "Great balance of cost and capability"
}
]
}
```
**Required fields:**
- `id` — Model identifier (e.g., "provider/model-name")
- `alias` — Human-friendly name
- `tier` — One of: SIMPLE, MEDIUM, COMPLEX, CRITICAL
- `input_cost_per_m` — Cost per million input tokens (USD)
- `output_cost_per_m` — Cost per million output tokens (USD)
**Recommended:** Include at least one model per tier for full coverage.
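A minimal validator for those required fields might look like this (illustrative only; the shipped check is `router.py health`):

```python
REQUIRED_FIELDS = ("id", "alias", "tier", "input_cost_per_m", "output_cost_per_m")
VALID_TIERS = {"SIMPLE", "MEDIUM", "COMPLEX", "CRITICAL"}

def validate_model(model: dict) -> list[str]:
    """Return a list of problems with a single model entry (empty = valid)."""
    errors = [f"missing field: {f}" for f in REQUIRED_FIELDS if f not in model]
    if "tier" in model and model["tier"] not in VALID_TIERS:
        errors.append(f"invalid tier: {model['tier']}")
    return errors

validate_model({"id": "openai/gpt-4o-mini", "alias": "GPT-4o Mini", "tier": "MEDIUM",
                "input_cost_per_m": 0.15, "output_cost_per_m": 0.60})  # → []
```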
### 3. Classify Tasks
Use the CLI to classify tasks and get model recommendations:
```bash
# Classify a task
python scripts/router.py classify "fix authentication bug"
# Output:
# Classification: MEDIUM
# Recommended Model: GPT-4o Mini
# Cost: $0.15/$0.60 per M tokens
# Estimate cost
python scripts/router.py cost-estimate "build payment processing system"
# Output:
# Tier: COMPLEX
# Estimated cost: $0.060 USD
# List your models
python scripts/router.py models
# Output: All configured models grouped by tier
# Check configuration health
python scripts/router.py health
# Output: Validates config.json structure
```
### 4. Use in Your Agent
When spawning sub-agents, reference models from your config:
```python
# Simple task — use cheap model
sessions_spawn(
task="Check server status and report",
model="openai/gpt-4o-mini", # Your SIMPLE tier model
label="health-check"
)
# Complex task — use premium model
sessions_spawn(
task="Build authentication system with JWT",
model="anthropic/claude-sonnet-4", # Your COMPLEX tier model
label="auth-build"
)
```
## The Four-Tier System
| Tier | Use For | Model Characteristics | Example Cost |
|------|---------|----------------------|--------------|
| **🟢 SIMPLE** | Monitoring, checks, summaries | Fast, cheap, reliable | $0.10-$0.50/M |
| **🟡 MEDIUM** | Code fixes, research, analysis | Balanced cost/quality | $0.50-$3.00/M |
| **🟠 COMPLEX** | Multi-file builds, debugging | High-quality reasoning | $3.00-$5.00/M |
| **🔴 CRITICAL** | Security, production, financial | Best available | $5.00+/M |
### Tier Selection Heuristics
**Keywords that trigger each tier:**
- **SIMPLE**: check, monitor, fetch, status, list, summarize
- **MEDIUM**: fix, patch, research, analyze, review, test
- **COMPLEX**: build, create, debug, architect, design, integrate
- **CRITICAL**: security, production, deploy, financial, audit
**Examples:**
```
"Check GitHub notifications" → SIMPLE
"Fix bug in login.py" → MEDIUM
"Build authentication system" → COMPLEX
"Security audit of auth code" → CRITICAL
```
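Those keyword heuristics can be approximated with a first-pass lookup. This is a deliberate simplification of `router.py`'s weighted scoring, and the precedence order (most critical tier wins) is an assumption:

```python
TIER_KEYWORDS = [
    ("CRITICAL", {"security", "production", "deploy", "financial", "audit"}),
    ("COMPLEX",  {"build", "create", "debug", "architect", "design", "integrate"}),
    ("MEDIUM",   {"fix", "patch", "research", "analyze", "review", "test"}),
    ("SIMPLE",   {"check", "monitor", "fetch", "status", "list", "summarize"}),
]

def classify(task: str) -> str:
    """First matching tier wins, checked from most to least critical."""
    words = set(task.lower().split())
    for tier, keywords in TIER_KEYWORDS:
        if words & keywords:
            return tier
    return "MEDIUM"  # assumed default when nothing matches

classify("Security audit of auth code")  # → CRITICAL
```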
## Configuration Guide
### Model Selection Criteria
**SIMPLE Tier:**
- Cost under $0.50/M input tokens
- Good for repetitive, well-defined tasks
- Examples: GPT-4o Mini, Gemini Flash, local Ollama models
**MEDIUM Tier:**
- Cost $0.50-$3.00/M input tokens
- Good at code and general reasoning
- Examples: GPT-4o Mini, Claude Haiku, Llama 3.3 70B
**COMPLEX Tier:**
- Cost $3.00-$5.00/M input tokens
- Excellent code generation and reasoning
- Examples: Claude Sonnet, GPT-4o, Gemini Pro
**CRITICAL Tier:**
- Best available quality
- For high-stakes operations only
- Examples: Claude Opus, GPT-4, Gemini Ultra, o1/o3
### Example Configurations
**Budget-conscious setup:**
```json
{
"models": [
{"id": "local/ollama-qwen", "tier": "SIMPLE", "input_cost_per_m": 0.00, ...},
{"id": "openai/gpt-4o-mini", "tier": "MEDIUM", "input_cost_per_m": 0.15, ...},
{"id": "anthropic/claude-sonnet", "tier": "COMPLEX", "input_cost_per_m": 3.00, ...}
]
}
```
**Performance-focused setup:**
```json
{
"models": [
{"id": "openai/gpt-4o-mini", "tier": "SIMPLE", "input_cost_per_m": 0.15, ...},
{"id": "anthropic/claude-sonnet", "tier": "MEDIUM", "input_cost_per_m": 3.00, ...},
{"id": "anthropic/claude-opus", "tier": "CRITICAL", "input_cost_per_m": 15.00, ...}
]
}
```
## CLI Reference
### `router.py classify <task>`
Classify a task and recommend a model.
```bash
python scripts/router.py classify "debug race condition in worker threads"
```
**Output:**
```
Task: debug race condition in worker threads
Classification: COMPLEX
Reasoning: Multi-file development, debugging, or architectural work
Recommended Model:
ID: anthropic/claude-sonnet-4
Alias: Claude Sonnet
Provider: anthropic
Cost: $3.00/$15.00 per M tokens
Notes: High-quality model for complex multi-file development
```
---
### `router.py models`
List all configured models grouped by tier.
```bash
python scripts/router.py models
```
**Output:**
```
Configured Models by Tier:
SIMPLE:
• GPT-4o Mini (openai/gpt-4o-mini) - $0.15/$0.60/M
MEDIUM:
• Claude Haiku (anthropic/claude-haiku) - $0.80/$4.00/M
COMPLEX:
• Claude Sonnet (anthropic/claude-sonnet-4) - $3.00/$15.00/M
CRITICAL:
• Claude Opus (anthropic/claude-opus-4) - $15.00/$75.00/M
```
---
### `router.py health`
Validate configuration file.
```bash
python scripts/router.py health
```
**Output:**
```
Configuration Health Check
Config: /path/to/config.json
Status: HEALTHY
Models: 4
✅ Configuration is valid
```
---
### `router.py cost-estimate <task>`
Estimate the cost of running a task.
```bash
python scripts/router.py cost-estimate "build payment processing system"
```
**Output:**
```
Task: build payment processing system
Cost Estimate:
Tier: COMPLEX
Model: Claude Sonnet
Estimated Tokens: 5000 in / 3000 out
Input Cost: $0.015000
Output Cost: $0.045000
Total Cost: $0.060000 USD
```
## Usage Patterns
### Pattern 1: Simple Routing
For straightforward tasks, just spawn with the appropriate model:
```python
# Classify task (mentally or via CLI)
# "Check server health" → SIMPLE tier
sessions_spawn(
task="Check server health and report status",
model="openai/gpt-4o-mini", # Your SIMPLE tier model
label="health-check"
)
```
### Pattern 2: Two-Phase Processing
For large tasks, use a cheap model for bulk work, then refine with a better model:
```python
# Phase 1: Extract with SIMPLE model
sessions_spawn(
task="Extract key sections from research paper at /tmp/paper.pdf",
model="{simple_model}",
label="extract"
)
# Phase 2: Analyze with MEDIUM model (after extraction completes)
sessions_spawn(
task="Analyze extracted sections at /tmp/sections.txt",
model="{medium_model}",
label="analyze"
)
```
**Savings:** ~80% cost reduction by processing bulk content with cheap model.
### Pattern 3: Tiered Escalation
Start with MEDIUM tier, escalate to COMPLEX if needed:
```python
# Try MEDIUM first
result = sessions_spawn(
task="Debug authentication issue",
model="{medium_model}",
label="debug-attempt-1"
)
# If unsuccessful, escalate
if not result.successful:
sessions_spawn(
task="Deep debug of authentication (previous attempt incomplete)",
model="{complex_model}",
label="debug-attempt-2"
)
```
### Pattern 4: Batch Processing
Group similar simple tasks together:
```python
checks = ["server1", "server2", "server3", "database", "cache"]
sessions_spawn(
task=f"Health check these services: {', '.join(checks)}. Report any issues.",
model="{simple_model}",
label="batch-checks"
)
```
## Cost Optimization Tips
### 1. Profile Your Workload
Track which tasks are most frequent:
- High-frequency tasks → optimize aggressively (use SIMPLE tier)
- Low-frequency tasks → quality over cost (use COMPLEX/CRITICAL tier)
### 2. Measure Success Rates
If a cheaper model requires frequent retries, it's not actually cheaper:
- Track: `(cost per attempt) / (success rate)` = true cost
- If SIMPLE tier has <80% success rate, use MEDIUM tier instead
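The true-cost arithmetic in practice (illustrative numbers): a cheap model that fails often can cost more per successful run than a pricier one that succeeds on the first try.

```python
def true_cost(cost_per_attempt: float, success_rate: float) -> float:
    """Effective cost per successful completion, counting retries."""
    return cost_per_attempt / success_rate

cheap = true_cost(0.01, 0.60)   # SIMPLE model, 60% success rate
solid = true_cost(0.015, 0.95)  # MEDIUM model, 95% success rate
# Here the "cheaper" model ends up more expensive per success.
```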
### 3. Use Local Models for SIMPLE Tier
If you have GPU access, run local models (Ollama, vLLM) for high-frequency simple tasks:
- Zero API costs
- Unlimited usage
- Privacy benefits
### 4. Enable Thinking Mode Selectively
Extended thinking can 2-5x the cost but dramatically improves quality:
- **Use for:** Architecture decisions, complex debugging, critical analysis
- **Avoid for:** Routine tasks, simple code fixes, monitoring
```python
# Thinking mode for hard problem
sessions_spawn(
task="Design scalable architecture for real-time system",
model="{complex_model}",
thinking="on", # Worth the extra cost
label="architecture"
)
```
### 5. Batch When Possible
Instead of spawning 10 agents for 10 health checks, spawn 1 agent to do all 10.
**Savings example:**
- 10 separate calls: 10× overhead
- 1 batched call: 1× overhead — roughly 40% lower actual cost
## Real-World Savings
**Example daily workload:**
| Task | Frequency | Tier | Cost/day | If All COMPLEX |
|------|-----------|------|----------|----------------|
| Health checks | 48/day | SIMPLE | $0.005 | $2.40 |
| Monitoring | 12/day | SIMPLE | $0.002 | $0.60 |
| Code reviews | 5/day | MEDIUM | $0.01 | $0.25 |
| Bug fixes | 2/day | MEDIUM | $0.01 | $0.10 |
| Features | 1/day | COMPLEX | $0.05 | $0.05 |
| Security | 1/week | CRITICAL | $0.07 | $0.07 |
| **Total** | | | **$0.147** | **$3.47** |
**Monthly:** $4.40 with routing vs $104 without = **96% savings**
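A quick check of the monthly arithmetic above (assuming a 30-day month):

```python
with_routing = 0.147 * 30               # ≈ $4.41 / month
without_routing = 3.47 * 30             # ≈ $104.10 / month
savings = 1 - with_routing / without_routing  # ≈ 0.958, i.e. ~96% savings
```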
## Documentation
- **[SKILL.md](SKILL.md)** — Complete routing guide and usage patterns
- **[references/model-catalog.md](references/model-catalog.md)** — How to evaluate and select models
- **[references/examples.md](references/examples.md)** — Real-world routing examples
- **[config.json](config.json)** — Model configuration template
## Requirements
- **Python:** 3.8 or higher
- **Dependencies:** None (uses only standard library)
- **Platform:** Cross-platform (Linux, macOS, Windows)
## Contributing
Contributions welcome! Areas for improvement:
- Additional classification heuristics
- Support for more cost factors (latency, throughput, etc.)
- Model capability detection
- Provider-specific optimizations
## License
MIT License — see LICENSE file for details.
## Support
- **Issues:** Open a GitHub issue for bugs or questions
- **Documentation:** See SKILL.md for detailed usage guide
- **Examples:** See references/examples.md for real-world patterns
---
**Built for ClawHub** — Part of the OpenClaw skill ecosystem.
```
### _meta.json
```json
{
"owner": "bowen31337",
"slug": "intelligent-router",
"displayName": "Intelligent Model Router",
"latest": {
"version": "3.0.1",
"publishedAt": 1772659114603,
"commit": "https://github.com/openclaw/skills/commit/865d480f14e7b83e2c1485552ceac6683fb6d670"
},
"history": [
{
"version": "3.2.0",
"publishedAt": 1772051840378,
"commit": "https://github.com/openclaw/skills/commit/3af2fc4cd663376436ef2cb9f76ae1b4de3474c7"
},
{
"version": "3.1.0",
"publishedAt": 1771535609130,
"commit": "https://github.com/openclaw/skills/commit/7f55d6bbb6bdafb782f5d6625b4a9b8f064c771d"
},
{
"version": "2.2.0",
"publishedAt": 1771024183865,
"commit": "https://github.com/openclaw/skills/commit/fd06167f10c52d6ceb89693dcd250fffe508186d"
},
{
"version": "2.0.0",
"publishedAt": 1770722496977,
"commit": "https://github.com/openclaw/skills/commit/018ced52f4a241294204ed3b6aeb403390cae0d9"
},
{
"version": "1.0.0",
"publishedAt": 1770530882971,
"commit": "https://github.com/openclaw/skills/commit/1392e54bc70a36e5c2a8fefe04595f07dbeeff6b"
}
]
}
```
### scripts/auto_refresh_models.sh
```bash
#!/bin/bash
# Intelligent Router - Auto-Refresh Cron Wrapper
# Usage: Add to cron for hourly model discovery
set -e
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
cd "$SCRIPT_DIR/../.."
echo "[$(date -Iseconds)] Starting model discovery refresh..."
# Run discovery with auto-update
python3 skills/intelligent-router/scripts/discover_models.py --auto-update
# Check if any models went down
DISCOVERY_FILE="skills/intelligent-router/discovered-models.json"
if [ -f "$DISCOVERY_FILE" ]; then
UNAVAILABLE=$(jq '.unavailable_models' "$DISCOVERY_FILE")
if [ "$UNAVAILABLE" -gt 0 ]; then
echo "⚠️ Warning: $UNAVAILABLE model(s) unavailable"
# Send alert to main session
if command -v openclaw &> /dev/null; then
echo "Model discovery alert: $UNAVAILABLE model(s) failed health check" | \
openclaw sessions send --label main --message "$(cat)"
fi
fi
fi
echo "[$(date -Iseconds)] Model discovery refresh complete"
```
### scripts/discover_models.py
```python
#!/usr/bin/env python3
"""
Intelligent Router - Model Auto-Discovery
Auto-discovers working models from all configured providers.
Tests each model with a minimal inference call to verify:
- Model is accessible
- Auth is working
- Returns valid responses
Usage:
python3 discover_models.py # Scan and display
python3 discover_models.py --auto-update # Scan and update config.json
python3 discover_models.py --tier COMPLEX # Test specific tier models
"""
import json
import sys
import time
import asyncio
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Any, Optional
import subprocess
# Capability-based tier classifier (no hard-coded name patterns)
sys.path.insert(0, str(Path(__file__).parent))
try:
    from tier_classifier import classify_from_openclaw_config, build_tier_config
    _CLASSIFIER_AVAILABLE = True
except ImportError:
    _CLASSIFIER_AVAILABLE = False

# ANSI colors
GREEN = "\033[92m"
RED = "\033[91m"
YELLOW = "\033[93m"
BLUE = "\033[94m"
RESET = "\033[0m"

CONFIG_PATH = Path.home() / ".openclaw" / "openclaw.json"
ROUTER_CONFIG = Path(__file__).parent.parent / "config.json"
DISCOVERY_OUTPUT = Path(__file__).parent.parent / "discovered-models.json"


def load_openclaw_config() -> Dict[str, Any]:
    """Load main OpenClaw config."""
    if not CONFIG_PATH.exists():
        print(f"{RED}Error: Config not found at {CONFIG_PATH}{RESET}")
        sys.exit(1)
    with open(CONFIG_PATH) as f:
        return json.load(f)


def load_router_config() -> Dict[str, Any]:
    """Load intelligent-router config."""
    if not ROUTER_CONFIG.exists():
        return {"models": [], "routing_rules": {}}
    with open(ROUTER_CONFIG) as f:
        return json.load(f)


def test_model_live(provider_cfg: dict, provider_name: str, model_id: str, timeout: int = 30) -> Dict[str, Any]:
    """
    Real inference test: send "hi" to the model and verify it responds.
    Supports both OpenAI-compatible and Anthropic-messages APIs.
    Returns: {available: bool, latency: float, error: str | None, response_preview: str | None}
    """
    import urllib.request
    import urllib.error

    start = time.time()
    base_url = provider_cfg.get("baseUrl", "").rstrip("/")
    api_key = provider_cfg.get("apiKey", "")
    api_type = provider_cfg.get("api", "openai-completions")

    try:
        if api_type == "openai-completions":
            # Standard OpenAI /v1/chat/completions
            url = f"{base_url}/chat/completions"
            payload = {
                "model": model_id,
                "messages": [{"role": "user", "content": "hi"}],
                "max_tokens": 5,
                "stream": False,
            }
            headers = {
                "Content-Type": "application/json",
                "Authorization": f"Bearer {api_key}",
            }
        elif api_type == "anthropic-messages":
            # Anthropic /v1/messages — URL suffix varies by proxy
            if base_url.endswith("/messages"):
                url = base_url
            elif "/v1" in base_url:
                url = f"{base_url}/messages"
            else:
                url = f"{base_url}/v1/messages"
            payload = {
                "model": model_id,
                "messages": [{"role": "user", "content": "hi"}],
                "max_tokens": 5,
            }
            headers = {
                "Content-Type": "application/json",
                "x-api-key": api_key,
                "anthropic-version": "2023-06-01",
            }
        else:
            return {
                "available": False,
                "latency": 0.0,
                "error": f"Unknown API type: {api_type}",
                "response_preview": None,
                "timestamp": datetime.now().isoformat(),
            }

        body = json.dumps(payload).encode()
        req = urllib.request.Request(url, data=body, headers=headers, method="POST")
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            latency = round(time.time() - start, 3)
            raw = resp.read().decode()
        data = json.loads(raw)

        # Extract response text — handle thinking models:
        #   - GLM-5 style: content=None, reasoning_content="..."
        #   - MiniMax/QwQ style: content="<think>...</think>actual answer"
        preview = None
        if api_type == "openai-completions":
            msg = data.get("choices", [{}])[0].get("message", {})
            text = msg.get("content") or msg.get("reasoning_content") or ""
            # Strip <think>...</think> blocks (MiniMax, QwQ, DeepSeek style).
            # Also strip partial <think> (when max_tokens cuts off inside thinking block).
            import re as _re
            text = _re.sub(r"<think>.*?(?:</think>|$)", "", str(text), flags=_re.DOTALL).strip()
            # For thinking models: even if all output was <think>, the model IS responding
            if not text:
                finish = data.get("choices", [{}])[0].get("finish_reason", "")
                text = "(thinking model)" if finish in ("length", "stop") else ""
            preview = text[:40] if text else None
        elif api_type == "anthropic-messages":
            content = data.get("content", [{}])
            if content:
                preview = (content[0].get("text") or "")[:40] or None

        return {
            "available": True,
            "latency": latency,
            "error": None,
            "response_preview": preview,
            "timestamp": datetime.now().isoformat(),
        }
    except urllib.error.HTTPError as e:
        body_text = ""
        try:
            body_text = e.read().decode()[:120]
        except Exception:
            pass
        return {
            "available": False,
            "latency": round(time.time() - start, 3),
            "error": f"HTTP {e.code}: {body_text}",
            "response_preview": None,
            "timestamp": datetime.now().isoformat(),
        }
    except Exception as e:
        return {
            "available": False,
            "latency": round(time.time() - start, 3),
            "error": str(e)[:200],
            "response_preview": None,
            "timestamp": datetime.now().isoformat(),
        }


def test_model_via_openclaw(provider: str, model: str, provider_cfg: dict = None, live: bool = True) -> Dict[str, Any]:
    """
    Test a model — live inference by default, config-only check if live=False.

    Live mode: sends "hi" and checks the model actually responds (catches auth failures,
    unavailable models, quota exhaustion, etc.)
    Config-only mode: just verifies the model entry exists (zero cost, but misses real errors).
    """
    start = time.time()

    # Config existence check (fast, always done first)
    config = load_openclaw_config()
    providers = config.get("models", {}).get("providers", {})
    if provider not in providers:
        return {
            "available": False,
            "latency": round(time.time() - start, 3),
            "error": f"Provider not found: {provider}",
            "timestamp": datetime.now().isoformat()
        }

    p_cfg = provider_cfg or providers[provider]
    models_list = p_cfg.get("models", [])
    model_entry = next((m for m in models_list if m.get("id") == model), None)
    if not model_entry:
        return {
            "available": False,
            "latency": round(time.time() - start, 3),
            "error": f"Model not found in config: {model}",
            "timestamp": datetime.now().isoformat()
        }

    if not live:
        return {
            "available": True,
            "latency": round(time.time() - start, 3),
            "error": None,
            "timestamp": datetime.now().isoformat()
        }

    # OAuth token providers (sk-ant-oat01-*): OpenClaw refreshes these transparently.
    # Raw HTTP tests use stale cached tokens → always 401 false negatives. Skip live test.
    api_key = p_cfg.get("apiKey", "")
    if api_key.startswith("sk-ant-oat01-"):
        return {
            "available": True,
            "latency": 0.0,
            "error": None,
            "response_preview": "(OAuth — tested via OpenClaw, not raw HTTP)",
            "timestamp": datetime.now().isoformat()
        }

    # Live inference test
    return test_model_live(p_cfg, provider, model)


def discover_models(config: Dict[str, Any], tier_filter: Optional[str] = None, live: bool = True) -> Dict[str, Any]:
    """
    Discover all models from OpenClaw providers and test availability.
    """
    providers = config.get("models", {}).get("providers", {})
    router_cfg = load_router_config()
    discovered = {
        "scan_timestamp": datetime.now().isoformat(),
        "total_models": 0,
        "available_models": 0,
        "unavailable_models": 0,
        "providers": {}
    }

    # Get existing router config for tier info
    existing_models = {m["id"]: m for m in router_cfg.get("models", [])}

    # Build tier lookup from real OpenClaw config metadata (no hard-coded name patterns)
    _tier_lookup = {}
    if _CLASSIFIER_AVAILABLE:
        try:
            classified = classify_from_openclaw_config()
            for m in classified:
                _tier_lookup[(m["provider"], m["id"])] = m["tier"]
        except Exception as e:
            print(f"{YELLOW}Warning: tier classifier failed ({e}), falling back to existing tiers{RESET}")

    for provider_name, provider_config in providers.items():
        print(f"\n{BLUE}Scanning provider: {provider_name}{RESET}")
        print("-" * 60)
        models = provider_config.get("models", [])
        provider_result = {
            "name": provider_name,
            "models": [],
            "available": 0,
            "unavailable": 0
        }
        for model in models:
            model_id = model.get("id")
            model_name = model.get("name", model_id)

            # Skip if tier filter is set and model doesn't match
            if tier_filter:
                model_tier = existing_models.get(model_id, {}).get("tier")
                if model_tier != tier_filter:
                    continue

            discovered["total_models"] += 1
            print(f"  Testing: {model_name}... ", end="", flush=True)
            result = test_model_via_openclaw(provider_name, model_id, provider_cfg=provider_config, live=live)

            # Tier from capability classifier (uses real metadata, no name heuristics)
            classified_tier = _tier_lookup.get(
                (provider_name, model_id),
                existing_models.get(model_id, {}).get("tier", "MEDIUM")
            )
            model_result = {
                "id": model_id,
                "name": model_name,
                "provider": provider_name,
                "tier": classified_tier,
                "capabilities": model.get("capabilities", []),
                "cost": model.get("cost", {}),
                "context_window": model.get("contextWindow", 0),
                "agentic": model.get("agentic", False),
                **result
            }
            provider_result["models"].append(model_result)
            if result["available"]:
                preview = result.get("response_preview", "")
preview_str = f' → "{preview}"' if preview else ""
print(f"{GREEN}✓{RESET} ({result['latency']}s{preview_str})")
provider_result["available"] += 1
discovered["available_models"] += 1
else:
print(f"{RED}✗{RESET} ({result.get('error', 'unknown')[:60]})")
provider_result["unavailable"] += 1
discovered["unavailable_models"] += 1
discovered["providers"][provider_name] = provider_result
return discovered
def print_summary(discovered: Dict[str, Any]):
"""Print discovery summary."""
print("\n" + "=" * 60)
print(f"{BLUE}DISCOVERY SUMMARY{RESET}")
print("=" * 60)
print(f"Total models scanned: {discovered['total_models']}")
print(f"{GREEN}Available: {discovered['available_models']}{RESET}")
print(f"{RED}Unavailable: {discovered['unavailable_models']}{RESET}")
print(f"Scan time: {discovered['scan_timestamp']}")
print("\n" + "=" * 60)
print(f"{BLUE}UNAVAILABLE MODELS{RESET}")
print("=" * 60)
unavailable = []
for provider, data in discovered["providers"].items():
for model in data["models"]:
if not model["available"]:
unavailable.append(f" - {model['name']} ({provider}): {model['error']}")
if unavailable:
for item in unavailable:
print(f"{RED}{item}{RESET}")
else:
print(f"{GREEN}All models available!{RESET}")
def update_router_config(discovered: Dict[str, Any]):
"""
Update router config.json with discovered models.
Preserves tier rules, removes unavailable models.
"""
router_cfg = load_router_config()
# Build new models list (only available ones)
new_models = []
for provider, data in discovered["providers"].items():
for model in data["models"]:
if model["available"]:
# Convert discovery format back to router config format
router_model = {
"id": model["id"],
"alias": model["name"].replace(" ", "-"),
"tier": model["tier"],
"provider": provider,
"input_cost_per_m": model["cost"].get("input", 0),
"output_cost_per_m": model["cost"].get("output", 0),
"context_window": model["context_window"],
"capabilities": model["capabilities"],
"agentic": model["agentic"],
"notes": f"Auto-discovered {model['timestamp']}"
}
new_models.append(router_model)
# Preserve pinned models (manual overrides)
existing_models = router_cfg.get("models", [])
for existing in existing_models:
if existing.get("pinned"):
# Keep pinned models even if unavailable
new_models.append(existing)
# Rebuild tiers and routing_rules from capability classifier (no hard-coded names)
if _CLASSIFIER_AVAILABLE:
try:
classified = classify_from_openclaw_config()
# Filter to only available models
available_ids = {(m["provider"], m["id"]) for m in new_models}
available_classified = [
m for m in classified
if (m["provider"], m["id"]) in available_ids
]
tier_cfg = build_tier_config(available_classified)
router_cfg["tiers"] = tier_cfg
# Sync routing_rules from tiers
use_for = {
"SIMPLE": ["monitoring", "status checks", "summarization", "simple API calls",
"memory consolidation", "tweet monitoring", "price alerts", "heartbeat"],
"MEDIUM": ["code fixes", "research", "data analysis", "API integration",
"documentation", "general QA", "moderate complexity"],
"COMPLEX": ["feature development", "architecture", "debugging", "code review",
"multi-step reasoning", "trading strategy"],
"REASONING": ["formal logic", "mathematical proofs", "deep analysis",
"long-horizon planning", "algorithmic design"],
"CRITICAL": ["security review", "production decisions", "financial operations",
"high-stakes analysis"],
}
            router_cfg.setdefault("routing_rules", {})
            for tier_name, cfg in tier_cfg.items():
                router_cfg["routing_rules"][tier_name] = {
"primary": cfg["primary"],
"fallback_chain": cfg["fallbacks"][:5],
"use_for": use_for.get(tier_name, []),
}
# Apply manual tier_overrides (survive auto-updates)
# Set tier_overrides in config.json to lock a primary regardless of scorer.
overrides = router_cfg.get("tier_overrides", {})
for tier_name, override in overrides.items():
forced = override.get("forced_primary")
if not forced or tier_name not in router_cfg["routing_rules"]:
continue
rule = router_cfg["routing_rules"][tier_name]
if rule["primary"] != forced:
old_primary = rule["primary"]
new_fallback = [old_primary] + [
m for m in rule["fallback_chain"] if m != forced and m != old_primary
]
rule["primary"] = forced
rule["fallback_chain"] = new_fallback[:5]
print(f" Override: {tier_name} primary locked to {forced} (was {old_primary})")
print(f"{GREEN}✓ Tiers and routing_rules rebuilt from capability metadata{RESET}")
for tier, cfg in tier_cfg.items():
print(f" {tier}: {cfg['primary']}")
except Exception as e:
print(f"{YELLOW}Warning: tier rebuild failed ({e}){RESET}")
# Update config
router_cfg["models"] = new_models
router_cfg["last_discovery"] = discovered["scan_timestamp"]
# Write updated config
with open(ROUTER_CONFIG, "w") as f:
json.dump(router_cfg, f, indent=2)
print(f"\n{GREEN}✓ Updated {ROUTER_CONFIG}{RESET}")
print(f" Models: {len(new_models)} (available)")
print(f" Last discovery: {discovered['scan_timestamp']}")
def main():
import argparse
parser = argparse.ArgumentParser(description="Auto-discover working models")
parser.add_argument("--auto-update", action="store_true", help="Update config.json with discovered models")
parser.add_argument("--tier", help="Only test models from specific tier (SIMPLE/MEDIUM/COMPLEX/REASONING/CRITICAL)")
parser.add_argument("--output", help="Output JSON file path", default=str(DISCOVERY_OUTPUT))
parser.add_argument("--no-live", action="store_true", help="Skip live inference tests (config-only check, free)")
args = parser.parse_args()
print(f"{BLUE}Intelligent Router - Model Auto-Discovery{RESET}")
print(f"Scan time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
# Load configs
config = load_openclaw_config()
# Discover models
live = not args.no_live
if not live:
print(f"{YELLOW}⚡ Config-only mode (--no-live): skipping inference tests{RESET}")
else:
print(f"{BLUE}🔍 Live inference mode: sending 'hi' to each model to verify availability{RESET}")
discovered = discover_models(config, tier_filter=args.tier, live=live)
# Print summary
print_summary(discovered)
# Save discovery results
with open(args.output, "w") as f:
json.dump(discovered, f, indent=2)
print(f"\n{GREEN}✓ Saved discovery results to {args.output}{RESET}")
# Auto-update if requested
if args.auto_update:
update_router_config(discovered)
# Suggest next steps
print(f"\n{YELLOW}Next steps:{RESET}")
print(f" 1. Review updated config: cat {ROUTER_CONFIG}")
print(f" 2. Test router: python3 skills/intelligent-router/scripts/router.py health")
print(f" 3. Commit changes: git add {ROUTER_CONFIG} && git commit -m 'Auto-update model list'")
if __name__ == "__main__":
main()
```
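The discovery JSON written above has a stable shape (a `scan_timestamp`, then per-provider model lists carrying `available` flags and tiers), so downstream tooling can consume it directly. A minimal sketch, using hypothetical model ids, of grouping the available models by tier:

```python
# Hypothetical discovery output matching the shape discover_models() emits.
discovered = {
    "scan_timestamp": "2025-01-01T00:00:00",
    "total_models": 3,
    "available_models": 2,
    "unavailable_models": 1,
    "providers": {
        "anthropic": {
            "name": "anthropic",
            "models": [
                {"id": "claude-sonnet-4-6", "tier": "COMPLEX", "available": True},
                {"id": "claude-opus-4-6", "tier": "CRITICAL", "available": True},
            ],
        },
        "ollama": {
            "name": "ollama",
            "models": [
                {"id": "llama3.2:3b", "tier": "SIMPLE", "available": False},
            ],
        },
    },
}

def available_by_tier(discovered: dict) -> dict:
    """Group available model ids by tier, as a downstream consumer might."""
    out: dict = {}
    for pdata in discovered["providers"].values():
        for m in pdata["models"]:
            if m["available"]:
                out.setdefault(m["tier"], []).append(m["id"])
    return out

print(available_by_tier(discovered))
# {'COMPLEX': ['claude-sonnet-4-6'], 'CRITICAL': ['claude-opus-4-6']}
```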
### scripts/fix_tiers.py
```python
#!/usr/bin/env python3
"""
Fix intelligent-router config: assign tiers to all models and rebuild fallback chains.
Run: uv run python skills/intelligent-router/scripts/fix_tiers.py
"""
import json
from pathlib import Path
CONFIG = Path(__file__).parent.parent / "config.json"
# ── Tier definitions ───────────────────────────────────────────────────────────
# SIMPLE: free/cheap/fast — monitoring, summaries, simple tasks
# MEDIUM: mid-range — code fixes, research, analysis
# COMPLEX: high quality — features, architecture, debugging
# REASONING: thinking models — proofs, complex logic
# CRITICAL: best available — security, production decisions
def assign_tier(model: dict) -> str:
provider = model.get("provider", "")
mid = model.get("id", "").lower()
alias = model.get("alias", "").lower()
# CRITICAL — Opus models
if "opus" in mid or "opus" in alias:
return "CRITICAL"
if "nemotron-ultra-253b" in mid or "253b" in mid:
return "CRITICAL"
# REASONING — thinking/reasoning models
if any(x in mid for x in ["r1", "qwq", "thinking", "reasoning", "kimi-k2-thinking", "phi-4-mini-flash-reasoning"]):
return "REASONING"
if "kimi-k2" in mid and "thinking" in mid:
return "REASONING"
# COMPLEX — Sonnet, large capable models
if "sonnet" in mid:
return "COMPLEX"
if "glm-4.7" in mid and provider in ("anthropic-proxy-4", "anthropic-proxy-6", "anthropic-proxy-2", "anthropic-proxy-1"):
return "COMPLEX"
if any(x in mid for x in ["llama-4-maverick", "nemotron-super-49b", "nemotron-51b", "mistral-large"]):
return "COMPLEX"
if "kimi-k2" in mid:
return "COMPLEX"
# SIMPLE — GLM-4.7 via cheap proxies (proxy-4/6 are z.ai, very cheap)
if "glm-4.7" in mid and provider in ("anthropic-proxy-4", "anthropic-proxy-6",
"nvidia-nim", "ollama-gpu-server"):
return "SIMPLE"
# MEDIUM — 70B class, DeepSeek V3, capable mid-range
if any(x in mid for x in ["deepseek-v3", "deepseek-chat", "llama-3.3-70b", "llama-3.1-70b",
"llama-4-scout", "qwen2.5:32b", "llama3.3", "llama-3.3"]):
return "MEDIUM"
if "glm-4.7" in mid and provider in ("anthropic-proxy-5",):
return "MEDIUM"
# SIMPLE — small/free/fast models
if any(x in mid for x in ["glm-4.7-flash", "llama3.2:3b", "qwen2.5:1.5b",
"llama3.2:3b", "llama-3.2-3b", "phi-3.5"]):
return "SIMPLE"
if provider == "ollama" and any(x in mid for x in ["3b", "7b", "1.5b"]):
return "SIMPLE"
# MEDIUM fallback for larger ollama models
if provider == "ollama":
return "MEDIUM"
if provider == "nvidia-nim":
return "MEDIUM"
return "SIMPLE"
def full_id(model: dict) -> str:
"""Return provider/id format for use in tier fallback chains."""
p = model.get("provider", "")
m = model.get("id", "")
if p:
return f"{p}/{m}"
return m
def build_tiers(models: list) -> dict:
"""Build tier configs with primary + fallback chains."""
by_tier: dict[str, list] = {t: [] for t in ["SIMPLE", "MEDIUM", "COMPLEX", "REASONING", "CRITICAL"]}
for m in models:
tier = m["tier"]
if tier in by_tier:
by_tier[tier].append(m)
def tier_cfg(primary_ids: list, fallback_ids: list, description: str) -> dict:
return {
"description": description,
"primary": primary_ids[0] if primary_ids else "",
"fallbacks": primary_ids[1:] + fallback_ids,
}
# Build ordered lists per tier
def ids(tier):
return [full_id(m) for m in by_tier[tier]]
# SIMPLE tier: GPU-server GLM flash first (free local), then proxy-4/6, then others
def simple_sort(m):
pid = full_id(m)
if pid == "ollama-gpu-server/glm-4.7-flash": return 0 # free local
if pid == "anthropic-proxy-4/glm-4.7": return 1 # z.ai key 2
if pid == "anthropic-proxy-6/glm-4.7": return 2 # z.ai key 1
if "glm" in m["id"].lower(): return 3
if "flash" in m["id"].lower(): return 4
return 5
simple_order = sorted(by_tier["SIMPLE"], key=simple_sort)
# MEDIUM tier: DeepSeek V3 first, then 70B models
medium_priority = ["deepseek-v3", "deepseek-chat", "llama-3.3-70b", "llama3.3", "qwen2.5:32b"]
def medium_sort(m):
for i, pat in enumerate(medium_priority):
if pat in m["id"].lower():
return i
return 99
medium_order = sorted(by_tier["MEDIUM"], key=medium_sort)
# COMPLEX tier: OAuth Sonnet first, then other Sonnet, then others
def complex_sort(m):
pid = full_id(m)
if pid == "anthropic/claude-sonnet-4-6": return 0 # OAuth primary
if pid == "anthropic/claude-sonnet-4-5": return 1
if "sonnet-4-6" in m["id"].lower(): return 2
if "sonnet-4-5" in m["id"].lower(): return 3
if "glm-4.7" in m["id"].lower(): return 4
return 5
complex_order = sorted(by_tier["COMPLEX"], key=complex_sort)
# REASONING tier: QwQ/R1-32B first
def reasoning_sort(m):
if "qwq" in m["id"].lower():
return 0
if "r1-distill-qwen-32b" in m["id"].lower():
return 1
if "r1-distill-qwen-14b" in m["id"].lower():
return 2
if "kimi" in m["id"].lower():
return 3
return 4
reasoning_order = sorted(by_tier["REASONING"], key=reasoning_sort)
# CRITICAL tier: Opus 4.6 first
def critical_sort(m):
pid = full_id(m)
if pid == "anthropic/claude-opus-4-6": return 0 # OAuth primary
if pid == "anthropic/claude-opus-4-5": return 1
if "opus-4-6" in m["id"].lower(): return 2
if "opus-4-5" in m["id"].lower(): return 3
if "253b" in m["id"].lower(): return 4
return 5
critical_order = sorted(by_tier["CRITICAL"], key=critical_sort)
return {
"SIMPLE": {
"description": "Monitoring, summaries, checks — free/cheap/fast models",
"primary": full_id(simple_order[0]) if simple_order else "",
"fallbacks": [full_id(m) for m in simple_order[1:]] + [full_id(m) for m in medium_order[:2]],
},
"MEDIUM": {
"description": "Code fixes, research, analysis — mid-range models",
"primary": full_id(medium_order[0]) if medium_order else "",
"fallbacks": [full_id(m) for m in medium_order[1:]] + [full_id(m) for m in complex_order[:1]],
},
"COMPLEX": {
"description": "Features, architecture, debugging — high quality models",
"primary": full_id(complex_order[0]) if complex_order else "",
"fallbacks": [full_id(m) for m in complex_order[1:]] + [full_id(m) for m in critical_order[:1]],
},
"REASONING": {
"description": "Proofs, formal logic, deep analysis — thinking models",
"primary": full_id(reasoning_order[0]) if reasoning_order else "",
"fallbacks": [full_id(m) for m in reasoning_order[1:]] + [full_id(m) for m in complex_order[:1]],
},
"CRITICAL": {
"description": "Security, production, high-stakes — best available models",
"primary": full_id(critical_order[0]) if critical_order else "",
"fallbacks": [full_id(m) for m in critical_order[1:]] + [full_id(m) for m in complex_order[:1]],
},
}
def main():
with open(CONFIG) as f:
config = json.load(f)
models = config.get("models", [])
print(f"Fixing tiers for {len(models)} models...\n")
# Assign tiers
tier_counts = {}
for m in models:
m["tier"] = assign_tier(m)
tier_counts[m["tier"]] = tier_counts.get(m["tier"], 0) + 1
# Print assignments
for tier in ["SIMPLE", "MEDIUM", "COMPLEX", "REASONING", "CRITICAL"]:
print(f" {tier}: {tier_counts.get(tier, 0)} models")
for m in models:
if m["tier"] == tier:
print(f" - {full_id(m)}")
# Build tier configs
config["tiers"] = build_tiers(models)
config["models"] = models
print("\nTier primary models:")
for tier, cfg in config["tiers"].items():
print(f" {tier}: {cfg['primary']}")
if cfg["fallbacks"]:
print(f" fallbacks: {cfg['fallbacks'][:3]}")
with open(CONFIG, "w") as f:
json.dump(config, f, indent=2)
print(f"\n✅ Saved to {CONFIG}")
if __name__ == "__main__":
main()
```
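The tier configs this script writes are just a primary plus an ordered fallbacks list, so a consumer can walk them until a healthy model is found. A minimal sketch, assuming the config shape `build_tiers()` emits and a stand-in `is_up` health predicate (the real router consults `provider_health.py`); the model ids are hypothetical:

```python
# Walk a tier's primary + fallback chain, returning the first healthy model.
def resolve(tier_cfg: dict, is_up):
    for model_id in [tier_cfg["primary"], *tier_cfg["fallbacks"]]:
        if model_id and is_up(model_id):
            return model_id
    return None

medium = {
    "primary": "nvidia-nim/deepseek-v3",
    "fallbacks": ["ollama/llama3.3", "anthropic/claude-sonnet-4-6"],
}
down = {"nvidia-nim/deepseek-v3"}  # pretend the primary is rate-limited
print(resolve(medium, lambda m: m not in down))
# ollama/llama3.3
```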
### scripts/provider_health.py
```python
#!/usr/bin/env python3
"""
Provider Health Registry — Proactive Health-Based Routing
Tracks per-provider state so the router can make intelligent decisions
BEFORE sending a request, not after it blows up.
State tracked per provider:
- last_429_at: timestamp of most recent rate-limit error
- consecutive_429s: how many in a row
- active_sessions: count of currently running spawns/crons
- cooldown_until: epoch timestamp, skip provider until this time
- total_failures: lifetime failure counter
Used by spawn_helper.py to skip degraded providers proactively.
Storage: ~/.openclaw/workspace/memory/provider-health.json
"""
from __future__ import annotations
import json
import os
import time
from pathlib import Path
from typing import Any
HEALTH_FILE = Path(__file__).parent.parent.parent.parent / ".openclaw" / "workspace" / "memory" / "provider-health.json"
# Fallback to relative path
if not HEALTH_FILE.parent.exists():
HEALTH_FILE = Path(__file__).parent.parent / "provider-health.json"
# How long to cool down after N consecutive 429s (seconds)
COOLDOWN_SCHEDULE = {
1: 60, # 1st 429 → 1 min cooldown
2: 300, # 2nd → 5 min
3: 900, # 3rd → 15 min
4: 3600, # 4th → 1 hour
}
MAX_COOLDOWN = 3600 * 4 # 4 hours max
# Max concurrent sessions per provider before routing elsewhere
MAX_CONCURRENT_PER_PROVIDER = 1
def _load() -> dict[str, Any]:
if not HEALTH_FILE.exists():
return {}
try:
with open(HEALTH_FILE) as f:
return json.load(f)
except (json.JSONDecodeError, OSError):
return {}
def _save(data: dict[str, Any]) -> None:
HEALTH_FILE.parent.mkdir(parents=True, exist_ok=True)
tmp = HEALTH_FILE.with_suffix(".tmp")
with open(tmp, "w") as f:
json.dump(data, f, indent=2)
tmp.replace(HEALTH_FILE)
def _provider_key(model_id: str) -> str:
"""Extract provider prefix from model_id like 'anthropic-proxy-1/claude-sonnet-4-6'."""
return model_id.split("/")[0] if "/" in model_id else model_id
def record_429(model_id: str) -> None:
"""Call this when a provider returns 429. Updates cooldown."""
data = _load()
key = _provider_key(model_id)
now = time.time()
entry = data.get(key, {})
consecutive = entry.get("consecutive_429s", 0) + 1
cooldown_secs = COOLDOWN_SCHEDULE.get(consecutive, MAX_COOLDOWN)
cooldown_until = now + cooldown_secs
entry.update({
"last_429_at": now,
"consecutive_429s": consecutive,
"cooldown_until": cooldown_until,
"total_failures": entry.get("total_failures", 0) + 1,
"last_updated": now,
})
data[key] = entry
_save(data)
def record_success(model_id: str) -> None:
"""Call this on successful response. Resets consecutive 429 counter."""
data = _load()
key = _provider_key(model_id)
now = time.time()
entry = data.get(key, {})
entry.update({
"consecutive_429s": 0,
"cooldown_until": 0,
"last_success_at": now,
"last_updated": now,
})
data[key] = entry
_save(data)
def session_start(model_id: str) -> None:
"""Call when a spawn/session begins on this provider."""
data = _load()
key = _provider_key(model_id)
now = time.time()
entry = data.get(key, {})
active = entry.get("active_sessions", 0) + 1
entry.update({
"active_sessions": active,
"last_updated": now,
})
data[key] = entry
_save(data)
def session_end(model_id: str) -> None:
"""Call when a spawn/session finishes on this provider."""
data = _load()
key = _provider_key(model_id)
now = time.time()
entry = data.get(key, {})
active = max(0, entry.get("active_sessions", 1) - 1)
entry.update({
"active_sessions": active,
"last_updated": now,
})
data[key] = entry
_save(data)
def is_healthy(model_id: str, check_concurrency: bool = True) -> tuple[bool, str]:
"""
Returns (is_healthy, reason).
Use before routing to proactively skip degraded providers.
"""
data = _load()
key = _provider_key(model_id)
entry = data.get(key, {})
now = time.time()
# Check cooldown
cooldown_until = entry.get("cooldown_until", 0)
if cooldown_until > now:
remaining = int(cooldown_until - now)
return False, f"rate-limited, cooldown {remaining}s remaining"
# Check concurrency
if check_concurrency:
active = entry.get("active_sessions", 0)
if active >= MAX_CONCURRENT_PER_PROVIDER:
return False, f"{active} active session(s) on this provider — concurrent limit reached"
return True, "ok"
def get_status(provider_key: str | None = None) -> dict[str, Any]:
"""Get health status for one or all providers."""
data = _load()
now = time.time()
def _enrich(key: str, entry: dict) -> dict:
cooldown_until = entry.get("cooldown_until", 0)
in_cooldown = cooldown_until > now
return {
**entry,
"provider": key,
"in_cooldown": in_cooldown,
"cooldown_remaining_s": max(0, int(cooldown_until - now)) if in_cooldown else 0,
"healthy": not in_cooldown and entry.get("active_sessions", 0) < MAX_CONCURRENT_PER_PROVIDER,
}
if provider_key:
entry = data.get(provider_key, {})
return _enrich(provider_key, entry)
return {k: _enrich(k, v) for k, v in data.items()}
def pick_healthy(candidates: list[str], check_concurrency: bool = True) -> str | None:
"""
Given an ordered list of model_ids, return the first one that is healthy.
Returns None if all are degraded.
"""
for model_id in candidates:
healthy, _ = is_healthy(model_id, check_concurrency=check_concurrency)
if healthy:
return model_id
return None
if __name__ == "__main__":
import sys
cmd = sys.argv[1] if len(sys.argv) > 1 else "status"
if cmd == "status":
status = get_status()
if not status:
print("No provider health data yet.")
else:
for key, info in status.items():
icon = "🟢" if info["healthy"] else "🔴"
cooldown = f" (cooldown {info['cooldown_remaining_s']}s)" if info["in_cooldown"] else ""
active = info.get("active_sessions", 0)
fails = info.get("total_failures", 0)
print(f"{icon} {key}: active={active} failures={fails}{cooldown}")
elif cmd == "record-429" and len(sys.argv) > 2:
record_429(sys.argv[2])
print(f"Recorded 429 for {sys.argv[2]}")
elif cmd == "record-success" and len(sys.argv) > 2:
record_success(sys.argv[2])
print(f"Recorded success for {sys.argv[2]}")
elif cmd == "session-start" and len(sys.argv) > 2:
session_start(sys.argv[2])
print(f"Session started on {sys.argv[2]}")
elif cmd == "session-end" and len(sys.argv) > 2:
session_end(sys.argv[2])
print(f"Session ended on {sys.argv[2]}")
elif cmd == "is-healthy" and len(sys.argv) > 2:
healthy, reason = is_healthy(sys.argv[2])
print(f"{'healthy' if healthy else 'DEGRADED'}: {reason}")
sys.exit(0 if healthy else 1)
elif cmd == "pick" and len(sys.argv) > 2:
candidates = sys.argv[2:]
chosen = pick_healthy(candidates)
print(chosen or "NONE")
else:
print("Usage: provider_health.py <status|record-429|record-success|session-start|session-end|is-healthy|pick> [model_id...]")
```
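The cooldown escalation above is easy to reason about in isolation. Here is an in-memory sketch of the same state machine (identical `COOLDOWN_SCHEDULE` semantics, minus the JSON persistence and session tracking); the provider name is just an example:

```python
# Escalating per-provider cooldown: the Nth consecutive 429 maps to a longer
# cooldown, capped at MAX_COOLDOWN once the schedule runs out.
COOLDOWN_SCHEDULE = {1: 60, 2: 300, 3: 900, 4: 3600}
MAX_COOLDOWN = 3600 * 4
state: dict = {}

def record_429(provider: str, now: float) -> None:
    entry = state.setdefault(provider, {"consecutive_429s": 0, "cooldown_until": 0.0})
    entry["consecutive_429s"] += 1
    entry["cooldown_until"] = now + COOLDOWN_SCHEDULE.get(entry["consecutive_429s"], MAX_COOLDOWN)

def is_healthy(provider: str, now: float) -> bool:
    return state.get(provider, {}).get("cooldown_until", 0.0) <= now

t = 1000.0
record_429("anthropic-proxy-4", t)               # 1st 429 → 60 s cooldown
print(is_healthy("anthropic-proxy-4", t))        # False
print(is_healthy("anthropic-proxy-4", t + 61))   # True (cooldown elapsed)
record_429("anthropic-proxy-4", t + 61)          # 2nd 429 → 300 s cooldown
print(state["anthropic-proxy-4"]["consecutive_429s"])  # 2
```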
### scripts/router.py
```python
#!/usr/bin/env python3
"""
Intelligent Router CLI
A tool for classifying tasks and recommending appropriate LLM models.
Python 3.8+ compatible, no external dependencies.
Features:
- 15-dimension weighted scoring system
- REASONING tier for formal logic and proofs
- Automatic fallback chains (up to 3 attempts)
- Agentic task detection
- Confidence-based routing
"""
import json
import math
import os
import re
import sys
from pathlib import Path
class IntelligentRouter:
"""Main router class for task classification and model recommendation."""
# Weighted scoring dimensions (15 total, sum = 1.0)
SCORING_WEIGHTS = {
'reasoning_markers': 0.18,
'code_presence': 0.15,
'multi_step_patterns': 0.12,
'agentic_task': 0.10,
'technical_terms': 0.10,
'token_count': 0.08,
'creative_markers': 0.05,
'question_complexity': 0.05,
'constraint_count': 0.04,
'imperative_verbs': 0.03,
'output_format': 0.03,
'simple_indicators': 0.02,
'domain_specificity': 0.02,
'reference_complexity': 0.02,
'negation_complexity': 0.01
}
# Keywords and patterns for each dimension
REASONING_KEYWORDS = [
'prove', 'theorem', 'proof', 'derive', 'derivation', 'formal',
'verify', 'verification', 'logic', 'logical', 'induction', 'deduction',
'lemma', 'corollary', 'axiom', 'postulate', 'qed', 'step by step',
'show that', 'demonstrate that', 'mathematically', 'rigorously'
]
CODE_KEYWORDS = ['lint', 'refactor', 'bug fix', 'code review', 'software', 'application', 'component', 'module', 'package', 'library']
CODE_PATTERNS = [
r'`[^`]+`', # inline code
r'```[\s\S]*?```', # code blocks
r'\bdef\b', r'\bclass\b', r'\bimport\b', r'\bfrom\b',
r'\breturn\b', r'\bif\b.*:\s*$', r'\.py\b', r'\.js\b', r'\.java\b',
r'\.cpp\b', r'\.rs\b', r'\.go\b', r'\bAPI\b', r'\bJSON\b', r'\bSQL\b',
r'\b(python|javascript|java|rust|golang|c\+\+|typescript|ruby|php)\s+\w+',
r'\bwrite\s+.*?(function|code|script|class|method|program)',
r'\bcode\s+(for|to|that)',
r'\bprogram(ming)?\b',
r'\b(coding|development|implementation)\b'
]
AGENTIC_KEYWORDS = [
'run', 'test', 'fix', 'deploy', 'edit', 'build', 'create', 'implement',
'execute', 'refactor', 'migrate', 'integrate', 'setup', 'configure',
'install', 'compile', 'debug', 'troubleshoot'
]
MULTI_STEP_PATTERNS = [
r'\bfirst\b.*\bthen\b', r'\bstep\s+\d+', r'\d+\.\s+\w+', # numbered lists
r'\bnext\b', r'\bafter\s+that\b', r'\bfinally\b', r'\bsubsequently\b',
r'\band then\b', r'\bfollowed by\b', r',\s*then\b', r'\bthen\s+\w+\s+it\b'
]
SIMPLE_INDICATORS = [
'check', 'get', 'fetch', 'list', 'show', 'display', 'status',
'what is', 'how much', 'tell me', 'find', 'search', 'summarize'
]
TECHNICAL_TERMS = [
'algorithm', 'architecture', 'optimization', 'performance', 'scalability',
'database', 'security', 'authentication', 'encryption', 'protocol',
'framework', 'library', 'dependency', 'middleware', 'endpoint',
'microservice', 'container', 'docker', 'kubernetes', 'pipeline'
]
CREATIVE_MARKERS = [
'creative', 'imaginative', 'story', 'poem', 'narrative', 'write a',
'compose', 'brainstorm', 'innovative', 'original', 'artistic'
]
IMPERATIVE_VERBS = [
'analyze', 'evaluate', 'compare', 'assess', 'investigate', 'examine',
'review', 'validate', 'verify', 'optimize', 'improve', 'enhance',
'design', 'architect', 'plan', 'structure', 'model', 'prototype',
        'audit', 'inspect'
]
CRITICAL_KEYWORDS = [
'security', 'production', 'deploy', 'release', 'financial', 'payment',
'vulnerability', 'exploit', 'breach', 'audit', 'compliance', 'regulatory',
'critical', 'urgent', 'emergency', 'live', 'mainnet'
]
ARCHITECTURE_KEYWORDS = [
'architecture', 'architect', 'design system', 'system design',
'scalable', 'distributed', 'microservices', 'service mesh',
'high availability', 'fault tolerant', 'load balancing',
'api gateway', 'event driven', 'message queue', 'service oriented'
]
CONSTRAINT_KEYWORDS = [
'must', 'should', 'require', 'need', 'constraint', 'limit', 'restriction',
'only', 'exactly', 'precisely', 'specifically', 'without', 'except'
]
# Token estimates for different task complexities
TOKEN_ESTIMATES = {
'SIMPLE': {'input': 500, 'output': 200},
'MEDIUM': {'input': 2000, 'output': 1000},
'COMPLEX': {'input': 5000, 'output': 3000},
'REASONING': {'input': 3000, 'output': 2000},
'CRITICAL': {'input': 8000, 'output': 5000}
}
def __init__(self, config_path=None):
"""Initialize router with config file."""
if config_path is None:
# Default to config.json in the skill directory
script_dir = Path(__file__).parent
config_path = script_dir.parent / 'config.json'
self.config_path = Path(config_path)
self.config = self._load_config()
def _load_config(self):
"""Load and parse configuration file."""
if not self.config_path.exists():
raise FileNotFoundError(
f"Configuration file not found: {self.config_path}\n"
f"Please create a config.json file with your model definitions."
)
try:
with open(self.config_path, 'r') as f:
config = json.load(f)
if 'models' not in config:
raise ValueError("Configuration must contain a 'models' array")
return config
except json.JSONDecodeError as e:
raise ValueError(f"Invalid JSON in config file: {e}")
@staticmethod
def _full_id(model: dict) -> str:
"""Return provider/id string for a model record."""
p = model.get("provider", "")
i = model.get("id", "")
return f"{p}/{i}" if p else i
@staticmethod
def _model_matches(model: dict, lookup_id: str) -> bool:
"""Match a model record by bare id OR full provider/id."""
if model.get("id") == lookup_id:
return True
p = model.get("provider", "")
i = model.get("id", "")
return (f"{p}/{i}" == lookup_id) if p else False
def _find_model(self, models: list, lookup_id: str, default=None):
"""Find a model by bare id or provider/id, return default if not found."""
return next((m for m in models if self._model_matches(m, lookup_id)), default)
def _count_matches(self, text, patterns, use_regex=False):
"""Count pattern matches in text (case-insensitive).
Args:
text: Text to search
patterns: List of patterns (keywords or regex)
use_regex: If True, treat all patterns as regex. If False, treat as keywords.
"""
text_lower = text.lower()
count = 0
for pattern in patterns:
if use_regex:
# Regex pattern
try:
count += len(re.findall(pattern, text, re.IGNORECASE | re.MULTILINE))
                except re.error:
# If regex fails, try as keyword
count += text_lower.count(pattern.lower())
else:
# Simple keyword
count += text_lower.count(pattern.lower())
return count
def _calculate_dimension_scores(self, task_description):
"""Calculate scores for all 15 dimensions."""
text = task_description
text_lower = text.lower()
scores = {}
# 1. Reasoning markers (0.18)
reasoning_count = self._count_matches(text, self.REASONING_KEYWORDS)
scores['reasoning_markers'] = min(reasoning_count / 3.0, 1.0)
# 2. Code presence (0.15)
code_count = self._count_matches(text, self.CODE_PATTERNS, use_regex=True) + self._count_matches(text, self.CODE_KEYWORDS)
scores['code_presence'] = min(code_count / 3.0, 1.0)
# 3. Multi-step patterns (0.12)
multi_step_count = self._count_matches(text, self.MULTI_STEP_PATTERNS, use_regex=True)
# Detect multi-component indicators ("with X, Y, and Z" or "across N services")
multi_component_patterns = [r'with\s+\w+[,\s]+\w+\s+and', r'across\s+\d+\s+(services|components|systems)']
multi_step_count += self._count_matches(text, multi_component_patterns, use_regex=True)
scores['multi_step_patterns'] = min(multi_step_count / 2.0, 1.0)
# 4. Agentic task (0.10)
agentic_count = self._count_matches(text, self.AGENTIC_KEYWORDS)
# Architecture design is inherently agentic (multi-step planning)
arch_verbs = ['design', 'architect', 'plan', 'structure']
arch_verb_count = self._count_matches(text, arch_verbs)
if arch_verb_count > 0:
agentic_count += arch_verb_count * 2 # Architecture verbs count double
scores['agentic_task'] = min(agentic_count / 3.0, 1.0)
# 5. Technical terms (0.10)
tech_count = self._count_matches(text, self.TECHNICAL_TERMS)
# Boost for architecture keywords (strong COMPLEX signal)
arch_count = self._count_matches(text, self.ARCHITECTURE_KEYWORDS)
if arch_count > 0:
tech_count += arch_count * 2 # Architecture keywords count double
scores['technical_terms'] = min(tech_count / 4.0, 1.0)
# 6. Token count (0.08) - estimate based on word count
word_count = len(text.split())
token_estimate = word_count * 1.3 # rough estimate
scores['token_count'] = min(token_estimate / 1000.0, 1.0)
# 7. Creative markers (0.05)
creative_count = self._count_matches(text, self.CREATIVE_MARKERS)
scores['creative_markers'] = min(creative_count / 2.0, 1.0)
# 8. Question complexity (0.05)
question_marks = text.count('?')
question_words = len(re.findall(r'\b(who|what|when|where|why|how)\b', text_lower))
scores['question_complexity'] = min((question_marks + question_words) / 3.0, 1.0)
# 9. Constraint count (0.04)
constraint_count = self._count_matches(text, self.CONSTRAINT_KEYWORDS)
scores['constraint_count'] = min(constraint_count / 3.0, 1.0)
# 10. Imperative verbs (0.03)
imperative_count = self._count_matches(text, self.IMPERATIVE_VERBS)
scores['imperative_verbs'] = min(imperative_count / 2.0, 1.0)
# 11. Output format (0.03) - structured output requests
format_patterns = [r'\bjson\b', r'\btable\b', r'\blist\b', r'\bmarkdown\b', r'\bformat\b']
        format_count = self._count_matches(text, format_patterns, use_regex=True)
scores['output_format'] = min(format_count / 2.0, 1.0)
# 12. Simple indicators (0.02) - inverted (high = simple)
simple_count = self._count_matches(text, self.SIMPLE_INDICATORS)
scores['simple_indicators'] = max(0, 1.0 - min(simple_count / 2.0, 1.0))
# 13. Domain specificity (0.02)
domain_patterns = [r'\b[A-Z]{2,}\b', r'\b\w+\.\w+\b'] # acronyms, dotted notation
domain_count = self._count_matches(text, domain_patterns, use_regex=True)
# Add architecture-specific domain terms
arch_domain_terms = ['kubernetes', 'docker', 'redis', 'kafka', 'rabbitmq',
'graphql', 'grpc', 'rest api', 'websocket', 'oauth']
domain_count += self._count_matches(text, arch_domain_terms)
scores['domain_specificity'] = min(domain_count / 3.0, 1.0)
# 14. Reference complexity (0.02)
ref_patterns = [r'\bthe\s+\w+\s+(?:above|below|mentioned|previous)\b', r'\bthis\s+\w+\b']
        ref_count = self._count_matches(text, ref_patterns, use_regex=True)
scores['reference_complexity'] = min(ref_count / 2.0, 1.0)
# 15. Negation complexity (0.01)
negation_patterns = [r'\bnot\b', r'\bno\b', r'\bnever\b', r'\bwithout\b', r'\bexcept\b']
        negation_count = self._count_matches(text, negation_patterns, use_regex=True)
scores['negation_complexity'] = min(negation_count / 3.0, 1.0)
return scores
def _calculate_weighted_score(self, dimension_scores):
"""Calculate final weighted score from dimension scores."""
weighted_sum = 0.0
for dimension, score in dimension_scores.items():
weight = self.SCORING_WEIGHTS.get(dimension, 0.0)
weighted_sum += weight * score
return weighted_sum
def _score_to_confidence(self, score):
"""Convert weighted score to confidence using sigmoid function.
Formula: confidence = 1 / (1 + exp(-8 * (score - 0.5)))
This creates a smooth S-curve:
- score 0.0 → confidence ~0.02
- score 0.25 → confidence ~0.12
- score 0.5 → confidence ~0.50
- score 0.75 → confidence ~0.88
- score 1.0 → confidence ~0.98
"""
return 1.0 / (1.0 + math.exp(-8.0 * (score - 0.5)))
def _classify_by_score(self, score, confidence, is_agentic, dimension_scores=None, task_text=""):
"""Classify task tier based on weighted score and confidence."""
# Check for CRITICAL keywords (security, production, financial)
if task_text:
critical_count = self._count_matches(task_text, self.CRITICAL_KEYWORDS)
if critical_count >= 2:
# Multiple critical keywords → force CRITICAL tier
return 'CRITICAL'
elif critical_count == 1:
# Single critical keyword → boost to at least COMPLEX
if score < 0.5:
score = 0.5
# Check for REASONING tier first (special logic)
# REASONING requires high reasoning_markers score specifically
if dimension_scores and dimension_scores.get('reasoning_markers', 0) >= 0.6:
# Strong reasoning markers detected (prove, theorem, derive, etc.)
            # Thresholds are deliberately low: strong reasoning markers alone qualify
if score >= 0.10 or confidence >= 0.30:
return 'REASONING'
# Check for complex agentic tasks (multi-step + agentic + code)
# Only apply bumps when the raw score shows genuine complexity (>= 0.15).
# Low scores (< 0.15) indicate keyword noise, not real agentic work — leave as SIMPLE.
if is_agentic and dimension_scores and score >= 0.15:
code_score = dimension_scores.get('code_presence', 0)
multi_step = dimension_scores.get('multi_step_patterns', 0)
# Multi-step agentic tasks with code → COMPLEX tier
if multi_step > 0.3 and code_score > 0:
if score < 0.5:
score = 0.5 # Bump to COMPLEX tier
# Genuine agentic tasks → at least MEDIUM (not triggered by substring matches)
elif score < 0.4:
score = 0.4 # Ensure minimum MEDIUM tier
# Score-based classification
if score < 0.3:
return 'SIMPLE'
elif score < 0.5:
return 'MEDIUM'
elif score < 0.75:
return 'COMPLEX'
else:
return 'CRITICAL'
def classify_task(self, task_description, return_details=False):
"""
Classify a task into a tier using 15-dimension weighted scoring.
Args:
task_description: The task to classify
return_details: If True, return detailed scoring breakdown
Returns:
If return_details=False: tier name (SIMPLE/MEDIUM/COMPLEX/REASONING/CRITICAL)
If return_details=True: dict with tier, scores, confidence, and reasoning
"""
# Calculate dimension scores
dimension_scores = self._calculate_dimension_scores(task_description)
# Calculate weighted score
weighted_score = self._calculate_weighted_score(dimension_scores)
# Convert to confidence
confidence = self._score_to_confidence(weighted_score)
# Check if task is agentic (lowered threshold from 0.5 to 0.3)
is_agentic = dimension_scores['agentic_task'] > 0.3 or dimension_scores['multi_step_patterns'] > 0.5
# Classify
tier = self._classify_by_score(weighted_score, confidence, is_agentic, dimension_scores, task_description)
if not return_details:
return tier
return {
'tier': tier,
'confidence': round(confidence, 4),
'weighted_score': round(weighted_score, 4),
'is_agentic': is_agentic,
'dimension_scores': {k: round(v, 3) for k, v in dimension_scores.items()},
'top_dimensions': self._get_top_dimensions(dimension_scores, n=5)
}
def _get_top_dimensions(self, dimension_scores, n=5):
"""Get top N contributing dimensions."""
weighted_contributions = {}
for dim, score in dimension_scores.items():
weighted_contributions[dim] = score * self.SCORING_WEIGHTS[dim]
sorted_dims = sorted(weighted_contributions.items(), key=lambda x: x[1], reverse=True)
return [(dim, round(contrib, 4)) for dim, contrib in sorted_dims[:n]]
def get_models_by_tier(self, tier):
"""Get all models for a specific tier."""
return [
model for model in self.config['models']
if model.get('tier') == tier
]
def recommend_model(self, task_description, use_fallback=False, fallback_index=0):
"""
Classify task and recommend the best model for it.
Args:
task_description: The task to classify
use_fallback: If True, use fallback chain instead of primary
fallback_index: Which fallback in chain to use (0 = first fallback)
Returns:
Dict with tier, recommended model, fallback chain, and reasoning.
"""
# Get detailed classification
classification = self.classify_task(task_description, return_details=True)
tier = classification['tier']
models = self.get_models_by_tier(tier)
if not models:
return {
'tier': tier,
'model': None,
'fallback_chain': [],
'classification': classification,
'reasoning': f"No models configured for {tier} tier"
}
# Get routing rules
routing_rules = self.config.get('routing_rules', {}).get(tier, {})
primary_id = routing_rules.get('primary')
fallback_chain = routing_rules.get('fallback_chain', [])
# Find primary model
primary = None
if primary_id:
primary = self._find_model(models, primary_id, models[0] if models else None)
else:
primary = models[0]
# Determine which model to return
if use_fallback and fallback_chain:
if fallback_index < len(fallback_chain):
fallback_id = fallback_chain[fallback_index]
recommended = self._find_model(self.config['models'], fallback_id, primary)
else:
recommended = primary # Exhausted fallbacks
else:
recommended = primary
return {
'tier': tier,
'model': recommended,
'fallback_chain': fallback_chain,
'classification': classification,
'reasoning': self._explain_tier(tier, classification)
}
def _explain_tier(self, tier, classification):
"""Provide reasoning for tier classification."""
base_explanations = {
'SIMPLE': 'Routine monitoring, status checks, or simple data fetching',
'MEDIUM': 'Moderate complexity tasks like code fixes or research',
'COMPLEX': 'Multi-file development, debugging, or architectural work',
'REASONING': 'Formal logic, mathematical proofs, or step-by-step derivations',
'CRITICAL': 'Security-sensitive, production, or high-stakes operations'
}
explanation = base_explanations.get(tier, 'General purpose task')
# Add top contributing dimensions
if classification and 'top_dimensions' in classification:
top_dims = classification['top_dimensions'][:3]
dim_names = [dim.replace('_', ' ') for dim, _ in top_dims]
explanation += f" (key factors: {', '.join(dim_names)})"
if classification and classification.get('is_agentic'):
explanation += " [Agentic task detected]"
return explanation
def estimate_cost(self, task_description):
"""
Estimate the cost of running a task based on its complexity.
Returns dict with tier, token estimates, and cost breakdown.
"""
classification = self.classify_task(task_description, return_details=True)
tier = classification['tier']
models = self.get_models_by_tier(tier)
if not models:
return {
'tier': tier,
'classification': classification,
'error': f"No models configured for {tier} tier"
}
model = models[0]
tokens = self.TOKEN_ESTIMATES.get(tier, self.TOKEN_ESTIMATES['MEDIUM'])
# Calculate costs (per million tokens → actual tokens)
input_cost = (tokens['input'] / 1_000_000) * model['input_cost_per_m']
output_cost = (tokens['output'] / 1_000_000) * model['output_cost_per_m']
total_cost = input_cost + output_cost
return {
'tier': tier,
'model': model['alias'],
'estimated_tokens': tokens,
'classification': classification,
'costs': {
'input': round(input_cost, 6),
'output': round(output_cost, 6),
'total': round(total_cost, 6)
},
'currency': 'USD'
}
def list_models(self):
"""List all configured models grouped by tier."""
tiers = {}
for model in self.config['models']:
tier = model.get('tier', 'UNKNOWN')
if tier not in tiers:
tiers[tier] = []
tiers[tier].append(model)
return tiers
def health_check(self):
"""Validate configuration file and report health status."""
issues = []
# Check if models exist
if not self.config.get('models'):
issues.append("No models defined in configuration")
# Validate each model
required_fields = ['id', 'alias', 'tier', 'input_cost_per_m', 'output_cost_per_m']
for i, model in enumerate(self.config.get('models', [])):
for field in required_fields:
if field not in model:
issues.append(f"Model {i}: missing required field '{field}'")
# Check tier validity
tier = model.get('tier')
if tier not in ['SIMPLE', 'MEDIUM', 'COMPLEX', 'REASONING', 'CRITICAL']:
issues.append(f"Model {i} ({model.get('id')}): invalid tier '{tier}'")
# Check tier coverage
configured_tiers = set(m.get('tier') for m in self.config.get('models', []))
all_tiers = set(['SIMPLE', 'MEDIUM', 'COMPLEX', 'REASONING', 'CRITICAL'])
missing_tiers = all_tiers - configured_tiers
if missing_tiers:
issues.append(f"Missing models for tiers: {', '.join(sorted(missing_tiers))}")
# Validate fallback chains
routing_rules = self.config.get('routing_rules', {})
for tier, rules in routing_rules.items():
if 'fallback_chain' in rules:
for fallback_id in rules['fallback_chain']:
if not any(self._model_matches(m, fallback_id) for m in self.config.get('models', [])):
issues.append(f"Tier {tier}: fallback model '{fallback_id}' not found in models")
return {
'status': 'healthy' if not issues else 'unhealthy',
'issues': issues,
'model_count': len(self.config.get('models', [])),
'config_path': str(self.config_path)
}
def main():
"""CLI entry point."""
if len(sys.argv) < 2:
print("Intelligent Router CLI v2.0 (with weighted scoring & REASONING tier)")
print("\nUsage:")
print(" router.py classify <task> Classify a task and recommend a model")
print(" router.py models List all configured models by tier")
print(" router.py health Check configuration health")
print(" router.py cost-estimate <task> Estimate cost for a task")
print(" router.py score <task> Show detailed scoring breakdown")
print("\nExamples:")
print(' router.py classify "fix lint errors in utils.js"')
print(' router.py score "prove that sqrt(2) is irrational step by step"')
print(' router.py cost-estimate "build authentication system"')
sys.exit(1)
command = sys.argv[1]
try:
router = IntelligentRouter()
if command == 'classify':
if len(sys.argv) < 3:
print("Error: Task description required")
print('Usage: router.py classify "task description"')
sys.exit(1)
task = ' '.join(sys.argv[2:])
result = router.recommend_model(task)
print(f"Task: {task}")
print(f"\nClassification: {result['tier']}")
print(f"Confidence: {result['classification']['confidence']:.2%}")
print(f"Weighted Score: {result['classification']['weighted_score']:.3f}")
print(f"Reasoning: {result['reasoning']}")
if result['model']:
model = result['model']
print(f"\nRecommended Model:")
print(f" ID: {model['id']}")
print(f" Alias: {model['alias']}")
print(f" Provider: {model['provider']}")
print(f" Cost: ${model['input_cost_per_m']:.2f}/${model['output_cost_per_m']:.2f} per M tokens")
if model.get('agentic'):
print(f" Agentic: Yes")
if 'notes' in model:
print(f" Notes: {model['notes']}")
if result['fallback_chain']:
print(f"\nFallback Chain:")
for i, fb_id in enumerate(result['fallback_chain'], 1):
print(f" {i}. {fb_id}")
else:
print(f"\n⚠️ {result['reasoning']}")
elif command == 'score':
if len(sys.argv) < 3:
print("Error: Task description required")
print('Usage: router.py score "task description"')
sys.exit(1)
task = ' '.join(sys.argv[2:])
classification = router.classify_task(task, return_details=True)
print(f"Task: {task}")
print(f"\nClassification: {classification['tier']}")
print(f"Confidence: {classification['confidence']:.2%}")
print(f"Weighted Score: {classification['weighted_score']:.3f}")
print(f"Agentic Task: {'Yes' if classification['is_agentic'] else 'No'}")
print(f"\nTop Contributing Dimensions:")
for dim, contrib in classification['top_dimensions']:
dim_name = dim.replace('_', ' ').title()
print(f" {dim_name}: {contrib:.4f}")
print(f"\nAll Dimension Scores:")
for dim, score in sorted(classification['dimension_scores'].items(), key=lambda x: x[1], reverse=True):
weight = router.SCORING_WEIGHTS[dim]
dim_name = dim.replace('_', ' ').title()
print(f" {dim_name}: {score:.3f} (weight: {weight:.2f})")
elif command == 'models':
tiers = router.list_models()
print("Configured Models by Tier:\n")
for tier in ['SIMPLE', 'MEDIUM', 'COMPLEX', 'REASONING', 'CRITICAL']:
if tier in tiers:
print(f"{tier}:")
for model in tiers[tier]:
cost_str = f"${model['input_cost_per_m']:.2f}/${model['output_cost_per_m']:.2f}/M"
agentic_flag = " [Agentic]" if model.get('agentic') else ""
print(f" • {model['alias']} ({model['id']}) - {cost_str}{agentic_flag}")
print()
elif command == 'health':
result = router.health_check()
print(f"Configuration Health Check")
print(f"Config: {result['config_path']}")
print(f"Status: {result['status'].upper()}")
print(f"Models: {result['model_count']}")
if result['issues']:
print(f"\nIssues found:")
for issue in result['issues']:
print(f" ⚠️ {issue}")
else:
print("\n✅ Configuration is valid")
elif command == 'cost-estimate':
if len(sys.argv) < 3:
print("Error: Task description required")
print('Usage: router.py cost-estimate "task description"')
sys.exit(1)
task = ' '.join(sys.argv[2:])
result = router.estimate_cost(task)
print(f"Task: {task}")
print(f"\nCost Estimate:")
print(f" Tier: {result['tier']}")
print(f" Confidence: {result['classification']['confidence']:.2%}")
if 'error' in result:
print(f" Error: {result['error']}")
else:
print(f" Model: {result['model']}")
print(f" Estimated Tokens: {result['estimated_tokens']['input']} in / {result['estimated_tokens']['output']} out")
print(f" Input Cost: ${result['costs']['input']:.6f}")
print(f" Output Cost: ${result['costs']['output']:.6f}")
print(f" Total Cost: ${result['costs']['total']:.6f} {result['currency']}")
else:
print(f"Unknown command: {command}")
print("Available commands: classify, score, models, health, cost-estimate")
sys.exit(1)
except FileNotFoundError as e:
print(f"Error: {e}")
sys.exit(1)
except ValueError as e:
print(f"Configuration Error: {e}")
sys.exit(1)
except Exception as e:
print(f"Unexpected error: {e}")
import traceback
traceback.print_exc()
sys.exit(1)
if __name__ == '__main__':
main()
```
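The score-to-confidence sigmoid used by `_score_to_confidence` can be sanity-checked in isolation. This minimal sketch reproduces the formula from its docstring and verifies the spot values the docstring documents:

```python
import math

def score_to_confidence(score: float) -> float:
    # Sigmoid from the router: confidence = 1 / (1 + exp(-8 * (score - 0.5)))
    return 1.0 / (1.0 + math.exp(-8.0 * (score - 0.5)))

# Spot-check the S-curve values documented in the docstring
for s, expected in [(0.0, 0.02), (0.25, 0.12), (0.5, 0.50), (0.75, 0.88), (1.0, 0.98)]:
    assert abs(score_to_confidence(s) - expected) < 0.01
```

The steepness factor of 8 is what makes the curve decisive: scores near the 0.5 midpoint move confidence quickly, while scores near 0 or 1 saturate.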
### scripts/router_policy.py
```python
#!/usr/bin/env python3
"""
Router Policy Enforcer v1.0 — Intelligent Router Skill
Pre-write-time enforcement for cron jobs and sub-agent spawns.
Catches bad model assignments BEFORE they are created, not after they fail.
Usage:
# Validate a cron payload JSON
python3 router_policy.py check '{"kind":"agentTurn","model":"ollama-gpu-server/glm-4.7-flash","message":"check server"}'
# Get the correct model for a task (enforced recommendation)
python3 router_policy.py recommend "monitor alphastrike service health"
# Check all current cron jobs for policy violations
python3 router_policy.py audit
# Show known bad models and why
python3 router_policy.py blocklist
Exit codes:
0 = OK / compliant
1 = policy violation found
2 = usage error
"""
import json
import sys
import subprocess
from pathlib import Path
# ── Config ────────────────────────────────────────────────────────────────────
SKILL_DIR = Path(__file__).parent.parent
CONFIG_FILE = SKILL_DIR / "config.json"
# Models that must never be used in isolated cron jobs.
# Reason: these are network-dependent local models that become SPOFs.
BLOCKED_MODELS: dict[str, str] = {
"ollama-gpu-server/glm-4.7-flash":
"GPU server Ollama binds to 127.0.0.1 by default — unreachable over LAN. "
"Use anthropic-proxy-4/glm-4.7 or anthropic-proxy-6/glm-4.7 instead.",
"ollama-gpu-server/qwen2.5:7b":
"Same issue: GPU server Ollama localhost-only by default.",
"ollama/qwen2.5:7b":
"Local Ollama may not be running in isolated cron context. Use NIM fallback.",
"ollama/llama3.3":
"Local Ollama may not be running in isolated cron context. Use NIM fallback.",
"ollama/llama3.2:3b":
"Local Ollama may not be running in isolated cron context. Use NIM fallback.",
}
# Tier → preferred model for cron jobs (cloud-only, no local dependency)
CRON_TIER_MODELS: dict[str, str] = {
"SIMPLE": "anthropic-proxy-6/glm-4.7", # $0.50/M, always available, alternates with proxy-4
"MEDIUM": "nvidia-nim/meta/llama-3.3-70b-instruct", # $0.40/M, capable
"COMPLEX": "anthropic/claude-sonnet-4-6", # $3/M, full coding ability
"REASONING": "nvidia-nim/moonshotai/kimi-k2-thinking", # $1/M, 1T MoE specialist
"CRITICAL": "anthropic/claude-opus-4-6", # $5/M, highest capability
}
# Alternate SIMPLE model to distribute load (use for every other SIMPLE cron)
SIMPLE_ALT = "anthropic-proxy-4/glm-4.7"
# ── Helpers ───────────────────────────────────────────────────────────────────
def load_router_config() -> dict:
if not CONFIG_FILE.exists():
return {}
with open(CONFIG_FILE) as f:
return json.load(f)
def classify_task(task: str) -> str:
"""Call router.py to classify a task. Returns tier string."""
result = subprocess.run(
[sys.executable, str(SKILL_DIR / "scripts" / "router.py"), "classify", task],
capture_output=True, text=True
)
for line in result.stdout.splitlines():
for tier in ("SIMPLE", "MEDIUM", "COMPLEX", "REASONING", "CRITICAL"):
if tier in line:
return tier
return "SIMPLE" # safe default
def check_payload(payload: dict) -> list[str]:
"""
Validate a cron/spawn payload. Returns list of violations (empty = OK).
"""
violations = []
model = payload.get("model", "")
# Rule 1: model must be set
if not model:
violations.append(
"VIOLATION: No model specified. "
"Every cron/spawn must set 'model' explicitly. "
"No model → default Sonnet → expensive waste."
)
return violations # can't check further without a model
# Rule 2: model must not be in blocklist
if model in BLOCKED_MODELS:
reason = BLOCKED_MODELS[model]
task = payload.get("message", "")
tier = classify_task(task) if task else "SIMPLE"
recommended = CRON_TIER_MODELS.get(tier, CRON_TIER_MODELS["SIMPLE"])
violations.append(
f"VIOLATION: Blocked model '{model}'.\n"
f" Reason: {reason}\n"
f" Task tier: {tier}\n"
f" Recommended: {recommended}"
)
# Rule 3: CRITICAL tasks should not use cheap models
task = payload.get("message", "")
if task:
tier = classify_task(task)
if tier == "CRITICAL" and model not in (
"anthropic/claude-opus-4-6",
"anthropic-proxy-1/claude-opus-4-6",
):
violations.append(
f"WARNING: Task classified as CRITICAL but using '{model}'. "
f"Consider anthropic/claude-opus-4-6 for high-stakes tasks."
)
return violations
def recommend_model(task: str, alternate: bool = False) -> dict:
"""
Return the recommended model for a task with full context.
"""
tier = classify_task(task)
model = CRON_TIER_MODELS.get(tier, CRON_TIER_MODELS["SIMPLE"])
if tier == "SIMPLE" and alternate:
model = SIMPLE_ALT
return {
"tier": tier,
"model": model,
"task": task,
"note": f"Cron-safe cloud model. No local GPU dependency.",
}
# ── Commands ──────────────────────────────────────────────────────────────────
def cmd_check(payload_json: str) -> int:
try:
payload = json.loads(payload_json)
except json.JSONDecodeError as e:
print(f"ERROR: Invalid JSON — {e}", file=sys.stderr)
return 2
violations = check_payload(payload)
if violations:
for v in violations:
print(v)
return 1
model = payload.get("model", "")
print(f"✅ OK — model '{model}' is policy-compliant.")
return 0
def cmd_recommend(task: str, alternate: bool = False) -> int:
result = recommend_model(task, alternate=alternate)
print(f"Tier: {result['tier']}")
print(f"Model: {result['model']}")
print(f"Note: {result['note']}")
return 0
def cmd_audit() -> int:
"""
Audit all current OpenClaw cron jobs for policy violations.
Reads crons via openclaw CLI.
"""
try:
result = subprocess.run(
["openclaw", "cron", "list", "--json"],
capture_output=True, text=True, timeout=15
)
if result.returncode != 0:
# Fallback: try reading via gateway API directly
print("WARNING: openclaw CLI not available. Run from inside OpenClaw session.")
return 2
crons = json.loads(result.stdout).get("jobs", [])
except (json.JSONDecodeError, FileNotFoundError, subprocess.TimeoutExpired) as e:
print(f"ERROR: Could not load cron list — {e}")
return 2
violations_found = 0
for cron in crons:
payload = cron.get("payload", {})
if payload.get("kind") != "agentTurn":
continue
violations = check_payload(payload)
if violations:
violations_found += 1
name = cron.get("name", cron.get("id", "unknown"))
cron_id = cron.get("id", "?")
print(f"\n[CRON] {name} ({cron_id[:8]}...)")
for v in violations:
print(f" {v}")
if violations_found == 0:
print(f"✅ All {len(crons)} cron jobs are policy-compliant.")
else:
print(f"\n⚠️ {violations_found} cron job(s) have policy violations.")
return 1 if violations_found > 0 else 0
def cmd_blocklist() -> int:
print("Blocked models (never use in cron/spawn payloads):\n")
for model, reason in BLOCKED_MODELS.items():
print(f" ❌ {model}")
print(f" {reason}\n")
return 0
# ── Main ──────────────────────────────────────────────────────────────────────
def main():
args = sys.argv[1:]
if not args or args[0] in ("-h", "--help"):
print(__doc__)
return 0
cmd = args[0]
if cmd == "check":
if len(args) < 2:
print("Usage: router_policy.py check '<json_payload>'", file=sys.stderr)
return 2
return cmd_check(args[1])
elif cmd == "recommend":
if len(args) < 2:
print("Usage: router_policy.py recommend 'task description'", file=sys.stderr)
return 2
alternate = "--alt" in args
return cmd_recommend(args[1], alternate=alternate)
elif cmd == "audit":
return cmd_audit()
elif cmd == "blocklist":
return cmd_blocklist()
else:
print(f"Unknown command: {cmd}", file=sys.stderr)
print("Commands: check | recommend | audit | blocklist", file=sys.stderr)
return 2
if __name__ == "__main__":
sys.exit(main())
```
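The enforcement logic in `check_payload` can be exercised without OpenClaw or the router. This standalone sketch mirrors rules 1 and 2 only, with a hypothetical one-entry blocklist (no tier classification):

```python
# Hypothetical blocklist standing in for BLOCKED_MODELS above
BLOCKED = {"ollama/example-model": "local model unavailable in cron context"}

def check(payload: dict) -> list:
    violations = []
    model = payload.get("model", "")
    if not model:  # Rule 1: every cron/spawn payload must set a model explicitly
        return ["VIOLATION: No model specified."]
    if model in BLOCKED:  # Rule 2: model must not be blocklisted
        violations.append(f"VIOLATION: Blocked model '{model}' — {BLOCKED[model]}")
    return violations

assert check({"kind": "agentTurn"}) == ["VIOLATION: No model specified."]
assert check({"model": "anthropic/claude-sonnet-4-6"}) == []
assert len(check({"model": "ollama/example-model"})) == 1
```

Returning early on a missing model matters: the remaining rules are meaningless without one, so the real `check_payload` does the same.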
### scripts/setup_discovery_cron.sh
```bash
#!/bin/bash
# Setup hourly model discovery cron job
set -e
echo "Setting up hourly model discovery cron..."
# Create cron job via OpenClaw
cat > /tmp/model-discovery-cron.json << 'EOF'
{
"name": "Model Discovery Refresh",
"schedule": {
"kind": "every",
"everyMs": 3600000
},
"payload": {
"kind": "systemEvent",
"text": "Run: bash skills/intelligent-router/scripts/auto_refresh_models.sh",
"model": "ollama/glm-4.7-flash"
},
"sessionTarget": "main",
"enabled": true
}
EOF
# Add cron job
if command -v openclaw &> /dev/null; then
openclaw cron add --job /tmp/model-discovery-cron.json
echo "✓ Cron job added: Model Discovery Refresh (every hour)"
rm /tmp/model-discovery-cron.json
else
echo "⚠️ OpenClaw CLI not found. Please run manually:"
echo " openclaw cron add --job /tmp/model-discovery-cron.json"
fi
echo ""
echo "Next steps:"
echo " 1. Run discovery now: python3 skills/intelligent-router/scripts/discover_models.py --auto-update"
echo " 2. Check results: cat skills/intelligent-router/discovered-models.json"
echo " 3. Monitor cron: openclaw cron list"
```
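Before handing the generated file to `openclaw cron add`, the payload can be validated locally. A minimal sketch, with the payload shape assumed from the script above (`text` elided):

```python
import json

payload = {
    "name": "Model Discovery Refresh",
    "schedule": {"kind": "every", "everyMs": 3600000},
    "payload": {"kind": "systemEvent", "text": "Run: ...", "model": "ollama/glm-4.7-flash"},
    "sessionTarget": "main",
    "enabled": True,
}

# 3,600,000 ms == 1 hour, matching the "hourly" comment in the script
assert payload["schedule"]["everyMs"] == 60 * 60 * 1000
# Round-trips cleanly through JSON (what `openclaw cron add --job` consumes)
assert json.loads(json.dumps(payload)) == payload
```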
### scripts/spawn_helper.py
```python
#!/usr/bin/env python3
"""
Intelligent Router - Spawn Helper (Enforced Core Skill)
MANDATORY: Call this before ANY sessions_spawn or cron job creation.
It classifies the task and outputs the exact model to use.
Usage (show command):
python3 skills/intelligent-router/scripts/spawn_helper.py "task description"
Usage (just get model id):
python3 skills/intelligent-router/scripts/spawn_helper.py --model-only "task description"
Usage (validate payload has model set):
python3 skills/intelligent-router/scripts/spawn_helper.py --validate '{"kind":"agentTurn","message":"..."}'
"""
import sys
import json
import subprocess
from pathlib import Path
SCRIPT_DIR = Path(__file__).parent
CONFIG_FILE = SCRIPT_DIR.parent / "config.json"
TIER_COLORS = {
"SIMPLE": "🟢",
"MEDIUM": "🟡",
"COMPLEX": "🟠",
"REASONING": "🔵",
"CRITICAL": "🔴",
}
def load_config():
if not CONFIG_FILE.exists():
raise FileNotFoundError(f"Router config not found: {CONFIG_FILE}")
with open(CONFIG_FILE) as f:
return json.load(f)
_CODING_PATTERNS = [
# direct fix/debug/implement verbs
r"\bimplement\b", r"\brefactor\b", r"\bdebug\b", r"\bfix\b",
r"\bwrite\s+\w+\s+(code|script|function|class|module|test)",
# code artifacts
r"\bcode\b", r"\bbugs?\b", r"\bunit\s+test", r"\bintegration\s+test",
r"\btdd\b", r"\btest\s+(coverage|suite|passing)\b",
r"\btests?\s+pass", r"\bpytest\b", r"\brspec\b",
# languages / ecosystems
r"\bpython\b", r"\btypescript\b", r"\bjavascript\b", r"\breact\b",
r"\brust\b", r"\bgolang\b", r"\bgo\s+module", r"\bpallet\b",
r"\bsmart\s+contract", r"\bsolidity\b",
# structural terms
r"\bapi\s+(client|server|endpoint)", r"\bmicroservice",
r"\bwire\s+(up|into)", r"\brepo\b", r"\brepository\b",
r"\bcoverage\b", r"\blint\b", r"\bmypy\b", r"\bruff\b",
r"\bpyproject\b", r"\bcargo\b", r"\bpackage\.json\b",
]
def _is_coding_task(task_description: str) -> bool:
"""Return True if task description has clear coding intent."""
import re
text = task_description.lower()
for pattern in _CODING_PATTERNS:
if re.search(pattern, text):
return True
return False
# [llmfit-integration-start]
# Hardware fitness filtering for fallback chains.
# Added by: uv run python skills/llmfit/scripts/integrate.py
# Do NOT remove the marker comments — they allow re-patching to be idempotent.
import functools as _functools
_HARDWARE_FITS_FILE = Path(__file__).parent.parent.parent / "llmfit" / "data" / "hardware_fits.json"
_DEPRIORITIZE_FITS = {"marginal", "none"} # fits to push to end of fallback chain
@_functools.lru_cache(maxsize=1)
def _load_hardware_fits() -> dict:
"""Load llmfit hardware fitness cache (cached for process lifetime)."""
if not _HARDWARE_FITS_FILE.exists():
return {}
try:
with open(_HARDWARE_FITS_FILE) as _f:
return json.load(_f)
except Exception:
return {}
def get_hardware_fit(model_id: str) -> str:
"""
Return the canonical fit string for a model_id (e.g. "good", "marginal", "none").
Looks up the hardware_fit field added by integrate.py in config.json.
Falls back to "unknown" if not found.
"""
try:
cfg = load_config()
for entry in cfg.get("models", []):
eid = entry.get("id", "")
provider = entry.get("provider", "")
# Match by full "provider/id" or bare "id"
full_id = f"{provider}/{eid}" if provider else eid
if model_id in (eid, full_id) or full_id.endswith(model_id):
hw = entry.get("hardware_fit", {})
return hw.get("fit", "unknown")
except Exception:
pass
return "unknown"
def rerank_fallback_chain(chain: list) -> list:
"""
Move models with fit="marginal" or fit="none" to the end of the fallback chain.
Models with unknown/good/perfect fit keep their original order.
This does NOT remove any models — just reranks for hardware awareness.
"""
fits = [(model_id, get_hardware_fit(model_id)) for model_id in chain]
preferred = [mid for mid, fit in fits if fit not in _DEPRIORITIZE_FITS]
deprioritized = [mid for mid, fit in fits if fit in _DEPRIORITIZE_FITS]
return preferred + deprioritized
# [llmfit-integration-end]
def _get_complex_primary() -> str:
"""Return the forced COMPLEX primary from tier_overrides, or config primary."""
try:
with open(CONFIG_FILE) as f:
cfg = json.load(f)
override = cfg.get("tier_overrides", {}).get("COMPLEX", {})
if override.get("forced_primary"):
return override["forced_primary"]
return cfg.get("routing_rules", {}).get("COMPLEX", {}).get("primary", "")
except Exception:
return ""
def classify_task(task_description):
"""Run router.py classify and return (tier, full_model_id, confidence).
full_model_id is always provider/id (e.g. 'ollama-gpu-server/glm-4.7-flash'),
which is the format required by sessions_spawn(model=...) and cron payloads.
User override: coding tasks always route to COMPLEX (Sonnet 4.6 per tier_overrides).
"""
result = subprocess.run(
[sys.executable, str(SCRIPT_DIR / "router.py"), "classify", task_description],
capture_output=True, text=True, check=True
)
lines = result.stdout.strip().split('\n')
tier = None
bare_id = None
provider = None
confidence = None
for line in lines:
if line.startswith("Classification:"):
tier = line.split(":", 1)[1].strip()
elif " ID:" in line:
bare_id = line.split(":", 1)[1].strip()
elif " Provider:" in line:
provider = line.split(":", 1)[1].strip()
elif line.startswith("Confidence:"):
confidence = line.split(":", 1)[1].strip()
# Combine provider + id for the full model identifier
if bare_id and provider:
model_id = f"{provider}/{bare_id}"
else:
model_id = bare_id
# User rule: ALL coding tasks → COMPLEX (Sonnet 4.6 via tier_overrides)
if tier in ("SIMPLE", "MEDIUM") and _is_coding_task(task_description):
complex_primary = _get_complex_primary()
if complex_primary:
tier = "COMPLEX"
model_id = complex_primary
confidence = "OVERRIDE (coding task → COMPLEX)"
return tier, model_id, confidence
def validate_payload(payload_json):
"""
Validate a cron job payload has the model field set.
    Returns (ok, message); ok is True (compliant), False (violation), or None (warning, treated as non-compliant by callers).
"""
try:
payload = json.loads(payload_json) if isinstance(payload_json, str) else payload_json
except json.JSONDecodeError as e:
return False, f"Invalid JSON payload: {e}"
if payload.get("kind") != "agentTurn":
return True, "Non-agentTurn payload — model not required"
model = payload.get("model")
if not model:
return False, (
"❌ VIOLATION: agentTurn payload missing 'model' field!\n"
" Without model, OpenClaw defaults to Sonnet = expensive waste.\n"
" Fix: add \"model\": \"<model-id>\" to the payload.\n"
" Run: python3 skills/intelligent-router/scripts/spawn_helper.py \"<task>\" to get the right model."
)
# Check if Sonnet/Opus is used for a non-critical payload
expensive = ["claude-sonnet", "claude-opus", "claude-3"]
for keyword in expensive:
if keyword in model.lower():
msg = payload.get("message", "")[:80]
return None, (
f"⚠️ WARNING: Expensive model '{model}' set for potentially simple task.\n"
f" Task preview: {msg}...\n"
f" Consider: python3 skills/intelligent-router/scripts/spawn_helper.py \"{msg}\""
)
return True, f"✅ Model set: {model}"
def _health_check_and_reroute(model_id: str, config: dict, tier: str) -> str:
"""
Proactive health-based routing.
If the chosen provider is rate-limited or has too many active sessions,
walk the fallback chain until we find a healthy one.
Returns the best healthy model_id available.
"""
try:
from provider_health import is_healthy, pick_healthy
except ImportError:
sys.path.insert(0, str(SCRIPT_DIR))
try:
from provider_health import is_healthy, pick_healthy
except ImportError:
return model_id # health module not available, pass through
healthy, reason = is_healthy(model_id)
if healthy:
return model_id
# Primary is degraded — walk fallback chain
rules = config.get("routing_rules", {}).get(tier, {})
fallback_chain = rules.get("fallback_chain", [])
candidates = [model_id] + fallback_chain
chosen = pick_healthy(candidates)
if chosen and chosen != model_id:
print(f"⚠️ {model_id.split('/')[0]} is degraded ({reason})", file=sys.stderr)
print(f" → Rerouting to: {chosen}", file=sys.stderr)
return chosen
# All degraded — return original and let caller handle failure
print(f"⚠️ All providers degraded for tier {tier}, using {model_id} anyway", file=sys.stderr)
return model_id
def main():
args = sys.argv[1:]
if not args:
print(__doc__)
sys.exit(1)
# --validate mode
if args[0] == "--validate":
if len(args) < 2:
print("Usage: spawn_helper.py --validate '<payload_json>'")
sys.exit(1)
ok, msg = validate_payload(args[1])
print(msg)
sys.exit(0 if ok else 1)
# --model-only mode (just print the model id)
if args[0] == "--model-only":
if len(args) < 2:
print("Usage: spawn_helper.py --model-only 'task description'")
sys.exit(1)
task = " ".join(args[1:])
config = load_config()
tier, model_id, _ = classify_task(task)
if not model_id:
rules = config.get("routing_rules", {}).get(tier, {})
model_id = rules.get("primary", "anthropic-proxy-4/glm-4.7")
# Health check — skip degraded providers proactively
model_id = _health_check_and_reroute(model_id, config, tier)
print(model_id)
sys.exit(0)
# Default: classify and show spawn command
task = " ".join(args)
config = load_config()
tier, model_id, confidence = classify_task(task)
if not model_id:
rules = config.get("routing_rules", {}).get(tier, {})
model_id = rules.get("primary", "anthropic-proxy-4/glm-4.7")
# Health check — skip degraded providers proactively
model_id = _health_check_and_reroute(model_id, config, tier)
icon = TIER_COLORS.get(tier, "⚪")
fallback_chain = config.get("routing_rules", {}).get(tier, {}).get("fallback_chain", [])
fallback_chain = rerank_fallback_chain(fallback_chain) # [llmfit-rerank-applied]
print(f"\n{icon} Task classified as: {tier} (confidence: {confidence})")
print(f"💰 Recommended model: {model_id}")
if fallback_chain:
print(f"🔄 Fallbacks: {' → '.join(fallback_chain[:2])}")
print(f"\n📋 Use in sessions_spawn:")
print(f""" sessions_spawn(
task=\"{task[:60]}{'...' if len(task)>60 else ''}\",
model=\"{model_id}\",
label=\"<label>\"
)""")
print(f"\n📋 Use in cron job payload:")
print(f""" {{
"kind": "agentTurn",
"message": "...",
"model": "{model_id}"
}}""")
if __name__ == "__main__":
main()
```
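The fallback walk in `_health_check_and_reroute` reduces to a first-healthy scan over `[primary] + fallback_chain`. Here is a minimal standalone sketch of that pattern; the `DEGRADED` set, the stub predicates, and the model IDs are illustrative stand-ins for the real `provider_health` module, which is not shown in this listing.

```python
# Stand-in health predicate; the real implementation lives in provider_health.
# Model IDs below are hypothetical examples.
DEGRADED = {"anthropic-proxy-4/glm-4.7"}

def is_healthy(model_id: str) -> bool:
    return model_id not in DEGRADED

def pick_healthy(candidates):
    """Return the first healthy candidate, or None if all are degraded."""
    for candidate in candidates:
        if is_healthy(candidate):
            return candidate
    return None

primary = "anthropic-proxy-4/glm-4.7"
fallback_chain = ["nvidia-nim/llama-3.1-70b", "anthropic/claude-sonnet-4"]
chosen = pick_healthy([primary] + fallback_chain)
print(chosen)  # first healthy candidate in the chain
```

If every candidate is degraded, `pick_healthy` returns None; the real script then keeps the original model and lets the caller handle the failure, as its final branch shows.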
### scripts/tier_classifier.py
```python
#!/usr/bin/env python3
"""
Capability-Based Tier Classifier v2.0
Assigns model tiers using multiple quality signals from provider metadata.
NO hard-coded name-based tier heuristics.
## Why v1 Was Wrong
v1 used cost as the primary capability signal:
cost < $0.60/M → SIMPLE
This breaks in 2026, when cheap SOTA models (DeepSeek V3.2 at $0.40/M, Llama-4-Maverick at
$0.40/M) match or beat expensive models from 2024. Cost ≠ capability.
## v2 Approach
Four signals, combined into a single capability_score [0–1]:
1. effective_params — extracted from model ID (7b, 70b, 405b, MoE-adjusted)
Largest single signal. Bigger = more capable.
2. context_window — larger window = more capable (long doc, multi-file)
3. reasoning_flag — model is a dedicated reasoning/thinking specialist
4. cost_input — expensive = likely quality (weak signal, last resort fallback)
SIMPLE tier is ONLY for:
- Local/Ollama models (zero API cost, free to use)
- Tiny cloud models (< 10B effective params AND cost < $0.30/M)
Everything else is MEDIUM or above based on capability_score.
"Pick the top ones": within each tier, models are ranked by score. Primary = best.
Routing prefers CHEAPER model within same tier for cost efficiency.
"""
from __future__ import annotations
import json
import re
from pathlib import Path
from collections import defaultdict
# ── Tier Capability Score Thresholds ──────────────────────────────────────────
# Scores are normalised [0, 1]. Boundaries tuned to real-world model landscape.
# Thresholds live in the named constants below; boundaries per tier:
#   SIMPLE:    local/free OR tiny (< 10B, cheap)           → score 0.00–0.24
#   MEDIUM:    capable mid-range (13B–70B cloud)           → score 0.25–0.49
#   COMPLEX:   high quality (70B+ cloud, Sonnet-class)     → score 0.50–0.79
#   REASONING: specialist thinking models, always REASONING regardless of score
#   CRITICAL:  flagship (Opus-class, ≥ $8/M)               → score 0.80+
SIMPLE_PARAMS_MAX = 10 # < 10B effective params → candidate for SIMPLE
SIMPLE_COST_MAX = 0.30 # also must be cheap
CRITICAL_COST_MIN = 8.0 # ≥ $8/M input → CRITICAL
COMPLEX_SCORE_MIN = 0.50 # score ≥ 0.50 → COMPLEX
MEDIUM_SCORE_MIN = 0.25 # score ≥ 0.25 → MEDIUM
# Context window large enough to imply high capability
CONTEXT_LARGE = 200_000 # e.g. 200K+ → adds to score
CONTEXT_HUGE = 500_000 # 500K+ → strong COMPLEX signal
# ── Reasoning-Specialist Detection ────────────────────────────────────────────
# Identifies models BUILT for reasoning (not just ones that support thinking mode).
# Kept minimal and specific — only patterns that are unambiguous reasoning-only models.
REASONING_SPECIALIST_PATTERNS = [
r"\br1\b", # DeepSeek R1 family (r1, r1-distill)
r"\bqwq\b", # Qwen QwQ (thinking-only model)
r"-thinking\b", # "kimi-k2-thinking", "qwen3-next-...-thinking"
r"\breasoning\b", # "phi-4-mini-flash-reasoning"
r"^r\d", # starts with r + digit (r1, r2)
]
LOCAL_PROVIDERS = {"ollama", "ollama-gpu-server"}
# Path to known-model param lookup (model ID regex → effective param count in B)
_KNOWN_PARAMS_FILE = Path(__file__).parent.parent / "known-model-params.json"
def _load_known_params() -> dict:
"""Load known model parameter counts from companion JSON file."""
if _KNOWN_PARAMS_FILE.exists():
with open(_KNOWN_PARAMS_FILE) as f:
return json.load(f)
return {}
def extract_effective_params(model_id: str, cost_input: float = 0.0) -> float:
"""
Extract effective parameter count (billions) from model ID.
Handles:
- Dense: 7b, 13b, 70b, 405b
- MoE: 8x7b → 22.4B effective (56B total discounted to 0.4x, matching the code below)
- Sub-billion: 0.5b, 3.8b
- Shorthand: 32b, 253b
When param count cannot be extracted from ID (closed models like Claude, GLM),
estimates from cost bracket as a fallback. This only fires for truly unknown sizes.
"""
mid = model_id.lower()
# MoE pattern: NxMb (e.g. 8x7b, 8x22b)
moe = re.search(r"(\d+)x(\d+\.?\d*)b", mid)
if moe:
experts = int(moe.group(1))
per_expert = float(moe.group(2))
# MoE effective quality ≈ total params at 0.4x (active params ratio is higher than 1/N)
return experts * per_expert * 0.4
# Dense: extract largest param count in ID
matches = re.findall(r"(\d+\.?\d*)b(?!\w)", mid)
if matches:
return max(float(m) for m in matches)
# Check known-model param table (ground truth for models where ID has no param count).
# This is a curated list of real published sizes — NOT tier heuristics.
# Add entries here when you know the real param count for a closed-source model.
KNOWN_PARAMS = _load_known_params()
for pattern, params in KNOWN_PARAMS.items():
if re.search(pattern, mid):
return params
# Unknown ID with no known params — estimate from cost bracket.
# Cost is the only remaining proxy for closed models (Claude, GLM, etc.).
# $8+/M → flagship (Opus-class, ~400B+) → 400B
# $3+/M → advanced (Sonnet-class, ~200B+) → 200B
# $1+/M → capable (mid-range, ~50B+) → 50B
# $0.3+/M → efficient (GLM-class, ~15B) → 15B
# <$0.3/M → micro or free → 7B
if cost_input >= 8.0:
return 400.0
elif cost_input >= 3.0:
return 200.0
elif cost_input >= 1.0:
return 50.0
elif cost_input >= 0.3:
return 15.0
else:
return 7.0
def is_reasoning_specialist(model_id: str, reasoning_flag: bool) -> bool:
"""True only for models DESIGNED for reasoning, not general models with thinking mode."""
if not reasoning_flag:
return False
mid = model_id.lower()
return any(re.search(p, mid) for p in REASONING_SPECIALIST_PATTERNS)
def capability_score(
model_id: str,
context_window: int,
cost_input: float,
reasoning: bool,
is_local: bool,
effective_params: float,
) -> float:
"""
Compute a normalised capability score [0, 1].
Weights (sum = 1.0):
effective_params: 0.50 — single strongest signal
context_window: 0.20 — long context = more capable
cost_input: 0.20 — expensive = likely quality (weak but universal)
reasoning: 0.10 — dedicated reasoning specialist bonus
"""
# Params score: log-scale normalised to [0, 1] anchored at 405B = 1.0
if effective_params > 0:
import math
# log2(1) = 0, log2(405) ≈ 8.66
params_score = min(math.log2(max(effective_params, 1)) / math.log2(405), 1.0)
else:
params_score = 0.3 # unknown size — assume mid-range
# Context score
ctx_score = min(context_window / 1_000_000, 1.0) # 1M ctx = 1.0
# Cost score: proxy for quality, log-scale, $100/M = 1.0
if is_local:
cost_score = 0.0 # local = free; use as SIMPLE signal, not quality
else:
import math
cost_score = min(math.log1p(cost_input) / math.log1p(100), 1.0)
# Reasoning bonus
reasoning_score = 0.5 if is_reasoning_specialist(model_id, reasoning) else 0.0
score = (
0.50 * params_score +
0.20 * ctx_score +
0.20 * cost_score +
0.10 * reasoning_score
)
return round(score, 4)
def assign_tier(
model_id: str,
context_window: int,
cost_input: float,
reasoning: bool,
is_local: bool,
effective_params: float,
cap_score: float,
) -> str:
"""
Assign tier based on capability score + hard rules.
Evaluated in order (first match wins).
"""
# CRITICAL: flagship cost (≥ $8/M) — regardless of score
if cost_input >= CRITICAL_COST_MIN:
return "CRITICAL"
# CRITICAL: huge context flagship (e.g. Opus with 1M ctx)
if context_window >= CONTEXT_HUGE and cost_input >= 3.0:
return "CRITICAL"
# REASONING: dedicated thinking/specialist models
if is_reasoning_specialist(model_id, reasoning):
return "REASONING"
# SIMPLE: local (always free) — regardless of quality
if is_local:
return "SIMPLE"
# SIMPLE: tiny cloud models (< 10B AND cheap)
if effective_params > 0 and effective_params < SIMPLE_PARAMS_MAX and cost_input < SIMPLE_COST_MAX:
return "SIMPLE"
# Score-based classification for everything else
if cap_score >= COMPLEX_SCORE_MIN:
return "COMPLEX"
if cap_score >= MEDIUM_SCORE_MIN:
return "MEDIUM"
# Fallthrough: very cheap cloud models with unknown size → MEDIUM
# (we never put unknowns in SIMPLE — too risky for quality)
return "MEDIUM"
def score_model(
provider: str,
model_id: str,
context_window: int,
cost_input: float,
cost_output: float,
reasoning: bool,
is_local: bool,
) -> dict:
"""
Full scoring pipeline for one model.
Returns {"tier": str, "score": float, "signals": dict}
"""
effective_params = extract_effective_params(model_id, cost_input)
cap = capability_score(
model_id, context_window, cost_input, reasoning, is_local, effective_params
)
tier = assign_tier(
model_id, context_window, cost_input, reasoning, is_local, effective_params, cap
)
signals = {
"provider": provider,
"model_id": model_id,
"effective_params_b": effective_params,
"cost_input": cost_input,
"context_window": context_window,
"reasoning_flag": reasoning,
"is_local": is_local,
"is_reasoning_specialist": is_reasoning_specialist(model_id, reasoning),
"capability_score": cap,
}
return {"tier": tier, "score": cap, "signals": signals}
def classify_from_openclaw_config(config_path: str | None = None) -> list[dict]:
"""
Read all model metadata from OpenClaw config and classify every model.
Returns list of model dicts with tier assigned from real capability signals.
"""
if config_path is None:
config_path = Path.home() / ".openclaw" / "openclaw.json"
with open(config_path) as f:
oc_config = json.load(f)
providers = oc_config.get("models", {}).get("providers", {})
classified = []
for provider_name, provider_cfg in providers.items():
is_local = provider_name in LOCAL_PROVIDERS
base_url = provider_cfg.get("baseUrl", "")
for model in provider_cfg.get("models", []):
model_id = model.get("id", "")
context_window = model.get("contextWindow", 8192)
cost = model.get("cost", {})
cost_input = cost.get("input", 0.0)
cost_output = cost.get("output", 0.0)
reasoning = model.get("reasoning", False)
result = score_model(
provider=provider_name,
model_id=model_id,
context_window=context_window,
cost_input=cost_input,
cost_output=cost_output,
reasoning=reasoning,
is_local=is_local,
)
classified.append({
"id": model_id,
"alias": model.get("name", model_id),
"provider": provider_name,
"base_url": base_url,
"tier": result["tier"],
"score": result["score"],
"context_window": context_window,
"input_cost_per_m": cost_input,
"output_cost_per_m": cost_output,
"reasoning": reasoning,
"is_local": is_local,
"modalities": model.get("input", ["text"]),
"capabilities": ["agentic"] if model.get("agentic") else [],
"effective_params_b": result["signals"]["effective_params_b"],
"signals": result["signals"],
})
return classified
# ── Provider preference for primary selection ──────────────────────────────────
# Lower = more preferred. Used as tiebreaker within same score+tier.
PROVIDER_PREFERENCE = {
"ollama-gpu-server": 0, # dedicated local GPU — most preferred for SIMPLE
"anthropic": 1, # OAuth — most reliable for COMPLEX/CRITICAL
"anthropic-proxy-1": 2,
"anthropic-proxy-4": 3, # z.ai cheap proxy (good for SIMPLE cloud fallback)
"anthropic-proxy-6": 4,
"nvidia-nim": 5, # NIM — good coverage across tiers
"anthropic-proxy-2": 6,
"anthropic-proxy-5": 7,
"ollama": 8, # local CPU — slow, lowest priority
}
def build_tier_config(classified: list[dict]) -> dict:
"""
Build per-tier routing config from classified models.
For each tier:
- Primary = highest capability score model (with provider preference as tiebreaker)
- Fallbacks = remaining models sorted score desc, with cost preference as secondary sort
"Top model" selection: score is the primary sort. Within same score bracket (±0.05),
prefer the cheaper model (cost efficiency). This surfaces the best quality at lowest cost.
"""
by_tier = defaultdict(list)
for m in classified:
by_tier[m["tier"]].append(m)
def full_id(m: dict) -> str:
p = m.get("provider", "")
i = m.get("id", "")
return f"{p}/{i}" if p else i
def is_vision_only(m: dict) -> bool:
mid = m.get("id", "").lower()
modalities = m.get("modalities", ["text"])
return "vision" in mid and "text" not in modalities
def sort_key_for_tier(tier: str, m: dict):
"""
Sorting key per tier to pick the best primary + fallback ordering.
SIMPLE:
- Non-tiny (≥ 7B) models first; sub-7B can't reliably do agent work
- Then local GPU (free, fast), then cheap cloud text models (no vision-only)
- Sort: (is_tiny_asc, is_local_desc, is_vision_asc, cost_asc, pref, score_desc)
MEDIUM/COMPLEX:
- Best score first
- Within ±0.05 score bracket, prefer cheaper (cost efficiency)
- Provider reliability as final tiebreaker
REASONING:
- Best score (bigger reasoning models = better)
CRITICAL:
- OAuth first (most reliable for prod)
- Then by score
"""
score = m.get("score", 0.0)
cost = m.get("input_cost_per_m", 0.0)
pref = PROVIDER_PREFERENCE.get(m.get("provider", ""), 99)
if tier == "SIMPLE":
# Prefer ≥7B models over tiny ones — sub-7B can't reliably do
# agent work (tool calls, Telegram, monitoring scripts).
params = m.get("effective_params_b", 0) or 0
is_tiny = 1 if params < 7 else 0
vision_penalty = 1 if is_vision_only(m) else 0
local_bonus = 0 if m.get("is_local") else 1
return (is_tiny, local_bonus, vision_penalty, cost, pref, -score)
if tier == "CRITICAL":
# OAuth providers first, then by score desc
return (pref, -score)
# MEDIUM: score bracket 0.10 — within same quality band, pick cheapest
if tier == "MEDIUM":
score_bracket = round(score / 0.10) * 0.10
return (-score_bracket, cost, pref)
# COMPLEX: wider 0.15 bracket so provider reliability beats marginal score gaps.
# Within bracket: reliability (provider pref) FIRST, then newer version, then cost.
if tier == "COMPLEX":
score_bracket = round(score / 0.15) * 0.15
# Extract version for tiebreaking (prefer newer: 4.6 > 4.5)
ver_match = re.search(r"(\d+)[._-](\d+)", m.get("id", ""))
ver = float(f"{ver_match.group(1)}.{ver_match.group(2)}") if ver_match else 0.0
return (-score_bracket, pref, -ver, cost)
# REASONING: wider 0.15 bracket, prefer by score then cost
score_bracket = round(score / 0.15) * 0.15
return (-score_bracket, cost, pref)
tier_descriptions = {
"SIMPLE": "Monitoring, heartbeat, summaries — free/tiny/cheap models",
"MEDIUM": "Code fixes, research, data analysis — capable mid-range",
"COMPLEX": "Features, architecture, debugging — high quality models",
"REASONING": "Formal logic, deep analysis, math — dedicated thinking models",
"CRITICAL": "Security, production, high-stakes — flagship models only",
}
use_for = {
"SIMPLE": ["monitoring", "status checks", "summaries", "alerts",
"heartbeat", "tweet monitoring", "price alerts", "memory consolidation"],
"MEDIUM": ["code fixes", "research", "API integration", "docs",
"general QA", "data analysis", "moderate tasks"],
"COMPLEX": ["feature development", "architecture", "debugging",
"code review", "multi-file changes", "trading strategy"],
"REASONING": ["formal proofs", "deep analysis", "math", "algorithmic design",
"long-horizon planning", "complex logical chains"],
"CRITICAL": ["security review", "production decisions", "financial ops",
"high-stakes analysis", "strategic planning"],
}
configs = {}
for tier in ["SIMPLE", "MEDIUM", "COMPLEX", "REASONING", "CRITICAL"]:
models = sorted(by_tier.get(tier, []), key=lambda m: sort_key_for_tier(tier, m))
if not models:
configs[tier] = {
"description": tier_descriptions[tier],
"primary": "",
"fallbacks": [],
"use_for": use_for.get(tier, []),
}
continue
configs[tier] = {
"description": tier_descriptions[tier],
"primary": full_id(models[0]),
"fallbacks": [full_id(m) for m in models[1:]],
"use_for": use_for.get(tier, []),
}
return configs
def main():
import argparse
parser = argparse.ArgumentParser(description="Capability-based tier classifier v2.0")
parser.add_argument("--config", default=None, help="OpenClaw config path")
parser.add_argument("--json", action="store_true", help="Output raw JSON")
parser.add_argument("--tier", choices=["SIMPLE", "MEDIUM", "COMPLEX", "REASONING", "CRITICAL"],
help="Show only this tier")
args = parser.parse_args()
classified = classify_from_openclaw_config(args.config)
if args.json:
print(json.dumps(classified, indent=2))
return
by_tier = defaultdict(list)
for m in classified:
by_tier[m["tier"]].append(m)
tiers_to_show = [args.tier] if args.tier else ["SIMPLE", "MEDIUM", "COMPLEX", "REASONING", "CRITICAL"]
print("\n=== Capability-Based Tier Classification v2.0 ===\n")
for tier in tiers_to_show:
models = sorted(by_tier.get(tier, []), key=lambda m: -m["score"])
print(f"{tier} ({len(models)} models):")
for m in models:
params = f" params={m['effective_params_b']:.0f}B" if m['effective_params_b'] > 0 else " params=?"
local_tag = " [local]" if m.get("is_local") else ""
reasoning_tag = " [reasoning-specialist]" if m["signals"].get("is_reasoning_specialist") else ""
print(f" {m['provider']}/{m['id']}"
f"{params}"
f" ctx={m['context_window']//1000}K"
f" cost=${m['input_cost_per_m']}/M"
f" cap={m['score']:.3f}"
f"{local_tag}{reasoning_tag}")
print()
tier_cfg = build_tier_config(classified)
print("Primary models per tier (ranked by capability + cost efficiency):")
for tier, cfg in tier_cfg.items():
primary = cfg.get("primary", "(none)")
fb_count = len(cfg.get("fallbacks", []))
print(f" {tier}: {primary} (+{fb_count} fallbacks)")
if __name__ == "__main__":
main()
```
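To make the score weighting concrete, here is a standalone re-computation of the `capability_score` formula for a hypothetical 70B cloud model (128K context, $0.60/M input, no reasoning flag). The sketch repeats the arithmetic inline rather than importing the script:

```python
import math

# Signals for one illustrative model (all values hypothetical).
effective_params = 70.0   # from a "...-70b-..." model ID
context_window = 128_000
cost_input = 0.60         # $/M input tokens
is_local = False

# Same weights and normalisations as capability_score() above.
params_score = min(math.log2(max(effective_params, 1)) / math.log2(405), 1.0)  # ≈ 0.71
ctx_score = min(context_window / 1_000_000, 1.0)                               # 0.128
cost_score = 0.0 if is_local else min(math.log1p(cost_input) / math.log1p(100), 1.0)  # ≈ 0.10
reasoning_score = 0.0     # not a reasoning specialist

score = round(0.50 * params_score + 0.20 * ctx_score
              + 0.20 * cost_score + 0.10 * reasoning_score, 4)
print(score)  # 0.3998
```

With the thresholds above (MEDIUM ≥ 0.25, COMPLEX ≥ 0.50), this model lands in MEDIUM: parameters carry 50% of the weight, so even a large context window cannot lift a mid-size generalist into COMPLEX on its own.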