skill-integration-tester
Validate multi-skill workflows defined in CLAUDE.md by checking skill existence, inter-skill data contracts (JSON schema compatibility), file naming conventions, and handoff integrity. Use when adding new workflows, modifying skill outputs, or verifying pipeline health before release.
Packaged view
This page reorganizes the original catalog entry around fit, installability, and workflow context first. The original raw source lives below.
Install command
npx @skill-hub/cli install tradermonty-claude-trading-skills-skill-integration-tester
Repository
Skill path: skills/skill-integration-tester
Best for
Primary workflow: Analyze Data & AI.
Technical facets: Full Stack, Data / AI, Testing, Integration.
Target audience: everyone.
License: Unknown.
Original source
Catalog source: SkillHub Club.
Repository owner: tradermonty.
This is a mirrored public skill entry; review the repository before installing it into production workflows.
What it helps with
- Install skill-integration-tester into Claude Code, Codex CLI, Gemini CLI, or OpenCode workflows
- Review https://github.com/tradermonty/claude-trading-skills before adding skill-integration-tester to shared team environments
- Use skill-integration-tester for development workflows
Works across
Favorites: 0.
Sub-skills: 0.
Aggregator: No.
Original source / Raw SKILL.md
---
name: skill-integration-tester
description: Validate multi-skill workflows defined in CLAUDE.md by checking skill existence, inter-skill data contracts (JSON schema compatibility), file naming conventions, and handoff integrity. Use when adding new workflows, modifying skill outputs, or verifying pipeline health before release.
---
# Skill Integration Tester
## Overview
Validate multi-skill workflows defined in CLAUDE.md (Daily Market Monitoring,
Weekly Strategy Review, Earnings Momentum Trading, etc.) by executing each step
in sequence. Check inter-skill data contracts for JSON schema compatibility
between output of step N and input of step N+1, verify file naming conventions,
and report broken handoffs. Supports dry-run mode with synthetic fixtures.
## When to Use
- After adding or modifying a multi-skill workflow in CLAUDE.md
- After changing a skill's output format (JSON schema, file naming)
- Before releasing new skills to verify pipeline compatibility
- When debugging broken handoffs between consecutive workflow steps
- As a CI pre-check for pull requests touching skill scripts
## Prerequisites
- Python 3.9+
- No API keys required
- No third-party Python packages required (uses only standard library)
## Workflow
### Step 1: Run Integration Validation
Execute the validation script against the project's CLAUDE.md:
```bash
python3 skills/skill-integration-tester/scripts/validate_workflows.py \
    --output-dir reports/
```
This parses all `**Workflow Name:**` blocks from the Multi-Skill Workflows
section, resolves each step's display name to a skill directory, and validates
existence, contracts, and naming.
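For reference, a workflow block in the shape the parser expects looks like the sample below. This is a standalone sketch that reuses the same regex pattern as `parse_workflows` in `validate_workflows.py`; the two-step workflow content is illustrative only.

```python
import re

# Minimal illustration of the **Workflow Name:** block format the
# validator parses; the sample workflow here is illustrative only.
sample = (
    "**Daily Market Monitoring:**\n"
    "1. Economic Calendar Fetcher \u2192 Check today's events\n"
    "2. Market News Analyst \u2192 Review overnight developments\n"
)
block = re.compile(r"\*\*(.+?):\*\*\s*\n((?:\s*\d+\.\s+.+\n?)+)")
m = block.search(sample)
print(m.group(1))  # Daily Market Monitoring
```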
### Step 2: Validate a Specific Workflow
Target a single workflow by name substring:
```bash
python3 skills/skill-integration-tester/scripts/validate_workflows.py \
    --workflow "Earnings Momentum" \
    --output-dir reports/
```
### Step 3: Dry-Run with Synthetic Fixtures
Create synthetic fixture JSON files for each skill's expected output and
validate contract compatibility without real data:
```bash
python3 skills/skill-integration-tester/scripts/validate_workflows.py \
    --dry-run \
    --output-dir reports/
```
Fixture files are written to `reports/fixtures/` with the `_fixture` flag set.
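For illustration, a fixture for the earnings-trade-analyzer contract (whose fields appear in the raw source below) is built roughly like this sketch:

```python
import json

# Rough shape of a synthetic dry-run fixture: metadata flags plus one
# placeholder value per contract field.
fields = ["symbol", "grade", "gap_pct", "volume_ratio", "trend_score"]
fixture = {"_fixture": True, "_skill": "earnings-trade-analyzer", "schema_version": "1.0"}
for field in fields:
    fixture[field] = f"<synthetic_{field}>"
print(json.dumps(fixture, indent=2))
```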
### Step 4: Review Results
Open the generated Markdown report for a human-readable summary, or parse
the JSON report for programmatic consumption. Each workflow shows:
- Step-by-step skill existence checks
- Handoff contract validation (PASS / FAIL / N/A)
- File naming convention violations
- Overall workflow status (valid / broken / warning)
### Step 5: Fix Broken Handoffs
For each `FAIL` handoff, verify that:
1. The producer skill's output contains all required fields
2. The consumer skill's input parameter accepts the producer's output format
3. File naming patterns are consistent between producer output and consumer input
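Conceptually, the handoff check reduces to a set comparison between the fields the producer emits and the fields the consumer requires; a minimal sketch using the earnings handoff fields:

```python
# Fields taken from the earnings-trade-analyzer -> pead-screener contract.
producer_fields = {"symbol", "grade", "gap_pct", "volume_ratio", "trend_score"}
required_fields = {"symbol", "grade", "gap_pct"}

missing = required_fields - producer_fields
print(sorted(missing))  # [] -> handoff PASS

# If a producer change dropped "grade", the same check would flag a FAIL:
missing_after_change = required_fields - (producer_fields - {"grade"})
print(sorted(missing_after_change))  # ['grade']
```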
## Output Format
### JSON Report
```json
{
"schema_version": "1.0",
"generated_at": "2026-03-01T12:00:00+00:00",
"dry_run": false,
"summary": {
"total_workflows": 8,
"valid": 6,
"broken": 1,
"warnings": 1
},
"workflows": [
{
"workflow": "Daily Market Monitoring",
"step_count": 4,
"status": "valid",
"steps": [...],
"handoffs": [...],
"naming_violations": []
}
]
}
```
### Markdown Report
Structured report with per-workflow sections showing step validation,
handoff status, and naming violations.
Reports are saved to `reports/` with filenames
`integration_test_YYYY-MM-DD_HHMMSS.{json,md}`.
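Since the timestamp is embedded in the filename, downstream tooling can recover it; a small sketch (Python 3.9+ for `removeprefix`/`removesuffix`):

```python
from datetime import datetime

# Recover the UTC timestamp embedded in a report filename.
name = "integration_test_2026-03-01_120000.json"
stamp = name.removeprefix("integration_test_").removesuffix(".json")
ts = datetime.strptime(stamp, "%Y-%m-%d_%H%M%S")
print(ts.isoformat())  # 2026-03-01T12:00:00
```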
## Resources
- `scripts/validate_workflows.py` -- Main validation script
- `references/workflow_contracts.md` -- Contract definitions and handoff patterns
## Key Principles
1. No API keys required -- all validation is local and offline
2. Non-destructive -- reads SKILL.md and CLAUDE.md only, never modifies skills
3. Deterministic -- same inputs always produce same validation results
---
## Referenced Files
> The following files are referenced in this skill and included for context.
### scripts/validate_workflows.py
```python
#!/usr/bin/env python3
"""Validate multi-skill workflows defined in CLAUDE.md.
Parses workflow definitions, checks skill existence, validates inter-skill
data contracts (JSON schema compatibility), verifies file naming conventions,
and reports broken handoffs. Supports dry-run mode with synthetic fixtures.
"""
from __future__ import annotations
import argparse
import json
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
# ── Display-name → directory-name mapping ────────────────────────────
_DISPLAY_MAP: dict[str, str] = {
"economic calendar fetcher": "economic-calendar-fetcher",
"earnings calendar": "earnings-calendar",
"market news analyst": "market-news-analyst",
"breadth chart analyst": "breadth-chart-analyst",
"sector analyst": "sector-analyst",
"technical analyst": "technical-analyst",
"market environment analysis": "market-environment-analysis",
"us market bubble detector": "us-market-bubble-detector",
"us stock analysis": "us-stock-analysis",
"backtest expert": "backtest-expert",
"options strategy advisor": "options-strategy-advisor",
"portfolio manager": "portfolio-manager",
"earnings trade analyzer": "earnings-trade-analyzer",
"pead screener": "pead-screener",
"pair trade screener": "pair-trade-screener",
"value dividend screener": "value-dividend-screener",
"dividend growth pullback screener": "dividend-growth-pullback-screener",
"position sizer": "position-sizer",
"data quality checker": "data-quality-checker",
"theme detector": "theme-detector",
"finviz screener": "finviz-screener",
"vcp screener": "vcp-screener",
"canslim screener": "canslim-screener",
"edge candidate agent": "edge-candidate-agent",
"edge hint extractor": "edge-hint-extractor",
"edge concept synthesizer": "edge-concept-synthesizer",
"edge strategy designer": "edge-strategy-designer",
"edge strategy reviewer": "edge-strategy-reviewer",
"edge pipeline orchestrator": "edge-pipeline-orchestrator",
"kanchi dividend sop": "kanchi-dividend-sop",
"kanchi dividend review monitor": "kanchi-dividend-review-monitor",
"kanchi dividend us tax accounting": "kanchi-dividend-us-tax-accounting",
"market breadth analyzer": "market-breadth-analyzer",
"uptrend analyzer": "uptrend-analyzer",
"ftd detector": "ftd-detector",
"institutional flow tracker": "institutional-flow-tracker",
"screener skills": "_meta_screener_skills",
"analysis skills": "_meta_analysis_skills",
"monitor breakout entries with stop-loss": "_meta_monitor",
"manage market-neutral positions": "_meta_manage",
"monitor z-score signals and spread convergence": "_meta_monitor_zscore",
"feed review findings back to kanchi-dividend-sop before any additional buys": "_meta_feedback",
}
# ── Skill output contracts ───────────────────────────────────────────
SKILL_CONTRACTS: dict[str, dict[str, Any]] = {
"economic-calendar-fetcher": {
"output_format": "json+md",
"output_pattern": "economic_calendar_*.{json,md}",
"output_fields": ["date", "event", "impact", "country"],
"api_keys": ["FMP_API_KEY"],
},
"earnings-calendar": {
"output_format": "json+md",
"output_pattern": "earnings_calendar_*.{json,md}",
"output_fields": ["symbol", "date", "eps_estimate", "revenue_estimate"],
"api_keys": ["FMP_API_KEY"],
},
"earnings-trade-analyzer": {
"output_format": "json+md",
"output_pattern": "earnings_trade_*.{json,md}",
"output_fields": [
"symbol",
"grade",
"gap_pct",
"volume_ratio",
"trend_score",
],
"api_keys": ["FMP_API_KEY"],
},
"pead-screener": {
"output_format": "json+md",
"output_pattern": "pead_*.{json,md}",
"output_fields": ["symbol", "status", "entry_price", "stop_loss"],
"api_keys": ["FMP_API_KEY"],
},
"edge-candidate-agent": {
"output_format": "json",
"output_pattern": "market_summary.json",
"output_fields": ["date", "market_summary", "anomalies"],
"api_keys": [],
},
"edge-hint-extractor": {
"output_format": "yaml",
"output_pattern": "hints.yaml",
"output_fields": ["hints"],
"api_keys": [],
},
"edge-concept-synthesizer": {
"output_format": "yaml",
"output_pattern": "edge_concepts.yaml",
"output_fields": ["concepts"],
"api_keys": [],
},
"edge-strategy-designer": {
"output_format": "yaml",
"output_pattern": "strategy_drafts/*.yaml",
"output_fields": ["strategy_name", "entry", "exit", "risk"],
"api_keys": [],
},
"edge-strategy-reviewer": {
"output_format": "yaml",
"output_pattern": "review.yaml",
"output_fields": ["verdict", "score", "feedback"],
"api_keys": [],
},
"value-dividend-screener": {
"output_format": "json",
"output_pattern": "dividend_screen_*.json",
"output_fields": ["symbol", "dividend_yield", "payout_ratio", "score"],
"api_keys": ["FMP_API_KEY"],
},
"dividend-growth-pullback-screener": {
"output_format": "json",
"output_pattern": "dividend_growth_*.json",
"output_fields": ["symbol", "div_growth_rate", "rsi", "score"],
"api_keys": ["FMP_API_KEY"],
},
"position-sizer": {
"output_format": "json+md",
"output_pattern": "position_size_*.{json,md}",
"output_fields": ["shares", "position_value", "risk_amount"],
"api_keys": [],
},
"data-quality-checker": {
"output_format": "json+md",
"output_pattern": "data_quality_*.{json,md}",
"output_fields": ["file", "checks", "pass_count", "fail_count"],
"api_keys": [],
},
"theme-detector": {
"output_format": "json+md",
"output_pattern": "theme_*.{json,md}",
"output_fields": ["themes", "sector_correlation"],
"api_keys": [],
},
"pair-trade-screener": {
"output_format": "json",
"output_pattern": "pair_trade_*.json",
"output_fields": [
"pair",
"correlation",
"cointegration_pvalue",
"z_score",
],
"api_keys": ["FMP_API_KEY"],
},
}
# ── Handoff contracts (producer → consumer) ──────────────────────────
HANDOFF_CONTRACTS: dict[tuple[str, str], dict[str, Any]] = {
("earnings-trade-analyzer", "pead-screener"): {
"mechanism": "file_param",
"param": "--candidates-json",
"required_fields": ["symbol", "grade", "gap_pct"],
"description": (
"PEAD Screener reads earnings-trade-analyzer JSON output via --candidates-json"
),
},
("edge-candidate-agent", "edge-hint-extractor"): {
"mechanism": "file_param",
"param": "--market-summary,--anomalies",
"required_fields": ["market_summary", "anomalies"],
"description": ("Hint extractor reads market_summary.json and anomalies.json"),
},
("edge-hint-extractor", "edge-concept-synthesizer"): {
"mechanism": "file_param",
"param": "--hints",
"required_fields": ["hints"],
"description": "Concept synthesizer reads hints.yaml",
},
("edge-concept-synthesizer", "edge-strategy-designer"): {
"mechanism": "file_param",
"param": "--concepts",
"required_fields": ["concepts"],
"description": "Strategy designer reads edge_concepts.yaml",
},
("edge-strategy-designer", "edge-strategy-reviewer"): {
"mechanism": "file_param",
"param": "--drafts-dir",
"required_fields": ["strategy_name", "entry", "exit"],
"description": "Strategy reviewer reads strategy draft YAML files",
},
}
# ── Core functions ───────────────────────────────────────────────────
def resolve_skill_name(display_name: str) -> str:
    """Convert a display name to a skill directory name."""
    normalized = display_name.strip().lower()
    if normalized in _DISPLAY_MAP:
        return _DISPLAY_MAP[normalized]
    # Algorithmic fallback: lowercase, replace whitespace with hyphens
    return re.sub(r"\s+", "-", normalized)


def parse_workflows(text: str) -> dict[str, list[dict[str, str]]]:
    """Parse multi-skill workflow definitions from CLAUDE.md content.

    Returns dict mapping workflow name to list of steps, each step
    being {"skill_display": ..., "action": ...}.
    """
    workflows: dict[str, list[dict[str, str]]] = {}
    workflow_pattern = re.compile(r"\*\*(.+?):\*\*\s*\n((?:\s*\d+\.\s+.+\n?)+)", re.MULTILINE)
    step_pattern = re.compile(r"\d+\.\s+(.+?)\s*\u2192\s*(.+?)(?:\s*\n|$)")
    for wf_match in workflow_pattern.finditer(text):
        name = wf_match.group(1).strip()
        steps_text = wf_match.group(2)
        steps: list[dict[str, str]] = []
        for step_match in step_pattern.finditer(steps_text):
            raw_skill = step_match.group(1).strip()
            # Strip parenthetical notes like (Mode B), (--ohlcv)
            skill_display = re.sub(r"\s*\(.*?\)\s*$", "", raw_skill).strip()
            action = step_match.group(2).strip()
            steps.append({"skill_display": skill_display, "action": action})
        if steps:
            workflows[name] = steps
    return workflows


def parse_frontmatter_name(skill_md_path: Path) -> str | None:
    """Extract the name field from SKILL.md YAML frontmatter."""
    try:
        content = skill_md_path.read_text(encoding="utf-8")
    except OSError:
        return None
    match = re.match(r"^---\s*\n(.*?)\n---", content, re.DOTALL)
    if not match:
        return None
    for line in match.group(1).split("\n"):
        m = re.match(r"^name:\s*(.+)", line)
        if m:
            return m.group(1).strip()
    return None


def check_skill_exists(skill_name: str, skills_dir: Path) -> bool:
    """Check if a skill directory with SKILL.md exists."""
    return (skills_dir / skill_name / "SKILL.md").is_file()


def check_naming_conventions(skill_name: str, skills_dir: Path) -> list[str]:
    """Check file naming conventions for a skill. Returns violations."""
    violations: list[str] = []
    skill_dir = skills_dir / skill_name
    if not skill_dir.is_dir():
        return [f"{skill_name}: Skill directory not found"]
    # SKILL.md must exist
    skill_md = skill_dir / "SKILL.md"
    if not skill_md.is_file():
        violations.append(f"{skill_name}: Missing SKILL.md")
    else:
        # Frontmatter name must match directory name
        fm_name = parse_frontmatter_name(skill_md)
        if fm_name and fm_name != skill_name:
            violations.append(f"{skill_name}: SKILL.md name '{fm_name}' does not match directory")
    # Script filenames must be snake_case
    scripts_dir_path = skill_dir / "scripts"
    if scripts_dir_path.is_dir():
        for py_file in scripts_dir_path.glob("*.py"):
            if py_file.name == "__init__.py":
                continue
            if not re.match(r"^[a-z][a-z0-9_]*\.py$", py_file.name):
                violations.append(f"{skill_name}: Script '{py_file.name}' not in snake_case")
    # Directory name must be lowercase-hyphenated
    if not re.match(r"^[a-z][a-z0-9-]*$", skill_name):
        violations.append(f"{skill_name}: Directory name not in lowercase-hyphen format")
    return violations


def validate_handoff(producer: str, consumer: str, skills_dir: Path) -> dict[str, Any]:
    """Validate data contract between two consecutive workflow steps."""
    result: dict[str, Any] = {
        "producer": producer,
        "consumer": consumer,
        "status": "unknown",
        "details": [],
    }
    if not check_skill_exists(producer, skills_dir):
        result["status"] = "broken"
        result["details"].append(f"Producer skill not found: {producer}")
        return result
    if not check_skill_exists(consumer, skills_dir):
        result["status"] = "broken"
        result["details"].append(f"Consumer skill not found: {consumer}")
        return result
    pair = (producer, consumer)
    if pair in HANDOFF_CONTRACTS:
        contract = HANDOFF_CONTRACTS[pair]
        producer_contract = SKILL_CONTRACTS.get(producer, {})
        producer_fields = set(producer_contract.get("output_fields", []))
        required = set(contract.get("required_fields", []))
        missing = required - producer_fields
        if missing:
            result["status"] = "broken"
            result["details"].append(
                f"Missing required fields in {producer} output: {sorted(missing)}"
            )
        else:
            result["status"] = "valid"
            result["details"].append(f"Contract valid: {contract['description']}")
    else:
        result["status"] = "no_contract"
        result["details"].append(f"No handoff contract defined for {producer} \u2192 {consumer}")
    return result


def validate_workflow(
    name: str,
    steps: list[dict[str, str]],
    skills_dir: Path,
) -> dict[str, Any]:
    """Validate a complete workflow end-to-end."""
    result: dict[str, Any] = {
        "workflow": name,
        "step_count": len(steps),
        "steps": [],
        "handoffs": [],
        "naming_violations": [],
        "status": "valid",
    }
    for i, step in enumerate(steps):
        skill_name = resolve_skill_name(step["skill_display"])
        # Skip meta-steps (manual actions, not real skills)
        is_meta = skill_name.startswith("_meta_")
        exists = is_meta or check_skill_exists(skill_name, skills_dir)
        step_result = {
            "index": i + 1,
            "skill_display": step["skill_display"],
            "skill_name": skill_name,
            "action": step["action"],
            "exists": exists,
            "is_meta": is_meta,
            "has_contract": skill_name in SKILL_CONTRACTS,
        }
        result["steps"].append(step_result)
        if not exists:
            result["status"] = "broken"
        # Naming conventions (skip meta-steps)
        if not is_meta:
            violations = check_naming_conventions(skill_name, skills_dir)
            result["naming_violations"].extend(violations)
        # Handoff with previous step
        if i > 0:
            prev_skill = resolve_skill_name(steps[i - 1]["skill_display"])
            if not prev_skill.startswith("_meta_") and not is_meta:
                handoff = validate_handoff(prev_skill, skill_name, skills_dir)
                result["handoffs"].append(handoff)
                if handoff["status"] == "broken":
                    result["status"] = "broken"
    if result["naming_violations"] and result["status"] == "valid":
        result["status"] = "warning"
    return result


def create_dry_run_fixtures(
    workflows: dict[str, list[dict[str, str]]],
    output_dir: Path,
) -> list[str]:
    """Create synthetic fixture files for dry-run mode."""
    created: list[str] = []
    fixtures_dir = output_dir / "fixtures"
    fixtures_dir.mkdir(parents=True, exist_ok=True)
    seen: set = set()
    for _wf_name, steps in workflows.items():
        for step in steps:
            skill_name = resolve_skill_name(step["skill_display"])
            if skill_name in seen or skill_name not in SKILL_CONTRACTS:
                continue
            seen.add(skill_name)
            contract = SKILL_CONTRACTS[skill_name]
            fields = contract.get("output_fields", [])
            fixture: dict[str, Any] = {
                "_fixture": True,
                "_skill": skill_name,
                "schema_version": "1.0",
            }
            for field in fields:
                fixture[field] = f"<synthetic_{field}>"
            fixture_path = fixtures_dir / f"{skill_name}_fixture.json"
            fixture_path.write_text(json.dumps(fixture, indent=2) + "\n", encoding="utf-8")
            created.append(str(fixture_path))
    return created


def generate_report(
    results: list[dict[str, Any]],
    dry_run: bool,
    fixtures: list[str],
    output_dir: Path,
) -> tuple[Path, Path]:
    """Generate JSON and Markdown reports."""
    output_dir.mkdir(parents=True, exist_ok=True)
    ts = datetime.now(timezone.utc).strftime("%Y-%m-%d_%H%M%S")
    total = len(results)
    valid = sum(1 for r in results if r["status"] == "valid")
    broken = sum(1 for r in results if r["status"] == "broken")
    warnings = sum(1 for r in results if r["status"] == "warning")
    report_data: dict[str, Any] = {
        "schema_version": "1.0",
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "dry_run": dry_run,
        "summary": {
            "total_workflows": total,
            "valid": valid,
            "broken": broken,
            "warnings": warnings,
        },
        "workflows": results,
    }
    if dry_run:
        report_data["fixtures_created"] = fixtures
    json_path = output_dir / f"integration_test_{ts}.json"
    json_path.write_text(
        json.dumps(report_data, indent=2, default=str) + "\n",
        encoding="utf-8",
    )
    # ── Markdown report ──
    lines = [
        "# Skill Integration Test Report",
        "",
        f"**Generated:** {report_data['generated_at']}",
        f"**Mode:** {'Dry-run' if dry_run else 'Live'}",
        "",
        "## Summary",
        "",
        "| Metric | Count |",
        "|--------|-------|",
        f"| Total Workflows | {total} |",
        f"| Valid | {valid} |",
        f"| Broken | {broken} |",
        f"| Warnings | {warnings} |",
        "",
    ]
    for r in results:
        status_label = {
            "valid": "PASS",
            "broken": "FAIL",
            "warning": "WARN",
        }.get(r["status"], "?")
        lines.append(f"## [{status_label}] {r['workflow']}")
        lines.append("")
        lines.append(f"Steps: {r['step_count']}")
        lines.append("")
        for step in r["steps"]:
            exists_mark = "ok" if step["exists"] else "MISSING"
            contract_mark = "ok" if step["has_contract"] else "none"
            meta_note = " (meta)" if step.get("is_meta") else ""
            lines.append(
                f" {step['index']}. **{step['skill_display']}** "
                f"(`{step['skill_name']}`){meta_note} "
                f"\u2014 exists: {exists_mark}, contract: {contract_mark}"
            )
        if r["handoffs"]:
            lines.append("")
            lines.append("### Handoffs")
            lines.append("")
            for h in r["handoffs"]:
                h_label = {
                    "valid": "PASS",
                    "broken": "FAIL",
                    "no_contract": "N/A",
                }.get(h["status"], "?")
                lines.append(f"- [{h_label}] {h['producer']} \u2192 {h['consumer']}")
                for d in h["details"]:
                    lines.append(f" - {d}")
        if r["naming_violations"]:
            lines.append("")
            lines.append("### Naming Violations")
            lines.append("")
            for v in r["naming_violations"]:
                lines.append(f"- {v}")
        lines.append("")
    md_path = output_dir / f"integration_test_{ts}.md"
    md_path.write_text("\n".join(lines) + "\n", encoding="utf-8")
    return json_path, md_path
# ── CLI entry point ──────────────────────────────────────────────────
def main(argv: list[str] | None = None) -> int:
    parser = argparse.ArgumentParser(
        description=("Validate multi-skill workflows defined in CLAUDE.md"),
    )
    parser.add_argument(
        "--claude-md",
        type=str,
        default=None,
        help="Path to CLAUDE.md (default: auto-detect from project root)",
    )
    parser.add_argument(
        "--skills-dir",
        type=str,
        default=None,
        help=("Path to skills/ directory (default: auto-detect from project root)"),
    )
    parser.add_argument(
        "--workflow",
        type=str,
        default=None,
        help="Validate a specific workflow by name substring (default: all)",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Create synthetic fixtures and validate without real data",
    )
    parser.add_argument(
        "--output-dir",
        type=str,
        default="reports",
        help="Output directory for reports (default: reports/)",
    )
    args = parser.parse_args(argv)
    # Resolve project root (script is at skills/<name>/scripts/<file>.py)
    project_root = Path(__file__).resolve().parents[3]
    claude_md_path = Path(args.claude_md) if args.claude_md else project_root / "CLAUDE.md"
    skills_dir = Path(args.skills_dir) if args.skills_dir else project_root / "skills"
    output_dir = Path(args.output_dir)
    if not claude_md_path.is_file():
        print(
            f"Error: CLAUDE.md not found at {claude_md_path}",
            file=sys.stderr,
        )
        return 1
    if not skills_dir.is_dir():
        print(
            f"Error: skills directory not found at {skills_dir}",
            file=sys.stderr,
        )
        return 1
    # Parse workflows
    content = claude_md_path.read_text(encoding="utf-8")
    workflows = parse_workflows(content)
    if not workflows:
        print("Warning: No workflows found in CLAUDE.md", file=sys.stderr)
        return 0
    # Filter to specific workflow if requested
    if args.workflow:
        matching = {k: v for k, v in workflows.items() if args.workflow.lower() in k.lower()}
        if not matching:
            available = ", ".join(workflows.keys())
            print(
                f"Error: No workflow matching '{args.workflow}' found. Available: {available}",
                file=sys.stderr,
            )
            return 1
        workflows = matching
    # Dry-run fixtures
    fixtures: list[str] = []
    if args.dry_run:
        fixtures = create_dry_run_fixtures(workflows, output_dir)
        print(
            f"Dry-run: created {len(fixtures)} fixture files",
            file=sys.stderr,
        )
    # Validate each workflow
    results: list[dict[str, Any]] = []
    for wf_name, steps in workflows.items():
        result = validate_workflow(wf_name, steps, skills_dir)
        results.append(result)
    # Generate reports
    json_path, md_path = generate_report(results, args.dry_run, fixtures, output_dir)
    # Print summary
    total = len(results)
    broken = sum(1 for r in results if r["status"] == "broken")
    valid = sum(1 for r in results if r["status"] == "valid")
    print("\nIntegration Test Results:")
    print(f" Workflows tested: {total}")
    print(f" Valid: {valid}")
    print(f" Broken: {broken}")
    print(f" Reports: {json_path}, {md_path}")
    return 1 if broken > 0 else 0


if __name__ == "__main__":
    sys.exit(main())
```
### references/workflow_contracts.md
```markdown
# Workflow Contracts Reference
## Overview
Multi-skill workflows chain several skills in sequence, where the output of
step N feeds into step N+1. A **data contract** defines what each skill
produces and what its downstream consumer expects. When contracts are
violated -- missing fields, incompatible formats, wrong file patterns --
the handoff breaks silently.
## Contract Schema
Each skill contract specifies:
| Field | Description |
|-------|-------------|
| `output_format` | File format produced: `json`, `md`, `json+md`, `yaml` |
| `output_pattern` | Glob pattern for output filenames |
| `output_fields` | Required top-level fields in JSON/YAML output |
| `api_keys` | Environment variables needed to run the skill |
## Handoff Contract Schema
A handoff contract between producer and consumer specifies:
| Field | Description |
|-------|-------------|
| `mechanism` | How data flows: `file_param` (CLI arg), `directory`, `stdin` |
| `param` | CLI parameter(s) the consumer uses to accept input |
| `required_fields` | Fields the consumer reads from the producer's output |
| `description` | Human-readable explanation of the data flow |
## Known Handoff Contracts
### Earnings Momentum Trading Pipeline
```
earnings-trade-analyzer → pead-screener
  mechanism: file_param (--candidates-json)
  required_fields: symbol, grade, gap_pct
```
PEAD Screener's Mode B reads the earnings-trade-analyzer JSON output via
`--candidates-json`. The consumer filters by `grade >= B` and uses `symbol`
to fetch weekly candle data.
### Edge Research Pipeline
```
edge-candidate-agent → edge-hint-extractor
  mechanism: file_param (--market-summary, --anomalies)
  required_fields: market_summary, anomalies

edge-hint-extractor → edge-concept-synthesizer
  mechanism: file_param (--hints)
  required_fields: hints

edge-concept-synthesizer → edge-strategy-designer
  mechanism: file_param (--concepts)
  required_fields: concepts

edge-strategy-designer → edge-strategy-reviewer
  mechanism: file_param (--drafts-dir)
  required_fields: strategy_name, entry, exit
```
### Trade Execution Pipeline
```
screener skills → position-sizer
  mechanism: manual (user copies entry/stop from screener output)
  required_fields: (none -- user provides values)

analysis skills → data-quality-checker
  mechanism: file_param (--file)
  required_fields: (validates markdown content, not specific fields)
```
## Workflows Without Explicit Contracts
Some workflows (Daily Market Monitoring, Weekly Strategy Review) consist of
independent analysis steps that do not pass data between them. Each step
produces a standalone report. These workflows are validated for skill
existence and naming conventions only.
## File Naming Conventions
| Component | Convention | Example |
|-----------|-----------|---------|
| Skill directory | lowercase-hyphen | `earnings-trade-analyzer` |
| Python scripts | snake_case.py | `analyze_earnings_trades.py` |
| Output files | `<prefix>_YYYY-MM-DD_HHMMSS.{json,md}` | `integration_test_2026-03-01_120000.json` |
| SKILL.md name | must match directory name | `name: earnings-trade-analyzer` |
## Adding New Contracts
When creating a new multi-skill workflow:
1. Define each skill's output contract (format, fields, pattern)
2. Define handoff contracts for consecutive steps with data dependencies
3. Add the contracts to the `SKILL_CONTRACTS` and `HANDOFF_CONTRACTS`
dictionaries in `validate_workflows.py`
4. Run the integration tester to verify the new workflow
```
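The four registration steps above can be sketched in code; the `gap-screener` skill and its handoff to `position-sizer` below are hypothetical examples, not contracts that exist in the repository:

```python
from typing import Any

# Hypothetical contract entries for a new workflow step; in practice these
# would be added to the dictionaries in scripts/validate_workflows.py.
SKILL_CONTRACTS: dict[str, dict[str, Any]] = {}
HANDOFF_CONTRACTS: dict[tuple[str, str], dict[str, Any]] = {}

SKILL_CONTRACTS["gap-screener"] = {
    "output_format": "json",
    "output_pattern": "gap_screen_*.json",
    "output_fields": ["symbol", "gap_pct", "volume_ratio"],
    "api_keys": [],
}
HANDOFF_CONTRACTS[("gap-screener", "position-sizer")] = {
    "mechanism": "file_param",
    "param": "--candidates-json",
    "required_fields": ["symbol", "gap_pct"],
    "description": "Position sizer reads gap-screener JSON output",
}

# A handoff is valid when every required field appears in the producer output.
required = set(HANDOFF_CONTRACTS[("gap-screener", "position-sizer")]["required_fields"])
produced = set(SKILL_CONTRACTS["gap-screener"]["output_fields"])
print(required <= produced)  # True
```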
---
## Skill Companion Files
> Additional files collected from the skill directory layout.
### scripts/tests/conftest.py
```python
"""Shared fixtures for Skill Integration Tester tests"""
import os
import sys
# Add scripts directory to path so modules can be imported
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
# Add tests directory to path so helpers can be imported
sys.path.insert(0, os.path.dirname(__file__))
```
### scripts/tests/test_test_workflows.py
```python
"""Tests for validate_workflows.py — Skill Integration Tester."""
import json
from pathlib import Path
from validate_workflows import (
    HANDOFF_CONTRACTS,
    SKILL_CONTRACTS,
    check_naming_conventions,
    check_skill_exists,
    create_dry_run_fixtures,
    generate_report,
    parse_frontmatter_name,
    parse_workflows,
    resolve_skill_name,
    validate_handoff,
    validate_workflow,
)
# ── Sample CLAUDE.md content for testing ─────────────────────────────
SAMPLE_CLAUDE_MD = """\
## Multi-Skill Workflows
Skills are designed to be combined for comprehensive analysis:
**Daily Market Monitoring:**
1. Economic Calendar Fetcher \u2192 Check today's events
2. Earnings Calendar \u2192 Identify reporting companies
3. Market News Analyst \u2192 Review overnight developments
**Earnings Momentum Trading:**
1. Earnings Trade Analyzer \u2192 Score recent earnings reactions
2. PEAD Screener (Mode B) \u2192 Feed analyzer output and screen
3. Technical Analyst \u2192 Confirm weekly chart setups
"""
# ── parse_workflows ──────────────────────────────────────────────────
class TestParseWorkflows:
    def test_finds_named_workflows(self):
        workflows = parse_workflows(SAMPLE_CLAUDE_MD)
        assert "Daily Market Monitoring" in workflows
        assert "Earnings Momentum Trading" in workflows

    def test_extracts_correct_step_count(self):
        workflows = parse_workflows(SAMPLE_CLAUDE_MD)
        assert len(workflows["Daily Market Monitoring"]) == 3
        assert len(workflows["Earnings Momentum Trading"]) == 3

    def test_extracts_skill_display_and_action(self):
        workflows = parse_workflows(SAMPLE_CLAUDE_MD)
        step = workflows["Daily Market Monitoring"][0]
        assert step["skill_display"] == "Economic Calendar Fetcher"
        assert step["action"] == "Check today's events"

    def test_strips_parenthetical_from_skill_name(self):
        workflows = parse_workflows(SAMPLE_CLAUDE_MD)
        step = workflows["Earnings Momentum Trading"][1]
        assert step["skill_display"] == "PEAD Screener"

    def test_returns_empty_for_no_workflows(self):
        assert parse_workflows("No workflows here") == {}
# ── resolve_skill_name ───────────────────────────────────────────────
class TestResolveSkillName:
    def test_known_mapping(self):
        assert resolve_skill_name("Economic Calendar Fetcher") == "economic-calendar-fetcher"

    def test_case_insensitive(self):
        assert resolve_skill_name("SECTOR ANALYST") == "sector-analyst"

    def test_unknown_name_algorithmic_fallback(self):
        assert resolve_skill_name("My Custom Skill") == "my-custom-skill"
# ── check_skill_exists ──────────────────────────────────────────────
class TestCheckSkillExists:
    def test_exists_when_skill_md_present(self, tmp_path):
        skill_dir = tmp_path / "my-skill"
        skill_dir.mkdir()
        (skill_dir / "SKILL.md").write_text("---\nname: my-skill\n---\n")
        assert check_skill_exists("my-skill", tmp_path)

    def test_not_exists_when_missing(self, tmp_path):
        assert not check_skill_exists("nonexistent", tmp_path)
# ── parse_frontmatter_name ──────────────────────────────────────────
class TestParseFrontmatterName:
    def test_extracts_name(self, tmp_path):
        md = tmp_path / "SKILL.md"
        md.write_text("---\nname: foo-bar\ndescription: X\n---\nBody")
        assert parse_frontmatter_name(md) == "foo-bar"

    def test_returns_none_for_no_frontmatter(self, tmp_path):
        md = tmp_path / "SKILL.md"
        md.write_text("No frontmatter here")
        assert parse_frontmatter_name(md) is None

    def test_returns_none_for_missing_file(self, tmp_path):
        assert parse_frontmatter_name(tmp_path / "missing.md") is None


# ── check_naming_conventions ─────────────────────────────────────────
class TestCheckNamingConventions:
    def test_valid_skill_no_violations(self, tmp_path):
        skill_dir = tmp_path / "my-skill"
        skill_dir.mkdir()
        (skill_dir / "SKILL.md").write_text("---\nname: my-skill\n---\n")
        scripts_dir = skill_dir / "scripts"
        scripts_dir.mkdir()
        (scripts_dir / "run_check.py").write_text("")
        assert check_naming_conventions("my-skill", tmp_path) == []

    def test_detects_non_snake_case_script(self, tmp_path):
        skill_dir = tmp_path / "my-skill"
        skill_dir.mkdir()
        (skill_dir / "SKILL.md").write_text("---\nname: my-skill\n---\n")
        scripts_dir = skill_dir / "scripts"
        scripts_dir.mkdir()
        (scripts_dir / "RunAnalysis.py").write_text("")
        violations = check_naming_conventions("my-skill", tmp_path)
        assert any("snake_case" in v for v in violations)

    def test_detects_name_mismatch(self, tmp_path):
        skill_dir = tmp_path / "my-skill"
        skill_dir.mkdir()
        (skill_dir / "SKILL.md").write_text("---\nname: wrong-name\n---\n")
        violations = check_naming_conventions("my-skill", tmp_path)
        assert any("does not match" in v for v in violations)


# ── validate_handoff ─────────────────────────────────────────────────
class TestValidateHandoff:
    def _make_skill(self, skills_dir, name):
        d = skills_dir / name
        d.mkdir(parents=True, exist_ok=True)
        (d / "SKILL.md").write_text(f"---\nname: {name}\n---\n")

    def test_known_valid_contract(self, tmp_path):
        self._make_skill(tmp_path, "earnings-trade-analyzer")
        self._make_skill(tmp_path, "pead-screener")
        result = validate_handoff("earnings-trade-analyzer", "pead-screener", tmp_path)
        assert result["status"] == "valid"

    def test_missing_producer_is_broken(self, tmp_path):
        self._make_skill(tmp_path, "pead-screener")
        result = validate_handoff("nonexistent", "pead-screener", tmp_path)
        assert result["status"] == "broken"

    def test_unknown_pair_is_no_contract(self, tmp_path):
        self._make_skill(tmp_path, "skill-a")
        self._make_skill(tmp_path, "skill-b")
        result = validate_handoff("skill-a", "skill-b", tmp_path)
        assert result["status"] == "no_contract"


# ── validate_workflow ────────────────────────────────────────────────
class TestValidateWorkflow:
    def test_all_steps_exist(self, tmp_path):
        for name in ("skill-a", "skill-b"):
            d = tmp_path / name
            d.mkdir()
            (d / "SKILL.md").write_text(f"---\nname: {name}\n---\n")
        steps = [
            {"skill_display": "skill-a", "action": "do A"},
            {"skill_display": "skill-b", "action": "do B"},
        ]
        result = validate_workflow("Test", steps, tmp_path)
        assert result["status"] != "broken"

    def test_missing_step_marks_broken(self, tmp_path):
        d = tmp_path / "skill-a"
        d.mkdir()
        (d / "SKILL.md").write_text("---\nname: skill-a\n---\n")
        steps = [
            {"skill_display": "skill-a", "action": "do A"},
            {"skill_display": "nonexistent", "action": "do B"},
        ]
        result = validate_workflow("Test", steps, tmp_path)
        assert result["status"] == "broken"


# ── generate_report ──────────────────────────────────────────────────
class TestGenerateReport:
    def _make_result(self, status="valid"):
        return {
            "workflow": "Test",
            "step_count": 1,
            "steps": [
                {
                    "index": 1,
                    "skill_display": "A",
                    "skill_name": "a",
                    "action": "do",
                    "exists": True,
                    "is_meta": False,
                    "has_contract": False,
                }
            ],
            "handoffs": [],
            "naming_violations": [],
            "status": status,
        }

    def test_creates_json_and_md_files(self, tmp_path):
        json_p, md_p = generate_report([self._make_result()], False, [], tmp_path)
        assert json_p.is_file()
        assert md_p.is_file()

    def test_json_report_schema(self, tmp_path):
        json_p, _ = generate_report([self._make_result()], False, [], tmp_path)
        data = json.loads(json_p.read_text())
        assert data["schema_version"] == "1.0"
        assert data["summary"]["total_workflows"] == 1
        assert data["summary"]["valid"] == 1
        assert data["summary"]["broken"] == 0

    def test_dry_run_includes_fixtures(self, tmp_path):
        json_p, _ = generate_report(
            [self._make_result()],
            True,
            ["/tmp/fixture.json"],
            tmp_path,
        )
        data = json.loads(json_p.read_text())
        assert data["dry_run"] is True
        assert "/tmp/fixture.json" in data["fixtures_created"]


# ── create_dry_run_fixtures ──────────────────────────────────────────
class TestCreateDryRunFixtures:
    def test_creates_fixture_files(self, tmp_path):
        workflows = {
            "Test": [
                {
                    "skill_display": "Earnings Calendar",
                    "action": "check",
                },
            ],
        }
        fixtures = create_dry_run_fixtures(workflows, tmp_path)
        assert len(fixtures) > 0
        data = json.loads(Path(fixtures[0]).read_text())
        assert data["_fixture"] is True
        assert data["_skill"] == "earnings-calendar"
        assert data["schema_version"] == "1.0"

    def test_skips_unknown_skills(self, tmp_path):
        workflows = {
            "Test": [
                {
                    "skill_display": "Unknown Skill XYZ",
                    "action": "do",
                },
            ],
        }
        fixtures = create_dry_run_fixtures(workflows, tmp_path)
        assert len(fixtures) == 0

    def test_deduplicates_across_workflows(self, tmp_path):
        workflows = {
            "WF1": [
                {
                    "skill_display": "Earnings Calendar",
                    "action": "a",
                },
            ],
            "WF2": [
                {
                    "skill_display": "Earnings Calendar",
                    "action": "b",
                },
            ],
        }
        fixtures = create_dry_run_fixtures(workflows, tmp_path)
        assert len(fixtures) == 1


# ── Contract data integrity ──────────────────────────────────────────
class TestContractIntegrity:
    def test_all_handoff_producers_have_contracts(self):
        """Every producer in HANDOFF_CONTRACTS must exist in SKILL_CONTRACTS."""
        for (producer, _consumer), _contract in HANDOFF_CONTRACTS.items():
            assert producer in SKILL_CONTRACTS, (
                f"Handoff producer '{producer}' missing from SKILL_CONTRACTS"
            )

    def test_handoff_required_fields_subset_of_output(self):
        """Required fields in handoff must be a subset of the producer's output."""
        for (producer, consumer), contract in HANDOFF_CONTRACTS.items():
            producer_fields = set(SKILL_CONTRACTS[producer]["output_fields"])
            required = set(contract["required_fields"])
            missing = required - producer_fields
            assert not missing, (
                f"Handoff {producer}\u2192{consumer}: "
                f"required fields {missing} not in producer output"
            )
```