web3-investor
AI-friendly Web3 investment infrastructure for autonomous agents. Use when (1) discovering and analyzing DeFi/NFT investment opportunities, (2) executing secure transactions via local keystore signer REST API with preview-approve-execute state machine, (3) managing portfolio with dashboards and expiry alerts. Supports base and ethereum chains, configurable security constraints including whitelist protection, transaction limits, and mandatory simulation before execution.
Packaged view
This page reorganizes the original catalog entry around fit, installability, and workflow context first. The original raw source lives below.
Install command
npx @skill-hub/cli install openclaw-skills-web3-investor
Repository
Skill path: skills/bevanding/web3-investor
Best for
Primary workflow: Analyze Data & AI.
Technical facets: Full Stack, Backend, Data / AI, Security.
Target audience: everyone.
License: Unknown.
Original source
Catalog source: SkillHub Club.
Repository owner: openclaw.
This is still a mirrored public skill entry. Review the repository before installing into production workflows.
What it helps with
- Install web3-investor into Claude Code, Codex CLI, Gemini CLI, or OpenCode workflows
- Review https://github.com/openclaw/skills before adding web3-investor to shared team environments
- Use web3-investor for development workflows
Original source / Raw SKILL.md
---
name: web3-investor
description: AI-friendly Web3 investment infrastructure for autonomous agents. Use when (1) discovering and analyzing DeFi/NFT investment opportunities, (2) executing secure transactions via local keystore signer REST API with preview-approve-execute state machine, (3) managing portfolio with dashboards and expiry alerts. Supports base and ethereum chains, configurable security constraints including whitelist protection, transaction limits, and mandatory simulation before execution.
---
# Web3 Investor Skill
> **Purpose**: Enable AI agents to safely discover, analyze, and execute DeFi investments.
>
> **Core Philosophy**: Data-driven decisions. No generic advice without real-time discovery.
---
## ⚠️ Critical Rules (MUST FOLLOW)
### Rule 1: Discovery First
**When user asks for investment advice:**
```
❌ WRONG: Give generic advice immediately (e.g., "I recommend Aave")
✅ CORRECT:
1. Collect investment preferences (chain, token, risk tolerance)
2. Run discovery to get real-time data
3. Analyze data
4. Provide data-backed recommendations
```
### Rule 2: User's LLM Makes Decisions
- This skill provides **raw data only**
- Investment analysis and recommendations are the responsibility of the user's LLM/agent
- This skill is NOT responsible for investment outcomes
### Rule 3: Risk Acknowledgment
- APY data comes from third-party APIs and may be delayed or inaccurate
- Investment decisions are made at the user's own risk
- Always DYOR (Do Your Own Research)
### Rule 4: Verify Execution Capability Before Trading
**Before attempting any transaction, the agent MUST check signer availability:**
```
❌ WRONG: Directly call preview/execute without checking API
✅ CORRECT:
1. Check if signer API is reachable (call balances endpoint)
2. If unreachable → inform user: "Signer service unavailable, please check SETUP.md"
3. Never proceed with preview if signer is unavailable
```
**Health Check Command**:
```bash
python3 scripts/trading/trade_executor.py balances --network base
# If success → signer is available
# If error E010 → signer unavailable, stop and inform user
```
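The same check can be made programmatically. The sketch below assumes the result shape returned by `api_request()` in `scripts/trading/trade_executor.py` (a dict with a `success` flag and, on failure, an `error` dict carrying a `code`). The helper name `signer_status` is hypothetical and not part of the skill.

```python
from typing import Any, Dict, Tuple

# E010 is the API_UNAVAILABLE code defined in trade_executor.py.
SIGNER_UNAVAILABLE_CODE = "E010"


def signer_status(result: Dict[str, Any]) -> Tuple[bool, str]:
    """Interpret a balances result and return (ok_to_proceed, message)."""
    if result.get("success"):
        return True, "Signer is available"
    error = result.get("error")
    code = error.get("code") if isinstance(error, dict) else None
    if code == SIGNER_UNAVAILABLE_CODE:
        return False, "Signer service unavailable, please check SETUP.md"
    # Any other failure also blocks the preview step.
    return False, f"Balance check failed ({code or 'unknown error'}); do not proceed to preview"
```

An agent would call this with the result of the balances query and stop before `preview` whenever the first element is `False`.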
### Rule 5: Check Payment Capability FIRST (NEW in v0.5.0)
**Before asking user for transaction details, ALWAYS check execution readiness:**
```
❌ WRONG: Ask "How much do you want to invest?" without checking payment capability
✅ CORRECT:
1. Run: python3 scripts/trading/preflight.py check --network <chain>
2. Inform user of available payment methods
3. Then ask for transaction details
```
**Preflight Check Output**:
```json
{
  "recommended": "eip681_payment_link",
  "methods": [
    {"method": "keystore_signer", "status": "unavailable"},
    {"method": "eip681_payment_link", "status": "available"}
  ],
  "message": "⚠️ No local signer. Use EIP-681 payment link."
}
```
**Payment Method Flow**:
| Recommended Method | Action |
|-------------------|--------|
| `keystore_signer` | Use `preview → approve → execute` flow |
| `eip681_payment_link` | Generate EIP-681 link with `eip681_payment.py` |
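As a rough illustration of how an agent might act on the preflight output, the sketch below parses the JSON shown above and maps the recommended method to the action from the table. The function name `choose_payment_action` and the rule that the recommended method must also be listed as `available` are assumptions, not documented behaviour of `preflight.py`.

```python
import json

# Actions copied from the Payment Method Flow table.
ACTIONS = {
    "keystore_signer": "preview -> approve -> execute",
    "eip681_payment_link": "generate EIP-681 link with eip681_payment.py",
}


def choose_payment_action(preflight_json: str) -> str:
    """Return the action for the recommended payment method."""
    report = json.loads(preflight_json)
    available = {
        entry.get("method")
        for entry in report.get("methods", [])
        if entry.get("status") == "available"
    }
    recommended = report.get("recommended")
    if recommended not in ACTIONS:
        raise ValueError(f"Unknown payment method: {recommended!r}")
    if recommended not in available:
        # Assumption: never act on a method the report marks unavailable.
        raise RuntimeError(f"Recommended method {recommended!r} is not available")
    return ACTIONS[recommended]
```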
---
## 🎯 Quick Start for Agents
### Step 1: Collect Investment Preferences (REQUIRED)
Before running discovery, ask the user:
| Preference | Key | Options | Why It Matters |
|------------|-----|---------|----------------|
| **Chain** | `chain` | ethereum, base, arbitrum, optimism | Determines which blockchain to search |
| **Capital Token** | `capital_token` | USDC, USDT, ETH, WBTC, etc. | The token they want to invest |
| **Reward Preference** | `reward_preference` | single / multi / any | Single token rewards vs multiple tokens |
| **Accept IL** | `accept_il` | true / false / any | Impermanent loss tolerance |
| **Underlying Type** | `underlying_preference` | rwa / onchain / mixed / any | Real-world assets vs on-chain |
### Step 2: Run Discovery
```bash
# Basic search
python3 scripts/discovery/find_opportunities.py \
--chain ethereum \
--min-apy 5 \
--limit 20
# With LLM-ready output
python3 scripts/discovery/find_opportunities.py \
--chain ethereum \
--llm-ready \
--output json
```
### Step 3: Filter by Preferences
```python
from scripts.discovery.investment_profile import InvestmentProfile
profile = InvestmentProfile()
profile.set_preferences(
    chain="ethereum",
    capital_token="USDC",
    accept_il=False,
    reward_preference="single"
)
filtered = profile.filter_opportunities(opportunities)
```
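To make the filtering step concrete, here is a standalone sketch over plain dicts. It is not the `InvestmentProfile` implementation; it assumes opportunity records carry the `chain`, `apy`, `reward_type`, and `has_il_risk` fields shown in the LLM-ready output, and it treats a `single` preference as "keep only `reward_type == "single"`", which is one possible reading.

```python
from typing import Any, Dict, List, Union


def filter_by_preferences(
    opportunities: List[Dict[str, Any]],
    chain: str,
    accept_il: Union[bool, str] = "any",
    reward_preference: str = "any",
    min_apy: float = 0.0,
) -> List[Dict[str, Any]]:
    """Keep opportunities that satisfy every preference that is set."""
    kept = []
    for opp in opportunities:
        if opp.get("chain") != chain:
            continue  # chain filter: exact match
        if accept_il is False and opp.get("has_il_risk"):
            continue  # user does not accept impermanent loss
        if reward_preference == "single" and opp.get("reward_type") != "single":
            continue  # assumption: "single" excludes "multi" and "none"
        if opp.get("apy", 0.0) < min_apy:
            continue  # APY lower bound
        kept.append(opp)
    return kept
```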
### Step 4: Execute Transaction (Choose Payment Method)
#### Option A: Keystore Signer (Production)
```bash
# Preview → Approve → Execute
python3 scripts/trading/trade_executor.py preview \
--type deposit --protocol aave --asset USDC --amount 1000 --network base
python3 scripts/trading/trade_executor.py approve --preview-id <uuid>
python3 scripts/trading/trade_executor.py execute --approval-id <uuid>
```
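The ordering constraint behind `preview → approve → execute` can be sketched as a small state machine. This is an illustration only, not the signer's implementation: the class name `TradeFlow` and the 300-second approval lifetime are hypothetical (the `approve` response does carry an `expires_at` field, but its actual lifetime is not stated).

```python
from datetime import datetime, timedelta
from typing import Optional


class TradeFlow:
    """Tracks one trade through preview -> approve -> execute."""

    def __init__(self) -> None:
        self.state = "new"
        self.expires_at: Optional[datetime] = None

    def preview(self, simulation_ok: bool) -> None:
        if self.state != "new":
            raise RuntimeError("preview must be the first step")
        if not simulation_ok:
            raise RuntimeError("simulation failed; cannot continue to approve")
        self.state = "previewed"

    def approve(self, now: datetime, ttl_seconds: int = 300) -> None:
        if self.state != "previewed":
            raise RuntimeError("approve requires a successful preview")
        # Hypothetical lifetime; the real value comes from the signer.
        self.expires_at = now + timedelta(seconds=ttl_seconds)
        self.state = "approved"

    def execute(self, now: datetime) -> None:
        if self.state != "approved" or self.expires_at is None:
            raise RuntimeError("execute requires approval")
        if now >= self.expires_at:
            raise RuntimeError("approval expired; run preview again")
        self.state = "executed"
```

Each method rejects calls made out of order, so skipping `approve` is impossible by construction.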
#### Option B: EIP-681 Payment Link (Mobile)
```bash
python3 scripts/trading/eip681_payment.py generate \
--token USDC --to 0x... --amount 10 --network base \
--qr-output /tmp/payment_qr.png
```
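For orientation, an EIP-681 link for an ERC-20 transfer has the shape `ethereum:<token>@<chain_id>/transfer?address=<recipient>&uint256=<amount>`, with the amount in the token's smallest unit. The sketch below builds such a URI; it is not the code in `eip681_payment.py`, the function name is hypothetical, and the token contract address and decimals must be supplied by the caller (USDC uses 6 decimals).

```python
from decimal import Decimal


def build_erc20_payment_link(
    token_address: str,
    chain_id: int,
    recipient: str,
    amount: str,
    decimals: int,
) -> str:
    """Build an EIP-681 URI requesting an ERC-20 transfer."""
    base_units = Decimal(amount).scaleb(decimals)  # e.g. "10" with 6 decimals -> 10000000
    if base_units <= 0:
        raise ValueError("amount must be positive")
    if base_units != base_units.to_integral_value():
        raise ValueError("amount has more precision than the token supports")
    return (
        f"ethereum:{token_address}@{chain_id}/transfer"
        f"?address={recipient}&uint256={int(base_units)}"
    )
```

Passing the amount as a string and converting with `Decimal` avoids the rounding errors that binary floats would introduce for values such as `0.1`.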
---
## 📁 Project Structure
```
web3-investor/
├── scripts/
│   ├── discovery/          # Opportunity discovery tools
│   ├── trading/            # Transaction execution modules
│   └── portfolio/          # Balance queries
├── config/
│   ├── config.json         # Execution model & security settings
│   └── protocols.json      # Protocol registry
├── references/             # Detailed module documentation
│   ├── discovery.md
│   ├── investment-profile.md
│   ├── trade-executor.md
│   ├── portfolio-indexer.md
│   ├── protocols.md
│   └── risk-framework.md
└── SKILL.md
```
---
## 📚 Module Overview
| Module | Purpose | Details |
|--------|---------|---------|
| **Discovery** | Search DeFi yield opportunities | See [references/discovery.md](references/discovery.md) |
| **Investment Profile** | Preference collection & filtering | See [references/investment-profile.md](references/investment-profile.md) |
| **Trade Executor** | Transaction execution REST API | See [references/trade-executor.md](references/trade-executor.md) |
| **Portfolio Indexer** | On-chain balance queries | See [references/portfolio-indexer.md](references/portfolio-indexer.md) |
---
## 🔍 Discovery Data Sources
### Primary Data Sources
| Source | Type | Use Case |
|--------|------|----------|
| **MCP (AntAlpha)** | Real-time yields | Primary source for DeFi opportunities |
| **DefiLlama** | Protocol TVL/Yields | Fallback and cross-validation |
| **Dune** | On-chain analytics | Custom queries, advanced analysis |
### Dune MCP Integration
Dune provides powerful on-chain analytics through MCP (Model Context Protocol). Use Dune for:
- Custom on-chain data queries
- Protocol-specific analytics
- Historical trend analysis
- Whale wallet tracking
**Configuration** (`config/config.json`):
```json
{
  "discovery": {
    "data_sources": ["mcp", "dune", "defillama"],
    "dune": {
      "mcp_endpoint": "https://api.dune.com/mcp/v1",
      "auth": {
        "header": { "name": "x-dune-api-key", "env_var": "DUNE_API_KEY" },
        "query_param": { "name": "api_key", "env_var": "DUNE_API_KEY" }
      }
    }
  }
}
```
**Environment Setup**:
```bash
# Required for Dune integration
export DUNE_API_KEY="your_dune_api_key"
```
**Authentication Methods**:
1. **Header Auth** (Recommended): `x-dune-api-key: <DUNE_API_KEY>`
2. **Query Param**: `?api_key=<DUNE_API_KEY>`
**Usage Example**:
```bash
# Query Dune for protocol analytics
curl -H "x-dune-api-key: $DUNE_API_KEY" \
"https://api.dune.com/mcp/v1/query/<query_id>/results"
```
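The header-auth variant of the request above could be prepared from Python roughly as follows. This sketch only builds the request object and does not send it; the URL path mirrors the curl example and has not been checked against Dune's documentation, and the function name `build_dune_request` is hypothetical.

```python
import urllib.request

# Endpoint as given in config/config.json above.
DUNE_MCP_ENDPOINT = "https://api.dune.com/mcp/v1"


def build_dune_request(query_id: str, api_key: str) -> urllib.request.Request:
    """Prepare a GET request that authenticates via the x-dune-api-key header."""
    url = f"{DUNE_MCP_ENDPOINT}/query/{query_id}/results"
    return urllib.request.Request(
        url,
        headers={"x-dune-api-key": api_key},
        method="GET",
    )
```

In practice the key would be read from the `DUNE_API_KEY` environment variable and the request passed to `urllib.request.urlopen` with a timeout.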
---
## ⚙️ Configuration
### Environment Variables
```bash
# Optional: Alchemy for better RPC
ALCHEMY_API_KEY=your_key_here
# Optional: Debank for portfolio tracking
WEB3_INVESTOR_DEBANK_API_KEY=your_key_here
# Trade Executor: Local API endpoint
WEB3_INVESTOR_API_URL=http://localhost:3000/api
```
### Security Configuration (`config/config.json`)
```json
{
  "security": {
    "max_trade_usd": 10000,
    "max_slippage_percent": 3.0,
    "whitelist_enabled": false,
    "whitelist_tokens": ["USDC", "USDT", "DAI", "WETH", "ETH", "stETH", "rETH"],
    "whitelist_protocols": ["uniswap", "aave", "compound", "lido", "0x"],
    "double_confirm": {
      "enabled": true,
      "large_trade_threshold_usd": 5000
    }
  }
}
```
#### Whitelist Configuration
| Parameter | Default | Description |
|-----------|---------|-------------|
| `whitelist_enabled` | `false` | Enable/disable whitelist filtering in trade execution |
| `whitelist_tokens` | [...] | Allowed tokens for trading |
| `whitelist_protocols` | [...] | Allowed protocols for interaction |
**Note**: When `whitelist_enabled` is `false` (default), trade execution skips whitelist checks, allowing broader protocol/token access. Set to `true` to enforce strict whitelist validation.
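The interaction between these settings can be sketched as a single check. This is an illustration rather than the executor's own logic: the function name `check_trade` is hypothetical, the key names follow the `security` block above, the error codes are those defined in `trade_executor.py`, and treating a trade exactly at the threshold as "large" is an assumption.

```python
from typing import Any, Dict, List


def check_trade(
    security: Dict[str, Any], token: str, protocol: str, amount_usd: float
) -> Dict[str, Any]:
    """Evaluate a proposed trade against the security configuration."""
    violations: List[str] = []
    if amount_usd > security.get("max_trade_usd", float("inf")):
        violations.append("E007")  # exceeds configured limit
    if security.get("whitelist_enabled", False):
        # Whitelists are only enforced when explicitly enabled.
        if token not in security.get("whitelist_tokens", []):
            violations.append("E006")  # token not in whitelist
        if protocol not in security.get("whitelist_protocols", []):
            violations.append("E005")  # protocol not in whitelist
    double = security.get("double_confirm", {})
    needs_double = bool(double.get("enabled")) and amount_usd >= double.get(
        "large_trade_threshold_usd", float("inf")
    )
    return {
        "allowed": not violations,
        "violations": violations,
        "double_confirm": needs_double,
    }
```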
### Whitelist Setup
```bash
python3 scripts/trading/whitelist.py \
--add 0x87870Bca3F3fD6335C3F4ce8392D69350B4fA4E2 \
--name "Aave V3 Pool" \
--limit 10000
```
---
## 📝 Skill Author Template (MUST INCLUDE IN PROMPTS)
When using this skill for transaction generation, include this template:
```
Output structured transaction request (JSON), do not execute directly.
All transactions must go through preview -> approve -> execute.
If transaction parameters cannot be determined, return clarification, do not guess.
```
### Required Output Format
```json
{
  "request_id": "uuid",
  "timestamp": "ISO8601",
  "network": "base|ethereum",
  "chain_id": 8453|1,
  "type": "transfer|swap|deposit|contract_call",
  "description": "human readable description",
  "transaction": {
    "to": "0x...",
    "value": "0x0",
    "data": "0x...",
    "gas_limit": 250000
  },
  "metadata": {
    "protocol": "uniswap|aave|compound|...",
    "from_token": "USDC",
    "to_token": "WETH",
    "amount": "5"
  }
}
```
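A structural check of a request in this format might look like the sketch below. It is not part of the skill: the function name `validate_request` is hypothetical, and the rules (chain ID must match the network, hex fields must be `0x`-prefixed) are inferred from the template above and from `SUPPORTED_CHAINS` in `trade_executor.py`.

```python
from typing import Any, Dict, List

CHAIN_IDS = {"base": 8453, "ethereum": 1}
TX_TYPES = {"transfer", "swap", "deposit", "contract_call"}
TOP_LEVEL = ("request_id", "timestamp", "network", "chain_id",
             "type", "description", "transaction", "metadata")
TX_FIELDS = ("to", "value", "data", "gas_limit")


def validate_request(req: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means the shape is valid."""
    problems = [f"missing field: {key}" for key in TOP_LEVEL if key not in req]
    network = req.get("network")
    if network not in CHAIN_IDS:
        problems.append(f"unsupported network: {network!r}")
    elif req.get("chain_id") != CHAIN_IDS[network]:
        problems.append("chain_id does not match network")
    if req.get("type") not in TX_TYPES:
        problems.append(f"unsupported type: {req.get('type')!r}")
    tx = req.get("transaction")
    if not isinstance(tx, dict):
        tx = {}
    for key in TX_FIELDS:
        if key not in tx:
            problems.append(f"missing field: transaction.{key}")
    for key in ("to", "value", "data"):
        value = tx.get(key)
        if key in tx and not (isinstance(value, str) and value.startswith("0x")):
            problems.append(f"transaction.{key} must be a 0x-prefixed hex string")
    return problems
```

Returning a list of problems rather than raising lets the agent send all of them back as a single clarification request instead of guessing.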
---
## 🆘 Troubleshooting
### Import Errors
Run from workspace root:
```bash
cd /home/admin/.openclaw/workspace
python3 skills/web3-investor/scripts/discovery/find_opportunities.py ...
```
### No Opportunities Found
- Check chain name spelling
- Try lowering `--min-apy` threshold
- Ensure `--max-apy` isn't too restrictive
### Rate Limiting
- DefiLlama has generous limits but may occasionally rate-limit requests
- Add delays between requests if batch processing
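One common way to add such delays is exponential backoff. The helper below is a generic sketch, not something shipped with the skill; the name `call_with_backoff` and its default values are illustrative.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_backoff(
    fetch: Callable[[], T],
    retries: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fetch(), retrying with delays of base_delay * 2**attempt on failure."""
    for attempt in range(retries + 1):
        try:
            return fetch()
        except Exception:
            if attempt == retries:
                raise  # out of retries: surface the last error
            sleep(base_delay * 2 ** attempt)
    raise RuntimeError("retries must be non-negative")
```

The `sleep` parameter exists so the delay can be replaced in tests; production callers can leave it at the default.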
---
## 🤝 Contributing
Test donations welcome:
- **Network**: Base Chain
- **Address**: `0x1F3A9A450428BbF161C4C33f10bd7AA1b2599a3e`
---
**Maintainer**: Web3 Investor Skill Team
**Registry**: https://clawhub.com/skills/web3-investor
**License**: MIT
---
## Referenced Files
> The following files are referenced in this skill and included for context.
### references/discovery.md
```markdown
# Opportunity Discovery Module
> **Purpose**: Search DeFi yield opportunities across multiple sources with real-time data.
---
## What It Does
Searches DeFi yield opportunities across multiple sources with real-time data.
## Key Features
- **Risk Signals**: Each opportunity includes structured risk data:
- `reward_type`: "none" | "single" | "multi"
- `has_il_risk`: true | false (impermanent loss)
- `underlying_type`: "rwa" | "onchain" | "mixed" | "unknown"
- **Actionable Addresses**: Contract addresses ready for execution
- **LLM-Ready Output**: Structured JSON optimized for AI analysis
## Data Sources (Priority Order)
| Source | Priority | Type | API Key Required |
|--------|----------|------|------------------|
| **MCP (AntAlpha)** | Primary | Real-time yields | No |
| **Dune Analytics** | Secondary | On-chain analytics | Yes (`DUNE_API_KEY`) |
| **DefiLlama** | Fallback | Protocol TVL/Yields | No |
| **Protocol Registry** | Static | Known protocol metadata | No |
### MCP (AntAlpha)
- **Endpoint**: `https://mcp.prime.antalpha.com/mcp`
- **Fallback**: `http://47.85.100.251:3000`
- **Use Case**: Real-time DeFi opportunity discovery
- **Configuration**: See `config/config.json` → `discovery.mcp_url`
### Dune Analytics
- **MCP Endpoint**: `https://api.dune.com/mcp/v1`
- **Auth Methods**:
1. Header: `x-dune-api-key: <DUNE_API_KEY>`
2. Query param: `?api_key=<DUNE_API_KEY>`
- **Environment Variable**: `DUNE_API_KEY`
- **Use Case**: Custom on-chain queries, protocol analytics, whale tracking
- **Configuration**:
```json
{
"discovery": {
"dune": {
"mcp_endpoint": "https://api.dune.com/mcp/v1",
"auth": {
"header": { "name": "x-dune-api-key", "env_var": "DUNE_API_KEY" },
"query_param": { "name": "api_key", "env_var": "DUNE_API_KEY" }
},
"enabled": true
}
}
}
```
### DefiLlama
- **Endpoint**: `https://yields.llama.fi`
- **Use Case**: Protocol TVL, yield data, cross-validation
- **No API key required**
## Usage Examples
```bash
# Search Ethereum opportunities with min 5% APY
python3 scripts/discovery/find_opportunities.py \
--chain ethereum \
--min-apy 5 \
--limit 20
# Search within an APY range (3% to 25%)
python3 scripts/discovery/find_opportunities.py \
--chain ethereum \
--min-apy 3 \
--max-apy 25 \
--limit 50
# Output for LLM analysis
python3 scripts/discovery/find_opportunities.py \
--chain ethereum \
--llm-ready \
--output json
```
## Output Format
### Standard Output
```
# DeFi Opportunities on ethereum
## 1. Aave V3 USDC
- APY: 5.23%
- TVL: $1,234,567,890
- Pool: 0x...
- Risk: single reward, no IL
```
### LLM-Ready Output (JSON)
```json
{
"opportunities": [
{
"name": "Aave V3 USDC",
"chain": "ethereum",
"apy": 5.23,
"tvl": 1234567890,
"pool_address": "0x...",
"reward_type": "single",
"has_il_risk": false,
"underlying_type": "onchain"
}
]
}
```
## Troubleshooting
### No Opportunities Found
- Check chain name spelling (case-sensitive in some cases)
- Try lowering `--min-apy` threshold
- Ensure `--max-apy` isn't too restrictive
### Rate Limiting
- DefiLlama has generous limits but can occasionally rate limit
- Add delays between requests if batch processing
```
### references/investment-profile.md
```markdown
# Investment Profile & Filtering Module
> **Purpose**: Structured preference collection and opportunity filtering.
---
## What It Does
Structured preference collection and opportunity filtering.
## Why Use It
- Ensures consistent question flow across different agents
- Provides type-safe preference storage
- One-shot filtering based on multiple criteria
## Preference Parameters
| Preference | Key | Options | Why It Matters |
|------------|-----|---------|----------------|
| **Chain** | `chain` | ethereum, base, arbitrum, optimism | Determines which blockchain to search |
| **Capital Token** | `capital_token` | USDC, USDT, ETH, WBTC, etc. | The token they want to invest |
| **Reward Preference** | `reward_preference` | single / multi / any | Single token rewards vs multiple tokens (e.g., CRV+CVX) |
| **Accept IL** | `accept_il` | true / false / any | Impermanent loss tolerance for LP products |
| **Underlying Type** | `underlying_preference` | rwa / onchain / mixed / any | Real-world assets vs pure on-chain protocols |
## Code Example
```python
from scripts.discovery.investment_profile import InvestmentProfile
# Create profile
profile = InvestmentProfile()
# Method 1: Direct assignment
profile.chain = "ethereum"
profile.capital_token = "USDC"
profile.accept_il = False
profile.reward_preference = "single"
profile.min_apy = 5
profile.max_apy = 30
# Method 2: Batch setup
profile.set_preferences(
chain="ethereum",
capital_token="USDC",
accept_il=False,
reward_preference="single",
underlying_preference="onchain",
min_apy=5,
max_apy=30
)
# Filter opportunities
filtered = profile.filter_opportunities(opportunities)
# Get human-readable explanation
print(profile.explain_filtering(len(opportunities), len(filtered)))
```
## Available Questions for UI Building
```python
questions = InvestmentProfile.get_questions()
# Returns structured dict:
{
"required": [...], # Must ask: chain, capital_token
"preference": [...], # Should ask: reward_preference, accept_il, etc.
"constraints": [...] # Optional: min_apy, max_apy, min_tvl
}
```
## Filtering Logic
The filtering applies all set preferences:
1. **Chain filter**: Exact match
2. **Capital token filter**: Match against opportunity's underlying tokens
3. **Reward preference filter**:
- `single`: Only single-token rewards
- `multi`: Multi-token rewards acceptable
- `any`: No filter
4. **IL tolerance filter**:
- `false`: Exclude LP products with IL risk
- `true`: Include all
- `any`: No filter
5. **Underlying type filter**: Match RWA/onchain/mixed
6. **APY range filter**: Min/max bounds
7. **TVL filter**: Minimum TVL threshold
```
### references/trade-executor.md
```markdown
# Trade Executor Module (REST API Adapter)
> **Purpose**: Generate executable transaction requests via REST API to local keystore signer.
---
## What It Does
Generates executable transaction requests via REST API to local keystore signer. **This module does NOT hold private keys** — all transactions require explicit approval.
## Execution Model
| Property | Value |
|----------|-------|
| **Wallet Type** | Local keystore signer |
| **Supported Chains** | `base`, `ethereum` |
| **Entry Point** | REST API |
| **State Machine** | `preview` → `approve` → `execute` |
## Security Constraints (MUST FOLLOW)
- ❌ **Cannot skip `approve` step** — every transaction requires manual confirmation
- ✅ **Must simulate before execution** — uses `eth_call` for validation
- ⚠️ **Must return risk warnings** — insufficient balance, missing allowance, invalid route
- 🔒 **Default minimum permissions**:
- Whitelist chains/protocols/tokens
- Transaction value limits
- Max slippage caps
## Payment Methods
### Option A: Keystore Signer (Production)
Requires local signer service running. Best for automated agents with dedicated signing infrastructure.
```bash
# Step 4a: Preview transaction
python3 scripts/trading/trade_executor.py preview \
--type deposit \
--protocol aave \
--asset USDC \
--amount 1000 \
--network base
# Step 4b: Approve (returns approval_id)
python3 scripts/trading/trade_executor.py approve \
--preview-id <uuid-from-preview>
# Step 4c: Execute (broadcasts signed tx)
python3 scripts/trading/trade_executor.py execute \
--approval-id <uuid-from-approve>
```
### Option B: EIP-681 Payment Link (Mobile Recommended)
Generate a MetaMask-compatible payment link or QR code. Best for mobile users and quick investments without local signer setup.
```bash
# Generate payment link for RWA investment
python3 scripts/trading/eip681_payment.py generate \
--token USDC \
--to 0x1F3A9A450428BbF161C4C33f10bd7AA1b2599a3e \
--amount 10 \
--network base \
--qr-output /tmp/payment_qr.png
```
**Output includes:**
- MetaMask deep link (mobile users click to open app)
- QR code PNG file (desktop users scan with phone)
- Raw transaction details (for manual verification)
**Supported tokens:** USDC, USDT, WETH, ETH on Base and Ethereum mainnet.
### Option C: WalletConnect (Roadmap)
Coming in future release. Will support complex DeFi interactions and persistent wallet connections.
## API Endpoints
| Operation | Method | Endpoint | Description |
|-----------|--------|----------|-------------|
| Query Balances | GET | `/api/wallet/balances` | Get wallet token balances |
| Preview Swap | POST | `/api/trades/preview` or `/api/uniswap/preview-swap` or `/api/zerox/preview-swap` | Generate transaction preview |
| Approve | POST | `/api/trades/approve` | Confirm transaction for execution |
| Execute | POST | `/api/trades/execute` | Broadcast signed transaction |
| Check Status | GET | `/api/transactions/{tx_hash}` | Query transaction status |
| Query Allowances | GET | `/api/allowances` | Get token allowances |
| Revoke Preview | POST | `/api/allowances/revoke-preview` | Preview allowance revoke |
## Unified Transaction Request Format
All transaction requests follow this structure:
```json
{
"request_id": "uuid",
"timestamp": "ISO8601",
"network": "base",
"chain_id": 8453,
"type": "transfer|swap|deposit|contract_call",
"description": "human readable",
"transaction": {
"to": "0x...",
"value": "0x0",
"data": "0x...",
"gas_limit": 250000
},
"metadata": {
"protocol": "uniswap|0x|aave|...",
"from_token": "USDC",
"to_token": "WETH",
"amount": "5"
}
}
```
## Response Specifications
### `preview` Response
```json
{
"preview_id": "uuid",
"simulation_ok": true|false,
"risk": {
"balance_sufficient": true|false,
"allowance_sufficient": true|false,
"route_valid": true|false,
"warnings": ["..."]
},
"next_step": "approve" | "clarification"
}
```
### `approve` Response
```json
{
"approval_id": "uuid",
"preview_id": "...",
"approved_at": "ISO8601",
"expires_at": "ISO8601"
}
```
### `execute` Response
```json
{
"tx_hash": "0x...",
"explorer_url": "https://basescan.org/tx/0x...",
"executed_at": "ISO8601",
"network": "base"
}
```
### Error Format
```json
{
"code": "E001-E999",
"message": "human readable",
"diagnostics": "technical details"
}
```
## CLI Usage Examples
```bash
# Step 1: Preview a swap
python3 scripts/trading/trade_executor.py preview \
--type swap \
--from-token USDC \
--to-token WETH \
--amount 5 \
--network base
# Step 2: Approve the preview
python3 scripts/trading/trade_executor.py approve \
--preview-id <uuid-from-step-1>
# Step 3: Execute the approved transaction
python3 scripts/trading/trade_executor.py execute \
--approval-id <uuid-from-step-2>
# Check balances
python3 scripts/trading/trade_executor.py balances \
--network base
# Check transaction status
python3 scripts/trading/trade_executor.py status \
--tx-hash 0x...
```
## Health Check (MUST RUN BEFORE TRADING)
```bash
python3 scripts/trading/trade_executor.py balances --network base
# If success → signer is available
# If error E010 → signer unavailable, stop and inform user
```
## Module Usage Guide
| Module | Purpose | Production/Debug |
|--------|---------|------------------|
| `trade_executor.py` | Main execution module, connects to signer service | ✅ Production |
| `eip681_payment.py` | Generate MetaMask payment links & QR codes | ✅ Production |
| `safe_vault.py` | Calldata generation, balance check (no signing) | 🔧 Debug |
| `simulate_tx.py` | Transaction simulation (no signing) | 🔧 Debug |
## Troubleshooting
### Import Errors
If you see `ModuleNotFoundError`, ensure you're running from the workspace root:
```bash
cd /home/admin/.openclaw/workspace
python3 skills/web3-investor/scripts/trading/trade_executor.py ...
```
### Signer Unavailable
- Check if the local signer service is running
- Verify `WEB3_INVESTOR_API_URL` environment variable
- Consult SETUP.md for signer setup instructions
```
### references/portfolio-indexer.md
```markdown
# Portfolio Indexer Module
> **Purpose**: Query on-chain balances for specified addresses.
---
## What It Does
Queries on-chain balances for specified addresses.
## Supported Chains
- Ethereum mainnet
- Base
- Arbitrum (partial)
## Usage
```bash
# Query portfolio
python3 scripts/portfolio/indexer.py \
--address 0x... \
--chain ethereum \
--output json
```
## Output Format
### JSON Output
```json
{
"address": "0x...",
"chain": "ethereum",
"timestamp": "2026-03-05T15:00:00Z",
"balances": [
{
"token": "USDC",
"symbol": "USDC",
"balance": "1000.50",
"balance_usd": 1000.50,
"contract": "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"
},
{
"token": "WETH",
"symbol": "WETH",
"balance": "0.5",
"balance_usd": 1500.00,
"contract": "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"
}
],
"total_usd": 2500.50
}
```
## Environment Variables
```bash
# Optional: Alchemy for better RPC
ALCHEMY_API_KEY=your_key_here
# Optional: Debank for portfolio tracking
WEB3_INVESTOR_DEBANK_API_KEY=your_key_here
```
## Troubleshooting
### No Balances Found
- Verify the address is correct
- Check if the chain is supported
- Ensure RPC endpoint is accessible
### Slow Response
- Alchemy API provides faster responses than public RPC
- Debank API provides aggregated portfolio data across chains
```
### scripts/trading/trade_executor.py
```python
#!/usr/bin/env python3
"""
Trade Executor - REST API adapter for local keystore signer (v0.3.0)
DESIGN PHILOSOPHY (Agent-First Design):
This module is designed for AGENTS with LLM and programming capabilities.
It provides a REFERENCE implementation, not a rigid standard.
If your signer service uses different API paths or response formats:
1. Modify config/config.json to map your endpoints
2. Or modify this file's api_request() function
3. Or implement an adapter layer in your signer service
Architecture:
- This module ONLY generates executable transaction requests, does NOT hold private keys
- All transactions must go through: preview -> approve -> execute
- Supports: base, ethereum chains
- Entry point: REST API endpoints
Security Constraints:
- Cannot skip 'approve' step
- Must simulate before execution (eth_call)
- Must return risk warnings (insufficient balance, missing approval, invalid route)
- Default minimum permissions: whitelist chains/protocols/tokens, limits, max slippage
Usage:
python3 trade_executor.py preview --type swap --from-token USDC --to-token WETH --amount 5 --network base
python3 trade_executor.py approve --preview-id <id>
python3 trade_executor.py execute --approval-id <id>
python3 trade_executor.py status --tx-hash 0x...
For API specification, see: SIGNER_API_SPEC.md
For setup guide, see: SETUP.md
"""
import argparse
import json
import os
import sys
import uuid
from datetime import datetime
from typing import Optional, Dict, Any, List
import urllib.request
import urllib.error
# ============================================================================
# Configuration
# ============================================================================
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
SKILL_DIR = os.path.dirname(os.path.dirname(SCRIPT_DIR)) # trading -> scripts -> skill_dir
CONFIG_PATH = os.path.join(SKILL_DIR, "config", "config.json")
# Add scripts directory to path for imports (utils and schemas are in scripts/)
sys.path.insert(0, os.path.dirname(SCRIPT_DIR))
try:
    from utils.preflight import PreflightChecker, PreflightError, load_config as load_preflight_config
    PREFLIGHT_AVAILABLE = True
except ImportError as e:
    PREFLIGHT_AVAILABLE = False
try:
    from utils.rpc_manager import RPCManager, get_rpc_manager
    RPC_MANAGER_AVAILABLE = True
except ImportError:
    RPC_MANAGER_AVAILABLE = False
try:
    from schemas.output_schema import (
        PreviewOutput, ApprovalResult, ExecutionResult,
        InputInfo, QuoteInfo, RiskWarning, RiskLevel,
        create_preview_output, create_approval_result, create_execution_result
    )
    SCHEMA_AVAILABLE = True
except ImportError:
    SCHEMA_AVAILABLE = False
# API Base URL (from environment or default)
API_BASE_URL = os.environ.get("WEB3_INVESTOR_API_URL", "http://localhost:3000/api")
# Supported chains
SUPPORTED_CHAINS = {
    "base": {"chain_id": 8453, "name": "Base", "explorer": "https://basescan.org"},
    "ethereum": {"chain_id": 1, "name": "Ethereum", "explorer": "https://etherscan.io"},
}
# Default security settings
DEFAULT_SECURITY = {
    "max_slippage_percent": 3.0,
    "whitelist_enabled": False,
    "whitelist_chains": ["base", "ethereum"],
    "whitelist_protocols": ["uniswap", "aave", "compound", "lido", "0x"],
    "whitelist_tokens": ["USDC", "USDT", "DAI", "WETH", "ETH", "stETH", "rETH"],
    "max_trade_value_usd": 10000,
}
# Error codes
ERROR_CODES = {
    "INSUFFICIENT_BALANCE": {"code": "E001", "message": "Insufficient balance for transaction"},
    "INSUFFICIENT_ALLOWANCE": {"code": "E002", "message": "Token allowance insufficient, approval required"},
    "INVALID_ROUTE": {"code": "E003", "message": "No valid route found for swap"},
    "CHAIN_NOT_SUPPORTED": {"code": "E004", "message": "Chain not in whitelist"},
    "PROTOCOL_NOT_SUPPORTED": {"code": "E005", "message": "Protocol not in whitelist"},
    "TOKEN_NOT_SUPPORTED": {"code": "E006", "message": "Token not in whitelist"},
    "EXCEEDS_LIMIT": {"code": "E007", "message": "Transaction exceeds configured limit"},
    "PREVIEW_FAILED": {"code": "E008", "message": "Transaction simulation failed"},
    "APPROVAL_REQUIRED": {"code": "E009", "message": "Cannot execute without approval"},
    "API_UNAVAILABLE": {"code": "E010", "message": "Local API service unavailable"},
    "PREFLIGHT_FAILED": {"code": "E014", "message": "Pre-flight checks failed"},
    "UNKNOWN_ERROR": {"code": "E999", "message": "Unknown error occurred"},
}
# ============================================================================
# Unified Transaction Request Format
# ============================================================================
def create_transaction_request(
    network: str,
    tx_type: str,
    to: str,
    value: str = "0x0",
    data: str = "0x",
    gas_limit: int = 250000,
    description: str = "",
    metadata: Optional[Dict] = None
) -> Dict[str, Any]:
    """
    Create a unified transaction request format.
    Args:
        network: Chain name (base, ethereum)
        tx_type: Transaction type (transfer, swap, deposit, contract_call)
        to: Target address
        value: Value in hex (0x...)
        data: Calldata in hex
        gas_limit: Estimated gas limit
        description: Human readable description
        metadata: Additional protocol info (protocol, from_token, to_token, amount)
    Returns:
        Unified transaction request dict
    """
    if network not in SUPPORTED_CHAINS:
        raise ValueError(f"Unsupported network: {network}. Supported: {list(SUPPORTED_CHAINS.keys())}")
    request = {
        "request_id": str(uuid.uuid4()),
        "timestamp": datetime.utcnow().isoformat() + "Z",
        "network": network,
        "chain_id": SUPPORTED_CHAINS[network]["chain_id"],
        "type": tx_type,
        "description": description,
        "transaction": {
            "to": to,
            "value": value,
            "data": data,
            "gas_limit": gas_limit
        },
        "metadata": metadata or {}
    }
    return request
# ============================================================================
# API Client
# ============================================================================
def api_request(
    method: str,
    endpoint: str,
    data: Optional[Dict] = None,
    timeout: int = 30
) -> Dict[str, Any]:
    """
    Make a request to the local signer API.
    This is the core HTTP client. If your signer service uses different
    API paths or response formats, modify this function or update config.json.
    Args:
        method: HTTP method (GET, POST)
        endpoint: API endpoint (without /api prefix)
        data: Request body for POST
        timeout: Request timeout in seconds
    Returns:
        API response as dict with 'success' field
    """
    url = f"{API_BASE_URL}{endpoint}"
    headers = {"Content-Type": "application/json"}
    try:
        if method.upper() == "GET":
            req = urllib.request.Request(url, headers=headers, method="GET")
        else:
            body = json.dumps(data or {}).encode("utf-8")
            req = urllib.request.Request(url, data=body, headers=headers, method="POST")
        with urllib.request.urlopen(req, timeout=timeout) as response:
            result = json.loads(response.read().decode("utf-8"))
            return {"success": True, "data": result}
    # HTTPError is a subclass of URLError, so it must be handled first;
    # otherwise every HTTP error response would be reported as E010.
    except urllib.error.HTTPError as e:
        try:
            error_body = json.loads(e.read().decode("utf-8"))
            return {
                "success": False,
                "error": error_body.get("error", ERROR_CODES["UNKNOWN_ERROR"]),
                "diagnostics": error_body.get("message", f"HTTP {e.code}")
            }
        except Exception:
            # Error body was missing, not JSON, or not a JSON object
            return {
                "success": False,
                "error": ERROR_CODES["UNKNOWN_ERROR"],
                "diagnostics": f"HTTP {e.code}: {e.reason}"
            }
    except urllib.error.URLError as e:
        # Connection-level failure: the signer service could not be reached
        return {
            "success": False,
            "error": ERROR_CODES["API_UNAVAILABLE"],
            "diagnostics": f"Cannot connect to {url}: {str(e)}"
        }
    except Exception as e:
        return {
            "success": False,
            "error": ERROR_CODES["UNKNOWN_ERROR"],
            "diagnostics": str(e)
        }
# ============================================================================
# Wallet Operations
# ============================================================================
def get_wallet_balances(
chain: Optional[str] = None,
tokens: Optional[List[str]] = None
) -> Dict[str, Any]:
"""
Query wallet balances from local API.
GET /api/wallet/balances
"""
params = []
if chain:
params.append(f"chain={chain}")
if tokens:
params.append(f"tokens={','.join(tokens)}")
endpoint = "/wallet/balances"
if params:
endpoint += "?" + "&".join(params)
return api_request("GET", endpoint)
# ============================================================================
# Trade Operations (State Machine: Preview -> Approve -> Execute)
# ============================================================================
def run_preflight_checks(transaction_type: str, params: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""
Run pre-flight checks if available and enabled.
Returns None if checks pass, or error dict if checks fail.
"""
if not PREFLIGHT_AVAILABLE:
return None
try:
config = load_preflight_config(CONFIG_PATH)
preflight_config = config.get("preflight", {})
if not preflight_config.get("enabled", True):
return None
checker = PreflightChecker(config)
report = checker.run(transaction_type, params)
if not report.critical_passed:
return {
"success": False,
"error": {
"code": "E014",
"message": "Pre-flight checks failed",
"details": {
"summary": report.summary,
"failed_checks": [
{
"name": r.name,
"message": r.message,
"fix_hint": r.fix_hint,
"severity": r.severity.value
}
for r in report.results if not r.passed
]
}
},
"diagnostics": report.summary
}
return None
    except Exception:
        # Pre-flight errors should not block execution; they are ignored here (nothing is logged)
        return None
def preview_swap(
from_token: str,
to_token: str,
amount: str,
network: str = "base",
slippage: float = 0.5,
protocol: str = "auto"
) -> Dict[str, Any]:
"""
Preview a swap transaction.
POST /api/trades/preview (or /api/uniswap/preview-swap, /api/zerox/preview-swap)
Returns:
{
"preview_id": "uuid",
"simulation_ok": true/false,
"risk": {...},
"next_step": "approve" or "clarification",
...
}
"""
# Pre-flight checks
params = {
"type": "swap",
"network": network,
"from_token": from_token,
"to_token": to_token,
"amount": amount
}
preflight_error = run_preflight_checks("swap", params)
if preflight_error:
return preflight_error
# Security check: whitelist (only if enabled)
security = _load_security_config()
# Skip whitelist checks if whitelist_enabled is False (default for backward compatibility)
if security.get("whitelist_enabled", False):
if network not in security.get("whitelist_chains", []):
return {
"success": False,
"error": ERROR_CODES["CHAIN_NOT_SUPPORTED"],
"diagnostics": f"Chain '{network}' not in whitelist. Allowed: {security['whitelist_chains']}"
}
if from_token.upper() not in security.get("whitelist_tokens", []):
return {
"success": False,
"error": ERROR_CODES["TOKEN_NOT_SUPPORTED"],
"diagnostics": f"Token '{from_token}' not in whitelist. Allowed: {security['whitelist_tokens']}"
}
if to_token.upper() not in security.get("whitelist_tokens", []):
return {
"success": False,
"error": ERROR_CODES["TOKEN_NOT_SUPPORTED"],
"diagnostics": f"Token '{to_token}' not in whitelist. Allowed: {security['whitelist_tokens']}"
}
# Try different preview endpoints
endpoints_to_try = [
"/trades/preview",
"/uniswap/preview-swap",
"/zerox/preview-swap"
]
payload = {
"from_token": from_token,
"to_token": to_token,
"amount": amount,
"network": network,
"slippage_percent": slippage,
"protocol": protocol
}
last_error = None
for endpoint in endpoints_to_try:
result = api_request("POST", endpoint, payload)
if result.get("success"):
# Standardize response format
data = result.get("data", {})
# Check if double confirmation is required
preview_result_basic = {
"success": True,
"preview_id": data.get("preview_id", str(uuid.uuid4())),
"simulation_ok": data.get("simulation_ok", True),
"risk": _extract_risk_info(data),
"next_step": "approve" if data.get("simulation_ok", True) else "clarification",
"transaction": data.get("transaction"),
"rate": data.get("rate"),
"estimated_output": data.get("estimated_output"),
"gas_estimate": data.get("gas_estimate"),
"warnings": data.get("warnings", [])
}
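            # Add double confirm check. slippage and protocol are passed explicitly:
            # params (built for pre-flight) does not carry them, so the check would
            # otherwise always see its defaults.
            requires_double, reasons = _check_double_confirm_required(
                {**params, "slippage": slippage, "protocol": protocol},
                preview_result_basic
            )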
preview_result_basic["requires_double_confirm"] = requires_double
if requires_double:
preview_result_basic["double_confirm_reasons"] = reasons
security = _load_security_config()
confirm_phrase = security.get("double_confirm", {}).get("confirm_phrase", "CONFIRM_HIGH_RISK")
preview_result_basic["confirm_instruction"] = f"To approve, use: --confirm \"{confirm_phrase}\""
# Create standardized output if schema available
if SCHEMA_AVAILABLE:
input_info = InputInfo(
token=from_token,
amount=amount,
chain=network,
token_out=to_token
)
quote_info = QuoteInfo(
estimated_output=data.get("estimated_output"),
rate=data.get("rate"),
slippage_bps=int(slippage * 100),
gas_estimate=data.get("gas_estimate")
)
risk_warnings = []
if data.get("warnings"):
for warning in data["warnings"]:
risk_warnings.append(RiskWarning(
type="general",
level=RiskLevel.MEDIUM,
message=warning
))
standardized = create_preview_output(
input_info=input_info,
quote_info=quote_info,
risk_warnings=risk_warnings,
requires_double_confirm=requires_double,
double_confirm_reasons=reasons,
confirm_instruction=preview_result_basic.get("confirm_instruction"),
preview_id=data.get("preview_id", str(uuid.uuid4())),
raw_transaction=data.get("transaction")
)
# Merge standardized format with basic result
preview_result_basic["_standardized"] = standardized.to_dict()
return preview_result_basic
last_error = result
# All endpoints failed
return {
"success": False,
"error": last_error.get("error", ERROR_CODES["PREVIEW_FAILED"]) if last_error else ERROR_CODES["PREVIEW_FAILED"],
"diagnostics": last_error.get("diagnostics", "All preview endpoints failed") if last_error else "No response from API"
}
def preview_deposit(
protocol: str,
asset: str,
amount: str,
network: str = "base"
) -> Dict[str, Any]:
"""
Preview a deposit transaction (for lending/staking).
POST /api/trades/preview
"""
# Pre-flight checks
params = {
"type": "deposit",
"network": network,
"asset": asset,
"amount": amount
}
preflight_error = run_preflight_checks("deposit", params)
if preflight_error:
return preflight_error
security = _load_security_config()
if network not in security.get("whitelist_chains", []):
return {
"success": False,
"error": ERROR_CODES["CHAIN_NOT_SUPPORTED"],
"diagnostics": f"Chain '{network}' not in whitelist."
}
payload = {
"type": "deposit",
"protocol": protocol,
"asset": asset,
"amount": amount,
"network": network
}
result = api_request("POST", "/trades/preview", payload)
if result.get("success"):
data = result.get("data", {})
return {
"success": True,
"preview_id": data.get("preview_id", str(uuid.uuid4())),
"simulation_ok": data.get("simulation_ok", True),
"risk": _extract_risk_info(data),
"next_step": "approve" if data.get("simulation_ok", True) else "clarification",
"transaction": data.get("transaction"),
"warnings": data.get("warnings", [])
}
return result
def approve_transaction(preview_id: str, confirm_phrase: Optional[str] = None) -> Dict[str, Any]:
"""
Approve a previewed transaction.
POST /api/trades/approve
Args:
preview_id: Preview ID from preview step
confirm_phrase: Confirmation phrase for high-risk transactions
Returns:
{
"approval_id": "uuid",
"preview_id": "...",
...
}
"""
if not preview_id:
return {
"success": False,
"error": {"code": "E011", "message": "preview_id is required"},
"diagnostics": "Cannot approve without preview_id"
}
# Check if double confirmation is required
# In a real implementation, we would retrieve the preview result from storage
# For now, we check the confirm_phrase if provided
security = _load_security_config()
double_confirm_config = security.get("double_confirm", {})
if double_confirm_config.get("enabled", False):
required_phrase = double_confirm_config.get("confirm_phrase", "CONFIRM_HIGH_RISK")
# If confirm_phrase is provided, validate it
if confirm_phrase is not None:
if confirm_phrase != required_phrase:
return {
"success": False,
"error": {"code": "E015", "message": "Invalid confirmation phrase"},
"diagnostics": f"Expected: {required_phrase}"
}
result = api_request("POST", "/trades/approve", {"preview_id": preview_id})
    if result.get("success"):
        data = result.get("data", {})
        response = {
            "success": True,
            "approval_id": data.get("approval_id"),
            "preview_id": preview_id,
            "approved_at": data.get("approved_at", datetime.utcnow().isoformat() + "Z"),
            "expires_at": data.get("expires_at")
        }
        # Attach standardized output if schema available
        if SCHEMA_AVAILABLE:
            standardized = create_approval_result(
                success=True,
                approval_id=data.get("approval_id"),
                preview_id=preview_id
            )
            response["_standardized"] = standardized.to_dict()
        return response
    return result
def execute_transaction(approval_id: str) -> Dict[str, Any]:
"""
Execute an approved transaction.
POST /api/trades/execute
Returns:
{
"tx_hash": "0x...",
"explorer_url": "https://...",
...
}
"""
if not approval_id:
return {
"success": False,
"error": {"code": "E012", "message": "approval_id is required"},
"diagnostics": "Cannot execute without approval_id"
}
result = api_request("POST", "/trades/execute", {"approval_id": approval_id})
    if result.get("success"):
        data = result.get("data", {})
        tx_hash = data.get("tx_hash", "")
        network = data.get("network", "base")
        explorer_url = _build_explorer_url(tx_hash, network)
        response = {
            "success": True,
            "tx_hash": tx_hash,
            "explorer_url": explorer_url,
            "executed_at": data.get("executed_at", datetime.utcnow().isoformat() + "Z"),
            "network": network
        }
        # Attach standardized output if schema available
        if SCHEMA_AVAILABLE:
            standardized = create_execution_result(
                success=True,
                tx_hash=tx_hash,
                network=network,
                explorer_url=explorer_url
            )
            response["_standardized"] = standardized.to_dict()
        return response
    return result
def get_transaction_status(tx_hash: str) -> Dict[str, Any]:
"""
Check transaction status.
GET /api/transactions/{tx_hash}
"""
if not tx_hash:
return {
"success": False,
"error": {"code": "E013", "message": "tx_hash is required"},
"diagnostics": "Cannot check status without tx_hash"
}
result = api_request("GET", f"/transactions/{tx_hash}")
if result.get("success"):
data = result.get("data", {})
return {
"success": True,
"tx_hash": tx_hash,
"status": data.get("status", "unknown"),
"block_number": data.get("block_number"),
"gas_used": data.get("gas_used"),
"explorer_url": _build_explorer_url(tx_hash, data.get("network", "base"))
}
return result
# ============================================================================
# Allowance Management
# ============================================================================
def get_allowances(
chain: str = "base",
token: Optional[str] = None
) -> Dict[str, Any]:
"""
Get current token allowances.
GET /api/allowances
"""
params = [f"chain={chain}"]
if token:
params.append(f"token={token}")
endpoint = "/allowances?" + "&".join(params)
return api_request("GET", endpoint)
def preview_revoke_allowance(
token: str,
spender: str,
chain: str = "base"
) -> Dict[str, Any]:
"""
Preview an allowance revoke transaction.
POST /api/allowances/revoke-preview
"""
payload = {
"token": token,
"spender": spender,
"chain": chain
}
result = api_request("POST", "/allowances/revoke-preview", payload)
if result.get("success"):
data = result.get("data", {})
return {
"success": True,
"preview_id": data.get("preview_id"),
"current_allowance": data.get("current_allowance"),
"spender": spender,
"token": token,
"next_step": "approve"
}
return result
# ============================================================================
# Helper Functions
# ============================================================================
def _load_security_config() -> Dict[str, Any]:
"""Load security configuration from config.json."""
config_path = os.path.join(SKILL_DIR, "config", "config.json")
if os.path.exists(config_path):
with open(config_path) as f:
config = json.load(f)
return {**DEFAULT_SECURITY, **config.get("security", {})}
return DEFAULT_SECURITY.copy()
def _check_double_confirm_required(
params: Dict[str, Any],
preview_result: Dict[str, Any]
) -> tuple:
"""
Check if double confirmation is required.
Returns:
(requires_double_confirm, reasons) tuple
"""
security = _load_security_config()
double_confirm_config = security.get("double_confirm", {})
if not double_confirm_config.get("enabled", False):
return False, []
reasons = []
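    # Check 1: Large trade threshold
    # Note: amount is the raw token amount, not a USD value, so comparing it with a
    # USD threshold is only meaningful for tokens priced near 1 USD (e.g. stablecoins).
    large_trade_threshold = double_confirm_config.get("large_trade_threshold_usd", 5000)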
if params.get("amount"):
try:
amount = float(params["amount"])
if amount >= large_trade_threshold:
reasons.append(f"Large trade: {amount} >= threshold {large_trade_threshold}")
except (ValueError, TypeError):
pass
# Check 2: High slippage
high_slippage_threshold = double_confirm_config.get("high_slippage_threshold_bps", 100)
slippage = params.get("slippage", 0.5)
slippage_bps = slippage * 100 # Convert percent to basis points
if slippage_bps >= high_slippage_threshold:
reasons.append(f"High slippage: {slippage}% >= threshold {high_slippage_threshold/100}%")
# Check 3: New protocol (first use)
if double_confirm_config.get("new_protocol_confirm", True):
protocol = params.get("protocol", "")
if protocol and protocol not in ["auto", "uniswap", "aave", "compound"]:
reasons.append(f"Protocol '{protocol}' requires confirmation (first use)")
return len(reasons) > 0, reasons
def _extract_risk_info(data: Dict) -> Dict[str, Any]:
"""Extract risk information from API response."""
return {
"balance_sufficient": data.get("balance_sufficient", True),
"allowance_sufficient": data.get("allowance_sufficient", True),
"route_valid": data.get("route_valid", True),
"slippage_within_limit": data.get("slippage_within_limit", True),
"warnings": data.get("risk_warnings", [])
}
def _build_explorer_url(tx_hash: str, network: str) -> str:
"""Build block explorer URL for transaction."""
if network in SUPPORTED_CHAINS:
base = SUPPORTED_CHAINS[network]["explorer"]
return f"{base}/tx/{tx_hash}"
return f"https://etherscan.io/tx/{tx_hash}"
def _format_output(data: Dict, json_output: bool = False) -> str:
"""Format output for display."""
if json_output:
return json.dumps(data, indent=2, ensure_ascii=False)
if not data.get("success", False):
error = data.get("error", {})
diagnostics = data.get("diagnostics", "")
return f"❌ Error [{error.get('code', '???')}]: {error.get('message', 'Unknown')}\n Diagnostics: {diagnostics}"
lines = []
# Preview result
if "preview_id" in data:
lines.append(f"📋 Preview ID: {data['preview_id']}")
lines.append(f" Simulation: {'✅ OK' if data.get('simulation_ok') else '❌ Failed'}")
if data.get("rate"):
lines.append(f" Rate: {data['rate']}")
if data.get("estimated_output"):
lines.append(f" Estimated Output: {data['estimated_output']}")
risk = data.get("risk", {})
if risk.get("warnings"):
lines.append(f" ⚠️ Warnings: {', '.join(risk['warnings'])}")
# Double confirmation info
if data.get("requires_double_confirm"):
lines.append(f" 🔒 Double Confirmation Required")
for reason in data.get("double_confirm_reasons", []):
lines.append(f" - {reason}")
if data.get("confirm_instruction"):
lines.append(f" {data['confirm_instruction']}")
lines.append(f" Next Step: {data.get('next_step', 'approve')}")
# Approval result
elif "approval_id" in data:
lines.append(f"✅ Approval ID: {data['approval_id']}")
lines.append(f" Preview ID: {data['preview_id']}")
lines.append(f" Approved At: {data.get('approved_at', 'N/A')}")
lines.append(f" Next Step: execute --approval-id {data['approval_id']}")
# Execution result
elif "tx_hash" in data:
lines.append(f"🚀 Transaction Submitted!")
lines.append(f" TX Hash: {data['tx_hash']}")
lines.append(f" Explorer: {data['explorer_url']}")
# Balance result
elif "balances" in data or data.get("data", {}).get("balances"):
balances = data.get("balances") or data.get("data", {}).get("balances", [])
lines.append("💰 Wallet Balances:")
for bal in balances:
lines.append(f" {bal.get('symbol', '?')}: {bal.get('balance', '0')} ({bal.get('chain', '?')})")
return "\n".join(lines)
# ============================================================================
# CLI
# ============================================================================
def main():
parser = argparse.ArgumentParser(
description="Trade Executor - REST API adapter for local keystore signer (v0.3.0)",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Preview a swap
python3 trade_executor.py preview --type swap --from-token USDC --to-token WETH --amount 5 --network base
# Approve a preview
python3 trade_executor.py approve --preview-id <uuid>
# Execute an approved transaction
python3 trade_executor.py execute --approval-id <uuid>
# Check balances
python3 trade_executor.py balances --network base
# Check transaction status
python3 trade_executor.py status --tx-hash 0x...
"""
)
subparsers = parser.add_subparsers(dest="command", help="Commands")
# Preview command
preview_parser = subparsers.add_parser("preview", help="Preview a transaction")
preview_parser.add_argument("--type", choices=["swap", "deposit", "transfer"], default="swap", help="Transaction type")
preview_parser.add_argument("--from-token", help="Source token (for swap)")
preview_parser.add_argument("--to-token", help="Destination token (for swap)")
preview_parser.add_argument("--amount", required=True, help="Amount to trade")
preview_parser.add_argument("--network", choices=["base", "ethereum"], default="base", help="Network")
preview_parser.add_argument("--protocol", default="auto", help="Protocol to use")
preview_parser.add_argument("--slippage", type=float, default=0.5, help="Slippage tolerance %%")
preview_parser.add_argument("--json", action="store_true", help="JSON output")
# For deposit
preview_parser.add_argument("--asset", help="Asset to deposit (for deposit)")
# Approve command
approve_parser = subparsers.add_parser("approve", help="Approve a previewed transaction")
approve_parser.add_argument("--preview-id", required=True, help="Preview ID from preview step")
approve_parser.add_argument("--confirm", help="Confirmation phrase for high-risk transactions")
approve_parser.add_argument("--json", action="store_true", help="JSON output")
# Execute command
execute_parser = subparsers.add_parser("execute", help="Execute an approved transaction")
execute_parser.add_argument("--approval-id", required=True, help="Approval ID from approve step")
execute_parser.add_argument("--json", action="store_true", help="JSON output")
# Status command
status_parser = subparsers.add_parser("status", help="Check transaction status")
status_parser.add_argument("--tx-hash", required=True, help="Transaction hash")
status_parser.add_argument("--json", action="store_true", help="JSON output")
# Balances command
balances_parser = subparsers.add_parser("balances", help="Query wallet balances")
balances_parser.add_argument("--network", choices=["base", "ethereum"], help="Filter by network")
balances_parser.add_argument("--tokens", help="Comma-separated token list")
balances_parser.add_argument("--json", action="store_true", help="JSON output")
# Allowances command
allowances_parser = subparsers.add_parser("allowances", help="Query or manage allowances")
allowances_parser.add_argument("--revoke", action="store_true", help="Preview revoke allowance")
allowances_parser.add_argument("--token", help="Token address or symbol")
allowances_parser.add_argument("--spender", help="Spender address")
allowances_parser.add_argument("--network", choices=["base", "ethereum"], default="base", help="Network")
allowances_parser.add_argument("--json", action="store_true", help="JSON output")
# Preflight check command
preflight_parser = subparsers.add_parser("preflight", help="Run pre-flight checks")
preflight_parser.add_argument("--type", choices=["swap", "deposit", "transfer"], default="swap", help="Transaction type to check")
preflight_parser.add_argument("--network", choices=["base", "ethereum"], default="base", help="Network")
preflight_parser.add_argument("--from-token", help="Source token (for swap)")
preflight_parser.add_argument("--amount", help="Amount")
preflight_parser.add_argument("--json", action="store_true", help="JSON output")
# RPC status command
rpc_parser = subparsers.add_parser("rpc", help="Check RPC status and fallback")
rpc_parser.add_argument("--network", choices=["base", "ethereum"], default="base", help="Network to check")
rpc_parser.add_argument("--test", action="store_true", help="Test RPC request with fallback")
rpc_parser.add_argument("--json", action="store_true", help="JSON output")
args = parser.parse_args()
if args.command == "preview":
if args.type == "swap":
if not args.from_token or not args.to_token:
parser.error("--from-token and --to-token are required for swap")
result = preview_swap(
from_token=args.from_token,
to_token=args.to_token,
amount=args.amount,
network=args.network,
slippage=args.slippage,
protocol=args.protocol
)
elif args.type == "deposit":
if not args.asset or not args.protocol:
parser.error("--asset and --protocol are required for deposit")
result = preview_deposit(
protocol=args.protocol,
asset=args.asset,
amount=args.amount,
network=args.network
)
else:
result = {"success": False, "error": ERROR_CODES["UNKNOWN_ERROR"], "diagnostics": f"Unsupported type: {args.type}"}
print(_format_output(result, args.json))
elif args.command == "approve":
result = approve_transaction(args.preview_id, confirm_phrase=args.confirm)
print(_format_output(result, args.json))
elif args.command == "execute":
result = execute_transaction(args.approval_id)
print(_format_output(result, args.json))
elif args.command == "status":
result = get_transaction_status(args.tx_hash)
print(_format_output(result, args.json))
elif args.command == "balances":
tokens = args.tokens.split(",") if args.tokens else None
result = get_wallet_balances(chain=args.network, tokens=tokens)
print(_format_output(result, args.json))
elif args.command == "allowances":
if args.revoke:
if not args.token or not args.spender:
parser.error("--token and --spender are required for revoke")
result = preview_revoke_allowance(args.token, args.spender, args.network)
else:
result = get_allowances(chain=args.network, token=args.token)
print(_format_output(result, args.json))
elif args.command == "preflight":
# Run pre-flight checks directly
if PREFLIGHT_AVAILABLE:
config = load_preflight_config(CONFIG_PATH)
checker = PreflightChecker(config)
params = {
"type": args.type,
"network": args.network,
"from_token": args.from_token,
"amount": args.amount
}
report = checker.run(args.type, params)
if args.json:
result = {
"success": report.critical_passed,
"all_passed": report.all_passed,
"summary": report.summary,
"checks": [
{
"name": r.name,
"passed": r.passed,
"severity": r.severity.value,
"message": r.message,
"fix_hint": r.fix_hint
}
for r in report.results
]
}
print(json.dumps(result, indent=2))
else:
print(f"\n📋 Pre-flight Check Results")
print(f" Summary: {report.summary}")
print(f" All Passed: {'✅ Yes' if report.all_passed else '⚠️ No'}")
print(f" Critical Passed: {'✅ Yes' if report.critical_passed else '❌ No'}")
print()
for r in report.results:
status = "✅" if r.passed else "❌"
                    # Fall back to a neutral marker instead of raising KeyError on an unknown severity
                    sev_emoji = {"critical": "🔴", "warning": "🟡", "info": "🔵"}.get(r.severity.value, "⚪")
print(f"{status} {sev_emoji} {r.name}: {r.message}")
if not r.passed and r.fix_hint:
print(f" 💡 Fix: {r.fix_hint}")
print()
else:
print("❌ Pre-flight module not available")
elif args.command == "rpc":
# Check RPC status
if RPC_MANAGER_AVAILABLE:
# Load config from correct path (from skill root)
skill_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
config_path = os.path.join(skill_dir, "config", "config.json")
with open(config_path) as f:
rpc_config = json.load(f)
manager = RPCManager(rpc_config)
if args.test:
# Test RPC request
try:
result = manager.request(args.network, {
"jsonrpc": "2.0",
"method": "eth_blockNumber",
"params": [],
"id": 1
})
if args.json:
print(json.dumps({
"success": True,
"network": args.network,
"active_rpc": manager.get_active_rpc(args.network),
"result": result
}, indent=2))
else:
block = int(result.get("result", "0x0"), 16)
print(f"\n🔗 RPC Test: {args.network}")
print(f" Active RPC: {manager.get_active_rpc(args.network)}")
print(f" Block Number: #{block}")
print(f" Status: ✅ OK")
print()
except Exception as e:
if args.json:
print(json.dumps({
"success": False,
"network": args.network,
"error": str(e)
}, indent=2))
else:
print(f"\n❌ RPC Test Failed: {e}")
else:
# Show status report
report = manager.get_status_report(args.network)
if args.json:
print(json.dumps(report, indent=2))
else:
print(f"\n🔗 RPC Status: {args.network.upper()}")
print(f" Active: {report['active_rpc']}")
print(f" Fallback Enabled: {'✅' if report['fallback_enabled'] else '❌'}")
print(f" Session Sticky: {'✅' if report['session_sticky'] else '❌'}")
print()
print(" Endpoints:")
for ep in report['endpoints']:
status = "✅ Active" if ep['is_active'] else "⏸️ Standby"
print(f" {status}: {ep['url']}")
print()
else:
print("❌ RPC Manager module not available")
else:
parser.print_help()
if __name__ == "__main__":
main()
```
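
The preview → approve → execute state machine above can be exercised without a running signer. The following is a minimal sketch, not the skill's own code: `fake_signer_api` is a hypothetical in-memory stand-in for the local REST API, `run_trade_flow` is an illustrative helper, and the `E000` error code is made up for the example. Only the endpoint paths and the `success`/`data` envelope are taken from `trade_executor.py`.

```python
import uuid
from typing import Any, Callable, Dict

# Same call shape as api_request(method, endpoint, data) in trade_executor.py
ApiFn = Callable[[str, str, Dict[str, Any]], Dict[str, Any]]


def fake_signer_api(method: str, endpoint: str, data: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical stand-in for the signer API; returns canned responses."""
    if endpoint == "/trades/preview":
        return {"success": True, "data": {"preview_id": str(uuid.uuid4()), "simulation_ok": True}}
    if endpoint == "/trades/approve" and data.get("preview_id"):
        return {"success": True, "data": {"approval_id": str(uuid.uuid4())}}
    if endpoint == "/trades/execute" and data.get("approval_id"):
        return {"success": True, "data": {"tx_hash": "0x" + "ab" * 32, "network": "base"}}
    # "E000" is a placeholder code, not one defined by the skill
    return {"success": False, "error": {"code": "E000", "message": "bad request"}}


def run_trade_flow(api: ApiFn, swap: Dict[str, Any]) -> Dict[str, Any]:
    """Walk preview -> approve -> execute, stopping at the first failed step."""
    preview = api("POST", "/trades/preview", swap)
    if not preview.get("success"):
        return {"success": False, "stage": "preview", "error": preview.get("error")}
    preview_data = preview.get("data", {})
    if not preview_data.get("simulation_ok", False):
        # Never approve a transaction whose simulation did not pass
        return {"success": False, "stage": "preview", "error": "simulation failed"}

    approval = api("POST", "/trades/approve", {"preview_id": preview_data["preview_id"]})
    if not approval.get("success"):
        return {"success": False, "stage": "approve", "error": approval.get("error")}

    execution = api("POST", "/trades/execute", {"approval_id": approval["data"]["approval_id"]})
    if not execution.get("success"):
        return {"success": False, "stage": "execute", "error": execution.get("error")}
    return {"success": True, "stage": "execute", "tx_hash": execution["data"]["tx_hash"]}


if __name__ == "__main__":
    swap = {"from_token": "USDC", "to_token": "WETH", "amount": "5", "network": "base"}
    print(run_trade_flow(fake_signer_api, swap))
```

Passing the API function in as a parameter keeps each stage testable in isolation. In a real run, a human or policy check would normally sit between the preview and approve calls rather than chaining them automatically.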
### scripts/trading/preflight.py
```python
#!/usr/bin/env python3
"""
Preflight Check Module for Web3 Investor
Purpose: Check execution readiness before attempting transactions.
Returns available payment methods and recommendations.
Usage:
python preflight.py check
python preflight.py check --network base
"""
import argparse
import json
import os
import sys
import urllib.request
import urllib.error
from typing import Dict, Any, Optional
from enum import Enum
# ============================================================================
# Configuration
# ============================================================================
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
CONFIG_PATH = os.path.join(SCRIPT_DIR, "..", "..", "config", "config.json")
class PaymentMethod(Enum):
"""Available payment methods."""
KEYSTORE_SIGNER = "keystore_signer"
EIP681_PAYMENT_LINK = "eip681_payment_link"
NONE = "none"
# ============================================================================
# Configuration Loading
# ============================================================================
def load_config() -> Dict[str, Any]:
"""Load configuration from config.json."""
try:
with open(CONFIG_PATH, "r", encoding="utf-8") as f:
return json.load(f)
except Exception as e:
print(f"⚠️ Failed to load config: {e}", file=sys.stderr)
return {}
# ============================================================================
# Signer API Check
# ============================================================================
def check_signer_api(network: str = "base", timeout: int = 5) -> Dict[str, Any]:
"""
Check if local signer API is available.
Returns:
Dict with status, message, and details
"""
config = load_config()
api_config = config.get("api", {})
base_url = api_config.get("url", "http://localhost:3000/api")
api_timeout = api_config.get("timeout_seconds", timeout)
# Try to reach the balances endpoint
url = f"{base_url}/wallet/balances?chain={network}"
try:
req = urllib.request.Request(
url,
headers={"User-Agent": "Web3-Investor-Preflight/0.5.0"},
method='GET'
)
with urllib.request.urlopen(req, timeout=api_timeout) as response:
data = json.loads(response.read().decode())
if data.get("success"):
return {
"available": True,
"status": "online",
"message": "Signer API is available",
"url": base_url,
"network": network
}
else:
return {
"available": False,
"status": "error",
"message": f"Signer API returned error: {data.get('error', 'Unknown')}",
"url": base_url
}
except urllib.error.URLError as e:
return {
"available": False,
"status": "offline",
"message": f"Signer API unreachable: {e.reason}",
"url": base_url
}
except Exception as e:
return {
"available": False,
"status": "error",
"message": f"Unexpected error: {str(e)}",
"url": base_url
}
# ============================================================================
# Execution Readiness Check
# ============================================================================
def check_execution_readiness(network: str = "base") -> Dict[str, Any]:
"""
Check what payment methods are available for transaction execution.
This function should be called BEFORE asking user for transaction details.
Args:
network: Network to check (default: base)
Returns:
Dict with recommended payment method and details
"""
result = {
"network": network,
"methods": [],
"recommended": None,
"message": ""
}
# 1. Check local signer API
signer_status = check_signer_api(network)
if signer_status["available"]:
result["methods"].append({
"method": PaymentMethod.KEYSTORE_SIGNER.value,
"status": "available",
"description": "Local keystore signer (preview → approve → execute)",
"details": signer_status
})
result["recommended"] = PaymentMethod.KEYSTORE_SIGNER.value
result["message"] = "✅ Local signer available. Use preview → approve → execute flow."
else:
result["methods"].append({
"method": PaymentMethod.KEYSTORE_SIGNER.value,
"status": "unavailable",
"description": "Local keystore signer",
"details": signer_status
})
# 2. Fallback to EIP-681
result["methods"].append({
"method": PaymentMethod.EIP681_PAYMENT_LINK.value,
"status": "available",
"description": "EIP-681 payment link (MetaMask mobile)",
"details": {
"supported_wallets": ["MetaMask", "Rainbow", "Trust Wallet"],
"requires": "Mobile wallet with USDT on the target chain"
}
})
result["recommended"] = PaymentMethod.EIP681_PAYMENT_LINK.value
result["message"] = "⚠️ No local signer. Use EIP-681 payment link for mobile wallet payment."
return result
def format_readiness_report(readiness: Dict[str, Any]) -> str:
"""Format execution readiness as human-readable report."""
lines = [
"=" * 60,
"🔍 Execution Readiness Check",
"=" * 60,
"",
f"Network: {readiness['network'].upper()}",
f"Recommended: {readiness['recommended']}",
"",
"Available Payment Methods:",
"-" * 40,
]
for method in readiness["methods"]:
status_icon = "✅" if method["status"] == "available" else "❌"
lines.append(f" {status_icon} {method['method']}: {method['status']}")
lines.append(f" {method['description']}")
lines.extend([
"",
"-" * 60,
readiness["message"],
""
])
return "\n".join(lines)
# ============================================================================
# CLI
# ============================================================================
def main():
parser = argparse.ArgumentParser(
description="Preflight check for Web3 Investor execution readiness",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
python preflight.py check
python preflight.py check --network ethereum
"""
)
subparsers = parser.add_subparsers(dest="command", help="Command to run")
# Check command
check_parser = subparsers.add_parser("check", help="Check execution readiness")
check_parser.add_argument("--network", default="base", help="Network to check")
check_parser.add_argument("--json", action="store_true", help="Output as JSON")
args = parser.parse_args()
if args.command == "check":
readiness = check_execution_readiness(args.network)
if args.json:
# Emit only the report fields; enum members were already stored as their string values
output = {
"network": readiness["network"],
"methods": readiness["methods"],
"recommended": readiness["recommended"],
"message": readiness["message"]
}
print(json.dumps(output, indent=2))
else:
print(format_readiness_report(readiness))
else:
parser.print_help()
if __name__ == "__main__":
main()
```
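The report produced by `python preflight.py check --json` carries the four keys assembled in `main()` above: `network`, `methods`, `recommended`, and `message`. A minimal sketch of how a caller might choose a payment method from that structure follows. The helper name `pick_payment_method` and the sample dict are hypothetical, and the method strings `keystore_signer` and `eip681_payment_link` are assumed to match the `PaymentMethod` enum values.

```python
from typing import Any, Dict, Optional


def pick_payment_method(readiness: Dict[str, Any]) -> Optional[str]:
    """Return the recommended method if it is available, else the first available one."""
    available = [
        m["method"]
        for m in readiness.get("methods", [])
        if m.get("status") == "available"
    ]
    recommended = readiness.get("recommended")
    if recommended in available:
        return recommended
    return available[0] if available else None


# Hypothetical sample mirroring the "no local signer" branch of the readiness check
sample = {
    "network": "base",
    "methods": [
        {"method": "keystore_signer", "status": "unavailable"},
        {"method": "eip681_payment_link", "status": "available"},
    ],
    "recommended": "eip681_payment_link",
    "message": "No local signer. Use EIP-681 payment link.",
}

print(pick_payment_method(sample))
```

Checking `status` as well as `recommended` keeps the caller safe if the two fields ever disagree.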
### scripts/discovery/find_opportunities.py
```python
#!/usr/bin/env python3
"""
Find investment opportunities from DefiLlama API.
Version: 0.2.0 - Refactored for LLM-based risk analysis
Key Changes:
- Removed local risk scoring (calculate_risk_score)
- Added actionable_addresses for execution readiness
- Integrated protocol registry from references/protocols.md
- Enhanced null/type safety for external API data
Usage:
python find_opportunities.py --min-apy 5 --chain ethereum
python find_opportunities.py --protocol aave-v3 --output json
python find_opportunities.py --llm-ready # Output optimized for LLM analysis
"""
import argparse
import json
import os
import re
import sys
from datetime import datetime
from typing import Optional, List, Dict, Any
import urllib.request
import urllib.error
# ============================================================================
# Configuration
# ============================================================================
DEFILLAMA_BASE = "https://yields.llama.fi"
DEFILLAMA_PROTOCOLS = "https://api.llama.fi/protocols"
MAX_SAFE_APY_DEFAULT = 100 # Filter out extreme risks/scams by default
# Paths
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
PROTOCOLS_MD_PATH = os.path.join(SCRIPT_DIR, "..", "..", "references", "protocols.md")
PROTOCOLS_JSON_PATH = os.path.join(SCRIPT_DIR, "..", "..", "config", "protocols.json")
# Chain mapping (DefiLlama naming)
CHAIN_MAPPING = {
"ethereum": "Ethereum",
"eth": "Ethereum",
"base": "Base",
"basechain": "Base",
"arbitrum": "Arbitrum",
"arb": "Arbitrum",
"optimism": "Optimism",
"op": "Optimism",
}
# ============================================================================
# Protocol Registry (Static Knowledge Layer)
# ============================================================================
def parse_protocols_md() -> Dict[str, Dict[str, Any]]:
"""
Parse protocols.md into a static registry.
Returns dict keyed by protocol name (lowercase).
"""
registry = {}
if not os.path.exists(PROTOCOLS_MD_PATH):
return registry
try:
with open(PROTOCOLS_MD_PATH, "r", encoding="utf-8") as f:
content = f.read()
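# Parse protocol sections (### Protocol Name); the name must stay on the heading line,
# so the character class allows spaces and tabs but not newlines
pattern = r"### ([A-Za-z0-9 \t\-\.]+)\n((?:(?!\n### ).)*?)(?=\n### |$)"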
matches = re.findall(pattern, content, re.DOTALL)
for name, section in matches:
name_lower = name.lower().strip()
info = {"name": name.strip()}
# Extract contract address
contract_match = re.search(r"\*\*Contract\*\*:\s*(`?0x[a-fA-F0-9]{40}`?)", section)
if contract_match:
addr = contract_match.group(1).strip("`")
info["primary_contract"] = addr
# Extract category
category_match = re.search(r"\*\*Category\*\*:\s*(\w+)", section)
if category_match:
info["category"] = category_match.group(1).lower()
# Extract risk level (for reference, not computed)
risk_match = re.search(r"\*\*Risk Level\*\*:\s*(\w+)", section)
if risk_match:
info["registry_risk"] = risk_match.group(1).lower()
# Extract docs URL
docs_match = re.search(r"\*\*Docs\*\*:\s*(https?://[^\s]+)", section)
if docs_match:
info["docs_url"] = docs_match.group(1)
# Extract TVL (for reference)
tvl_match = re.search(r"\*\*TVL\*\*:\s*>\s*\$?([\d.]+[BMK]?)", section)
if tvl_match:
info["registry_tvl"] = tvl_match.group(1)
registry[name_lower] = info
# Also add alternative keys
if "aave" in name_lower:
registry["aave-v3"] = info
registry["aave v3"] = info
if "compound" in name_lower:
registry["compound-v3"] = info
registry["compound v3"] = info
except Exception as e:
print(f"⚠️ Failed to parse protocols.md: {e}", file=sys.stderr)
return registry
def load_protocols_json() -> Dict[str, Dict[str, Any]]:
"""Load protocols from JSON config if exists."""
if not os.path.exists(PROTOCOLS_JSON_PATH):
return {}
try:
with open(PROTOCOLS_JSON_PATH, "r", encoding="utf-8") as f:
return json.load(f)
except Exception:
return {}
def get_protocol_registry() -> Dict[str, Dict[str, Any]]:
"""
Merge static registry from MD and JSON.
JSON takes precedence for overrides.
"""
registry = parse_protocols_md()
json_registry = load_protocols_json()
registry.update(json_registry)
return registry
# ============================================================================
# MCP Server Integration (v0.5.0 - Multi-server support)
# ============================================================================
CONFIG_PATH = os.path.join(SCRIPT_DIR, "..", "..", "config", "config.json")
# Simple in-memory cache for MCP results
_MCP_CACHE: Dict[str, Dict[str, Any]] = {}
_MCP_CACHE_TIME: Dict[str, float] = {}
def load_config() -> Dict[str, Any]:
"""Load configuration from config.json."""
try:
with open(CONFIG_PATH, "r", encoding="utf-8") as f:
return json.load(f)
except Exception as e:
print(f"⚠️ Failed to load config: {e}", file=sys.stderr)
return {}
def convert_rwa_product(product: Dict[str, Any], default_chain: str = "Base", server_name: str = "") -> Optional[Dict[str, Any]]:
"""
Convert RWA product from MCP to standard opportunity format.
RWA products have different structure than DeFi pools.
This function normalizes them for consistent handling.
Note: RWA products define their own chain (usually Base for USDT payments),
not inherited from search parameters.
"""
try:
# Extract key fields from RWA product
product_id = product.get("id", "unknown")
name = product.get("name", "RWA Product")
expected_yield = product.get("expectedYieldAnnual", 0)
term_days = product.get("productTerm", 0)
min_subscription = product.get("minSubscriptionUsdt", 0)
receiving_address = product.get("receivingAddress", "")
# RWA products typically run on Base chain for USDT payments
# The receiving address determines the actual chain
# TODO: In future, MCP should return chain info with the product
product_chain = product.get("chain", "Base") # Default to Base for RWA products
# Convert annual yield to APY percentage
apy = expected_yield * 100 if expected_yield else 0
# Build standardized opportunity record
return {
"pool": f"rwa-{product_id}",
"protocol": "rwa-mcp",
"protocol_name": f"RWA: {name}",
"mcp_server": server_name,
"chain": product_chain, # Use product's own chain
"symbol": "USDT", # RWA products use USDT
"apy": round(apy, 2),
"apy_base": round(apy, 2),
"apy_reward": 0.0,
"tvl_usd": product.get("fundraisingScale", 0),
"underlying_tokens": [], # RWA doesn't have on-chain tokens
"reward_tokens": [],
"stablecoin": True, # USDT is stablecoin
"actionable_addresses": {
"deposit_contract_candidates": [receiving_address] if receiving_address else [],
"underlying_token_addresses": [],
"reward_token_addresses": [],
"has_actionable_address": bool(receiving_address),
"primary_contract": receiving_address,
"protocol_registry_match": False,
"docs_url": None
},
"url": f"https://mcp.prime.antalpha.com/product/{product_id}",
"audited": True, # RWA products are typically audited
"risk_signals": {
"has_il_risk": False, # RWA has no IL risk
"reward_type": "none",
"underlying_type": "rwa",
"category": "RWA",
"stablecoin": True,
"product_term_days": term_days,
"min_subscription_usdt": min_subscription,
"product_info": {
"name": name,
"term": term_days,
"start_date": product.get("startDate"),
"maturity_date": product.get("maturityDate"),
"asset_info": (product.get("assetInformation") or "")[:100]  # tolerate an explicit null
}
}
}
except Exception as e:
print(f"⚠️ Failed to convert RWA product: {e}", file=sys.stderr)
return None
def fetch_mcp_server(server_config: Dict[str, Any], chain: str = "Ethereum") -> List[Dict[str, Any]]:
"""
Fetch yield opportunities from a single MCP server.
Returns list of opportunities or empty list on failure.
"""
server_name = server_config.get("name", "unknown")
primary_url = server_config.get("primary_url", "")
fallback_url = server_config.get("fallback_url", "")
timeout = server_config.get("timeout_seconds", 30)
if not primary_url and not fallback_url:
print(f"⚠️ MCP server '{server_name}': No URLs configured", file=sys.stderr)
return []
# MCP JSON-RPC request format
payload = {
"jsonrpc": "2.0",
"method": "tools/call",
"params": {
"name": "list_products",
"arguments": {}
},
"id": 1
}
def try_mcp_request(url: str) -> Optional[List[Dict[str, Any]]]:
"""Try to fetch from a single URL."""
try:
req = urllib.request.Request(
url,
data=json.dumps(payload).encode('utf-8'),
headers={
"Content-Type": "application/json",
"Accept": "application/json, text/event-stream",
"User-Agent": "Web3-Investor/0.5.0"
},
method='POST'
)
with urllib.request.urlopen(req, timeout=timeout) as response:
raw_data = response.read().decode()
# Parse SSE format
result = None
for line in raw_data.split('\n'):
if line.startswith('data: '):
json_str = line[6:]
try:
result = json.loads(json_str)
break
except json.JSONDecodeError:
continue
if not result:
return None
# Parse MCP response structure
if "result" in result and "content" in result.get("result", {}):
content = result["result"]["content"]
if isinstance(content, list) and len(content) > 0:
opportunities = []
for item in content:
if item.get("type") == "text":
text_data = item.get("text", "")
try:
parsed = json.loads(text_data)
if isinstance(parsed, list):
for product in parsed:
opp = convert_rwa_product(product, chain, server_name)
if opp:
opportunities.append(opp)
elif isinstance(parsed, dict):
opp = convert_rwa_product(parsed, chain, server_name)
if opp:
opportunities.append(opp)
except json.JSONDecodeError:
continue
return opportunities
return None
except urllib.error.URLError as e:
print(f"⚠️ MCP '{server_name}' request to {url} failed: {e}", file=sys.stderr)
return None
except Exception as e:
print(f"⚠️ MCP '{server_name}' unexpected error: {e}", file=sys.stderr)
return None
# Try primary URL first
if primary_url:
result = try_mcp_request(primary_url)
if result is not None:
print(f"✅ MCP '{server_name}': {len(result)} products from primary URL", file=sys.stderr)
return result
# Fall back to secondary URL
if fallback_url:
result = try_mcp_request(fallback_url)
if result is not None:
print(f"✅ MCP '{server_name}': {len(result)} products from fallback URL", file=sys.stderr)
return result
print(f"❌ MCP '{server_name}': All endpoints failed", file=sys.stderr)
return []
def fetch_mcp_yields(chain: str = "Ethereum") -> List[Dict[str, Any]]:
"""
Fetch yield opportunities from all configured MCP servers.
Supports multiple servers with caching for performance.
"""
config = load_config()
mcp_config = config.get("discovery", {}).get("mcp", {})
if not mcp_config.get("enabled", False):
print("📊 MCP integration disabled in config", file=sys.stderr)
return []
servers = mcp_config.get("servers", [])
if not servers:
print("⚠️ No MCP servers configured", file=sys.stderr)
return []
cache_ttl = mcp_config.get("cache_ttl_seconds", 300)
all_opportunities = []
for server in servers:
server_name = server.get("name", "unknown")
cache_key = f"{server_name}:{chain}"
# Check cache
current_time = datetime.now().timestamp()
if cache_key in _MCP_CACHE and cache_key in _MCP_CACHE_TIME:
if current_time - _MCP_CACHE_TIME[cache_key] < cache_ttl:
cached = _MCP_CACHE[cache_key]
print(f"📦 MCP '{server_name}': Using cached data ({len(cached)} products)", file=sys.stderr)
all_opportunities.extend(cached)
continue
# Fetch fresh data
opportunities = fetch_mcp_server(server, chain)
# Update cache
if opportunities:
_MCP_CACHE[cache_key] = opportunities
_MCP_CACHE_TIME[cache_key] = current_time
all_opportunities.extend(opportunities)
return all_opportunities
# ============================================================================
# DefiLlama API (Dynamic Data Layer)
# ============================================================================
def fetch_yields() -> Dict[str, Any]:
"""Fetch all yield data from DefiLlama."""
url = f"{DEFILLAMA_BASE}/pools"
try:
req = urllib.request.Request(url, headers={"User-Agent": "Web3-Investor/0.2.0"})
with urllib.request.urlopen(req, timeout=30) as response:
return json.loads(response.read().decode())
except urllib.error.URLError as e:
print(f"❌ Error fetching yields: {e}", file=sys.stderr)
return {"data": []}
except Exception as e:
print(f"❌ Unexpected error fetching yields: {e}", file=sys.stderr)
return {"data": []}
def fetch_protocols() -> List[Dict[str, Any]]:
"""Fetch protocol list from DefiLlama."""
try:
req = urllib.request.Request(
DEFILLAMA_PROTOCOLS,
headers={"User-Agent": "Web3-Investor/0.2.0"}
)
with urllib.request.urlopen(req, timeout=30) as response:
return json.loads(response.read().decode())
except urllib.error.URLError as e:
print(f"❌ Error fetching protocols: {e}", file=sys.stderr)
return []
except Exception as e:
print(f"❌ Unexpected error fetching protocols: {e}", file=sys.stderr)
return []
# ============================================================================
# Data Extraction & Normalization
# ============================================================================
def safe_get(data: Any, key: str, default: Any = None) -> Any:
"""
Safely get value from dict with null/type protection.
Handles:
- None data
- Missing keys
- Null values
"""
if data is None:
return default
if not isinstance(data, dict):
return default
value = data.get(key, default)
return value if value is not None else default
def safe_float(value: Any, default: float = 0.0) -> float:
"""Safely convert to float with null protection."""
if value is None:
return default
try:
return float(value)
except (ValueError, TypeError):
return default
def safe_list(value: Any) -> List[Any]:
"""Safely convert to list with null protection."""
if value is None:
return []
if isinstance(value, list):
return value
return []
def safe_str(value: Any, default: str = "") -> str:
"""Safely convert to string with null protection."""
if value is None:
return default
return str(value)
def normalize_chain(chain: str) -> str:
"""Normalize chain name to DefiLlama format."""
return CHAIN_MAPPING.get(chain.lower(), chain)
# ============================================================================
# Actionable Addresses Extraction
# ============================================================================
def extract_actionable_addresses(
pool: Dict[str, Any],
protocol: Dict[str, Any],
registry: Dict[str, Dict[str, Any]]
) -> Dict[str, Any]:
"""
Extract actionable addresses for execution readiness.
Priority:
1. Pool-level addresses (from DefiLlama)
2. Protocol-level addresses (from DefiLlama)
3. Registry addresses (from protocols.md/json)
"""
result = {
"deposit_contract_candidates": [],
"underlying_token_addresses": [],
"reward_token_addresses": [],
"has_actionable_address": False,
"primary_contract": None,
"protocol_registry_match": False,
"docs_url": None
}
# 1. Pool-level addresses
pool_addr = safe_get(pool, "pool")
if isinstance(pool_addr, str) and pool_addr.startswith("0x"):
result["deposit_contract_candidates"].append(pool_addr)
# Underlying tokens
underlying = safe_list(safe_get(pool, "underlyingTokens"))
result["underlying_token_addresses"] = [
addr for addr in underlying
if isinstance(addr, str) and addr.startswith("0x")
]
# Reward tokens
rewards = safe_list(safe_get(pool, "rewardTokens"))
result["reward_token_addresses"] = [
addr for addr in rewards
if isinstance(addr, str) and addr.startswith("0x")
]
# 2. Protocol-level addresses (governance, tokens)
protocol_tokens = safe_list(safe_get(protocol, "tokens"))
for token in protocol_tokens:
if isinstance(token, dict):
addr = safe_get(token, "address")
if isinstance(addr, str) and addr.startswith("0x"):
result["underlying_token_addresses"].append(addr)
# 3. Registry lookup
protocol_name = safe_get(pool, "project", "").lower()
registry_entry = registry.get(protocol_name, {})
if registry_entry:
result["protocol_registry_match"] = True
primary = registry_entry.get("primary_contract")
if primary and primary not in result["deposit_contract_candidates"]:
result["deposit_contract_candidates"].insert(0, primary)
result["primary_contract"] = primary
docs = registry_entry.get("docs_url")
if docs:
result["docs_url"] = docs
# Determine if actionable
result["has_actionable_address"] = bool(
result["deposit_contract_candidates"] or
result["underlying_token_addresses"]
)
return result
# ============================================================================
# Risk Data Collection (for LLM Analysis)
# ============================================================================
def detect_reward_type(pool: Dict[str, Any]) -> str:
"""Detect if product has single or multi-token rewards."""
reward_tokens = safe_list(safe_get(pool, "rewardTokens"))
if not reward_tokens:
return "none"
elif len(reward_tokens) == 1:
return "single"
else:
return "multi"
def detect_il_risk(pool: Dict[str, Any], category: str) -> bool:
"""Detect if product has impermanent loss risk."""
underlying = safe_list(safe_get(pool, "underlyingTokens"))
# DEX LP products typically have IL risk
dex_categories = ["dex", "dexs", "amm", "lp"]
if any(dex in category.lower() for dex in dex_categories):
return True
# Multi-asset pools with volatile tokens have IL risk
if len(underlying) >= 2 and not safe_get(pool, "stablecoin", False):
return True
return False
def detect_underlying_type(pool: Dict[str, Any], category: str, symbol: str) -> str:
"""Detect underlying asset type (RWA vs on-chain)."""
# RWA keywords
rwa_keywords = ["tbill", "treasury", "bond", "real-world", "rwa", "usdyc", "usdy"]
symbol_lower = symbol.lower()
for keyword in rwa_keywords:
if keyword in symbol_lower:
return "rwa"
# Check category
if "rwa" in category.lower():
return "rwa"
# Pure lending/yield protocols are on-chain
onchain_categories = ["lending", "yield", "staking", "liquid staking"]
if any(oc in category.lower() for oc in onchain_categories):
return "onchain"
# Mixed or unknown
underlying = safe_list(safe_get(pool, "underlyingTokens"))
if len(underlying) > 1:
return "mixed"
return "unknown"
def collect_risk_signals(
pool: Dict[str, Any],
protocol: Dict[str, Any],
registry: Dict[str, Dict[str, Any]]
) -> Dict[str, Any]:
"""
Collect risk-related signals WITHOUT computing a score.
LLM will analyze these signals to determine risk.
"""
protocol_name = safe_get(pool, "project", "").lower()
registry_entry = registry.get(protocol_name, {})
category = registry_entry.get("category") or safe_get(protocol, "category", "")
symbol = safe_str(safe_get(pool, "symbol"))
return {
# Protocol maturity signals
"audits": safe_list(safe_get(protocol, "audits")),
"audit_count": len(safe_list(safe_get(protocol, "audits"))),
"has_audit": bool(safe_get(protocol, "audits")),
"audit_notes": safe_str(safe_get(protocol, "audit_note")),
# TVL signals
"tvl_usd": safe_float(safe_get(pool, "tvlUsd")),
"protocol_tvl": safe_float(safe_get(protocol, "tvl")),
# Age/maturity signals
"protocol_slug": safe_get(protocol, "slug"),
"known_protocol": bool(registry_entry),
"category": category,
# APY signals (high APY may indicate risk)
"apy": safe_float(safe_get(pool, "apy")),
"apy_base": safe_float(safe_get(pool, "apyBase")),
"apy_reward": safe_float(safe_get(pool, "apyReward")),
"apy_composition": "base+rewards" if safe_get(pool, "apyReward") else "base_only",
# Asset type signals
"stablecoin": safe_get(pool, "stablecoin", False),
"symbol": symbol,
# Chain signals
"chain": safe_str(safe_get(pool, "chain")),
# NEW in v0.2.1: Investment preference signals
"reward_type": detect_reward_type(pool), # "none" | "single" | "multi"
"has_il_risk": detect_il_risk(pool, category), # True | False
"underlying_type": detect_underlying_type(pool, category, symbol), # "rwa" | "onchain" | "mixed" | "unknown"
# Governance signals
"governance": safe_get(protocol, "governance"),
"has_gov_token": bool(safe_get(protocol, "governance")),
# Registry reference (if available)
"registry_risk": registry_entry.get("registry_risk"),
"registry_category": registry_entry.get("category"),
}
# ============================================================================
# Main Opportunity Finder
# ============================================================================
def check_execution_readiness_inline(network: str = "base") -> Dict[str, Any]:
"""
Inline version of execution readiness check.
Returns available payment methods without full preflight module import.
"""
config = load_config()
api_config = config.get("api", {})
base_url = api_config.get("url", "http://localhost:3000/api")
# Quick check if signer API is available
try:
req = urllib.request.Request(
f"{base_url}/wallet/balances?chain={network}",
headers={"User-Agent": "Web3-Investor/0.5.0"},
method='GET'
)
with urllib.request.urlopen(req, timeout=5) as response:
data = json.loads(response.read().decode())
if data.get("success"):
return {
"recommended": "keystore_signer",
"methods": ["keystore_signer", "eip681_payment_link"],
"message": "✅ Local signer available"
}
except Exception:
# Signer API unreachable or returned invalid JSON; fall through to the EIP-681 default
pass
return {
"recommended": "eip681_payment_link",
"methods": ["eip681_payment_link"],
"message": "⚠️ No local signer. Use EIP-681 payment link."
}
def find_opportunities(
min_apy: float = 0,
max_apy: float = MAX_SAFE_APY_DEFAULT,
chain: str = "Ethereum",
min_tvl: float = 0,
limit: int = 20,
allow_high_apy: bool = False,
include_risk_signals: bool = True,
include_execution_readiness: bool = True
) -> List[Dict[str, Any]]:
"""
Find investment opportunities matching criteria.
Returns structured data for LLM-based risk analysis.
Supports both MCP server and DefiLlama as data sources.
Args:
min_apy: Minimum APY percentage
max_apy: Maximum APY percentage (default 100% to filter scams)
chain: Blockchain to search
min_tvl: Minimum TVL in USD
limit: Maximum results to return
allow_high_apy: If True, allow APY > 100%
include_risk_signals: Include risk_signals for LLM analysis
include_execution_readiness: Include execution_readiness for payment method guidance
Returns:
List of opportunity dicts with actionable_addresses, risk_signals, and execution_readiness
"""
# Load config to check MCP settings
config = load_config()
mcp_config = config.get("discovery", {}).get("mcp", {})
# Step 1: Fetch from MCP servers (with caching)
mcp_opportunities = []
if mcp_config.get("enabled", False):
mcp_opportunities = fetch_mcp_yields(chain=chain)
# Step 2: Fetch from DefiLlama API
yields_data = fetch_yields()
protocols_data = fetch_protocols()
if not yields_data.get("data"):
print("⚠️ No yield data received from DefiLlama", file=sys.stderr)
# If MCP has data but DefiLlama fails, return MCP data only
if mcp_opportunities:
return mcp_opportunities[:limit]
return []
# Build protocol lookup
protocol_map = {}
for p in protocols_data:
slug = safe_get(p, "slug", safe_get(p, "name", "")).lower()
protocol_map[slug] = p
protocol_map[safe_get(p, "name", "").lower()] = p
# Load static registry
registry = get_protocol_registry()
opportunities = []
normalized_chain = normalize_chain(chain)
for pool in yields_data.get("data", []):
# Filter by chain
pool_chain = safe_str(safe_get(pool, "chain", "")).lower()
if normalized_chain.lower() not in pool_chain and chain.lower() not in pool_chain:
continue
# Filter by APY
apy = safe_float(safe_get(pool, "apy"))
if apy < min_apy:
continue
if not allow_high_apy and apy > max_apy:
continue
# Filter by TVL
tvl = safe_float(safe_get(pool, "tvlUsd"))
if tvl < min_tvl:
continue
# Get protocol info
protocol_name = safe_str(safe_get(pool, "project"))
protocol = protocol_map.get(protocol_name.lower(), {})
# Extract actionable addresses
actionable = extract_actionable_addresses(pool, protocol, registry)
# Build opportunity record
opp = {
# Core identifiers
"pool": safe_str(safe_get(pool, "pool")),
"protocol": protocol_name,
"protocol_name": safe_get(protocol, "name", protocol_name),
"chain": safe_get(pool, "chain", "Ethereum"),
"symbol": safe_str(safe_get(pool, "symbol")),
# Yield metrics
"apy": round(apy, 2),
"apy_base": round(safe_float(safe_get(pool, "apyBase")), 2),
"apy_reward": round(safe_float(safe_get(pool, "apyReward")), 2),
"tvl_usd": tvl,
# Asset info
"underlying_tokens": safe_list(safe_get(pool, "underlyingTokens")),
"reward_tokens": safe_list(safe_get(pool, "rewardTokens")),
"stablecoin": safe_get(pool, "stablecoin", False),
# Execution readiness (NEW in v0.2.0)
"actionable_addresses": actionable,
# Links
"url": f"https://defillama.com/yield/pool/{safe_get(pool, 'pool', '')}",
# Legacy fields (for backward compatibility)
"audited": bool(safe_get(protocol, "audits")),
}
# Add risk signals for LLM analysis
if include_risk_signals:
opp["risk_signals"] = collect_risk_signals(pool, protocol, registry)
opportunities.append(opp)
# Step 3: Merge MCP and DefiLlama data
defi_count = len(opportunities)  # everything collected so far came from DefiLlama
if mcp_opportunities:
# Use pool ID as unique key for deduplication
existing_pools = {opp["pool"] for opp in opportunities}
added_count = 0
for mcp_opp in mcp_opportunities:
pool_id = mcp_opp.get("pool")
if pool_id and pool_id not in existing_pools:
opportunities.append(mcp_opp)
existing_pools.add(pool_id)
added_count += 1
if added_count > 0:
print(f"📈 Added {added_count} unique opportunities from MCP", file=sys.stderr)
total_count = len(opportunities)
mcp_count = total_count - defi_count
print(f"📊 Total: {total_count} opportunities ({mcp_count} from MCP + {defi_count} from DefiLlama)", file=sys.stderr)
# Sort by APY (descending)
opportunities.sort(key=lambda x: x["apy"], reverse=True)
opportunities = opportunities[:limit]
# Attach execution readiness to each opportunity (NEW in v0.5.0)
if include_execution_readiness:
readiness = check_execution_readiness_inline(chain.lower())
for opp in opportunities:
opp["execution_readiness"] = readiness
return opportunities
# ============================================================================
# Output Formatting
# ============================================================================
def format_report(opportunities: List[Dict], output_format: str = "markdown") -> str:
"""Format opportunities as report."""
if not opportunities:
return "No opportunities found matching criteria."
if output_format == "json":
return json.dumps(opportunities, indent=2, ensure_ascii=False)
# Markdown format
lines = [
"# 💰 Investment Opportunities",
f"\n**Generated**: {datetime.now().isoformat()}",
f"**Found**: {len(opportunities)} pools\n",
"---\n"
]
for i, opp in enumerate(opportunities, 1):
lines.extend([
f"## {i}. {opp['protocol']} - {opp['symbol']}",
"",
f"| Metric | Value |",
f"|--------|-------|",
f"| APY | **{opp['apy']}%** |",
f"| TVL | ${opp['tvl_usd']:,.0f} |",
f"| Chain | {opp['chain']} |",
f"| Stablecoin | {'✅' if opp['stablecoin'] else '❌'} |",
])
# Show actionable status
actionable = opp.get("actionable_addresses", {})
if actionable.get("has_actionable_address"):
lines.append(f"| Actionable | ✅ Ready for execution |")
if actionable.get("primary_contract"):
lines.append(f"| Primary Contract | `{actionable['primary_contract']}` |")
else:
lines.append(f"| Actionable | ⚠️ Needs address lookup |")
lines.extend([
"",
f"**Pool**: `{opp['pool']}`",
f"**Link**: {opp['url']}",
""
])
# Show risk signals summary
risk = opp.get("risk_signals", {})
if risk:
lines.extend([
f"**Risk Signals**:",
f"- Audited: {'✅' if risk.get('has_audit') else '❌'}",
f"- Known Protocol: {'✅' if risk.get('known_protocol') else '❌'}",
f"- APY Composition: {risk.get('apy_composition', 'unknown')}",
""
])
return "\n".join(lines)
def format_llm_prompt(opportunities: List[Dict]) -> str:
"""
Format opportunities as LLM-ready prompt for risk analysis.
Output is optimized for LLM consumption.
"""
if not opportunities:
return "No opportunities to analyze."
lines = [
"# DeFi Investment Opportunities - Risk Analysis Request",
"",
"Please analyze the following investment opportunities and provide:",
"1. Risk assessment (Low/Medium/High) with reasoning",
"2. Key risk factors to watch",
"3. Recommended actions",
"",
"---\n"
]
for i, opp in enumerate(opportunities, 1):
lines.append(f"## Opportunity {i}")
lines.append("```json")
lines.append(json.dumps(opp, indent=2, ensure_ascii=False))
lines.append("```\n")
return "\n".join(lines)
# ============================================================================
# CLI
# ============================================================================
def main():
parser = argparse.ArgumentParser(
description="Find DeFi investment opportunities (v0.2.0 - LLM-ready)",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
python find_opportunities.py --min-apy 5 --chain ethereum
python find_opportunities.py --llm-ready --output json
python find_opportunities.py --allow-high-apy --limit 50
"""
)
parser.add_argument("--min-apy", type=float, default=0,
help="Minimum APY %%")
parser.add_argument("--max-apy", type=float, default=MAX_SAFE_APY_DEFAULT,
help=f"Maximum APY %% (default {MAX_SAFE_APY_DEFAULT}%%)")
parser.add_argument("--allow-high-apy", action="store_true",
help="Allow APY > 100%% (high risk)")
parser.add_argument("--chain", default="Ethereum",
help="Blockchain to search")
parser.add_argument("--min-tvl", type=float, default=0,
help="Minimum TVL in USD")
parser.add_argument("--limit", type=int, default=20,
help="Maximum results")
parser.add_argument("--output", choices=["markdown", "json"], default="markdown",
help="Output format")
parser.add_argument("--llm-ready", action="store_true",
help="Output LLM-ready prompt for risk analysis")
parser.add_argument("--no-risk-signals", action="store_true",
help="Exclude risk signals from output")
args = parser.parse_args()
# Safety warning
if args.allow_high_apy:
print("⚠️ Warning: --allow-high-apy is enabled; results may include high-risk projects", file=sys.stderr)
opportunities = find_opportunities(
min_apy=args.min_apy,
max_apy=args.max_apy,
chain=args.chain,
min_tvl=args.min_tvl,
limit=args.limit,
allow_high_apy=args.allow_high_apy,
include_risk_signals=not args.no_risk_signals
)
# Output
if args.llm_ready:
print(format_llm_prompt(opportunities))
elif args.output == "json":
print(json.dumps(opportunities, indent=2, ensure_ascii=False))
else:
print(format_report(opportunities, "markdown"))
if __name__ == "__main__":
main()
```
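`fetch_mcp_server` reads the MCP reply as a server-sent-events body, takes the first `data: ` line that parses as JSON, and then decodes each `text` item inside `result.content` as JSON again. The sketch below isolates that two-stage decoding so it can be exercised without a network call. The function name `parse_mcp_sse` and the sample body are hypothetical; the field names in the sample follow `convert_rwa_product`.

```python
import json
from typing import Any, Dict, List


def parse_mcp_sse(raw: str) -> List[Dict[str, Any]]:
    """Return product dicts from the first parseable `data: ` line of an SSE body."""
    envelope = None
    for line in raw.splitlines():
        if line.startswith("data: "):
            try:
                envelope = json.loads(line[len("data: "):])
                break
            except json.JSONDecodeError:
                continue  # skip frames that are not valid JSON
    if not isinstance(envelope, dict):
        return []

    result = envelope.get("result")
    content = result.get("content", []) if isinstance(result, dict) else []
    products: List[Dict[str, Any]] = []
    for item in content:
        if not isinstance(item, dict) or item.get("type") != "text":
            continue
        try:
            parsed = json.loads(item.get("text", ""))
        except (json.JSONDecodeError, TypeError):
            continue
        if isinstance(parsed, list):
            products.extend(p for p in parsed if isinstance(p, dict))
        elif isinstance(parsed, dict):
            products.append(parsed)
    return products


# Hypothetical response body with one product wrapped in a JSON-RPC envelope
inner = json.dumps([{"id": "p1", "expectedYieldAnnual": 0.08}])
body = "event: message\ndata: " + json.dumps(
    {"jsonrpc": "2.0", "id": 1, "result": {"content": [{"type": "text", "text": inner}]}}
) + "\n\n"

products = parse_mcp_sse(body)
print(products)
```

Returning an empty list for every malformed input keeps the caller's fallback logic simple, at the cost of not distinguishing an empty catalog from a bad reply.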
### scripts/trading/eip681_payment.py
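As orientation for the script that follows, here is a minimal sketch of the ERC-20 case: the amount is converted to integer base units, the link targets the token contract, and the equivalent `transfer(address,uint256)` calldata uses the `0xa9059cbb` selector. The helper names are hypothetical, and the sketch takes the amount as a string and converts it with `Decimal`, which is an assumption of this example rather than the script's own approach. The USDC address, chain ID, and recipient are the ones that appear in the script.

```python
from decimal import Decimal

TRANSFER_SELECTOR = "a9059cbb"  # selector of transfer(address,uint256)


def to_smallest_unit(amount: str, decimals: int) -> int:
    """Convert a human-readable amount to integer base units without float error."""
    return int(Decimal(amount) * (Decimal(10) ** decimals))


def erc20_transfer_link(token_address: str, chain_id: int, recipient: str, units: int) -> str:
    """EIP-681 link: the target is the token contract, the recipient is a parameter."""
    return f"ethereum:{token_address}@{chain_id}/transfer?address={recipient}&uint256={units}"


def erc20_transfer_calldata(recipient: str, units: int) -> str:
    """ABI-encode transfer(recipient, units): selector plus two 32-byte words."""
    return "0x" + TRANSFER_SELECTOR + recipient[2:].lower().rjust(64, "0") + format(units, "064x")


usdc_base = "0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913"
recipient = "0x1F3A9A450428BbF161C4C33f10bd7AA1b2599a3e"

units = to_smallest_unit("10", 6)  # USDC has 6 decimals
link = erc20_transfer_link(usdc_base, 8453, recipient, units)
calldata = erc20_transfer_calldata(recipient, units)

print(link)
print(calldata)
```

Passing the amount as a string avoids the case where a binary float multiplied by a power of ten truncates one unit low.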
```python
#!/usr/bin/env python3
"""
EIP-681 Payment Link Generator (v0.3.3)
Generate EIP-681 compliant payment links and QR codes for MetaMask.
EIP-681 Format:
ethereum:<token_contract>@<chainId>/transfer?address=<recipient>&uint256=<amount>
(for native ETH: ethereum:<recipient>@<chainId>?value=<amount_in_wei>)
Usage:
python3 eip681_payment.py generate \
--token USDC \
--to 0x1F3A9A450428BbF161C4C33f10bd7AA1b2599a3e \
--amount 10 \
--network base
# With QR code (for desktop users)
python3 eip681_payment.py generate \
--token USDC \
--to 0x... \
--amount 10 \
--network base \
--qr-output /tmp/payment_qr.png
"""
import argparse
import json
import os
import sys
import subprocess
from typing import Optional, Dict, Any
# ============================================================================
# Configuration
# ============================================================================
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
SKILL_DIR = os.path.dirname(SCRIPT_DIR)
# Token addresses by chain
TOKEN_ADDRESSES = {
"base": {
"USDC": "0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913",
"USDT": "0xfde4C96c8593536E11F39842a902aB0E47ec4E54",
"WETH": "0x4200000000000000000000000000000000000006",
"ETH": "0x0000000000000000000000000000000000000000", # Native
},
"ethereum": {
"USDC": "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48",
"USDT": "0xdAC17F958D2ee523a2206206994597C13D831ec7",
"WETH": "0xC02aaA39b223FE8D0A0e5C4F27ead9083C756Cc2",
"ETH": "0x0000000000000000000000000000000000000000", # Native
}
}
# Chain IDs
CHAIN_IDS = {
"base": 8453,
"ethereum": 1,
}
# Token decimals
TOKEN_DECIMALS = {
"USDC": 6,
"USDT": 6,
"WETH": 18,
"ETH": 18,
}
# QR Code options
QR_SKILL_PATH = os.path.expanduser("~/.openclaw/workspace/skills/qrcode-gen-yn/agent.py")
# Try npm qrcode as fallback
# ============================================================================
# EIP-681 Link Generation
# ============================================================================
def generate_eip681_link(
token: str,
to: str,
amount: float,
network: str = "base"
) -> Dict[str, Any]:
"""
Generate EIP-681 compliant payment link.
Args:
token: Token symbol (USDC, USDT, WETH, ETH)
to: Recipient address
amount: Amount to send (human readable, e.g., 10.5)
network: Network name (base, ethereum)
Returns:
Dict with link, calldata, and transaction details
"""
# Validate inputs
if network not in CHAIN_IDS:
return {
"success": False,
"error": f"Unsupported network: {network}. Supported: {list(CHAIN_IDS.keys())}"
}
if token.upper() not in TOKEN_ADDRESSES.get(network, {}):
return {
"success": False,
"error": f"Unsupported token: {token} on {network}. Supported: {list(TOKEN_ADDRESSES.get(network, {}).keys())}"
}
token = token.upper()
chain_id = CHAIN_IDS[network]
token_address = TOKEN_ADDRESSES[network][token]
decimals = TOKEN_DECIMALS.get(token, 18)
# Convert amount to smallest unit
amount_smallest = int(amount * (10 ** decimals))
# Check if native token (ETH)
is_native = token == "ETH"
if is_native:
# Native ETH transfer - simpler format
link = f"ethereum:{to}@{chain_id}?value={amount_smallest}"
calldata = None
else:
# ERC20 transfer
# Transfer function: 0xa9059cbb(to, amount)
to_padded = to[2:].lower().zfill(64)
amount_hex = hex(amount_smallest)[2:].zfill(64)
calldata = f"0xa9059cbb{to_padded}{amount_hex}"
# EIP-681 format for ERC20
link = f"ethereum:{token_address}@{chain_id}/transfer?address={to}&uint256={amount_smallest}"
# MetaMask deep link
metamask_link = f"https://metamask.app.link/send/{token_address}@{chain_id}/transfer?address={to}&uint256={amount_smallest}"
return {
"success": True,
"network": network,
"chain_id": chain_id,
"token": token,
"token_address": token_address,
"recipient": to,
"amount": amount,
"amount_smallest": amount_smallest,
"calldata": calldata,
"eip681_link": link,
"metamask_link": metamask_link,
"transaction": {
"to": token_address if not is_native else to,
"value": f"0x{hex(amount_smallest)[2:]}" if is_native else "0x0",
"data": calldata if calldata else "0x",
"chain_id": chain_id
}
}
def generate_qr_code(link: str, output_path: str) -> Dict[str, Any]:
"""
Generate QR code for payment link.
Tries multiple methods in order:
1. npm qrcode (if available)
2. Python qrcode-gen-yn skill (if installed)
Args:
link: EIP-681 link or MetaMask link
output_path: Path to save QR code PNG
Returns:
Dict with success status and path (or error with fallback)
"""
# Ensure output directory exists
output_dir = os.path.dirname(output_path)
if output_dir:
os.makedirs(output_dir, exist_ok=True)
# Method 1: Try npm qrcode (preferred, faster)
try:
result = subprocess.run(
["npx", "qrcode", "-t", "png", "-o", output_path, link],
capture_output=True,
text=True,
timeout=10
)
if result.returncode == 0 and os.path.exists(output_path):
return {
"success": True,
"qr_path": output_path,
"method": "npm-qrcode",
"link": link
}
except subprocess.TimeoutExpired:
pass
except FileNotFoundError:
pass
except Exception:
pass
# Method 2: Try Python qrcode-gen-yn skill
if os.path.exists(QR_SKILL_PATH):
try:
result = subprocess.run(
["python3", QR_SKILL_PATH, link, output_path],
capture_output=True,
text=True,
timeout=10
)
if result.returncode == 0 and os.path.exists(output_path):
return {
"success": True,
"qr_path": output_path,
"method": "python-skill",
"link": link
}
except subprocess.TimeoutExpired:
pass
except Exception:
pass
# All methods failed - return fallback
return {
"success": False,
"error": "QR code generation unavailable",
"fallback": "Please display the link directly for mobile users",
"link": link
}
def format_output(result: Dict, include_qr: bool = False, qr_path: str = None) -> str:
"""Format result for display."""
if not result.get("success"):
return f"❌ Error: {result.get('error', 'Unknown error')}"
lines = [
"=" * 60,
"EIP-681 Payment Link Generated",
"=" * 60,
"",
f"Network: {result['network'].upper()} (Chain ID: {result['chain_id']})",
f"Token: {result['token']} ({result['token_address']})",
f"Recipient: {result['recipient']}",
f"Amount: {result['amount']} {result['token']}",
"",
"-" * 60,
"For Mobile Users (MetaMask)",
"-" * 60,
f"Click this link: {result['metamask_link']}",
"",
"-" * 60,
"For Desktop Users",
"-" * 60,
"Scan QR code with MetaMask mobile app,",
"or manually copy the transaction details below.",
"",
"Transaction Details:",
f" To: {result['transaction']['to']}",
f" Value: {result['transaction']['value']}",
f" Data: {result['calldata'][:30]}..." if result.get('calldata') else " Data: (none - native transfer)",
"",
"-" * 60,
"Raw EIP-681 Link",
"-" * 60,
result['eip681_link'],
]
if include_qr and qr_path:
lines.extend([
"",
"-" * 60,
"QR Code",
"-" * 60,
f"Generated at: {qr_path}",
])
return "\n".join(lines)
# ============================================================================
# CLI
# ============================================================================
def main():
parser = argparse.ArgumentParser(
description="EIP-681 Payment Link Generator (v0.3.3)",
formatter_class=argparse.RawDescriptionHelpFormatter
)
subparsers = parser.add_subparsers(dest="command", help="Commands")
# Generate command
gen_parser = subparsers.add_parser("generate", help="Generate EIP-681 payment link")
gen_parser.add_argument("--token", required=True, help="Token symbol (USDC, USDT, WETH, ETH)")
gen_parser.add_argument("--to", required=True, help="Recipient address")
gen_parser.add_argument("--amount", type=float, required=True, help="Amount to send")
gen_parser.add_argument("--network", default="base", choices=["base", "ethereum"], help="Network")
gen_parser.add_argument("--qr-output", help="Path to save QR code PNG (optional)")
gen_parser.add_argument("--json", action="store_true", help="JSON output")
args = parser.parse_args()
if args.command == "generate":
# Generate EIP-681 link
result = generate_eip681_link(
token=args.token,
to=args.to,
amount=args.amount,
network=args.network
)
# Generate QR code if requested
if args.qr_output and result.get("success"):
qr_result = generate_qr_code(result['metamask_link'], args.qr_output)
result["qr"] = qr_result
# Output
if args.json:
print(json.dumps(result, indent=2, ensure_ascii=False))
else:
print(format_output(result, include_qr=bool(args.qr_output), qr_path=args.qr_output))
else:
parser.print_help()
if __name__ == "__main__":
main()
```
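The calldata and link construction used by the script can be illustrated in isolation. The following is a minimal sketch, not the script's own code: the helper names `to_base_units`, `erc20_transfer_calldata`, and `eip681_erc20_link` are hypothetical, and amounts are taken as strings so that `Decimal` can convert them without binary floating-point rounding.

```python
from decimal import Decimal

# First 4 bytes of keccak256("transfer(address,uint256)"), as used in the script above
TRANSFER_SELECTOR = "a9059cbb"


def to_base_units(amount: str, decimals: int) -> int:
    """Convert a human-readable amount (given as a string) to integer base units."""
    return int(Decimal(amount) * (10 ** decimals))


def erc20_transfer_calldata(recipient: str, amount_base_units: int) -> str:
    """ABI-encode transfer(recipient, amount): selector plus two 32-byte words."""
    address_word = recipient[2:].lower().rjust(64, "0")   # left-pad the 20-byte address
    amount_word = format(amount_base_units, "064x")       # uint256 as 64 hex digits
    return "0x" + TRANSFER_SELECTOR + address_word + amount_word


def eip681_erc20_link(token_address: str, chain_id: int,
                      recipient: str, amount_base_units: int) -> str:
    """Build the EIP-681 link for an ERC-20 transfer."""
    return (f"ethereum:{token_address}@{chain_id}/transfer"
            f"?address={recipient}&uint256={amount_base_units}")


if __name__ == "__main__":
    usdc_on_base = "0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913"
    recipient = "0x1F3A9A450428BbF161C4C33f10bd7AA1b2599a3e"
    units = to_base_units("10", 6)  # USDC has 6 decimals
    print(erc20_transfer_calldata(recipient, units))
    print(eip681_erc20_link(usdc_on_base, 8453, recipient, units))
```

Passing amounts as strings is a design choice of this sketch: a `float` such as `1.1` multiplied by `10 ** 18` may not land on the intended integer, whereas `Decimal("1.1")` does.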
### scripts/trading/whitelist.py
```python
#!/usr/bin/env python3
"""
Manage address whitelist for Safe Vault.

Usage:
    python whitelist.py --list
    python whitelist.py --add 0x... --name "Aave Pool" --max-amount 500
    python whitelist.py --remove 0x...
    python whitelist.py --enable
    python whitelist.py --disable
"""
import argparse
import json
import os
import sys
from datetime import datetime, timezone
from typing import Optional

WHITELIST_PATH = os.path.join(os.path.dirname(__file__), "..", "..", "config", "whitelist.json")


def load_whitelist() -> dict:
    """Load whitelist from file."""
    if os.path.exists(WHITELIST_PATH):
        with open(WHITELIST_PATH) as f:
            return json.load(f)
    return {"enabled": True, "addresses": []}


def save_whitelist(whitelist: dict):
    """Save whitelist to file."""
    os.makedirs(os.path.dirname(WHITELIST_PATH), exist_ok=True)
    with open(WHITELIST_PATH, "w") as f:
        json.dump(whitelist, f, indent=2)


def list_whitelist():
    """List all whitelisted addresses."""
    whitelist = load_whitelist()
    print(f"Whitelist Status: {'🟢 ENABLED' if whitelist.get('enabled', True) else '🔴 DISABLED'}")
    print()
    addresses = whitelist.get("addresses", [])
    if not addresses:
        print("No addresses in whitelist.")
        return
    print(f"{'Address':<44} {'Name':<20} {'Max Amount (USD)':<15}")
    print("-" * 80)
    for entry in addresses:
        addr = entry.get("address", "")
        name = entry.get("name", "N/A")
        max_amount = entry.get("max_amount_usd", "default")
        print(f"{addr:<44} {name:<20} {max_amount}")


def add_address(address: str, name: str = "", max_amount: Optional[float] = None):
    """Add address to whitelist."""
    whitelist = load_whitelist()
    # Check if already exists (addresses are compared case-insensitively)
    address_lower = address.lower()
    for entry in whitelist.get("addresses", []):
        if entry.get("address", "").lower() == address_lower:
            print(f"Address {address} already in whitelist.")
            return
    # Add new entry with a timezone-aware UTC timestamp
    entry = {
        "address": address,
        "name": name,
        "added_at": datetime.now(timezone.utc).isoformat()
    }
    if max_amount is not None:
        entry["max_amount_usd"] = max_amount
    whitelist.setdefault("addresses", []).append(entry)
    save_whitelist(whitelist)
    print(f"✅ Added {address} to whitelist")
    if name:
        print(f"   Name: {name}")
    if max_amount is not None:
        print(f"   Max Amount: ${max_amount}")


def remove_address(address: str):
    """Remove address from whitelist."""
    whitelist = load_whitelist()
    address_lower = address.lower()
    original_count = len(whitelist.get("addresses", []))
    whitelist["addresses"] = [
        entry for entry in whitelist.get("addresses", [])
        if entry.get("address", "").lower() != address_lower
    ]
    if len(whitelist["addresses"]) == original_count:
        print(f"Address {address} not found in whitelist.")
        return
    save_whitelist(whitelist)
    print(f"✅ Removed {address} from whitelist")


def enable_whitelist():
    """Enable whitelist checking."""
    whitelist = load_whitelist()
    whitelist["enabled"] = True
    save_whitelist(whitelist)
    print("✅ Whitelist enabled")


def disable_whitelist():
    """Disable whitelist checking (not recommended)."""
    whitelist = load_whitelist()
    whitelist["enabled"] = False
    save_whitelist(whitelist)
    print("⚠️ Whitelist DISABLED - All addresses allowed")


def main():
    parser = argparse.ArgumentParser(description="Manage Safe Vault whitelist")
    parser.add_argument("--list", action="store_true", help="List whitelisted addresses")
    parser.add_argument("--add", metavar="ADDRESS", help="Add address to whitelist")
    parser.add_argument("--remove", metavar="ADDRESS", help="Remove address from whitelist")
    parser.add_argument("--name", default="", help="Name for the address")
    parser.add_argument("--max-amount", type=float, help="Maximum amount in USD")
    parser.add_argument("--enable", action="store_true", help="Enable whitelist")
    parser.add_argument("--disable", action="store_true", help="Disable whitelist")
    args = parser.parse_args()

    if args.list:
        list_whitelist()
    elif args.add:
        add_address(args.add, args.name, args.max_amount)
    elif args.remove:
        remove_address(args.remove)
    elif args.enable:
        enable_whitelist()
    elif args.disable:
        disable_whitelist()
    else:
        parser.print_help()


if __name__ == "__main__":
    main()
```
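The script above only manages the whitelist file; how a transaction is checked against it is not shown in this section. A minimal sketch of such a check, assuming the same JSON shape (`enabled`, `addresses`, optional `max_amount_usd`), might look like the following. The function name `check_whitelist` and the `default_max_usd` fallback are hypothetical.

```python
from typing import Tuple


def check_whitelist(whitelist: dict, address: str, amount_usd: float,
                    default_max_usd: float = 100.0) -> Tuple[bool, str]:
    """Return (allowed, reason) for sending amount_usd to address.

    default_max_usd is an assumed fallback for entries without max_amount_usd.
    """
    if not whitelist.get("enabled", True):
        return True, "whitelist disabled"
    for entry in whitelist.get("addresses", []):
        # Same case-insensitive comparison as whitelist.py
        if entry.get("address", "").lower() == address.lower():
            limit = entry.get("max_amount_usd", default_max_usd)
            if amount_usd <= limit:
                return True, f"within limit of ${limit}"
            return False, f"amount exceeds limit of ${limit}"
    return False, "address not whitelisted"


if __name__ == "__main__":
    sample = {
        "enabled": True,
        "addresses": [
            {"address": "0x87870Bca3F3fD6335C3F4ce8392D69350B4fA4E2",
             "name": "Aave Pool", "max_amount_usd": 500},
        ],
    }
    print(check_whitelist(sample, "0x87870bca3f3fd6335c3f4ce8392d69350b4fa4e2", 250))
    print(check_whitelist(sample, "0x0000000000000000000000000000000000000001", 10))
```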
### config/config.json
```json
{
"_comment": "Web3 Investor Skill Configuration - v0.5.0",
"chain": {
"default": "base",
"supported": ["base", "ethereum"],
"rpc": {
"base": ["https://base.llamarpc.com", "https://base.drpc.org"],
"ethereum": ["https://eth.llamarpc.com", "https://rpc.flashbots.net/fast", "https://eth.drpc.org"]
}
},
"api": {
"url": "http://localhost:3000/api",
"timeout_seconds": 30
},
"security": {
"max_trade_usd": 10000,
"max_slippage_percent": 3.0,
"whitelist_enabled": false,
"whitelist_tokens": ["USDC", "USDT", "DAI", "WETH", "ETH", "stETH", "rETH"],
"whitelist_protocols": ["uniswap", "aave", "compound", "lido", "0x"],
"double_confirm": {
"enabled": true,
"large_trade_threshold_usd": 5000,
"high_slippage_threshold_percent": 1.0,
"new_protocol_confirm": true,
"confirm_phrase": "CONFIRM_HIGH_RISK"
}
},
"preflight": {
"enabled": true,
"min_gas_eth": 0.001,
"checks": {
"swap": ["signer_api", "rpc_reachable", "gas_balance", "token_balance"],
"deposit": ["signer_api", "rpc_reachable", "gas_balance", "token_balance"],
"transfer": ["signer_api", "rpc_reachable", "gas_balance", "token_balance"]
}
},
"rpc": {
"timeout_seconds": 10,
"fallback_enabled": true,
"session_sticky": true
},
"discovery": {
"data_sources": ["mcp", "dune", "defillama"],
"mcp": {
"enabled": true,
"cache_ttl_seconds": 300,
"servers": []
},
"dune": {
"mcp_endpoint": "https://api.dune.com/mcp/v1",
"auth": {
"header": {
"name": "x-dune-api-key",
"env_var": "DUNE_API_KEY"
},
"query_param": {
"name": "api_key",
"env_var": "DUNE_API_KEY"
}
},
"enabled": true
},
"min_apy": 0,
"max_results": 20
}
}
```
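How the `security` section is evaluated is not shown in this part of the source. As a rough sketch under stated assumptions, the thresholds could be applied as below: `max_trade_usd` and `max_slippage_percent` are treated as hard caps, the `double_confirm` thresholds as inclusive triggers for the confirm phrase, and any protocol outside `whitelist_protocols` as "new". The function name `confirmation_reasons` and all of these semantics are assumptions, not the skill's documented behavior.

```python
from typing import List

# Values copied from the "security" section of config/config.json above
SECURITY = {
    "max_trade_usd": 10000,
    "max_slippage_percent": 3.0,
    "whitelist_protocols": ["uniswap", "aave", "compound", "lido", "0x"],
    "double_confirm": {
        "enabled": True,
        "large_trade_threshold_usd": 5000,
        "high_slippage_threshold_percent": 1.0,
        "new_protocol_confirm": True,
        "confirm_phrase": "CONFIRM_HIGH_RISK",
    },
}


def confirmation_reasons(security: dict, trade_usd: float,
                         slippage_percent: float, protocol: str) -> List[str]:
    """Return why a trade needs the confirm phrase (empty list means it does not)."""
    # Assumed: values above the hard caps are rejected outright
    if trade_usd > security["max_trade_usd"]:
        raise ValueError("trade exceeds max_trade_usd")
    if slippage_percent > security["max_slippage_percent"]:
        raise ValueError("slippage exceeds max_slippage_percent")

    rules = security["double_confirm"]
    if not rules.get("enabled", False):
        return []

    reasons = []
    if trade_usd >= rules["large_trade_threshold_usd"]:
        reasons.append("large_trade")
    if slippage_percent >= rules["high_slippage_threshold_percent"]:
        reasons.append("high_slippage")
    if rules.get("new_protocol_confirm") and protocol.lower() not in security["whitelist_protocols"]:
        reasons.append("new_protocol")
    return reasons


if __name__ == "__main__":
    reasons = confirmation_reasons(SECURITY, 6000, 0.5, "aave")
    if reasons:
        phrase = SECURITY["double_confirm"]["confirm_phrase"]
        print(f"Confirmation required ({', '.join(reasons)}): type {phrase}")
```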
---
## Skill Companion Files
> Additional files collected from the skill directory layout.
### _meta.json
```json
{
"owner": "bevanding",
"slug": "web3-investor",
"displayName": "Web3 Investor",
"latest": {
"version": "0.5.2",
"publishedAt": 1772937871200,
"commit": "https://github.com/openclaw/skills/commit/95022d63847ae5e9d8b4dbd0ab18f7759639a00a"
},
"history": [
{
"version": "0.4.0",
"publishedAt": 1772724482673,
"commit": "https://github.com/openclaw/skills/commit/bad10f9133776a4c2bc9c69709610335e085979a"
},
{
"version": "0.3.3",
"publishedAt": 1772709508630,
"commit": "https://github.com/openclaw/skills/commit/f90afe4e5f5d8befe011fb366f77948c586e495f"
},
{
"version": "0.2.2",
"publishedAt": 1772623315892,
"commit": "https://github.com/openclaw/skills/commit/82b73c383d797f27169fe55638f22cbf17bf4c9a"
},
{
"version": "0.1.0-demo",
"publishedAt": 1772536273560,
"commit": "https://github.com/openclaw/skills/commit/8dbab623579dbdb21fdaa4dedbefdf5e13c83da5"
}
]
}
```
### assets/templates/opportunity-report.md
```markdown
# Investment Opportunity Report
Generated: {{timestamp}}
Chain: {{chain}}
---
## Summary
| Metric | Value |
|--------|-------|
| Protocol | {{protocol_name}} |
| Pool/Contract | {{pool_name}} |
| APY | {{apy}}% |
| TVL | ${{tvl}} |
| Risk Level | {{risk_level}} |
| Lock Period | {{lock_period}} |
---
## Protocol Information
- **Name**: {{protocol_name}}
- **Category**: {{category}}
- **Contract Address**: `{{contract_address}}`
- **Chain**: {{chain}}
- **Audit Status**: {{audit_status}}
- **Live Since**: {{launch_date}}
---
## Risk Assessment
| Factor | Score | Notes |
|--------|-------|-------|
| Protocol Maturity | {{maturity_score}}/3 | {{maturity_notes}} |
| Audit Status | {{audit_score}}/3 | {{audit_notes}} |
| TVL Size | {{tvl_score}}/2 | {{tvl_notes}} |
| Decentralization | {{decentralization_score}}/2 | {{decentralization_notes}} |
| **Total** | **{{risk_score}}/10** | {{risk_level}} |
{% if warnings %}
### ⚠️ Warnings
{% for warning in warnings %}
- {{warning}}
{% endfor %}
{% endif %}
---
## Underlying Assets
{% for asset in underlying_assets %}
- **{{asset.symbol}}**: {{asset.name}} ({{asset.percentage}}%)
{% endfor %}
---
## Yield Breakdown
| Source | APY |
|--------|-----|
| Base APY | {{base_apy}}% |
| Reward APY | {{reward_apy}}% |
| **Total APY** | **{{apy}}%** |
---
## Financial Metrics
| Metric | Value |
|--------|-------|
| Min Deposit | {{min_deposit}} |
| Max Deposit | {{max_deposit}} |
| Withdrawal Fee | {{withdrawal_fee}} |
| Entry Fee | {{entry_fee}} |
| Performance Fee | {{performance_fee}} |
---
## How to Invest
### Step 1: Prepare Wallet
Ensure you have {{required_asset}} in your wallet.
### Step 2: Approve Token
```
approve({{contract_address}}, amount)
```
### Step 3: Deposit
```
deposit(amount, recipient_address)
```
### Contract Call Example
```javascript
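// Using ethers.js v5 (in v6, parseUnits is exported as ethers.parseUnits)
const contract = new ethers.Contract(
  '{{contract_address}}',
  ABI,
  signer
);
// 100 units of a 6-decimal token; the recipient argument matches Step 3
await contract.deposit(ethers.utils.parseUnits('100', 6), await signer.getAddress());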
```
---
## Risk-Adjusted Analysis
| Scenario | APY | Risk Score | Risk-Adjusted APY |
|----------|-----|------------|-------------------|
| Optimistic | {{optimistic_apy}}% | {{risk_score}} | {{optimistic_adjusted}}% |
| Base Case | {{apy}}% | {{risk_score}} | {{base_adjusted}}% |
| Pessimistic | {{pessimistic_apy}}% | {{risk_score}} | {{pessimistic_adjusted}}% |
---
## Recommendation
{{recommendation}}
**Verdict**: {{verdict}} (Risk-Adjusted APY: {{risk_adjusted_apy}}%)
---
*Report generated by Web3 Investor Skill*
```
### assets/templates/portfolio-report.md
```markdown
# Portfolio Report
Address: `{{address}}`
Chain: {{chain}}
Generated: {{timestamp}}
---
## Summary
| Metric | Value |
|--------|-------|
| Total Value (USD) | ${{total_value_usd}} |
| 24h Change | {{change_24h}}% |
| 7d Change | {{change_7d}}% |
| Position Count | {{position_count}} |
| Active Protocols | {{protocol_count}} |
---
## Asset Distribution
### By Type
| Type | Value (USD) | Percentage |
|------|-------------|------------|
| Tokens | ${{tokens_value}} | {{tokens_pct}}% |
| DeFi Positions | ${{defi_value}} | {{defi_pct}}% |
| NFTs | ${{nft_value}} | {{nft_pct}}% |
| **Total** | **${{total_value_usd}}** | **100%** |
### By Asset
| Asset | Value (USD) | Percentage | Location |
|-------|-------------|------------|----------|
{% for asset in assets %}
| {{asset.symbol}} | ${{asset.value_usd}} | {{asset.percentage}}% | {{asset.protocol or 'Wallet'}} |
{% endfor %}
---
## DeFi Positions
{% for position in defi_positions %}
### {{position.protocol}} - {{position.name}}
| Field | Value |
|-------|-------|
| Type | {{position.type}} |
| Deposited | {{position.deposited}} {{position.asset}} (${{position.deposited_usd}}) |
| Current Value | ${{position.current_value_usd}} |
| APY | {{position.apy}}% |
| Rewards | {{position.rewards}} |
| Health Factor | {{position.health_factor}} |
{% if position.expiry %}
**⏰ Expires**: {{position.expiry}} ({{position.days_until_expiry}} days)
{% endif %}
{% endfor %}
---
## Expiring Positions
{% if expiring_positions %}
| Protocol | Position | Expires | Days Left | Value |
|----------|----------|---------|-----------|-------|
{% for pos in expiring_positions %}
| {{pos.protocol}} | {{pos.name}} | {{pos.expiry}} | {{pos.days_left}} | ${{pos.value}} |
{% endfor %}
{% else %}
No positions expiring in the next 30 days.
{% endif %}
---
## Yield Summary
| Source | APY | Value | Annual Yield |
|--------|-----|-------|--------------|
{% for yield in yield_summary %}
| {{yield.source}} | {{yield.apy}}% | ${{yield.value}} | ${{yield.annual_yield}} |
{% endfor %}
| **Total** | **{{weighted_apy}}%** | **${{total_value_usd}}** | **${{total_annual_yield}}** |
---
## Risk Exposure
| Risk Type | Exposure | Details |
|-----------|----------|---------|
| Smart Contract | {{sc_risk}} | {{sc_protocols}} |
| Liquidation | {{liq_risk}} | {{liq_details}} |
| Impermanent Loss | {{il_risk}} | {{il_details}} |
| Oracle | {{oracle_risk}} | {{oracle_details}} |
---
## Recommendations
{% for rec in recommendations %}
{{rec.priority}}. **{{rec.title}}**: {{rec.description}}
{% endfor %}
---
## Transaction History (Last 7 Days)
| Date | Action | Protocol | Amount | Asset |
|------|--------|----------|--------|-------|
{% for tx in recent_transactions %}
| {{tx.date}} | {{tx.action}} | {{tx.protocol}} | {{tx.amount}} | {{tx.asset}} |
{% endfor %}
---
*Report generated by Web3 Investor Skill*
```
### references/mcp-servers.md
```markdown
# MCP Servers for Web3 Data
This document lists MCP (Model Context Protocol) servers that provide Web3 data.
## What is MCP?
Model Context Protocol is a standardized way for AI models to access external data sources. MCP servers act as bridges between AI agents and blockchain data.
## Recommended MCP Servers
### 1. Ethereum MCP
- **Repository**: https://github.com/your-org/ethereum-mcp
- **Capabilities**: Balance queries, transaction simulation, contract reads
- **Setup**: `npx ethereum-mcp --rpc-url YOUR_RPC_URL`
- **Free**: Yes (uses your RPC)
### 2. Etherscan MCP
- **Repository**: https://github.com/your-org/etherscan-mcp
- **Capabilities**: Transaction history, contract verification, token info
- **Setup**: `ETHERSCAN_API_KEY=xxx npx etherscan-mcp`
- **Free**: Yes (free tier available)
### 3. DefiLlama MCP
- **Repository**: https://github.com/your-org/defillama-mcp
- **Capabilities**: TVL data, yield rankings, protocol info
- **Setup**: `npx defillama-mcp`
- **Free**: Yes (no API key required)
### 4. Dune Analytics MCP
- **Repository**: https://github.com/your-org/dune-mcp
- **Capabilities**: Custom SQL queries on blockchain data
- **Setup**: `DUNE_API_KEY=xxx npx dune-mcp`
- **Free**: Free tier available (limited queries)
## Integration Pattern
```python
# Check MCP availability first, fall back to direct API
async def get_tvl(protocol: str) -> float:
    if mcp_available("defillama"):
        return await mcp_query("defillama", "get_tvl", protocol)
    else:
        return await defillama_api_get_tvl(protocol)
```
## Adding New MCP Servers
When adding new MCP servers:
1. Verify the server implements the standard Model Context Protocol
2. Test with the `mcp-inspector` tool
3. Document required environment variables
4. Add fallback logic for when MCP is unavailable
```
### references/protocols.md
```markdown
# Known Protocol Registry
This document contains metadata for common DeFi protocols on Ethereum mainnet.
## Lending Protocols
### Aave V3
- **Contract**: 0x87870Bca3F3fD6335C3F4ce8392D69350B4fA4E2
- **Category**: Lending
- **Risk Level**: Low
- **Audit**: Multiple (OpenZeppelin, Trail of Bits)
- **TVL**: > $5B (check DefiLlama for current)
- **Docs**: https://docs.aave.com/
### Compound V3
- **Contract**: 0xc3d688B66703497DAA19211EEdff47f25384cdc3
- **Category**: Lending
- **Risk Level**: Low
- **Audit**: Multiple (OpenZeppelin)
- **TVL**: > $2B
- **Docs**: https://docs.compound.finance/
### MakerDAO (Spark)
- **Contract**: Various (see docs)
- **Category**: Lending
- **Risk Level**: Low
- **Audit**: Multiple
- **TVL**: > $8B
- **Docs**: https://docs.makerdao.com/
## Liquid Staking
### Lido (stETH)
- **Contract**: 0xae7ab96520DE3A18E5e111B5EaAb095312D7fE84
- **Category**: Liquid Staking
- **Risk Level**: Low
- **Audit**: Multiple (Quantstamp, MixBytes)
- **TVL**: > $15B
- **Docs**: https://docs.lido.fi/
### Rocket Pool (rETH)
- **Contract**: 0xae78736Cd615f374D3085123A210448E74Fc6393
- **Category**: Liquid Staking
- **Risk Level**: Low
- **Audit**: Multiple (Sigma Prime)
- **TVL**: > $2B
- **Docs**: https://docs.rocketpool.net/
## DEXs
### Uniswap V3
- **Factory**: 0x1F98431c8aD98523631AE4a59f267346ea31F984
- **Category**: DEX
- **Risk Level**: Low
- **Audit**: Multiple
- **TVL**: > $4B
- **Docs**: https://docs.uniswap.org/
### Curve Finance
- **Contract**: 0xD533a949740bb3306d119CC777fa900bA034cd52 (CRV token; pool contracts vary, see docs)
- **Category**: DEX (Stablecoins)
- **Risk Level**: Low
- **Audit**: Multiple
- **TVL**: > $2B
- **Docs**: https://docs.curve.fi/
## Yield Aggregators
### Yearn V3
- **Registry**: 0x3c91D8ba3C8cB06D9CFe5b8F31c68a746f0f15B6
- **Category**: Yield Aggregator
- **Risk Level**: Medium
- **Audit**: Multiple
- **TVL**: Variable
- **Docs**: https://docs.yearn.fi/
## Protocol Metadata Schema
```json
{
"name": "Protocol Name",
"address": "0x...",
"chain": "ethereum",
"category": "lending|dex|staking|aggregator",
"risk_level": "low|medium|high",
"audit_status": "multiple|single|community|none",
"tvl": 1000000000,
"maturity_days": 730,
"has_timelock": true,
"governance": "dao|multisig|admin",
"docs_url": "https://...",
"icon_url": "https://..."
}
```
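A metadata file can be checked against this schema before it is registered. The sketch below is illustrative only: the allowed values are taken from the schema above, while the function name `validate_protocol` and the choice of which fields are required are assumptions.

```python
import re
from typing import List

# Enumerated values from the schema above
ALLOWED = {
    "category": {"lending", "dex", "staking", "aggregator"},
    "risk_level": {"low", "medium", "high"},
    "audit_status": {"multiple", "single", "community", "none"},
    "governance": {"dao", "multisig", "admin"},
}
# Assumption: these fields are treated as mandatory in this sketch
REQUIRED = ("name", "address", "chain", "category", "risk_level")


def validate_protocol(meta: dict) -> List[str]:
    """Return a list of validation errors (an empty list means the metadata is valid)."""
    errors = [f"missing field: {key}" for key in REQUIRED if key not in meta]
    for key, allowed in ALLOWED.items():
        if key in meta and meta[key] not in allowed:
            errors.append(f"{key} must be one of {sorted(allowed)}")
    if "address" in meta and not re.fullmatch(r"0x[0-9a-fA-F]{40}", str(meta["address"])):
        errors.append("address must be a 0x-prefixed 20-byte hex string")
    return errors


if __name__ == "__main__":
    candidate = {
        "name": "Aave V3",
        "address": "0x87870Bca3F3fD6335C3F4ce8392D69350B4fA4E2",
        "chain": "ethereum",
        "category": "lending",
        "risk_level": "low",
        "audit_status": "multiple",
    }
    print(validate_protocol(candidate) or "valid")
```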
## Adding New Protocols
To add a new protocol, create a JSON file in `config/protocols/`:
```json
{
"name": "New Protocol",
"address": "0x...",
"chain": "ethereum",
"category": "lending",
"risk_level": "medium",
"audit_status": "single",
"docs_url": "https://..."
}
```
Then run:
```bash
python scripts/discovery/analyze_protocol.py --add-config config/protocols/new-protocol.json
```
```
### references/risk-framework.md
```markdown
# Risk Assessment Framework
## Risk Levels
| Level | Score | Description | Typical Characteristics |
|-------|-------|-------------|------------------------|
| **Low** | 0-3 | Blue-chip, battle-tested | TVL > $1B, Audited, 2+ years live |
| **Medium** | 4-6 | Established but some risk | TVL $100M-$1B, Audited, 1+ year live |
| **High** | 7-10 | Experimental or new | TVL < $100M, No audit, < 1 year live |
## Risk Factors
### 1. Protocol Maturity (0-3 points)
| Criteria | Points |
|----------|--------|
| Live > 2 years with no major exploits | 0 |
| Live 1-2 years | 1 |
| Live < 1 year | 2 |
| Live < 3 months | 3 |
### 2. Audit Status (0-3 points)
| Criteria | Points |
|----------|--------|
| Multiple audits from top firms (OpenZeppelin, Trail of Bits, etc.) | 0 |
| Single audit from reputable firm | 1 |
| Community audit only | 2 |
| No audit | 3 |
### 3. TVL Size (0-2 points)
| TVL | Points |
|-----|--------|
| > $1 billion | 0 |
| $100M - $1B | 1 |
| < $100M | 2 |
### 4. Decentralization (0-2 points)
| Criteria | Points |
|----------|--------|
| DAO governance, no admin keys or timelocked | 0 |
| Timelocked admin keys (> 48h) | 1 |
| Admin keys, no timelock | 2 |
## Risk Calculation
```
Risk Score = Protocol Maturity + Audit Status + TVL Size + Decentralization
```
### Examples
| Protocol | Maturity | Audit | TVL | Decentralization | Total | Level |
|----------|----------|-------|-----|------------------|-------|-------|
| Aave V3 | 0 | 0 | 0 | 1 | 1 | Low |
| Compound | 0 | 0 | 0 | 0 | 0 | Low |
| New Lending Protocol | 3 | 3 | 2 | 2 | 10 | High |
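The scoring above can be sketched in a few lines of Python. This is an illustration of the formula and level thresholds from this document, not the skill's own implementation; the function names `risk_score` and `risk_level` are hypothetical.

```python
def risk_score(maturity: int, audit: int, tvl: int, decentralization: int) -> int:
    """Sum the four factor scores after checking each against its documented range."""
    ranges = {
        "maturity": (maturity, 3),
        "audit": (audit, 3),
        "tvl": (tvl, 2),
        "decentralization": (decentralization, 2),
    }
    for name, (value, maximum) in ranges.items():
        if not 0 <= value <= maximum:
            raise ValueError(f"{name} must be between 0 and {maximum}, got {value}")
    return maturity + audit + tvl + decentralization


def risk_level(score: int) -> str:
    """Map a total score to the level bands in the Risk Levels table."""
    if not 0 <= score <= 10:
        raise ValueError(f"score must be between 0 and 10, got {score}")
    if score <= 3:
        return "Low"
    if score <= 6:
        return "Medium"
    return "High"


if __name__ == "__main__":
    # Rows from the Examples table
    for name, factors in [("Aave V3", (0, 0, 0, 1)), ("New Lending Protocol", (3, 3, 2, 2))]:
        total = risk_score(*factors)
        print(name, total, risk_level(total))
```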
## Risk-Adjusted APY
When comparing opportunities, consider risk-adjusted returns:
```
Risk-Adjusted APY = Nominal APY × (1 - Risk Score / 20)
```
### Example
| Protocol | APY | Risk Score | Risk-Adjusted APY |
|----------|-----|------------|-------------------|
| Aave USDC | 5% | 1 | 4.75% |
| New Protocol | 20% | 8 | 12% |
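The same calculation as a small Python sketch (the function name `risk_adjusted_apy` is hypothetical; the formula is the one stated above):

```python
def risk_adjusted_apy(nominal_apy: float, risk_score: int) -> float:
    """Apply Risk-Adjusted APY = Nominal APY x (1 - Risk Score / 20)."""
    if not 0 <= risk_score <= 10:
        raise ValueError(f"risk_score must be between 0 and 10, got {risk_score}")
    return nominal_apy * (1 - risk_score / 20)


if __name__ == "__main__":
    # Rows from the Example table: (protocol, APY in percent, risk score)
    for name, apy, score in [("Aave USDC", 5, 1), ("New Protocol", 20, 8)]:
        print(f"{name}: {round(risk_adjusted_apy(apy, score), 2)}%")
```

Because the score is capped at 10, the discount never exceeds 50% of the nominal APY, so a very high nominal yield can still outrank a safer one under this formula.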
## Special Risk Flags
Additional risk factors that should be noted but not scored:
- ⚠️ **Fork Risk**: Protocol is a fork of another protocol with issues
- ⚠️ **Oracle Risk**: Uses custom oracle instead of Chainlink
- ⚠️ **Leverage Risk**: Involves leverage or looping
- ⚠️ **Illicit Risk**: Protocol in sanctioned jurisdiction
- ⚠️ **Regulatory Risk**: Under regulatory scrutiny
## Usage in Discovery
When running `find_opportunities.py`:
```bash
# Only show low-risk opportunities
python scripts/discovery/find_opportunities.py --max-risk low
# Filter by minimum TVL (in USD)
python scripts/discovery/find_opportunities.py --min-tvl 1000000
# Require audit
python scripts/discovery/find_opportunities.py --require-audit
```
```
### references/safe-vault-spec.md
```markdown
# Safe Vault Technical Specification
## Overview
Safe Vault is a secure transaction execution framework that enables AI agents to interact with Web3 protocols safely. It follows a phased approach:
- **Phase 1**: Simulation + Manual Confirmation
- **Phase 2**: Limited Auto-Execution (≤ $100 USD by default)
- **Phase 3**: Full Autonomous Execution
## Architecture
```
┌─────────────────────────────────────────────────────────────┐
│ Agent Request │
│ "Invest 100 USDC into Aave with max 5% slippage" │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Intent Parser │
│ - Parse natural language to structured intent │
│ - Identify protocol, action, amount, parameters │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Whitelist Check │
│ - Is target address in whitelist? │
│ - Is action type allowed? │
│ - Is amount within limits? │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Risk Analysis │
│ - Protocol risk assessment │
│ - Slippage estimation │
│ - Gas cost estimation │
│ - Simulate transaction │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Execution Decision │
│ │
│ Phase 1: Generate signing request → Human confirms │
│ Phase 2: Auto-sign if ≤ limit → Human reviews in batch │
│ Phase 3: Auto-sign within parameters → Log only │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Transaction Execution │
│ - Submit to mempool │
│ - Monitor confirmation │
│ - Report result │
└─────────────────────────────────────────────────────────────┘
```
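The Execution Decision box can be read as a small dispatch on the active phase. The sketch below is one possible interpretation: the names `Decision` and `execution_decision` are hypothetical, the fallback to human confirmation when a Phase 2 amount exceeds the limit is an assumption, and the Phase 3 parameter checks are omitted.

```python
from enum import Enum


class Decision(Enum):
    HUMAN_CONFIRM = "generate signing request; human confirms"
    AUTO_SIGN_BATCH_REVIEW = "auto-sign; human reviews in batch"
    AUTO_SIGN_LOG_ONLY = "auto-sign; log only"


def execution_decision(phase: int, amount_usd: float,
                       auto_limit_usd: float = 100.0) -> Decision:
    """Pick the execution path for a transaction that already passed the earlier checks."""
    if phase == 1:
        return Decision.HUMAN_CONFIRM
    if phase == 2:
        # Assumed: amounts above the limit fall back to manual confirmation
        if amount_usd <= auto_limit_usd:
            return Decision.AUTO_SIGN_BATCH_REVIEW
        return Decision.HUMAN_CONFIRM
    if phase == 3:
        return Decision.AUTO_SIGN_LOG_ONLY
    raise ValueError(f"unknown phase: {phase}")


if __name__ == "__main__":
    for phase, amount in [(1, 50), (2, 50), (2, 250), (3, 250)]:
        print(phase, amount, execution_decision(phase, amount).name)
```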
## Phase 1: Simulation Mode (Current)
### Transaction Flow
```
Agent → Intent Parser → Whitelist Check → Risk Analysis
→ Simulation → Signing Request → Human Review → Manual Execution
```
### Signing Request Format
```json
{
"request_id": "uuid",
"timestamp": "2024-01-15T10:30:00Z",
"intent": {
"action": "deposit",
"protocol": "aave-v3",
"amount": "100",
"asset": "USDC",
"chain": "ethereum"
},
"transaction": {
"to": "0x87870Bca3F3fD6335C3F4ce8392D69350B4fA4E2",
"value": "0",
"data": "0x...",
"gas_limit": 250000,
"gas_price": "20000000000"
},
"simulation": {
"success": true,
"gas_used": 185000,
"state_changes": [
{
"contract": "0x...",
"change": "USDC balance -100"
},
{
"contract": "0x...",
"change": "aUSDC balance +100"
}
]
},
"risk_assessment": {
"risk_level": "low",
"risk_score": 1,
"warnings": []
},
"approval_url": "https://safe-vault.example/approve/uuid"
}
```
### Human Confirmation Methods
1. **CLI Confirmation**: Agent displays signing request, user copies to wallet
2. **QR Code**: Generate QR code for mobile wallet scanning
3. **Safe{Wallet}**: Create pending transaction in Safe UI
## Phase 2: Limited Auto-Execution
### Requirements
1. **Secure Key Storage**
- Use environment variables or secure vault
- Never log or expose private keys
- Consider using AWS KMS / GCP KMS
2. **Transaction Limits**
- Default: $100 USD equivalent
- Configurable per asset and protocol
- Daily cumulative limit
3. **Rate Limiting**
- Max 10 transactions per hour
- Max 50 transactions per day
- Configurable cooldown periods
4. **Monitoring**
- All transactions logged
- Alert on suspicious patterns
- Daily summary to user
### Configuration
```json
{
"phase_2": {
"enabled": true,
"limits": {
"per_transaction_usd": 100,
"daily_total_usd": 500,
"hourly_count": 10,
"daily_count": 50
},
"whitelist": {
"required": true,
"addresses": [
{
"address": "0x87870Bca3F3fD6335C3F4ce8392D69350B4fA4E2",
"name": "Aave Pool",
"max_amount_usd": 500
}
]
},
"notifications": {
"on_execute": true,
"daily_summary": true,
"channel": "telegram"
}
}
}
```
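The `limits` block above could be enforced with a small in-memory limiter like the following. This is a minimal sketch, not part of the skill: the class name `Phase2Limiter` is hypothetical, and "daily" and "hourly" are interpreted as rolling 24-hour and 1-hour windows, which is an assumption (calendar-day resets would be equally plausible).

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from typing import List, Tuple


@dataclass
class Phase2Limiter:
    """Tracks executed transactions and checks new ones against the Phase 2 limits."""
    per_transaction_usd: float = 100
    daily_total_usd: float = 500
    hourly_count: int = 10
    daily_count: int = 50
    history: List[Tuple[datetime, float]] = field(default_factory=list)

    def check(self, amount_usd: float, now: datetime) -> Tuple[bool, str]:
        """Return (allowed, reason) without recording anything."""
        if amount_usd > self.per_transaction_usd:
            return False, "per-transaction limit exceeded"
        last_day = [(t, a) for t, a in self.history if now - t < timedelta(days=1)]
        last_hour = [t for t, _ in last_day if now - t < timedelta(hours=1)]
        if len(last_hour) >= self.hourly_count:
            return False, "hourly transaction count reached"
        if len(last_day) >= self.daily_count:
            return False, "daily transaction count reached"
        if sum(a for _, a in last_day) + amount_usd > self.daily_total_usd:
            return False, "daily USD total exceeded"
        return True, "ok"

    def record(self, amount_usd: float, now: datetime) -> None:
        """Record a transaction after it has been executed."""
        self.history.append((now, amount_usd))


if __name__ == "__main__":
    limiter = Phase2Limiter()
    now = datetime.now(timezone.utc)
    allowed, reason = limiter.check(80, now)
    if allowed:
        limiter.record(80, now)
    print(allowed, reason)
```

Keeping `check` and `record` separate lets the caller record a transaction only after it was actually submitted, so rejected or failed attempts do not consume the budget.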
## Phase 3: Full Autonomous
### Requirements
1. **Comprehensive Audit**
- Third-party security audit
- Formal verification of critical paths
- Bug bounty program
2. **Safety Mechanisms**
- Circuit breakers
- Automatic pause on anomaly detection
- Insurance fund
3. **Governance**
- Multi-sig for critical changes
- Timelock for parameter updates
- User veto power
### Not Recommended For
- Individual users without insurance
- Amounts > $10,000 USD
- Novel protocols (use Phase 1/2)
## Wallet Integration
### MetaMask (EOA)
```python
# Phase 1: Generate request for user to sign
# (generate_signing_request is a placeholder returning the Signing Request Format shown earlier)
request = generate_signing_request(tx)
print("Please sign this transaction in MetaMask:")
print(f"To: {request['transaction']['to']}")
print(f"Data: {request['transaction']['data']}")
# Phase 2+: Use web3.py with private key
from web3 import Web3
w3 = Web3(...)
signed_tx = w3.eth.account.sign_transaction(tx, private_key)
# Note: web3.py v7 renamed this attribute to raw_transaction
tx_hash = w3.eth.send_raw_transaction(signed_tx.rawTransaction)
```
### Safe{Wallet} (Multi-sig)
```python
# Build Safe transaction
# (`safe` is a placeholder client object here, not a specific SDK)
safe_tx = safe.build_transaction(
    to=tx['to'],
    value=tx['value'],
    data=tx['data'],
    safe_nonce=safe_nonce
)
# Phase 1: Create pending transaction
# User approves in Safe UI
safe_url = f"https://app.safe.global/transactions/queue?safe={safe_address}"
# Phase 2+: Sign and propose
safe.sign_transaction(safe_tx)
safe.propose_transaction(safe_tx)
```
## Security Checklist
- [ ] Private keys stored securely (env vars, vault)
- [ ] Whitelist enforced before any execution
- [ ] All transactions simulated before signing
- [ ] Rate limits in place
- [ ] Transaction logging enabled
- [ ] Error handling for failed transactions
- [ ] Gas price caps to avoid overpaying during fee spikes
- [ ] Slippage protection for DEX trades
```
### scripts/discovery/analyze_protocol.py
```python
#!/usr/bin/env python3
"""
Analyze a specific protocol for investment suitability.

Usage:
    python analyze_protocol.py --protocol aave-v3
    python analyze_protocol.py --protocol uniswap --output json
"""
import argparse
import json
import sys
import urllib.request
import urllib.error
from datetime import datetime
from typing import Optional

DEFILLAMA_PROTOCOLS = "https://api.llama.fi/protocols"
DEFILLAMA_PROTOCOL = "https://api.llama.fi/protocol/{slug}"


def fetch_protocol(slug: str) -> Optional[dict]:
    """Fetch protocol data from DefiLlama."""
    url = DEFILLAMA_PROTOCOL.format(slug=slug.lower())
    try:
        with urllib.request.urlopen(url, timeout=30) as response:
            return json.loads(response.read().decode())
    except urllib.error.URLError as e:
        print(f"Error fetching protocol: {e}", file=sys.stderr)
        return None


def fetch_all_protocols() -> list:
    """Fetch all protocols list."""
    try:
        with urllib.request.urlopen(DEFILLAMA_PROTOCOLS, timeout=30) as response:
            return json.loads(response.read().decode())
    except urllib.error.URLError as e:
        print(f"Error fetching protocols list: {e}", file=sys.stderr)
        return []


def find_protocol_slug(name: str) -> Optional[str]:
    """Find protocol slug by name."""
    protocols = fetch_all_protocols()
    name_lower = name.lower()
    for p in protocols:
        if name_lower in p.get("name", "").lower():
            return p.get("slug") or p.get("name", "").lower().replace(" ", "-")
        if name_lower in p.get("slug", "").lower():
            return p.get("slug")
    return None

def calculate_risk_score(protocol: dict) -> dict:
    """Calculate a detailed risk score (0 = lowest risk, 10 = highest)."""
    scores = {}

    # Normalize the audit field once: tolerate a missing/null value, a list, or a count
    audits = protocol.get("audits")
    if isinstance(audits, list):
        audit_count = len(audits)
    elif isinstance(audits, (int, str)) and str(audits).isdigit():
        audit_count = int(audits)
    else:
        audit_count = 0
    audit_note = protocol.get("audit_note")

    # Maturity (0-3)
    if audit_count >= 2:
        scores["maturity"] = {"score": 0, "max": 3, "note": "Multiple audits, well established"}
    elif audit_count == 1:
        scores["maturity"] = {"score": 1, "max": 3, "note": "Single audit"}
    elif audit_note:
        scores["maturity"] = {"score": 2, "max": 3, "note": "Audit in progress or partial"}
    else:
        scores["maturity"] = {"score": 3, "max": 3, "note": "No audit"}

    # TVL (0-2)
    tvl = protocol.get("tvl", 0)
    # Treat non-numeric values (list, dict, None) as 0
    if isinstance(tvl, (list, dict)):
        tvl = 0
    tvl = tvl or 0
    if tvl > 1_000_000_000:
        scores["tvl"] = {"score": 0, "max": 2, "note": f"TVL > $1B (${tvl:,.0f})"}
    elif tvl > 100_000_000:
        scores["tvl"] = {"score": 1, "max": 2, "note": f"TVL $100M-$1B (${tvl:,.0f})"}
    else:
        scores["tvl"] = {"score": 2, "max": 2, "note": f"TVL < $100M (${tvl:,.0f})"}

    # Decentralization (0-2)
    if protocol.get("governanceID"):
        scores["decentralization"] = {"score": 0, "max": 2, "note": "DAO governed"}
    elif protocol.get("gecko_id"):
        scores["decentralization"] = {"score": 1, "max": 2, "note": "Has token, governance unclear"}
    else:
        scores["decentralization"] = {"score": 2, "max": 2, "note": "No governance token"}

    # Audit status (0-3), scored separately from maturity
    if audit_count >= 2:
        scores["audit"] = {"score": 0, "max": 3, "note": f"{audit_count} audits"}
    elif audit_count == 1:
        scores["audit"] = {"score": 1, "max": 3, "note": "1 audit"}
    elif audit_note:
        scores["audit"] = {"score": 2, "max": 3, "note": audit_note}
    else:
        scores["audit"] = {"score": 3, "max": 3, "note": "No audit"}

    total = sum(entry["score"] for entry in scores.values())
    return {
        "total": min(total, 10),
        "breakdown": scores,
        "level": "low" if total <= 3 else "medium" if total <= 6 else "high"
    }
def analyze_protocol(name_or_slug: str) -> Optional[dict]:
"""
Analyze a protocol and return detailed information.
Args:
name_or_slug: Protocol name or slug
Returns:
Protocol analysis dict
"""
# Try as slug first
protocol = fetch_protocol(name_or_slug)
# If not found, search by name
if not protocol:
slug = find_protocol_slug(name_or_slug)
if slug:
protocol = fetch_protocol(slug)
if not protocol:
return None
# Calculate risk
risk = calculate_risk_score(protocol)
# Get chains
chains = protocol.get("chains", [])
if isinstance(chains, dict):
chains = list(chains.keys())
# Get TVL by chain
chain_tvl = {}
if protocol.get("chainTvl"):
ctvl = protocol.get("chainTvl")
if isinstance(ctvl, dict):
chain_tvl = ctvl
# Get total TVL (handle various data types)
raw_tvl = protocol.get("tvl", 0)
if isinstance(raw_tvl, (int, float)):
total_tvl = raw_tvl
else:
total_tvl = 0
# Build analysis
analysis = {
"name": protocol.get("name", ""),
"slug": protocol.get("slug", ""),
"logo": protocol.get("logo", ""),
"url": protocol.get("url", ""),
"description": protocol.get("description", ""),
"category": protocol.get("category", ""),
"chains": chains,
"tvl": {
"total": total_tvl,
"by_chain": chain_tvl,
"change_24h": protocol.get("change_1d", 0) or 0,
"change_7d": protocol.get("change_7d", 0) or 0
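        },
        "risk": risk,
        # Only iterate when the field is a list; skip entries that are not dicts
        "audits": [
            {
                "name": a.get("name", ""),
                "link": a.get("link", ""),
                "time": a.get("time", "")
            }
            for a in (protocol.get("audits") if isinstance(protocol.get("audits"), list) else [])
            if isinstance(a, dict)
        ],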
"exploits": protocol.get("exploits", []),
"governance": {
"has_token": bool(protocol.get("gecko_id")),
"token_name": "",
"token_symbol": ""
},
"social": {
"twitter": protocol.get("twitter", ""),
"discord": protocol.get("discord", ""),
"telegram": protocol.get("telegram", "")
},
"defillama_url": f"https://defillama.com/protocol/{protocol.get('slug', '')}"
}
return analysis
def format_report(analysis: dict, output_format: str = "markdown") -> str:
"""Format protocol analysis as report."""
if not analysis:
return "Protocol not found."
if output_format == "json":
return json.dumps(analysis, indent=2)
# Markdown format
lines = [
f"# Protocol Analysis: {analysis['name']}",
"",
f"**Category**: {analysis['category']}",
f"**Chains**: {', '.join(analysis['chains'])}",
f"**Website**: {analysis['url']}",
"",
"---",
"",
"## 📊 TVL Overview",
"",
f"| Metric | Value |",
f"|--------|-------|",
f"| Total TVL | ${analysis['tvl']['total']:,.0f} |",
f"| 24h Change | {analysis['tvl']['change_24h']:.2f}% |",
f"| 7d Change | {analysis['tvl']['change_7d']:.2f}% |",
""
]
if analysis['tvl']['by_chain']:
lines.extend([
"### TVL by Chain",
"",
"| Chain | TVL |",
"|-------|-----|"
])
for chain, tvl in sorted(analysis['tvl']['by_chain'].items(),
key=lambda x: x[1], reverse=True)[:5]:
lines.append(f"| {chain} | ${tvl:,.0f} |")
lines.append("")
    lines.extend([
        "---",
        "",
        "## ⚠️ Risk Assessment",
        "",
        "| Factor | Score | Notes |",
        "|--------|-------|-------|"
    ])
    for factor, data in analysis['risk']['breakdown'].items():
        # Factors have different maxima (2 or 3); read it from the entry when present
        lines.append(f"| {factor.title()} | {data['score']}/{data.get('max', 3)} | {data['note']} |")
    lines.extend([
        f"| **Total** | **{analysis['risk']['total']}/10** | **{analysis['risk']['level'].upper()} RISK** |",
        ""
    ])
if analysis['exploits']:
lines.extend([
"### 🚨 Known Exploits",
""
])
for exploit in analysis['exploits']:
lines.append(f"- {exploit}")
lines.append("")
lines.extend([
"---",
"",
"## 🔐 Audits",
""
])
if analysis['audits']:
for audit in analysis['audits']:
lines.append(f"- [{audit['name']}]({audit['link']}) ({audit['time'] or 'N/A'})")
else:
lines.append("No audits found.")
lines.extend([
"",
"---",
"",
"## 🏛️ Governance",
"",
f"- **Token**: {analysis['governance']['token_symbol'] or 'No governance token'}",
f"- **Decentralized**: {'Yes' if analysis['governance']['has_token'] else 'No'}",
""
])
if analysis['social']['twitter'] or analysis['social']['discord']:
lines.extend([
"---",
"",
"## 📱 Social",
""
])
if analysis['social']['twitter']:
lines.append(f"- Twitter: {analysis['social']['twitter']}")
if analysis['social']['discord']:
lines.append(f"- Discord: {analysis['social']['discord']}")
if analysis['social']['telegram']:
lines.append(f"- Telegram: {analysis['social']['telegram']}")
lines.extend([
"",
"---",
"",
f"📊 [View on DefiLlama]({analysis['defillama_url']})"
])
return "\n".join(lines)
def main():
parser = argparse.ArgumentParser(description="Analyze a DeFi protocol")
parser.add_argument("--protocol", required=True, help="Protocol name or slug")
parser.add_argument("--output", choices=["markdown", "json"], default="markdown",
help="Output format")
args = parser.parse_args()
analysis = analyze_protocol(args.protocol)
if not analysis:
print(f"Protocol '{args.protocol}' not found.", file=sys.stderr)
sys.exit(1)
print(format_report(analysis, args.output))
if __name__ == "__main__":
main()
```
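The scoring bands in `calculate_risk_score` are easier to check against concrete numbers. The sketch below re-implements only the TVL band and the overall level thresholds from the script. The helper names `tvl_band` and `risk_level` and the sample protocol are illustrative and do not appear in the script.

```python
def tvl_band(tvl: float) -> int:
    """TVL factor: 0 above $1B, 1 above $100M, otherwise 2."""
    if tvl > 1_000_000_000:
        return 0
    if tvl > 100_000_000:
        return 1
    return 2


def risk_level(total: int) -> str:
    """Overall level: low up to 3, medium up to 6, high above that."""
    if total <= 3:
        return "low"
    if total <= 6:
        return "medium"
    return "high"


# Hypothetical protocol: one audit (maturity 1, audit 1), $250M TVL (1), DAO governed (0)
total = 1 + tvl_band(250_000_000) + 0 + 1
print(total, risk_level(total))  # 3 low
```

Because the audit count feeds both the maturity and the audit factors, an unaudited protocol starts at 6 of 10 before TVL and governance are considered.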
### scripts/discovery/dune_mcp.py
```python
#!/usr/bin/env python3
"""
Dune MCP Adapter for Web3 Investor Skill
Provides a clean interface to interact with Dune's MCP server
for blockchain data discovery and analysis.
Usage:
python dune_mcp.py search "aave lending ethereum" --category spell
python dune_mcp.py exec 12345 # Execute a query by ID
python dune_mcp.py results <execution_id> # Get execution results
"""
import argparse
import json
import os
import sys
import urllib.request
import urllib.error
from typing import Optional, Dict, Any, List

# Dune MCP Configuration
DUNE_MCP_URL = "https://api.dune.com/mcp/v1"
# Read the API key from the environment; never commit secrets to the repository
DUNE_API_KEY = os.environ.get("DUNE_API_KEY", "")
def call_mcp_tool(tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
    """
    Call a Dune MCP tool and return the result.

    Handles SSE (Server-Sent Events) responses and falls back to a plain JSON body.
    """
    payload = {
        "jsonrpc": "2.0",
        "method": "tools/call",
        "id": 1,
        "params": {
            "name": tool_name,
            "arguments": arguments
        }
    }
    headers = {
        "x-dune-api-key": DUNE_API_KEY,
        "Content-Type": "application/json",
        "Accept": "application/json, text/event-stream"
    }
    try:
        data = json.dumps(payload).encode('utf-8')
        req = urllib.request.Request(DUNE_MCP_URL, data=data, headers=headers, method='POST')
        with urllib.request.urlopen(req, timeout=120) as response:
            response_text = response.read().decode('utf-8')

        # Parse SSE format: "event: message\nid: xxx\ndata: {...}"
        messages = [
            line[len('data:'):].strip()
            for line in response_text.splitlines()
            if line.startswith('data:')
        ]
        # The Accept header also allows a plain JSON reply, so fall back to the whole body
        if not messages:
            messages = [response_text]

        result = None
        for message in messages:
            try:
                parsed = json.loads(message)
            except json.JSONDecodeError:
                continue
            if 'result' in parsed:
                result = parsed['result']
            elif 'error' in parsed:
                return {"error": parsed['error']}
        # Compare against None so an empty result is not mistaken for a failure
        if result is None:
            return {"error": "No result found in response"}
        return result
    except urllib.error.URLError as e:
        return {"error": str(e)}
    except Exception as e:
        return {"error": f"Unexpected error: {e}"}
def search_tables(
query: str,
categories: Optional[List[str]] = None,
blockchains: Optional[List[str]] = None,
limit: int = 20,
include_schema: bool = False
) -> Dict[str, Any]:
"""
Search for Dune tables matching the query.
Args:
query: Natural language query (e.g., "aave lending ethereum")
categories: Filter by category (canonical, decoded, spell, community)
blockchains: Filter by blockchain (e.g., ethereum, arbitrum)
limit: Maximum number of results
include_schema: Include column schema in results
Returns:
Dict with search results
"""
arguments = {
"query": query,
"limit": limit,
"includeSchema": include_schema
}
if categories:
arguments["categories"] = categories
if blockchains:
arguments["blockchains"] = blockchains
return call_mcp_tool("searchTables", arguments)
def execute_query(query_id: int, parameters: Optional[List[Dict]] = None) -> Dict[str, Any]:
"""
Execute a Dune query by ID.
Args:
query_id: The numeric ID of the query
parameters: Optional query parameters
Returns:
Dict with execution_id and state
"""
arguments = {"query_id": query_id}
if parameters:
arguments["query_parameters"] = parameters
return call_mcp_tool("executeQueryById", arguments)
def get_execution_results(execution_id: str, limit: int = 100, timeout: int = 60) -> Dict[str, Any]:
"""
Get results of a query execution.
Args:
execution_id: The execution ID
limit: Maximum number of rows to return
timeout: Seconds to wait for completion
Returns:
Dict with execution results
"""
arguments = {
"executionId": execution_id,
"limit": limit,
"timeout": timeout
}
return call_mcp_tool("getExecutionResults", arguments)
def create_query(name: str, sql: str, description: str = "", is_temp: bool = True) -> Dict[str, Any]:
"""
Create a new Dune query.
Args:
name: Query title
sql: SQL query text
description: Query description
is_temp: Whether to create as temporary query
Returns:
Dict with query_id
"""
arguments = {
"name": name,
"query": sql,
"is_temp": is_temp
}
if description:
arguments["description"] = description
return call_mcp_tool("createDuneQuery", arguments)
def list_blockchains(categories: Optional[List[str]] = None, query: str = "*") -> Dict[str, Any]:
"""
List available blockchains with table counts.
Args:
categories: Filter by category
query: Semantic filter
Returns:
Dict with blockchain list
"""
arguments = {"query": query}
if categories:
arguments["categories"] = categories
return call_mcp_tool("listBlockchains", arguments)
def get_usage() -> Dict[str, Any]:
"""
Get current billing period usage.
Returns:
Dict with usage information
"""
return call_mcp_tool("getUsage", {})
# ============================================================================
# Convenience functions for Web3 Investor Skill
# ============================================================================
def find_yield_pools(chain: str = "ethereum", limit: int = 20) -> List[Dict]:
"""
Find DeFi yield pools using Dune's curated datasets.
Returns structured opportunity data compatible with find_opportunities.py
"""
# Search for yield/lending related tables
result = search_tables(
query="defi yield lending APY",
categories=["spell"],
blockchains=[chain],
limit=limit,
include_schema=True
)
if "error" in result:
return []
return result.get("results", [])
def format_table_results(results: List[Dict]) -> str:
"""Format table search results for display."""
if not results:
return "No tables found."
lines = ["# Dune Tables Found\n"]
for table in results:
lines.append(f"## {table.get('full_name', 'N/A')}")
lines.append(f"- **Category**: {table.get('category', 'N/A')}")
lines.append(f"- **Blockchains**: {', '.join(table.get('blockchains', []))}")
if table.get('description'):
lines.append(f"- **Description**: {table['description']}")
if table.get('partition_columns'):
lines.append(f"- **Partition Columns**: {', '.join(table['partition_columns'])}")
lines.append("")
return "\n".join(lines)
def main():
parser = argparse.ArgumentParser(description="Dune MCP Adapter for Web3 Investor")
subparsers = parser.add_subparsers(dest="command", help="Available commands")
# Search tables
search_parser = subparsers.add_parser("search", help="Search Dune tables")
search_parser.add_argument("query", help="Search query")
search_parser.add_argument("--category", "-c", action="append", help="Filter by category (spell, decoded, canonical, community)")
search_parser.add_argument("--chain", help="Filter by blockchain")
search_parser.add_argument("--limit", "-n", type=int, default=20, help="Number of results")
search_parser.add_argument("--schema", action="store_true", help="Include column schema")
search_parser.add_argument("--json", action="store_true", help="Output as JSON")
# Execute query
exec_parser = subparsers.add_parser("exec", help="Execute a Dune query")
exec_parser.add_argument("query_id", type=int, help="Query ID to execute")
# Get results
results_parser = subparsers.add_parser("results", help="Get execution results")
results_parser.add_argument("execution_id", help="Execution ID")
results_parser.add_argument("--limit", "-n", type=int, default=100, help="Number of rows")
# List chains
subparsers.add_parser("chains", help="List available blockchains")
# Usage
subparsers.add_parser("usage", help="Get API usage info")
args = parser.parse_args()
if args.command == "search":
blockchains = [args.chain] if args.chain else None
result = search_tables(
query=args.query,
categories=args.category,
blockchains=blockchains,
limit=args.limit,
include_schema=args.schema
)
if args.json:
print(json.dumps(result, indent=2))
else:
if "error" in result:
print(f"Error: {result['error']}", file=sys.stderr)
elif "results" in result:
print(format_table_results(result["results"]))
else:
print(json.dumps(result, indent=2))
elif args.command == "exec":
result = execute_query(args.query_id)
print(json.dumps(result, indent=2))
elif args.command == "results":
result = get_execution_results(args.execution_id, limit=args.limit)
print(json.dumps(result, indent=2))
elif args.command == "chains":
result = list_blockchains()
print(json.dumps(result, indent=2))
elif args.command == "usage":
result = get_usage()
print(json.dumps(result, indent=2))
else:
parser.print_help()
if __name__ == "__main__":
main()
```
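The SSE handling in `call_mcp_tool` can be exercised offline. The sketch below is a standalone illustration, assuming each event carries its JSON-RPC message on a single `data:` line. The function `extract_jsonrpc_result` and the sample body are hypothetical and are not taken from Dune's documentation.

```python
import json
from typing import Any, Dict


def extract_jsonrpc_result(body: str) -> Dict[str, Any]:
    """Return the last JSON-RPC result found in an SSE body, or an error dict."""
    result = None
    for line in body.splitlines():
        if not line.startswith("data:"):
            continue  # skip "event:", "id:", and blank separator lines
        try:
            message = json.loads(line[len("data:"):].strip())
        except json.JSONDecodeError:
            continue  # ignore data lines that are not JSON
        if not isinstance(message, dict):
            continue
        if "error" in message:
            return {"error": message["error"]}
        if "result" in message:
            result = message["result"]
    return result if result is not None else {"error": "No result found in response"}


sample = 'event: message\nid: 1\ndata: {"jsonrpc": "2.0", "id": 1, "result": {"results": []}}\n\n'
print(extract_jsonrpc_result(sample))  # {'results': []}
```

Keeping the parser separate from the network call makes it possible to test malformed or empty responses without an API key.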
### scripts/discovery/investment_profile.py
```python
#!/usr/bin/env python3
"""
Investment Profile - User preference collection and opportunity filtering.
This module provides a convenient way for agents to collect user investment
preferences and filter opportunities accordingly.
Usage:
from investment_profile import InvestmentProfile
profile = InvestmentProfile()
profile.chain = "ethereum"
profile.capital_token = "USDC"
profile.accept_il = False
profile.reward_preference = "single"
# Filter opportunities
filtered = profile.filter_opportunities(opportunities)
"""
from typing import Optional, List, Dict, Any, Callable
import json
class InvestmentProfile:
"""
Collect and store user investment preferences for opportunity filtering.
This class helps agents:
1. Ask the right questions to users
2. Store preferences in a structured way
3. Filter opportunities based on preferences
"""
def __init__(self):
# Required fields
self.chain: Optional[str] = None
self.capital_token: Optional[str] = None
# Optional preference fields (None means "no preference / any")
self.reward_preference: Optional[str] = None # "single" | "multi" | "none" | None
self.accept_il: Optional[bool] = None # True | False | None
self.underlying_preference: Optional[str] = None # "rwa" | "onchain" | "mixed" | None
# Additional constraints
self.min_apy: Optional[float] = None
self.max_apy: Optional[float] = None
self.min_tvl: Optional[float] = None
    @classmethod
    def get_questions(cls) -> Dict[str, Any]:
        """
        Get the list of questions to ask users.

        Returns a structured dict that agents can use to build their UI.
        """
        return {
            "required": [
                {
                    "key": "chain",
                    "question": "Which chain do you want to invest on?",
                    "description": "Choose the target blockchain",
                    "options": ["ethereum", "base", "arbitrum", "optimism"],
                    "type": "select"
                },
                {
                    "key": "capital_token",
                    "question": "Which token is your investment capital held in?",
                    "description": "For example: USDC, USDT, ETH, WBTC",
                    "examples": ["USDC", "USDT", "ETH"],
                    "type": "text"
                }
            ],
            "preference": [
                {
                    "key": "reward_preference",
                    "question": "Do you have a preference for reward tokens?",
                    "description": "Some products pay rewards in a single token, others in several (e.g. CRV+CVX)",
                    "options": [
                        {"value": "single", "label": "Single-token rewards only", "description": "Simple and easy to manage"},
                        {"value": "multi", "label": "Multi-token rewards are acceptable", "description": "Potentially higher yield, but several tokens to manage"},
                        {"value": "none", "label": "No rewards, base yield only", "description": "The most stable yield structure"},
                        {"value": None, "label": "No particular preference", "description": "Anything is acceptable"}
                    ],
                    "type": "select"
                },
                {
                    "key": "accept_il",
                    "question": "Can you accept impermanent loss?",
                    "description": "LP products can incur impermanent loss when prices move",
                    "options": [
                        {"value": False, "label": "No", "description": "Only products that protect principal, such as lending or single-asset staking"},
                        {"value": True, "label": "Yes", "description": "Understands and accepts the impermanent loss risk of LP positions"},
                        {"value": None, "label": "Not sure / it depends", "description": "Needs more guidance"}
                    ],
                    "type": "select"
                },
                {
                    "key": "underlying_preference",
                    "question": "Do you have a preference for the underlying assets?",
                    "description": "The type of asset underlying the product",
                    "options": [
                        {"value": "onchain", "label": "Pure on-chain contracts", "description": "DeFi-native protocols such as Aave and Compound"},
                        {"value": "rwa", "label": "Real-world assets (RWA)", "description": "E.g. tokenized government bonds, US Treasury yield"},
                        {"value": "mixed", "label": "Mixed assets", "description": "A combination of asset types"},
                        {"value": None, "label": "No preference", "description": "No restriction on underlying asset type"}
                    ],
                    "type": "select"
                }
            ],
            "constraints": [
                {
                    "key": "min_apy",
                    "question": "What is the minimum acceptable APY?",
                    "description": "For example, 5 means at least 5%",
                    "type": "number",
                    "unit": "%"
                },
                {
                    "key": "max_apy",
                    "question": "Maximum APY cap (filters out excessive risk)",
                    "description": "Products above this APY are filtered out; 30-50 is suggested",
                    "type": "number",
                    "unit": "%"
                },
                {
                    "key": "min_tvl",
                    "question": "Minimum TVL requirement?",
                    "description": "Total value locked in the product; at least $1 million is suggested",
                    "type": "number",
                    "unit": "USD"
                }
            ]
        }
def set_preferences(self, **kwargs) -> "InvestmentProfile":
"""
Set multiple preferences at once.
Example:
profile.set_preferences(
chain="ethereum",
capital_token="USDC",
accept_il=False,
reward_preference="single"
)
"""
valid_fields = [
'chain', 'capital_token', 'reward_preference', 'accept_il',
'underlying_preference', 'min_apy', 'max_apy', 'min_tvl'
]
for key, value in kwargs.items():
if key in valid_fields:
setattr(self, key, value)
return self
def is_valid(self) -> bool:
"""Check if required fields are set."""
return self.chain is not None and self.capital_token is not None
def _matches_reward_preference(self, opp: Dict[str, Any]) -> bool:
"""Check if opportunity matches reward preference."""
if self.reward_preference is None:
return True
risk_signals = opp.get("risk_signals", {})
reward_type = risk_signals.get("reward_type", "unknown")
# Map preference to allowed types
preference_map = {
"single": ["single", "none"],
"multi": ["multi"],
"none": ["none"]
}
allowed = preference_map.get(self.reward_preference, [])
return reward_type in allowed
def _matches_il_preference(self, opp: Dict[str, Any]) -> bool:
"""Check if opportunity matches IL preference."""
if self.accept_il is None:
return True
risk_signals = opp.get("risk_signals", {})
has_il = risk_signals.get("has_il_risk", False)
# If user doesn't accept IL, filter out products with IL risk
if not self.accept_il and has_il:
return False
return True
def _matches_underlying_preference(self, opp: Dict[str, Any]) -> bool:
"""Check if opportunity matches underlying asset preference."""
if self.underlying_preference is None:
return True
risk_signals = opp.get("risk_signals", {})
underlying_type = risk_signals.get("underlying_type", "unknown")
# Exact match or "mixed" can satisfy "onchain" preference
if self.underlying_preference == "onchain" and underlying_type in ["onchain", "mixed"]:
return True
return underlying_type == self.underlying_preference
    def _matches_constraints(self, opp: Dict[str, Any]) -> bool:
        """Check if opportunity matches numeric constraints."""
        # Treat missing or null values as 0 so the comparisons never see None
        apy = opp.get("apy") or 0
        tvl = opp.get("tvl_usd") or 0
        if self.min_apy is not None and apy < self.min_apy:
            return False
        if self.max_apy is not None and apy > self.max_apy:
            return False
        if self.min_tvl is not None and tvl < self.min_tvl:
            return False
        return True
def filter_opportunities(self, opportunities: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Filter opportunities based on stored preferences.
Args:
opportunities: List of opportunity dicts from find_opportunities()
Returns:
Filtered list matching all preferences
"""
if not self.is_valid():
raise ValueError("Required fields (chain, capital_token) must be set before filtering")
filtered = []
for opp in opportunities:
# Check all preferences
if not self._matches_reward_preference(opp):
continue
if not self._matches_il_preference(opp):
continue
if not self._matches_underlying_preference(opp):
continue
if not self._matches_constraints(opp):
continue
filtered.append(opp)
return filtered
    def explain_filtering(self, original_count: int, filtered_count: int) -> str:
        """Generate a human-readable explanation of filtering results."""
        # Guard against division by zero when no opportunities were supplied
        filtered_pct = (1 - filtered_count / original_count) * 100 if original_count else 0.0
        lines = [
            "# Investment Preference Filtering Results",
            "",
            f"**Original products**: {original_count}",
            f"**Matching**: {filtered_count}",
            f"**Filtered out**: {filtered_pct:.1f}%",
            "",
            "## Your Preferences",
            "",
        ]
        # Required
        lines.append(f"- **Target chain**: {self.chain}")
        lines.append(f"- **Capital token**: {self.capital_token}")
        # Preferences
        if self.reward_preference:
            reward_labels = {"single": "Single-token rewards", "multi": "Multi-token rewards", "none": "No rewards"}
            lines.append(f"- **Reward preference**: {reward_labels.get(self.reward_preference, self.reward_preference)}")
        if self.accept_il is not None:
            lines.append(f"- **Accepts impermanent loss**: {'Yes' if self.accept_il else 'No'}")
        if self.underlying_preference:
            underlying_labels = {"onchain": "Pure on-chain", "rwa": "Real-world assets", "mixed": "Mixed"}
            lines.append(f"- **Underlying assets**: {underlying_labels.get(self.underlying_preference, self.underlying_preference)}")
        # Constraints (compare against None so an explicit 0 is still reported)
        if self.min_apy is not None:
            lines.append(f"- **Minimum APY**: {self.min_apy}%")
        if self.max_apy is not None:
            lines.append(f"- **Maximum APY**: {self.max_apy}%")
        if self.min_tvl is not None:
            lines.append(f"- **Minimum TVL**: ${self.min_tvl:,.0f}")
        return "\n".join(lines)
def to_dict(self) -> Dict[str, Any]:
"""Export preferences as dict."""
return {
"chain": self.chain,
"capital_token": self.capital_token,
"reward_preference": self.reward_preference,
"accept_il": self.accept_il,
"underlying_preference": self.underlying_preference,
"min_apy": self.min_apy,
"max_apy": self.max_apy,
"min_tvl": self.min_tvl
}
def from_dict(self, data: Dict[str, Any]) -> "InvestmentProfile":
"""Load preferences from dict."""
for key, value in data.items():
if hasattr(self, key):
setattr(self, key, value)
return self
def __repr__(self) -> str:
return f"InvestmentProfile(chain={self.chain}, token={self.capital_token})"
# Convenience function for quick filtering
def filter_by_preferences(
opportunities: List[Dict[str, Any]],
chain: str,
capital_token: str,
**kwargs
) -> List[Dict[str, Any]]:
"""
One-shot filter function.
Example:
filtered = filter_by_preferences(
opportunities,
chain="ethereum",
capital_token="USDC",
accept_il=False,
reward_preference="single"
)
"""
profile = InvestmentProfile()
profile.set_preferences(chain=chain, capital_token=capital_token, **kwargs)
return profile.filter_opportunities(opportunities)
if __name__ == "__main__":
    # Demo: Print available questions
    print("# Investment Preference Questions\n")
    questions = InvestmentProfile.get_questions()
    print("## Required Questions")
    for q in questions["required"]:
        print(f"\n**{q['question']}**")
        print(f"- Key: `{q['key']}`")
        if 'options' in q:
            print(f"- Options: {', '.join(q['options'])}")
    print("\n## Important Questions (strongly recommended)")
    for q in questions["preference"]:
        print(f"\n**{q['question']}**")
        print(f"- Key: `{q['key']}`")
        print(f"- Description: {q['description']}")
        for opt in q['options']:
            if isinstance(opt, dict):
                print(f"  - `{opt['value']}`: {opt['label']} ({opt.get('description', '')})")
```
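To see how the preference checks combine, here is a small standalone sketch. It does not import `investment_profile.py`. The `passes` helper is hypothetical and mirrors only the impermanent-loss, reward, and minimum-APY rules above, and the two sample opportunities are invented for illustration.

```python
from typing import Any, Dict, List, Optional


def passes(opp: Dict[str, Any], accept_il: Optional[bool],
           reward_preference: Optional[str], min_apy: Optional[float]) -> bool:
    """Mirror of the IL, reward, and minimum-APY checks; None means no preference."""
    signals = opp.get("risk_signals", {})
    # Reject IL-exposed products only when the user explicitly refuses IL
    if accept_il is False and signals.get("has_il_risk", False):
        return False
    # "single" also admits products with no reward token at all
    allowed = {"single": ["single", "none"], "multi": ["multi"], "none": ["none"]}
    if reward_preference is not None:
        if signals.get("reward_type", "unknown") not in allowed.get(reward_preference, []):
            return False
    if min_apy is not None and (opp.get("apy") or 0) < min_apy:
        return False
    return True


opportunities: List[Dict[str, Any]] = [
    {"protocol": "lending-pool", "apy": 4.2,
     "risk_signals": {"has_il_risk": False, "reward_type": "none"}},
    {"protocol": "lp-farm", "apy": 18.0,
     "risk_signals": {"has_il_risk": True, "reward_type": "multi"}},
]

kept = [o["protocol"] for o in opportunities
        if passes(o, accept_il=False, reward_preference="single", min_apy=3)]
print(kept)  # ['lending-pool']
```

The higher-yield LP position is dropped solely because the user declined impermanent loss, which shows that each check acts as an independent veto.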
### scripts/discovery/unified_search.py
```python
#!/usr/bin/env python3
"""
Unified Search Interface for Web3 Investor Skill (v0.2.0)
Combines data from multiple sources:
1. DefiLlama API - Real-time yield data (with actionable addresses)
2. Dune Analytics - Deep on-chain analytics via MCP
Key Features:
- LLM-ready output format
- Actionable addresses for execution
- Risk signals collection (no local scoring)
- Protocol registry integration
Usage:
python unified_search.py --min-apy 5 --chain ethereum
python unified_search.py --source dune --query "aave lending"
python unified_search.py --llm-ready --output json
"""
import argparse
import json
import sys
from datetime import datetime
from typing import Optional, List, Dict, Any
# Import existing modules
try:
from find_opportunities import (
find_opportunities,
format_report as format_defillama,
format_llm_prompt,
get_protocol_registry
)
except ImportError:
# Fallback if run from different directory
import importlib.util
spec = importlib.util.spec_from_file_location("find_opportunities",
__file__.replace("unified_search.py", "find_opportunities.py"))
find_opps_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(find_opps_module)
find_opportunities = find_opps_module.find_opportunities
format_defillama = find_opps_module.format_report
format_llm_prompt = getattr(find_opps_module, "format_llm_prompt", None)
get_protocol_registry = getattr(find_opps_module, "get_protocol_registry", lambda: {})
# Import Dune MCP with fallback
try:
from dune_mcp import (
search_tables,
execute_query,
get_execution_results,
create_query
)
except ImportError:
# Fallback for relative import
try:
from .dune_mcp import (
search_tables,
execute_query,
get_execution_results,
create_query
)
except ImportError:
# Last resort: manual import
import importlib.util
dune_spec = importlib.util.spec_from_file_location("dune_mcp",
__file__.replace("unified_search.py", "dune_mcp.py"))
dune_module = importlib.util.module_from_spec(dune_spec)
dune_spec.loader.exec_module(dune_module)
search_tables = dune_module.search_tables
execute_query = dune_module.execute_query
get_execution_results = dune_module.get_execution_results
create_query = dune_module.create_query
def search_defillama(
min_apy: float = 0,
max_risk: str = "medium",
chain: str = "Ethereum",
min_tvl: float = 0,
limit: int = 20
) -> List[Dict]:
"""
Search DefiLlama for yield opportunities.
Returns standardized opportunity format.
"""
opportunities = find_opportunities(
min_apy=min_apy,
max_risk=max_risk,
chain=chain,
min_tvl=min_tvl,
limit=limit
)
# Add source identifier
for opp in opportunities:
opp["source"] = "defillama"
return opportunities
def search_dune_tables(
query: str,
chain: str = "ethereum",
categories: Optional[List[str]] = None,
limit: int = 10
) -> List[Dict]:
"""
Search Dune for relevant data tables.
Returns table metadata for potential query execution.
"""
if categories is None:
categories = ["spell"]
result = search_tables(
query=query,
categories=categories,
blockchains=[chain],
limit=limit,
include_schema=True
)
tables = []
# Handle MCP response format
if "structuredContent" in result:
results = result["structuredContent"].get("results", [])
elif "content" in result:
# Parse text content
content = result["content"]
if isinstance(content, list) and len(content) > 0:
text = content[0].get("text", "{}")
try:
parsed = json.loads(text)
results = parsed.get("results", [])
except json.JSONDecodeError:
results = []
else:
results = []
elif "results" in result:
results = result["results"]
else:
results = []
for table in results:
tables.append({
"source": "dune",
"full_name": table.get("full_name", ""),
"category": table.get("category", ""),
"blockchains": table.get("blockchains", []),
"description": table.get("description", ""),
"partition_columns": table.get("partition_columns", []),
"schema": table.get("schema", {}),
"visibility": table.get("visibility", "public")
})
return tables
def find_aave_rates_via_dune(chain: str = "ethereum") -> Dict:
    """
    Create a temporary Dune query for current Aave lending rates.

    The query is built against Dune's curated tables but is not executed here;
    run it with execute_query() and fetch rows with get_execution_results().
    """
    # First, find the relevant table
    tables = search_dune_tables("aave interest", chain=chain)
    # Look for a table such as aave_ethereum.interest
    aave_table = None
    for t in tables:
        if "aave" in t["full_name"].lower() and "interest" in t["full_name"].lower():
            aave_table = t
            break
    if not aave_table:
        return {"error": "Aave interest table not found"}
    # Create a query to get current rates
    # Note: the column names below assume a particular table schema
    sql = f"""
    SELECT
        asset_symbol,
        asset_address,
        supply_rate,
        variable_borrow_rate,
        stable_borrow_rate,
        block_date
    FROM {aave_table['full_name']}
    WHERE block_date = (SELECT MAX(block_date) FROM {aave_table['full_name']})
    ORDER BY supply_rate DESC
    LIMIT 20
    """
    result = create_query(
        name=f"Aave {chain} Rates - {datetime.now().strftime('%Y-%m-%d')}",
        sql=sql,
        description="Current Aave lending rates",
        is_temp=True
    )
    return {
        "table": aave_table["full_name"],
        "query_result": result
    }
def unified_search(
min_apy: float = 0,
chain: str = "Ethereum",
min_tvl: float = 0,
sources: Optional[List[str]] = None,
limit: int = 20,
include_risk_signals: bool = True
) -> Dict[str, Any]:
"""
Search across all configured data sources.
v0.2.0: Removed max_risk filter (LLM will analyze risk)
Args:
min_apy: Minimum APY percentage
chain: Blockchain to search
min_tvl: Minimum TVL in USD
sources: Data sources to use (defillama, dune)
limit: Maximum results per source
include_risk_signals: Include risk signals for LLM analysis
Returns:
Dict with opportunities from each source
"""
if sources is None:
sources = ["defillama", "dune"]
results = {
"generated_at": datetime.now().isoformat(),
"version": "0.2.0",
"parameters": {
"min_apy": min_apy,
"chain": chain,
"min_tvl": min_tvl,
"sources": sources
},
"sources": {}
}
# DefiLlama - Real-time yields with actionable addresses
if "defillama" in sources:
try:
opps = find_opportunities(
min_apy=min_apy,
chain=chain,
min_tvl=min_tvl,
limit=limit,
include_risk_signals=include_risk_signals
)
results["sources"]["defillama"] = {
"status": "success",
"count": len(opps),
"opportunities": opps
}
except Exception as e:
results["sources"]["defillama"] = {
"status": "error",
"error": str(e)
}
# Dune - Data tables for custom analysis
if "dune" in sources:
try:
tables = search_dune_tables(
query=f"yield lending {chain}",
chain=chain.lower(),
limit=limit
)
results["sources"]["dune"] = {
"status": "success",
"count": len(tables),
"tables": tables
}
except Exception as e:
results["sources"]["dune"] = {
"status": "error",
"error": str(e)
}
return results
def format_unified_report(results: Dict) -> str:
"""Format unified search results as markdown report (v0.2.0)."""
lines = [
"# 💰 Unified Investment Search Results",
f"\n**Generated**: {results['generated_at']}",
f"**Version**: {results.get('version', 'unknown')}",
f"**Parameters**: min_apy={results['parameters']['min_apy']}%, "
f"chain={results['parameters']['chain']}",
"\n---\n"
]
# DefiLlama results
if "defillama" in results["sources"]:
source = results["sources"]["defillama"]
lines.append("## 📊 DefiLlama - Real-time Yields\n")
if source["status"] == "success" and source.get("opportunities"):
for opp in source["opportunities"]:
# Actionable status
actionable = opp.get("actionable_addresses", {})
action_status = "✅ Ready" if actionable.get("has_actionable_address") else "⚠️ Needs lookup"
lines.extend([
f"### {opp.get('protocol', 'N/A')} - {opp.get('symbol', 'N/A')}",
f"- **APY**: {opp.get('apy', 0):.2f}%",
f"- **TVL**: ${opp.get('tvl_usd', 0):,.0f}",
f"- **Chain**: {opp.get('chain', 'N/A')}",
f"- **Actionable**: {action_status}",
f"- **Link**: {opp.get('url', 'N/A')}",
""
])
# Risk signals summary
risk = opp.get("risk_signals", {})
if risk:
lines.append(f"**Risk Signals**: Audited={'✅' if risk.get('has_audit') else '❌'}, "
f"Known={'✅' if risk.get('known_protocol') else '❌'}, "
f"APY Type={risk.get('apy_composition', 'unknown')}")
lines.append("")
else:
lines.append(f"Status: {source.get('status', 'unknown')}")
if source.get("error"):
lines.append(f"Error: {source['error']}")
lines.append("")
# Dune results
if "dune" in results["sources"]:
source = results["sources"]["dune"]
lines.append("## 🧙 Dune Analytics - Data Tables\n")
if source["status"] == "success" and source.get("tables"):
lines.append("Available tables for custom analysis:\n")
for table in source["tables"]:
chains = ", ".join(table.get("blockchains", [])[:3])
lines.append(f"- `{table['full_name']}` ({table.get('category', 'N/A')}) - Chains: {chains}")
else:
lines.append(f"Status: {source.get('status', 'unknown')}")
if source.get("error"):
lines.append(f"Error: {source['error']}")
lines.append("")
return "\n".join(lines)
def format_llm_ready_output(results: Dict) -> str:
"""
Format results as LLM-ready prompt for risk analysis.
Optimized for LLM consumption with structured data.
"""
lines = [
"# DeFi Investment Search Results - LLM Analysis Request",
f"\nGenerated: {results['generated_at']}",
"",
"## Task",
"Analyze the following investment opportunities and provide:",
"1. Risk assessment (Low/Medium/High) for each opportunity",
"2. Key risk factors and warnings",
"3. Recommended execution approach",
"4. Any additional data needed for decision",
"",
"---\n"
]
# DefiLlama opportunities
if "defillama" in results["sources"]:
source = results["sources"]["defillama"]
if source["status"] == "success" and source.get("opportunities"):
lines.append("## Opportunities from DefiLlama\n")
for i, opp in enumerate(source["opportunities"], 1):
lines.append(f"### Opportunity {i}")
lines.append("```json")
lines.append(json.dumps(opp, indent=2, ensure_ascii=False))
lines.append("```\n")
# Dune tables
if "dune" in results["sources"]:
source = results["sources"]["dune"]
if source["status"] == "success" and source.get("tables"):
lines.append("## Available Dune Data Tables\n")
for table in source["tables"]:
lines.append(f"- `{table['full_name']}`: {table.get('description', 'No description')}")
lines.append("")
return "\n".join(lines)
def main():
parser = argparse.ArgumentParser(
description="Unified search across DefiLlama and Dune (v0.2.0 - LLM-ready)"
)
parser.add_argument("--min-apy", type=float, default=0, help="Minimum APY %%")
parser.add_argument("--chain", default="Ethereum", help="Blockchain to search")
parser.add_argument("--min-tvl", type=float, default=0, help="Minimum TVL in USD")
parser.add_argument("--limit", "-n", type=int, default=20, help="Results per source")
parser.add_argument("--source", action="append", choices=["defillama", "dune"],
help="Specific sources to use (default: all)")
parser.add_argument("--output", "-o", choices=["markdown", "json"], default="markdown",
help="Output format")
parser.add_argument("--llm-ready", action="store_true",
help="Output LLM-ready prompt for risk analysis")
parser.add_argument("--no-risk-signals", action="store_true",
help="Exclude risk signals from output")
parser.add_argument("--dune-query", type=str, help="Custom Dune table search query")
args = parser.parse_args()
# Custom Dune search
if args.dune_query:
tables = search_dune_tables(
query=args.dune_query,
chain=args.chain.lower(),
limit=args.limit
)
if args.output == "json":
print(json.dumps(tables, indent=2))
else:
for t in tables:
print(f"{t['full_name']:40} | {t['category']:10} | {', '.join(t['blockchains'][:2])}")
return
# Unified search
sources = args.source if args.source else None
results = unified_search(
min_apy=args.min_apy,
chain=args.chain,
min_tvl=args.min_tvl,
sources=sources,
limit=args.limit,
include_risk_signals=not args.no_risk_signals
)
# Output formatting
if args.llm_ready:
print(format_llm_ready_output(results))
elif args.output == "json":
print(json.dumps(results, indent=2, ensure_ascii=False))
else:
print(format_unified_report(results))
if __name__ == "__main__":
main()
```
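The dictionary returned by `unified_search` groups results per source, so a caller usually has to check each source's `status` before reading its payload. The following is a minimal sketch of that pattern, assuming only the field names visible in the script above (`sources`, `status`, `opportunities`, `apy`). The helper `top_opportunities` and the sample data are hypothetical and not part of the skill.

```python
from typing import Any, Dict, List


def top_opportunities(results: Dict[str, Any], n: int = 5) -> List[Dict[str, Any]]:
    """Return the n highest-APY DefiLlama entries from a unified_search-style dict."""
    source = results.get("sources", {}).get("defillama", {})
    if source.get("status") != "success":
        return []  # source errored or was not queried: nothing to rank
    opportunities = source.get("opportunities", [])
    return sorted(opportunities, key=lambda o: o.get("apy", 0), reverse=True)[:n]


# Hand-written sample in the same shape; all values are made up for illustration.
sample_results = {
    "sources": {
        "defillama": {
            "status": "success",
            "count": 3,
            "opportunities": [
                {"protocol": "proto-a", "symbol": "USDC", "apy": 4.2, "tvl_usd": 1_000_000},
                {"protocol": "proto-b", "symbol": "DAI", "apy": 7.9, "tvl_usd": 250_000},
                {"protocol": "proto-c", "symbol": "WETH", "apy": 2.1, "tvl_usd": 9_000_000},
            ],
        },
        "dune": {"status": "error", "error": "timeout"},
    }
}

for opp in top_opportunities(sample_results, n=2):
    print(f"{opp['protocol']:8} {opp['symbol']:5} {opp['apy']:.2f}%")
```

Returning an empty list for a failed source mirrors how the report formatters above treat a non-success status: the error is reported separately and no opportunities are read.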
### scripts/portfolio/indexer.py
```python
#!/usr/bin/env python3
"""
On-chain portfolio indexer with blockchain explorer API support.
Usage:
python indexer.py --address 0x...
python indexer.py --address 0x... --chain base
python indexer.py --address 0x... --output json
"""
import argparse
import json
import os
import sys
import urllib.request
from datetime import datetime
# ============================================================================
# Chain Configurations
# ============================================================================
CHAIN_CONFIGS = {
"ethereum": {
"name": "Ethereum",
"chain_id": 1,
"rpcs": ["https://cloudflare-eth.com", "https://mainnet.gateway.fm"],
"alchemy_template": "https://eth-mainnet.g.alchemy.com/v2/{api_key}",
"explorer_api": "https://api.etherscan.io/v2/api",
"explorer_env_key": "ETHERSCAN_API_KEY",
},
"base": {
"name": "Base",
"chain_id": 8453,
"rpcs": ["https://mainnet.base.org", "https://base.llamarpc.com"],
"alchemy_template": "https://base-mainnet.g.alchemy.com/v2/{api_key}",
"explorer_api": "https://api.basescan.org/api",
"explorer_env_key": "BASESCAN_API_KEY",
},
}
# Price symbol to CoinGecko ID mapping
SYMBOL_TO_COINGECKO = {
"ETH": "ethereum", "WETH": "ethereum", "USDC": "usd-coin",
"USDT": "tether", "DAI": "dai", "LINK": "chainlink",
"UNI": "uniswap", "AAVE": "aave", "STETH": "staked-ether",  # keys are upper-case because lookups use symbol.upper()
}
# ============================================================================
# Blockchain Explorer API (Primary - Requires Free API Key)
# ============================================================================
def get_explorer_api_key(chain: str) -> str:
"""Get explorer API key from environment or config."""
config = CHAIN_CONFIGS.get(chain, CHAIN_CONFIGS["ethereum"])
env_key = config.get("explorer_env_key", "ETHERSCAN_API_KEY")
# Check environment variable
api_key = os.environ.get(env_key) or os.environ.get("ETHERSCAN_API_KEY")
if api_key:
return api_key
# Check config file
cfg = get_config()
return cfg.get("portfolio", {}).get("etherscan_api_key", "")
def get_balance_from_explorer(address: str, chain: str = "ethereum") -> float:
"""Get native balance using blockchain explorer API."""
config = CHAIN_CONFIGS.get(chain, CHAIN_CONFIGS["ethereum"])
api_key = get_explorer_api_key(chain)
if not api_key:
print(f"⚠️ No block explorer API key configured for the {chain} chain", file=sys.stderr)
return 0
# Etherscan V2 API format
url = f"{config['explorer_api']}?chainid={config['chain_id']}&module=account&action=balance&address={address}&apikey={api_key}"
try:
req = urllib.request.Request(url, headers={"User-Agent": "Web3-Investor/0.3.0"})
with urllib.request.urlopen(req, timeout=15) as resp:
data = json.loads(resp.read().decode())
if data.get("status") == "1":
return int(data.get("result", "0")) / 1e18
else:
print(f"⚠️ Explorer API returned an error: {data.get('message', 'Unknown')}", file=sys.stderr)
except Exception as e:
print(f"⚠️ Explorer API request failed: {e}", file=sys.stderr)
return 0
def get_tokens_from_explorer(address: str, chain: str = "ethereum") -> list:
"""Get token list using blockchain explorer API."""
config = CHAIN_CONFIGS.get(chain, CHAIN_CONFIGS["ethereum"])
api_key = get_explorer_api_key(chain)
if not api_key:
return []
url = f"{config['explorer_api']}?chainid={config['chain_id']}&module=account&action=tokenlist&address={address}&apikey={api_key}"
tokens = []
try:
req = urllib.request.Request(url, headers={"User-Agent": "Web3-Investor/0.3.0"})
with urllib.request.urlopen(req, timeout=15) as resp:
data = json.loads(resp.read().decode())
if data.get("status") == "1" and data.get("result"):
for t in data["result"]:
try:
decimals = int(t.get("tokenDecimal", 18))
balance = int(t.get("value", "0")) / (10 ** decimals)
if balance > 0:
tokens.append({
"address": t.get("contractAddress", ""),
"symbol": t.get("tokenSymbol", "?"),
"name": t.get("tokenName", "?"),
"balance": balance,
"decimals": decimals
})
except (ValueError, TypeError):
continue
except Exception as e:
print(f"⚠️ Token list request failed: {e}", file=sys.stderr)
return tokens
# ============================================================================
# RPC Fallback (Secondary)
# ============================================================================
def get_rpc_url(chain: str) -> str:
"""Get RPC URL with environment/config fallback."""
env_var = f"WEB3_INVESTOR_{chain.upper()}_RPC_URL"
rpc = os.environ.get(env_var) or os.environ.get("WEB3_INVESTOR_RPC_URL")
if rpc:
return rpc
alchemy_key = os.environ.get("ALCHEMY_API_KEY")
if alchemy_key:
return CHAIN_CONFIGS[chain]["alchemy_template"].format(api_key=alchemy_key)
return CHAIN_CONFIGS[chain]["rpcs"][0]
def get_balance_rpc(address: str, chain: str) -> float:
"""Get native balance via RPC (fallback)."""
try:
payload = {"jsonrpc": "2.0", "method": "eth_getBalance", "params": [address, "latest"], "id": 1}
req = urllib.request.Request(get_rpc_url(chain), data=json.dumps(payload).encode(),
headers={"Content-Type": "application/json"})
with urllib.request.urlopen(req, timeout=15) as resp:
result = json.loads(resp.read().decode())
return int(result.get("result", "0x0"), 16) / 1e18
except Exception as e:
print(f"⚠️ RPC error: {e}", file=sys.stderr)
return 0
# ============================================================================
# Prices
# ============================================================================
def get_prices(tokens: list) -> dict:
"""Get prices from CoinGecko."""
ids = {SYMBOL_TO_COINGECKO.get(t.get("symbol", "").upper()) for t in tokens}
ids.discard(None)
if not ids:
return {}
try:
url = f"https://api.coingecko.com/api/v3/simple/price?ids={','.join(ids)}&vs_currencies=usd"
with urllib.request.urlopen(url, timeout=15) as resp:
return json.loads(resp.read().decode())
except Exception as e:
print(f"⚠️ Price error: {e}", file=sys.stderr)
return {}
# ============================================================================
# Portfolio Builder
# ============================================================================
def build_portfolio(address: str, chain: str, native: float, tokens: list, source: str) -> dict:
"""Build portfolio response."""
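# Always request the ETH price so the native balance is not valued at the hard-coded fallback
prices = get_prices(tokens + [{"symbol": "ETH"}])
eth_price = prices.get("ethereum", {}).get("usd", 2000)  # 2000 is only a last-resort fallback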
total = native * eth_price
for t in tokens:
price_id = SYMBOL_TO_COINGECKO.get(t.get("symbol", "").upper())
t["price_usd"] = prices.get(price_id, {}).get("usd", 0) if price_id else 0
t["value_usd"] = t.get("balance", 0) * t.get("price_usd", 0)
total += t["value_usd"]
all_tokens = [{
"address": "0x0000000000000000000000000000000000000000",
"symbol": "ETH", "name": "Ethereum",
"balance": native, "price_usd": eth_price, "value_usd": native * eth_price
}] + [t for t in tokens if t.get("value_usd", 0) > 0.01]
all_tokens.sort(key=lambda x: x.get("value_usd", 0), reverse=True)
return {
"address": address.lower(), "chain": chain,
"timestamp": datetime.utcnow().isoformat() + "Z",
"total_value_usd": round(total, 2), "tokens": all_tokens,
"defi_positions": [], "expiring_positions": [], "data_source": source
}
def get_portfolio(address: str, chain: str = "ethereum") -> dict:
"""Get portfolio with automatic method selection (explorer first, then RPC)."""
chain = chain.lower()
# Try explorer API first
print(f"🔍 Querying {chain} assets (via block explorer)...", file=sys.stderr)
native = get_balance_from_explorer(address, chain)
tokens = get_tokens_from_explorer(address, chain)
if native > 0 or tokens:
print(f"✅ Block explorer query succeeded!", file=sys.stderr)
return build_portfolio(address, chain, native, tokens, "explorer")
# Fallback to RPC
print(f"⚠️ No data from the block explorer; falling back to an RPC node...", file=sys.stderr)
native = get_balance_rpc(address, chain)
return build_portfolio(address, chain, native, [], "rpc")
# ============================================================================
# Output Formatting
# ============================================================================
def format_report(portfolio: dict, fmt: str = "markdown") -> str:
"""Format portfolio as report."""
if fmt == "json":
return json.dumps(portfolio, indent=2, ensure_ascii=False)
lines = [
f"# 📊 Portfolio Report",
f"\n| Item | Value |", f"|------|-----|",
f"| Address | `{portfolio['address']}` |",
f"| Chain | {portfolio['chain'].title()} |",
f"| Total Value | **${portfolio['total_value_usd']:,.2f}** |",
f"| Data Source | {portfolio['data_source']} |",
f"\n---\n", "## 🪙 Token Holdings\n",
"| Token | Balance | Price | Value |", "|------|------|------|------|"
]
for t in portfolio.get("tokens", []):
lines.append(f"| {t.get('symbol', '?')} | {t.get('balance', 0):.4f} | ${t.get('price_usd', 0):.2f} | ${t.get('value_usd', 0):,.2f} |")
return "\n".join(lines)
# ============================================================================
# Main
# ============================================================================
def main():
parser = argparse.ArgumentParser(description="Portfolio indexer")
parser.add_argument("--address", required=True, help="Wallet address")
parser.add_argument("--chain", default="ethereum", choices=["ethereum", "base"])
parser.add_argument("--output", choices=["markdown", "json"], default="markdown")
args = parser.parse_args()
portfolio = get_portfolio(args.address, args.chain)
print(format_report(portfolio, args.output))
if __name__ == "__main__":
main()
```
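The indexer converts raw on-chain integers to human-readable amounts by dividing by `10 ** decimals` with floats, which can lose precision for large balances. As an illustrative alternative, and not what the script above does, the sketch below performs the same conversion with `decimal.Decimal`. The helper names `to_units` and `from_hex_wei` are hypothetical.

```python
from decimal import Decimal


def to_units(raw_value: str, decimals: int = 18) -> Decimal:
    """Convert a raw integer amount (as a decimal string) to whole token units."""
    return Decimal(int(raw_value)) / (Decimal(10) ** decimals)


def from_hex_wei(hex_value: str) -> Decimal:
    """Convert an eth_getBalance-style hex string (wei) to ETH."""
    return to_units(str(int(hex_value, 16)), 18)


# A 6-decimal token amount, in the style of the explorer's string-encoded values
print(to_units("1500000", 6))
# A hex-encoded wei amount, in the style of a JSON-RPC balance result
print(from_hex_wei("0xde0b6b3a7640000"))
```

Keeping amounts as `Decimal` until the final formatting step avoids accumulating float rounding when many token values are summed into a portfolio total.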
### scripts/schemas/output_schema.py
```python
"""
Output Schema Module
Provides standardized output formats for all transaction operations.
Inspired by xaut-trade's structured output design.
Key Features:
- Consistent structure across swap/transfer/deposit operations
- Clear stage indication (preview/ready_to_execute)
- Risk warnings and confirmation requirements
- Machine-readable JSON and human-readable text output
"""
from dataclasses import dataclass, field, asdict
from typing import List, Optional, Dict, Any
from enum import Enum
from datetime import datetime
import uuid
import json
class Stage(str, Enum):
"""Transaction stage."""
PREVIEW = "preview"
READY_TO_EXECUTE = "ready_to_execute"
EXECUTED = "executed"
FAILED = "failed"
class RiskLevel(str, Enum):
"""Risk severity level."""
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
@dataclass
class RiskWarning:
"""Risk warning item."""
type: str
level: RiskLevel
message: str
details: Optional[Dict[str, Any]] = None
@dataclass
class InputInfo:
"""Transaction input information."""
token: str
amount: str
chain: str
token_out: Optional[str] = None # For swap
protocol: Optional[str] = None # For deposit
@dataclass
class QuoteInfo:
"""Quote information (varies by transaction type)."""
estimated_output: Optional[str] = None
rate: Optional[str] = None
slippage_bps: Optional[int] = None
min_amount_out: Optional[str] = None
gas_estimate: Optional[str] = None
apy: Optional[str] = None # For deposit/staking
@dataclass
class PreviewOutput:
"""
Standardized preview output format.
Used for all transaction types: swap, transfer, deposit.
"""
stage: Stage
request_id: str
timestamp: str
# Input
input: InputInfo
# Quote (varies by type)
quote: QuoteInfo
# Risk warnings
risk_warnings: List[RiskWarning]
# Confirmation
requires_double_confirm: bool = False
double_confirm_reasons: List[str] = field(default_factory=list)
confirm_instruction: Optional[str] = None
# Navigation
next_step: str = "approve"
# Raw data (for debugging)
raw_preview_id: Optional[str] = None
raw_transaction: Optional[Dict] = None
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary."""
result = {
"stage": self.stage.value,
"request_id": self.request_id,
"timestamp": self.timestamp,
"input": asdict(self.input),
"quote": asdict(self.quote),
"risk_warnings": [
{
"type": w.type,
"level": w.level.value,
"message": w.message,
**({"details": w.details} if w.details else {})
}
for w in self.risk_warnings
],
"requires_double_confirm": self.requires_double_confirm,
"next_step": self.next_step
}
if self.requires_double_confirm:
result["double_confirm_reasons"] = self.double_confirm_reasons
if self.confirm_instruction:
result["confirm_instruction"] = self.confirm_instruction
if self.raw_preview_id:
result["preview_id"] = self.raw_preview_id
return result
def to_json(self, indent: int = 2) -> str:
"""Convert to JSON string."""
return json.dumps(self.to_dict(), indent=indent, ensure_ascii=False)
def to_text(self) -> str:
"""Convert to human-readable text."""
lines = []
# Header
lines.append(f"📋 Preview ID: {self.raw_preview_id or self.request_id}")
lines.append(f" Stage: {self.stage.value}")
lines.append("")
# Input
lines.append(" Input:")
lines.append(f" Token: {self.input.token}")
lines.append(f" Amount: {self.input.amount}")
lines.append(f" Chain: {self.input.chain}")
if self.input.token_out:
lines.append(f" → Output: {self.input.token_out}")
if self.input.protocol:
lines.append(f" Protocol: {self.input.protocol}")
lines.append("")
# Quote
if self.quote.estimated_output or self.quote.rate:
lines.append(" Quote:")
if self.quote.rate:
lines.append(f" Rate: {self.quote.rate}")
if self.quote.estimated_output:
lines.append(f" Estimated Output: {self.quote.estimated_output}")
if self.quote.slippage_bps:
lines.append(f" Slippage: {self.quote.slippage_bps} bps")
if self.quote.min_amount_out:
lines.append(f" Min Output: {self.quote.min_amount_out}")
if self.quote.gas_estimate:
lines.append(f" Gas Estimate: {self.quote.gas_estimate}")
if self.quote.apy:
lines.append(f" APY: {self.quote.apy}")
lines.append("")
# Risk warnings
if self.risk_warnings:
lines.append(" ⚠️ Risk Warnings:")
for w in self.risk_warnings:
level_emoji = {"low": "🟢", "medium": "🟡", "high": "🔴"}[w.level.value]
lines.append(f" {level_emoji} {w.type}: {w.message}")
lines.append("")
# Double confirm
if self.requires_double_confirm:
lines.append(" 🔒 Double Confirmation Required:")
for reason in self.double_confirm_reasons:
lines.append(f" - {reason}")
if self.confirm_instruction:
lines.append(f" {self.confirm_instruction}")
lines.append("")
# Next step
lines.append(f" Next Step: {self.next_step}")
return "\n".join(lines)
@dataclass
class ApprovalResult:
"""Standardized approval result."""
success: bool
approval_id: Optional[str] = None
preview_id: Optional[str] = None
approved_at: Optional[str] = None
expires_at: Optional[str] = None
error: Optional[Dict[str, Any]] = None
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary."""
result = {
"success": self.success,
"approval_id": self.approval_id,
"preview_id": self.preview_id,
"approved_at": self.approved_at
}
if self.expires_at:
result["expires_at"] = self.expires_at
if self.error:
result["error"] = self.error
return result
def to_json(self, indent: int = 2) -> str:
"""Convert to JSON string."""
return json.dumps(self.to_dict(), indent=indent, ensure_ascii=False)
def to_text(self) -> str:
"""Convert to human-readable text."""
if self.success:
lines = [
f"✅ Approval ID: {self.approval_id}",
f" Preview ID: {self.preview_id}",
f" Approved At: {self.approved_at}",
f" Next Step: execute --approval-id {self.approval_id}"
]
else:
error = self.error or {}
lines = [
f"❌ Approval Failed",
f" Error: {error.get('message', 'Unknown')}"
]
return "\n".join(lines)
@dataclass
class ExecutionResult:
"""Standardized execution result."""
success: bool
tx_hash: Optional[str] = None
explorer_url: Optional[str] = None
network: Optional[str] = None
executed_at: Optional[str] = None
error: Optional[Dict[str, Any]] = None
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary."""
result = {
"success": self.success,
"tx_hash": self.tx_hash,
"network": self.network,
"executed_at": self.executed_at
}
if self.explorer_url:
result["explorer_url"] = self.explorer_url
if self.error:
result["error"] = self.error
return result
def to_json(self, indent: int = 2) -> str:
"""Convert to JSON string."""
return json.dumps(self.to_dict(), indent=indent, ensure_ascii=False)
def to_text(self) -> str:
"""Convert to human-readable text."""
if self.success:
lines = [
"🚀 Transaction Submitted!",
f" TX Hash: {self.tx_hash}",
f" Network: {self.network}",
f" Explorer: {self.explorer_url}"
]
else:
error = self.error or {}
lines = [
"❌ Execution Failed",
f" Error: {error.get('message', 'Unknown')}"
]
return "\n".join(lines)
# Factory functions
def create_preview_output(
input_info: InputInfo,
quote_info: QuoteInfo,
risk_warnings: List[RiskWarning],
requires_double_confirm: bool = False,
double_confirm_reasons: List[str] = None,
confirm_instruction: str = None,
preview_id: str = None,
raw_transaction: Dict = None
) -> PreviewOutput:
"""Create a standardized preview output."""
return PreviewOutput(
stage=Stage.PREVIEW,
request_id=str(uuid.uuid4()),
timestamp=datetime.utcnow().isoformat() + "Z",
input=input_info,
quote=quote_info,
risk_warnings=risk_warnings,
requires_double_confirm=requires_double_confirm,
double_confirm_reasons=double_confirm_reasons or [],
confirm_instruction=confirm_instruction,
next_step="approve",
raw_preview_id=preview_id,
raw_transaction=raw_transaction
)
def create_approval_result(
success: bool,
approval_id: str = None,
preview_id: str = None,
error: Dict = None
) -> ApprovalResult:
"""Create a standardized approval result."""
return ApprovalResult(
success=success,
approval_id=approval_id,
preview_id=preview_id,
approved_at=datetime.utcnow().isoformat() + "Z" if success else None,
error=error
)
def create_execution_result(
success: bool,
tx_hash: str = None,
network: str = None,
explorer_url: str = None,
error: Dict = None
) -> ExecutionResult:
"""Create a standardized execution result."""
return ExecutionResult(
success=success,
tx_hash=tx_hash,
network=network,
explorer_url=explorer_url,
executed_at=datetime.utcnow().isoformat() + "Z" if success else None,
error=error
)
if __name__ == "__main__":
# Test output
preview = create_preview_output(
input_info=InputInfo(
token="USDC",
amount="10000",
chain="base",
token_out="WETH"
),
quote_info=QuoteInfo(
estimated_output="5.0",
rate="1 WETH = 2000 USDC",
slippage_bps=50,
min_amount_out="4.975"
),
risk_warnings=[
RiskWarning(
type="large_trade",
level=RiskLevel.MEDIUM,
message="Large trade size, consider price impact"
)
],
requires_double_confirm=True,
double_confirm_reasons=["Large trade: 10000 >= threshold 5000"],
confirm_instruction='To approve, use: --confirm "CONFIRM_HIGH_RISK"',
preview_id="test-preview-id-123"
)
print("=== JSON Output ===")
print(preview.to_json())
print("\n=== Text Output ===")
print(preview.to_text())
```
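The `Stage` enum above names the steps of the preview, approve, execute flow, but the schema module itself does not enforce an order between them. The sketch below shows one way a caller could guard transitions. The transition table and the `advance` helper are assumptions made for illustration, and `Stage` is redefined here only so the snippet is self-contained.

```python
from enum import Enum


class Stage(str, Enum):
    """Mirror of the Stage enum in output_schema.py (redefined for a standalone example)."""
    PREVIEW = "preview"
    READY_TO_EXECUTE = "ready_to_execute"
    EXECUTED = "executed"
    FAILED = "failed"


# Assumed transition table: a preview must be approved before it can execute,
# and terminal stages allow no further moves.
ALLOWED_TRANSITIONS = {
    Stage.PREVIEW: {Stage.READY_TO_EXECUTE, Stage.FAILED},
    Stage.READY_TO_EXECUTE: {Stage.EXECUTED, Stage.FAILED},
    Stage.EXECUTED: set(),
    Stage.FAILED: set(),
}


def advance(current: Stage, target: Stage) -> Stage:
    """Return target if the move is allowed, otherwise raise ValueError."""
    if target not in ALLOWED_TRANSITIONS[current]:
        raise ValueError(f"illegal transition: {current.value} -> {target.value}")
    return target


stage = advance(Stage.PREVIEW, Stage.READY_TO_EXECUTE)
stage = advance(stage, Stage.EXECUTED)
print(stage.value)
```

With a table like this, skipping approval (for example moving from `preview` straight to `executed`) fails loudly instead of silently producing an execution result.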
### scripts/trading/safe_vault.py
```python
#!/usr/bin/env python3
"""
Safe Vault - Secure transaction signing and execution (v0.2.0)
Features:
- Balance pre-check before transaction preparation
- Deposit preview for Aave/Compound/Lido
- Whitelist and limit protection
- Local signing capability
Usage:
python safe_vault.py --check-balance --address 0x... --chain ethereum
python safe_vault.py --preview-deposit --protocol aave --asset USDC --amount 1000
python safe_vault.py --prepare-tx --to 0x... --value 0 --data 0x...
"""
import argparse
import json
import os
import sys
import uuid
from datetime import datetime
from typing import Optional, Dict, Any, List
import urllib.request
import urllib.error
# ============================================================================
# Configuration
# ============================================================================
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
CONFIG_PATH = os.path.join(SCRIPT_DIR, "..", "..", "config", "config.json")
WHITELIST_PATH = os.path.join(SCRIPT_DIR, "..", "..", "config", "whitelist.json")
PROTOCOLS_PATH = os.path.join(SCRIPT_DIR, "..", "..", "config", "protocols.json")
# Chain configurations
CHAIN_CONFIGS = {
"ethereum": {
"chain_id": 1,
"rpcs": ["https://cloudflare-eth.com", "https://mainnet.gateway.fm"],
"explorer_api": "https://api.etherscan.io/v2/api",
"explorer_env_key": "ETHERSCAN_API_KEY",
"native_token": "ETH"
},
"base": {
"chain_id": 8453,
"rpcs": ["https://mainnet.base.org"],
"explorer_api": "https://api.basescan.org/api",
"explorer_env_key": "BASESCAN_API_KEY",
"native_token": "ETH"
}
}
# Known token addresses (Ethereum mainnet)
KNOWN_TOKENS = {
"USDC": "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48",
"USDT": "0xdAC17F958D2ee523a2206206994597C13D831ec7",
"DAI": "0x6B175474E89094C44Da98b954EedeAC495271d0F",
"WETH": "0xC02aaA39b223FE8D0A0e5C4F27ead9083C756Cc2",
"stETH": "0xae7ab96520DE3A18E5e111B5EaAb095312D7fE84",
"rETH": "0xae78736Cd615f374D3085123A210448E74Fc6393",
}
# Protocol contract addresses (Ethereum mainnet)
PROTOCOL_CONTRACTS = {
"aave": {
"pool": "0x87870Bca3F3fD6335C3F4ce8392D69350B4fA4E2",
"pool_proxy": "0x87870Bca3F3fD6335C3F4ce8392D69350B4fA4E2"
},
"compound": {
"comet": "0xc3d688B66703497DAA19211EEdff47f25384cdc3"
},
"lido": {
"steth": "0xae7ab96520DE3A18E5e111B5EaAb095312D7fE84"
}
}
# Error messages
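ERROR_MESSAGES = {
"NO_PRIVATE_KEY": "❌ Cannot sign transaction: no private key configured. Set the WEB3_INVESTOR_PRIVATE_KEY environment variable.",
"WHITELIST_BLOCKED": "⛔ Transaction rejected: the target address is not on the whitelist.",
"EXCEEDS_LIMIT": "⚠️ Transaction amount exceeds the limit. The current limit is ${limit} USD.",
"SIMULATION_FAILED": "🔴 Transaction simulation failed: {reason}",
"RPC_ERROR": "📡 Cannot connect to the blockchain network: {error}",
"INSUFFICIENT_BALANCE": "💰 Insufficient balance: need {need} {token}, actual balance {balance}.",
"INVALID_ADDRESS": "📛 Invalid address format: {address}",
"SIGNING_UNAVAILABLE": "🔐 Local signing is unavailable. Please install eth-account.",
"UNKNOWN_PROTOCOL": "❓ Unknown protocol: {protocol}. Supported protocols: aave, compound, lido",
"UNKNOWN_TOKEN": "❓ Unknown token: {token}. Known tokens: USDC, USDT, DAI, WETH, stETH, rETH",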
}
# ============================================================================
# Utility Functions
# ============================================================================
def load_config() -> dict:
"""Load configuration."""
if os.path.exists(CONFIG_PATH):
with open(CONFIG_PATH) as f:
return json.load(f)
return {"trading": {"mode": "simulation", "whitelist_enabled": True, "default_limit_usd": 100}}
def load_whitelist() -> dict:
"""Load whitelist."""
if os.path.exists(WHITELIST_PATH):
with open(WHITELIST_PATH) as f:
return json.load(f)
return {"addresses": [], "enabled": True}
def load_protocols() -> dict:
"""Load protocol registry."""
if os.path.exists(PROTOCOLS_PATH):
with open(PROTOCOLS_PATH) as f:
return json.load(f)
return {"protocols": {}}
def get_rpc_url(chain: str) -> str:
"""Get RPC URL for chain."""
env_var = f"WEB3_INVESTOR_{chain.upper()}_RPC_URL"
rpc = os.environ.get(env_var) or os.environ.get("WEB3_INVESTOR_RPC_URL")
if rpc:
return rpc
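alchemy_key = os.environ.get("ALCHEMY_API_KEY")
if alchemy_key:
# Pick the Alchemy host for the requested chain instead of always using Ethereum mainnet
alchemy_host = {"ethereum": "eth-mainnet", "base": "base-mainnet"}.get(chain, "eth-mainnet")
return f"https://{alchemy_host}.g.alchemy.com/v2/{alchemy_key}"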
return CHAIN_CONFIGS.get(chain, CHAIN_CONFIGS["ethereum"])["rpcs"][0]
def get_chain_id(chain: str) -> int:
"""Get chain ID for chain name."""
return CHAIN_CONFIGS.get(chain, CHAIN_CONFIGS["ethereum"])["chain_id"]
# ============================================================================
# Balance Pre-Check
# ============================================================================
def get_native_balance(address: str, chain: str = "ethereum") -> float:
"""
Get native token balance (ETH) for an address.
Returns balance in ETH (not wei).
"""
rpc_url = get_rpc_url(chain)
payload = {
"jsonrpc": "2.0",
"method": "eth_getBalance",
"params": [address, "latest"],
"id": 1
}
try:
req = urllib.request.Request(
rpc_url,
data=json.dumps(payload).encode(),
headers={"Content-Type": "application/json"}
)
with urllib.request.urlopen(req, timeout=15) as resp:
result = json.loads(resp.read().decode())
balance_wei = int(result.get("result", "0x0"), 16)
return balance_wei / 1e18
except Exception as e:
print(f"⚠️ Failed to fetch balance: {e}", file=sys.stderr)
return 0
def get_erc20_balance(wallet: str, token: str, chain: str = "ethereum") -> float:
"""
Get ERC20 token balance for an address.
Args:
wallet: Wallet address
token: Token symbol or address
chain: Blockchain name
Returns:
Balance as float (normalized by decimals)
"""
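# Resolve token address (case-insensitive, so mixed-case keys such as "stETH" also match)
known_tokens = {symbol.upper(): addr for symbol, addr in KNOWN_TOKENS.items()}
if token.upper() in known_tokens:
token_address = known_tokens[token.upper()]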
elif token.startswith("0x"):
token_address = token
else:
return 0
rpc_url = get_rpc_url(chain)
# ERC20 balanceOf(address) call
# selector: 0x70a08231 + padded address
data = "0x70a08231" + wallet[2:].lower().zfill(64)
payload = {
"jsonrpc": "2.0",
"method": "eth_call",
"params": [{"to": token_address, "data": data}, "latest"],
"id": 1
}
try:
req = urllib.request.Request(
rpc_url,
data=json.dumps(payload).encode(),
headers={"Content-Type": "application/json"}
)
with urllib.request.urlopen(req, timeout=15) as resp:
result = json.loads(resp.read().decode())
balance_raw = int(result.get("result", "0x0"), 16)
# Assume 18 decimals for most tokens, 6 for USDC/USDT
decimals = 6 if token.upper() in ["USDC", "USDT"] else 18
return balance_raw / (10 ** decimals)
except Exception as e:
print(f"⚠️ Failed to fetch token balance: {e}", file=sys.stderr)
return 0
def check_balance_before_deposit(
wallet: str,
token: str,
amount: float,
chain: str = "ethereum"
) -> Dict[str, Any]:
"""
Check if wallet has sufficient balance for deposit.
Returns:
dict with sufficient (bool), balance, needed, message
"""
result = {
"sufficient": False,
"token": token,
"chain": chain,
"needed": amount,
"balance": 0,
"message": ""
}
# Check native token (ETH); WETH is an ERC20 and is handled by the ERC20 branch below
if token.upper() == "ETH":
balance = get_native_balance(wallet, chain)
result["balance"] = balance
if balance >= amount:
result["sufficient"] = True
result["message"] = f"✅ Sufficient balance: {balance:.4f} ETH >= {amount} ETH"
else:
result["message"] = ERROR_MESSAGES["INSUFFICIENT_BALANCE"].format(
need=amount, token="ETH", balance=balance
)
return result
# Check ERC20 token
balance = get_erc20_balance(wallet, token, chain)
result["balance"] = balance
if balance >= amount:
result["sufficient"] = True
result["message"] = f"✅ Sufficient balance: {balance:.4f} {token} >= {amount} {token}"
else:
result["message"] = ERROR_MESSAGES["INSUFFICIENT_BALANCE"].format(
need=amount, token=token, balance=balance
)
return result
# ============================================================================
# Deposit Preview (Calldata Generation)
# ============================================================================
def encode_uint256(value: int) -> str:
"""Encode uint256 for calldata."""
return hex(value)[2:].zfill(64)
def encode_address(address: str) -> str:
"""Encode address for calldata."""
return address[2:].lower().zfill(64)
def generate_aave_deposit_calldata(
asset: str,
amount: float,
on_behalf_of: str = None,
referral_code: int = 0
) -> Dict[str, Any]:
"""
Generate calldata for Aave V3 supply (deposit).
Function: supply(address asset, uint256 amount, address onBehalfOf, uint16 referralCode)
Selector: 0x617ba037
"""
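# Resolve asset address (case-insensitive, so mixed-case keys such as "stETH" also match)
known_tokens = {symbol.upper(): addr for symbol, addr in KNOWN_TOKENS.items()}
if asset.upper() in known_tokens:
asset_address = known_tokens[asset.upper()]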
elif asset.startswith("0x"):
asset_address = asset
else:
return {"error": ERROR_MESSAGES["UNKNOWN_TOKEN"].format(token=asset)}
# Convert amount to wei (assume 18 decimals, 6 for stablecoins)
decimals = 6 if asset.upper() in ["USDC", "USDT"] else 18
amount_wei = int(amount * (10 ** decimals))
# onBehalfOf defaults to caller (we'll use a placeholder)
on_behalf = on_behalf_of or "0x0000000000000000000000000000000000000001"
# Build calldata
selector = "617ba037" # supply(address,uint256,address,uint16)
calldata = "0x" + selector + \
encode_address(asset_address) + \
encode_uint256(amount_wei) + \
encode_address(on_behalf) + \
encode_uint256(referral_code)
return {
"protocol": "aave",
"method": "supply",
"to": PROTOCOL_CONTRACTS["aave"]["pool"],
"asset": asset_address,
"amount": amount,
"amount_wei": amount_wei,
"calldata": calldata,
"description": f"Deposit {amount} {asset} to Aave V3"
}
def generate_compound_deposit_calldata(
    asset: str,
    amount: float
) -> Dict[str, Any]:
    """
    Generate calldata for Compound V3 (Comet) supply.
    Function: supply(address asset, uint256 amount)
    Selector: 0xf2b9fdb8 (verify against the Comet ABI before use)
    """
    # Resolve asset address
    if asset.upper() in KNOWN_TOKENS:
        asset_address = KNOWN_TOKENS[asset.upper()]
    elif asset.startswith("0x"):
        asset_address = asset
    else:
        return {"error": ERROR_MESSAGES["UNKNOWN_TOKEN"].format(token=asset)}
    # Convert to base units (6 decimals for USDC/USDT, 18 otherwise)
    decimals = 6 if asset.upper() in ["USDC", "USDT"] else 18
    amount_wei = int(amount * (10 ** decimals))
    # 4-byte selector without the "0x" prefix; the prefix is added once below
    selector = "f2b9fdb8"  # supply(address,uint256)
    calldata = "0x" + selector + \
        encode_address(asset_address) + \
        encode_uint256(amount_wei)
    return {
        "protocol": "compound",
        "method": "supply",
        "to": PROTOCOL_CONTRACTS["compound"]["comet"],
        "asset": asset_address,
        "amount": amount,
        "amount_wei": amount_wei,
        "calldata": calldata,
        "description": f"Deposit {amount} {asset} to Compound V3"
    }
def generate_lido_deposit_calldata(
    amount: float
) -> Dict[str, Any]:
    """
    Generate calldata for Lido stETH deposit.
    Function: submit(address _referral) - sends ETH and receives stETH
    Selector: 0xa1903eab
    """
    amount_wei = int(amount * 1e18)
    # Lido submit selector with an empty (zero-address) referral
    selector = "a1903eab"  # submit(address)
    calldata = "0x" + selector + encode_address("0x0000000000000000000000000000000000000000")
    return {
        "protocol": "lido",
        "method": "submit",
        "to": PROTOCOL_CONTRACTS["lido"]["steth"],
        "asset": "ETH",
        "amount": amount,
        "amount_wei": amount_wei,
        "calldata": calldata,
        "value_wei": amount_wei,  # submit() is payable: the ETH is sent as msg.value
        "description": f"Stake {amount} ETH with Lido to receive stETH"
    }
def preview_deposit(
protocol: str,
asset: str,
amount: float,
wallet: str = None,
chain: str = "ethereum"
) -> Dict[str, Any]:
"""
Generate deposit preview with calldata and balance check.
Args:
protocol: Protocol name (aave, compound, lido)
asset: Asset symbol or address
amount: Amount to deposit
wallet: Wallet address for balance check
chain: Blockchain name
Returns:
dict with calldata, balance check, and execution preview
"""
protocol_lower = protocol.lower()
# Generate calldata based on protocol
if protocol_lower == "aave":
calldata_result = generate_aave_deposit_calldata(asset, amount, on_behalf_of=wallet)
elif protocol_lower == "compound":
calldata_result = generate_compound_deposit_calldata(asset, amount)
elif protocol_lower == "lido":
calldata_result = generate_lido_deposit_calldata(amount)
else:
return {
"error": ERROR_MESSAGES["UNKNOWN_PROTOCOL"].format(protocol=protocol),
"supported_protocols": ["aave", "compound", "lido"]
}
if "error" in calldata_result:
return calldata_result
result = {
"protocol": protocol_lower,
"asset": asset,
"amount": amount,
"chain": chain,
"calldata": calldata_result,
"preview": {
"to": calldata_result["to"],
"value": calldata_result.get("value_wei", 0),
"data": calldata_result["calldata"],
"gas_estimate": 250000 # Estimated gas for deposit
}
}
# Balance check if wallet provided
if wallet:
balance_check = check_balance_before_deposit(
wallet=wallet,
token=asset,
amount=amount,
chain=chain
)
result["balance_check"] = balance_check
if not balance_check["sufficient"]:
result["can_execute"] = False
result["warning"] = balance_check["message"]
else:
result["can_execute"] = True
else:
result["balance_check"] = {"checked": False, "message": "Wallet not provided"}
result["can_execute"] = None # Unknown
return result
# ============================================================================
# Whitelist & Security
# ============================================================================
def check_whitelist(address: str, amount_usd: float = 0) -> dict:
"""Check if address is in whitelist and within limits."""
config = load_config()
whitelist = load_whitelist()
if not config["trading"].get("whitelist_enabled", True):
return {"allowed": True, "reason": "Whitelist disabled", "limit": float("inf")}
if not whitelist.get("enabled", True):
return {"allowed": True, "reason": "Whitelist disabled", "limit": float("inf")}
address_lower = address.lower()
for entry in whitelist.get("addresses", []):
if entry.get("address", "").lower() == address_lower:
limit = entry.get("max_amount_usd", config["trading"].get("default_limit_usd", 100))
if amount_usd <= limit:
return {"allowed": True, "reason": f"Within limit", "limit": limit}
else:
return {"allowed": False, "reason": f"Exceeds limit", "limit": limit}
return {"allowed": False, "reason": "Address not in whitelist", "limit": 0}
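def get_eth_price_usd() -> float:
    """Get the current ETH price from CoinGecko, or a static fallback on failure."""
    try:
        url = "https://api.coingecko.com/api/v3/simple/price?ids=ethereum&vs_currencies=usd"
        with urllib.request.urlopen(url, timeout=10) as response:
            data = json.loads(response.read().decode())
        return float(data["ethereum"]["usd"])
    except Exception:
        # Static fallback: it can be far from the market price, so USD limits
        # derived from it are only approximate.
        return 2000.0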
# ============================================================================
# Transaction Preparation
# ============================================================================
def prepare_transaction(
to: str,
value: str = "0",
data: str = "0x",
gas_limit: int = 300000,
description: str = "",
chain: str = "ethereum"
) -> dict:
"""Prepare a transaction for signing."""
config = load_config()
mode = config["trading"].get("mode", "simulation")
# Estimate value in USD
try:
value_eth = float(value) / 1e18
value_usd = value_eth * get_eth_price_usd()
except (TypeError, ValueError):
value_usd = 0
# Check whitelist
whitelist_result = check_whitelist(to, value_usd)
# Generate request
request_id = str(uuid.uuid4())
request = {
"request_id": request_id,
"timestamp": datetime.utcnow().isoformat() + "Z",
"mode": mode,
"chain": chain,
"chain_id": get_chain_id(chain),
"transaction": {
"to": to,
"value": value,
"data": data,
"gas_limit": gas_limit,
"chain_id": get_chain_id(chain),
},
"value_usd": round(value_usd, 2),
"whitelist": whitelist_result,
"description": description,
"approval_required": mode == "simulation" or not whitelist_result["allowed"]
}
return request
# ============================================================================
# CLI
# ============================================================================
def format_preview_output(result: Dict) -> str:
    """Format deposit preview for display."""
    if "error" in result:
        return f"❌ Error: {result['error']}"
    lines = [
        f"# 📋 Deposit Preview - {result['protocol'].upper()}",
        "",
        "| Item | Value |",
        "|------|-------|",
        f"| Protocol | {result['protocol']} |",
        f"| Asset | {result['asset']} |",
        f"| Amount | {result['amount']} |",
        f"| Chain | {result['chain']} |",
        "",
        "## Transaction Data",
        f"- **To**: `{result['preview']['to']}`",
        f"- **Value**: {result['preview']['value']} wei",
        f"- **Data**: `{result['preview']['data'][:60]}...`",
        f"- **Gas Estimate**: {result['preview']['gas_estimate']:,}",
    ]
    if result.get("balance_check"):
        bc = result["balance_check"]
        lines.extend([
            "",
            "## Balance Check",
            f"- **Balance**: {bc.get('balance', 'N/A')} {bc.get('token', '')}",
            f"- **Status**: {bc.get('message', 'N/A')}",
        ])
    return "\n".join(lines)
def main():
parser = argparse.ArgumentParser(
description="Safe Vault - Secure transaction manager (v0.2.0)",
formatter_class=argparse.RawDescriptionHelpFormatter
)
subparsers = parser.add_subparsers(dest="command", help="Commands")
# Check balance
balance_parser = subparsers.add_parser("balance", help="Check wallet balance")
balance_parser.add_argument("--wallet", "-w", required=True, help="Wallet address")
balance_parser.add_argument("--token", "-t", default="ETH", help="Token symbol or address")
balance_parser.add_argument("--chain", "-c", default="ethereum", help="Blockchain")
balance_parser.add_argument("--json", action="store_true", help="JSON output")
# Preview deposit
preview_parser = subparsers.add_parser("preview-deposit", help="Preview deposit transaction")
preview_parser.add_argument("--protocol", "-p", required=True,
choices=["aave", "compound", "lido"],
help="Protocol name")
preview_parser.add_argument("--asset", "-a", required=True, help="Asset symbol")
preview_parser.add_argument("--amount", "-m", type=float, required=True, help="Amount to deposit")
preview_parser.add_argument("--wallet", "-w", help="Wallet for balance check")
preview_parser.add_argument("--chain", "-c", default="ethereum", help="Blockchain")
preview_parser.add_argument("--json", action="store_true", help="JSON output")
# Prepare transaction
tx_parser = subparsers.add_parser("prepare-tx", help="Prepare transaction")
tx_parser.add_argument("--to", required=True, help="Target address")
tx_parser.add_argument("--value", default="0", help="Value in wei")
tx_parser.add_argument("--data", default="0x", help="Calldata")
tx_parser.add_argument("--chain", default="ethereum", help="Blockchain")
tx_parser.add_argument("--json", action="store_true", help="JSON output")
# Check signing capability
subparsers.add_parser("check-signing", help="Check local signing capability")
args = parser.parse_args()
if args.command == "balance":
result = check_balance_before_deposit(
wallet=args.wallet,
token=args.token,
amount=0, # Just checking balance
chain=args.chain
)
if args.json:
print(json.dumps(result, indent=2))
else:
print(f"💰 {args.token} balance: {result['balance']:.6f}")
elif args.command == "preview-deposit":
result = preview_deposit(
protocol=args.protocol,
asset=args.asset,
amount=args.amount,
wallet=args.wallet,
chain=args.chain
)
if args.json:
print(json.dumps(result, indent=2, ensure_ascii=False))
else:
print(format_preview_output(result))
elif args.command == "prepare-tx":
result = prepare_transaction(
to=args.to,
value=args.value,
data=args.data,
chain=args.chain
)
if args.json:
print(json.dumps(result, indent=2))
else:
print("✅ Transaction prepared")
print(f" To: {result['transaction']['to']}")
print(f" Value: {result['transaction']['value']} wei (${result['value_usd']})")
elif args.command == "check-signing":
try:
from eth_account import Account
pk = os.environ.get("WEB3_INVESTOR_PRIVATE_KEY")
print("✅ eth-account is installed")
print(f" Private key configured: {'✅ yes' if pk else '❌ no'}")
except ImportError:
print("❌ eth-account is not installed")
print(" Run: pip install eth-account")
else:
parser.print_help()
if __name__ == "__main__":
main()
```
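The calldata builders in the listing above concatenate a 4-byte selector with 32-byte ABI words. The following is a minimal standalone sketch of that layout, not the skill's own code. The helper names (`to_base_units`, `encode_word`, `encode_address_word`, `build_calldata`) and both addresses are hypothetical, and it swaps in `Decimal` for the float multiplication used in the listing so that amounts such as 1.1 convert to base units without rounding drift.

```python
from decimal import Decimal, localcontext

WORD_HEX_LEN = 64  # one 32-byte ABI word, written as hex characters


def to_base_units(amount: str, decimals: int) -> int:
    """Convert a decimal string such as "1.1" to integer base units."""
    with localcontext() as ctx:
        ctx.prec = 80  # enough digits for any uint256 value
        scaled = Decimal(amount) * (Decimal(10) ** decimals)
        if scaled < 0:
            raise ValueError("amount must not be negative")
        if scaled != scaled.to_integral_value():
            raise ValueError(f"{amount} has more than {decimals} decimal places")
        return int(scaled)


def encode_word(value: int) -> str:
    """Encode an unsigned integer as one zero-padded ABI word."""
    if not 0 <= value < 2 ** 256:
        raise ValueError("value does not fit in uint256")
    return format(value, "064x")


def encode_address_word(address: str) -> str:
    """Encode a 0x-prefixed 20-byte address as one left-padded ABI word."""
    if not (address.startswith("0x") and len(address) == 42):
        raise ValueError(f"invalid address: {address}")
    int(address[2:], 16)  # raises ValueError on non-hex characters
    return address[2:].lower().rjust(WORD_HEX_LEN, "0")


def build_calldata(selector: str, words: list) -> str:
    """Join a selector (8 hex chars, no 0x prefix) with already-encoded words."""
    if len(selector) != 8:
        raise ValueError("selector must be 8 hex characters without 0x")
    return "0x" + selector + "".join(words)


# Placeholder addresses for illustration only; they are not real contracts.
ASSET = "0x" + "ab" * 20
DEPOSITOR = "0x" + "cd" * 20

amount = to_base_units("1.1", 6)  # a 6-decimal token, as the listing assumes for USDC/USDT
calldata = build_calldata(
    "617ba037",  # supply(address,uint256,address,uint16), as in the Aave builder above
    [
        encode_address_word(ASSET),
        encode_word(amount),
        encode_address_word(DEPOSITOR),
        encode_word(0),  # referral code
    ],
)
print(amount)
print(calldata)
```

Keeping the selector free of the `0x` prefix and adding the prefix in exactly one place avoids the doubled-prefix mistake that is easy to make when each builder formats its own string.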
### scripts/trading/simulate_tx.py
```python
#!/usr/bin/env python3
"""
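Prepare a transaction signing request for review before execution.
Note: this script builds and prints the request; it does not run an eth_call dry run against a node.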
Usage:
python simulate_tx.py --to 0x... --value 0 --data 0x...
python simulate_tx.py --to 0x... --data 0x... --output json
"""
import argparse
import json
import sys
import os
# Add parent to path for imports
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from trading.safe_vault import prepare_transaction, format_signing_request
def main():
parser = argparse.ArgumentParser(description="Simulate Ethereum transaction")
parser.add_argument("--to", required=True, help="Target address")
parser.add_argument("--value", default="0", help="Value in wei")
parser.add_argument("--data", default="0x", help="Transaction calldata")
parser.add_argument("--gas-limit", type=int, default=300000, help="Gas limit")
parser.add_argument("--output", choices=["text", "json"], default="text", help="Output format")
args = parser.parse_args()
request = prepare_transaction(
to=args.to,
value=args.value,
data=args.data,
gas_limit=args.gas_limit
)
if args.output == "json":
print(json.dumps(request, indent=2))
else:
print(format_signing_request(request))
if __name__ == "__main__":
main()
```
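The script above builds a signing request from its arguments. If an actual dry run against a node is wanted, one option is the standard `eth_call` JSON-RPC method. The sketch below is an assumption-laden illustration rather than part of the skill: `build_eth_call_payload` and `dry_run` are hypothetical helper names, and the RPC URL must be supplied by the caller.

```python
import json
import urllib.request
from typing import Any, Dict, Optional


def build_eth_call_payload(
    to: str,
    data: str = "0x",
    value_wei: int = 0,
    from_address: Optional[str] = None,
    request_id: int = 1,
) -> Dict[str, Any]:
    """Build an eth_call request that executes the call without broadcasting it."""
    call: Dict[str, Any] = {"to": to, "data": data, "value": hex(value_wei)}
    if from_address:
        # Without "from", most nodes evaluate the call as the zero address.
        call["from"] = from_address
    return {
        "jsonrpc": "2.0",
        "method": "eth_call",
        "params": [call, "latest"],
        "id": request_id,
    }


def dry_run(rpc_url: str, payload: Dict[str, Any], timeout: float = 10.0) -> Dict[str, Any]:
    """POST the payload to rpc_url (hypothetical endpoint) and report the outcome."""
    request = urllib.request.Request(
        rpc_url,
        data=json.dumps(payload).encode(),
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        body = json.loads(response.read().decode())
    if "error" in body:
        # A JSON-RPC error here usually means the call would revert.
        return {"ok": False, "error": body["error"]}
    return {"ok": True, "result": body.get("result")}


# Placeholder target address for illustration only.
payload = build_eth_call_payload(to="0x" + "11" * 20, data="0x", value_wei=10 ** 18)
print(json.dumps(payload, indent=2))
```

Only the payload builder runs in this example; `dry_run` needs a reachable node and is left for the caller to invoke.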
### scripts/utils/preflight.py
```python
"""
Pre-flight Checks Module
Provides environment validation before executing transactions.
Supports multiple check types that can be configured per transaction type.
"""
import os
import json
import requests
from typing import Dict, Any, List, Optional, Callable
from dataclasses import dataclass, field
from enum import Enum
from urllib.parse import urljoin
try:
from .rpc_manager import RPCManager
RPC_MANAGER_AVAILABLE = True
except ImportError:
try:
from utils.rpc_manager import RPCManager
RPC_MANAGER_AVAILABLE = True
except ImportError:
RPC_MANAGER_AVAILABLE = False
class CheckSeverity(Enum):
"""Severity levels for check results."""
CRITICAL = "critical" # Hard stop, cannot proceed
WARNING = "warning" # Warning, can proceed with caution
INFO = "info" # Informational only
@dataclass
class CheckResult:
"""Result of a single pre-flight check."""
name: str
passed: bool
severity: CheckSeverity
message: str
fix_hint: str = ""
details: Dict[str, Any] = field(default_factory=dict)
@dataclass
class PreflightReport:
"""Complete pre-flight check report."""
all_passed: bool
critical_passed: bool
results: List[CheckResult]
summary: str = ""
class PreflightChecker:
"""
Pre-flight checker for transaction execution.
Usage:
checker = PreflightChecker(config)
report = checker.run(transaction_type="swap", params={...})
if not report.all_passed:
raise PreflightError(report)
"""
def __init__(self, config: Dict[str, Any]):
self.config = config
self.checks_registry = self._build_checks_registry()
    def _build_checks_registry(self) -> Dict[str, Callable]:
        """Build registry of available check functions."""
        return {
            "signer_api": self._check_signer_api,
            "rpc_reachable": self._check_rpc_reachable,
            "env_completeness": self._check_env_completeness,
            "gas_balance": self._check_gas_balance,
            "token_balance": self._check_token_balance,
            "allowance": self._check_allowance,
        }
def run(
self,
transaction_type: str,
params: Optional[Dict[str, Any]] = None
) -> PreflightReport:
"""
Run pre-flight checks for a transaction.
Args:
transaction_type: Type of transaction (swap, transfer, deposit, etc.)
params: Transaction parameters (token, amount, network, etc.)
Returns:
PreflightReport with all check results
"""
params = params or {}
# Get check list for this transaction type from config
check_list = self._get_check_list(transaction_type)
results = []
for check_name in check_list:
check_fn = self.checks_registry.get(check_name)
if check_fn:
try:
result = check_fn(params)
except Exception as e:
result = CheckResult(
name=check_name,
passed=False,
severity=CheckSeverity.CRITICAL,
message=f"Check failed with exception: {str(e)}",
fix_hint="Check logs for details"
)
results.append(result)
# Determine overall status
critical_results = [r for r in results if r.severity == CheckSeverity.CRITICAL]
critical_passed = all(r.passed for r in critical_results)
all_passed = all(r.passed for r in results)
summary = self._build_summary(results, critical_passed)
return PreflightReport(
all_passed=all_passed,
critical_passed=critical_passed,
results=results,
summary=summary
)
    def _get_check_list(self, transaction_type: str) -> List[str]:
        """Get list of checks to run for a transaction type."""
        preflight_config = self.config.get("preflight", {})
        # Get checks for this specific type
        checks_config = preflight_config.get("checks", {})
        type_checks = checks_config.get(transaction_type)
        if type_checks:
            return type_checks
        # Fall back to swap checks; the defaults must use names that exist in
        # the checks registry, otherwise run() silently skips them
        return checks_config.get("swap", ["signer_api", "rpc_reachable"])
def _build_summary(self, results: List[CheckResult], critical_passed: bool) -> str:
"""Build human-readable summary of check results."""
passed = sum(1 for r in results if r.passed)
total = len(results)
if critical_passed:
return f"Pre-flight checks: {passed}/{total} passed"
failed_critical = [r for r in results if not r.passed and r.severity == CheckSeverity.CRITICAL]
failed_names = ", ".join(r.name for r in failed_critical)
return f"Pre-flight checks failed: {failed_names}"
# ==================== Individual Check Functions ====================
def _check_signer_api(self, params: Dict[str, Any]) -> CheckResult:
"""Check if signer API is reachable."""
# Support both old and new config paths
api_config = self.config.get("api", {})
api_url = api_config.get("url") or api_config.get("url", "")
timeout = api_config.get("timeout_seconds", 30)
# Handle environment variable syntax ${VAR:default}
if api_url and api_url.startswith("${"):
import os
import re
match = re.match(r'\$\{([^:}]+):?(.*?)\}', api_url)
if match:
env_var = match.group(1)
default = match.group(2)
api_url = os.environ.get(env_var, default)
if not api_url:
return CheckResult(
name="signer_api",
passed=False,
severity=CheckSeverity.CRITICAL,
message="API URL not configured",
fix_hint="Set WEB3_INVESTOR_API_URL environment variable or update config.json"
)
try:
# Try to call health check or balances endpoint
# Plain concatenation keeps any path prefix in api_url, matching the balance checks below
health_url = api_url.rstrip("/") + "/wallet/balances"
response = requests.get(health_url, timeout=timeout)
if response.status_code in [200, 401]: # 401 is OK (auth required but service up)
return CheckResult(
name="signer_api",
passed=True,
severity=CheckSeverity.CRITICAL,
message="Signer API is reachable",
details={"url": api_url, "status": response.status_code}
)
else:
return CheckResult(
name="signer_api",
passed=False,
severity=CheckSeverity.CRITICAL,
message=f"Signer API returned unexpected status: {response.status_code}",
fix_hint="Check signer service status"
)
except requests.exceptions.ConnectionError:
return CheckResult(
name="signer_api",
passed=False,
severity=CheckSeverity.CRITICAL,
message="Cannot connect to signer API",
fix_hint="Ensure signer service is running on the configured port"
)
except requests.exceptions.Timeout:
return CheckResult(
name="signer_api",
passed=False,
severity=CheckSeverity.WARNING,
message="Signer API connection timed out",
fix_hint="Check network connectivity or increase timeout in config"
)
except Exception as e:
return CheckResult(
name="signer_api",
passed=False,
severity=CheckSeverity.CRITICAL,
message=f"Unexpected error checking signer API: {str(e)}",
fix_hint="Check configuration and network"
)
def _check_rpc_reachable(self, params: Dict[str, Any]) -> CheckResult:
"""Check if RPC endpoint is reachable (with fallback support)."""
network = params.get("network", self.config.get("discovery", {}).get("default_chain", "base"))
# Try using RPC Manager for fallback support
if RPC_MANAGER_AVAILABLE:
try:
manager = RPCManager(self.config)
url = manager.get_active_rpc(network)
if not url:
return CheckResult(
name="rpc_reachable",
passed=False,
severity=CheckSeverity.CRITICAL,
message=f"No RPC endpoint configured for network: {network}",
fix_hint=f"Add RPC endpoint for {network} in config.json"
)
success, block_number = manager.health_check(network, url)
if success:
return CheckResult(
name="rpc_reachable",
passed=True,
severity=CheckSeverity.CRITICAL,
message=f"RPC reachable (block #{block_number})",
details={"rpc_url": url, "block_number": block_number, "fallback_available": manager.rpc_config.get("fallback_enabled", False)}
)
# Primary failed, try fallback
if manager.rpc_config.get("fallback_enabled", False):
fallback_url = manager.find_healthy_fallback(network)
if fallback_url:
success, block_number = manager.health_check(network, fallback_url)
if success:
return CheckResult(
name="rpc_reachable",
passed=True,
severity=CheckSeverity.WARNING,
message=f"RPC reachable via fallback (block #{block_number})",
details={"rpc_url": fallback_url, "block_number": block_number, "original_failed": url}
)
return CheckResult(
name="rpc_reachable",
passed=False,
severity=CheckSeverity.CRITICAL,
message=f"RPC unreachable: {url}",
fix_hint="Check RPC URL or configure fallback RPCs in config.json"
)
except Exception as e:
return CheckResult(
name="rpc_reachable",
passed=False,
severity=CheckSeverity.CRITICAL,
message=f"RPC check failed: {str(e)}",
fix_hint="Check RPC configuration"
)
# Fallback: direct check without RPC Manager
rpc_config = self.config.get("rpc", {})
rpc_urls = rpc_config.get("endpoints", {}).get(network, [])
if not rpc_urls:
return CheckResult(
name="rpc_reachable",
passed=False,
severity=CheckSeverity.CRITICAL,
message=f"No RPC endpoint configured for network: {network}",
fix_hint=f"Add RPC endpoint for {network} in config.json"
)
primary_rpc = rpc_urls[0]
timeout = rpc_config.get("timeout_seconds", 10)
try:
response = requests.post(
primary_rpc,
json={
"jsonrpc": "2.0",
"method": "eth_blockNumber",
"params": [],
"id": 1
},
timeout=timeout,
headers={"Content-Type": "application/json"}
)
if response.status_code == 200:
data = response.json()
if "result" in data:
block_number = int(data["result"], 16)
return CheckResult(
name="rpc_reachable",
passed=True,
severity=CheckSeverity.CRITICAL,
message=f"RPC reachable (block #{block_number})",
details={"rpc_url": primary_rpc, "block_number": block_number}
)
return CheckResult(
name="rpc_reachable",
passed=False,
severity=CheckSeverity.CRITICAL,
message=f"RPC returned unexpected response: {response.status_code}",
fix_hint="Check RPC URL and network connectivity"
)
except requests.exceptions.RequestException as e:
return CheckResult(
name="rpc_reachable",
passed=False,
severity=CheckSeverity.CRITICAL,
message=f"Cannot connect to RPC: {str(e)}",
fix_hint="Check RPC URL or configure fallback RPCs"
)
def _check_env_completeness(self, params: Dict[str, Any]) -> CheckResult:
"""Check if required environment variables are set."""
preflight_config = self.config.get("preflight", {})
required_vars = preflight_config.get("required_env_vars", [])
missing = []
for var in required_vars:
if not os.environ.get(var):
missing.append(var)
if missing:
return CheckResult(
name="env_completeness",
passed=False,
severity=CheckSeverity.WARNING,
message=f"Missing environment variables: {', '.join(missing)}",
fix_hint="Add missing variables to ~/.bashrc or .env file"
)
return CheckResult(
name="env_completeness",
passed=True,
severity=CheckSeverity.INFO,
message="All required environment variables present"
)
def _check_gas_balance(self, params: Dict[str, Any]) -> CheckResult:
"""Check if wallet has sufficient gas balance."""
# This check requires the signer API to be available
# It will be skipped if signer API is not reachable
api_url = self.config.get("api", {}).get("url", "")
network = params.get("network", "base")
if not api_url:
return CheckResult(
name="gas_balance",
passed=False,
severity=CheckSeverity.WARNING,
message="Cannot check gas balance: API URL not configured",
fix_hint="Configure API base URL"
)
try:
# Get native token balance (ETH for ethereum, ETH for base)
response = requests.get(
f"{api_url}/wallet/balances?chain={network}",
timeout=10
)
if response.status_code == 200:
data = response.json()
balances = data.get("balances", [])
# Find native token (ETH)
native_balance = None
for bal in balances:
if bal.get("symbol") == "ETH":  # WETH is an ERC-20 and cannot pay for gas
native_balance = float(bal.get("balance", 0))
break
min_gas = self.config.get("preflight", {}).get("min_gas_balance_eth", 0.001)
if native_balance is None or native_balance < min_gas:
return CheckResult(
name="gas_balance",
passed=False,
severity=CheckSeverity.CRITICAL,
message=f"Insufficient gas balance: {native_balance or 0} ETH (min: {min_gas})",
fix_hint=f"Top up wallet with at least {min_gas} ETH for gas fees"
)
return CheckResult(
name="gas_balance",
passed=True,
severity=CheckSeverity.CRITICAL,
message=f"Gas balance sufficient: {native_balance} ETH",
details={"balance_eth": native_balance, "min_required": min_gas}
)
return CheckResult(
name="gas_balance",
passed=False,
severity=CheckSeverity.WARNING,
message="Could not fetch gas balance from API",
fix_hint="Check API connectivity"
)
except Exception as e:
return CheckResult(
name="gas_balance",
passed=False,
severity=CheckSeverity.WARNING,
message=f"Error checking gas balance: {str(e)}",
fix_hint="Check API status"
)
def _check_token_balance(self, params: Dict[str, Any]) -> CheckResult:
"""Check if wallet has sufficient token balance for transaction."""
token = params.get("from_token") or params.get("asset")
amount = params.get("amount")
network = params.get("network", "base")
if not token or not amount:
return CheckResult(
name="token_balance",
passed=True,
severity=CheckSeverity.INFO,
message="Token balance check skipped: token or amount not specified",
details={"token": token, "amount": amount}
)
api_url = self.config.get("api", {}).get("url", "")
try:
response = requests.get(
f"{api_url}/wallet/balances?chain={network}&tokens={token}",
timeout=10
)
if response.status_code == 200:
data = response.json()
balances = data.get("balances", [])
token_balance = None
for bal in balances:
if bal.get("symbol", "").upper() == token.upper():
token_balance = float(bal.get("balance", 0))
break
amount_float = float(amount)
if token_balance is None or token_balance < amount_float:
return CheckResult(
name="token_balance",
passed=False,
severity=CheckSeverity.CRITICAL,
message=f"Insufficient {token} balance: {token_balance or 0} (required: {amount})",
fix_hint=f"Deposit {amount} {token} to wallet before proceeding"
)
return CheckResult(
name="token_balance",
passed=True,
severity=CheckSeverity.CRITICAL,
message=f"Token balance sufficient: {token_balance} {token}",
details={"token": token, "balance": token_balance, "required": amount_float}
)
return CheckResult(
name="token_balance",
passed=False,
severity=CheckSeverity.WARNING,
message="Could not fetch token balance from API",
fix_hint="Check API connectivity"
)
except Exception as e:
return CheckResult(
name="token_balance",
passed=False,
severity=CheckSeverity.WARNING,
message=f"Error checking token balance: {str(e)}",
fix_hint="Check API status"
)
def _check_allowance(self, params: Dict[str, Any]) -> CheckResult:
"""Check if token allowance is sufficient (for swap/deposit only)."""
tx_type = params.get("type", "")
# Allowance check only relevant for swap and deposit
if tx_type not in ["swap", "deposit"]:
return CheckResult(
name="allowance",
passed=True,
severity=CheckSeverity.INFO,
message="Allowance check not required for this transaction type"
)
# This is a soft check - actual allowance will be checked during preview
return CheckResult(
name="allowance",
passed=True,
severity=CheckSeverity.INFO,
message="Allowance will be checked during transaction preview"
)
class PreflightError(Exception):
"""Exception raised when pre-flight checks fail."""
def __init__(self, report: PreflightReport):
self.report = report
self.message = report.summary
super().__init__(self.message)
def to_dict(self) -> Dict[str, Any]:
"""Convert error to dictionary for API responses."""
return {
"success": False,
"error": {
"code": "E014",
"message": "Pre-flight checks failed",
"details": {
"summary": self.report.summary,
"failed_checks": [
{
"name": r.name,
"message": r.message,
"fix_hint": r.fix_hint,
"severity": r.severity.value
}
for r in self.report.results if not r.passed
]
}
}
}
def load_config(config_path: Optional[str] = None) -> Dict[str, Any]:
"""Load configuration from config.json."""
if config_path is None:
script_dir = os.path.dirname(os.path.abspath(__file__))
skill_dir = os.path.dirname(os.path.dirname(script_dir)) # utils -> scripts -> skill_dir
config_path = os.path.join(skill_dir, "config", "config.json")
if os.path.exists(config_path):
with open(config_path) as f:
return json.load(f)
return {}
if __name__ == "__main__":
# Simple test
config = load_config()
checker = PreflightChecker(config)
report = checker.run("swap", {"network": "base", "from_token": "USDC", "amount": "100"})
print(f"All passed: {report.all_passed}")
print(f"Summary: {report.summary}")
for r in report.results:
status = "✅" if r.passed else "❌"
print(f"{status} {r.name}: {r.message}")
```
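The signer API check in the listing above accepts a URL written as `${VAR:default}` and resolves it from the environment. A standalone sketch of that resolution step follows. The helper name `resolve_env_placeholder`, the variable `EXAMPLE_SIGNER_URL`, and both URLs are assumptions made for illustration. Unlike the listing, which uses `re.match`, this sketch only resolves values that are a placeholder in their entirety.

```python
import os
import re

# Same pattern as the listing: ${NAME} or ${NAME:default}
_PLACEHOLDER = re.compile(r"\$\{([^:}]+):?(.*?)\}")


def resolve_env_placeholder(value: str) -> str:
    """Resolve "${VAR:default}" from the environment; return other strings unchanged."""
    match = _PLACEHOLDER.fullmatch(value)
    if not match:
        return value
    env_var, default = match.group(1), match.group(2)
    return os.environ.get(env_var, default)


# With the variable unset, the default after the first colon is used.
os.environ.pop("EXAMPLE_SIGNER_URL", None)
print(resolve_env_placeholder("${EXAMPLE_SIGNER_URL:http://localhost:3000}"))

# With the variable set, its value wins over the default.
os.environ["EXAMPLE_SIGNER_URL"] = "http://127.0.0.1:8080"
print(resolve_env_placeholder("${EXAMPLE_SIGNER_URL:http://localhost:3000}"))

# Values without placeholder syntax pass through untouched.
print(resolve_env_placeholder("http://plain.example"))
```

Resolving the placeholder once, in a shared helper, would let every check that reads the API URL see the same value.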
### scripts/utils/rpc_manager.py
```python
"""
RPC Manager Module
Provides RPC fallback and session stickiness for blockchain operations.
Inspired by xaut-trade's RPC resilience design.
Key Features:
- Automatic fallback to secondary RPC nodes on network errors
- Session stickiness: once a fallback is selected, use it for the entire session
- Clear distinction between network errors (should fallback) and business errors (should not)
"""
import os
import json
import time
import requests
from typing import Dict, Any, List, Optional
from dataclasses import dataclass, field
from enum import Enum
class RPCErrors(Exception):
"""Exception raised when all RPC endpoints fail."""
pass
@dataclass
class RPCConfig:
"""RPC configuration for a single chain."""
chain: str
endpoints: List[str]
timeout_seconds: int = 10
fallback_enabled: bool = True
session_sticky: bool = True
@dataclass
class RPCStatus:
"""Status of an RPC endpoint."""
url: str
is_active: bool = False
last_error: Optional[str] = None
last_success_time: Optional[float] = None
block_number: Optional[int] = None
class RPCManager:
"""
Manages RPC connections with fallback support.
Design Philosophy (from xaut-trade):
- Only trigger fallback for NETWORK errors (429, 502, 503, timeout, connection refused)
- Do NOT trigger fallback for business errors (insufficient balance, contract revert, etc.)
- Once a fallback is selected, use it for the entire session (session stickiness)
Usage:
manager = RPCManager(config)
# Get active RPC URL (may be primary or fallback)
rpc_url = manager.get_active_rpc("base")
# Make request with automatic fallback
result = manager.request("base", {
"jsonrpc": "2.0",
"method": "eth_call",
"params": [...],
"id": 1
})
"""
# Error patterns that should trigger fallback
NETWORK_ERROR_PATTERNS = [
"429", # Too Many Requests
"502", # Bad Gateway
"503", # Service Unavailable
"timeout",
"timed out",
"connection refused",
"ECONNREFUSED",
"ENOTFOUND",
"EAI_AGAIN", # DNS lookup error
"rate limit",
"too many requests",
]
def __init__(self, config: Dict[str, Any]):
"""
Initialize RPC Manager.
Args:
config: Full config dict from config.json
"""
self.config = config
self.rpc_config = config.get("rpc", {})
# Session state: active RPC per chain
self._active_rpc: Dict[str, str] = {}
# Status tracking
self._rpc_status: Dict[str, Dict[str, RPCStatus]] = {}
# Initialize status for all configured endpoints
self._init_status()
def _init_status(self):
"""Initialize RPC status tracking."""
endpoints_config = self.rpc_config.get("endpoints", {})
for chain, urls in endpoints_config.items():
self._rpc_status[chain] = {}
for url in urls:
self._rpc_status[chain][url] = RPCStatus(url=url)
def get_endpoints(self, chain: str) -> List[str]:
"""Get all configured endpoints for a chain."""
# Support new config structure: chain.rpc
chain_config = self.config.get("chain", {})
rpc_endpoints = chain_config.get("rpc", {})
if rpc_endpoints:
return rpc_endpoints.get(chain, [])
# Fallback to old structure: rpc.endpoints
return self.rpc_config.get("endpoints", {}).get(chain, [])
def get_active_rpc(self, chain: str) -> Optional[str]:
"""
Get the currently active RPC URL for a chain.
Returns the session-sticky fallback if one was selected,
otherwise returns the primary endpoint.
"""
# Check session sticky fallback
if self.rpc_config.get("session_sticky", True):
if chain in self._active_rpc:
return self._active_rpc[chain]
# Return primary (first) endpoint
endpoints = self.get_endpoints(chain)
return endpoints[0] if endpoints else None
def set_active_rpc(self, chain: str, url: str):
"""Set the active RPC for a chain (for session stickiness)."""
if self.rpc_config.get("session_sticky", True):
self._active_rpc[chain] = url
# Update status
if chain in self._rpc_status and url in self._rpc_status[chain]:
self._rpc_status[chain][url].is_active = True
self._rpc_status[chain][url].last_success_time = time.time()
def is_network_error(self, error: Exception) -> bool:
"""
Check if an error is a network error that should trigger fallback.
Network errors: 429, 502, 503, timeout, connection refused
Business errors: insufficient balance, contract revert, invalid params
"""
error_str = str(error).lower()
for pattern in self.NETWORK_ERROR_PATTERNS:
if pattern.lower() in error_str:
return True
return False

    def health_check(self, chain: str, url: str) -> tuple[bool, Optional[int]]:
        """
        Perform a quick health check on an RPC endpoint.

        Returns:
            (success, block_number) tuple
        """
        timeout = self.rpc_config.get("timeout_seconds", 10)
        try:
            response = requests.post(
                url,
                json={
                    "jsonrpc": "2.0",
                    "method": "eth_blockNumber",
                    "params": [],
                    "id": 1
                },
                timeout=timeout,
                headers={"Content-Type": "application/json"}
            )
            if response.status_code == 200:
                data = response.json()
                if "result" in data:
                    block_number = int(data["result"], 16)
                    return True, block_number
            return False, None
        except Exception:
            return False, None

    def find_healthy_fallback(self, chain: str) -> Optional[str]:
        """
        Find a healthy fallback RPC endpoint.

        Tries each endpoint in order, returns the first one that responds.
        """
        endpoints = self.get_endpoints(chain)
        current_active = self.get_active_rpc(chain)
        for url in endpoints:
            # Skip the current active (which failed)
            if url == current_active:
                continue
            success, block = self.health_check(chain, url)
            if success:
                self.set_active_rpc(chain, url)
                return url
        return None

    def request(
        self,
        chain: str,
        payload: Dict[str, Any],
        prefer_url: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Make an RPC request with automatic fallback.

        Args:
            chain: Chain name (base, ethereum)
            payload: JSON-RPC payload
            prefer_url: Preferred URL (optional, overrides session state)

        Returns:
            JSON-RPC response

        Raises:
            RPCErrors: If all endpoints fail
        """
        timeout = self.rpc_config.get("timeout_seconds", 10)
        fallback_enabled = self.rpc_config.get("fallback_enabled", True)
        # Determine which URL to use
        if prefer_url:
            url = prefer_url
        else:
            url = self.get_active_rpc(chain)
        if not url:
            raise RPCErrors(f"No RPC endpoint configured for chain: {chain}")
        # Try the primary/active URL
        try:
            response = requests.post(
                url,
                json=payload,
                timeout=timeout,
                headers={"Content-Type": "application/json"}
            )
            if response.status_code == 200:
                result = response.json()
                # Check for JSON-RPC error (business error, don't fallback)
                if "error" in result:
                    return result
                # Success
                self.set_active_rpc(chain, url)
                return result
            # Non-200 status - check if it's a network error
            if self._is_status_network_error(response.status_code):
                raise requests.exceptions.RequestException(f"HTTP {response.status_code}")
            # Other HTTP errors - return as-is
            return {"error": {"code": response.status_code, "message": response.text}}
        except requests.exceptions.RequestException as e:
            # Check if we should try fallback
            if not fallback_enabled:
                raise RPCErrors(f"RPC request failed: {str(e)}")
            if not self.is_network_error(e):
                # Business error, don't fallback
                raise RPCErrors(f"RPC request failed: {str(e)}")
            # Try fallback
            fallback_url = self.find_healthy_fallback(chain)
            if fallback_url:
                # Retry with fallback
                try:
                    response = requests.post(
                        fallback_url,
                        json=payload,
                        timeout=timeout,
                        headers={"Content-Type": "application/json"}
                    )
                    if response.status_code == 200:
                        result = response.json()
                        self.set_active_rpc(chain, fallback_url)
                        return result
                except requests.exceptions.RequestException:
                    pass
            # All endpoints failed
            raise RPCErrors(
                f"All RPC endpoints failed for {chain}. "
                f"Tried: {len(self.get_endpoints(chain))} endpoints."
            )

    def _is_status_network_error(self, status_code: int) -> bool:
        """Check if HTTP status code indicates a network error."""
        network_status_codes = [429, 502, 503, 504]
        return status_code in network_status_codes

    def get_status_report(self, chain: str) -> Dict[str, Any]:
        """Get status report for all RPC endpoints of a chain."""
        endpoints = self.get_endpoints(chain)
        active = self.get_active_rpc(chain)
        report = {
            "chain": chain,
            "active_rpc": active,
            "fallback_enabled": self.rpc_config.get("fallback_enabled", True),
            "session_sticky": self.rpc_config.get("session_sticky", True),
            "endpoints": []
        }
        for url in endpoints:
            status = self._rpc_status.get(chain, {}).get(url)
            report["endpoints"].append({
                "url": url,
                "is_active": url == active,
                "last_error": status.last_error if status else None,
                "last_success_time": status.last_success_time if status else None
            })
        return report

# Singleton instance for session-level stickiness
_rpc_manager_instance: Optional[RPCManager] = None

def get_rpc_manager(config: Optional[Dict[str, Any]] = None) -> RPCManager:
    """
    Get or create the singleton RPC manager.

    Args:
        config: Config dict (required on first call)

    Returns:
        RPCManager instance
    """
    global _rpc_manager_instance
    if _rpc_manager_instance is None:
        if config is None:
            raise ValueError("Config required for first initialization")
        _rpc_manager_instance = RPCManager(config)
    return _rpc_manager_instance

def load_config(config_path: Optional[str] = None) -> Dict[str, Any]:
    """Load configuration from config.json."""
    if config_path is None:
        # Try multiple paths to find config.json
        script_dir = os.path.dirname(os.path.abspath(__file__))
        # Path 1: scripts/utils/.. -> skills/web3-investor/
        skill_dir = os.path.dirname(os.path.dirname(script_dir))
        config_path = os.path.join(skill_dir, "config", "config.json")
        # Path 2: If running from workspace root
        if not os.path.exists(config_path):
            config_path = "skills/web3-investor/config/config.json"
        # Path 3: If running from skill directory
        if not os.path.exists(config_path):
            config_path = "config/config.json"
    if os.path.exists(config_path):
        with open(config_path) as f:
            return json.load(f)
    return {}

if __name__ == "__main__":
    # Simple test
    config = load_config()
    manager = RPCManager(config)
    print("RPC Manager Test")
    print("=" * 50)
    for chain in ["base", "ethereum"]:
        print(f"\n{chain.upper()}:")
        print(f" Primary: {manager.get_endpoints(chain)[0] if manager.get_endpoints(chain) else 'N/A'}")
        print(f" Active: {manager.get_active_rpc(chain)}")
        # Health check
        url = manager.get_active_rpc(chain)
        if url:
            success, block = manager.health_check(chain, url)
            print(f" Health: {'✅ OK' if success else '❌ Failed'}")
            # Compare against None so a block number of 0 is still printed
            if block is not None:
                print(f" Block: #{block}")
    print("\n" + "=" * 50)
    print("Status Report:")
    print(json.dumps(manager.get_status_report("base"), indent=2))
```
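
The session-sticky fallback described in `get_active_rpc` and `request` can be exercised without network access. The following is a minimal sketch, not the skill's implementation: `StickyFallback`, `fake_send`, and the `example` URLs are hypothetical names introduced for illustration, and the transport is a plain callable returning an HTTP status code rather than `requests.post`. Unlike `RPCManager.request`, which retries once against the first healthy fallback, this sketch walks every remaining endpoint in order.

```python
from typing import Callable, List, Optional

# Status codes treated as transient, mirroring _is_status_network_error above.
NETWORK_STATUS_CODES = {429, 502, 503, 504}


class StickyFallback:
    """Illustrative stand-in for RPCManager's endpoint selection (hypothetical)."""

    def __init__(self, endpoints: List[str], send: Callable[[str, dict], int]):
        self.endpoints = endpoints
        self.send = send  # hypothetical transport: returns an HTTP status code
        self.active: Optional[str] = None  # session-sticky choice, if any

    def request(self, payload: dict) -> str:
        """Return the URL that served the request, preferring the sticky one."""
        if not self.endpoints:
            raise RuntimeError("No RPC endpoint configured")
        first = self.active or self.endpoints[0]
        ordered = [first] + [u for u in self.endpoints if u != first]
        for url in ordered:
            status = self.send(url, payload)
            if status == 200:
                self.active = url  # stick to this endpoint for later calls
                return url
            if status not in NETWORK_STATUS_CODES:
                # Non-transient failure: switching endpoints would only hide it.
                raise RuntimeError(f"HTTP {status} from {url}")
        raise RuntimeError(f"All {len(ordered)} endpoints failed")


seen: List[str] = []


def fake_send(url: str, payload: dict) -> int:
    # Simulated transport: the primary is rate-limited, the backup is healthy.
    seen.append(url)
    return 429 if "primary" in url else 200


manager = StickyFallback(
    ["https://primary.example", "https://backup.example"], fake_send
)
payload = {"jsonrpc": "2.0", "method": "eth_blockNumber", "params": [], "id": 1}
first = manager.request(payload)   # falls back after the primary returns 429
second = manager.request(payload)  # goes straight to the sticky backup
print(first, second, seen)
```

The second call skips the rate-limited primary entirely, which is the point of keeping the fallback sticky for the session: a degraded endpoint is not probed again on every request.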