Back to skills
SkillHub ClubAnalyze Data & AIFull StackBackendData / AI

web3-investor

AI-friendly Web3 investment infrastructure for autonomous agents. Use when (1) discovering and analyzing DeFi/NFT investment opportunities, (2) executing secure transactions via local keystore signer REST API with preview-approve-execute state machine, (3) managing portfolio with dashboards and expiry alerts. Supports base and ethereum chains, configurable security constraints including whitelist protection, transaction limits, and mandatory simulation before execution.

Packaged view

This page reorganizes the original catalog entry around fit, installability, and workflow context first. The original raw source lives below.

Stars
3,111
Hot score
99
Updated
March 20, 2026
Overall rating
C4.0
Composite score
4.0
Best-practice grade
B75.6

Install command

npx @skill-hub/cli install openclaw-skills-web3-investor

Repository

openclaw/skills

Skill path: skills/bevanding/web3-investor

AI-friendly Web3 investment infrastructure for autonomous agents. Use when (1) discovering and analyzing DeFi/NFT investment opportunities, (2) executing secure transactions via local keystore signer REST API with preview-approve-execute state machine, (3) managing portfolio with dashboards and expiry alerts. Supports base and ethereum chains, configurable security constraints including whitelist protection, transaction limits, and mandatory simulation before execution.

Open repository

Best for

Primary workflow: Analyze Data & AI.

Technical facets: Full Stack, Backend, Data / AI, Security.

Target audience: everyone.

License: Unknown.

Original source

Catalog source: SkillHub Club.

Repository owner: openclaw.

This is still a mirrored public skill entry. Review the repository before installing into production workflows.

What it helps with

  • Install web3-investor into Claude Code, Codex CLI, Gemini CLI, or OpenCode workflows
  • Review https://github.com/openclaw/skills before adding web3-investor to shared team environments
  • Use web3-investor for development workflows

Works across

Claude CodeCodex CLIGemini CLIOpenCode

Favorites: 0.

Sub-skills: 0.

Aggregator: No.

Original source / Raw SKILL.md

---
name: web3-investor
description: AI-friendly Web3 investment infrastructure for autonomous agents. Use when (1) discovering and analyzing DeFi/NFT investment opportunities, (2) executing secure transactions via local keystore signer REST API with preview-approve-execute state machine, (3) managing portfolio with dashboards and expiry alerts. Supports base and ethereum chains, configurable security constraints including whitelist protection, transaction limits, and mandatory simulation before execution.
---

# Web3 Investor Skill

> **Purpose**: Enable AI agents to safely discover, analyze, and execute DeFi investments.
> 
> **Core Philosophy**: Data-driven decisions. No generic advice without real-time discovery.

---

## ⚠️ Critical Rules (MUST FOLLOW)

### Rule 1: Discovery First
**When user asks for investment advice:**
```
❌ WRONG: Give generic advice immediately (e.g., "I recommend Aave")
✅ CORRECT: 
   1. Collect investment preferences (chain, token, risk tolerance)
   2. Run discovery to get real-time data
   3. Analyze data
   4. Provide data-backed recommendations
```

### Rule 2: User's LLM Makes Decisions
- This skill provides **raw data only**
- Investment analysis and recommendations are the responsibility of the user's LLM/agent
- This skill is NOT responsible for investment outcomes

### Rule 3: Risk Acknowledgment
- APY data comes from third-party APIs and may be delayed or inaccurate
- Investment decisions are made at the user's own risk
- Always DYOR (Do Your Own Research)

### Rule 4: Verify Execution Capability Before Trading
**Before attempting any transaction, the agent MUST check signer availability:**
```
❌ WRONG: Directly call preview/execute without checking API
✅ CORRECT:
   1. Check if signer API is reachable (call balances endpoint)
   2. If unreachable → inform user: "Signer service unavailable, please check SETUP.md"
   3. Never proceed with preview if signer is unavailable
```

**Health Check Command**:
```bash
python3 scripts/trading/trade_executor.py balances --network base
# If success → signer is available
# If error E010 → signer unavailable, stop and inform user
```

### Rule 5: Check Payment Capability FIRST (NEW in v0.5.0)
**Before asking user for transaction details, ALWAYS check execution readiness:**
```
❌ WRONG: Ask "How much do you want to invest?" without checking payment capability
✅ CORRECT:
   1. Run: python3 scripts/trading/preflight.py check --network <chain>
   2. Inform user of available payment methods
   3. Then ask for transaction details
```

**Preflight Check Output**:
```json
{
  "recommended": "eip681_payment_link",
  "methods": [
    {"method": "keystore_signer", "status": "unavailable"},
    {"method": "eip681_payment_link", "status": "available"}
  ],
  "message": "⚠️ No local signer. Use EIP-681 payment link."
}
```

**Payment Method Flow**:
| Recommended Method | Action |
|-------------------|--------|
| `keystore_signer` | Use `preview → approve → execute` flow |
| `eip681_payment_link` | Generate EIP-681 link with `eip681_payment.py` |

---

## 🎯 Quick Start for Agents

### Step 1: Collect Investment Preferences (REQUIRED)

Before running discovery, ask the user:

| Preference | Key | Options | Why It Matters |
|------------|-----|---------|----------------|
| **Chain** | `chain` | ethereum, base, arbitrum, optimism | Determines which blockchain to search |
| **Capital Token** | `capital_token` | USDC, USDT, ETH, WBTC, etc. | The token they want to invest |
| **Reward Preference** | `reward_preference` | single / multi / any | Single token rewards vs multiple tokens |
| **Accept IL** | `accept_il` | true / false / any | Impermanent loss tolerance |
| **Underlying Type** | `underlying_preference` | rwa / onchain / mixed / any | Real-world assets vs on-chain |

### Step 2: Run Discovery

```bash
# Basic search
python3 scripts/discovery/find_opportunities.py \
  --chain ethereum \
  --min-apy 5 \
  --limit 20

# With LLM-ready output
python3 scripts/discovery/find_opportunities.py \
  --chain ethereum \
  --llm-ready \
  --output json
```

### Step 3: Filter by Preferences

```python
from scripts.discovery.investment_profile import InvestmentProfile

profile = InvestmentProfile()
profile.set_preferences(
    chain="ethereum",
    capital_token="USDC",
    accept_il=False,
    reward_preference="single"
)
filtered = profile.filter_opportunities(opportunities)
```

### Step 4: Execute Transaction (Choose Payment Method)

#### Option A: Keystore Signer (Production)
```bash
# Preview → Approve → Execute
python3 scripts/trading/trade_executor.py preview \
  --type deposit --protocol aave --asset USDC --amount 1000 --network base

python3 scripts/trading/trade_executor.py approve --preview-id <uuid>

python3 scripts/trading/trade_executor.py execute --approval-id <uuid>
```

#### Option B: EIP-681 Payment Link (Mobile)
```bash
python3 scripts/trading/eip681_payment.py generate \
  --token USDC --to 0x... --amount 10 --network base \
  --qr-output /tmp/payment_qr.png
```

---

## 📁 Project Structure

```
web3-investor/
├── scripts/
│   ├── discovery/           # Opportunity discovery tools
│   ├── trading/             # Transaction execution modules
│   └── portfolio/           # Balance queries
├── config/
│   ├── config.json          # Execution model & security settings
│   └── protocols.json       # Protocol registry
├── references/              # Detailed module documentation
│   ├── discovery.md
│   ├── investment-profile.md
│   ├── trade-executor.md
│   ├── portfolio-indexer.md
│   ├── protocols.md
│   └── risk-framework.md
└── SKILL.md
```

---

## 📚 Module Overview

| Module | Purpose | Details |
|--------|---------|---------|
| **Discovery** | Search DeFi yield opportunities | See [references/discovery.md](references/discovery.md) |
| **Investment Profile** | Preference collection & filtering | See [references/investment-profile.md](references/investment-profile.md) |
| **Trade Executor** | Transaction execution REST API | See [references/trade-executor.md](references/trade-executor.md) |
| **Portfolio Indexer** | On-chain balance queries | See [references/portfolio-indexer.md](references/portfolio-indexer.md) |

---

## 🔍 Discovery Data Sources

### Primary Data Sources

| Source | Type | Use Case |
|--------|------|----------|
| **MCP (AntAlpha)** | Real-time yields | Primary source for DeFi opportunities |
| **DefiLlama** | Protocol TVL/Yields | Fallback and cross-validation |
| **Dune** | On-chain analytics | Custom queries, advanced analysis |

### Dune MCP Integration

Dune provides powerful on-chain analytics through MCP (Model Context Protocol). Use Dune for:
- Custom on-chain data queries
- Protocol-specific analytics
- Historical trend analysis
- Whale wallet tracking

**Configuration** (`config/config.json`):
```json
{
  "discovery": {
    "data_sources": ["mcp", "dune", "defillama"],
    "dune": {
      "mcp_endpoint": "https://api.dune.com/mcp/v1",
      "auth": {
        "header": { "name": "x-dune-api-key", "env_var": "DUNE_API_KEY" },
        "query_param": { "name": "api_key", "env_var": "DUNE_API_KEY" }
      }
    }
  }
}
```

**Environment Setup**:
```bash
# Required for Dune integration
export DUNE_API_KEY="your_dune_api_key"
```

**Authentication Methods**:
1. **Header Auth** (Recommended): `x-dune-api-key: <DUNE_API_KEY>`
2. **Query Param**: `?api_key=<DUNE_API_KEY>`

**Usage Example**:
```bash
# Query Dune for protocol analytics
curl -H "x-dune-api-key: $DUNE_API_KEY" \
  "https://api.dune.com/mcp/v1/query/<query_id>/results"
```

---

## ⚙️ Configuration

### Environment Variables

```bash
# Optional: Alchemy for better RPC
ALCHEMY_API_KEY=your_key_here

# Optional: Debank for portfolio tracking
WEB3_INVESTOR_DEBANK_API_KEY=your_key_here

# Trade Executor: Local API endpoint
WEB3_INVESTOR_API_URL=http://localhost:3000/api
```

### Security Configuration (`config/config.json`)

```json
{
  "security": {
    "max_trade_usd": 10000,
    "max_slippage_percent": 3.0,
    "whitelist_enabled": false,
    "whitelist_tokens": ["USDC", "USDT", "DAI", "WETH", "ETH", "stETH", "rETH"],
    "whitelist_protocols": ["uniswap", "aave", "compound", "lido", "0x"],
    "double_confirm": {
      "enabled": true,
      "large_trade_threshold_usd": 5000
    }
  }
}
```

#### Whitelist Configuration

| Parameter | Default | Description |
|-----------|---------|-------------|
| `whitelist_enabled` | `false` | Enable/disable whitelist filtering in trade execution |
| `whitelist_tokens` | [...] | Allowed tokens for trading |
| `whitelist_protocols` | [...] | Allowed protocols for interaction |

**Note**: When `whitelist_enabled` is `false` (default), trade execution skips whitelist checks, allowing broader protocol/token access. Set to `true` to enforce strict whitelist validation.

### Whitelist Setup

```bash
python3 scripts/trading/whitelist.py \
  --add 0x87870Bca3F3fD6335C3F4ce8392D69350B4fA4E2 \
  --name "Aave V3 Pool" \
  --limit 10000
```

---

## 📝 Skill Author Template (MUST INCLUDE IN PROMPTS)

When using this skill for transaction generation, include this template:

```
Output structured transaction request (JSON), do not execute directly.
All transactions must go through preview -> approve -> execute.
If transaction parameters cannot be determined, return clarification, do not guess.
```

### Required Output Format

```json
{
  "request_id": "uuid",
  "timestamp": "ISO8601", 
  "network": "base|ethereum",
  "chain_id": 8453|1,
  "type": "transfer|swap|deposit|contract_call",
  "description": "human readable description",
  "transaction": {
    "to": "0x...",
    "value": "0x0",
    "data": "0x...",
    "gas_limit": 250000
  },
  "metadata": {
    "protocol": "uniswap|aave|compound|...",
    "from_token": "USDC",
    "to_token": "WETH", 
    "amount": "5"
  }
}
```

---

## 🆘 Troubleshooting

### Import Errors
Run from workspace root:
```bash
cd /home/admin/.openclaw/workspace
python3 skills/web3-investor/scripts/discovery/find_opportunities.py ...
```

### No Opportunities Found
- Check chain name spelling
- Try lowering `--min-apy` threshold
- Ensure `--max-apy` isn't too restrictive

### Rate Limiting
- DefiLlama has generous limits but can occasionally rate limit
- Add delays between requests if batch processing

---

## 🤝 Contributing

Test donations welcome:
- **Network**: Base Chain
- **Address**: `0x1F3A9A450428BbF161C4C33f10bd7AA1b2599a3e`

---

**Maintainer**: Web3 Investor Skill Team  
**Registry**: https://clawhub.com/skills/web3-investor  
**License**: MIT

---

## Referenced Files

> The following files are referenced in this skill and included for context.

### references/discovery.md

```markdown
# Opportunity Discovery Module

> **Purpose**: Search DeFi yield opportunities across multiple sources with real-time data.

---

## What It Does

Searches DeFi yield opportunities across multiple sources with real-time data.

## Key Features

- **Risk Signals**: Each opportunity includes structured risk data:
  - `reward_type`: "none" | "single" | "multi"
  - `has_il_risk`: true | false (impermanent loss)
  - `underlying_type`: "rwa" | "onchain" | "mixed" | "unknown"
- **Actionable Addresses**: Contract addresses ready for execution
- **LLM-Ready Output**: Structured JSON optimized for AI analysis

## Data Sources (Priority Order)

| Source | Priority | Type | API Key Required |
|--------|----------|------|------------------|
| **MCP (AntAlpha)** | Primary | Real-time yields | No |
| **Dune Analytics** | Secondary | On-chain analytics | Yes (`DUNE_API_KEY`) |
| **DefiLlama** | Fallback | Protocol TVL/Yields | No |
| **Protocol Registry** | Static | Known protocol metadata | No |

### MCP (AntAlpha)
- **Endpoint**: `https://mcp.prime.antalpha.com/mcp`
- **Fallback**: `http://47.85.100.251:3000`
- **Use Case**: Real-time DeFi opportunity discovery
- **Configuration**: See `config/config.json` → `discovery.mcp_url`

### Dune Analytics
- **MCP Endpoint**: `https://api.dune.com/mcp/v1`
- **Auth Methods**:
  1. Header: `x-dune-api-key: <DUNE_API_KEY>`
  2. Query param: `?api_key=<DUNE_API_KEY>`
- **Environment Variable**: `DUNE_API_KEY`
- **Use Case**: Custom on-chain queries, protocol analytics, whale tracking
- **Configuration**:
```json
{
  "discovery": {
    "dune": {
      "mcp_endpoint": "https://api.dune.com/mcp/v1",
      "auth": {
        "header": { "name": "x-dune-api-key", "env_var": "DUNE_API_KEY" },
        "query_param": { "name": "api_key", "env_var": "DUNE_API_KEY" }
      },
      "enabled": true
    }
  }
}
```

### DefiLlama
- **Endpoint**: `https://yields.llama.fi`
- **Use Case**: Protocol TVL, yield data, cross-validation
- **No API key required**

## Usage Examples

```bash
# Search Ethereum opportunities with min 5% APY
python3 scripts/discovery/find_opportunities.py \
  --chain ethereum \
  --min-apy 5 \
  --limit 20

# Search stablecoin products only
python3 scripts/discovery/find_opportunities.py \
  --chain ethereum \
  --min-apy 3 \
  --max-apy 25 \
  --limit 50

# Output for LLM analysis
python3 scripts/discovery/find_opportunities.py \
  --chain ethereum \
  --llm-ready \
  --output json
```

## Output Format

### Standard Output

```
# DeFi Opportunities on ethereum

## 1. Aave V3 USDC
- APY: 5.23%
- TVL: $1,234,567,890
- Pool: 0x...
- Risk: single reward, no IL
```

### LLM-Ready Output (JSON)

```json
{
  "opportunities": [
    {
      "name": "Aave V3 USDC",
      "chain": "ethereum",
      "apy": 5.23,
      "tvl": 1234567890,
      "pool_address": "0x...",
      "reward_type": "single",
      "has_il_risk": false,
      "underlying_type": "onchain"
    }
  ]
}
```

## Troubleshooting

### No Opportunities Found
- Check chain name spelling (case-sensitive in some cases)
- Try lowering `--min-apy` threshold
- Ensure `--max-apy` isn't too restrictive

### Rate Limiting
- DefiLlama has generous limits but can occasionally rate limit
- Add delays between requests if batch processing
```

### references/investment-profile.md

```markdown
# Investment Profile & Filtering Module

> **Purpose**: Structured preference collection and opportunity filtering.

---

## What It Does

Structured preference collection and opportunity filtering.

## Why Use It

- Ensures consistent question flow across different agents
- Provides type-safe preference storage
- One-shot filtering based on multiple criteria

## Preference Parameters

| Preference | Key | Options | Why It Matters |
|------------|-----|---------|----------------|
| **Chain** | `chain` | ethereum, base, arbitrum, optimism | Determines which blockchain to search |
| **Capital Token** | `capital_token` | USDC, USDT, ETH, WBTC, etc. | The token they want to invest |
| **Reward Preference** | `reward_preference` | single / multi / any | Single token rewards vs multiple tokens (e.g., CRV+CVX) |
| **Accept IL** | `accept_il` | true / false / any | Impermanent loss tolerance for LP products |
| **Underlying Type** | `underlying_preference` | rwa / onchain / mixed / any | Real-world assets vs pure on-chain protocols |

## Code Example

```python
from scripts.discovery.investment_profile import InvestmentProfile

# Create profile
profile = InvestmentProfile()

# Method 1: Direct assignment
profile.chain = "ethereum"
profile.capital_token = "USDC"
profile.accept_il = False
profile.reward_preference = "single"
profile.min_apy = 5
profile.max_apy = 30

# Method 2: Batch setup
profile.set_preferences(
    chain="ethereum",
    capital_token="USDC",
    accept_il=False,
    reward_preference="single",
    underlying_preference="onchain",
    min_apy=5,
    max_apy=30
)

# Filter opportunities
filtered = profile.filter_opportunities(opportunities)

# Get human-readable explanation
print(profile.explain_filtering(len(opportunities), len(filtered)))
```

## Available Questions for UI Building

```python
questions = InvestmentProfile.get_questions()

# Returns structured dict:
{
  "required": [...],      # Must ask: chain, capital_token
  "preference": [...],    # Should ask: reward_preference, accept_il, etc.
  "constraints": [...]    # Optional: min_apy, max_apy, min_tvl
}
```

## Filtering Logic

The filtering applies all set preferences:

1. **Chain filter**: Exact match
2. **Capital token filter**: Match against opportunity's underlying tokens
3. **Reward preference filter**: 
   - `single`: Only single-token rewards
   - `multi`: Multi-token rewards acceptable
   - `any`: No filter
4. **IL tolerance filter**: 
   - `false`: Exclude LP products with IL risk
   - `true`: Include all
   - `any`: No filter
5. **Underlying type filter**: Match RWA/onchain/mixed
6. **APY range filter**: Min/max bounds
7. **TVL filter**: Minimum TVL threshold
```

### references/trade-executor.md

```markdown
# Trade Executor Module (REST API Adapter)

> **Purpose**: Generate executable transaction requests via REST API to local keystore signer.

---

## What It Does

Generates executable transaction requests via REST API to local keystore signer. **This module does NOT hold private keys** — all transactions require explicit approval.

## Execution Model

| Property | Value |
|----------|-------|
| **Wallet Type** | Local keystore signer |
| **Supported Chains** | `base`, `ethereum` |
| **Entry Point** | REST API |
| **State Machine** | `preview` → `approve` → `execute` |

## Security Constraints (MUST FOLLOW)

- ❌ **Cannot skip `approve` step** — every transaction requires manual confirmation
- ✅ **Must simulate before execution** — uses `eth_call` for validation
- ⚠️ **Must return risk warnings** — insufficient balance, missing allowance, invalid route
- 🔒 **Default minimum permissions**:
  - Whitelist chains/protocols/tokens
  - Transaction value limits
  - Max slippage caps

## Payment Methods

### Option A: Keystore Signer (Production)

Requires local signer service running. Best for automated agents with dedicated signing infrastructure.

```bash
# Step 4a: Preview transaction
python3 scripts/trading/trade_executor.py preview \
  --type deposit \
  --protocol aave \
  --asset USDC \
  --amount 1000 \
  --network base

# Step 4b: Approve (returns approval_id)
python3 scripts/trading/trade_executor.py approve \
  --preview-id <uuid-from-preview>

# Step 4c: Execute (broadcasts signed tx)
python3 scripts/trading/trade_executor.py execute \
  --approval-id <uuid-from-approve>
```

### Option B: EIP-681 Payment Link (Mobile Recommended)

Generate a MetaMask-compatible payment link or QR code. Best for mobile users and quick investments without local signer setup.

```bash
# Generate payment link for RWA investment
python3 scripts/trading/eip681_payment.py generate \
  --token USDC \
  --to 0x1F3A9A450428BbF161C4C33f10bd7AA1b2599a3e \
  --amount 10 \
  --network base \
  --qr-output /tmp/payment_qr.png
```

**Output includes:**
- MetaMask deep link (mobile users click to open app)
- QR code PNG file (desktop users scan with phone)
- Raw transaction details (for manual verification)

**Supported tokens:** USDC, USDT, WETH, ETH on Base and Ethereum mainnet.

### Option C: WalletConnect (Roadmap)

Coming in future release. Will support complex DeFi interactions and persistent wallet connections.

## API Endpoints

| Operation | Method | Endpoint | Description |
|-----------|--------|----------|-------------|
| Query Balances | GET | `/api/wallet/balances` | Get wallet token balances |
| Preview Swap | POST | `/api/trades/preview` or `/api/uniswap/preview-swap` or `/api/zerox/preview-swap` | Generate transaction preview |
| Approve | POST | `/api/trades/approve` | Confirm transaction for execution |
| Execute | POST | `/api/trades/execute` | Broadcast signed transaction |
| Check Status | GET | `/api/transactions/{tx_hash}` | Query transaction status |
| Query Allowances | GET | `/api/allowances` | Get token allowances |
| Revoke Preview | POST | `/api/allowances/revoke-preview` | Preview allowance revoke |

## Unified Transaction Request Format

All transaction requests follow this structure:

```json
{
  "request_id": "uuid",
  "timestamp": "ISO8601",
  "network": "base",
  "chain_id": 8453,
  "type": "transfer|swap|deposit|contract_call",
  "description": "human readable",
  "transaction": {
    "to": "0x...",
    "value": "0x0",
    "data": "0x...",
    "gas_limit": 250000
  },
  "metadata": {
    "protocol": "uniswap|0x|aave|...",
    "from_token": "USDC",
    "to_token": "WETH",
    "amount": "5"
  }
}
```

## Response Specifications

### `preview` Response

```json
{
  "preview_id": "uuid",
  "simulation_ok": true|false,
  "risk": {
    "balance_sufficient": true|false,
    "allowance_sufficient": true|false,
    "route_valid": true|false,
    "warnings": ["..."]
  },
  "next_step": "approve" | "clarification"
}
```

### `approve` Response

```json
{
  "approval_id": "uuid",
  "preview_id": "...",
  "approved_at": "ISO8601",
  "expires_at": "ISO8601"
}
```

### `execute` Response

```json
{
  "tx_hash": "0x...",
  "explorer_url": "https://basescan.org/tx/0x...",
  "executed_at": "ISO8601",
  "network": "base"
}
```

### Error Format

```json
{
  "code": "E001-E999",
  "message": "human readable",
  "diagnostics": "technical details"
}
```

## CLI Usage Examples

```bash
# Step 1: Preview a swap
python3 scripts/trading/trade_executor.py preview \
  --type swap \
  --from-token USDC \
  --to-token WETH \
  --amount 5 \
  --network base

# Step 2: Approve the preview
python3 scripts/trading/trade_executor.py approve \
  --preview-id <uuid-from-step-1>

# Step 3: Execute the approved transaction
python3 scripts/trading/trade_executor.py execute \
  --approval-id <uuid-from-step-2>

# Check balances
python3 scripts/trading/trade_executor.py balances \
  --network base

# Check transaction status
python3 scripts/trading/trade_executor.py status \
  --tx-hash 0x...
```

## Health Check (MUST RUN BEFORE TRADING)

```bash
python3 scripts/trading/trade_executor.py balances --network base
# If success → signer is available
# If error E010 → signer unavailable, stop and inform user
```

## Module Usage Guide

| Module | Purpose | Production/Debug |
|--------|---------|------------------|
| `trade_executor.py` | Main execution module, connects to signer service | ✅ Production |
| `eip681_payment.py` | Generate MetaMask payment links & QR codes | ✅ Production |
| `safe_vault.py` | Calldata generation, balance check (no signing) | 🔧 Debug |
| `simulate_tx.py` | Transaction simulation (no signing) | 🔧 Debug |

## Troubleshooting

### Import Errors
If you see `ModuleNotFoundError`, ensure you're running from the workspace root:
```bash
cd /home/admin/.openclaw/workspace
python3 skills/web3-investor/scripts/trading/trade_executor.py ...
```

### Signer Unavailable
- Check if the local signer service is running
- Verify `WEB3_INVESTOR_API_URL` environment variable
- Consult SETUP.md for signer setup instructions
```

### references/portfolio-indexer.md

```markdown
# Portfolio Indexer Module

> **Purpose**: Query on-chain balances for specified addresses.

---

## What It Does

Queries on-chain balances for specified addresses.

## Supported Chains

- Ethereum mainnet
- Base
- Arbitrum (partial)

## Usage

```bash
# Query portfolio
python3 scripts/portfolio/indexer.py \
  --address 0x... \
  --chain ethereum \
  --output json
```

## Output Format

### JSON Output

```json
{
  "address": "0x...",
  "chain": "ethereum",
  "timestamp": "2026-03-05T15:00:00Z",
  "balances": [
    {
      "token": "USDC",
      "symbol": "USDC",
      "balance": "1000.50",
      "balance_usd": 1000.50,
      "contract": "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"
    },
    {
      "token": "WETH",
      "symbol": "WETH",
      "balance": "0.5",
      "balance_usd": 1500.00,
      "contract": "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"
    }
  ],
  "total_usd": 2500.50
}
```

## Environment Variables

```bash
# Optional: Alchemy for better RPC
ALCHEMY_API_KEY=your_key_here

# Optional: Debank for portfolio tracking
WEB3_INVESTOR_DEBANK_API_KEY=your_key_here
```

## Troubleshooting

### No Balances Found
- Verify the address is correct
- Check if the chain is supported
- Ensure RPC endpoint is accessible

### Slow Response
- Alchemy API provides faster responses than public RPC
- Debank API provides aggregated portfolio data across chains
```

### scripts/trading/trade_executor.py

```python
#!/usr/bin/env python3
"""
Trade Executor - REST API adapter for local keystore signer (v0.3.0)

DESIGN PHILOSOPHY (Agent-First Design):
This module is designed for AGENTS with LLM and programming capabilities.
It provides a REFERENCE implementation, not a rigid standard.

If your signer service uses different API paths or response formats:
1. Modify config/config.json to map your endpoints
2. Or modify this file's api_request() function
3. Or implement an adapter layer in your signer service

Architecture:
- This module ONLY generates executable transaction requests, does NOT hold private keys
- All transactions must go through: preview -> approve -> execute
- Supports: base, ethereum chains
- Entry point: REST API endpoints

Security Constraints:
- Cannot skip 'approve' step
- Must simulate before execution (eth_call)
- Must return risk warnings (insufficient balance, missing approval, invalid route)
- Default minimum permissions: whitelist chains/protocols/tokens, limits, max slippage

Usage:
    python3 trade_executor.py preview --type swap --from-token USDC --to-token WETH --amount 5 --network base
    python3 trade_executor.py approve --preview-id <id>
    python3 trade_executor.py execute --approval-id <id>
    python3 trade_executor.py status --tx-hash 0x...

For API specification, see: SIGNER_API_SPEC.md
For setup guide, see: SETUP.md
"""

import argparse
import json
import os
import sys
import uuid
from datetime import datetime
from typing import Optional, Dict, Any, List
import urllib.request
import urllib.error

# ============================================================================
# Configuration
# ============================================================================

SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
SKILL_DIR = os.path.dirname(os.path.dirname(SCRIPT_DIR))  # trading -> scripts -> skill_dir
CONFIG_PATH = os.path.join(SKILL_DIR, "config", "config.json")

# Add scripts directory to path for imports (utils and schemas are in scripts/)
sys.path.insert(0, os.path.dirname(SCRIPT_DIR))
try:
    from utils.preflight import PreflightChecker, PreflightError, load_config as load_preflight_config
    PREFLIGHT_AVAILABLE = True
except ImportError as e:
    PREFLIGHT_AVAILABLE = False

try:
    from utils.rpc_manager import RPCManager, get_rpc_manager
    RPC_MANAGER_AVAILABLE = True
except ImportError:
    RPC_MANAGER_AVAILABLE = False

try:
    from schemas.output_schema import (
        PreviewOutput, ApprovalResult, ExecutionResult,
        InputInfo, QuoteInfo, RiskWarning, RiskLevel,
        create_preview_output, create_approval_result, create_execution_result
    )
    SCHEMA_AVAILABLE = True
except ImportError:
    SCHEMA_AVAILABLE = False

# API Base URL (from environment or default)
API_BASE_URL = os.environ.get("WEB3_INVESTOR_API_URL", "http://localhost:3000/api")

# Supported chains
SUPPORTED_CHAINS = {
    "base": {"chain_id": 8453, "name": "Base", "explorer": "https://basescan.org"},
    "ethereum": {"chain_id": 1, "name": "Ethereum", "explorer": "https://etherscan.io"},
}

# Default security settings
DEFAULT_SECURITY = {
    "max_slippage_percent": 3.0,
    "whitelist_enabled": False,
    "whitelist_chains": ["base", "ethereum"],
    "whitelist_protocols": ["uniswap", "aave", "compound", "lido", "0x"],
    "whitelist_tokens": ["USDC", "USDT", "DAI", "WETH", "ETH", "stETH", "rETH"],
    "max_trade_value_usd": 10000,
}

# Error codes
ERROR_CODES = {
    "INSUFFICIENT_BALANCE": {"code": "E001", "message": "Insufficient balance for transaction"},
    "INSUFFICIENT_ALLOWANCE": {"code": "E002", "message": "Token allowance insufficient, approval required"},
    "INVALID_ROUTE": {"code": "E003", "message": "No valid route found for swap"},
    "CHAIN_NOT_SUPPORTED": {"code": "E004", "message": "Chain not in whitelist"},
    "PROTOCOL_NOT_SUPPORTED": {"code": "E005", "message": "Protocol not in whitelist"},
    "TOKEN_NOT_SUPPORTED": {"code": "E006", "message": "Token not in whitelist"},
    "EXCEEDS_LIMIT": {"code": "E007", "message": "Transaction exceeds configured limit"},
    "PREVIEW_FAILED": {"code": "E008", "message": "Transaction simulation failed"},
    "APPROVAL_REQUIRED": {"code": "E009", "message": "Cannot execute without approval"},
    "API_UNAVAILABLE": {"code": "E010", "message": "Local API service unavailable"},
    "PREFLIGHT_FAILED": {"code": "E014", "message": "Pre-flight checks failed"},
    "UNKNOWN_ERROR": {"code": "E999", "message": "Unknown error occurred"},
}


# ============================================================================
# Unified Transaction Request Format
# ============================================================================

def create_transaction_request(
    network: str,
    tx_type: str,
    to: str,
    value: str = "0x0",
    data: str = "0x",
    gas_limit: int = 250000,
    description: str = "",
    metadata: Optional[Dict] = None
) -> Dict[str, Any]:
    """
    Create a unified transaction request format.
    
    Args:
        network: Chain name (base, ethereum)
        tx_type: Transaction type (transfer, swap, deposit, contract_call)
        to: Target address
        value: Value in hex (0x...)
        data: Calldata in hex
        gas_limit: Estimated gas limit
        description: Human readable description
        metadata: Additional protocol info (protocol, from_token, to_token, amount)
    
    Returns:
        Unified transaction request dict
    """
    if network not in SUPPORTED_CHAINS:
        raise ValueError(f"Unsupported network: {network}. Supported: {list(SUPPORTED_CHAINS.keys())}")
    
    request = {
        "request_id": str(uuid.uuid4()),
        "timestamp": datetime.utcnow().isoformat() + "Z",
        "network": network,
        "chain_id": SUPPORTED_CHAINS[network]["chain_id"],
        "type": tx_type,
        "description": description,
        "transaction": {
            "to": to,
            "value": value,
            "data": data,
            "gas_limit": gas_limit
        },
        "metadata": metadata or {}
    }
    
    return request


# ============================================================================
# API Client
# ============================================================================

def api_request(
    method: str,
    endpoint: str,
    data: Optional[Dict] = None,
    timeout: int = 30
) -> Dict[str, Any]:
    """
    Make a request to the local signer API.
    
    This is the core HTTP client. If your signer service uses different
    API paths or response formats, modify this function or update config.json.
    
    Args:
        method: HTTP method (GET, POST)
        endpoint: API endpoint (without /api prefix)
        data: Request body for POST
        timeout: Request timeout in seconds
    
    Returns:
        API response as dict with 'success' field
    """
    url = f"{API_BASE_URL}{endpoint}"
    headers = {"Content-Type": "application/json"}
    
    try:
        if method.upper() == "GET":
            req = urllib.request.Request(url, headers=headers, method="GET")
        else:
            body = json.dumps(data or {}).encode("utf-8")
            req = urllib.request.Request(url, data=body, headers=headers, method="POST")
        
        with urllib.request.urlopen(req, timeout=timeout) as response:
            result = json.loads(response.read().decode("utf-8"))
            return {"success": True, "data": result}
    
    except urllib.error.URLError as e:
        return {
            "success": False,
            "error": ERROR_CODES["API_UNAVAILABLE"],
            "diagnostics": f"Cannot connect to {url}: {str(e)}"
        }
    except urllib.error.HTTPError as e:
        try:
            error_body = json.loads(e.read().decode("utf-8"))
            return {
                "success": False,
                "error": error_body.get("error", ERROR_CODES["UNKNOWN_ERROR"]),
                "diagnostics": error_body.get("message", f"HTTP {e.code}")
            }
        except:
            return {
                "success": False,
                "error": ERROR_CODES["UNKNOWN_ERROR"],
                "diagnostics": f"HTTP {e.code}: {e.reason}"
            }
    except Exception as e:
        return {
            "success": False,
            "error": ERROR_CODES["UNKNOWN_ERROR"],
            "diagnostics": str(e)
        }


# ============================================================================
# Wallet Operations
# ============================================================================

def get_wallet_balances(
    chain: Optional[str] = None,
    tokens: Optional[List[str]] = None
) -> Dict[str, Any]:
    """
    Query wallet balances from local API.
    
    GET /api/wallet/balances
    """
    params = []
    if chain:
        params.append(f"chain={chain}")
    if tokens:
        params.append(f"tokens={','.join(tokens)}")
    
    endpoint = "/wallet/balances"
    if params:
        endpoint += "?" + "&".join(params)
    
    return api_request("GET", endpoint)


# ============================================================================
# Trade Operations (State Machine: Preview -> Approve -> Execute)
# ============================================================================

def run_preflight_checks(transaction_type: str, params: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """
    Run pre-flight checks if available and enabled.
    
    Returns None if checks pass, or error dict if checks fail.
    """
    if not PREFLIGHT_AVAILABLE:
        return None
    
    try:
        config = load_preflight_config(CONFIG_PATH)
        preflight_config = config.get("preflight", {})
        
        if not preflight_config.get("enabled", True):
            return None
        
        checker = PreflightChecker(config)
        report = checker.run(transaction_type, params)
        
        if not report.critical_passed:
            return {
                "success": False,
                "error": {
                    "code": "E014",
                    "message": "Pre-flight checks failed",
                    "details": {
                        "summary": report.summary,
                        "failed_checks": [
                            {
                                "name": r.name,
                                "message": r.message,
                                "fix_hint": r.fix_hint,
                                "severity": r.severity.value
                            }
                            for r in report.results if not r.passed
                        ]
                    }
                },
                "diagnostics": report.summary
            }
        
        return None
    
    except Exception as e:
        # Pre-flight errors should not block execution, just log
        return None


def preview_swap(
    from_token: str,
    to_token: str,
    amount: str,
    network: str = "base",
    slippage: float = 0.5,
    protocol: str = "auto"
) -> Dict[str, Any]:
    """
    Preview a swap transaction.
    
    POST /api/trades/preview (or /api/uniswap/preview-swap, /api/zerox/preview-swap)
    
    Returns:
        {
            "preview_id": "uuid",
            "simulation_ok": true/false,
            "risk": {...},
            "next_step": "approve" or "clarification",
            ...
        }
    """
    # Pre-flight checks
    params = {
        "type": "swap",
        "network": network,
        "from_token": from_token,
        "to_token": to_token,
        "amount": amount
    }
    preflight_error = run_preflight_checks("swap", params)
    if preflight_error:
        return preflight_error
    
    # Security check: whitelist (only if enabled)
    security = _load_security_config()
    
    # Skip whitelist checks if whitelist_enabled is False (default for backward compatibility)
    if security.get("whitelist_enabled", False):
        if network not in security.get("whitelist_chains", []):
            return {
                "success": False,
                "error": ERROR_CODES["CHAIN_NOT_SUPPORTED"],
                "diagnostics": f"Chain '{network}' not in whitelist. Allowed: {security['whitelist_chains']}"
            }
        
        if from_token.upper() not in security.get("whitelist_tokens", []):
            return {
                "success": False,
                "error": ERROR_CODES["TOKEN_NOT_SUPPORTED"],
                "diagnostics": f"Token '{from_token}' not in whitelist. Allowed: {security['whitelist_tokens']}"
            }
        
        if to_token.upper() not in security.get("whitelist_tokens", []):
            return {
                "success": False,
                "error": ERROR_CODES["TOKEN_NOT_SUPPORTED"],
                "diagnostics": f"Token '{to_token}' not in whitelist. Allowed: {security['whitelist_tokens']}"
            }
    
    # Try different preview endpoints
    endpoints_to_try = [
        "/trades/preview",
        "/uniswap/preview-swap",
        "/zerox/preview-swap"
    ]
    
    payload = {
        "from_token": from_token,
        "to_token": to_token,
        "amount": amount,
        "network": network,
        "slippage_percent": slippage,
        "protocol": protocol
    }
    
    last_error = None
    for endpoint in endpoints_to_try:
        result = api_request("POST", endpoint, payload)
        
        if result.get("success"):
            # Standardize response format
            data = result.get("data", {})
            
            # Check if double confirmation is required
            preview_result_basic = {
                "success": True,
                "preview_id": data.get("preview_id", str(uuid.uuid4())),
                "simulation_ok": data.get("simulation_ok", True),
                "risk": _extract_risk_info(data),
                "next_step": "approve" if data.get("simulation_ok", True) else "clarification",
                "transaction": data.get("transaction"),
                "rate": data.get("rate"),
                "estimated_output": data.get("estimated_output"),
                "gas_estimate": data.get("gas_estimate"),
                "warnings": data.get("warnings", [])
            }
            
            # Add double confirm check
            requires_double, reasons = _check_double_confirm_required(params, preview_result_basic)
            preview_result_basic["requires_double_confirm"] = requires_double
            if requires_double:
                preview_result_basic["double_confirm_reasons"] = reasons
                security = _load_security_config()
                confirm_phrase = security.get("double_confirm", {}).get("confirm_phrase", "CONFIRM_HIGH_RISK")
                preview_result_basic["confirm_instruction"] = f"To approve, use: --confirm \"{confirm_phrase}\""
            
            # Create standardized output if schema available
            if SCHEMA_AVAILABLE:
                input_info = InputInfo(
                    token=from_token,
                    amount=amount,
                    chain=network,
                    token_out=to_token
                )
                
                quote_info = QuoteInfo(
                    estimated_output=data.get("estimated_output"),
                    rate=data.get("rate"),
                    slippage_bps=int(slippage * 100),
                    gas_estimate=data.get("gas_estimate")
                )
                
                risk_warnings = []
                if data.get("warnings"):
                    for warning in data["warnings"]:
                        risk_warnings.append(RiskWarning(
                            type="general",
                            level=RiskLevel.MEDIUM,
                            message=warning
                        ))
                
                standardized = create_preview_output(
                    input_info=input_info,
                    quote_info=quote_info,
                    risk_warnings=risk_warnings,
                    requires_double_confirm=requires_double,
                    double_confirm_reasons=reasons,
                    confirm_instruction=preview_result_basic.get("confirm_instruction"),
                    preview_id=data.get("preview_id", str(uuid.uuid4())),
                    raw_transaction=data.get("transaction")
                )
                
                # Merge standardized format with basic result
                preview_result_basic["_standardized"] = standardized.to_dict()
            
            return preview_result_basic
        
        last_error = result
    
    # All endpoints failed
    return {
        "success": False,
        "error": last_error.get("error", ERROR_CODES["PREVIEW_FAILED"]) if last_error else ERROR_CODES["PREVIEW_FAILED"],
        "diagnostics": last_error.get("diagnostics", "All preview endpoints failed") if last_error else "No response from API"
    }


def preview_deposit(
    protocol: str,
    asset: str,
    amount: str,
    network: str = "base"
) -> Dict[str, Any]:
    """
    Preview a deposit transaction (for lending/staking).
    
    POST /api/trades/preview
    """
    # Pre-flight checks
    params = {
        "type": "deposit",
        "network": network,
        "asset": asset,
        "amount": amount
    }
    preflight_error = run_preflight_checks("deposit", params)
    if preflight_error:
        return preflight_error
    
    security = _load_security_config()
    
    if network not in security.get("whitelist_chains", []):
        return {
            "success": False,
            "error": ERROR_CODES["CHAIN_NOT_SUPPORTED"],
            "diagnostics": f"Chain '{network}' not in whitelist."
        }
    
    payload = {
        "type": "deposit",
        "protocol": protocol,
        "asset": asset,
        "amount": amount,
        "network": network
    }
    
    result = api_request("POST", "/trades/preview", payload)
    
    if result.get("success"):
        data = result.get("data", {})
        return {
            "success": True,
            "preview_id": data.get("preview_id", str(uuid.uuid4())),
            "simulation_ok": data.get("simulation_ok", True),
            "risk": _extract_risk_info(data),
            "next_step": "approve" if data.get("simulation_ok", True) else "clarification",
            "transaction": data.get("transaction"),
            "warnings": data.get("warnings", [])
        }
    
    return result


def approve_transaction(preview_id: str, confirm_phrase: str = None) -> Dict[str, Any]:
    """
    Approve a previewed transaction.
    
    POST /api/trades/approve
    
    Args:
        preview_id: Preview ID from preview step
        confirm_phrase: Confirmation phrase for high-risk transactions
    
    Returns:
        {
            "approval_id": "uuid",
            "preview_id": "...",
            ...
        }
    """
    if not preview_id:
        return {
            "success": False,
            "error": {"code": "E011", "message": "preview_id is required"},
            "diagnostics": "Cannot approve without preview_id"
        }
    
    # Check if double confirmation is required
    # In a real implementation, we would retrieve the preview result from storage
    # For now, we check the confirm_phrase if provided
    security = _load_security_config()
    double_confirm_config = security.get("double_confirm", {})
    
    if double_confirm_config.get("enabled", False):
        required_phrase = double_confirm_config.get("confirm_phrase", "CONFIRM_HIGH_RISK")
        
        # If confirm_phrase is provided, validate it
        if confirm_phrase is not None:
            if confirm_phrase != required_phrase:
                return {
                    "success": False,
                    "error": {"code": "E015", "message": "Invalid confirmation phrase"},
                    "diagnostics": f"Expected: {required_phrase}"
                }
    
    result = api_request("POST", "/trades/approve", {"preview_id": preview_id})
    
    if result.get("success"):
        data = result.get("data", {})
        
        # Create standardized output if schema available
        if SCHEMA_AVAILABLE:
            standardized = create_approval_result(
                success=True,
                approval_id=data.get("approval_id"),
                preview_id=preview_id
            )
            return {
                "success": True,
                "approval_id": data.get("approval_id"),
                "preview_id": preview_id,
                "approved_at": data.get("approved_at", datetime.utcnow().isoformat() + "Z"),
                "expires_at": data.get("expires_at"),
                "_standardized": standardized.to_dict()
            }
        
        return {
            "success": True,
            "approval_id": data.get("approval_id"),
            "preview_id": preview_id,
            "approved_at": data.get("approved_at", datetime.utcnow().isoformat() + "Z"),
            "expires_at": data.get("expires_at")
        }
    
    return result


def execute_transaction(approval_id: str) -> Dict[str, Any]:
    """
    Execute an approved transaction.
    
    POST /api/trades/execute
    
    Returns:
        {
            "tx_hash": "0x...",
            "explorer_url": "https://...",
            ...
        }
    """
    if not approval_id:
        return {
            "success": False,
            "error": {"code": "E012", "message": "approval_id is required"},
            "diagnostics": "Cannot execute without approval_id"
        }
    
    result = api_request("POST", "/trades/execute", {"approval_id": approval_id})
    
    if result.get("success"):
        data = result.get("data", {})
        tx_hash = data.get("tx_hash", "")
        network = data.get("network", "base")
        explorer_url = _build_explorer_url(tx_hash, network)
        
        # Create standardized output if schema available
        if SCHEMA_AVAILABLE:
            standardized = create_execution_result(
                success=True,
                tx_hash=tx_hash,
                network=network,
                explorer_url=explorer_url
            )
            return {
                "success": True,
                "tx_hash": tx_hash,
                "explorer_url": explorer_url,
                "executed_at": data.get("executed_at", datetime.utcnow().isoformat() + "Z"),
                "network": network,
                "_standardized": standardized.to_dict()
            }
        
        return {
            "success": True,
            "tx_hash": tx_hash,
            "explorer_url": explorer_url,
            "executed_at": data.get("executed_at", datetime.utcnow().isoformat() + "Z"),
            "network": network
        }
    
    return result


def get_transaction_status(tx_hash: str) -> Dict[str, Any]:
    """
    Check transaction status.
    
    GET /api/transactions/{tx_hash}
    """
    if not tx_hash:
        return {
            "success": False,
            "error": {"code": "E013", "message": "tx_hash is required"},
            "diagnostics": "Cannot check status without tx_hash"
        }
    
    result = api_request("GET", f"/transactions/{tx_hash}")
    
    if result.get("success"):
        data = result.get("data", {})
        return {
            "success": True,
            "tx_hash": tx_hash,
            "status": data.get("status", "unknown"),
            "block_number": data.get("block_number"),
            "gas_used": data.get("gas_used"),
            "explorer_url": _build_explorer_url(tx_hash, data.get("network", "base"))
        }
    
    return result


# ============================================================================
# Allowance Management
# ============================================================================

def get_allowances(
    chain: str = "base",
    token: Optional[str] = None
) -> Dict[str, Any]:
    """
    Get current token allowances.
    
    GET /api/allowances
    """
    params = [f"chain={chain}"]
    if token:
        params.append(f"token={token}")
    
    endpoint = "/allowances?" + "&".join(params)
    return api_request("GET", endpoint)


def preview_revoke_allowance(
    token: str,
    spender: str,
    chain: str = "base"
) -> Dict[str, Any]:
    """
    Preview an allowance revoke transaction.
    
    POST /api/allowances/revoke-preview
    """
    payload = {
        "token": token,
        "spender": spender,
        "chain": chain
    }
    
    result = api_request("POST", "/allowances/revoke-preview", payload)
    
    if result.get("success"):
        data = result.get("data", {})
        return {
            "success": True,
            "preview_id": data.get("preview_id"),
            "current_allowance": data.get("current_allowance"),
            "spender": spender,
            "token": token,
            "next_step": "approve"
        }
    
    return result


# ============================================================================
# Helper Functions
# ============================================================================

def _load_security_config() -> Dict[str, Any]:
    """Load security configuration from config.json."""
    config_path = os.path.join(SKILL_DIR, "config", "config.json")
    
    if os.path.exists(config_path):
        with open(config_path) as f:
            config = json.load(f)
            return {**DEFAULT_SECURITY, **config.get("security", {})}
    
    return DEFAULT_SECURITY.copy()


def _check_double_confirm_required(
    params: Dict[str, Any],
    preview_result: Dict[str, Any]
) -> tuple:
    """
    Check if double confirmation is required.
    
    Returns:
        (requires_double_confirm, reasons) tuple
    """
    security = _load_security_config()
    double_confirm_config = security.get("double_confirm", {})
    
    if not double_confirm_config.get("enabled", False):
        return False, []
    
    reasons = []
    
    # Check 1: Large trade threshold
    large_trade_threshold = double_confirm_config.get("large_trade_threshold_usd", 5000)
    
    if params.get("amount"):
        try:
            amount = float(params["amount"])
            if amount >= large_trade_threshold:
                reasons.append(f"Large trade: {amount} >= threshold {large_trade_threshold}")
        except (ValueError, TypeError):
            pass
    
    # Check 2: High slippage
    high_slippage_threshold = double_confirm_config.get("high_slippage_threshold_bps", 100)
    slippage = params.get("slippage", 0.5)
    slippage_bps = slippage * 100  # Convert percent to basis points
    
    if slippage_bps >= high_slippage_threshold:
        reasons.append(f"High slippage: {slippage}% >= threshold {high_slippage_threshold/100}%")
    
    # Check 3: New protocol (first use)
    if double_confirm_config.get("new_protocol_confirm", True):
        protocol = params.get("protocol", "")
        if protocol and protocol not in ["auto", "uniswap", "aave", "compound"]:
            reasons.append(f"Protocol '{protocol}' requires confirmation (first use)")
    
    return len(reasons) > 0, reasons


def _extract_risk_info(data: Dict) -> Dict[str, Any]:
    """Extract risk information from API response."""
    return {
        "balance_sufficient": data.get("balance_sufficient", True),
        "allowance_sufficient": data.get("allowance_sufficient", True),
        "route_valid": data.get("route_valid", True),
        "slippage_within_limit": data.get("slippage_within_limit", True),
        "warnings": data.get("risk_warnings", [])
    }


def _build_explorer_url(tx_hash: str, network: str) -> str:
    """Build block explorer URL for transaction."""
    if network in SUPPORTED_CHAINS:
        base = SUPPORTED_CHAINS[network]["explorer"]
        return f"{base}/tx/{tx_hash}"
    return f"https://etherscan.io/tx/{tx_hash}"


def _format_output(data: Dict, json_output: bool = False) -> str:
    """Format output for display."""
    if json_output:
        return json.dumps(data, indent=2, ensure_ascii=False)
    
    if not data.get("success", False):
        error = data.get("error", {})
        diagnostics = data.get("diagnostics", "")
        return f"❌ Error [{error.get('code', '???')}]: {error.get('message', 'Unknown')}\n   Diagnostics: {diagnostics}"
    
    lines = []
    
    # Preview result
    if "preview_id" in data:
        lines.append(f"📋 Preview ID: {data['preview_id']}")
        lines.append(f"   Simulation: {'✅ OK' if data.get('simulation_ok') else '❌ Failed'}")
        
        if data.get("rate"):
            lines.append(f"   Rate: {data['rate']}")
        if data.get("estimated_output"):
            lines.append(f"   Estimated Output: {data['estimated_output']}")
        
        risk = data.get("risk", {})
        if risk.get("warnings"):
            lines.append(f"   ⚠️ Warnings: {', '.join(risk['warnings'])}")
        
        # Double confirmation info
        if data.get("requires_double_confirm"):
            lines.append(f"   🔒 Double Confirmation Required")
            for reason in data.get("double_confirm_reasons", []):
                lines.append(f"      - {reason}")
            if data.get("confirm_instruction"):
                lines.append(f"   {data['confirm_instruction']}")
        
        lines.append(f"   Next Step: {data.get('next_step', 'approve')}")
    
    # Approval result
    elif "approval_id" in data:
        lines.append(f"✅ Approval ID: {data['approval_id']}")
        lines.append(f"   Preview ID: {data['preview_id']}")
        lines.append(f"   Approved At: {data.get('approved_at', 'N/A')}")
        lines.append(f"   Next Step: execute --approval-id {data['approval_id']}")
    
    # Execution result
    elif "tx_hash" in data:
        lines.append(f"🚀 Transaction Submitted!")
        lines.append(f"   TX Hash: {data['tx_hash']}")
        lines.append(f"   Explorer: {data['explorer_url']}")
    
    # Balance result
    elif "balances" in data or data.get("data", {}).get("balances"):
        balances = data.get("balances") or data.get("data", {}).get("balances", [])
        lines.append("💰 Wallet Balances:")
        for bal in balances:
            lines.append(f"   {bal.get('symbol', '?')}: {bal.get('balance', '0')} ({bal.get('chain', '?')})")
    
    return "\n".join(lines)


# ============================================================================
# CLI
# ============================================================================

def main():
    parser = argparse.ArgumentParser(
        description="Trade Executor - REST API adapter for local keystore signer (v0.3.0)",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Preview a swap
  python3 trade_executor.py preview --type swap --from-token USDC --to-token WETH --amount 5 --network base
  
  # Approve a preview
  python3 trade_executor.py approve --preview-id <uuid>
  
  # Execute an approved transaction
  python3 trade_executor.py execute --approval-id <uuid>
  
  # Check balances
  python3 trade_executor.py balances --network base
  
  # Check transaction status
  python3 trade_executor.py status --tx-hash 0x...
"""
    )
    
    subparsers = parser.add_subparsers(dest="command", help="Commands")
    
    # Preview command
    preview_parser = subparsers.add_parser("preview", help="Preview a transaction")
    preview_parser.add_argument("--type", choices=["swap", "deposit", "transfer"], default="swap", help="Transaction type")
    preview_parser.add_argument("--from-token", help="Source token (for swap)")
    preview_parser.add_argument("--to-token", help="Destination token (for swap)")
    preview_parser.add_argument("--amount", required=True, help="Amount to trade")
    preview_parser.add_argument("--network", choices=["base", "ethereum"], default="base", help="Network")
    preview_parser.add_argument("--protocol", default="auto", help="Protocol to use")
    preview_parser.add_argument("--slippage", type=float, default=0.5, help="Slippage tolerance %%")
    preview_parser.add_argument("--json", action="store_true", help="JSON output")
    
    # For deposit
    preview_parser.add_argument("--asset", help="Asset to deposit (for deposit)")
    
    # Approve command
    approve_parser = subparsers.add_parser("approve", help="Approve a previewed transaction")
    approve_parser.add_argument("--preview-id", required=True, help="Preview ID from preview step")
    approve_parser.add_argument("--confirm", help="Confirmation phrase for high-risk transactions")
    approve_parser.add_argument("--json", action="store_true", help="JSON output")
    
    # Execute command
    execute_parser = subparsers.add_parser("execute", help="Execute an approved transaction")
    execute_parser.add_argument("--approval-id", required=True, help="Approval ID from approve step")
    execute_parser.add_argument("--json", action="store_true", help="JSON output")
    
    # Status command
    status_parser = subparsers.add_parser("status", help="Check transaction status")
    status_parser.add_argument("--tx-hash", required=True, help="Transaction hash")
    status_parser.add_argument("--json", action="store_true", help="JSON output")
    
    # Balances command
    balances_parser = subparsers.add_parser("balances", help="Query wallet balances")
    balances_parser.add_argument("--network", choices=["base", "ethereum"], help="Filter by network")
    balances_parser.add_argument("--tokens", help="Comma-separated token list")
    balances_parser.add_argument("--json", action="store_true", help="JSON output")
    
    # Allowances command
    allowances_parser = subparsers.add_parser("allowances", help="Query or manage allowances")
    allowances_parser.add_argument("--revoke", action="store_true", help="Preview revoke allowance")
    allowances_parser.add_argument("--token", help="Token address or symbol")
    allowances_parser.add_argument("--spender", help="Spender address")
    allowances_parser.add_argument("--network", choices=["base", "ethereum"], default="base", help="Network")
    allowances_parser.add_argument("--json", action="store_true", help="JSON output")
    
    # Preflight check command
    preflight_parser = subparsers.add_parser("preflight", help="Run pre-flight checks")
    preflight_parser.add_argument("--type", choices=["swap", "deposit", "transfer"], default="swap", help="Transaction type to check")
    preflight_parser.add_argument("--network", choices=["base", "ethereum"], default="base", help="Network")
    preflight_parser.add_argument("--from-token", help="Source token (for swap)")
    preflight_parser.add_argument("--amount", help="Amount")
    preflight_parser.add_argument("--json", action="store_true", help="JSON output")
    
    # RPC status command
    rpc_parser = subparsers.add_parser("rpc", help="Check RPC status and fallback")
    rpc_parser.add_argument("--network", choices=["base", "ethereum"], default="base", help="Network to check")
    rpc_parser.add_argument("--test", action="store_true", help="Test RPC request with fallback")
    rpc_parser.add_argument("--json", action="store_true", help="JSON output")
    
    args = parser.parse_args()
    
    if args.command == "preview":
        if args.type == "swap":
            if not args.from_token or not args.to_token:
                parser.error("--from-token and --to-token are required for swap")
            result = preview_swap(
                from_token=args.from_token,
                to_token=args.to_token,
                amount=args.amount,
                network=args.network,
                slippage=args.slippage,
                protocol=args.protocol
            )
        elif args.type == "deposit":
            if not args.asset or not args.protocol:
                parser.error("--asset and --protocol are required for deposit")
            result = preview_deposit(
                protocol=args.protocol,
                asset=args.asset,
                amount=args.amount,
                network=args.network
            )
        else:
            result = {"success": False, "error": ERROR_CODES["UNKNOWN_ERROR"], "diagnostics": f"Unsupported type: {args.type}"}
        
        print(_format_output(result, args.json))
    
    elif args.command == "approve":
        result = approve_transaction(args.preview_id, confirm_phrase=args.confirm)
        print(_format_output(result, args.json))
    
    elif args.command == "execute":
        result = execute_transaction(args.approval_id)
        print(_format_output(result, args.json))
    
    elif args.command == "status":
        result = get_transaction_status(args.tx_hash)
        print(_format_output(result, args.json))
    
    elif args.command == "balances":
        tokens = args.tokens.split(",") if args.tokens else None
        result = get_wallet_balances(chain=args.network, tokens=tokens)
        print(_format_output(result, args.json))
    
    elif args.command == "allowances":
        if args.revoke:
            if not args.token or not args.spender:
                parser.error("--token and --spender are required for revoke")
            result = preview_revoke_allowance(args.token, args.spender, args.network)
        else:
            result = get_allowances(chain=args.network, token=args.token)
        print(_format_output(result, args.json))
    
    elif args.command == "preflight":
        # Run pre-flight checks directly
        if PREFLIGHT_AVAILABLE:
            config = load_preflight_config(CONFIG_PATH)
            checker = PreflightChecker(config)
            params = {
                "type": args.type,
                "network": args.network,
                "from_token": args.from_token,
                "amount": args.amount
            }
            report = checker.run(args.type, params)
            
            if args.json:
                result = {
                    "success": report.critical_passed,
                    "all_passed": report.all_passed,
                    "summary": report.summary,
                    "checks": [
                        {
                            "name": r.name,
                            "passed": r.passed,
                            "severity": r.severity.value,
                            "message": r.message,
                            "fix_hint": r.fix_hint
                        }
                        for r in report.results
                    ]
                }
                print(json.dumps(result, indent=2))
            else:
                print(f"\n📋 Pre-flight Check Results")
                print(f"   Summary: {report.summary}")
                print(f"   All Passed: {'✅ Yes' if report.all_passed else '⚠️ No'}")
                print(f"   Critical Passed: {'✅ Yes' if report.critical_passed else '❌ No'}")
                print()
                for r in report.results:
                    status = "✅" if r.passed else "❌"
                    sev_emoji = {"critical": "🔴", "warning": "🟡", "info": "🔵"}[r.severity.value]
                    print(f"{status} {sev_emoji} {r.name}: {r.message}")
                    if not r.passed and r.fix_hint:
                        print(f"   💡 Fix: {r.fix_hint}")
                print()
        else:
            print("❌ Pre-flight module not available")
    
    elif args.command == "rpc":
        # Check RPC status
        if RPC_MANAGER_AVAILABLE:
            # Load config from correct path (from skill root)
            skill_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
            config_path = os.path.join(skill_dir, "config", "config.json")
            with open(config_path) as f:
                rpc_config = json.load(f)
            manager = RPCManager(rpc_config)
            
            if args.test:
                # Test RPC request
                try:
                    result = manager.request(args.network, {
                        "jsonrpc": "2.0",
                        "method": "eth_blockNumber",
                        "params": [],
                        "id": 1
                    })
                    
                    if args.json:
                        print(json.dumps({
                            "success": True,
                            "network": args.network,
                            "active_rpc": manager.get_active_rpc(args.network),
                            "result": result
                        }, indent=2))
                    else:
                        block = int(result.get("result", "0x0"), 16)
                        print(f"\n🔗 RPC Test: {args.network}")
                        print(f"   Active RPC: {manager.get_active_rpc(args.network)}")
                        print(f"   Block Number: #{block}")
                        print(f"   Status: ✅ OK")
                        print()
                
                except Exception as e:
                    if args.json:
                        print(json.dumps({
                            "success": False,
                            "network": args.network,
                            "error": str(e)
                        }, indent=2))
                    else:
                        print(f"\n❌ RPC Test Failed: {e}")
            else:
                # Show status report
                report = manager.get_status_report(args.network)
                
                if args.json:
                    print(json.dumps(report, indent=2))
                else:
                    print(f"\n🔗 RPC Status: {args.network.upper()}")
                    print(f"   Active: {report['active_rpc']}")
                    print(f"   Fallback Enabled: {'✅' if report['fallback_enabled'] else '❌'}")
                    print(f"   Session Sticky: {'✅' if report['session_sticky'] else '❌'}")
                    print()
                    print("   Endpoints:")
                    for ep in report['endpoints']:
                        status = "✅ Active" if ep['is_active'] else "⏸️  Standby"
                        print(f"      {status}: {ep['url']}")
                    print()
        else:
            print("❌ RPC Manager module not available")
    
    else:
        parser.print_help()


if __name__ == "__main__":
    main()
```

### scripts/trading/preflight.py

```python
#!/usr/bin/env python3
"""
Preflight Check Module for Web3 Investor

Purpose: Check execution readiness before attempting transactions.
Returns available payment methods and recommendations.

Usage:
    python preflight.py check
    python preflight.py check --network base
"""

import argparse
import json
import os
import sys
import urllib.request
import urllib.error
from typing import Dict, Any, Optional
from enum import Enum

# ============================================================================
# Configuration
# ============================================================================

SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
CONFIG_PATH = os.path.join(SCRIPT_DIR, "..", "..", "config", "config.json")

class PaymentMethod(Enum):
    """Available payment methods."""
    KEYSTORE_SIGNER = "keystore_signer"
    EIP681_PAYMENT_LINK = "eip681_payment_link"
    NONE = "none"


# ============================================================================
# Configuration Loading
# ============================================================================

def load_config() -> Dict[str, Any]:
    """Load configuration from config.json."""
    try:
        with open(CONFIG_PATH, "r", encoding="utf-8") as f:
            return json.load(f)
    except Exception as e:
        print(f"⚠️ Failed to load config: {e}", file=sys.stderr)
        return {}


# ============================================================================
# Signer API Check
# ============================================================================

def check_signer_api(network: str = "base", timeout: int = 5) -> Dict[str, Any]:
    """
    Check if local signer API is available.
    
    Returns:
        Dict with status, message, and details
    """
    config = load_config()
    api_config = config.get("api", {})
    base_url = api_config.get("url", "http://localhost:3000/api")
    api_timeout = api_config.get("timeout_seconds", timeout)
    
    # Try to reach the balances endpoint
    url = f"{base_url}/wallet/balances?chain={network}"
    
    try:
        req = urllib.request.Request(
            url,
            headers={"User-Agent": "Web3-Investor-Preflight/0.5.0"},
            method='GET'
        )
        
        with urllib.request.urlopen(req, timeout=api_timeout) as response:
            data = json.loads(response.read().decode())
            
            if data.get("success"):
                return {
                    "available": True,
                    "status": "online",
                    "message": "Signer API is available",
                    "url": base_url,
                    "network": network
                }
            else:
                return {
                    "available": False,
                    "status": "error",
                    "message": f"Signer API returned error: {data.get('error', 'Unknown')}",
                    "url": base_url
                }
                
    except urllib.error.URLError as e:
        return {
            "available": False,
            "status": "offline",
            "message": f"Signer API unreachable: {e.reason}",
            "url": base_url
        }
    except Exception as e:
        return {
            "available": False,
            "status": "error",
            "message": f"Unexpected error: {str(e)}",
            "url": base_url
        }


# ============================================================================
# Execution Readiness Check
# ============================================================================

def check_execution_readiness(network: str = "base") -> Dict[str, Any]:
    """
    Check what payment methods are available for transaction execution.
    
    This function should be called BEFORE asking user for transaction details.
    
    Args:
        network: Network to check (default: base)
    
    Returns:
        Dict with recommended payment method and details
    """
    result = {
        "network": network,
        "methods": [],
        "recommended": None,
        "message": ""
    }
    
    # 1. Check local signer API
    signer_status = check_signer_api(network)
    
    if signer_status["available"]:
        result["methods"].append({
            "method": PaymentMethod.KEYSTORE_SIGNER.value,
            "status": "available",
            "description": "Local keystore signer (preview → approve → execute)",
            "details": signer_status
        })
        result["recommended"] = PaymentMethod.KEYSTORE_SIGNER.value
        result["message"] = "✅ Local signer available. Use preview → approve → execute flow."
    else:
        result["methods"].append({
            "method": PaymentMethod.KEYSTORE_SIGNER.value,
            "status": "unavailable",
            "description": "Local keystore signer",
            "details": signer_status
        })
        
        # 2. Fallback to EIP-681
        result["methods"].append({
            "method": PaymentMethod.EIP681_PAYMENT_LINK.value,
            "status": "available",
            "description": "EIP-681 payment link (MetaMask mobile)",
            "details": {
                "supported_wallets": ["MetaMask", "Rainbow", "Trust Wallet"],
                "requires": "Mobile wallet with USDT on the target chain"
            }
        })
        result["recommended"] = PaymentMethod.EIP681_PAYMENT_LINK.value
        result["message"] = "⚠️ No local signer. Use EIP-681 payment link for mobile wallet payment."
    
    return result


def format_readiness_report(readiness: Dict[str, Any]) -> str:
    """Format execution readiness as human-readable report."""
    lines = [
        "=" * 60,
        "🔍 Execution Readiness Check",
        "=" * 60,
        "",
        f"Network: {readiness['network'].upper()}",
        f"Recommended: {readiness['recommended']}",
        "",
        "Available Payment Methods:",
        "-" * 40,
    ]
    
    for method in readiness["methods"]:
        status_icon = "✅" if method["status"] == "available" else "❌"
        lines.append(f"  {status_icon} {method['method']}: {method['status']}")
        lines.append(f"      {method['description']}")
    
    lines.extend([
        "",
        "-" * 60,
        readiness["message"],
        ""
    ])
    
    return "\n".join(lines)


# ============================================================================
# CLI
# ============================================================================

def main():
    parser = argparse.ArgumentParser(
        description="Preflight check for Web3 Investor execution readiness",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
    python preflight.py check
    python preflight.py check --network ethereum
        """
    )
    
    subparsers = parser.add_subparsers(dest="command", help="Command to run")
    
    # Check command
    check_parser = subparsers.add_parser("check", help="Check execution readiness")
    check_parser.add_argument("--network", default="base", help="Network to check")
    check_parser.add_argument("--json", action="store_true", help="Output as JSON")
    
    args = parser.parse_args()
    
    if args.command == "check":
        readiness = check_execution_readiness(args.network)
        
        if args.json:
            # Convert Enum to string for JSON serialization
            output = {
                "network": readiness["network"],
                "methods": readiness["methods"],
                "recommended": readiness["recommended"],
                "message": readiness["message"]
            }
            print(json.dumps(output, indent=2))
        else:
            print(format_readiness_report(readiness))
    else:
        parser.print_help()


if __name__ == "__main__":
    main()
```

### scripts/discovery/find_opportunities.py

```python
#!/usr/bin/env python3
"""
Find investment opportunities from DefiLlama API.

Version: 0.2.0 - Refactored for LLM-based risk analysis

Key Changes:
- Removed local risk scoring (calculate_risk_score)
- Added actionable_addresses for execution readiness
- Integrated protocol registry from references/protocols.md
- Enhanced null/type safety for external API data

Usage:
    python find_opportunities.py --min-apy 5 --chain ethereum
    python find_opportunities.py --protocol aave-v3 --output json
    python find_opportunities.py --llm-ready  # Output optimized for LLM analysis
"""

import argparse
import json
import os
import re
import sys
from datetime import datetime
from typing import Optional, List, Dict, Any
import urllib.request
import urllib.error

# ============================================================================
# Configuration
# ============================================================================

DEFILLAMA_BASE = "https://yields.llama.fi"
DEFILLAMA_PROTOCOLS = "https://api.llama.fi/protocols"
MAX_SAFE_APY_DEFAULT = 100  # Filter out extreme risks/scams by default

# Paths
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
PROTOCOLS_MD_PATH = os.path.join(SCRIPT_DIR, "..", "..", "references", "protocols.md")
PROTOCOLS_JSON_PATH = os.path.join(SCRIPT_DIR, "..", "..", "config", "protocols.json")

# Chain mapping (DefiLlama naming)
CHAIN_MAPPING = {
    "ethereum": "Ethereum",
    "eth": "Ethereum",
    "base": "Base",
    "basechain": "Base",
    "arbitrum": "Arbitrum",
    "arb": "Arbitrum",
    "optimism": "Optimism",
    "op": "Optimism",
}


# ============================================================================
# Protocol Registry (Static Knowledge Layer)
# ============================================================================

def parse_protocols_md() -> Dict[str, Dict[str, Any]]:
    """
    Parse protocols.md into a static registry.
    
    Returns dict keyed by protocol name (lowercase).
    """
    registry = {}
    
    if not os.path.exists(PROTOCOLS_MD_PATH):
        return registry
    
    try:
        with open(PROTOCOLS_MD_PATH, "r", encoding="utf-8") as f:
            content = f.read()
        
        # Parse protocol sections (### Protocol Name)
        pattern = r"### ([A-Za-z0-9\s\-\.]+)\n((?:(?!\n### ).)*?)(?=\n### |$)"
        matches = re.findall(pattern, content, re.DOTALL)
        
        for name, section in matches:
            name_lower = name.lower().strip()
            info = {"name": name.strip()}
            
            # Extract contract address
            contract_match = re.search(r"\*\*Contract\*\*:\s*(`?0x[a-fA-F0-9]{40}`?)", section)
            if contract_match:
                addr = contract_match.group(1).strip("`")
                info["primary_contract"] = addr
            
            # Extract category
            category_match = re.search(r"\*\*Category\*\*:\s*(\w+)", section)
            if category_match:
                info["category"] = category_match.group(1).lower()
            
            # Extract risk level (for reference, not computed)
            risk_match = re.search(r"\*\*Risk Level\*\*:\s*(\w+)", section)
            if risk_match:
                info["registry_risk"] = risk_match.group(1).lower()
            
            # Extract docs URL
            docs_match = re.search(r"\*\*Docs\*\*:\s*(https?://[^\s]+)", section)
            if docs_match:
                info["docs_url"] = docs_match.group(1)
            
            # Extract TVL (for reference)
            tvl_match = re.search(r"\*\*TVL\*\*:\s*>\s*\$?([\d.]+[BMK]?)", section)
            if tvl_match:
                info["registry_tvl"] = tvl_match.group(1)
            
            registry[name_lower] = info
            
            # Also add alternative keys
            if "aave" in name_lower:
                registry["aave-v3"] = info
                registry["aave v3"] = info
            if "compound" in name_lower:
                registry["compound-v3"] = info
                registry["compound v3"] = info
        
    except Exception as e:
        print(f"⚠️ Failed to parse protocols.md: {e}", file=sys.stderr)
    
    return registry


def load_protocols_json() -> Dict[str, Dict[str, Any]]:
    """Load protocols from JSON config if exists."""
    if not os.path.exists(PROTOCOLS_JSON_PATH):
        return {}
    
    try:
        with open(PROTOCOLS_JSON_PATH, "r", encoding="utf-8") as f:
            return json.load(f)
    except Exception:
        return {}


def get_protocol_registry() -> Dict[str, Dict[str, Any]]:
    """
    Merge static registry from MD and JSON.
    JSON takes precedence for overrides.
    """
    registry = parse_protocols_md()
    json_registry = load_protocols_json()
    registry.update(json_registry)
    return registry


# ============================================================================
# MCP Server Integration (v0.5.0 - Multi-server support)
# ============================================================================

CONFIG_PATH = os.path.join(SCRIPT_DIR, "..", "..", "config", "config.json")

# Simple in-memory cache for MCP results
_MCP_CACHE: Dict[str, Dict[str, Any]] = {}
_MCP_CACHE_TIME: Dict[str, float] = {}

def load_config() -> Dict[str, Any]:
    """Load configuration from config.json."""
    try:
        with open(CONFIG_PATH, "r", encoding="utf-8") as f:
            return json.load(f)
    except Exception as e:
        print(f"⚠️ Failed to load config: {e}", file=sys.stderr)
        return {}


def convert_rwa_product(product: Dict[str, Any], default_chain: str = "Base", server_name: str = "") -> Dict[str, Any]:
    """
    Convert RWA product from MCP to standard opportunity format.
    
    RWA products have different structure than DeFi pools.
    This function normalizes them for consistent handling.
    
    Note: RWA products define their own chain (usually Base for USDT payments),
    not inherited from search parameters.
    """
    try:
        # Extract key fields from RWA product
        product_id = product.get("id", "unknown")
        name = product.get("name", "RWA Product")
        expected_yield = product.get("expectedYieldAnnual", 0)
        term_days = product.get("productTerm", 0)
        min_subscription = product.get("minSubscriptionUsdt", 0)
        receiving_address = product.get("receivingAddress", "")
        
        # RWA products typically run on Base chain for USDT payments
        # The receiving address determines the actual chain
        # TODO: In future, MCP should return chain info with the product
        product_chain = product.get("chain", "Base")  # Default to Base for RWA products
        
        # Convert annual yield to APY percentage
        apy = expected_yield * 100 if expected_yield else 0
        
        # Build standardized opportunity record
        return {
            "pool": f"rwa-{product_id}",
            "protocol": "rwa-mcp",
            "protocol_name": f"RWA: {name}",
            "mcp_server": server_name,
            "chain": product_chain,  # Use product's own chain
            "symbol": "USDT",  # RWA products use USDT
            "apy": round(apy, 2),
            "apy_base": round(apy, 2),
            "apy_reward": 0.0,
            "tvl_usd": product.get("fundraisingScale", 0),
            "underlying_tokens": [],  # RWA doesn't have on-chain tokens
            "reward_tokens": [],
            "stablecoin": True,  # USDT is stablecoin
            "actionable_addresses": {
                "deposit_contract_candidates": [receiving_address] if receiving_address else [],
                "underlying_token_addresses": [],
                "reward_token_addresses": [],
                "has_actionable_address": bool(receiving_address),
                "primary_contract": receiving_address,
                "protocol_registry_match": False,
                "docs_url": None
            },
            "url": f"https://mcp.prime.antalpha.com/product/{product_id}",
            "audited": True,  # RWA products are typically audited
            "risk_signals": {
                "has_il_risk": False,  # RWA has no IL risk
                "reward_type": "none",
                "underlying_type": "rwa",
                "category": "RWA",
                "stablecoin": True,
                "product_term_days": term_days,
                "min_subscription_usdt": min_subscription,
                "product_info": {
                    "name": name,
                    "term": term_days,
                    "start_date": product.get("startDate"),
                    "maturity_date": product.get("maturityDate"),
                    "asset_info": product.get("assetInformation", "")[:100]
                }
            }
        }
    except Exception as e:
        print(f"⚠️ Failed to convert RWA product: {e}", file=sys.stderr)
        return None


def fetch_mcp_server(server_config: Dict[str, Any], chain: str = "Ethereum") -> List[Dict[str, Any]]:
    """
    Fetch yield opportunities from a single MCP server.
    
    Returns list of opportunities or empty list on failure.
    """
    server_name = server_config.get("name", "unknown")
    primary_url = server_config.get("primary_url", "")
    fallback_url = server_config.get("fallback_url", "")
    timeout = server_config.get("timeout_seconds", 30)
    
    if not primary_url and not fallback_url:
        print(f"⚠️ MCP server '{server_name}': No URLs configured", file=sys.stderr)
        return []
    
    # MCP JSON-RPC request format
    payload = {
        "jsonrpc": "2.0",
        "method": "tools/call",
        "params": {
            "name": "list_products",
            "arguments": {}
        },
        "id": 1
    }
    
    def try_mcp_request(url: str) -> Optional[List[Dict[str, Any]]]:
        """Try to fetch from a single URL."""
        try:
            req = urllib.request.Request(
                url,
                data=json.dumps(payload).encode('utf-8'),
                headers={
                    "Content-Type": "application/json",
                    "Accept": "application/json, text/event-stream",
                    "User-Agent": "Web3-Investor/0.5.0"
                },
                method='POST'
            )
            
            with urllib.request.urlopen(req, timeout=timeout) as response:
                raw_data = response.read().decode()
                
                # Parse SSE format
                result = None
                for line in raw_data.split('\n'):
                    if line.startswith('data: '):
                        json_str = line[6:]
                        try:
                            result = json.loads(json_str)
                            break
                        except json.JSONDecodeError:
                            continue
                
                if not result:
                    return None
                
                # Parse MCP response structure
                if "result" in result and "content" in result.get("result", {}):
                    content = result["result"]["content"]
                    if isinstance(content, list) and len(content) > 0:
                        opportunities = []
                        for item in content:
                            if item.get("type") == "text":
                                text_data = item.get("text", "")
                                try:
                                    parsed = json.loads(text_data)
                                    if isinstance(parsed, list):
                                        for product in parsed:
                                            opp = convert_rwa_product(product, chain, server_name)
                                            if opp:
                                                opportunities.append(opp)
                                    elif isinstance(parsed, dict):
                                        opp = convert_rwa_product(parsed, chain, server_name)
                                        if opp:
                                            opportunities.append(opp)
                                except json.JSONDecodeError:
                                    continue
                        return opportunities
                return None
        except urllib.error.URLError as e:
            print(f"⚠️ MCP '{server_name}' request to {url} failed: {e}", file=sys.stderr)
            return None
        except Exception as e:
            print(f"⚠️ MCP '{server_name}' unexpected error: {e}", file=sys.stderr)
            return None
    
    # Try primary URL first
    if primary_url:
        result = try_mcp_request(primary_url)
        if result is not None:
            print(f"✅ MCP '{server_name}': {len(result)} products from primary URL", file=sys.stderr)
            return result
    
    # Fall back to secondary URL
    if fallback_url:
        result = try_mcp_request(fallback_url)
        if result is not None:
            print(f"✅ MCP '{server_name}': {len(result)} products from fallback URL", file=sys.stderr)
            return result
    
    print(f"❌ MCP '{server_name}': All endpoints failed", file=sys.stderr)
    return []


def fetch_mcp_yields(chain: str = "Ethereum") -> List[Dict[str, Any]]:
    """
    Fetch yield opportunities from all configured MCP servers.
    
    Supports multiple servers with caching for performance.
    """
    config = load_config()
    mcp_config = config.get("discovery", {}).get("mcp", {})
    
    if not mcp_config.get("enabled", False):
        print("📊 MCP integration disabled in config", file=sys.stderr)
        return []
    
    servers = mcp_config.get("servers", [])
    if not servers:
        print("⚠️ No MCP servers configured", file=sys.stderr)
        return []
    
    cache_ttl = mcp_config.get("cache_ttl_seconds", 300)
    all_opportunities = []
    
    for server in servers:
        server_name = server.get("name", "unknown")
        cache_key = f"{server_name}:{chain}"
        
        # Check cache
        current_time = datetime.now().timestamp()
        if cache_key in _MCP_CACHE and cache_key in _MCP_CACHE_TIME:
            if current_time - _MCP_CACHE_TIME[cache_key] < cache_ttl:
                cached = _MCP_CACHE[cache_key]
                print(f"📦 MCP '{server_name}': Using cached data ({len(cached)} products)", file=sys.stderr)
                all_opportunities.extend(cached)
                continue
        
        # Fetch fresh data
        opportunities = fetch_mcp_server(server, chain)
        
        # Update cache
        if opportunities:
            _MCP_CACHE[cache_key] = opportunities
            _MCP_CACHE_TIME[cache_key] = current_time
            all_opportunities.extend(opportunities)
    
    return all_opportunities


# ============================================================================
# DefiLlama API (Dynamic Data Layer)
# ============================================================================

def fetch_yields() -> Dict[str, Any]:
    """Fetch all yield data from DefiLlama."""
    url = f"{DEFILLAMA_BASE}/pools"
    try:
        req = urllib.request.Request(url, headers={"User-Agent": "Web3-Investor/0.2.0"})
        with urllib.request.urlopen(req, timeout=30) as response:
            return json.loads(response.read().decode())
    except urllib.error.URLError as e:
        print(f"❌ Error fetching yields: {e}", file=sys.stderr)
        return {"data": []}
    except Exception as e:
        print(f"❌ Unexpected error fetching yields: {e}", file=sys.stderr)
        return {"data": []}


def fetch_protocols() -> List[Dict[str, Any]]:
    """Fetch protocol list from DefiLlama."""
    try:
        req = urllib.request.Request(
            DEFILLAMA_PROTOCOLS, 
            headers={"User-Agent": "Web3-Investor/0.2.0"}
        )
        with urllib.request.urlopen(req, timeout=30) as response:
            return json.loads(response.read().decode())
    except urllib.error.URLError as e:
        print(f"❌ Error fetching protocols: {e}", file=sys.stderr)
        return []
    except Exception as e:
        print(f"❌ Unexpected error fetching protocols: {e}", file=sys.stderr)
        return []


# ============================================================================
# Data Extraction & Normalization
# ============================================================================

def safe_get(data: Any, key: str, default: Any = None) -> Any:
    """
    Safely get value from dict with null/type protection.
    
    Handles:
    - None data
    - Missing keys
    - Null values
    """
    if data is None:
        return default
    if not isinstance(data, dict):
        return default
    value = data.get(key, default)
    return value if value is not None else default


def safe_float(value: Any, default: float = 0.0) -> float:
    """Safely convert to float with null protection."""
    if value is None:
        return default
    try:
        return float(value)
    except (ValueError, TypeError):
        return default


def safe_list(value: Any) -> List[Any]:
    """Safely convert to list with null protection."""
    if value is None:
        return []
    if isinstance(value, list):
        return value
    return []


def safe_str(value: Any, default: str = "") -> str:
    """Safely convert to string with null protection."""
    if value is None:
        return default
    return str(value)


def normalize_chain(chain: str) -> str:
    """Normalize chain name to DefiLlama format."""
    return CHAIN_MAPPING.get(chain.lower(), chain)


# ============================================================================
# Actionable Addresses Extraction
# ============================================================================

def extract_actionable_addresses(
    pool: Dict[str, Any],
    protocol: Dict[str, Any],
    registry: Dict[str, Dict[str, Any]]
) -> Dict[str, Any]:
    """
    Extract actionable addresses for execution readiness.
    
    Priority:
    1. Pool-level addresses (from DefiLlama)
    2. Protocol-level addresses (from DefiLlama)
    3. Registry addresses (from protocols.md/json)
    """
    result = {
        "deposit_contract_candidates": [],
        "underlying_token_addresses": [],
        "reward_token_addresses": [],
        "has_actionable_address": False,
        "primary_contract": None,
        "protocol_registry_match": False,
        "docs_url": None
    }
    
    # 1. Pool-level addresses
    pool_addr = safe_get(pool, "pool")
    if pool_addr and pool_addr.startswith("0x"):
        result["deposit_contract_candidates"].append(pool_addr)
    
    # Underlying tokens
    underlying = safe_list(safe_get(pool, "underlyingTokens"))
    result["underlying_token_addresses"] = [
        addr for addr in underlying 
        if isinstance(addr, str) and addr.startswith("0x")
    ]
    
    # Reward tokens
    rewards = safe_list(safe_get(pool, "rewardTokens"))
    result["reward_token_addresses"] = [
        addr for addr in rewards 
        if isinstance(addr, str) and addr.startswith("0x")
    ]
    
    # 2. Protocol-level addresses (governance, tokens)
    protocol_tokens = safe_list(safe_get(protocol, "tokens"))
    for token in protocol_tokens:
        if isinstance(token, dict):
            addr = safe_get(token, "address")
            if addr and addr.startswith("0x"):
                result["underlying_token_addresses"].append(addr)
    
    # 3. Registry lookup
    protocol_name = safe_get(pool, "project", "").lower()
    registry_entry = registry.get(protocol_name, {})
    
    if registry_entry:
        result["protocol_registry_match"] = True
        
        primary = registry_entry.get("primary_contract")
        if primary and primary not in result["deposit_contract_candidates"]:
            result["deposit_contract_candidates"].insert(0, primary)
            result["primary_contract"] = primary
        
        docs = registry_entry.get("docs_url")
        if docs:
            result["docs_url"] = docs
    
    # Determine if actionable
    result["has_actionable_address"] = bool(
        result["deposit_contract_candidates"] or 
        result["underlying_token_addresses"]
    )
    
    return result


# ============================================================================
# Risk Data Collection (for LLM Analysis)
# ============================================================================

def detect_reward_type(pool: Dict[str, Any]) -> str:
    """Detect if product has single or multi-token rewards."""
    reward_tokens = safe_list(safe_get(pool, "rewardTokens"))
    if not reward_tokens:
        return "none"
    elif len(reward_tokens) == 1:
        return "single"
    else:
        return "multi"


def detect_il_risk(pool: Dict[str, Any], category: str) -> bool:
    """Detect if product has impermanent loss risk."""
    underlying = safe_list(safe_get(pool, "underlyingTokens"))
    
    # DEX LP products typically have IL risk
    dex_categories = ["dex", "dexs", "amm", "lp"]
    if any(dex in category.lower() for dex in dex_categories):
        return True
    
    # Multi-asset pools with volatile tokens have IL risk
    if len(underlying) >= 2 and not safe_get(pool, "stablecoin", False):
        return True
    
    return False


def detect_underlying_type(pool: Dict[str, Any], category: str, symbol: str) -> str:
    """Detect underlying asset type (RWA vs on-chain)."""
    # RWA keywords
    rwa_keywords = ["tbill", "treasury", "bond", "real-world", "rwa", "usdyc", "usdy"]
    symbol_lower = symbol.lower()
    
    for keyword in rwa_keywords:
        if keyword in symbol_lower:
            return "rwa"
    
    # Check category
    if "rwa" in category.lower():
        return "rwa"
    
    # Pure lending/yield protocols are on-chain
    onchain_categories = ["lending", "yield", "staking", "liquid staking"]
    if any(oc in category.lower() for oc in onchain_categories):
        return "onchain"
    
    # Mixed or unknown
    underlying = safe_list(safe_get(pool, "underlyingTokens"))
    if len(underlying) > 1:
        return "mixed"
    
    return "unknown"


def collect_risk_signals(
    pool: Dict[str, Any],
    protocol: Dict[str, Any],
    registry: Dict[str, Dict[str, Any]]
) -> Dict[str, Any]:
    """
    Collect risk-related signals WITHOUT computing a score.
    
    LLM will analyze these signals to determine risk.
    """
    protocol_name = safe_get(pool, "project", "").lower()
    registry_entry = registry.get(protocol_name, {})
    category = registry_entry.get("category") or safe_get(protocol, "category", "")
    symbol = safe_str(safe_get(pool, "symbol"))
    
    return {
        # Protocol maturity signals
        "audits": safe_list(safe_get(protocol, "audits")),
        "audit_count": len(safe_list(safe_get(protocol, "audits"))),
        "has_audit": bool(safe_get(protocol, "audits")),
        "audit_notes": safe_str(safe_get(protocol, "audit_note")),
        
        # TVL signals
        "tvl_usd": safe_float(safe_get(pool, "tvlUsd")),
        "protocol_tvl": safe_float(safe_get(protocol, "tvl")),
        
        # Age/maturity signals
        "protocol_slug": safe_get(protocol, "slug"),
        "known_protocol": bool(registry_entry),
        "category": category,
        
        # APY signals (high APY may indicate risk)
        "apy": safe_float(safe_get(pool, "apy")),
        "apy_base": safe_float(safe_get(pool, "apyBase")),
        "apy_reward": safe_float(safe_get(pool, "apyReward")),
        "apy_composition": "base+rewards" if safe_get(pool, "apyReward") else "base_only",
        
        # Asset type signals
        "stablecoin": safe_get(pool, "stablecoin", False),
        "symbol": symbol,
        
        # Chain signals
        "chain": safe_str(safe_get(pool, "chain")),
        
        # NEW in v0.2.1: Investment preference signals
        "reward_type": detect_reward_type(pool),  # "none" | "single" | "multi"
        "has_il_risk": detect_il_risk(pool, category),  # True | False
        "underlying_type": detect_underlying_type(pool, category, symbol),  # "rwa" | "onchain" | "mixed" | "unknown"
        
        # Governance signals
        "governance": safe_get(protocol, "governance"),
        "has_gov_token": bool(safe_get(protocol, "governance")),
        
        # Registry reference (if available)
        "registry_risk": registry_entry.get("registry_risk"),
        "registry_category": registry_entry.get("category"),
    }


# ============================================================================
# Main Opportunity Finder
# ============================================================================

def check_execution_readiness_inline(network: str = "base") -> Dict[str, Any]:
    """
    Inline version of execution readiness check.
    Returns available payment methods without full preflight module import.
    """
    config = load_config()
    api_config = config.get("api", {})
    base_url = api_config.get("url", "http://localhost:3000/api")
    
    # Quick check if signer API is available
    try:
        req = urllib.request.Request(
            f"{base_url}/wallet/balances?chain={network}",
            headers={"User-Agent": "Web3-Investor/0.5.0"},
            method='GET'
        )
        with urllib.request.urlopen(req, timeout=5) as response:
            data = json.loads(response.read().decode())
            if data.get("success"):
                return {
                    "recommended": "keystore_signer",
                    "methods": ["keystore_signer", "eip681_payment_link"],
                    "message": "✅ Local signer available"
                }
    except:
        pass
    
    return {
        "recommended": "eip681_payment_link",
        "methods": ["eip681_payment_link"],
        "message": "⚠️ No local signer. Use EIP-681 payment link."
    }


def find_opportunities(
    min_apy: float = 0,
    max_apy: float = MAX_SAFE_APY_DEFAULT,
    chain: str = "Ethereum",
    min_tvl: float = 0,
    limit: int = 20,
    allow_high_apy: bool = False,
    include_risk_signals: bool = True,
    include_execution_readiness: bool = True
) -> List[Dict[str, Any]]:
    """
    Find investment opportunities matching criteria.
    
    Returns structured data for LLM-based risk analysis.
    Supports both MCP server and DefiLlama as data sources.
    
    Args:
        min_apy: Minimum APY percentage
        max_apy: Maximum APY percentage (default 100% to filter scams)
        chain: Blockchain to search
        min_tvl: Minimum TVL in USD
        limit: Maximum results to return
        allow_high_apy: If True, allow APY > 100%
        include_risk_signals: Include risk_signals for LLM analysis
        include_execution_readiness: Include execution_readiness for payment method guidance
    
    Returns:
        List of opportunity dicts with actionable_addresses, risk_signals, and execution_readiness
    """
    # Load config to check MCP settings
    config = load_config()
    mcp_config = config.get("discovery", {}).get("mcp", {})
    
    # Step 1: Fetch from MCP servers (with caching)
    mcp_opportunities = []
    if mcp_config.get("enabled", False):
        mcp_opportunities = fetch_mcp_yields(chain=chain)
    
    # Step 2: Fetch from DefiLlama API
    yields_data = fetch_yields()
    protocols_data = fetch_protocols()
    
    if not yields_data.get("data"):
        print("⚠️ No yield data received from DefiLlama", file=sys.stderr)
        # If MCP has data but DefiLlama fails, return MCP data only
        if mcp_opportunities:
            return mcp_opportunities[:limit]
        return []
    
    # Build protocol lookup
    protocol_map = {}
    for p in protocols_data:
        slug = safe_get(p, "slug", safe_get(p, "name", "")).lower()
        protocol_map[slug] = p
        protocol_map[safe_get(p, "name", "").lower()] = p
    
    # Load static registry
    registry = get_protocol_registry()
    
    opportunities = []
    normalized_chain = normalize_chain(chain)
    
    for pool in yields_data.get("data", []):
        # Filter by chain
        pool_chain = safe_str(safe_get(pool, "chain", "")).lower()
        if normalized_chain.lower() not in pool_chain and chain.lower() not in pool_chain:
            continue
        
        # Filter by APY
        apy = safe_float(safe_get(pool, "apy"))
        if apy < min_apy:
            continue
        if not allow_high_apy and apy > max_apy:
            continue
        
        # Filter by TVL
        tvl = safe_float(safe_get(pool, "tvlUsd"))
        if tvl < min_tvl:
            continue
        
        # Get protocol info
        protocol_name = safe_str(safe_get(pool, "project"))
        protocol = protocol_map.get(protocol_name.lower(), {})
        
        # Extract actionable addresses
        actionable = extract_actionable_addresses(pool, protocol, registry)
        
        # Build opportunity record
        opp = {
            # Core identifiers
            "pool": safe_str(safe_get(pool, "pool")),
            "protocol": protocol_name,
            "protocol_name": safe_get(protocol, "name", protocol_name),
            "chain": safe_get(pool, "chain", "Ethereum"),
            "symbol": safe_str(safe_get(pool, "symbol")),
            
            # Yield metrics
            "apy": round(apy, 2),
            "apy_base": round(safe_float(safe_get(pool, "apyBase")), 2),
            "apy_reward": round(safe_float(safe_get(pool, "apyReward")), 2),
            "tvl_usd": tvl,
            
            # Asset info
            "underlying_tokens": safe_list(safe_get(pool, "underlyingTokens")),
            "reward_tokens": safe_list(safe_get(pool, "rewardTokens")),
            "stablecoin": safe_get(pool, "stablecoin", False),
            
            # Execution readiness (NEW in v0.2.0)
            "actionable_addresses": actionable,
            
            # Links
            "url": f"https://defillama.com/yield/pool/{safe_get(pool, 'pool', '')}",
            
            # Legacy fields (for backward compatibility)
            "audited": bool(safe_get(protocol, "audits")),
        }
        
        # Add risk signals for LLM analysis
        if include_risk_signals:
            opp["risk_signals"] = collect_risk_signals(pool, protocol, registry)
        
        opportunities.append(opp)
    
    # Step 3: Merge MCP and DefiLlama data
    if mcp_opportunities:
        # Use pool address as unique key for deduplication
        existing_pools = {opp["pool"] for opp in opportunities}
        added_count = 0
        for mcp_opp in mcp_opportunities:
            pool_id = mcp_opp.get("pool")
            if pool_id and pool_id not in existing_pools:
                opportunities.append(mcp_opp)
                existing_pools.add(pool_id)
                added_count += 1
        if added_count > 0:
            print(f"📈 Added {added_count} unique opportunities from MCP", file=sys.stderr)
    
    total_count = len(opportunities)
    mcp_count = len(mcp_opportunities)
    defi_count = total_count - mcp_count + (len([o for o in mcp_opportunities if o.get("pool") in {opp["pool"] for opp in opportunities}]) if mcp_opportunities else 0)
    
    print(f"📊 Total: {total_count} opportunities ({mcp_count} from MCP + ~{defi_count} from DefiLlama)", file=sys.stderr)
    
    # Sort by APY (descending)
    opportunities.sort(key=lambda x: x["apy"], reverse=True)
    opportunities = opportunities[:limit]
    
    # Attach execution readiness to each opportunity (NEW in v0.5.0)
    if include_execution_readiness:
        readiness = check_execution_readiness_inline(chain.lower())
        for opp in opportunities:
            opp["execution_readiness"] = readiness
    
    return opportunities


# ============================================================================
# Output Formatting
# ============================================================================

def format_report(opportunities: List[Dict], output_format: str = "markdown") -> str:
    """Format opportunities as report."""
    if not opportunities:
        return "No opportunities found matching criteria."
    
    if output_format == "json":
        return json.dumps(opportunities, indent=2, ensure_ascii=False)
    
    # Markdown format
    lines = [
        "# 💰 Investment Opportunities",
        f"\n**Generated**: {datetime.now().isoformat()}",
        f"**Found**: {len(opportunities)} pools\n",
        "---\n"
    ]
    
    for i, opp in enumerate(opportunities, 1):
        lines.extend([
            f"## {i}. {opp['protocol']} - {opp['symbol']}",
            "",
            f"| Metric | Value |",
            f"|--------|-------|",
            f"| APY | **{opp['apy']}%** |",
            f"| TVL | ${opp['tvl_usd']:,.0f} |",
            f"| Chain | {opp['chain']} |",
            f"| Stablecoin | {'✅' if opp['stablecoin'] else '❌'} |",
        ])
        
        # Show actionable status
        actionable = opp.get("actionable_addresses", {})
        if actionable.get("has_actionable_address"):
            lines.append(f"| Actionable | ✅ Ready for execution |")
            if actionable.get("primary_contract"):
                lines.append(f"| Primary Contract | `{actionable['primary_contract']}` |")
        else:
            lines.append(f"| Actionable | ⚠️ Needs address lookup |")
        
        lines.extend([
            "",
            f"**Pool**: `{opp['pool']}`",
            f"**Link**: {opp['url']}",
            ""
        ])
        
        # Show risk signals summary
        risk = opp.get("risk_signals", {})
        if risk:
            lines.extend([
                f"**Risk Signals**:",
                f"- Audited: {'✅' if risk.get('has_audit') else '❌'}",
                f"- Known Protocol: {'✅' if risk.get('known_protocol') else '❌'}",
                f"- APY Composition: {risk.get('apy_composition', 'unknown')}",
                ""
            ])
    
    return "\n".join(lines)


def format_llm_prompt(opportunities: List[Dict]) -> str:
    """
    Format opportunities as LLM-ready prompt for risk analysis.
    
    Output is optimized for LLM consumption.
    """
    if not opportunities:
        return "No opportunities to analyze."
    
    lines = [
        "# DeFi Investment Opportunities - Risk Analysis Request",
        "",
        "Please analyze the following investment opportunities and provide:",
        "1. Risk assessment (Low/Medium/High) with reasoning",
        "2. Key risk factors to watch",
        "3. Recommended actions",
        "",
        "---\n"
    ]
    
    for i, opp in enumerate(opportunities, 1):
        lines.append(f"## Opportunity {i}")
        lines.append("```json")
        lines.append(json.dumps(opp, indent=2, ensure_ascii=False))
        lines.append("```\n")
    
    return "\n".join(lines)


# ============================================================================
# CLI
# ============================================================================

def main():
    parser = argparse.ArgumentParser(
        description="Find DeFi investment opportunities (v0.2.0 - LLM-ready)",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python find_opportunities.py --min-apy 5 --chain ethereum
  python find_opportunities.py --llm-ready --output json
  python find_opportunities.py --allow-high-apy --limit 50
        """
    )
    
    parser.add_argument("--min-apy", type=float, default=0, 
                        help="Minimum APY %%")
    parser.add_argument("--max-apy", type=float, default=MAX_SAFE_APY_DEFAULT, 
                        help=f"Maximum APY %% (default {MAX_SAFE_APY_DEFAULT}%)")
    parser.add_argument("--allow-high-apy", action="store_true",
                        help="Allow APY > 100%% (high risk)")
    parser.add_argument("--chain", default="Ethereum", 
                        help="Blockchain to search")
    parser.add_argument("--min-tvl", type=float, default=0, 
                        help="Minimum TVL in USD")
    parser.add_argument("--limit", type=int, default=20, 
                        help="Maximum results")
    parser.add_argument("--output", choices=["markdown", "json"], default="markdown",
                        help="Output format")
    parser.add_argument("--llm-ready", action="store_true",
                        help="Output LLM-ready prompt for risk analysis")
    parser.add_argument("--no-risk-signals", action="store_true",
                        help="Exclude risk signals from output")
    
    args = parser.parse_args()
    
    # Safety warning
    if args.allow_high_apy:
        print("⚠️ 警告: --allow-high-apy 已启用,可能包含高风险项目", file=sys.stderr)
    
    opportunities = find_opportunities(
        min_apy=args.min_apy,
        max_apy=args.max_apy,
        chain=args.chain,
        min_tvl=args.min_tvl,
        limit=args.limit,
        allow_high_apy=args.allow_high_apy,
        include_risk_signals=not args.no_risk_signals
    )
    
    # Output
    if args.llm_ready:
        print(format_llm_prompt(opportunities))
    elif args.output == "json":
        print(json.dumps(opportunities, indent=2, ensure_ascii=False))
    else:
        print(format_report(opportunities, "markdown"))


if __name__ == "__main__":
    main()
```

### scripts/trading/eip681_payment.py

```python
#!/usr/bin/env python3
"""
EIP-681 Payment Link Generator (v0.3.3)

Generate EIP-681 compliant payment links and QR codes for MetaMask.

EIP-681 Format:
    ethereum:<to>@<chainId>/transfer?address=<recipient>&uint256=<amount>

Usage:
    python3 eip681_payment.py generate \
      --token USDC \
      --to 0x1F3A9A450428BbF161C4C33f10bd7AA1b2599a3e \
      --amount 10 \
      --network base
    
    # With QR code (for desktop users)
    python3 eip681_payment.py generate \
      --token USDC \
      --to 0x... \
      --amount 10 \
      --network base \
      --qr-output /tmp/payment_qr.png
"""

import argparse
import json
import os
import sys
import subprocess
from typing import Optional, Dict, Any

# ============================================================================
# Configuration
# ============================================================================

SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
SKILL_DIR = os.path.dirname(SCRIPT_DIR)

# Token addresses by chain
TOKEN_ADDRESSES = {
    "base": {
        "USDC": "0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913",
        "USDT": "0xfde4C96c8593536E11F39842a902aB0E47ec4E54",
        "WETH": "0x4200000000000000000000000000000000000006",
        "ETH": "0x0000000000000000000000000000000000000000",  # Native
    },
    "ethereum": {
        "USDC": "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48",
        "USDT": "0xdAC17F958D2ee523a2206206994597C13D831ec7",
        "WETH": "0xC02aaA39b223FE8D0A0e5C4F27ead9083C756Cc2",
        "ETH": "0x0000000000000000000000000000000000000000",  # Native
    }
}

# Chain IDs
CHAIN_IDS = {
    "base": 8453,
    "ethereum": 1,
}

# Token decimals
TOKEN_DECIMALS = {
    "USDC": 6,
    "USDT": 6,
    "WETH": 18,
    "ETH": 18,
}

# QR Code options
QR_SKILL_PATH = os.path.expanduser("~/.openclaw/workspace/skills/qrcode-gen-yn/agent.py")
# Try npm qrcode as fallback


# ============================================================================
# EIP-681 Link Generation
# ============================================================================

def generate_eip681_link(
    token: str,
    to: str,
    amount: float,
    network: str = "base"
) -> Dict[str, Any]:
    """
    Generate EIP-681 compliant payment link.
    
    Args:
        token: Token symbol (USDC, USDT, WETH, ETH)
        to: Recipient address
        amount: Amount to send (human readable, e.g., 10.5)
        network: Network name (base, ethereum)
    
    Returns:
        Dict with link, calldata, and transaction details
    """
    # Validate inputs
    if network not in CHAIN_IDS:
        return {
            "success": False,
            "error": f"Unsupported network: {network}. Supported: {list(CHAIN_IDS.keys())}"
        }
    
    if token.upper() not in TOKEN_ADDRESSES.get(network, {}):
        return {
            "success": False,
            "error": f"Unsupported token: {token} on {network}. Supported: {list(TOKEN_ADDRESSES.get(network, {}).keys())}"
        }
    
    token = token.upper()
    chain_id = CHAIN_IDS[network]
    token_address = TOKEN_ADDRESSES[network][token]
    decimals = TOKEN_DECIMALS.get(token, 18)
    
    # Convert amount to smallest unit
    amount_smallest = int(amount * (10 ** decimals))
    
    # Check if native token (ETH)
    is_native = token == "ETH"
    
    if is_native:
        # Native ETH transfer - simpler format
        link = f"ethereum:{to}@{chain_id}?value={amount_smallest}"
        calldata = None
    else:
        # ERC20 transfer
        # Transfer function: 0xa9059cbb(to, amount)
        to_padded = to[2:].lower().zfill(64)
        amount_hex = hex(amount_smallest)[2:].zfill(64)
        calldata = f"0xa9059cbb{to_padded}{amount_hex}"
        
        # EIP-681 format for ERC20
        link = f"ethereum:{token_address}@{chain_id}/transfer?address={to}&uint256={amount_smallest}"
    
    # MetaMask deep link
    metamask_link = f"https://metamask.app.link/send/{token_address}@{chain_id}/transfer?address={to}&uint256={amount_smallest}"
    
    return {
        "success": True,
        "network": network,
        "chain_id": chain_id,
        "token": token,
        "token_address": token_address,
        "recipient": to,
        "amount": amount,
        "amount_smallest": amount_smallest,
        "calldata": calldata,
        "eip681_link": link,
        "metamask_link": metamask_link,
        "transaction": {
            "to": token_address if not is_native else to,
            "value": f"0x{hex(amount_smallest)[2:]}" if is_native else "0x0",
            "data": calldata if calldata else "0x",
            "chain_id": chain_id
        }
    }


def generate_qr_code(link: str, output_path: str) -> Dict[str, Any]:
    """
    Generate QR code for payment link.
    
    Tries multiple methods in order:
    1. npm qrcode (if available)
    2. Python qrcode-gen-yn skill (if installed)
    
    Args:
        link: EIP-681 link or MetaMask link
        output_path: Path to save QR code PNG
    
    Returns:
        Dict with success status and path (or error with fallback)
    """
    # Ensure output directory exists
    output_dir = os.path.dirname(output_path)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
    
    # Method 1: Try npm qrcode (preferred, faster)
    try:
        result = subprocess.run(
            ["npx", "qrcode", "-t", "png", "-o", output_path, link],
            capture_output=True,
            text=True,
            timeout=10
        )
        
        if result.returncode == 0 and os.path.exists(output_path):
            return {
                "success": True,
                "qr_path": output_path,
                "method": "npm-qrcode",
                "link": link
            }
    except subprocess.TimeoutExpired:
        pass
    except FileNotFoundError:
        pass
    except Exception:
        pass
    
    # Method 2: Try Python qrcode-gen-yn skill
    if os.path.exists(QR_SKILL_PATH):
        try:
            result = subprocess.run(
                ["python3", QR_SKILL_PATH, link, output_path],
                capture_output=True,
                text=True,
                timeout=10
            )
            
            if result.returncode == 0 and os.path.exists(output_path):
                return {
                    "success": True,
                    "qr_path": output_path,
                    "method": "python-skill",
                    "link": link
                }
        except subprocess.TimeoutExpired:
            pass
        except Exception:
            pass
    
    # All methods failed - return fallback
    return {
        "success": False,
        "error": "QR code generation unavailable",
        "fallback": "Please display the link directly for mobile users",
        "link": link
    }


def format_output(result: Dict, include_qr: bool = False, qr_path: str = None) -> str:
    """Format result for display."""
    if not result.get("success"):
        return f"❌ Error: {result.get('error', 'Unknown error')}"
    
    lines = [
        "=" * 60,
        "EIP-681 Payment Link Generated",
        "=" * 60,
        "",
        f"Network: {result['network'].upper()} (Chain ID: {result['chain_id']})",
        f"Token: {result['token']} ({result['token_address']})",
        f"Recipient: {result['recipient']}",
        f"Amount: {result['amount']} {result['token']}",
        "",
        "-" * 60,
        "For Mobile Users (MetaMask)",
        "-" * 60,
        f"Click this link: {result['metamask_link']}",
        "",
        "-" * 60,
        "For Desktop Users",
        "-" * 60,
        "Scan QR code with MetaMask mobile app,",
        "or manually copy the transaction details below.",
        "",
        "Transaction Details:",
        f"  To: {result['transaction']['to']}",
        f"  Value: {result['transaction']['value']}",
        f"  Data: {result['calldata'][:30]}..." if result.get('calldata') else "  Data: (none - native transfer)",
        "",
        "-" * 60,
        "Raw EIP-681 Link",
        "-" * 60,
        result['eip681_link'],
    ]
    
    if include_qr and qr_path:
        lines.extend([
            "",
            "-" * 60,
            "QR Code",
            "-" * 60,
            f"Generated at: {qr_path}",
        ])
    
    return "\n".join(lines)


# ============================================================================
# CLI
# ============================================================================

def main():
    parser = argparse.ArgumentParser(
        description="EIP-681 Payment Link Generator (v0.3.3)",
        formatter_class=argparse.RawDescriptionHelpFormatter
    )
    
    subparsers = parser.add_subparsers(dest="command", help="Commands")
    
    # Generate command
    gen_parser = subparsers.add_parser("generate", help="Generate EIP-681 payment link")
    gen_parser.add_argument("--token", required=True, help="Token symbol (USDC, USDT, WETH, ETH)")
    gen_parser.add_argument("--to", required=True, help="Recipient address")
    gen_parser.add_argument("--amount", type=float, required=True, help="Amount to send")
    gen_parser.add_argument("--network", default="base", choices=["base", "ethereum"], help="Network")
    gen_parser.add_argument("--qr-output", help="Path to save QR code PNG (optional)")
    gen_parser.add_argument("--json", action="store_true", help="JSON output")
    
    args = parser.parse_args()
    
    if args.command == "generate":
        # Generate EIP-681 link
        result = generate_eip681_link(
            token=args.token,
            to=args.to,
            amount=args.amount,
            network=args.network
        )
        
        # Generate QR code if requested
        if args.qr_output and result.get("success"):
            qr_result = generate_qr_code(result['metamask_link'], args.qr_output)
            result["qr"] = qr_result
        
        # Output
        if args.json:
            print(json.dumps(result, indent=2, ensure_ascii=False))
        else:
            print(format_output(result, include_qr=bool(args.qr_output), qr_path=args.qr_output))
    
    else:
        parser.print_help()


if __name__ == "__main__":
    main()
```

### scripts/trading/whitelist.py

```python
#!/usr/bin/env python3
"""
Manage address whitelist for Safe Vault.

Usage:
    python whitelist.py --list
    python whitelist.py --add 0x... --name "Aave Pool" --max-amount 500
    python whitelist.py --remove 0x...
    python whitelist.py --enable
    python whitelist.py --disable
"""

import argparse
import json
import os
import sys

WHITELIST_PATH = os.path.join(os.path.dirname(__file__), "..", "..", "config", "whitelist.json")


def load_whitelist() -> dict:
    """Load whitelist from file."""
    if os.path.exists(WHITELIST_PATH):
        with open(WHITELIST_PATH) as f:
            return json.load(f)
    return {"enabled": True, "addresses": []}


def save_whitelist(whitelist: dict):
    """Save whitelist to file."""
    os.makedirs(os.path.dirname(WHITELIST_PATH), exist_ok=True)
    with open(WHITELIST_PATH, "w") as f:
        json.dump(whitelist, f, indent=2)


def list_whitelist():
    """List all whitelisted addresses."""
    whitelist = load_whitelist()
    
    print(f"Whitelist Status: {'🟢 ENABLED' if whitelist.get('enabled', True) else '🔴 DISABLED'}")
    print()
    
    addresses = whitelist.get("addresses", [])
    if not addresses:
        print("No addresses in whitelist.")
        return
    
    print(f"{'Address':<44} {'Name':<20} {'Max Amount (USD)':<15}")
    print("-" * 80)
    
    for entry in addresses:
        addr = entry.get("address", "")
        name = entry.get("name", "N/A")
        max_amount = entry.get("max_amount_usd", "default")
        print(f"{addr:<44} {name:<20} {max_amount}")


def add_address(address: str, name: str = "", max_amount: float = None):
    """Add address to whitelist."""
    whitelist = load_whitelist()
    
    # Check if already exists
    address_lower = address.lower()
    for entry in whitelist.get("addresses", []):
        if entry.get("address", "").lower() == address_lower:
            print(f"Address {address} already in whitelist.")
            return
    
    # Add new entry
    entry = {
        "address": address,
        "name": name,
        "added_at": __import__("datetime").datetime.utcnow().isoformat()
    }
    
    if max_amount is not None:
        entry["max_amount_usd"] = max_amount
    
    whitelist.setdefault("addresses", []).append(entry)
    save_whitelist(whitelist)
    
    print(f"✅ Added {address} to whitelist")
    if name:
        print(f"   Name: {name}")
    if max_amount is not None:
        print(f"   Max Amount: ${max_amount}")


def remove_address(address: str):
    """Remove address from whitelist."""
    whitelist = load_whitelist()
    
    address_lower = address.lower()
    original_count = len(whitelist.get("addresses", []))
    
    whitelist["addresses"] = [
        entry for entry in whitelist.get("addresses", [])
        if entry.get("address", "").lower() != address_lower
    ]
    
    if len(whitelist["addresses"]) == original_count:
        print(f"Address {address} not found in whitelist.")
        return
    
    save_whitelist(whitelist)
    print(f"✅ Removed {address} from whitelist")


def enable_whitelist():
    """Enable whitelist checking."""
    whitelist = load_whitelist()
    whitelist["enabled"] = True
    save_whitelist(whitelist)
    print("✅ Whitelist enabled")


def disable_whitelist():
    """Disable whitelist checking (not recommended)."""
    whitelist = load_whitelist()
    whitelist["enabled"] = False
    save_whitelist(whitelist)
    print("⚠️ Whitelist DISABLED - All addresses allowed")


def main():
    parser = argparse.ArgumentParser(description="Manage Safe Vault whitelist")
    parser.add_argument("--list", action="store_true", help="List whitelisted addresses")
    parser.add_argument("--add", metavar="ADDRESS", help="Add address to whitelist")
    parser.add_argument("--remove", metavar="ADDRESS", help="Remove address from whitelist")
    parser.add_argument("--name", default="", help="Name for the address")
    parser.add_argument("--max-amount", type=float, help="Maximum amount in USD")
    parser.add_argument("--enable", action="store_true", help="Enable whitelist")
    parser.add_argument("--disable", action="store_true", help="Disable whitelist")
    
    args = parser.parse_args()
    
    if args.list:
        list_whitelist()
    elif args.add:
        add_address(args.add, args.name, args.max_amount)
    elif args.remove:
        remove_address(args.remove)
    elif args.enable:
        enable_whitelist()
    elif args.disable:
        disable_whitelist()
    else:
        parser.print_help()


if __name__ == "__main__":
    main()
```

### config/config.json

```json
{
  "_comment": "Web3 Investor Skill Configuration - v0.5.0",
  
  "chain": {
    "default": "base",
    "supported": ["base", "ethereum"],
    "rpc": {
      "base": ["https://base.llamarpc.com", "https://base.drpc.org"],
      "ethereum": ["https://eth.llamarpc.com", "https://rpc.flashbots.net/fast", "https://eth.drpc.org"]
    }
  },
  
  "api": {
    "url": "http://localhost:3000/api",
    "timeout_seconds": 30
  },
  
  "security": {
    "max_trade_usd": 10000,
    "max_slippage_percent": 3.0,
    "whitelist_enabled": false,
    "whitelist_tokens": ["USDC", "USDT", "DAI", "WETH", "ETH", "stETH", "rETH"],
    "whitelist_protocols": ["uniswap", "aave", "compound", "lido", "0x"],
    "double_confirm": {
      "enabled": true,
      "large_trade_threshold_usd": 5000,
      "high_slippage_threshold_percent": 1.0,
      "new_protocol_confirm": true,
      "confirm_phrase": "CONFIRM_HIGH_RISK"
    }
  },
  
  "preflight": {
    "enabled": true,
    "min_gas_eth": 0.001,
    "checks": {
      "swap": ["signer_api", "rpc_reachable", "gas_balance", "token_balance"],
      "deposit": ["signer_api", "rpc_reachable", "gas_balance", "token_balance"],
      "transfer": ["signer_api", "rpc_reachable", "gas_balance", "token_balance"]
    }
  },
  
  "rpc": {
    "timeout_seconds": 10,
    "fallback_enabled": true,
    "session_sticky": true
  },
  
  "discovery": {
    "data_sources": ["mcp", "dune", "defillama"],
    "mcp": {
      "enabled": true,
      "cache_ttl_seconds": 300,
      "servers": []
    },
    "dune": {
      "mcp_endpoint": "https://api.dune.com/mcp/v1",
      "auth": {
        "header": {
          "name": "x-dune-api-key",
          "env_var": "DUNE_API_KEY"
        },
        "query_param": {
          "name": "api_key",
          "env_var": "DUNE_API_KEY"
        }
      },
      "enabled": true
    },
    "min_apy": 0,
    "max_results": 20
  }
}
```



---

## Skill Companion Files

> Additional files collected from the skill directory layout.

### _meta.json

```json
{
  "owner": "bevanding",
  "slug": "web3-investor",
  "displayName": "Web3 Investor",
  "latest": {
    "version": "0.5.2",
    "publishedAt": 1772937871200,
    "commit": "https://github.com/openclaw/skills/commit/95022d63847ae5e9d8b4dbd0ab18f7759639a00a"
  },
  "history": [
    {
      "version": "0.4.0",
      "publishedAt": 1772724482673,
      "commit": "https://github.com/openclaw/skills/commit/bad10f9133776a4c2bc9c69709610335e085979a"
    },
    {
      "version": "0.3.3",
      "publishedAt": 1772709508630,
      "commit": "https://github.com/openclaw/skills/commit/f90afe4e5f5d8befe011fb366f77948c586e495f"
    },
    {
      "version": "0.2.2",
      "publishedAt": 1772623315892,
      "commit": "https://github.com/openclaw/skills/commit/82b73c383d797f27169fe55638f22cbf17bf4c9a"
    },
    {
      "version": "0.1.0-demo",
      "publishedAt": 1772536273560,
      "commit": "https://github.com/openclaw/skills/commit/8dbab623579dbdb21fdaa4dedbefdf5e13c83da5"
    }
  ]
}

```

### assets/templates/opportunity-report.md

```markdown
# Investment Opportunity Report

Generated: {{timestamp}}
Chain: {{chain}}

---

## Summary

| Metric | Value |
|--------|-------|
| Protocol | {{protocol_name}} |
| Pool/Contract | {{pool_name}} |
| APY | {{apy}}% |
| TVL | ${{tvl}} |
| Risk Level | {{risk_level}} |
| Lock Period | {{lock_period}} |

---

## Protocol Information

- **Name**: {{protocol_name}}
- **Category**: {{category}}
- **Contract Address**: `{{contract_address}}`
- **Chain**: {{chain}}
- **Audit Status**: {{audit_status}}
- **Live Since**: {{launch_date}}

---

## Risk Assessment

| Factor | Score | Notes |
|--------|-------|-------|
| Protocol Maturity | {{maturity_score}}/3 | {{maturity_notes}} |
| Audit Status | {{audit_score}}/3 | {{audit_notes}} |
| TVL Size | {{tvl_score}}/2 | {{tvl_notes}} |
| Decentralization | {{decentralization_score}}/2 | {{decentralization_notes}} |
| **Total** | **{{risk_score}}/10** | {{risk_level}} |

{% if warnings %}
### ⚠️ Warnings
{% for warning in warnings %}
- {{warning}}
{% endfor %}
{% endif %}

---

## Underlying Assets

{% for asset in underlying_assets %}
- **{{asset.symbol}}**: {{asset.name}} ({{asset.percentage}}%)
{% endfor %}

---

## Yield Breakdown

| Source | APY |
|--------|-----|
| Base APY | {{base_apy}}% |
| Reward APY | {{reward_apy}}% |
| **Total APY** | **{{apy}}%** |

---

## Financial Metrics

| Metric | Value |
|--------|-------|
| Min Deposit | {{min_deposit}} |
| Max Deposit | {{max_deposit}} |
| Withdrawal Fee | {{withdrawal_fee}} |
| Entry Fee | {{entry_fee}} |
| Performance Fee | {{performance_fee}} |

---

## How to Invest

### Step 1: Prepare Wallet
Ensure you have {{required_asset}} in your wallet.

### Step 2: Approve Token
```
approve({{contract_address}}, amount)
```

### Step 3: Deposit
```
deposit(amount, recipient_address)
```

### Contract Call Example
```javascript
// Using ethers.js
const contract = new ethers.Contract(
  '{{contract_address}}',
  ABI,
  signer
);
await contract.deposit(ethers.utils.parseUnits('100', 6));
```

---

## Risk-Adjusted Analysis

| Scenario | APY | Risk Score | Risk-Adjusted APY |
|----------|-----|------------|-------------------|
| Optimistic | {{optimistic_apy}}% | {{risk_score}} | {{optimistic_adjusted}}% |
| Base Case | {{apy}}% | {{risk_score}} | {{base_adjusted}}% |
| Pessimistic | {{pessimistic_apy}}% | {{risk_score}} | {{pessimistic_adjusted}}% |

---

## Recommendation

{{recommendation}}

**Verdict**: {{verdict}} (Risk-Adjusted APY: {{risk_adjusted_apy}}%)

---

*Report generated by Web3 Investor Skill*
```

### assets/templates/portfolio-report.md

```markdown
# Portfolio Report

Address: `{{address}}`
Chain: {{chain}}
Generated: {{timestamp}}

---

## Summary

| Metric | Value |
|--------|-------|
| Total Value (USD) | ${{total_value_usd}} |
| 24h Change | {{change_24h}}% |
| 7d Change | {{change_7d}}% |
| Position Count | {{position_count}} |
| Active Protocols | {{protocol_count}} |

---

## Asset Distribution

### By Type

| Type | Value (USD) | Percentage |
|------|-------------|------------|
| Tokens | ${{tokens_value}} | {{tokens_pct}}% |
| DeFi Positions | ${{defi_value}} | {{defi_pct}}% |
| NFTs | ${{nft_value}} | {{nft_pct}}% |
| **Total** | **${{total_value_usd}}** | **100%** |

### By Asset

{% for asset in assets %}
| {{asset.symbol}} | ${{asset.value_usd}} | {{asset.percentage}}% | {{asset.protocol or 'Wallet'}} |
{% endfor %}

---

## DeFi Positions

{% for position in defi_positions %}
### {{position.protocol}} - {{position.name}}

| Field | Value |
|-------|-------|
| Type | {{position.type}} |
| Deposited | {{position.deposited}} {{position.asset}} (${{position.deposited_usd}}) |
| Current Value | ${{position.current_value_usd}} |
| APY | {{position.apy}}% |
| Rewards | {{position.rewards}} |
| Health Factor | {{position.health_factor}} |

{% if position.expiry %}
**⏰ Expires**: {{position.expiry}} ({{position.days_until_expiry}} days)
{% endif %}

{% endfor %}

---

## Expiring Positions

{% if expiring_positions %}
| Protocol | Position | Expires | Days Left | Value |
|----------|----------|---------|-----------|-------|
{% for pos in expiring_positions %}
| {{pos.protocol}} | {{pos.name}} | {{pos.expiry}} | {{pos.days_left}} | ${{pos.value}} |
{% endfor %}
{% else %}
No positions expiring in the next 30 days.
{% endif %}

---

## Yield Summary

| Source | APY | Value | Annual Yield |
|--------|-----|-------|--------------|
{% for yield in yield_summary %}
| {{yield.source}} | {{yield.apy}}% | ${{yield.value}} | ${{yield.annual_yield}} |
{% endfor %}
| **Total** | **{{weighted_apy}}%** | **${{total_value_usd}}** | **${{total_annual_yield}}** |

---

## Risk Exposure

| Risk Type | Exposure | Details |
|-----------|----------|---------|
| Smart Contract | {{sc_risk}} | {{sc_protocols}} |
| Liquidation | {{liq_risk}} | {{liq_details}} |
| Impermanent Loss | {{il_risk}} | {{il_details}} |
| Oracle | {{oracle_risk}} | {{oracle_details}} |

---

## Recommendations

{% for rec in recommendations %}
{{rec.priority}}. **{{rec.title}}**: {{rec.description}}
{% endfor %}

---

## Transaction History (Last 7 Days)

| Date | Action | Protocol | Amount | Asset |
|------|--------|----------|--------|-------|
{% for tx in recent_transactions %}
| {{tx.date}} | {{tx.action}} | {{tx.protocol}} | {{tx.amount}} | {{tx.asset}} |
{% endfor %}

---

*Report generated by Web3 Investor Skill*
```

### references/mcp-servers.md

```markdown
# MCP Servers for Web3 Data

This document lists MCP (Model Context Protocol) servers that provide Web3 data.

## What is MCP?

Model Context Protocol is a standardized way for AI models to access external data sources. MCP servers act as bridges between AI agents and blockchain data.

## Recommended MCP Servers

### 1. Ethereum MCP
- **Repository**: https://github.com/your-org/ethereum-mcp
- **Capabilities**: Balance queries, transaction simulation, contract reads
- **Setup**: `npx ethereum-mcp --rpc-url YOUR_RPC_URL`
- **Free**: Yes (uses your RPC)

### 2. Etherscan MCP
- **Repository**: https://github.com/your-org/etherscan-mcp
- **Capabilities**: Transaction history, contract verification, token info
- **Setup**: `ETHERSCAN_API_KEY=xxx npx etherscan-mcp`
- **Free**: Yes (free tier available)

### 3. DefiLlama MCP
- **Repository**: https://github.com/your-org/defillama-mcp
- **Capabilities**: TVL data, yield rankings, protocol info
- **Setup**: `npx defillama-mcp`
- **Free**: Yes (no API key required)

### 4. Dune Analytics MCP
- **Repository**: https://github.com/your-org/dune-mcp
- **Capabilities**: Custom SQL queries on blockchain data
- **Setup**: `DUNE_API_KEY=xxx npx dune-mcp`
- **Free**: Free tier available (limited queries)

## Integration Pattern

```python
# Check MCP availability first, fall back to direct API
async def get_tvl(protocol: str) -> float:
    if mcp_available("defillama"):
        return await mcp_query("defillama", "get_tvl", protocol)
    else:
        return await defillama_api_get_tvl(protocol)
```

## Adding New MCP Servers

When adding new MCP servers:
1. Verify the server supports standard MCP protocol
2. Test with `mcp-inspector` tool
3. Document required environment variables
4. Add fallback logic for when MCP is unavailable
```

### references/protocols.md

```markdown
# Known Protocol Registry

This document contains metadata for common DeFi protocols on Ethereum mainnet.

## Lending Protocols

### Aave V3
- **Contract**: 0x87870Bca3F3fD6335C3F4ce8392D69350B4fA4E2
- **Category**: Lending
- **Risk Level**: Low
- **Audit**: Multiple (OpenZeppelin, Trail of Bits)
- **TVL**: > $5B (check DefiLlama for current)
- **Docs**: https://docs.aave.com/

### Compound V3
- **Contract**: 0xc3d688B66703497DAA19211EEdff47f25384cdc3
- **Category**: Lending
- **Risk Level**: Low
- **Audit**: Multiple (OpenZeppelin)
- **TVL**: > $2B
- **Docs**: https://docs.compound.finance/

### MakerDAO (Spark)
- **Contract**: Various (see docs)
- **Category**: Lending
- **Risk Level**: Low
- **Audit**: Multiple
- **TVL**: > $8B
- **Docs**: https://docs.makerdao.com/

## Liquid Staking

### Lido (stETH)
- **Contract**: 0xae7ab96520DE3A18E5e111B5EaAb095312D7fE84
- **Category**: Liquid Staking
- **Risk Level**: Low
- **Audit**: Multiple (Quantstamp, MixBytes)
- **TVL**: > $15B
- **Docs**: https://docs.lido.fi/

### Rocket Pool (rETH)
- **Contract**: 0xae78736Cd615f374D3085123A210448E74Fc6393
- **Category**: Liquid Staking
- **Risk Level**: Low
- **Audit**: Multiple (Sigma Prime)
- **TVL**: > $2B
- **Docs**: https://docs.rocketpool.net/

## DEXs

### Uniswap V3
- **Factory**: 0x1F98431c8aD98523631AE4a59f267346ea31F984
- **Category**: DEX
- **Risk Level**: Low
- **Audit**: Multiple
- **TVL**: > $4B
- **Docs**: https://docs.uniswap.org/

### Curve Finance
- **Contract**: 0xD533a949740bb3306d119CC777fa900bA034cd52
- **Category**: DEX (Stablecoins)
- **Risk Level**: Low
- **Audit**: Multiple
- **TVL**: > $2B
- **Docs**: https://docs.curve.fi/

## Yield Aggregators

### Yearn V3
- **Registry**: 0x3c91D8ba3C8cB06D9CFe5b8F31c68a746f0f15B6
- **Category**: Yield Aggregator
- **Risk Level**: Medium
- **Audit**: Multiple
- **TVL**: Variable
- **Docs**: https://docs.yearn.fi/

## Protocol Metadata Schema

```json
{
  "name": "Protocol Name",
  "address": "0x...",
  "chain": "ethereum",
  "category": "lending|dex|staking|aggregator",
  "risk_level": "low|medium|high",
  "audit_status": "multiple|single|community|none",
  "tvl": 1000000000,
  "maturity_days": 730,
  "has_timelock": true,
  "governance": "dao|multisig|admin",
  "docs_url": "https://...",
  "icon_url": "https://..."
}
```

## Adding New Protocols

To add a new protocol, create a JSON file in `config/protocols/`:

```json
{
  "name": "New Protocol",
  "address": "0x...",
  "chain": "ethereum",
  "category": "lending",
  "risk_level": "medium",
  "audit_status": "single",
  "docs_url": "https://..."
}
```

Then run:
```bash
python scripts/discovery/analyze_protocol.py --add-config config/protocols/new-protocol.json
```
```

### references/risk-framework.md

```markdown
# Risk Assessment Framework

## Risk Levels

| Level | Score | Description | Typical Characteristics |
|-------|-------|-------------|------------------------|
| **Low** | 0-3 | Blue-chip, battle-tested | TVL > $1B, Audited, 2+ years live |
| **Medium** | 4-6 | Established but some risk | TVL $100M-$1B, Audited, 1+ year live |
| **High** | 7-10 | Experimental or new | TVL < $100M, No audit, < 1 year live |

## Risk Factors

### 1. Protocol Maturity (0-3 points)

| Criteria | Points |
|----------|--------|
| Live > 2 years with no major exploits | 0 |
| Live 1-2 years | 1 |
| Live < 1 year | 2 |
| Live < 3 months | 3 |

### 2. Audit Status (0-3 points)

| Criteria | Points |
|----------|--------|
| Multiple audits from top firms (OpenZeppelin, Trail of Bits, etc.) | 0 |
| Single audit from reputable firm | 1 |
| Community audit only | 2 |
| No audit | 3 |

### 3. TVL Size (0-2 points)

| TVL | Points |
|-----|--------|
| > $1 billion | 0 |
| $100M - $1B | 1 |
| < $100M | 2 |

### 4. Decentralization (0-2 points)

| Criteria | Points |
|----------|--------|
| DAO governance, no admin keys or timelocked | 0 |
| Timelocked admin keys (> 48h) | 1 |
| Admin keys, no timelock | 2 |

## Risk Calculation

```
Risk Score = Protocol Maturity + Audit Status + TVL Size + Decentralization
```

### Examples

| Protocol | Maturity | Audit | TVL | Decentralization | Total | Level |
|----------|----------|-------|-----|------------------|-------|-------|
| Aave V3 | 0 | 0 | 0 | 1 | 1 | Low |
| Compound | 0 | 0 | 0 | 0 | 0 | Low |
| New Lending Protocol | 3 | 3 | 2 | 2 | 10 | High |

## Risk-Adjusted APY

When comparing opportunities, consider risk-adjusted returns:

```
Risk-Adjusted APY = Nominal APY × (1 - Risk Score / 20)
```

### Example

| Protocol | APY | Risk Score | Risk-Adjusted APY |
|----------|-----|------------|-------------------|
| Aave USDC | 5% | 1 | 4.75% |
| New Protocol | 20% | 8 | 12% |

## Special Risk Flags

Additional risk factors that should be noted but not scored:

- ⚠️ **Fork Risk**: Protocol is a fork of another protocol with issues
- ⚠️ **Oracle Risk**: Uses custom oracle instead of Chainlink
- ⚠️ **Leverage Risk**: Involves leverage or looping
- ⚠️ **Illicit Risk**: Protocol in sanctioned jurisdiction
- ⚠️ **Regulatory Risk**: Under regulatory scrutiny

## Usage in Discovery

When running `find_opportunities.py`:

```bash
# Only show low-risk opportunities
python scripts/discovery/find_opportunities.py --max-risk low

# Filter by minimum TVL (in USD)
python scripts/discovery/find_opportunities.py --min-tvl 1000000

# Require audit
python scripts/discovery/find_opportunities.py --require-audit
```
```

### references/safe-vault-spec.md

```markdown
# Safe Vault Technical Specification

## Overview

Safe Vault is a secure transaction execution framework that enables AI agents to interact with Web3 protocols safely. It follows a phased approach:

- **Phase 1**: Simulation + Manual Confirmation
- **Phase 2**: Limited Auto-Execution (≤ $100 USD by default)
- **Phase 3**: Full Autonomous Execution

## Architecture

```
┌─────────────────────────────────────────────────────────────┐
│                      Agent Request                           │
│  "Invest 100 USDC into Aave with max 5% slippage"           │
└─────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────┐
│                   Intent Parser                              │
│  - Parse natural language to structured intent              │
│  - Identify protocol, action, amount, parameters            │
└─────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────┐
│                   Whitelist Check                            │
│  - Is target address in whitelist?                          │
│  - Is action type allowed?                                   │
│  - Is amount within limits?                                   │
└─────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────┐
│                   Risk Analysis                              │
│  - Protocol risk assessment                                  │
│  - Slippage estimation                                        │
│  - Gas cost estimation                                        │
│  - Simulate transaction                                       │
└─────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────┐
│                   Execution Decision                         │
│                                                              │
│  Phase 1: Generate signing request → Human confirms         │
│  Phase 2: Auto-sign if ≤ limit → Human reviews in batch     │
│  Phase 3: Auto-sign within parameters → Log only             │
└─────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────┐
│                   Transaction Execution                      │
│  - Submit to mempool                                         │
│  - Monitor confirmation                                       │
│  - Report result                                              │
└─────────────────────────────────────────────────────────────┘
```

## Phase 1: Simulation Mode (Current)

### Transaction Flow

```
Agent → Intent Parser → Whitelist Check → Risk Analysis
→ Simulation → Signing Request → Human Review → Manual Execution
```

### Signing Request Format

```json
{
  "request_id": "uuid",
  "timestamp": "2024-01-15T10:30:00Z",
  "intent": {
    "action": "deposit",
    "protocol": "aave-v3",
    "amount": "100",
    "asset": "USDC",
    "chain": "ethereum"
  },
  "transaction": {
    "to": "0x87870Bca3F3fD6335C3F4ce8392D69350B4fA4E2",
    "value": "0",
    "data": "0x...",
    "gas_limit": 250000,
    "gas_price": "20000000000"
  },
  "simulation": {
    "success": true,
    "gas_used": 185000,
    "state_changes": [
      {
        "contract": "0x...",
        "change": "USDC balance -100"
      },
      {
        "contract": "0x...",
        "change": "aUSDC balance +100"
      }
    ]
  },
  "risk_assessment": {
    "risk_level": "low",
    "risk_score": 1,
    "warnings": []
  },
  "approval_url": "https://safe-vault.example/approve/uuid"
}
```

### Human Confirmation Methods

1. **CLI Confirmation**: Agent displays signing request, user copies to wallet
2. **QR Code**: Generate QR code for mobile wallet scanning
3. **Safe{Wallet}**: Create pending transaction in Safe UI

## Phase 2: Limited Auto-Execution

### Requirements

1. **Secure Key Storage**
   - Use environment variables or secure vault
   - Never log or expose private keys
   - Consider using AWS KMS / GCP KMS

2. **Transaction Limits**
   - Default: $100 USD equivalent
   - Configurable per asset and protocol
   - Daily cumulative limit

3. **Rate Limiting**
   - Max 10 transactions per hour
   - Max 50 transactions per day
   - Configurable cooldown periods

4. **Monitoring**
   - All transactions logged
   - Alert on suspicious patterns
   - Daily summary to user

### Configuration

```json
{
  "phase_2": {
    "enabled": true,
    "limits": {
      "per_transaction_usd": 100,
      "daily_total_usd": 500,
      "hourly_count": 10,
      "daily_count": 50
    },
    "whitelist": {
      "required": true,
      "addresses": [
        {
          "address": "0x87870Bca3F3fD6335C3F4ce8392D69350B4fA4E2",
          "name": "Aave Pool",
          "max_amount_usd": 500
        }
      ]
    },
    "notifications": {
      "on_execute": true,
      "daily_summary": true,
      "channel": "telegram"
    }
  }
}
```

## Phase 3: Full Autonomous

### Requirements

1. **Comprehensive Audit**
   - Third-party security audit
   - Formal verification of critical paths
   - Bug bounty program

2. **Safety Mechanisms**
   - Circuit breakers
   - Automatic pause on anomaly detection
   - Insurance fund

3. **Governance**
   - Multi-sig for critical changes
   - Timelock for parameter updates
   - User veto power

### Not Recommended For
- Individual users without insurance
- Amounts > $10,000 USD
- Novel protocols (use Phase 1/2)

## Wallet Integration

### MetaMask (EOA)

```python
# Phase 1: Generate request for user to sign
request = generate_signing_request(tx)
print(f"Please sign this transaction in MetaMask:")
print(f"To: {request['to']}")
print(f"Data: {request['data']}")

# Phase 2+: Use web3.py with private key
from web3 import Web3
w3 = Web3(...)
signed_tx = w3.eth.account.sign_transaction(tx, private_key)
tx_hash = w3.eth.send_raw_transaction(signed_tx.rawTransaction)
```

### Safe{Wallet} (Multi-sig)

```python
# Build Safe transaction
safe_tx = safe.build_transaction(
    to=tx['to'],
    value=tx['value'],
    data=tx['data'],
    safe_nonce=safe_nonce
)

# Phase 1: Create pending transaction
# User approves in Safe UI
safe_url = f"https://app.safe.global/transactions/queue?safe={safe_address}"

# Phase 2+: Sign and propose
safe.sign_transaction(safe_tx)
safe.propose_transaction(safe_tx)
```

## Security Checklist

- [ ] Private keys stored securely (env vars, vault)
- [ ] Whitelist enforced before any execution
- [ ] All transactions simulated before signing
- [ ] Rate limits in place
- [ ] Transaction logging enabled
- [ ] Error handling for failed transactions
- [ ] Gas price limits to prevent frontrunning
- [ ] Slippage protection for DEX trades
```

### scripts/discovery/analyze_protocol.py

```python
#!/usr/bin/env python3
"""
Analyze a specific protocol for investment suitability.

Usage:
    python analyze_protocol.py --protocol aave-v3
    python analyze_protocol.py --protocol uniswap --output json
"""

import argparse
import json
import sys
import urllib.request
import urllib.error
from datetime import datetime
from typing import Optional

DEFILLAMA_PROTOCOLS = "https://api.llama.fi/protocols"
DEFILLAMA_PROTOCOL = "https://api.llama.fi/protocol/{slug}"


def fetch_protocol(slug: str) -> Optional[dict]:
    """Fetch protocol data from DefiLlama."""
    url = DEFILLAMA_PROTOCOL.format(slug=slug.lower())
    try:
        with urllib.request.urlopen(url, timeout=30) as response:
            return json.loads(response.read().decode())
    except urllib.error.URLError as e:
        print(f"Error fetching protocol: {e}", file=sys.stderr)
        return None


def fetch_all_protocols() -> list:
    """Fetch all protocols list."""
    try:
        with urllib.request.urlopen(DEFILLAMA_PROTOCOLS, timeout=30) as response:
            return json.loads(response.read().decode())
    except urllib.error.URLError as e:
        print(f"Error fetching protocols list: {e}", file=sys.stderr)
        return []


def find_protocol_slug(name: str) -> Optional[str]:
    """Find protocol slug by name."""
    protocols = fetch_all_protocols()
    name_lower = name.lower()
    
    for p in protocols:
        if name_lower in p.get("name", "").lower():
            return p.get("slug") or p.get("name", "").lower().replace(" ", "-")
        if name_lower in p.get("slug", "").lower():
            return p.get("slug")
    
    return None


def calculate_risk_score(protocol: dict) -> dict:
    """Calculate detailed risk score."""
    scores = {}
    total = 0
    
    # Maturity (0-3)
    audits = protocol.get("audits", [])
    if len(audits) >= 2:
        scores["maturity"] = {"score": 0, "note": "Multiple audits, well established"}
    elif len(audits) == 1:
        scores["maturity"] = {"score": 1, "note": "Single audit"}
    elif protocol.get("audit_note"):
        scores["maturity"] = {"score": 2, "note": "Audit in progress or partial"}
    else:
        scores["maturity"] = {"score": 3, "note": "No audit"}
    total += scores["maturity"]["score"]
    
    # TVL (0-2)
    tvl = protocol.get("tvl", 0)
    # Handle case where tvl might be a list or other type
    if isinstance(tvl, (list, dict)):
        tvl = 0
    tvl = tvl or 0
    if tvl > 1_000_000_000:
        scores["tvl"] = {"score": 0, "note": f"TVL > $1B (${tvl:,.0f})"}
    elif tvl > 100_000_000:
        scores["tvl"] = {"score": 1, "note": f"TVL $100M-$1B (${tvl:,.0f})"}
    else:
        scores["tvl"] = {"score": 2, "note": f"TVL < $100M (${tvl:,.0f})"}
    total += scores["tvl"]["score"]
    
    # Decentralization (0-2)
    if protocol.get("governanceID"):
        scores["decentralization"] = {"score": 0, "note": "DAO governed"}
    elif protocol.get("gecko_id"):
        scores["decentralization"] = {"score": 1, "note": "Has token, governance unclear"}
    else:
        scores["decentralization"] = {"score": 2, "note": "No governance token"}
    total += scores["decentralization"]["score"]
    
    # Audit status (0-3) - separate from maturity
    if isinstance(audits, list):
        audit_count = len(audits)
    else:
        audit_count = 0
    
    if audit_count >= 2:
        scores["audit"] = {"score": 0, "note": f"{audit_count} audits"}
    elif audit_count == 1:
        scores["audit"] = {"score": 1, "note": "1 audit"}
    elif protocol.get("audit_note"):
        scores["audit"] = {"score": 2, "note": protocol.get("audit_note", "Audit issues")}
    else:
        scores["audit"] = {"score": 3, "note": "No audit"}
    total += scores["audit"]["score"]
    
    return {
        "total": min(total, 10),
        "breakdown": scores,
        "level": "low" if total <= 3 else "medium" if total <= 6 else "high"
    }


def analyze_protocol(name_or_slug: str) -> Optional[dict]:
    """
    Analyze a protocol and return detailed information.
    
    Args:
        name_or_slug: Protocol name or slug
    
    Returns:
        Protocol analysis dict
    """
    # Try as slug first
    protocol = fetch_protocol(name_or_slug)
    
    # If not found, search by name
    if not protocol:
        slug = find_protocol_slug(name_or_slug)
        if slug:
            protocol = fetch_protocol(slug)
    
    if not protocol:
        return None
    
    # Calculate risk
    risk = calculate_risk_score(protocol)
    
    # Get chains
    chains = protocol.get("chains", [])
    if isinstance(chains, dict):
        chains = list(chains.keys())
    
    # Get TVL by chain
    chain_tvl = {}
    if protocol.get("chainTvl"):
        ctvl = protocol.get("chainTvl")
        if isinstance(ctvl, dict):
            chain_tvl = ctvl
    
    # Get total TVL (handle various data types)
    raw_tvl = protocol.get("tvl", 0)
    if isinstance(raw_tvl, (int, float)):
        total_tvl = raw_tvl
    else:
        total_tvl = 0
    
    # Build analysis
    analysis = {
        "name": protocol.get("name", ""),
        "slug": protocol.get("slug", ""),
        "logo": protocol.get("logo", ""),
        "url": protocol.get("url", ""),
        "description": protocol.get("description", ""),
        "category": protocol.get("category", ""),
        "chains": chains,
        "tvl": {
            "total": total_tvl,
            "by_chain": chain_tvl,
            "change_24h": protocol.get("change_1d", 0) or 0,
            "change_7d": protocol.get("change_7d", 0) or 0
        },
        "risk": risk,
        "audits": [
            {
                "name": a.get("name", ""),
                "link": a.get("link", ""),
                "time": a.get("time", "")
            }
            for a in protocol.get("audits", [])
        ],
        "exploits": protocol.get("exploits", []),
        "governance": {
            "has_token": bool(protocol.get("gecko_id")),
            "token_name": "",
            "token_symbol": ""
        },
        "social": {
            "twitter": protocol.get("twitter", ""),
            "discord": protocol.get("discord", ""),
            "telegram": protocol.get("telegram", "")
        },
        "defillama_url": f"https://defillama.com/protocol/{protocol.get('slug', '')}"
    }
    
    return analysis


def format_report(analysis: dict, output_format: str = "markdown") -> str:
    """Format protocol analysis as report."""
    if not analysis:
        return "Protocol not found."
    
    if output_format == "json":
        return json.dumps(analysis, indent=2)
    
    # Markdown format
    lines = [
        f"# Protocol Analysis: {analysis['name']}",
        "",
        f"**Category**: {analysis['category']}",
        f"**Chains**: {', '.join(analysis['chains'])}",
        f"**Website**: {analysis['url']}",
        "",
        "---",
        "",
        "## 📊 TVL Overview",
        "",
        f"| Metric | Value |",
        f"|--------|-------|",
        f"| Total TVL | ${analysis['tvl']['total']:,.0f} |",
        f"| 24h Change | {analysis['tvl']['change_24h']:.2f}% |",
        f"| 7d Change | {analysis['tvl']['change_7d']:.2f}% |",
        ""
    ]
    
    if analysis['tvl']['by_chain']:
        lines.extend([
            "### TVL by Chain",
            "",
            "| Chain | TVL |",
            "|-------|-----|"
        ])
        for chain, tvl in sorted(analysis['tvl']['by_chain'].items(), 
                                   key=lambda x: x[1], reverse=True)[:5]:
            lines.append(f"| {chain} | ${tvl:,.0f} |")
        lines.append("")
    
    lines.extend([
        "---",
        "",
        "## ⚠️ Risk Assessment",
        "",
        f"| Factor | Score | Notes |",
        f"|--------|-------|-------|"
    ])
    
    for factor, data in analysis['risk']['breakdown'].items():
        lines.append(f"| {factor.title()} | {data['score']}/3 | {data['note']} |")
    
    lines.extend([
        f"| **Total** | **{analysis['risk']['total']}/10** | **{analysis['risk']['level'].upper()} RISK** |",
        ""
    ])
    
    if analysis['exploits']:
        lines.extend([
            "### 🚨 Known Exploits",
            ""
        ])
        for exploit in analysis['exploits']:
            lines.append(f"- {exploit}")
        lines.append("")
    
    lines.extend([
        "---",
        "",
        "## 🔐 Audits",
        ""
    ])
    
    if analysis['audits']:
        for audit in analysis['audits']:
            lines.append(f"- [{audit['name']}]({audit['link']}) ({audit['time'] or 'N/A'})")
    else:
        lines.append("No audits found.")
    
    lines.extend([
        "",
        "---",
        "",
        "## 🏛️ Governance",
        "",
        f"- **Token**: {analysis['governance']['token_symbol'] or 'No governance token'}",
        f"- **Decentralized**: {'Yes' if analysis['governance']['has_token'] else 'No'}",
        ""
    ])
    
    if analysis['social']['twitter'] or analysis['social']['discord']:
        lines.extend([
            "---",
            "",
            "## 📱 Social",
            ""
        ])
        if analysis['social']['twitter']:
            lines.append(f"- Twitter: {analysis['social']['twitter']}")
        if analysis['social']['discord']:
            lines.append(f"- Discord: {analysis['social']['discord']}")
        if analysis['social']['telegram']:
            lines.append(f"- Telegram: {analysis['social']['telegram']}")
    
    lines.extend([
        "",
        "---",
        "",
        f"📊 [View on DefiLlama]({analysis['defillama_url']})"
    ])
    
    return "\n".join(lines)


def main():
    parser = argparse.ArgumentParser(description="Analyze a DeFi protocol")
    parser.add_argument("--protocol", required=True, help="Protocol name or slug")
    parser.add_argument("--output", choices=["markdown", "json"], default="markdown",
                        help="Output format")
    
    args = parser.parse_args()
    
    analysis = analyze_protocol(args.protocol)
    
    if not analysis:
        print(f"Protocol '{args.protocol}' not found.", file=sys.stderr)
        sys.exit(1)
    
    print(format_report(analysis, args.output))


if __name__ == "__main__":
    main()
```

### scripts/discovery/dune_mcp.py

```python
#!/usr/bin/env python3
"""
Dune MCP Adapter for Web3 Investor Skill

Provides a clean interface to interact with Dune's MCP server
for blockchain data discovery and analysis.

Usage:
    python dune_mcp.py search "aave lending ethereum" --category spell
    python dune_mcp.py query 12345  # Execute a query by ID
    python dune_mcp.py results <execution_id>  # Get execution results
"""

import argparse
import json
import sys
import urllib.request
import urllib.error
from typing import Optional, Dict, Any, List

# Dune MCP Configuration
DUNE_MCP_URL = "https://api.dune.com/mcp/v1"
DUNE_API_KEY = "v9j8OB9igHEm46MDI9PvxA4nSKu8ZuCI"  # TODO: Move to env variable


def call_mcp_tool(tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
    """
    Call a Dune MCP tool and return the result.
    
    Handles SSE (Server-Sent Events) response format.
    """
    payload = {
        "jsonrpc": "2.0",
        "method": "tools/call",
        "id": 1,
        "params": {
            "name": tool_name,
            "arguments": arguments
        }
    }
    
    headers = {
        "x-dune-api-key": DUNE_API_KEY,
        "Content-Type": "application/json",
        "Accept": "application/json, text/event-stream"
    }
    
    try:
        data = json.dumps(payload).encode('utf-8')
        req = urllib.request.Request(DUNE_MCP_URL, data=data, headers=headers, method='POST')
        
        with urllib.request.urlopen(req, timeout=120) as response:
            # Handle SSE response
            response_text = response.read().decode('utf-8')
            
            # Parse SSE format: "event: message\nid: xxx\ndata: {...}"
            result = None
            for line in response_text.split('\n'):
                if line.startswith('data: '):
                    try:
                        parsed = json.loads(line[6:])
                        if 'result' in parsed:
                            result = parsed['result']
                        elif 'error' in parsed:
                            return {"error": parsed['error']}
                    except json.JSONDecodeError:
                        continue
            
            return result or {"error": "No result found in response"}
            
    except urllib.error.URLError as e:
        return {"error": str(e)}
    except Exception as e:
        return {"error": f"Unexpected error: {e}"}


def search_tables(
    query: str,
    categories: Optional[List[str]] = None,
    blockchains: Optional[List[str]] = None,
    limit: int = 20,
    include_schema: bool = False
) -> Dict[str, Any]:
    """
    Search for Dune tables matching the query.
    
    Args:
        query: Natural language query (e.g., "aave lending ethereum")
        categories: Filter by category (canonical, decoded, spell, community)
        blockchains: Filter by blockchain (e.g., ethereum, arbitrum)
        limit: Maximum number of results
        include_schema: Include column schema in results
    
    Returns:
        Dict with search results
    """
    arguments = {
        "query": query,
        "limit": limit,
        "includeSchema": include_schema
    }
    
    if categories:
        arguments["categories"] = categories
    if blockchains:
        arguments["blockchains"] = blockchains
    
    return call_mcp_tool("searchTables", arguments)


def execute_query(query_id: int, parameters: Optional[List[Dict]] = None) -> Dict[str, Any]:
    """
    Execute a Dune query by ID.
    
    Args:
        query_id: The numeric ID of the query
        parameters: Optional query parameters
    
    Returns:
        Dict with execution_id and state
    """
    arguments = {"query_id": query_id}
    if parameters:
        arguments["query_parameters"] = parameters
    
    return call_mcp_tool("executeQueryById", arguments)


def get_execution_results(execution_id: str, limit: int = 100, timeout: int = 60) -> Dict[str, Any]:
    """
    Get results of a query execution.
    
    Args:
        execution_id: The execution ID
        limit: Maximum number of rows to return
        timeout: Seconds to wait for completion
    
    Returns:
        Dict with execution results
    """
    arguments = {
        "executionId": execution_id,
        "limit": limit,
        "timeout": timeout
    }
    
    return call_mcp_tool("getExecutionResults", arguments)


def create_query(name: str, sql: str, description: str = "", is_temp: bool = True) -> Dict[str, Any]:
    """
    Create a new Dune query.
    
    Args:
        name: Query title
        sql: SQL query text
        description: Query description
        is_temp: Whether to create as temporary query
    
    Returns:
        Dict with query_id
    """
    arguments = {
        "name": name,
        "query": sql,
        "is_temp": is_temp
    }
    if description:
        arguments["description"] = description
    
    return call_mcp_tool("createDuneQuery", arguments)


def list_blockchains(categories: Optional[List[str]] = None, query: str = "*") -> Dict[str, Any]:
    """
    List available blockchains with table counts.
    
    Args:
        categories: Filter by category
        query: Semantic filter
    
    Returns:
        Dict with blockchain list
    """
    arguments = {"query": query}
    if categories:
        arguments["categories"] = categories
    
    return call_mcp_tool("listBlockchains", arguments)


def get_usage() -> Dict[str, Any]:
    """
    Get current billing period usage.
    
    Returns:
        Dict with usage information
    """
    return call_mcp_tool("getUsage", {})


# ============================================================================
# Convenience functions for Web3 Investor Skill
# ============================================================================

def find_yield_pools(chain: str = "ethereum", limit: int = 20) -> List[Dict]:
    """
    Find DeFi yield pools using Dune's curated datasets.
    
    Returns structured opportunity data compatible with find_opportunities.py
    """
    # Search for yield/lending related tables
    result = search_tables(
        query="defi yield lending APY",
        categories=["spell"],
        blockchains=[chain],
        limit=limit,
        include_schema=True
    )
    
    if "error" in result:
        return []
    
    return result.get("results", [])


def format_table_results(results: List[Dict]) -> str:
    """Format table search results for display."""
    if not results:
        return "No tables found."
    
    lines = ["# Dune Tables Found\n"]
    for table in results:
        lines.append(f"## {table.get('full_name', 'N/A')}")
        lines.append(f"- **Category**: {table.get('category', 'N/A')}")
        lines.append(f"- **Blockchains**: {', '.join(table.get('blockchains', []))}")
        if table.get('description'):
            lines.append(f"- **Description**: {table['description']}")
        if table.get('partition_columns'):
            lines.append(f"- **Partition Columns**: {', '.join(table['partition_columns'])}")
        lines.append("")
    
    return "\n".join(lines)


def main():
    parser = argparse.ArgumentParser(description="Dune MCP Adapter for Web3 Investor")
    subparsers = parser.add_subparsers(dest="command", help="Available commands")
    
    # Search tables
    search_parser = subparsers.add_parser("search", help="Search Dune tables")
    search_parser.add_argument("query", help="Search query")
    search_parser.add_argument("--category", "-c", action="append", help="Filter by category (spell, decoded, canonical, community)")
    search_parser.add_argument("--chain", help="Filter by blockchain")
    search_parser.add_argument("--limit", "-n", type=int, default=20, help="Number of results")
    search_parser.add_argument("--schema", action="store_true", help="Include column schema")
    search_parser.add_argument("--json", action="store_true", help="Output as JSON")
    
    # Execute query
    exec_parser = subparsers.add_parser("exec", help="Execute a Dune query")
    exec_parser.add_argument("query_id", type=int, help="Query ID to execute")
    
    # Get results
    results_parser = subparsers.add_parser("results", help="Get execution results")
    results_parser.add_argument("execution_id", help="Execution ID")
    results_parser.add_argument("--limit", "-n", type=int, default=100, help="Number of rows")
    
    # List chains
    subparsers.add_parser("chains", help="List available blockchains")
    
    # Usage
    subparsers.add_parser("usage", help="Get API usage info")
    
    args = parser.parse_args()
    
    if args.command == "search":
        blockchains = [args.chain] if args.chain else None
        result = search_tables(
            query=args.query,
            categories=args.category,
            blockchains=blockchains,
            limit=args.limit,
            include_schema=args.schema
        )
        
        if args.json:
            print(json.dumps(result, indent=2))
        else:
            if "error" in result:
                print(f"Error: {result['error']}", file=sys.stderr)
            elif "results" in result:
                print(format_table_results(result["results"]))
            else:
                print(json.dumps(result, indent=2))
    
    elif args.command == "exec":
        result = execute_query(args.query_id)
        print(json.dumps(result, indent=2))
    
    elif args.command == "results":
        result = get_execution_results(args.execution_id, limit=args.limit)
        print(json.dumps(result, indent=2))
    
    elif args.command == "chains":
        result = list_blockchains()
        print(json.dumps(result, indent=2))
    
    elif args.command == "usage":
        result = get_usage()
        print(json.dumps(result, indent=2))
    
    else:
        parser.print_help()


if __name__ == "__main__":
    main()
```

### scripts/discovery/investment_profile.py

```python
#!/usr/bin/env python3
"""
Investment Profile - User preference collection and opportunity filtering.

This module provides a convenient way for agents to collect user investment
preferences and filter opportunities accordingly.

Usage:
    from investment_profile import InvestmentProfile
    
    profile = InvestmentProfile()
    profile.chain = "ethereum"
    profile.capital_token = "USDC"
    profile.accept_il = False
    profile.reward_preference = "single"
    
    # Filter opportunities
    filtered = profile.filter_opportunities(opportunities)
"""

from typing import Optional, List, Dict, Any, Callable
import json


class InvestmentProfile:
    """
    Collect and store user investment preferences for opportunity filtering.
    
    This class helps agents:
    1. Ask the right questions to users
    2. Store preferences in a structured way
    3. Filter opportunities based on preferences
    """
    
    def __init__(self):
        # Required fields
        self.chain: Optional[str] = None
        self.capital_token: Optional[str] = None
        
        # Optional preference fields (None means "no preference / any")
        self.reward_preference: Optional[str] = None  # "single" | "multi" | "none" | None
        self.accept_il: Optional[bool] = None  # True | False | None
        self.underlying_preference: Optional[str] = None  # "rwa" | "onchain" | "mixed" | None
        
        # Additional constraints
        self.min_apy: Optional[float] = None
        self.max_apy: Optional[float] = None
        self.min_tvl: Optional[float] = None
    
    @classmethod
    def get_questions(cls) -> Dict[str, Any]:
        """
        Get the list of questions to ask users.
        
        Returns a structured dict that agents can use to build their UI.
        """
        return {
            "required": [
                {
                    "key": "chain",
                    "question": "您想在哪条链上投资?",
                    "description": "选择目标区块链",
                    "options": ["ethereum", "base", "arbitrum", "optimism"],
                    "type": "select"
                },
                {
                    "key": "capital_token",
                    "question": "您的投资本金是什么代币?",
                    "description": "例如:USDC, USDT, ETH, WBTC",
                    "examples": ["USDC", "USDT", "ETH"],
                    "type": "text"
                }
            ],
            "preference": [
                {
                    "key": "reward_preference",
                    "question": "您对奖励代币有什么偏好吗?",
                    "description": "有些产品奖励单一代币,有些奖励多种代币(如 CRV+CVX)",
                    "options": [
                        {"value": "single", "label": "只接受单一代币奖励", "description": "简单清晰,易于管理"},
                        {"value": "multi", "label": "可以接受多代币奖励", "description": "可能更高收益,但需要管理多种代币"},
                        {"value": "none", "label": "不需要奖励,只要基础收益", "description": "最稳定的收益结构"},
                        {"value": None, "label": "无特别偏好", "description": "都可以接受"}
                    ],
                    "type": "select"
                },
                {
                    "key": "accept_il",
                    "question": "您能接受无常损失(Impermanent Loss)吗?",
                    "description": "LP 类产品在价格波动时可能产生无常损失",
                    "options": [
                        {"value": False, "label": "不接受", "description": "只想要本金保障的产品,如借贷、单币质押"},
                        {"value": True, "label": "可以接受", "description": "理解并接受 LP 的无常损失风险"},
                        {"value": None, "label": "不确定/看情况", "description": "需要更多建议"}
                    ],
                    "type": "select"
                },
                {
                    "key": "underlying_preference",
                    "question": "您对底层资产有偏好吗?",
                    "description": "产品的底层资产类型",
                    "options": [
                        {"value": "onchain", "label": "纯链上合约", "description": "DeFi 原生协议,如 Aave、Compound"},
                        {"value": "rwa", "label": "现实世界资产 (RWA)", "description": "如国债代币化、美债收益"},
                        {"value": "mixed", "label": "混合资产", "description": "多种资产组合"},
                        {"value": None, "label": "无偏好", "description": "不限制底层资产类型"}
                    ],
                    "type": "select"
                }
            ],
            "constraints": [
                {
                    "key": "min_apy",
                    "question": "最低可接受的 APY 是多少?",
                    "description": "例如:5 表示至少 5%",
                    "type": "number",
                    "unit": "%"
                },
                {
                    "key": "max_apy",
                    "question": "最高 APY 上限(过滤过高风险)",
                    "description": "超过此 APY 的产品将被过滤,建议 30-50",
                    "type": "number",
                    "unit": "%"
                },
                {
                    "key": "min_tvl",
                    "question": "最低 TVL 要求?",
                    "description": "产品总锁仓价值,建议至少 $100万",
                    "type": "number",
                    "unit": "USD"
                }
            ]
        }
    
    def set_preferences(self, **kwargs) -> "InvestmentProfile":
        """
        Set multiple preferences at once.
        
        Example:
            profile.set_preferences(
                chain="ethereum",
                capital_token="USDC",
                accept_il=False,
                reward_preference="single"
            )
        """
        valid_fields = [
            'chain', 'capital_token', 'reward_preference', 'accept_il',
            'underlying_preference', 'min_apy', 'max_apy', 'min_tvl'
        ]
        
        for key, value in kwargs.items():
            if key in valid_fields:
                setattr(self, key, value)
        
        return self
    
    def is_valid(self) -> bool:
        """Check if required fields are set."""
        return self.chain is not None and self.capital_token is not None
    
    def _matches_reward_preference(self, opp: Dict[str, Any]) -> bool:
        """Check if opportunity matches reward preference."""
        if self.reward_preference is None:
            return True
        
        risk_signals = opp.get("risk_signals", {})
        reward_type = risk_signals.get("reward_type", "unknown")
        
        # Map preference to allowed types
        preference_map = {
            "single": ["single", "none"],
            "multi": ["multi"],
            "none": ["none"]
        }
        
        allowed = preference_map.get(self.reward_preference, [])
        return reward_type in allowed
    
    def _matches_il_preference(self, opp: Dict[str, Any]) -> bool:
        """Check if opportunity matches IL preference."""
        if self.accept_il is None:
            return True
        
        risk_signals = opp.get("risk_signals", {})
        has_il = risk_signals.get("has_il_risk", False)
        
        # If user doesn't accept IL, filter out products with IL risk
        if not self.accept_il and has_il:
            return False
        
        return True
    
    def _matches_underlying_preference(self, opp: Dict[str, Any]) -> bool:
        """Check if opportunity matches underlying asset preference."""
        if self.underlying_preference is None:
            return True
        
        risk_signals = opp.get("risk_signals", {})
        underlying_type = risk_signals.get("underlying_type", "unknown")
        
        # Exact match or "mixed" can satisfy "onchain" preference
        if self.underlying_preference == "onchain" and underlying_type in ["onchain", "mixed"]:
            return True
        
        return underlying_type == self.underlying_preference
    
    def _matches_constraints(self, opp: Dict[str, Any]) -> bool:
        """Check if opportunity matches numeric constraints."""
        apy = opp.get("apy", 0)
        tvl = opp.get("tvl_usd", 0)
        
        if self.min_apy is not None and apy < self.min_apy:
            return False
        
        if self.max_apy is not None and apy > self.max_apy:
            return False
        
        if self.min_tvl is not None and tvl < self.min_tvl:
            return False
        
        return True
    
    def filter_opportunities(self, opportunities: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Filter opportunities based on stored preferences.
        
        Args:
            opportunities: List of opportunity dicts from find_opportunities()
        
        Returns:
            Filtered list matching all preferences
        """
        if not self.is_valid():
            raise ValueError("Required fields (chain, capital_token) must be set before filtering")
        
        filtered = []
        
        for opp in opportunities:
            # Check all preferences
            if not self._matches_reward_preference(opp):
                continue
            
            if not self._matches_il_preference(opp):
                continue
            
            if not self._matches_underlying_preference(opp):
                continue
            
            if not self._matches_constraints(opp):
                continue
            
            filtered.append(opp)
        
        return filtered
    
    def explain_filtering(self, original_count: int, filtered_count: int) -> str:
        """Generate a human-readable explanation of filtering results."""
        lines = [
            f"# 投资偏好过滤结果",
            f"",
            f"**原始产品数**: {original_count}",
            f"**符合条件**: {filtered_count}",
            f"**过滤比例**: {(1 - filtered_count/original_count)*100:.1f}%",
            f"",
            f"## 您的偏好设置",
            f"",
        ]
        
        # Required
        lines.append(f"- **目标链**: {self.chain}")
        lines.append(f"- **投资代币**: {self.capital_token}")
        
        # Preferences
        if self.reward_preference:
            reward_labels = {"single": "单一代币奖励", "multi": "多代币奖励", "none": "无奖励"}
            lines.append(f"- **奖励偏好**: {reward_labels.get(self.reward_preference, self.reward_preference)}")
        
        if self.accept_il is not None:
            lines.append(f"- **接受无常损失**: {'是' if self.accept_il else '否'}")
        
        if self.underlying_preference:
            underlying_labels = {"onchain": "纯链上", "rwa": "现实世界资产", "mixed": "混合"}
            lines.append(f"- **底层资产**: {underlying_labels.get(self.underlying_preference, self.underlying_preference)}")
        
        # Constraints
        if self.min_apy:
            lines.append(f"- **最低 APY**: {self.min_apy}%")
        if self.max_apy:
            lines.append(f"- **最高 APY**: {self.max_apy}%")
        if self.min_tvl:
            lines.append(f"- **最低 TVL**: ${self.min_tvl:,.0f}")
        
        return "\n".join(lines)
    
    def to_dict(self) -> Dict[str, Any]:
        """Export preferences as dict."""
        return {
            "chain": self.chain,
            "capital_token": self.capital_token,
            "reward_preference": self.reward_preference,
            "accept_il": self.accept_il,
            "underlying_preference": self.underlying_preference,
            "min_apy": self.min_apy,
            "max_apy": self.max_apy,
            "min_tvl": self.min_tvl
        }
    
    def from_dict(self, data: Dict[str, Any]) -> "InvestmentProfile":
        """Load preferences from dict."""
        for key, value in data.items():
            if hasattr(self, key):
                setattr(self, key, value)
        return self
    
    def __repr__(self) -> str:
        return f"InvestmentProfile(chain={self.chain}, token={self.capital_token})"


# Convenience function for quick filtering
def filter_by_preferences(
    opportunities: List[Dict[str, Any]],
    chain: str,
    capital_token: str,
    **kwargs
) -> List[Dict[str, Any]]:
    """
    One-shot filter function.
    
    Example:
        filtered = filter_by_preferences(
            opportunities,
            chain="ethereum",
            capital_token="USDC",
            accept_il=False,
            reward_preference="single"
        )
    """
    profile = InvestmentProfile()
    profile.set_preferences(chain=chain, capital_token=capital_token, **kwargs)
    return profile.filter_opportunities(opportunities)


if __name__ == "__main__":
    # Demo: Print available questions
    print("# 投资偏好收集问题列表\n")
    questions = InvestmentProfile.get_questions()
    
    print("## 必问问题")
    for q in questions["required"]:
        print(f"\n**{q['question']}**")
        print(f"- Key: `{q['key']}`")
        if 'options' in q:
            print(f"- 选项: {', '.join(q['options'])}")
    
    print("\n## 重要问题(强烈建议询问)")
    for q in questions["preference"]:
        print(f"\n**{q['question']}**")
        print(f"- Key: `{q['key']}`")
        print(f"- 说明: {q['description']}")
        for opt in q['options']:
            if isinstance(opt, dict):
                print(f"  - `{opt['value']}`: {opt['label']} ({opt.get('description', '')})")
```

### scripts/discovery/unified_search.py

```python
#!/usr/bin/env python3
"""
Unified Search Interface for Web3 Investor Skill (v0.2.0)

Combines data from multiple sources:
1. DefiLlama API - Real-time yield data (with actionable addresses)
2. Dune Analytics - Deep on-chain analytics via MCP

Key Features:
- LLM-ready output format
- Actionable addresses for execution
- Risk signals collection (no local scoring)
- Protocol registry integration

Usage:
    python unified_search.py --min-apy 5 --chain ethereum
    python unified_search.py --source dune --query "aave lending"
    python unified_search.py --llm-ready --output json
"""

import argparse
import json
import sys
from datetime import datetime
from typing import Optional, List, Dict, Any

# Import existing modules
try:
    from find_opportunities import (
        find_opportunities, 
        format_report as format_defillama,
        format_llm_prompt,
        get_protocol_registry
    )
except ImportError:
    # Fallback if run from different directory
    import importlib.util
    spec = importlib.util.spec_from_file_location("find_opportunities", 
        __file__.replace("unified_search.py", "find_opportunities.py"))
    find_opps_module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(find_opps_module)
    find_opportunities = find_opps_module.find_opportunities
    format_defillama = find_opps_module.format_report
    format_llm_prompt = getattr(find_opps_module, "format_llm_prompt", None)
    get_protocol_registry = getattr(find_opps_module, "get_protocol_registry", lambda: {})

# Import Dune MCP with fallback
try:
    from dune_mcp import (
        search_tables, 
        execute_query, 
        get_execution_results,
        create_query
    )
except ImportError:
    # Fallback for relative import
    try:
        from .dune_mcp import (
            search_tables, 
            execute_query, 
            get_execution_results,
            create_query
        )
    except ImportError:
        # Last resort: manual import
        import importlib.util
        dune_spec = importlib.util.spec_from_file_location("dune_mcp",
            __file__.replace("unified_search.py", "dune_mcp.py"))
        dune_module = importlib.util.module_from_spec(dune_spec)
        dune_spec.loader.exec_module(dune_module)
        search_tables = dune_module.search_tables
        execute_query = dune_module.execute_query
        get_execution_results = dune_module.get_execution_results
        create_query = dune_module.create_query


def search_defillama(
    min_apy: float = 0,
    max_risk: str = "medium",
    chain: str = "Ethereum",
    min_tvl: float = 0,
    limit: int = 20
) -> List[Dict]:
    """
    Search DefiLlama for yield opportunities.
    
    Returns standardized opportunity format.
    """
    opportunities = find_opportunities(
        min_apy=min_apy,
        max_risk=max_risk,
        chain=chain,
        min_tvl=min_tvl,
        limit=limit
    )
    
    # Add source identifier
    for opp in opportunities:
        opp["source"] = "defillama"
    
    return opportunities


def search_dune_tables(
    query: str,
    chain: str = "ethereum",
    categories: List[str] = None,
    limit: int = 10
) -> List[Dict]:
    """
    Search Dune for relevant data tables.
    
    Returns table metadata for potential query execution.
    """
    if categories is None:
        categories = ["spell"]
    
    result = search_tables(
        query=query,
        categories=categories,
        blockchains=[chain],
        limit=limit,
        include_schema=True
    )
    
    tables = []
    
    # Handle MCP response format
    if "structuredContent" in result:
        results = result["structuredContent"].get("results", [])
    elif "content" in result:
        # Parse text content
        content = result["content"]
        if isinstance(content, list) and len(content) > 0:
            text = content[0].get("text", "{}")
            try:
                parsed = json.loads(text)
                results = parsed.get("results", [])
            except json.JSONDecodeError:
                results = []
        else:
            results = []
    elif "results" in result:
        results = result["results"]
    else:
        results = []
    
    for table in results:
        tables.append({
            "source": "dune",
            "full_name": table.get("full_name", ""),
            "category": table.get("category", ""),
            "blockchains": table.get("blockchains", []),
            "description": table.get("description", ""),
            "partition_columns": table.get("partition_columns", []),
            "schema": table.get("schema", {}),
            "visibility": table.get("visibility", "public")
        })
    
    return tables


def find_aave_rates_via_dune(chain: str = "ethereum") -> Dict:
    """
    Get Aave lending rates using Dune's curated tables.
    
    This demonstrates executing a query on Dune data.
    """
    # First, find the relevant table
    tables = search_dune_tables("aave interest", chain=chain)
    
    # Check if aave_ethereum.interest exists
    aave_table = None
    for t in tables:
        if "aave" in t["full_name"].lower() and "interest" in t["full_name"].lower():
            aave_table = t
            break
    
    if not aave_table:
        return {"error": "Aave interest table not found"}
    
    # Create a query to get current rates
    # Note: This requires knowing the table schema
    sql = f"""
    SELECT 
        asset_symbol,
        asset_address,
        supply_rate,
        variable_borrow_rate,
        stable_borrow_rate,
        block_date
    FROM {aave_table['full_name']}
    WHERE block_date = (SELECT MAX(block_date) FROM {aave_table['full_name']})
    ORDER BY supply_rate DESC
    LIMIT 20
    """
    
    result = create_query(
        name=f"Aave {chain} Rates - {datetime.now().strftime('%Y-%m-%d')}",
        sql=sql,
        description="Current Aave lending rates",
        is_temp=True
    )
    
    return {
        "table": aave_table["full_name"],
        "query_result": result
    }


def unified_search(
    min_apy: float = 0,
    chain: str = "Ethereum",
    min_tvl: float = 0,
    sources: List[str] = None,
    limit: int = 20,
    include_risk_signals: bool = True
) -> Dict[str, Any]:
    """
    Search across all configured data sources.
    
    v0.2.0: Removed max_risk filter (LLM will analyze risk)
    
    Args:
        min_apy: Minimum APY percentage
        chain: Blockchain to search
        min_tvl: Minimum TVL in USD
        sources: Data sources to use (defillama, dune)
        limit: Maximum results per source
        include_risk_signals: Include risk signals for LLM analysis
    
    Returns:
        Dict with opportunities from each source
    """
    if sources is None:
        sources = ["defillama", "dune"]
    
    results = {
        "generated_at": datetime.now().isoformat(),
        "version": "0.2.0",
        "parameters": {
            "min_apy": min_apy,
            "chain": chain,
            "min_tvl": min_tvl,
            "sources": sources
        },
        "sources": {}
    }
    
    # DefiLlama - Real-time yields with actionable addresses
    if "defillama" in sources:
        try:
            opps = find_opportunities(
                min_apy=min_apy,
                chain=chain,
                min_tvl=min_tvl,
                limit=limit,
                include_risk_signals=include_risk_signals
            )
            results["sources"]["defillama"] = {
                "status": "success",
                "count": len(opps),
                "opportunities": opps
            }
        except Exception as e:
            results["sources"]["defillama"] = {
                "status": "error",
                "error": str(e)
            }
    
    # Dune - Data tables for custom analysis
    if "dune" in sources:
        try:
            tables = search_dune_tables(
                query=f"yield lending {chain}",
                chain=chain.lower(),
                limit=limit
            )
            results["sources"]["dune"] = {
                "status": "success",
                "count": len(tables),
                "tables": tables
            }
        except Exception as e:
            results["sources"]["dune"] = {
                "status": "error",
                "error": str(e)
            }
    
    return results


def format_unified_report(results: Dict) -> str:
    """Format unified search results as markdown report (v0.2.0)."""
    lines = [
        "# 💰 Unified Investment Search Results",
        f"\n**Generated**: {results['generated_at']}",
        f"**Version**: {results.get('version', 'unknown')}",
        f"**Parameters**: min_apy={results['parameters']['min_apy']}%, "
        f"chain={results['parameters']['chain']}",
        "\n---\n"
    ]
    
    # DefiLlama results
    if "defillama" in results["sources"]:
        source = results["sources"]["defillama"]
        lines.append("## 📊 DefiLlama - Real-time Yields\n")
        
        if source["status"] == "success" and source.get("opportunities"):
            for opp in source["opportunities"]:
                # Actionable status
                actionable = opp.get("actionable_addresses", {})
                action_status = "✅ Ready" if actionable.get("has_actionable_address") else "⚠️ Needs lookup"
                
                lines.extend([
                    f"### {opp.get('protocol', 'N/A')} - {opp.get('symbol', 'N/A')}",
                    f"- **APY**: {opp.get('apy', 0):.2f}%",
                    f"- **TVL**: ${opp.get('tvl_usd', 0):,.0f}",
                    f"- **Chain**: {opp.get('chain', 'N/A')}",
                    f"- **Actionable**: {action_status}",
                    f"- **Link**: {opp.get('url', 'N/A')}",
                    ""
                ])
                
                # Risk signals summary
                risk = opp.get("risk_signals", {})
                if risk:
                    lines.append(f"**Risk Signals**: Audited={'✅' if risk.get('has_audit') else '❌'}, "
                                f"Known={'✅' if risk.get('known_protocol') else '❌'}, "
                                f"APY Type={risk.get('apy_composition', 'unknown')}")
                    lines.append("")
        else:
            lines.append(f"Status: {source.get('status', 'unknown')}")
            if source.get("error"):
                lines.append(f"Error: {source['error']}")
            lines.append("")
    
    # Dune results
    if "dune" in results["sources"]:
        source = results["sources"]["dune"]
        lines.append("## 🧙 Dune Analytics - Data Tables\n")
        
        if source["status"] == "success" and source.get("tables"):
            lines.append("Available tables for custom analysis:\n")
            for table in source["tables"]:
                chains = ", ".join(table.get("blockchains", [])[:3])
                lines.append(f"- `{table['full_name']}` ({table.get('category', 'N/A')}) - Chains: {chains}")
        else:
            lines.append(f"Status: {source.get('status', 'unknown')}")
            if source.get("error"):
                lines.append(f"Error: {source['error']}")
        lines.append("")
    
    return "\n".join(lines)


def format_llm_ready_output(results: Dict) -> str:
    """
    Format results as LLM-ready prompt for risk analysis.
    
    Optimized for LLM consumption with structured data.
    """
    lines = [
        "# DeFi Investment Search Results - LLM Analysis Request",
        f"\nGenerated: {results['generated_at']}",
        "",
        "## Task",
        "Analyze the following investment opportunities and provide:",
        "1. Risk assessment (Low/Medium/High) for each opportunity",
        "2. Key risk factors and warnings",
        "3. Recommended execution approach",
        "4. Any additional data needed for decision",
        "",
        "---\n"
    ]
    
    # DefiLlama opportunities
    if "defillama" in results["sources"]:
        source = results["sources"]["defillama"]
        if source["status"] == "success" and source.get("opportunities"):
            lines.append("## Opportunities from DefiLlama\n")
            for i, opp in enumerate(source["opportunities"], 1):
                lines.append(f"### Opportunity {i}")
                lines.append("```json")
                lines.append(json.dumps(opp, indent=2, ensure_ascii=False))
                lines.append("```\n")
    
    # Dune tables
    if "dune" in results["sources"]:
        source = results["sources"]["dune"]
        if source["status"] == "success" and source.get("tables"):
            lines.append("## Available Dune Data Tables\n")
            for table in source["tables"]:
                lines.append(f"- `{table['full_name']}`: {table.get('description', 'No description')}")
            lines.append("")
    
    return "\n".join(lines)


def main():
    parser = argparse.ArgumentParser(
        description="Unified search across DefiLlama and Dune (v0.2.0 - LLM-ready)"
    )
    parser.add_argument("--min-apy", type=float, default=0, help="Minimum APY %%")
    parser.add_argument("--chain", default="Ethereum", help="Blockchain to search")
    parser.add_argument("--min-tvl", type=float, default=0, help="Minimum TVL in USD")
    parser.add_argument("--limit", "-n", type=int, default=20, help="Results per source")
    parser.add_argument("--source", action="append", choices=["defillama", "dune"],
                        help="Specific sources to use (default: all)")
    parser.add_argument("--output", "-o", choices=["markdown", "json"], default="markdown",
                        help="Output format")
    parser.add_argument("--llm-ready", action="store_true",
                        help="Output LLM-ready prompt for risk analysis")
    parser.add_argument("--no-risk-signals", action="store_true",
                        help="Exclude risk signals from output")
    parser.add_argument("--dune-query", type=str, help="Custom Dune table search query")
    
    args = parser.parse_args()
    
    # Custom Dune search
    if args.dune_query:
        tables = search_dune_tables(
            query=args.dune_query,
            chain=args.chain.lower(),
            limit=args.limit
        )
        if args.output == "json":
            print(json.dumps(tables, indent=2))
        else:
            for t in tables:
                print(f"{t['full_name']:40} | {t['category']:10} | {', '.join(t['blockchains'][:2])}")
        return
    
    # Unified search
    sources = args.source if args.source else None
    results = unified_search(
        min_apy=args.min_apy,
        chain=args.chain,
        min_tvl=args.min_tvl,
        sources=sources,
        limit=args.limit,
        include_risk_signals=not args.no_risk_signals
    )
    
    # Output formatting
    if args.llm_ready:
        print(format_llm_ready_output(results))
    elif args.output == "json":
        print(json.dumps(results, indent=2, ensure_ascii=False))
    else:
        print(format_unified_report(results))


if __name__ == "__main__":
    main()
```

### scripts/portfolio/indexer.py

```python
#!/usr/bin/env python3
"""
On-chain portfolio indexer with blockchain explorer API support.

Usage:
    python indexer.py --address 0x...
    python indexer.py --address 0x... --chain base
    python indexer.py --address 0x... --output json
"""

import argparse
import json
import os
import sys
import urllib.request
from datetime import datetime

# ============================================================================
# Chain Configurations
# ============================================================================

CHAIN_CONFIGS = {
    "ethereum": {
        "name": "Ethereum",
        "chain_id": 1,
        "rpcs": ["https://cloudflare-eth.com", "https://mainnet.gateway.fm"],
        "alchemy_template": "https://eth-mainnet.g.alchemy.com/v2/{api_key}",
        "explorer_api": "https://api.etherscan.io/v2/api",
        "explorer_env_key": "ETHERSCAN_API_KEY",
    },
    "base": {
        "name": "Base",
        "chain_id": 8453,
        "rpcs": ["https://mainnet.base.org", "https://base.llamarpc.com"],
        "alchemy_template": "https://base-mainnet.g.alchemy.com/v2/{api_key}",
        "explorer_api": "https://api.basescan.org/api",
        "explorer_env_key": "BASESCAN_API_KEY",
    },
}

# Price symbol to CoinGecko ID mapping
SYMBOL_TO_COINGECKO = {
    "ETH": "ethereum", "WETH": "ethereum", "USDC": "usd-coin",
    "USDT": "tether", "DAI": "dai", "LINK": "chainlink",
    "UNI": "uniswap", "AAVE": "aave", "stETH": "staked-ether",
}


# ============================================================================
# Blockchain Explorer API (Primary - Requires Free API Key)
# ============================================================================

def get_explorer_api_key(chain: str) -> str:
    """Get explorer API key from environment or config."""
    config = CHAIN_CONFIGS.get(chain, CHAIN_CONFIGS["ethereum"])
    env_key = config.get("explorer_env_key", "ETHERSCAN_API_KEY")
    
    # Check environment variable
    api_key = os.environ.get(env_key) or os.environ.get("ETHERSCAN_API_KEY")
    if api_key:
        return api_key
    
    # Check config file
    cfg = get_config()
    return cfg.get("portfolio", {}).get("etherscan_api_key", "")


def get_balance_from_explorer(address: str, chain: str = "ethereum") -> float:
    """Get native balance using blockchain explorer API."""
    config = CHAIN_CONFIGS.get(chain, CHAIN_CONFIGS["ethereum"])
    api_key = get_explorer_api_key(chain)
    
    if not api_key:
        print(f"⚠️ 未配置 {chain} 链的区块链浏览器 API Key", file=sys.stderr)
        return 0
    
    # Etherscan V2 API format
    url = f"{config['explorer_api']}?chainid={config['chain_id']}&module=account&action=balance&address={address}&apikey={api_key}"
    
    try:
        req = urllib.request.Request(url, headers={"User-Agent": "Web3-Investor/0.3.0"})
        with urllib.request.urlopen(req, timeout=15) as resp:
            data = json.loads(resp.read().decode())
            if data.get("status") == "1":
                return int(data.get("result", "0")) / 1e18
            else:
                print(f"⚠️ Explorer API 返回错误: {data.get('message', 'Unknown')}", file=sys.stderr)
    except Exception as e:
        print(f"⚠️ Explorer API 请求失败: {e}", file=sys.stderr)
    return 0


def get_tokens_from_explorer(address: str, chain: str = "ethereum") -> list:
    """Get token list using blockchain explorer API."""
    config = CHAIN_CONFIGS.get(chain, CHAIN_CONFIGS["ethereum"])
    api_key = get_explorer_api_key(chain)
    
    if not api_key:
        return []
    
    url = f"{config['explorer_api']}?chainid={config['chain_id']}&module=account&action=tokenlist&address={address}&apikey={api_key}"
    
    tokens = []
    try:
        req = urllib.request.Request(url, headers={"User-Agent": "Web3-Investor/0.3.0"})
        with urllib.request.urlopen(req, timeout=15) as resp:
            data = json.loads(resp.read().decode())
            if data.get("status") == "1" and data.get("result"):
                for t in data["result"]:
                    try:
                        decimals = int(t.get("tokenDecimal", 18))
                        balance = int(t.get("value", "0")) / (10 ** decimals)
                        if balance > 0:
                            tokens.append({
                                "address": t.get("contractAddress", ""),
                                "symbol": t.get("tokenSymbol", "?"),
                                "name": t.get("tokenName", "?"),
                                "balance": balance,
                                "decimals": decimals
                            })
                    except (ValueError, TypeError):
                        continue
    except Exception as e:
        print(f"⚠️ Token list 请求失败: {e}", file=sys.stderr)
    return tokens


# ============================================================================
# RPC Fallback (Secondary)
# ============================================================================

def get_rpc_url(chain: str) -> str:
    """Get RPC URL with environment/config fallback."""
    env_var = f"WEB3_INVESTOR_{chain.upper()}_RPC_URL"
    rpc = os.environ.get(env_var) or os.environ.get("WEB3_INVESTOR_RPC_URL")
    if rpc:
        return rpc
    
    alchemy_key = os.environ.get("ALCHEMY_API_KEY")
    if alchemy_key:
        return CHAIN_CONFIGS[chain]["alchemy_template"].format(api_key=alchemy_key)
    
    return CHAIN_CONFIGS[chain]["rpcs"][0]


def get_balance_rpc(address: str, chain: str) -> float:
    """Get native balance via RPC (fallback)."""
    try:
        payload = {"jsonrpc": "2.0", "method": "eth_getBalance", "params": [address, "latest"], "id": 1}
        req = urllib.request.Request(get_rpc_url(chain), data=json.dumps(payload).encode(),
                                     headers={"Content-Type": "application/json"})
        with urllib.request.urlopen(req, timeout=15) as resp:
            result = json.loads(resp.read().decode())
            return int(result.get("result", "0x0"), 16) / 1e18
    except Exception as e:
        print(f"⚠️ RPC error: {e}", file=sys.stderr)
    return 0


# ============================================================================
# Prices
# ============================================================================

def get_prices(tokens: list) -> dict:
    """Get prices from CoinGecko."""
    ids = {SYMBOL_TO_COINGECKO.get(t.get("symbol", "").upper()) for t in tokens}
    ids.discard(None)
    if not ids:
        return {}
    try:
        url = f"https://api.coingecko.com/api/v3/simple/price?ids={','.join(ids)}&vs_currencies=usd"
        with urllib.request.urlopen(url, timeout=15) as resp:
            return json.loads(resp.read().decode())
    except Exception as e:
        print(f"⚠️ Price error: {e}", file=sys.stderr)
    return {}


# ============================================================================
# Portfolio Builder
# ============================================================================

def build_portfolio(address: str, chain: str, native: float, tokens: list, source: str) -> dict:
    """Build portfolio response."""
    prices = get_prices(tokens)
    eth_price = prices.get("ethereum", {}).get("usd", 2000)
    
    total = native * eth_price
    for t in tokens:
        price_id = SYMBOL_TO_COINGECKO.get(t.get("symbol", "").upper())
        t["price_usd"] = prices.get(price_id, {}).get("usd", 0) if price_id else 0
        t["value_usd"] = t.get("balance", 0) * t.get("price_usd", 0)
        total += t["value_usd"]
    
    all_tokens = [{
        "address": "0x0000000000000000000000000000000000000000",
        "symbol": "ETH", "name": "Ethereum",
        "balance": native, "price_usd": eth_price, "value_usd": native * eth_price
    }] + [t for t in tokens if t.get("value_usd", 0) > 0.01]
    all_tokens.sort(key=lambda x: x.get("value_usd", 0), reverse=True)
    
    return {
        "address": address.lower(), "chain": chain,
        "timestamp": datetime.utcnow().isoformat() + "Z",
        "total_value_usd": round(total, 2), "tokens": all_tokens,
        "defi_positions": [], "expiring_positions": [], "data_source": source
    }


def get_portfolio(address: str, chain: str = "ethereum") -> dict:
    """Get portfolio with automatic method selection (explorer first, then RPC)."""
    chain = chain.lower()
    
    # Try explorer API first
    print(f"🔍 查询 {chain} 链资产(使用区块链浏览器)...", file=sys.stderr)
    native = get_balance_from_explorer(address, chain)
    tokens = get_tokens_from_explorer(address, chain)
    
    if native > 0 or tokens:
        print(f"✅ 区块链浏览器查询成功!", file=sys.stderr)
        return build_portfolio(address, chain, native, tokens, "explorer")
    
    # Fallback to RPC
    print(f"⚠️ 浏览器无数据,使用 RPC 节点...", file=sys.stderr)
    native = get_balance_rpc(address, chain)
    return build_portfolio(address, chain, native, [], "rpc")


# ============================================================================
# Output Formatting
# ============================================================================

def format_report(portfolio: dict, fmt: str = "markdown") -> str:
    """Format portfolio as report."""
    if fmt == "json":
        return json.dumps(portfolio, indent=2, ensure_ascii=False)
    
    lines = [
        f"# 📊 投资组合报告",
        f"\n| 项目 | 值 |", f"|------|-----|",
        f"| 地址 | `{portfolio['address']}` |",
        f"| 链 | {portfolio['chain'].title()} |",
        f"| 总价值 | **${portfolio['total_value_usd']:,.2f}** |",
        f"| 数据源 | {portfolio['data_source']} |",
        f"\n---\n", "## 🪙 代币持仓\n",
        "| 代币 | 余额 | 价格 | 价值 |", "|------|------|------|------|"
    ]
    for t in portfolio.get("tokens", []):
        lines.append(f"| {t.get('symbol', '?')} | {t.get('balance', 0):.4f} | ${t.get('price_usd', 0):.2f} | ${t.get('value_usd', 0):,.2f} |")
    return "\n".join(lines)


# ============================================================================
# Main
# ============================================================================

def main():
    parser = argparse.ArgumentParser(description="Portfolio indexer")
    parser.add_argument("--address", required=True, help="Wallet address")
    parser.add_argument("--chain", default="ethereum", choices=["ethereum", "base"])
    parser.add_argument("--output", choices=["markdown", "json"], default="markdown")
    args = parser.parse_args()
    
    portfolio = get_portfolio(args.address, args.chain)
    print(format_report(portfolio, args.output))


if __name__ == "__main__":
    main()
```

### scripts/schemas/output_schema.py

```python
"""
Output Schema Module

Provides standardized output formats for all transaction operations.
Inspired by xaut-trade's structured output design.

Key Features:
- Consistent structure across swap/transfer/deposit operations
- Clear stage indication (preview/ready_to_execute)
- Risk warnings and confirmation requirements
- Machine-readable JSON and human-readable text output
"""

from dataclasses import dataclass, field, asdict
from typing import List, Optional, Dict, Any
from enum import Enum
from datetime import datetime
import uuid
import json


class Stage(str, Enum):
    """Transaction stage."""
    PREVIEW = "preview"
    READY_TO_EXECUTE = "ready_to_execute"
    EXECUTED = "executed"
    FAILED = "failed"


class RiskLevel(str, Enum):
    """Risk severity level."""
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"


@dataclass
class RiskWarning:
    """Risk warning item."""
    type: str
    level: RiskLevel
    message: str
    details: Optional[Dict[str, Any]] = None


@dataclass
class InputInfo:
    """Transaction input information."""
    token: str
    amount: str
    chain: str
    token_out: Optional[str] = None  # For swap
    protocol: Optional[str] = None   # For deposit


@dataclass
class QuoteInfo:
    """Quote information (varies by transaction type)."""
    estimated_output: Optional[str] = None
    rate: Optional[str] = None
    slippage_bps: Optional[int] = None
    min_amount_out: Optional[str] = None
    gas_estimate: Optional[str] = None
    apy: Optional[str] = None  # For deposit/staking


@dataclass
class PreviewOutput:
    """
    Standardized preview output format.
    
    Used for all transaction types: swap, transfer, deposit.
    """
    stage: Stage
    request_id: str
    timestamp: str
    
    # Input
    input: InputInfo
    
    # Quote (varies by type)
    quote: QuoteInfo
    
    # Risk warnings
    risk_warnings: List[RiskWarning]
    
    # Confirmation
    requires_double_confirm: bool = False
    double_confirm_reasons: List[str] = field(default_factory=list)
    confirm_instruction: Optional[str] = None
    
    # Navigation
    next_step: str = "approve"
    
    # Raw data (for debugging)
    raw_preview_id: Optional[str] = None
    raw_transaction: Optional[Dict] = None
    
    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary."""
        result = {
            "stage": self.stage.value,
            "request_id": self.request_id,
            "timestamp": self.timestamp,
            "input": asdict(self.input),
            "quote": asdict(self.quote),
            "risk_warnings": [
                {
                    "type": w.type,
                    "level": w.level.value,
                    "message": w.message,
                    **({"details": w.details} if w.details else {})
                }
                for w in self.risk_warnings
            ],
            "requires_double_confirm": self.requires_double_confirm,
            "next_step": self.next_step
        }
        
        if self.requires_double_confirm:
            result["double_confirm_reasons"] = self.double_confirm_reasons
            if self.confirm_instruction:
                result["confirm_instruction"] = self.confirm_instruction
        
        if self.raw_preview_id:
            result["preview_id"] = self.raw_preview_id
        
        return result
    
    def to_json(self, indent: int = 2) -> str:
        """Convert to JSON string."""
        return json.dumps(self.to_dict(), indent=indent, ensure_ascii=False)
    
    def to_text(self) -> str:
        """Convert to human-readable text."""
        lines = []
        
        # Header
        lines.append(f"📋 Preview ID: {self.raw_preview_id or self.request_id}")
        lines.append(f"   Stage: {self.stage.value}")
        lines.append("")
        
        # Input
        lines.append("   Input:")
        lines.append(f"      Token: {self.input.token}")
        lines.append(f"      Amount: {self.input.amount}")
        lines.append(f"      Chain: {self.input.chain}")
        if self.input.token_out:
            lines.append(f"      → Output: {self.input.token_out}")
        if self.input.protocol:
            lines.append(f"      Protocol: {self.input.protocol}")
        lines.append("")
        
        # Quote
        if self.quote.estimated_output or self.quote.rate:
            lines.append("   Quote:")
            if self.quote.rate:
                lines.append(f"      Rate: {self.quote.rate}")
            if self.quote.estimated_output:
                lines.append(f"      Estimated Output: {self.quote.estimated_output}")
            if self.quote.slippage_bps:
                lines.append(f"      Slippage: {self.quote.slippage_bps} bps")
            if self.quote.min_amount_out:
                lines.append(f"      Min Output: {self.quote.min_amount_out}")
            if self.quote.gas_estimate:
                lines.append(f"      Gas Estimate: {self.quote.gas_estimate}")
            if self.quote.apy:
                lines.append(f"      APY: {self.quote.apy}")
            lines.append("")
        
        # Risk warnings
        if self.risk_warnings:
            lines.append("   ⚠️ Risk Warnings:")
            for w in self.risk_warnings:
                level_emoji = {"low": "🟢", "medium": "🟡", "high": "🔴"}[w.level.value]
                lines.append(f"      {level_emoji} {w.type}: {w.message}")
            lines.append("")
        
        # Double confirm
        if self.requires_double_confirm:
            lines.append("   🔒 Double Confirmation Required:")
            for reason in self.double_confirm_reasons:
                lines.append(f"      - {reason}")
            if self.confirm_instruction:
                lines.append(f"   {self.confirm_instruction}")
            lines.append("")
        
        # Next step
        lines.append(f"   Next Step: {self.next_step}")
        
        return "\n".join(lines)


@dataclass
class ApprovalResult:
    """Standardized approval result."""
    success: bool
    approval_id: Optional[str] = None
    preview_id: Optional[str] = None
    approved_at: Optional[str] = None
    expires_at: Optional[str] = None
    error: Optional[Dict[str, Any]] = None
    
    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary."""
        result = {
            "success": self.success,
            "approval_id": self.approval_id,
            "preview_id": self.preview_id,
            "approved_at": self.approved_at
        }
        
        if self.expires_at:
            result["expires_at"] = self.expires_at
        
        if self.error:
            result["error"] = self.error
        
        return result
    
    def to_json(self, indent: int = 2) -> str:
        """Convert to JSON string."""
        return json.dumps(self.to_dict(), indent=indent, ensure_ascii=False)
    
    def to_text(self) -> str:
        """Convert to human-readable text."""
        if self.success:
            lines = [
                f"✅ Approval ID: {self.approval_id}",
                f"   Preview ID: {self.preview_id}",
                f"   Approved At: {self.approved_at}",
                f"   Next Step: execute --approval-id {self.approval_id}"
            ]
        else:
            error = self.error or {}
            lines = [
                f"❌ Approval Failed",
                f"   Error: {error.get('message', 'Unknown')}"
            ]
        
        return "\n".join(lines)


@dataclass
class ExecutionResult:
    """Standardized execution result."""
    success: bool
    tx_hash: Optional[str] = None
    explorer_url: Optional[str] = None
    network: Optional[str] = None
    executed_at: Optional[str] = None
    error: Optional[Dict[str, Any]] = None
    
    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary."""
        result = {
            "success": self.success,
            "tx_hash": self.tx_hash,
            "network": self.network,
            "executed_at": self.executed_at
        }
        
        if self.explorer_url:
            result["explorer_url"] = self.explorer_url
        
        if self.error:
            result["error"] = self.error
        
        return result
    
    def to_json(self, indent: int = 2) -> str:
        """Convert to JSON string."""
        return json.dumps(self.to_dict(), indent=indent, ensure_ascii=False)
    
    def to_text(self) -> str:
        """Convert to human-readable text."""
        if self.success:
            lines = [
                "🚀 Transaction Submitted!",
                f"   TX Hash: {self.tx_hash}",
                f"   Network: {self.network}",
                f"   Explorer: {self.explorer_url}"
            ]
        else:
            error = self.error or {}
            lines = [
                "❌ Execution Failed",
                f"   Error: {error.get('message', 'Unknown')}"
            ]
        
        return "\n".join(lines)


# Factory functions

def create_preview_output(
    input_info: InputInfo,
    quote_info: QuoteInfo,
    risk_warnings: List[RiskWarning],
    requires_double_confirm: bool = False,
    double_confirm_reasons: List[str] = None,
    confirm_instruction: str = None,
    preview_id: str = None,
    raw_transaction: Dict = None
) -> PreviewOutput:
    """Create a standardized preview output."""
    return PreviewOutput(
        stage=Stage.PREVIEW,
        request_id=str(uuid.uuid4()),
        timestamp=datetime.utcnow().isoformat() + "Z",
        input=input_info,
        quote=quote_info,
        risk_warnings=risk_warnings,
        requires_double_confirm=requires_double_confirm,
        double_confirm_reasons=double_confirm_reasons or [],
        confirm_instruction=confirm_instruction,
        next_step="approve",
        raw_preview_id=preview_id,
        raw_transaction=raw_transaction
    )


def create_approval_result(
    success: bool,
    approval_id: str = None,
    preview_id: str = None,
    error: Dict = None
) -> ApprovalResult:
    """Create a standardized approval result."""
    return ApprovalResult(
        success=success,
        approval_id=approval_id,
        preview_id=preview_id,
        approved_at=datetime.utcnow().isoformat() + "Z" if success else None,
        error=error
    )


def create_execution_result(
    success: bool,
    tx_hash: str = None,
    network: str = None,
    explorer_url: str = None,
    error: Dict = None
) -> ExecutionResult:
    """Create a standardized execution result."""
    return ExecutionResult(
        success=success,
        tx_hash=tx_hash,
        network=network,
        explorer_url=explorer_url,
        executed_at=datetime.utcnow().isoformat() + "Z" if success else None,
        error=error
    )


if __name__ == "__main__":
    # Test output
    preview = create_preview_output(
        input_info=InputInfo(
            token="USDC",
            amount="10000",
            chain="base",
            token_out="WETH"
        ),
        quote_info=QuoteInfo(
            estimated_output="5.0",
            rate="1 WETH = 2000 USDC",
            slippage_bps=50,
            min_amount_out="4.975"
        ),
        risk_warnings=[
            RiskWarning(
                type="large_trade",
                level=RiskLevel.MEDIUM,
                message="Large trade size, consider price impact"
            )
        ],
        requires_double_confirm=True,
        double_confirm_reasons=["Large trade: 10000 >= threshold 5000"],
        confirm_instruction='To approve, use: --confirm "CONFIRM_HIGH_RISK"',
        preview_id="test-preview-id-123"
    )
    
    print("=== JSON Output ===")
    print(preview.to_json())
    
    print("\n=== Text Output ===")
    print(preview.to_text())
```

### scripts/trading/safe_vault.py

```python
#!/usr/bin/env python3
"""
Safe Vault - Secure transaction signing and execution (v0.2.0)

Features:
- Balance pre-check before transaction preparation
- Deposit preview for Aave/Compound/Lido
- Whitelist and limit protection
- Local signing capability

Usage:
    python safe_vault.py --check-balance --address 0x... --chain ethereum
    python safe_vault.py --preview-deposit --protocol aave --asset USDC --amount 1000
    python safe_vault.py --prepare-tx --to 0x... --value 0 --data 0x...
"""

import argparse
import json
import os
import sys
import uuid
from datetime import datetime
from typing import Optional, Dict, Any, List
import urllib.request
import urllib.error

# ============================================================================
# Configuration
# ============================================================================

SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
CONFIG_PATH = os.path.join(SCRIPT_DIR, "..", "..", "config", "config.json")
WHITELIST_PATH = os.path.join(SCRIPT_DIR, "..", "..", "config", "whitelist.json")
PROTOCOLS_PATH = os.path.join(SCRIPT_DIR, "..", "..", "config", "protocols.json")

# Chain configurations
CHAIN_CONFIGS = {
    "ethereum": {
        "chain_id": 1,
        "rpcs": ["https://cloudflare-eth.com", "https://mainnet.gateway.fm"],
        "explorer_api": "https://api.etherscan.io/v2/api",
        "explorer_env_key": "ETHERSCAN_API_KEY",
        "native_token": "ETH"
    },
    "base": {
        "chain_id": 8453,
        "rpcs": ["https://mainnet.base.org"],
        "explorer_api": "https://api.basescan.org/api",
        "explorer_env_key": "BASESCAN_API_KEY",
        "native_token": "ETH"
    }
}

# Known token addresses (Ethereum mainnet)
KNOWN_TOKENS = {
    "USDC": "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48",
    "USDT": "0xdAC17F958D2ee523a2206206994597C13D831ec7",
    "DAI": "0x6B175474E89094C44Da98b954EesdeeAC495271d0F",
    "WETH": "0xC02aaA39b223FE8D0A0e5C4F27ead9083C756Cc2",
    "stETH": "0xae7ab96520DE3A18E5e111B5EaAb095312D7fE84",
    "rETH": "0xae78736Cd615f374D3085123A210448E74Fc6393",
}

# Protocol contract addresses (Ethereum mainnet)
PROTOCOL_CONTRACTS = {
    "aave": {
        "pool": "0x87870Bca3F3fD6335C3F4ce8392D69350B4fA4E2",
        "pool_proxy": "0x87870Bca3F3fD6335C3F4ce8392D69350B4fA4E2"
    },
    "compound": {
        "comet": "0xc3d688B66703497DAA19211EEdff47f25384cdc3"
    },
    "lido": {
        "steth": "0xae7ab96520DE3A18E5e111B5EaAb095312D7fE84"
    }
}

# Error messages
ERROR_MESSAGES = {
    "NO_PRIVATE_KEY": "❌ 无法签名交易:未配置私钥。请设置 WEB3_INVESTOR_PRIVATE_KEY 环境变量。",
    "WHITELIST_BLOCKED": "⛔ 交易被拒绝:目标地址不在白名单中。",
    "EXCEEDS_LIMIT": "⚠️ 交易金额超过限额。当前限额为 ${limit} USD。",
    "SIMULATION_FAILED": "🔴 交易模拟失败:{reason}",
    "RPC_ERROR": "📡 无法连接到区块链网络:{error}",
    "INSUFFICIENT_BALANCE": "💰 余额不足:需要 {need} {token},实际余额 {balance}。",
    "INVALID_ADDRESS": "📛 无效的地址格式:{address}",
    "SIGNING_UNAVAILABLE": "🔐 本地签名功能不可用。请安装 eth-account。",
    "UNKNOWN_PROTOCOL": "❓ 未知的协议:{protocol}。支持的协议:aave, compound, lido",
    "UNKNOWN_TOKEN": "❓ 未知的代币:{token}。已知代币:USDC, USDT, DAI, WETH, stETH, rETH",
}


# ============================================================================
# Utility Functions
# ============================================================================

def load_config() -> dict:
    """Load configuration."""
    if os.path.exists(CONFIG_PATH):
        with open(CONFIG_PATH) as f:
            return json.load(f)
    return {"trading": {"mode": "simulation", "whitelist_enabled": True, "default_limit_usd": 100}}


def load_whitelist() -> dict:
    """Load whitelist."""
    if os.path.exists(WHITELIST_PATH):
        with open(WHITELIST_PATH) as f:
            return json.load(f)
    return {"addresses": [], "enabled": True}


def load_protocols() -> dict:
    """Load protocol registry."""
    if os.path.exists(PROTOCOLS_PATH):
        with open(PROTOCOLS_PATH) as f:
            return json.load(f)
    return {"protocols": {}}


def get_rpc_url(chain: str) -> str:
    """Get RPC URL for chain."""
    env_var = f"WEB3_INVESTOR_{chain.upper()}_RPC_URL"
    rpc = os.environ.get(env_var) or os.environ.get("WEB3_INVESTOR_RPC_URL")
    if rpc:
        return rpc
    
    alchemy_key = os.environ.get("ALCHEMY_API_KEY")
    if alchemy_key:
        return f"https://eth-mainnet.g.alchemy.com/v2/{alchemy_key}"
    
    return CHAIN_CONFIGS.get(chain, CHAIN_CONFIGS["ethereum"])["rpcs"][0]


def get_chain_id(chain: str) -> int:
    """Get chain ID for chain name."""
    return CHAIN_CONFIGS.get(chain, CHAIN_CONFIGS["ethereum"])["chain_id"]


# ============================================================================
# Balance Pre-Check
# ============================================================================

def get_native_balance(address: str, chain: str = "ethereum") -> float:
    """
    Get native token balance (ETH) for an address.
    
    Returns balance in ETH (not wei).
    """
    rpc_url = get_rpc_url(chain)
    
    payload = {
        "jsonrpc": "2.0",
        "method": "eth_getBalance",
        "params": [address, "latest"],
        "id": 1
    }
    
    try:
        req = urllib.request.Request(
            rpc_url,
            data=json.dumps(payload).encode(),
            headers={"Content-Type": "application/json"}
        )
        with urllib.request.urlopen(req, timeout=15) as resp:
            result = json.loads(resp.read().decode())
            balance_wei = int(result.get("result", "0x0"), 16)
            return balance_wei / 1e18
    except Exception as e:
        print(f"⚠️ 无法获取余额: {e}", file=sys.stderr)
        return 0


def get_erc20_balance(wallet: str, token: str, chain: str = "ethereum") -> float:
    """
    Get ERC20 token balance for an address.
    
    Args:
        wallet: Wallet address
        token: Token symbol or address
        chain: Blockchain name
    
    Returns:
        Balance as float (normalized by decimals)
    """
    # Resolve token address
    if token.upper() in KNOWN_TOKENS:
        token_address = KNOWN_TOKENS[token.upper()]
    elif token.startswith("0x"):
        token_address = token
    else:
        return 0
    
    rpc_url = get_rpc_url(chain)
    
    # ERC20 balanceOf(address) call
    # selector: 0x70a08231 + padded address
    data = "0x70a08231" + wallet[2:].lower().zfill(64)
    
    payload = {
        "jsonrpc": "2.0",
        "method": "eth_call",
        "params": [{"to": token_address, "data": data}, "latest"],
        "id": 1
    }
    
    try:
        req = urllib.request.Request(
            rpc_url,
            data=json.dumps(payload).encode(),
            headers={"Content-Type": "application/json"}
        )
        with urllib.request.urlopen(req, timeout=15) as resp:
            result = json.loads(resp.read().decode())
            balance_raw = int(result.get("result", "0x0"), 16)
            # Assume 18 decimals for most tokens, 6 for USDC/USDT
            decimals = 6 if token.upper() in ["USDC", "USDT"] else 18
            return balance_raw / (10 ** decimals)
    except Exception as e:
        print(f"⚠️ 无法获取代币余额: {e}", file=sys.stderr)
        return 0


def check_balance_before_deposit(
    wallet: str,
    token: str,
    amount: float,
    chain: str = "ethereum"
) -> Dict[str, Any]:
    """
    Check if wallet has sufficient balance for deposit.
    
    Returns:
        dict with sufficient (bool), balance, needed, message
    """
    result = {
        "sufficient": False,
        "token": token,
        "chain": chain,
        "needed": amount,
        "balance": 0,
        "message": ""
    }
    
    # Check native token (ETH)
    if token.upper() in ["ETH", "WETH"]:
        balance = get_native_balance(wallet, chain)
        result["balance"] = balance
        
        if balance >= amount:
            result["sufficient"] = True
            result["message"] = f"✅ 余额充足:{balance:.4f} ETH >= {amount} ETH"
        else:
            result["message"] = ERROR_MESSAGES["INSUFFICIENT_BALANCE"].format(
                need=amount, token="ETH", balance=balance
            )
        return result
    
    # Check ERC20 token
    balance = get_erc20_balance(wallet, token, chain)
    result["balance"] = balance
    
    if balance >= amount:
        result["sufficient"] = True
        result["message"] = f"✅ 余额充足:{balance:.4f} {token} >= {amount} {token}"
    else:
        result["message"] = ERROR_MESSAGES["INSUFFICIENT_BALANCE"].format(
            need=amount, token=token, balance=balance
        )
    
    return result


# ============================================================================
# Deposit Preview (Calldata Generation)
# ============================================================================

def encode_uint256(value: int) -> str:
    """Encode uint256 for calldata."""
    return hex(value)[2:].zfill(64)


def encode_address(address: str) -> str:
    """Encode address for calldata."""
    return address[2:].lower().zfill(64)


def generate_aave_deposit_calldata(
    asset: str,
    amount: float,
    on_behalf_of: str = None,
    referral_code: int = 0
) -> Dict[str, Any]:
    """
    Generate calldata for Aave V3 supply (deposit).
    
    Function: supply(address asset, uint256 amount, address onBehalfOf, uint16 referralCode)
    Selector: 0x617ba037
    """
    # Resolve asset address
    if asset.upper() in KNOWN_TOKENS:
        asset_address = KNOWN_TOKENS[asset.upper()]
    elif asset.startswith("0x"):
        asset_address = asset
    else:
        return {"error": ERROR_MESSAGES["UNKNOWN_TOKEN"].format(token=asset)}
    
    # Convert amount to wei (assume 18 decimals, 6 for stablecoins)
    decimals = 6 if asset.upper() in ["USDC", "USDT"] else 18
    amount_wei = int(amount * (10 ** decimals))
    
    # onBehalfOf defaults to caller (we'll use a placeholder)
    on_behalf = on_behalf_of or "0x0000000000000000000000000000000000000001"
    
    # Build calldata
    selector = "617ba037"  # supply(address,uint256,address,uint16)
    calldata = "0x" + selector + \
               encode_address(asset_address) + \
               encode_uint256(amount_wei) + \
               encode_address(on_behalf) + \
               encode_uint256(referral_code)
    
    return {
        "protocol": "aave",
        "method": "supply",
        "to": PROTOCOL_CONTRACTS["aave"]["pool"],
        "asset": asset_address,
        "amount": amount,
        "amount_wei": amount_wei,
        "calldata": calldata,
        "description": f"Deposit {amount} {asset} to Aave V3"
    }


def generate_compound_deposit_calldata(
    asset: str,
    amount: float
) -> Dict[str, Any]:
    """
    Generate calldata for Compound V3 supply (deposit).
    
    Function: supply(address asset, uint256 amount)
    Selector: 0xd0e30db0 (for deposit)
    Note: Compound V3 uses different pattern
    """
    # Compound V3 Comet uses: supply(address asset, uint256 amount)
    if asset.upper() in KNOWN_TOKENS:
        asset_address = KNOWN_TOKENS[asset.upper()]
    elif asset.startswith("0x"):
        asset_address = asset
    else:
        return {"error": ERROR_MESSAGES["UNKNOWN_TOKEN"].format(token=asset)}
    
    decimals = 6 if asset.upper() in ["USDC", "USDT"] else 18
    amount_wei = int(amount * (10 ** decimals))
    
    # Compound V3 supply selector
    selector = "0xe8eda9df"  # supply(address,uint256)
    calldata = "0x" + selector + \
               encode_address(asset_address) + \
               encode_uint256(amount_wei)
    
    return {
        "protocol": "compound",
        "method": "supply",
        "to": PROTOCOL_CONTRACTS["compound"]["comet"],
        "asset": asset_address,
        "amount": amount,
        "amount_wei": amount_wei,
        "calldata": calldata,
        "description": f"Deposit {amount} {asset} to Compound V3"
    }


def generate_lido_deposit_calldata(
    amount: float
) -> Dict[str, Any]:
    """
    Generate calldata for Lido stETH deposit.
    
    Function: submit(address _referral) - sends ETH and receives stETH
    Selector: 0x386497fd
    """
    amount_wei = int(amount * 1e18)
    
    # Lido submit selector with empty referral
    selector = "386497fd"  # submit(address)
    calldata = "0x" + selector + encode_address("0x0000000000000000000000000000000000000000")
    
    return {
        "protocol": "lido",
        "method": "submit",
        "to": PROTOCOL_CONTRACTS["lido"]["steth"],
        "asset": "ETH",
        "amount": amount,
        "amount_wei": amount_wei,
        "calldata": calldata,
        "value_wei": amount_wei,  # Need to send ETH
        "description": f"Stake {amount} ETH with Lido to receive stETH"
    }


def preview_deposit(
    protocol: str,
    asset: str,
    amount: float,
    wallet: str = None,
    chain: str = "ethereum"
) -> Dict[str, Any]:
    """
    Generate deposit preview with calldata and balance check.
    
    Args:
        protocol: Protocol name (aave, compound, lido)
        asset: Asset symbol or address
        amount: Amount to deposit
        wallet: Wallet address for balance check
        chain: Blockchain name
    
    Returns:
        dict with calldata, balance check, and execution preview
    """
    protocol_lower = protocol.lower()
    
    # Generate calldata based on protocol
    if protocol_lower == "aave":
        calldata_result = generate_aave_deposit_calldata(asset, amount)
    elif protocol_lower == "compound":
        calldata_result = generate_compound_deposit_calldata(asset, amount)
    elif protocol_lower == "lido":
        calldata_result = generate_lido_deposit_calldata(amount)
    else:
        return {
            "error": ERROR_MESSAGES["UNKNOWN_PROTOCOL"].format(protocol=protocol),
            "supported_protocols": ["aave", "compound", "lido"]
        }
    
    if "error" in calldata_result:
        return calldata_result
    
    result = {
        "protocol": protocol_lower,
        "asset": asset,
        "amount": amount,
        "chain": chain,
        "calldata": calldata_result,
        "preview": {
            "to": calldata_result["to"],
            "value": calldata_result.get("value_wei", 0),
            "data": calldata_result["calldata"],
            "gas_estimate": 250000  # Estimated gas for deposit
        }
    }
    
    # Balance check if wallet provided
    if wallet:
        balance_check = check_balance_before_deposit(
            wallet=wallet,
            token=asset,
            amount=amount,
            chain=chain
        )
        result["balance_check"] = balance_check
        
        if not balance_check["sufficient"]:
            result["can_execute"] = False
            result["warning"] = balance_check["message"]
        else:
            result["can_execute"] = True
    else:
        result["balance_check"] = {"checked": False, "message": "Wallet not provided"}
        result["can_execute"] = None  # Unknown
    
    return result


# ============================================================================
# Whitelist & Security
# ============================================================================

def check_whitelist(address: str, amount_usd: float = 0) -> dict:
    """Check if address is in whitelist and within limits."""
    config = load_config()
    whitelist = load_whitelist()
    
    if not config["trading"].get("whitelist_enabled", True):
        return {"allowed": True, "reason": "Whitelist disabled", "limit": float("inf")}
    
    if not whitelist.get("enabled", True):
        return {"allowed": True, "reason": "Whitelist disabled", "limit": float("inf")}
    
    address_lower = address.lower()
    
    for entry in whitelist.get("addresses", []):
        if entry.get("address", "").lower() == address_lower:
            limit = entry.get("max_amount_usd", config["trading"].get("default_limit_usd", 100))
            if amount_usd <= limit:
                return {"allowed": True, "reason": f"Within limit", "limit": limit}
            else:
                return {"allowed": False, "reason": f"Exceeds limit", "limit": limit}
    
    return {"allowed": False, "reason": "Address not in whitelist", "limit": 0}


def get_eth_price_usd() -> float:
    """Get current ETH price from CoinGecko."""
    try:
        url = "https://api.coingecko.com/api/v3/simple/price?ids=ethereum&vs_currencies=usd"
        with urllib.request.urlopen(url, timeout=10) as response:
            data = json.loads(response.read().decode())
            return data["ethereum"]["usd"]
    except:
        return 2000


# ============================================================================
# Transaction Preparation
# ============================================================================

def prepare_transaction(
    to: str,
    value: str = "0",
    data: str = "0x",
    gas_limit: int = 300000,
    description: str = "",
    chain: str = "ethereum"
) -> dict:
    """Prepare a transaction for signing."""
    config = load_config()
    mode = config["trading"].get("mode", "simulation")
    
    # Estimate value in USD
    try:
        value_eth = float(value) / 1e18
        value_usd = value_eth * get_eth_price_usd()
    except:
        value_usd = 0
    
    # Check whitelist
    whitelist_result = check_whitelist(to, value_usd)
    
    # Generate request
    request_id = str(uuid.uuid4())
    request = {
        "request_id": request_id,
        "timestamp": datetime.utcnow().isoformat() + "Z",
        "mode": mode,
        "chain": chain,
        "chain_id": get_chain_id(chain),
        "transaction": {
            "to": to,
            "value": value,
            "data": data,
            "gas_limit": gas_limit,
            "chain_id": get_chain_id(chain),
        },
        "value_usd": round(value_usd, 2),
        "whitelist": whitelist_result,
        "description": description,
        "approval_required": mode == "simulation" or not whitelist_result["allowed"]
    }
    
    return request


# ============================================================================
# CLI
# ============================================================================

def format_preview_output(result: Dict) -> str:
    """Format deposit preview for display."""
    if "error" in result:
        return f"❌ 错误: {result['error']}"
    
    lines = [
        f"# 📋 存款预览 - {result['protocol'].upper()}",
        "",
        f"| 项目 | 值 |",
        f"|------|-----|",
        f"| 协议 | {result['protocol']} |",
        f"| 资产 | {result['asset']} |",
        f"| 金额 | {result['amount']} |",
        f"| 链 | {result['chain']} |",
        "",
        "## 交易数据",
        f"- **To**: `{result['preview']['to']}`",
        f"- **Value**: {result['preview']['value']} wei",
        f"- **Data**: `{result['preview']['data'][:60]}...`",
        f"- **Gas Estimate**: {result['preview']['gas_estimate']:,}",
    ]
    
    if result.get("balance_check"):
        bc = result["balance_check"]
        lines.extend([
            "",
            "## 余额检查",
            f"- **余额**: {bc.get('balance', 'N/A')} {bc.get('token', '')}",
            f"- **状态**: {bc.get('message', 'N/A')}",
        ])
    
    return "\n".join(lines)


def main():
    parser = argparse.ArgumentParser(
        description="Safe Vault - Secure transaction manager (v0.2.0)",
        formatter_class=argparse.RawDescriptionHelpFormatter
    )
    
    subparsers = parser.add_subparsers(dest="command", help="Commands")
    
    # Check balance
    balance_parser = subparsers.add_parser("balance", help="Check wallet balance")
    balance_parser.add_argument("--wallet", "-w", required=True, help="Wallet address")
    balance_parser.add_argument("--token", "-t", default="ETH", help="Token symbol or address")
    balance_parser.add_argument("--chain", "-c", default="ethereum", help="Blockchain")
    balance_parser.add_argument("--json", action="store_true", help="JSON output")
    
    # Preview deposit
    preview_parser = subparsers.add_parser("preview-deposit", help="Preview deposit transaction")
    preview_parser.add_argument("--protocol", "-p", required=True, 
                                choices=["aave", "compound", "lido"],
                                help="Protocol name")
    preview_parser.add_argument("--asset", "-a", required=True, help="Asset symbol")
    preview_parser.add_argument("--amount", "-m", type=float, required=True, help="Amount to deposit")
    preview_parser.add_argument("--wallet", "-w", help="Wallet for balance check")
    preview_parser.add_argument("--chain", "-c", default="ethereum", help="Blockchain")
    preview_parser.add_argument("--json", action="store_true", help="JSON output")
    
    # Prepare transaction
    tx_parser = subparsers.add_parser("prepare-tx", help="Prepare transaction")
    tx_parser.add_argument("--to", required=True, help="Target address")
    tx_parser.add_argument("--value", default="0", help="Value in wei")
    tx_parser.add_argument("--data", default="0x", help="Calldata")
    tx_parser.add_argument("--chain", default="ethereum", help="Blockchain")
    tx_parser.add_argument("--json", action="store_true", help="JSON output")
    
    # Check signing capability
    subparsers.add_parser("check-signing", help="Check local signing capability")
    
    args = parser.parse_args()
    
    if args.command == "balance":
        result = check_balance_before_deposit(
            wallet=args.wallet,
            token=args.token,
            amount=0,  # Just checking balance
            chain=args.chain
        )
        if args.json:
            print(json.dumps(result, indent=2))
        else:
            print(f"💰 {args.token} 余额: {result['balance']:.6f}")
    
    elif args.command == "preview-deposit":
        result = preview_deposit(
            protocol=args.protocol,
            asset=args.asset,
            amount=args.amount,
            wallet=args.wallet,
            chain=args.chain
        )
        if args.json:
            print(json.dumps(result, indent=2, ensure_ascii=False))
        else:
            print(format_preview_output(result))
    
    elif args.command == "prepare-tx":
        result = prepare_transaction(
            to=args.to,
            value=args.value,
            data=args.data,
            chain=args.chain
        )
        if args.json:
            print(json.dumps(result, indent=2))
        else:
            print(f"✅ 交易已准备")
            print(f"   To: {result['transaction']['to']}")
            print(f"   Value: {result['transaction']['value']} wei (${result['value_usd']})")
    
    elif args.command == "check-signing":
        try:
            from eth_account import Account
            pk = os.environ.get("WEB3_INVESTOR_PRIVATE_KEY")
            print("✅ eth-account 已安装")
            print(f"   私钥配置: {'✅ 是' if pk else '❌ 否'}")
        except ImportError:
            print("❌ eth-account 未安装")
            print("   运行: pip install eth-account")
    
    else:
        parser.print_help()


if __name__ == "__main__":
    main()
```

### scripts/trading/simulate_tx.py

```python
#!/usr/bin/env python3
"""
Simulate a transaction before execution.

Usage:
    python simulate_tx.py --to 0x... --value 0 --data 0x...
    python simulate_tx.py --to 0x... --data 0x... --output json
"""

import argparse
import json
import sys
import os

# Add parent to path for imports
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from trading.safe_vault import prepare_transaction, format_signing_request


def main():
    parser = argparse.ArgumentParser(description="Simulate Ethereum transaction")
    parser.add_argument("--to", required=True, help="Target address")
    parser.add_argument("--value", default="0", help="Value in wei")
    parser.add_argument("--data", default="0x", help="Transaction calldata")
    parser.add_argument("--gas-limit", type=int, default=300000, help="Gas limit")
    parser.add_argument("--output", choices=["text", "json"], default="text", help="Output format")
    
    args = parser.parse_args()
    
    request = prepare_transaction(
        to=args.to,
        value=args.value,
        data=args.data,
        gas_limit=args.gas_limit
    )
    
    if args.output == "json":
        print(json.dumps(request, indent=2))
    else:
        print(format_signing_request(request))


if __name__ == "__main__":
    main()
```

### scripts/utils/preflight.py

```python
"""
Pre-flight Checks Module

Provides environment validation before executing transactions.
Supports multiple check types that can be configured per transaction type.
"""

import os
import json
import requests
from typing import Dict, Any, List, Optional, Callable
from dataclasses import dataclass, field
from enum import Enum
from urllib.parse import urljoin

try:
    from .rpc_manager import RPCManager
    RPC_MANAGER_AVAILABLE = True
except ImportError:
    try:
        from utils.rpc_manager import RPCManager
        RPC_MANAGER_AVAILABLE = True
    except ImportError:
        RPC_MANAGER_AVAILABLE = False


class CheckSeverity(Enum):
    """Severity levels for check results."""
    CRITICAL = "critical"      # Hard stop, cannot proceed
    WARNING = "warning"        # Warning, can proceed with caution
    INFO = "info"              # Informational only


@dataclass
class CheckResult:
    """Result of a single pre-flight check."""
    name: str
    passed: bool
    severity: CheckSeverity
    message: str
    fix_hint: str = ""
    details: Dict[str, Any] = field(default_factory=dict)


@dataclass
class PreflightReport:
    """Complete pre-flight check report."""
    all_passed: bool
    critical_passed: bool
    results: List[CheckResult]
    summary: str = ""


class PreflightChecker:
    """
    Pre-flight checker for transaction execution.
    
    Usage:
        checker = PreflightChecker(config)
        report = checker.run(transaction_type="swap", params={...})
        if not report.all_passed:
            raise PreflightError(report)
    """
    
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.checks_registry = self._build_checks_registry()
    
    def _build_checks_registry(self) -> Dict[str, Callable]:
        """Build registry of available check functions."""
        return {
            "signer_api": self._check_signer_api,
            "rpc_reachable": self._check_rpc_reachable,
            "gas_balance": self._check_gas_balance,
            "token_balance": self._check_token_balance,
        }
    
    def run(
        self,
        transaction_type: str,
        params: Optional[Dict[str, Any]] = None
    ) -> PreflightReport:
        """
        Run pre-flight checks for a transaction.
        
        Args:
            transaction_type: Type of transaction (swap, transfer, deposit, etc.)
            params: Transaction parameters (token, amount, network, etc.)
        
        Returns:
            PreflightReport with all check results
        """
        params = params or {}
        
        # Get check list for this transaction type from config
        check_list = self._get_check_list(transaction_type)
        
        results = []
        for check_name in check_list:
            check_fn = self.checks_registry.get(check_name)
            if check_fn:
                try:
                    result = check_fn(params)
                except Exception as e:
                    result = CheckResult(
                        name=check_name,
                        passed=False,
                        severity=CheckSeverity.CRITICAL,
                        message=f"Check failed with exception: {str(e)}",
                        fix_hint="Check logs for details"
                    )
                results.append(result)
        
        # Determine overall status
        critical_results = [r for r in results if r.severity == CheckSeverity.CRITICAL]
        critical_passed = all(r.passed for r in critical_results)
        all_passed = all(r.passed for r in results)
        
        summary = self._build_summary(results, critical_passed)
        
        return PreflightReport(
            all_passed=all_passed,
            critical_passed=critical_passed,
            results=results,
            summary=summary
        )
    
    def _get_check_list(self, transaction_type: str) -> List[str]:
        """Get list of checks to run for a transaction type."""
        preflight_config = self.config.get("preflight", {})
        
        # Get checks for this specific type
        checks_config = preflight_config.get("checks", {})
        type_checks = checks_config.get(transaction_type)
        if type_checks:
            return type_checks
        
        # Fall back to swap checks
        return checks_config.get("swap", ["api", "rpc"])
    
    def _build_summary(self, results: List[CheckResult], critical_passed: bool) -> str:
        """Build human-readable summary of check results."""
        passed = sum(1 for r in results if r.passed)
        total = len(results)
        
        if critical_passed:
            return f"Pre-flight checks: {passed}/{total} passed"
        
        failed_critical = [r for r in results if not r.passed and r.severity == CheckSeverity.CRITICAL]
        failed_names = ", ".join(r.name for r in failed_critical)
        return f"Pre-flight checks failed: {failed_names}"
    
    # ==================== Individual Check Functions ====================
    
    def _check_signer_api(self, params: Dict[str, Any]) -> CheckResult:
        """Check if signer API is reachable."""
        # Support both old and new config paths
        api_config = self.config.get("api", {})
        api_url = api_config.get("url") or api_config.get("url", "")
        timeout = api_config.get("timeout_seconds", 30)
        
        # Handle environment variable syntax ${VAR:default}
        if api_url and api_url.startswith("${"):
            import os
            import re
            match = re.match(r'\$\{([^:}]+):?(.*?)\}', api_url)
            if match:
                env_var = match.group(1)
                default = match.group(2)
                api_url = os.environ.get(env_var, default)
        
        if not api_url:
            return CheckResult(
                name="signer_api",
                passed=False,
                severity=CheckSeverity.CRITICAL,
                message="API URL not configured",
                fix_hint="Set WEB3_INVESTOR_API_URL environment variable or update config.json"
            )
        
        try:
            # Try to call health check or balances endpoint
            health_url = urljoin(api_url, "/wallet/balances")
            response = requests.get(health_url, timeout=timeout)
            
            if response.status_code in [200, 401]:  # 401 is OK (auth required but service up)
                return CheckResult(
                    name="signer_api",
                    passed=True,
                    severity=CheckSeverity.CRITICAL,
                    message="Signer API is reachable",
                    details={"url": api_url, "status": response.status_code}
                )
            else:
                return CheckResult(
                    name="signer_api",
                    passed=False,
                    severity=CheckSeverity.CRITICAL,
                    message=f"Signer API returned unexpected status: {response.status_code}",
                    fix_hint="Check signer service status"
                )
        
        except requests.exceptions.ConnectionError:
            return CheckResult(
                name="signer_api",
                passed=False,
                severity=CheckSeverity.CRITICAL,
                message="Cannot connect to signer API",
                fix_hint="Ensure signer service is running on the configured port"
            )
        except requests.exceptions.Timeout:
            return CheckResult(
                name="signer_api",
                passed=False,
                severity=CheckSeverity.WARNING,
                message="Signer API connection timed out",
                fix_hint="Check network connectivity or increase timeout in config"
            )
        except Exception as e:
            return CheckResult(
                name="signer_api",
                passed=False,
                severity=CheckSeverity.CRITICAL,
                message=f"Unexpected error checking signer API: {str(e)}",
                fix_hint="Check configuration and network"
            )
    
    def _check_rpc_reachable(self, params: Dict[str, Any]) -> CheckResult:
        """Check if RPC endpoint is reachable (with fallback support)."""
        network = params.get("network", self.config.get("discovery", {}).get("default_chain", "base"))
        
        # Try using RPC Manager for fallback support
        if RPC_MANAGER_AVAILABLE:
            try:
                manager = RPCManager(self.config)
                url = manager.get_active_rpc(network)
                
                if not url:
                    return CheckResult(
                        name="rpc_reachable",
                        passed=False,
                        severity=CheckSeverity.CRITICAL,
                        message=f"No RPC endpoint configured for network: {network}",
                        fix_hint=f"Add RPC endpoint for {network} in config.json"
                    )
                
                success, block_number = manager.health_check(network, url)
                
                if success:
                    return CheckResult(
                        name="rpc_reachable",
                        passed=True,
                        severity=CheckSeverity.CRITICAL,
                        message=f"RPC reachable (block #{block_number})",
                        details={"rpc_url": url, "block_number": block_number, "fallback_available": manager.rpc_config.get("fallback_enabled", False)}
                    )
                
                # Primary failed, try fallback
                if manager.rpc_config.get("fallback_enabled", False):
                    fallback_url = manager.find_healthy_fallback(network)
                    if fallback_url:
                        success, block_number = manager.health_check(network, fallback_url)
                        if success:
                            return CheckResult(
                                name="rpc_reachable",
                                passed=True,
                                severity=CheckSeverity.WARNING,
                                message=f"RPC reachable via fallback (block #{block_number})",
                                details={"rpc_url": fallback_url, "block_number": block_number, "original_failed": url}
                            )
                
                return CheckResult(
                    name="rpc_reachable",
                    passed=False,
                    severity=CheckSeverity.CRITICAL,
                    message=f"RPC unreachable: {url}",
                    fix_hint="Check RPC URL or configure fallback RPCs in config.json"
                )
            
            except Exception as e:
                return CheckResult(
                    name="rpc_reachable",
                    passed=False,
                    severity=CheckSeverity.CRITICAL,
                    message=f"RPC check failed: {str(e)}",
                    fix_hint="Check RPC configuration"
                )
        
        # Fallback: direct check without RPC Manager
        rpc_config = self.config.get("rpc", {})
        rpc_urls = rpc_config.get("endpoints", {}).get(network, [])
        
        if not rpc_urls:
            return CheckResult(
                name="rpc_reachable",
                passed=False,
                severity=CheckSeverity.CRITICAL,
                message=f"No RPC endpoint configured for network: {network}",
                fix_hint=f"Add RPC endpoint for {network} in config.json"
            )
        
        primary_rpc = rpc_urls[0]
        timeout = rpc_config.get("timeout_seconds", 10)
        
        try:
            response = requests.post(
                primary_rpc,
                json={
                    "jsonrpc": "2.0",
                    "method": "eth_blockNumber",
                    "params": [],
                    "id": 1
                },
                timeout=timeout,
                headers={"Content-Type": "application/json"}
            )
            
            if response.status_code == 200:
                data = response.json()
                if "result" in data:
                    block_number = int(data["result"], 16)
                    return CheckResult(
                        name="rpc_reachable",
                        passed=True,
                        severity=CheckSeverity.CRITICAL,
                        message=f"RPC reachable (block #{block_number})",
                        details={"rpc_url": primary_rpc, "block_number": block_number}
                    )
            
            return CheckResult(
                name="rpc_reachable",
                passed=False,
                severity=CheckSeverity.CRITICAL,
                message=f"RPC returned unexpected response: {response.status_code}",
                fix_hint="Check RPC URL and network connectivity"
            )
        
        except requests.exceptions.RequestException as e:
            return CheckResult(
                name="rpc_reachable",
                passed=False,
                severity=CheckSeverity.CRITICAL,
                message=f"Cannot connect to RPC: {str(e)}",
                fix_hint="Check RPC URL or configure fallback RPCs"
            )
    
    def _check_env_completeness(self, params: Dict[str, Any]) -> CheckResult:
        """Check if required environment variables are set."""
        preflight_config = self.config.get("preflight", {})
        required_vars = preflight_config.get("required_env_vars", [])
        
        missing = []
        for var in required_vars:
            if not os.environ.get(var):
                missing.append(var)
        
        if missing:
            return CheckResult(
                name="env_completeness",
                passed=False,
                severity=CheckSeverity.WARNING,
                message=f"Missing environment variables: {', '.join(missing)}",
                fix_hint="Add missing variables to ~/.bashrc or .env file"
            )
        
        return CheckResult(
            name="env_completeness",
            passed=True,
            severity=CheckSeverity.INFO,
            message="All required environment variables present"
        )
    
    def _check_gas_balance(self, params: Dict[str, Any]) -> CheckResult:
        """Check if wallet has sufficient gas balance."""
        # This check requires the signer API to be available
        # It will be skipped if signer API is not reachable
        api_url = self.config.get("api", {}).get("url", "")
        network = params.get("network", "base")
        
        if not api_url:
            return CheckResult(
                name="gas_balance",
                passed=False,
                severity=CheckSeverity.WARNING,
                message="Cannot check gas balance: API URL not configured",
                fix_hint="Configure API base URL"
            )
        
        try:
            # Get native token balance (ETH for ethereum, ETH for base)
            response = requests.get(
                f"{api_url}/wallet/balances?chain={network}",
                timeout=10
            )
            
            if response.status_code == 200:
                data = response.json()
                balances = data.get("balances", [])
                
                # Find native token (ETH)
                native_balance = None
                for bal in balances:
                    if bal.get("symbol") in ["ETH", "WETH"]:
                        native_balance = float(bal.get("balance", 0))
                        break
                
                min_gas = self.config.get("preflight", {}).get("min_gas_balance_eth", 0.001)
                
                if native_balance is None or native_balance < min_gas:
                    return CheckResult(
                        name="gas_balance",
                        passed=False,
                        severity=CheckSeverity.CRITICAL,
                        message=f"Insufficient gas balance: {native_balance or 0} ETH (min: {min_gas})",
                        fix_hint=f"Top up wallet with at least {min_gas} ETH for gas fees"
                    )
                
                return CheckResult(
                    name="gas_balance",
                    passed=True,
                    severity=CheckSeverity.CRITICAL,
                    message=f"Gas balance sufficient: {native_balance} ETH",
                    details={"balance_eth": native_balance, "min_required": min_gas}
                )
            
            return CheckResult(
                name="gas_balance",
                passed=False,
                severity=CheckSeverity.WARNING,
                message="Could not fetch gas balance from API",
                fix_hint="Check API connectivity"
            )
        
        except Exception as e:
            return CheckResult(
                name="gas_balance",
                passed=False,
                severity=CheckSeverity.WARNING,
                message=f"Error checking gas balance: {str(e)}",
                fix_hint="Check API status"
            )
    
    def _check_token_balance(self, params: Dict[str, Any]) -> CheckResult:
        """Check if wallet has sufficient token balance for transaction."""
        token = params.get("from_token") or params.get("asset")
        amount = params.get("amount")
        network = params.get("network", "base")
        
        if not token or not amount:
            return CheckResult(
                name="token_balance",
                passed=True,
                severity=CheckSeverity.INFO,
                message="Token balance check skipped: token or amount not specified",
                details={"token": token, "amount": amount}
            )
        
        api_url = self.config.get("api", {}).get("url", "")
        
        try:
            response = requests.get(
                f"{api_url}/wallet/balances?chain={network}&tokens={token}",
                timeout=10
            )
            
            if response.status_code == 200:
                data = response.json()
                balances = data.get("balances", [])
                
                token_balance = None
                for bal in balances:
                    if bal.get("symbol", "").upper() == token.upper():
                        token_balance = float(bal.get("balance", 0))
                        break
                
                amount_float = float(amount)
                
                if token_balance is None or token_balance < amount_float:
                    return CheckResult(
                        name="token_balance",
                        passed=False,
                        severity=CheckSeverity.CRITICAL,
                        message=f"Insufficient {token} balance: {token_balance or 0} (required: {amount})",
                        fix_hint=f"Deposit {amount} {token} to wallet before proceeding"
                    )
                
                return CheckResult(
                    name="token_balance",
                    passed=True,
                    severity=CheckSeverity.CRITICAL,
                    message=f"Token balance sufficient: {token_balance} {token}",
                    details={"token": token, "balance": token_balance, "required": amount_float}
                )
            
            return CheckResult(
                name="token_balance",
                passed=False,
                severity=CheckSeverity.WARNING,
                message="Could not fetch token balance from API",
                fix_hint="Check API connectivity"
            )
        
        except Exception as e:
            return CheckResult(
                name="token_balance",
                passed=False,
                severity=CheckSeverity.WARNING,
                message=f"Error checking token balance: {str(e)}",
                fix_hint="Check API status"
            )
    
    def _check_allowance(self, params: Dict[str, Any]) -> CheckResult:
        """Check if token allowance is sufficient (for swap/deposit only)."""
        tx_type = params.get("type", "")
        
        # Allowance check only relevant for swap and deposit
        if tx_type not in ["swap", "deposit"]:
            return CheckResult(
                name="allowance",
                passed=True,
                severity=CheckSeverity.INFO,
                message="Allowance check not required for this transaction type"
            )
        
        # This is a soft check - actual allowance will be checked during preview
        return CheckResult(
            name="allowance",
            passed=True,
            severity=CheckSeverity.INFO,
            message="Allowance will be checked during transaction preview"
        )


class PreflightError(Exception):
    """Exception raised when pre-flight checks fail."""
    
    def __init__(self, report: PreflightReport):
        self.report = report
        self.message = report.summary
        super().__init__(self.message)
    
    def to_dict(self) -> Dict[str, Any]:
        """Convert error to dictionary for API responses."""
        return {
            "success": False,
            "error": {
                "code": "E014",
                "message": "Pre-flight checks failed",
                "details": {
                    "summary": self.report.summary,
                    "failed_checks": [
                        {
                            "name": r.name,
                            "message": r.message,
                            "fix_hint": r.fix_hint,
                            "severity": r.severity.value
                        }
                        for r in self.report.results if not r.passed
                    ]
                }
            }
        }


def load_config(config_path: Optional[str] = None) -> Dict[str, Any]:
    """Load configuration from config.json."""
    if config_path is None:
        script_dir = os.path.dirname(os.path.abspath(__file__))
        skill_dir = os.path.dirname(os.path.dirname(script_dir))  # utils -> scripts -> skill_dir
        config_path = os.path.join(skill_dir, "config", "config.json")
    
    if os.path.exists(config_path):
        with open(config_path) as f:
            return json.load(f)
    
    return {}


if __name__ == "__main__":
    # Simple test
    config = load_config()
    checker = PreflightChecker(config)
    report = checker.run("swap", {"network": "base", "from_token": "USDC", "amount": "100"})
    print(f"All passed: {report.all_passed}")
    print(f"Summary: {report.summary}")
    for r in report.results:
        status = "✅" if r.passed else "❌"
        print(f"{status} {r.name}: {r.message}")

```

### scripts/utils/rpc_manager.py

```python
"""
RPC Manager Module

Provides RPC fallback and session stickiness for blockchain operations.
Inspired by xaut-trade's RPC resilience design.

Key Features:
- Automatic fallback to secondary RPC nodes on network errors
- Session stickiness: once a fallback is selected, use it for the entire session
- Clear distinction between network errors (should fallback) and business errors (should not)
"""

import os
import json
import time
import requests
from typing import Dict, Any, List, Optional
from dataclasses import dataclass, field
from enum import Enum


class RPCErrors(Exception):
    """Exception raised when all RPC endpoints fail."""
    pass


@dataclass
class RPCConfig:
    """RPC configuration for a single chain."""
    chain: str
    endpoints: List[str]
    timeout_seconds: int = 10
    fallback_enabled: bool = True
    session_sticky: bool = True


@dataclass
class RPCStatus:
    """Status of an RPC endpoint."""
    url: str
    is_active: bool = False
    last_error: Optional[str] = None
    last_success_time: Optional[float] = None
    block_number: Optional[int] = None


class RPCManager:
    """
    Manages RPC connections with fallback support.
    
    Design Philosophy (from xaut-trade):
    - Only trigger fallback for NETWORK errors (429, 502, 503, timeout, connection refused)
    - Do NOT trigger fallback for business errors (insufficient balance, contract revert, etc.)
    - Once a fallback is selected, use it for the entire session (session stickiness)
    
    Usage:
        manager = RPCManager(config)
        
        # Get active RPC URL (may be primary or fallback)
        rpc_url = manager.get_active_rpc("base")
        
        # Make request with automatic fallback
        result = manager.request("base", {
            "jsonrpc": "2.0",
            "method": "eth_call",
            "params": [...],
            "id": 1
        })
    """
    
    # Error patterns that should trigger fallback
    NETWORK_ERROR_PATTERNS = [
        "429",           # Too Many Requests
        "502",           # Bad Gateway
        "503",           # Service Unavailable
        "timeout",
        "timed out",
        "connection refused",
        "ECONNREFUSED",
        "ENOTFOUND",
        "EAI_AGAIN",     # DNS lookup error
        "rate limit",
        "too many requests",
    ]
    
    def __init__(self, config: Dict[str, Any]):
        """
        Initialize RPC Manager.
        
        Args:
            config: Full config dict from config.json
        """
        self.config = config
        self.rpc_config = config.get("rpc", {})
        
        # Session state: active RPC per chain
        self._active_rpc: Dict[str, str] = {}
        
        # Status tracking
        self._rpc_status: Dict[str, Dict[str, RPCStatus]] = {}
        
        # Initialize status for all configured endpoints
        self._init_status()
    
    def _init_status(self):
        """Initialize RPC status tracking."""
        endpoints_config = self.rpc_config.get("endpoints", {})
        
        for chain, urls in endpoints_config.items():
            self._rpc_status[chain] = {}
            for url in urls:
                self._rpc_status[chain][url] = RPCStatus(url=url)
    
    def get_endpoints(self, chain: str) -> List[str]:
        """Get all configured endpoints for a chain."""
        # Support new config structure: chain.rpc
        chain_config = self.config.get("chain", {})
        rpc_endpoints = chain_config.get("rpc", {})
        if rpc_endpoints:
            return rpc_endpoints.get(chain, [])
        
        # Fallback to old structure: rpc.endpoints
        return self.rpc_config.get("endpoints", {}).get(chain, [])
    
    def get_active_rpc(self, chain: str) -> Optional[str]:
        """
        Get the currently active RPC URL for a chain.
        
        Returns the session-sticky fallback if one was selected,
        otherwise returns the primary endpoint.
        """
        # Check session sticky fallback
        if self.rpc_config.get("session_sticky", True):
            if chain in self._active_rpc:
                return self._active_rpc[chain]
        
        # Return primary (first) endpoint
        endpoints = self.get_endpoints(chain)
        return endpoints[0] if endpoints else None
    
    def set_active_rpc(self, chain: str, url: str):
        """Set the active RPC for a chain (for session stickiness)."""
        if self.rpc_config.get("session_sticky", True):
            self._active_rpc[chain] = url
        
        # Update status
        if chain in self._rpc_status and url in self._rpc_status[chain]:
            self._rpc_status[chain][url].is_active = True
            self._rpc_status[chain][url].last_success_time = time.time()
    
    def is_network_error(self, error: Exception) -> bool:
        """
        Check if an error is a network error that should trigger fallback.
        
        Network errors: 429, 502, 503, timeout, connection refused
        Business errors: insufficient balance, contract revert, invalid params
        """
        error_str = str(error).lower()
        
        for pattern in self.NETWORK_ERROR_PATTERNS:
            if pattern.lower() in error_str:
                return True
        
        return False
    
    def health_check(self, chain: str, url: str) -> tuple[bool, Optional[int]]:
        """
        Perform a quick health check on an RPC endpoint.
        
        Returns:
            (success, block_number) tuple
        """
        timeout = self.rpc_config.get("timeout_seconds", 10)
        
        try:
            response = requests.post(
                url,
                json={
                    "jsonrpc": "2.0",
                    "method": "eth_blockNumber",
                    "params": [],
                    "id": 1
                },
                timeout=timeout,
                headers={"Content-Type": "application/json"}
            )
            
            if response.status_code == 200:
                data = response.json()
                if "result" in data:
                    block_number = int(data["result"], 16)
                    return True, block_number
            
            return False, None
        
        except Exception:
            return False, None
    
    def find_healthy_fallback(self, chain: str) -> Optional[str]:
        """
        Find a healthy fallback RPC endpoint.
        
        Tries each endpoint in order, returns the first one that responds.
        """
        endpoints = self.get_endpoints(chain)
        current_active = self.get_active_rpc(chain)
        
        for url in endpoints:
            # Skip the current active (which failed)
            if url == current_active:
                continue
            
            success, block = self.health_check(chain, url)
            
            if success:
                self.set_active_rpc(chain, url)
                return url
        
        return None
    
    def request(
        self,
        chain: str,
        payload: Dict[str, Any],
        prefer_url: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Make an RPC request with automatic fallback.
        
        Args:
            chain: Chain name (base, ethereum)
            payload: JSON-RPC payload
            prefer_url: Preferred URL (optional, overrides session state)
        
        Returns:
            JSON-RPC response
        
        Raises:
            RPCErrors: If all endpoints fail
        """
        timeout = self.rpc_config.get("timeout_seconds", 10)
        fallback_enabled = self.rpc_config.get("fallback_enabled", True)
        
        # Determine which URL to use
        if prefer_url:
            url = prefer_url
        else:
            url = self.get_active_rpc(chain)
        
        if not url:
            raise RPCErrors(f"No RPC endpoint configured for chain: {chain}")
        
        # Try the primary/active URL
        try:
            response = requests.post(
                url,
                json=payload,
                timeout=timeout,
                headers={"Content-Type": "application/json"}
            )
            
            if response.status_code == 200:
                result = response.json()
                
                # Check for JSON-RPC error (business error, don't fallback)
                if "error" in result:
                    return result
                
                # Success
                self.set_active_rpc(chain, url)
                return result
            
            # Non-200 status - check if it's a network error
            if self._is_status_network_error(response.status_code):
                raise requests.exceptions.RequestException(f"HTTP {response.status_code}")
            
            # Other HTTP errors - return as-is
            return {"error": {"code": response.status_code, "message": response.text}}
        
        except requests.exceptions.RequestException as e:
            # Check if we should try fallback
            if not fallback_enabled:
                raise RPCErrors(f"RPC request failed: {str(e)}")
            
            if not self.is_network_error(e):
                # Business error, don't fallback
                raise RPCErrors(f"RPC request failed: {str(e)}")
            
            # Try fallback
            fallback_url = self.find_healthy_fallback(chain)
            
            if fallback_url:
                # Retry with fallback
                try:
                    response = requests.post(
                        fallback_url,
                        json=payload,
                        timeout=timeout,
                        headers={"Content-Type": "application/json"}
                    )
                    
                    if response.status_code == 200:
                        result = response.json()
                        self.set_active_rpc(chain, fallback_url)
                        return result
                
                except requests.exceptions.RequestException:
                    pass
            
            # All endpoints failed
            raise RPCErrors(
                f"All RPC endpoints failed for {chain}. "
                f"Tried: {len(self.get_endpoints(chain))} endpoints."
            )
    
    def _is_status_network_error(self, status_code: int) -> bool:
        """Check if HTTP status code indicates a network error."""
        network_status_codes = [429, 502, 503, 504]
        return status_code in network_status_codes
    
    def get_status_report(self, chain: str) -> Dict[str, Any]:
        """Get status report for all RPC endpoints of a chain."""
        endpoints = self.get_endpoints(chain)
        active = self.get_active_rpc(chain)
        
        report = {
            "chain": chain,
            "active_rpc": active,
            "fallback_enabled": self.rpc_config.get("fallback_enabled", True),
            "session_sticky": self.rpc_config.get("session_sticky", True),
            "endpoints": []
        }
        
        for url in endpoints:
            status = self._rpc_status.get(chain, {}).get(url)
            report["endpoints"].append({
                "url": url,
                "is_active": url == active,
                "last_error": status.last_error if status else None,
                "last_success_time": status.last_success_time if status else None
            })
        
        return report


# Singleton instance for session-level stickiness
_rpc_manager_instance: Optional[RPCManager] = None


def get_rpc_manager(config: Optional[Dict[str, Any]] = None) -> RPCManager:
    """
    Get or create the singleton RPC manager.
    
    Args:
        config: Config dict (required on first call)
    
    Returns:
        RPCManager instance
    """
    global _rpc_manager_instance
    
    if _rpc_manager_instance is None:
        if config is None:
            raise ValueError("Config required for first initialization")
        _rpc_manager_instance = RPCManager(config)
    
    return _rpc_manager_instance


def load_config(config_path: Optional[str] = None) -> Dict[str, Any]:
    """Load configuration from config.json."""
    if config_path is None:
        # Try multiple paths to find config.json
        script_dir = os.path.dirname(os.path.abspath(__file__))
        
        # Path 1: scripts/utils/.. -> skills/web3-investor/
        skill_dir = os.path.dirname(os.path.dirname(script_dir))
        config_path = os.path.join(skill_dir, "config", "config.json")
        
        # Path 2: If running from workspace root
        if not os.path.exists(config_path):
            config_path = "skills/web3-investor/config/config.json"
        
        # Path 3: If running from skill directory
        if not os.path.exists(config_path):
            config_path = "config/config.json"
    
    if os.path.exists(config_path):
        with open(config_path) as f:
            return json.load(f)
    
    return {}


if __name__ == "__main__":
    # Simple test
    config = load_config()
    manager = RPCManager(config)
    
    print("RPC Manager Test")
    print("=" * 50)
    
    for chain in ["base", "ethereum"]:
        print(f"\n{chain.upper()}:")
        print(f"  Primary: {manager.get_endpoints(chain)[0] if manager.get_endpoints(chain) else 'N/A'}")
        print(f"  Active: {manager.get_active_rpc(chain)}")
        
        # Health check
        url = manager.get_active_rpc(chain)
        if url:
            success, block = manager.health_check(chain, url)
            print(f"  Health: {'✅ OK' if success else '❌ Failed'}")
            if block:
                print(f"  Block: #{block}")
    
    print("\n" + "=" * 50)
    print("Status Report:")
    print(json.dumps(manager.get_status_report("base"), indent=2))
```