skill-idea-miner
Mine Claude Code session logs for skill idea candidates. Use when running the weekly skill generation pipeline to extract, score, and backlog new skill ideas from recent coding sessions.
Packaged view
This page reorganizes the original catalog entry around fit, installability, and workflow context first. The original raw source lives below.
Install command
npx @skill-hub/cli install tradermonty-claude-trading-skills-skill-idea-miner
Repository
Skill path: skills/skill-idea-miner
Best for
Primary workflow: Ship Full Stack.
Technical facets: Full Stack.
Target audience: everyone.
License: Unknown.
Original source
Catalog source: SkillHub Club.
Repository owner: tradermonty.
This is a mirrored public skill entry; review the repository before installing it into production workflows.
What it helps with
- Install skill-idea-miner into Claude Code, Codex CLI, Gemini CLI, or OpenCode workflows
- Review https://github.com/tradermonty/claude-trading-skills before adding skill-idea-miner to shared team environments
- Use skill-idea-miner to mine recent coding sessions for scored, backlogged skill ideas
Works across
Favorites: 0.
Sub-skills: 0.
Aggregator: No.
Original source / Raw SKILL.md
---
name: skill-idea-miner
description: Mine Claude Code session logs for skill idea candidates. Use when running the weekly skill generation pipeline to extract, score, and backlog new skill ideas from recent coding sessions.
---
# Skill Idea Miner
Automatically extract skill idea candidates from Claude Code session logs,
score them for novelty, feasibility, and trading value, and maintain a
prioritized backlog for downstream skill generation.
## When to Use
- Weekly automated pipeline run (Saturday 06:00 via launchd)
- Manual backlog refresh: `python3 scripts/run_skill_generation_pipeline.py --mode weekly`
- Dry-run to preview candidates without LLM scoring
## Workflow
### Stage 1: Session Log Mining
1. Enumerate session logs from allowlist projects in `~/.claude/projects/`
2. Filter to past 7 days by file mtime, confirm with `timestamp` field
3. Extract user messages (`type: "user"`, `userType: "external"`)
4. Extract tool usage patterns from assistant messages
5. Run deterministic signal detection:
- Skill usage frequency (`skills/*/` path references)
- Error patterns (non-zero exit codes, `is_error` flags, exception keywords)
- Repetitive tool sequences (3+ tools repeated 3+ times)
- Automation request keywords (English and Japanese)
- Unresolved requests (5+ minute gap after user message)
6. Invoke Claude CLI headless for idea abstraction
7. Output `raw_candidates.yaml`
### Stage 2: Scoring and Deduplication
1. Load existing skills from `skills/*/SKILL.md` frontmatter
2. Deduplicate via Jaccard similarity (threshold > 0.5) against:
- Existing skill names and descriptions
- Existing backlog ideas
3. Score non-duplicate candidates with Claude CLI:
- Novelty (0-100): differentiation from existing skills
- Feasibility (0-100): technical implementability
- Trading Value (0-100): practical value for investors/traders
- Composite = 0.3 * Novelty + 0.3 * Feasibility + 0.4 * Trading Value
4. Merge scored candidates into `logs/.skill_generation_backlog.yaml`
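The Jaccard deduplication in step 2 can be sketched as follows. This is a minimal illustration of set similarity over word tokens; the tokenizer is an assumption, since the actual implementation in `scripts/score_ideas.py` is truncated in this entry.

```python
import re

JACCARD_THRESHOLD = 0.5  # same threshold as stated in step 2


def _tokens(text: str) -> set[str]:
    """Lowercase word tokens (assumed tokenizer, for illustration only)."""
    return set(re.findall(r"[a-z0-9]+", text.lower()))


def jaccard(a: str, b: str) -> float:
    """Jaccard similarity: |intersection| / |union| of the two token sets."""
    ta, tb = _tokens(a), _tokens(b)
    if not ta or not tb:
        return 0.0
    return len(ta & tb) / len(ta | tb)


def is_duplicate(candidate: str, existing: list[str]) -> bool:
    """A candidate is a duplicate if it exceeds the threshold against any entry."""
    return any(jaccard(candidate, e) > JACCARD_THRESHOLD for e in existing)
```

For example, `is_duplicate("Earnings Whispers Parser", ["Earnings Whispers Image Parser"])` is true (similarity 0.75 > 0.5), so that candidate would be dropped before LLM scoring.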
## Output Format
### raw_candidates.yaml
```yaml
generated_at_utc: "2026-03-08T06:00:00Z"
period: {from: "2026-03-01", to: "2026-03-07"}
projects_scanned: ["claude-trading-skills"]
sessions_scanned: 12
candidates:
- id: "raw_2026w10_001"
title: "Earnings Whispers Image Parser"
source_project: "claude-trading-skills"
evidence:
user_requests: ["Extract earnings dates from screenshot"]
pain_points: ["Manual image reading"]
frequency: 3
raw_description: "Parse Earnings Whispers screenshots to extract dates."
category: "data-extraction"
```
### Backlog (logs/.skill_generation_backlog.yaml)
```yaml
updated_at_utc: "2026-03-08T06:15:00Z"
ideas:
- id: "idea_2026w10_001"
title: "Earnings Whispers Image Parser"
description: "Skill that parses Earnings Whispers screenshots..."
category: "data-extraction"
scores: {novelty: 75, feasibility: 60, trading_value: 80, composite: 73}
status: "pending"
```
## Resources
- `references/idea_extraction_rubric.md` — Signal detection criteria and scoring rubric
- `scripts/mine_session_logs.py` — Session log parser
- `scripts/score_ideas.py` — Scorer and deduplicator
---
## Referenced Files
> The following files are referenced in this skill and included for context.
### references/idea_extraction_rubric.md
```markdown
# Idea Extraction Rubric
Criteria and scoring framework for mining skill ideas from Claude Code session logs.
## Signal Detection Rules
### 1. Skill Usage Signals
Detect references to existing skills in tool arguments:
- **Pattern:** File paths containing `skills/*/` in Read, Edit, Write, Glob, Grep, Bash tools
- **Threshold:** Any reference counts as a signal
- **Interpretation:** Frequently used skills indicate adjacent needs; skills used together suggest workflow gaps
### 2. Error Detection Signals
Identify pain points from failed operations:
| Signal | Detection Method |
|--------|-----------------|
| Non-zero exit code | Bash tool_result with `exitCode != 0` |
| Explicit error flag | `tool_result` content block with `is_error: true` |
| Error text patterns | Regex match: `Error:`, `Exception:`, `Traceback`, `FAILED`, `ModuleNotFoundError` |
- **Interpretation:** Recurring errors on similar tasks suggest automation or tooling gaps
### 3. Repetitive Pattern Signals
Detect repetitive tool sequences that indicate manual workflows:
- **Definition:** A sequence of 3+ consecutive tool calls (by tool name)
- **Threshold:** Same sequence appearing 3+ times in a session
- **Extraction:** Record the tool sequence and associated file/command arguments
- **Interpretation:** Repetitive sequences are strong candidates for workflow automation
### 4. Automation Request Signals
Keyword detection in user messages:
**English keywords:**
- `skill`, `create`, `automate`, `workflow`, `pipeline`, `generate`, `template`, `script`
**Japanese keywords:**
- `スキル`, `作成`, `自動化`, `ワークフロー`, `パイプライン`, `生成`, `テンプレート`
- **Matching:** Case-insensitive, partial word match
- **Interpretation:** Direct user intent to automate; highest signal value
### 5. Unresolved Request Signals
Detect user messages that did not result in tool actions:
- **Definition:** User message (`type: "user"`) followed by 5+ minutes without any `tool_use`
- **Measurement:** Compare timestamps between user message and next tool_use
- **Edge cases:** End-of-session messages (no subsequent entries) are excluded
- **Interpretation:** May indicate requests beyond current capabilities
## LLM Abstraction Prompt
When invoking Claude CLI for idea abstraction, use this prompt structure:
```
You are analyzing Claude Code session logs to extract skill idea candidates
for a trading/investing skill repository.
Given the following signals detected from recent sessions:
{signals_json}
And sample user messages from these sessions:
{user_messages}
Extract 0-5 skill idea candidates. For each candidate:
1. Abstract the idea (do not copy verbatim user messages)
2. Assign a category: data-extraction, analysis, screening, reporting, workflow, monitoring
3. Describe the pain point it addresses
4. Note which signals support this idea
Return JSON with this structure:
{
"candidates": [
{
"title": "Short descriptive name",
"raw_description": "What the skill would do",
"category": "one of the categories above",
"evidence": {
"user_requests": ["abstracted summaries, not verbatim"],
"pain_points": ["what problem this solves"],
"frequency": <number of times this pattern appeared>
}
}
]
}
Rules:
- Return 0 candidates if no clear skill ideas emerge
- Do not suggest skills that already exist (provided in context)
- Abstract user requests (e.g., "extract earnings dates from chart" not "check CRWD earnings")
- Focus on trading/investing domain relevance
```
## Scoring Rubric
### Novelty (0-100)
| Score Range | Criteria |
|------------|---------|
| 80-100 | No existing skill covers this domain; fills a clear gap |
| 60-79 | Partially overlaps with existing skills but adds significant new capability |
| 40-59 | Moderate overlap; could be a feature of an existing skill |
| 20-39 | High overlap; mostly duplicates existing functionality |
| 0-19 | Near-identical to an existing skill |
### Feasibility (0-100)
| Score Range | Criteria |
|------------|---------|
| 80-100 | Can be built with existing tools (WebSearch, Read, Bash); no paid API required |
| 60-79 | Requires existing paid API (FMP, FINVIZ) already in the project |
| 40-59 | Requires new API integration or complex parsing logic |
| 20-39 | Requires significant infrastructure (databases, real-time feeds) |
| 0-19 | Not feasible as a Claude skill (needs persistent state, GUI, etc.) |
### Trading Value (0-100)
| Score Range | Criteria |
|------------|---------|
| 80-100 | Directly supports trade decisions with actionable signals |
| 60-79 | Provides valuable market intelligence or portfolio insights |
| 40-59 | Useful for research but not directly actionable |
| 20-39 | Tangentially related to trading/investing |
| 0-19 | No clear trading/investing application |
### Composite Score
```
composite = 0.3 * novelty + 0.3 * feasibility + 0.4 * trading_value
```
Trading value is weighted higher (0.4) because the repository's primary purpose
is to serve equity investors and traders.
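As a worked check against the backlog example earlier in this entry (novelty 75, feasibility 60, trading value 80), the weighting produces 72.5; rounding half-up to match the stored integer composite of 73 is an assumption, as the rounding rule is not stated:

```python
def composite(novelty: int, feasibility: int, trading_value: int) -> int:
    """0.3/0.3/0.4 weighted composite, via integer arithmetic to avoid float error.

    Rounding half-up is an assumption made to reproduce the stored integer score.
    """
    raw = (3 * novelty + 3 * feasibility + 4 * trading_value) / 10
    return int(raw + 0.5)


print(composite(75, 60, 80))  # → 73 (72.5 rounded half-up)
```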
## Category Taxonomy
| Category | Description | Examples |
|----------|-------------|---------|
| `data-extraction` | Parse/extract structured data from unstructured sources | Image parsing, PDF extraction |
| `analysis` | Analyze market data to produce insights | Correlation analysis, regime detection |
| `screening` | Filter stocks/assets based on criteria | Momentum screener, value screener |
| `reporting` | Generate formatted reports from data | Weekly summary, portfolio report |
| `workflow` | Automate multi-step processes | Pipeline orchestration, batch processing |
| `monitoring` | Track conditions and alert on changes | Price alerts, position monitoring |
## Privacy and Data Handling
- Session logs contain user interactions already sent to Claude; no additional privacy concern for LLM processing
- Raw user messages must be abstracted before storage in backlog (no verbatim copies)
- File paths with usernames must be stripped from output artifacts
- Source session UUIDs are stored in `logs/` (gitignored) for audit purposes only
- Committed files (SKILL.md, references) must not contain personal information
```
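The rubric's rule that file paths with usernames must be stripped from output artifacts can be implemented with a single substitution. This is a hedged sketch; the pattern and placeholder are assumptions, since the actual sanitizer is not shown in this entry:

```python
import re


def strip_username(path: str) -> str:
    """Replace the user segment of macOS/Linux home-dir prefixes with a placeholder."""
    return re.sub(r"(/Users/|/home/)[^/]+", r"\1<user>", path)


print(strip_username("/Users/alice/PycharmProjects/claude-trading-skills"))
# → /Users/<user>/PycharmProjects/claude-trading-skills
```

Relative paths without a home-dir prefix (e.g. `skills/skill-idea-miner/SKILL.md`) pass through unchanged.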
### scripts/mine_session_logs.py
```python
#!/usr/bin/env python3
"""Mine Claude Code session logs for skill idea candidates."""
from __future__ import annotations
import argparse
import json
import logging
import os
import re
import shutil
import subprocess
from datetime import datetime, timedelta, timezone
from pathlib import Path
import yaml
logger = logging.getLogger("skill_idea_miner")
PROJECT_ALLOWLIST = [
"claude-trading-skills",
"claude-market-agents",
"trade-edge-finder",
"trade-strategy-pipeline",
"weekly-trade-strategy",
]
CLAUDE_TIMEOUT = 600
CLAUDE_BUDGET_MINE = 1.00
LOOKBACK_DAYS = 7
MAX_USER_MESSAGES_PER_SESSION = 5
MAX_ERROR_OUTPUT_LEN = 500
# Categories rejected by the deterministic post-LLM filter.
# Broad denylist covering non-trading domains.
REJECTED_CATEGORIES = {
"developer-tooling",
"developer-productivity",
"code-navigation",
"documentation-generation",
"skill-development",
"devops",
"ci-cd",
"testing",
"communication",
"project-management",
"customer-support",
"hr",
"meeting",
"email",
"calendar",
"general-productivity",
}
# Keywords in title/description that indicate a non-trading skill.
# Use specific phrases to avoid false positives on trading-adjacent tools
# (e.g. "slack-alert" for trade notifications is legitimate).
REJECTED_KEYWORDS = [
"codebase-navigator",
"code-navigation",
"doc-generator",
"doc-site-generator",
"git-bulk",
"skill-score-optimizer",
"batch-patcher",
"meeting-scheduler",
"meeting-minutes",
"jira-integration",
"confluence-page",
]
AUTOMATED_PROMPT_PREFIXES = [
"# LLM Skill Review Request",
"Improve the skill '",
"Implement the following plan:",
"Score each skill idea candidate",
]
# ── Project discovery ──
def find_project_dirs(
base_dir: Path,
allowlist: list[str] | None = None,
) -> list[tuple[str, Path]]:
"""Scan ~/.claude/projects/ for directories matching allowlist.
Directory encoding: `-Users-username-PycharmProjects-{project}` maps to
the last path segment as the project name.
Returns list of (project_name, dir_path) tuples.
"""
if allowlist is None:
allowlist = PROJECT_ALLOWLIST
if not base_dir.is_dir():
return []
matches: list[tuple[str, Path]] = []
for child in sorted(base_dir.iterdir()):
if not child.is_dir():
continue
# Directory name is encoded as dash-separated absolute path segments.
# The project name is the last segment.
for proj in allowlist:
# Project name may itself contain dashes, so check if the dir name
# ends with the project name (after the path encoding).
if child.name.endswith(f"-{proj}") or child.name == proj:
matches.append((proj, child))
break
return matches
# ── Session log listing ──
def list_session_logs(
project_dirs: list[tuple[str, Path]],
lookback_days: int = LOOKBACK_DAYS,
) -> list[tuple[str, Path]]:
"""Find *.jsonl files in each project dir, filtered by mtime.
Returns list of (project_name, log_path) tuples.
"""
cutoff = datetime.now(tz=timezone.utc) - timedelta(days=lookback_days)
cutoff_ts = cutoff.timestamp()
results: list[tuple[str, Path]] = []
for project_name, dir_path in project_dirs:
if not dir_path.is_dir():
continue
for jsonl_file in sorted(dir_path.glob("*.jsonl")):
if not jsonl_file.is_file():
continue
try:
mtime = jsonl_file.stat().st_mtime
if mtime >= cutoff_ts:
results.append((project_name, jsonl_file))
except OSError:
logger.warning("Could not stat %s", jsonl_file)
return results
# ── Session parsing ──
def parse_session(log_path: Path) -> dict:
"""Parse a JSONL session log file.
Extracts user messages, tool usage, and timestamps.
Skips malformed lines with a warning.
Returns {"user_messages": [...], "tool_uses": [...], "timestamps": [...]}.
"""
user_messages: list[str] = []
tool_uses: list[dict] = []
timestamps: list[str] = []
timed_entries: list[dict] = []
try:
text = log_path.read_text(encoding="utf-8")
except OSError as e:
logger.warning("Could not read %s: %s", log_path, e)
return {"user_messages": [], "tool_uses": [], "timestamps": []}
for line_num, line in enumerate(text.splitlines(), 1):
line = line.strip()
if not line:
continue
try:
entry = json.loads(line)
except json.JSONDecodeError:
logger.warning("Malformed JSON at %s:%d, skipping.", log_path.name, line_num)
continue
if not isinstance(entry, dict):
continue
# Extract timestamp
ts = entry.get("timestamp")
if ts:
timestamps.append(ts)
msg = entry.get("message", {})
if not isinstance(msg, dict):
continue
entry_type = entry.get("type") or msg.get("type", "")
# Skip sidechain messages (before timed_entries to avoid contamination)
if entry.get("isSidechain") or msg.get("isSidechain"):
continue
# Track timed entries for unresolved request detection
if ts and entry_type:
timed_entries.append({"timestamp": ts, "type": entry_type})
# User messages
if entry_type == "user" or msg.get("role") == "user":
user_type = entry.get("userType") or msg.get("userType", "")
if user_type != "external":
continue
content = msg.get("content", "")
if isinstance(content, str):
if content.strip():
user_messages.append(content.strip())
elif isinstance(content, list):
for block in content:
if isinstance(block, dict) and block.get("type") == "text":
text_val = block.get("text", "").strip()
if text_val:
user_messages.append(text_val)
# Assistant messages → extract tool_use blocks
elif entry_type == "assistant" or msg.get("role") == "assistant":
content = msg.get("content", [])
if isinstance(content, list):
for block in content:
if isinstance(block, dict) and block.get("type") == "tool_use":
tool_uses.append(
{
"name": block.get("name", ""),
"input": block.get("input", {}),
}
)
# Tool results → store for error detection
elif entry_type == "tool_result":
content = msg.get("content", "")
is_error = entry.get("is_error") or msg.get("is_error", False)
if is_error:
raw = content if isinstance(content, str) else str(content)
tool_uses.append(
{
"name": "__tool_result_error__",
"output": raw[:MAX_ERROR_OUTPUT_LEN],
}
)
elif isinstance(content, str):
# Check for error patterns in tool output
if _has_error_pattern(content):
tool_uses.append(
{
"name": "__tool_result_error__",
"output": content[:MAX_ERROR_OUTPUT_LEN],
}
)
return {
"user_messages": user_messages,
"tool_uses": tool_uses,
"timestamps": timestamps,
"timed_entries": timed_entries,
}
def _has_error_pattern(text: str) -> bool:
"""Check if text contains common error patterns."""
error_patterns = [
r"(?m)^Error:",
r"(?m)^Exception:",
r"(?m)^Traceback \(most recent call last\)",
r"exit code[:\s]+[1-9]",
r"non-zero exit",
]
for pattern in error_patterns:
if re.search(pattern, text):
return True
return False
# ── Signal detection ──
def detect_signals(parsed: dict) -> dict:
"""Deterministic signal detection (no LLM).
Detects:
- skill_usage: references to skills/ in tool args
- errors: error patterns in tool results
- repetitive_patterns: same tool sequence appearing 3+ times
- automation_requests: keyword matches in user messages
- unresolved_requests: user message followed by 5+ min gap without tool_use
Returns dict with signal counts and details.
"""
user_messages = parsed.get("user_messages", [])
tool_uses = parsed.get("tool_uses", [])
signals: dict = {
"skill_usage": _detect_skill_usage(tool_uses),
"errors": _detect_errors(tool_uses),
"repetitive_patterns": _detect_repetitive_patterns(tool_uses),
"automation_requests": _detect_automation_requests(user_messages),
"unresolved_requests": _detect_unresolved_requests(parsed.get("timed_entries", [])),
}
return signals
def _detect_skill_usage(tool_uses: list[dict]) -> dict:
"""Count references to skills/ in tool args (file paths, commands)."""
skill_refs: dict[str, int] = {}
skill_pattern = re.compile(r"skills/([a-zA-Z0-9_-]+)/")
for tool in tool_uses:
tool_input = tool.get("input", {})
# Search in all string values of the input dict
search_text = json.dumps(tool_input) if isinstance(tool_input, dict) else str(tool_input)
for match in skill_pattern.finditer(search_text):
skill_name = match.group(1)
skill_refs[skill_name] = skill_refs.get(skill_name, 0) + 1
return {"count": len(skill_refs), "skills": skill_refs}
def _detect_errors(tool_uses: list[dict]) -> dict:
"""Detect error patterns in tool results."""
error_count = 0
error_samples: list[str] = []
for tool in tool_uses:
if tool.get("name") == "__tool_result_error__":
error_count += 1
output = tool.get("output", "")
if output and len(error_samples) < 5:
error_samples.append(output[:200])
return {"count": error_count, "samples": error_samples}
def _detect_repetitive_patterns(tool_uses: list[dict]) -> dict:
"""Detect same tool sequence (3+ tools) appearing 3+ times."""
tool_names = [
t.get("name", "") for t in tool_uses if t.get("name", "").startswith("__") is False
]
# Filter out error markers
tool_names = [n for n in tool_names if not n.startswith("__")]
if len(tool_names) < 9: # Need at least 3 sequences of 3
return {"count": 0, "patterns": []}
# Check all windows of size 3
sequences: dict[str, int] = {}
for i in range(len(tool_names) - 2):
seq = tuple(tool_names[i : i + 3])
key = " -> ".join(seq)
sequences[key] = sequences.get(key, 0) + 1
repeated = {k: v for k, v in sequences.items() if v >= 3}
return {"count": len(repeated), "patterns": list(repeated.keys())}
def _is_automated_prompt(msg: str) -> bool:
"""Check if a message is an automated Claude -p prompt (not a real user request)."""
stripped = msg.strip()
for prefix in AUTOMATED_PROMPT_PREFIXES:
if stripped.startswith(prefix):
return True
return False
def _detect_automation_requests(user_messages: list[str]) -> dict:
"""Detect automation-related keywords in user messages.
Excludes automated prompts from Claude -p invocations (e.g., skill
improvement loop, scoring pipelines) to avoid false positives.
"""
keywords = [
"skill",
"create",
"automate",
"workflow",
"pipeline",
"スキル",
"作成",
"自動化",
"ワークフロー",
]
matches: list[str] = []
for msg in user_messages:
if _is_automated_prompt(msg):
continue
msg_lower = msg.lower()
for kw in keywords:
if kw.lower() in msg_lower:
if msg[:100] not in matches:
matches.append(msg[:100])
break
return {"count": len(matches), "samples": matches}
_RESPONSE_TYPES = {"assistant", "tool_result", "tool_use"}
def _detect_unresolved_requests(timed_entries: list[dict]) -> dict:
"""Detect user messages followed by 5+ minutes without assistant response.
Only ``assistant``, ``tool_result``, and ``tool_use`` entries count as
a real response. System/progress/queue-operation entries are ignored.
"""
if len(timed_entries) < 2:
return {"count": 0}
gap_count = 0
for i, entry in enumerate(timed_entries):
if entry.get("type") != "user":
continue
t1 = _parse_timestamp(entry.get("timestamp", ""))
if t1 is None:
continue
# Find the next response entry (assistant/tool_use/tool_result)
responded = False
found_response = False
for j in range(i + 1, len(timed_entries)):
next_entry = timed_entries[j]
if next_entry.get("type") not in _RESPONSE_TYPES:
continue # Skip user, system, progress, etc.
found_response = True
t2 = _parse_timestamp(next_entry.get("timestamp", ""))
if t2 is None:
continue
diff = (t2 - t1).total_seconds()
if diff < 300: # Response within 5 minutes
responded = True
break # Found the next response entry
if not found_response:
continue # End of session or no real response — not unresolved
if not responded:
gap_count += 1
return {"count": gap_count}
def _parse_timestamp(ts: str) -> datetime | None:
"""Parse ISO format timestamp string, normalizing to UTC-aware.
Handles the ``Z`` suffix that Python 3.10's fromisoformat() rejects.
"""
try:
if ts.endswith("Z"):
ts = ts[:-1] + "+00:00"
dt = datetime.fromisoformat(ts)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt
except (ValueError, TypeError):
return None
# ── LLM abstraction ──
def abstract_with_llm(
signals: dict,
user_samples: list[str],
project_name: str,
dry_run: bool = False,
trading_focus: bool = True,
) -> list[dict] | None:
"""Use claude CLI to abstract skill idea candidates from signals.
Returns list of candidate dicts, or None if dry_run or on failure.
"""
if dry_run:
return None
if not shutil.which("claude"):
logger.warning("claude CLI not found; skipping LLM abstraction.")
return None
# Build prompt
prompt = _build_llm_prompt(signals, user_samples, project_name, trading_focus)
# Remove CLAUDECODE env var to allow claude -p from within Claude Code terminals
env = {k: v for k, v in os.environ.items() if k != "CLAUDECODE"}
try:
result = subprocess.run( # nosec B607 – claude CLI is an expected dependency
[
"claude",
"-p",
"--output-format",
"json",
"--max-turns",
"3",
f"--max-budget-usd={CLAUDE_BUDGET_MINE}",
],
input=prompt,
capture_output=True,
text=True,
check=False,
timeout=CLAUDE_TIMEOUT,
env=env,
)
if result.returncode != 0:
logger.warning("claude -p failed: %s", result.stderr.strip()[:200])
return None
logger.debug("claude -p stdout (%d chars): %.500s", len(result.stdout), result.stdout)
response = _extract_json_from_claude(result.stdout, ["candidates"])
if response and "candidates" in response:
return response["candidates"]
logger.warning(
"Could not parse LLM candidates JSON. stdout (%d chars): %.300s",
len(result.stdout),
result.stdout,
)
return None
except subprocess.TimeoutExpired:
logger.warning("claude -p timed out.")
return None
except FileNotFoundError:
logger.warning("claude CLI not found.")
return None
def _build_llm_prompt(
signals: dict,
user_samples: list[str],
project_name: str,
trading_focus: bool = True,
) -> str:
"""Build the LLM prompt for skill idea abstraction.
Args:
trading_focus: When True (default, used with PROJECT_ALLOWLIST), constrain
candidates to trading/investing domain. When False (used with --project
override), use a generic prompt.
"""
parts = [f"Project: {project_name}\n"]
if trading_focus:
parts.extend(
[
"This project is a trading and investing skill library. "
"Analyze the following session signals and user message samples to suggest "
"new Claude skill ideas focused on TRADING, INVESTING, and MARKET ANALYSIS.\n",
"\n## IMPORTANT CONSTRAINTS\n",
"- ONLY propose skills directly related to: stock/options trading, market analysis, "
"portfolio management, risk management, economic/earnings data, technical/fundamental "
"analysis, or trade execution workflows.",
"- DO NOT propose developer-tooling, code-navigation, documentation-generation, "
"or general-purpose software engineering skills. Those belong to a separate project.",
"- Each skill idea must clearly describe how it helps a trader or investor.\n",
]
)
else:
parts.append(
"Analyze the following session signals and user message samples to suggest "
"new Claude skill ideas that would automate or improve the user's workflow.\n",
)
parts.append("\n## Signals\n")
for signal_name, signal_data in signals.items():
count = signal_data.get("count", 0)
parts.append(f"- {signal_name}: {count}")
if isinstance(signal_data, dict):
samples = signal_data.get("samples", [])
skills = signal_data.get("skills", {})
patterns = signal_data.get("patterns", [])
if skills:
parts.append(f" Skills referenced: {', '.join(skills.keys())}")
if samples:
for s in samples[:3]:
parts.append(f" Sample: {s[:100]}")
if patterns:
for p in patterns[:3]:
parts.append(f" Pattern: {p}")
parts.append("")
if user_samples:
parts.append("\n## User Message Samples\n")
for sample in user_samples[:MAX_USER_MESSAGES_PER_SESSION]:
parts.append(f"- {sample[:200]}")
parts.append(
"\n\nReturn 1-5 skill idea candidates as a single JSON object. "
"Do NOT use markdown or natural language. Output ONLY valid JSON.\n"
"Required format:\n"
'{"candidates": [{"title": "...", "description": "...", "category": "...", '
'"rationale": "...", "priority": "high|medium|low", '
'"signals_used": ["..."]}]}'
)
return "\n".join(parts)
def _extract_json_from_claude(output: str, required_keys: list[str]) -> dict | None:
"""Extract JSON from claude CLI --output-format json envelope.
Unwraps the envelope (result or content[].text), then scans for
the first JSON object containing any of the required_keys.
"""
# claude --output-format json wraps response; try to extract inner JSON
try:
wrapper = json.loads(output)
text = ""
if isinstance(wrapper, dict):
text = wrapper.get("result", "") or ""
if not text:
content = wrapper.get("content", [])
if isinstance(content, list):
for block in content:
if isinstance(block, dict) and block.get("type") == "text":
text += block.get("text", "")
if not text:
text = output
except json.JSONDecodeError:
text = output
# Find JSON block using raw_decode
decoder = json.JSONDecoder()
idx = 0
while idx < len(text):
pos = text.find("{", idx)
if pos == -1:
break
try:
obj, end_idx = decoder.raw_decode(text, pos)
if isinstance(obj, dict) and any(k in obj for k in required_keys):
return obj
idx = pos + 1
except json.JSONDecodeError:
idx = pos + 1
return None
def filter_non_trading_candidates(candidates: list[dict]) -> list[dict]:
"""Deterministic post-LLM filter: reject candidates outside trading domain."""
accepted = []
for c in candidates:
category = str(c.get("category") or "").lower()
title = str(c.get("title") or "").lower()
desc = str(c.get("description") or "").lower()
if category in REJECTED_CATEGORIES:
logger.info("Filtered out '%s' (rejected category: %s)", c.get("title"), category)
continue
text = f"{title} {desc}"
if any(kw in text for kw in REJECTED_KEYWORDS):
logger.info("Filtered out '%s' (rejected keyword match)", c.get("title"))
continue
accepted.append(c)
return accepted
# ── Output helpers ──
def _write_empty_output(output_dir: Path, lookback_days: int) -> None:
"""Write an empty raw_candidates.yaml when there are no sessions to process."""
output = {
"generated_at_utc": datetime.now(tz=timezone.utc).isoformat(),
"lookback_days": lookback_days,
"sessions_analyzed": 0,
"aggregated_signals": {},
"session_details": [],
"candidates": [],
}
output_path = output_dir / "raw_candidates.yaml"
output_path.write_text(
yaml.safe_dump(output, sort_keys=False, allow_unicode=True),
encoding="utf-8",
)
logger.info("Wrote empty raw candidates to %s", output_path)
# ── Main entry point ──
def run(args: argparse.Namespace) -> int:
"""Main entry point for session log mining."""
output_dir = Path(args.output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
# Determine base directory for project scanning
claude_dir = Path.home() / ".claude" / "projects"
# Determine allowlist
if args.project:
allowlist = [args.project]
else:
allowlist = PROJECT_ALLOWLIST
# Find project directories
project_dirs = find_project_dirs(claude_dir, allowlist)
if not project_dirs:
logger.warning("No matching project directories found in %s", claude_dir)
_write_empty_output(output_dir, args.lookback_days)
return 0
logger.info("Found %d project directories.", len(project_dirs))
# List session logs
session_logs = list_session_logs(project_dirs, lookback_days=args.lookback_days)
if not session_logs:
logger.info("No recent session logs found (lookback=%d days).", args.lookback_days)
_write_empty_output(output_dir, args.lookback_days)
return 0
logger.info("Found %d session logs.", len(session_logs))
# Parse and detect signals per session
all_signals: list[dict] = []
all_user_samples: list[str] = []
for project_name, log_path in session_logs:
logger.info("Parsing %s (%s)", log_path.name, project_name)
parsed = parse_session(log_path)
signals = detect_signals(parsed)
# Collect user message samples
user_samples = parsed.get("user_messages", [])[:MAX_USER_MESSAGES_PER_SESSION]
all_user_samples.extend(user_samples)
all_signals.append(
{
"project": project_name,
"session": log_path.name,
"signals": signals,
"user_message_count": len(parsed.get("user_messages", [])),
"tool_use_count": len(parsed.get("tool_uses", [])),
}
)
# LLM abstraction (optional)
# Aggregate signals across sessions
aggregated = _aggregate_signals(all_signals)
# --project overrides the default trading-focused allowlist
trading_focus = args.project is None
candidates = abstract_with_llm(
aggregated,
all_user_samples[:MAX_USER_MESSAGES_PER_SESSION],
project_name=", ".join(set(p for p, _ in project_dirs)),
dry_run=args.dry_run,
trading_focus=trading_focus,
)
# Normalize candidates before filtering: name -> title, assign ids
if candidates:
date_str = datetime.now(tz=timezone.utc).strftime("%Y%m%d")
for i, c in enumerate(candidates):
c["id"] = f"raw_{date_str}_{i + 1:03d}"
# Handle backward compatibility: LLM might output 'name' instead of 'title',
# or return title as null alongside a valid 'name'.
if (not c.get("title")) and c.get("name"):
c["title"] = c.pop("name")
# Deterministic domain filter (only when using default allowlist)
if candidates and trading_focus:
candidates = filter_non_trading_candidates(candidates)
# Write output
output = {
"generated_at_utc": datetime.now(tz=timezone.utc).isoformat(),
"lookback_days": args.lookback_days,
"sessions_analyzed": len(session_logs),
"aggregated_signals": aggregated,
"session_details": all_signals,
}
output["candidates"] = candidates if candidates else []
output_path = output_dir / "raw_candidates.yaml"
output_path.write_text(
yaml.safe_dump(output, sort_keys=False, allow_unicode=True),
encoding="utf-8",
)
logger.info("Wrote raw candidates to %s", output_path)
return 0
def _aggregate_signals(all_signals: list[dict]) -> dict:
"""Aggregate signals across multiple sessions."""
totals: dict = {
"skill_usage": {"count": 0, "skills": {}},
"errors": {"count": 0, "samples": []},
"repetitive_patterns": {"count": 0, "patterns": []},
"automation_requests": {"count": 0, "samples": []},
"unresolved_requests": {"count": 0},
}
for entry in all_signals:
signals = entry.get("signals", {})
for key in totals:
sig = signals.get(key, {})
totals[key]["count"] = totals[key].get("count", 0) + sig.get("count", 0)
if key == "skill_usage":
for skill, cnt in sig.get("skills", {}).items():
totals[key]["skills"][skill] = totals[key]["skills"].get(skill, 0) + cnt
elif key in ("errors", "automation_requests"):
samples = sig.get("samples", [])
totals[key]["samples"].extend(samples[:3])
elif key == "repetitive_patterns":
patterns = sig.get("patterns", [])
totals[key]["patterns"].extend(patterns[:3])
return totals
# ── CLI ──
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Mine Claude Code session logs for skill idea candidates."
)
parser.add_argument(
"--output-dir",
default="reports",
help="Output directory for raw_candidates.yaml (default: reports)",
)
parser.add_argument(
"--project",
default=None,
help="Override allowlist with a single project name",
)
parser.add_argument(
"--lookback-days",
type=int,
default=LOOKBACK_DAYS,
help=f"Number of days to look back for session logs (default: {LOOKBACK_DAYS})",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Skip LLM abstraction step",
)
return parser.parse_args(argv)
def main() -> int:
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(message)s",
)
args = parse_args()
return run(args)
if __name__ == "__main__":
raise SystemExit(main())
```
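
The `run()` function above writes its findings to `raw_candidates.yaml`. An illustrative file is sketched below (the field names are taken from the script; the values are invented for the example):

```yaml
generated_at_utc: "2026-03-01T06:00:00+00:00"
lookback_days: 7
sessions_analyzed: 2
aggregated_signals:
  skill_usage:
    count: 3
    skills:
      pead-screener: 3
  errors:
    count: 1
    samples:
      - "Error: API key missing"
  repetitive_patterns:
    count: 0
    patterns: []
  automation_requests:
    count: 1
    samples:
      - "Can you create a skill for this?"
  unresolved_requests:
    count: 0
session_details: []
candidates:
  - id: raw_20260301_001
    title: auto-reporter
    description: Automate weekly report generation from session signals
```

This file is the input that the scoring step consumes via its `--candidates` flag.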
### scripts/score_ideas.py
```python
#!/usr/bin/env python3
"""Score and deduplicate skill idea candidates."""
from __future__ import annotations
import argparse
import json
import logging
import os
import re
import shutil
import subprocess
import tempfile
from datetime import datetime, timezone
from pathlib import Path
import yaml
logger = logging.getLogger("skill_idea_scorer")
CLAUDE_TIMEOUT = 600
CLAUDE_BUDGET_SCORE = 0.50
JACCARD_THRESHOLD = 0.5
# ── Existing skill discovery ──
def list_existing_skills(project_root: Path) -> list[dict]:
"""Discover existing skills by globbing skills/*/SKILL.md (single level only).
Returns list of {"name": str, "description": str}.
"""
skills_dir = project_root / "skills"
if not skills_dir.is_dir():
return []
results = []
for child in sorted(skills_dir.iterdir()):
if not child.is_dir():
continue
skill_md = child / "SKILL.md"
if not skill_md.exists():
continue
try:
text = skill_md.read_text(encoding="utf-8")
fm = _parse_yaml_frontmatter(text)
if fm and "name" in fm:
results.append(
{
"name": fm.get("name", child.name),
"description": fm.get("description", ""),
}
)
except (OSError, yaml.YAMLError):
logger.debug("Skipping %s: invalid YAML frontmatter.", skill_md)
return results
def _parse_yaml_frontmatter(text: str) -> dict | None:
"""Extract YAML frontmatter between --- markers."""
if not text.startswith("---"):
return None
end = text.find("---", 3)
if end == -1:
return None
try:
return yaml.safe_load(text[3:end])
except yaml.YAMLError:
return None
# ── Text similarity ──
def normalize_text(text: str) -> set[str]:
"""Lowercase, remove punctuation, split into word set."""
cleaned = re.sub(r"[^\w\s]", "", text.lower())
return set(cleaned.split())
def jaccard_similarity(text_a: str, text_b: str) -> float:
"""Compute Jaccard coefficient between word sets of two texts."""
set_a = normalize_text(text_a)
set_b = normalize_text(text_b)
if not set_a or not set_b:
return 0.0
intersection = set_a & set_b
union = set_a | set_b
return len(intersection) / len(union)
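# Worked example: "earnings gap screener" vs "earnings gap scanner" share
# 2 of 4 distinct words, so the Jaccard score is 2 / 4 = 0.5. That is exactly
# the default JACCARD_THRESHOLD, and the duplicate check below uses a strict
# '>', so this particular pair would NOT be flagged as a duplicate.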
# ── Deduplication ──
def find_duplicates(
candidates: list[dict],
existing_skills: list[dict],
backlog_ideas: list[dict],
) -> list[dict]:
"""Check candidates against existing skills and backlog ideas for duplicates.
Annotates each candidate with status="duplicate" and duplicate_of if
Jaccard similarity exceeds JACCARD_THRESHOLD.
"""
for candidate in candidates:
cand_text = f"{candidate.get('title', '')} {candidate.get('description', '')}"
# Check against existing skills
for skill in existing_skills:
ref_text = f"{skill.get('name', '')} {skill.get('description', '')}"
sim = jaccard_similarity(cand_text, ref_text)
if sim > JACCARD_THRESHOLD:
candidate["status"] = "duplicate"
candidate["duplicate_of"] = f"skill:{skill.get('name', '')}"
candidate["jaccard_score"] = round(sim, 3)
break
if candidate.get("status") == "duplicate":
continue
# Check against backlog ideas
for idea in backlog_ideas:
ref_text = f"{idea.get('title', '')} {idea.get('description', '')}"
sim = jaccard_similarity(cand_text, ref_text)
if sim > JACCARD_THRESHOLD:
candidate["status"] = "duplicate"
candidate["duplicate_of"] = f"backlog:{idea.get('id', '')}"
candidate["jaccard_score"] = round(sim, 3)
break
return candidates
# ── LLM scoring ──
def score_with_llm(candidates: list[dict], dry_run: bool = False) -> list[dict]:
"""Score non-duplicate candidates using Claude CLI.
Scores each candidate on novelty, feasibility, and trading_value (0-100).
Composite = 0.3 * novelty + 0.3 * feasibility + 0.4 * trading_value.
"""
scorable = [c for c in candidates if c.get("status") != "duplicate"]
if not scorable:
logger.info("No candidates to score (all duplicates or empty).")
return candidates
if dry_run:
for c in scorable:
c["scores"] = {
"novelty": 0,
"feasibility": 0,
"trading_value": 0,
"composite": 0,
}
return candidates
if not shutil.which("claude"):
logger.warning("claude CLI not found; setting zero scores.")
for c in scorable:
c["scores"] = {
"novelty": 0,
"feasibility": 0,
"trading_value": 0,
"composite": 0,
}
return candidates
# Build scoring prompt
candidate_descriptions = []
for i, c in enumerate(scorable):
candidate_descriptions.append(
f"{i + 1}. ID: {c.get('id', f'unknown_{i}')}\n"
f" Title: {c.get('title', 'N/A')}\n"
f" Description: {c.get('description', 'N/A')}\n"
f" Category: {c.get('category', 'N/A')}"
)
prompt = (
"Score each skill idea candidate on three dimensions (0-100 each):\n"
"- novelty: How unique is this idea compared to typical trading tools?\n"
"- feasibility: How practical is it to implement as a Claude skill?\n"
"- trading_value: How useful is this for equity traders/investors?\n\n"
"Candidates:\n" + "\n".join(candidate_descriptions) + "\n\n"
"Return scores for ALL candidates as a single JSON object. "
"Do NOT use markdown or natural language. Output ONLY valid JSON.\n"
"Required format:\n"
'{"candidates": [{"id": "...", "novelty": 0, "feasibility": 0, "trading_value": 0}]}'
)
# Remove CLAUDECODE env var to allow claude -p from within Claude Code terminals
env = {k: v for k, v in os.environ.items() if k != "CLAUDECODE"}
try:
result = subprocess.run( # nosec B607 – claude CLI is an expected dependency
[
"claude",
"-p",
"--output-format",
"json",
"--max-turns",
"3",
f"--max-budget-usd={CLAUDE_BUDGET_SCORE}",
],
input=prompt,
capture_output=True,
text=True,
check=False,
timeout=CLAUDE_TIMEOUT,
env=env,
)
if result.returncode != 0:
logger.warning("claude scoring failed: %s", result.stderr.strip()[:200])
for c in scorable:
c["scores"] = {
"novelty": 0,
"feasibility": 0,
"trading_value": 0,
"composite": 0,
}
return candidates
parsed = _extract_json_from_claude(result.stdout, ["scores", "candidates"])
if parsed and "candidates" in parsed:
score_map = {s.get("id", ""): s for s in parsed.get("candidates", [])}
for c in scorable:
cid = c.get("id", "")
if cid in score_map:
s = score_map[cid]
n = s.get("novelty", 0)
f = s.get("feasibility", 0)
t = s.get("trading_value", 0)
c["scores"] = {
"novelty": n,
"feasibility": f,
"trading_value": t,
"composite": round(0.3 * n + 0.3 * f + 0.4 * t, 1),
}
else:
c["scores"] = {
"novelty": 0,
"feasibility": 0,
"trading_value": 0,
"composite": 0,
}
else:
logger.warning("Could not parse LLM scoring output.")
for c in scorable:
c["scores"] = {
"novelty": 0,
"feasibility": 0,
"trading_value": 0,
"composite": 0,
}
except subprocess.TimeoutExpired:
logger.warning("claude scoring timed out.")
for c in scorable:
c["scores"] = {"novelty": 0, "feasibility": 0, "trading_value": 0, "composite": 0}
except FileNotFoundError:
logger.warning("claude CLI not found during execution.")
for c in scorable:
c["scores"] = {"novelty": 0, "feasibility": 0, "trading_value": 0, "composite": 0}
return candidates
def _extract_json_from_claude(output: str, required_keys: list[str]) -> dict | None:
"""Extract JSON from claude CLI --output-format json envelope.
Unwraps the envelope (result or content[].text), then scans for
the first JSON object containing any of the required_keys.
"""
# Try parsing the wrapper envelope first
try:
wrapper = json.loads(output)
text = ""
if isinstance(wrapper, dict):
text = wrapper.get("result", "") or ""
if not text:
content = wrapper.get("content", [])
if isinstance(content, list):
for block in content:
if isinstance(block, dict) and block.get("type") == "text":
text += block.get("text", "")
if not text:
text = output
except json.JSONDecodeError:
text = output
# Find JSON block using raw_decode
decoder = json.JSONDecoder()
idx = 0
while idx < len(text):
pos = text.find("{", idx)
if pos == -1:
break
        try:
            obj, _end = decoder.raw_decode(text, pos)
            if isinstance(obj, dict) and any(k in obj for k in required_keys):
                return obj
            # Advance only one char (not past the object) so JSON objects
            # nested inside this one are scanned as well.
            idx = pos + 1
        except json.JSONDecodeError:
            idx = pos + 1
return None
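# Example envelope handled above, as produced by `claude --output-format json`
# (the inner payload here is illustrative):
#   {"result": "Here are the scores:\n{\"candidates\": [...]}\nDone."}
# The wrapper is parsed first, then raw_decode scans the unwrapped text for
# the first JSON object containing one of the required keys.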
# ── Backlog management ──
def load_backlog(backlog_path: Path) -> dict:
"""Load existing backlog YAML or return empty structure."""
if backlog_path.exists():
try:
data = yaml.safe_load(backlog_path.read_text(encoding="utf-8"))
if isinstance(data, dict) and "ideas" in data:
return data
except (OSError, yaml.YAMLError):
logger.warning("Failed to load backlog from %s; starting fresh.", backlog_path)
return {"updated_at_utc": "", "ideas": []}
def merge_into_backlog(backlog: dict, scored_candidates: list[dict]) -> dict:
"""Add new ideas to backlog, preserving existing statuses.
Candidates with matching id are skipped (not duplicated).
Existing idea fields (especially status) are never overwritten.
"""
existing_ids = {idea.get("id") for idea in backlog.get("ideas", [])}
for candidate in scored_candidates:
cid = candidate.get("id")
if not cid or cid in existing_ids:
continue
idea = {
"id": cid,
"title": candidate.get("title", ""),
"description": candidate.get("description", ""),
"category": candidate.get("category", ""),
"source_project": candidate.get("source_project", ""),
"source_raw_ids": candidate.get("source_raw_ids", []),
"scores": candidate.get("scores", {}),
"status": candidate.get("status", "pending"),
"created_at": candidate.get(
"created_at", datetime.now(timezone.utc).strftime("%Y-%m-%d")
),
"attempted_at": None,
"pr_url": None,
}
backlog["ideas"].append(idea)
existing_ids.add(cid)
backlog["updated_at_utc"] = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
return backlog
def save_backlog(backlog_path: Path, backlog: dict) -> None:
"""Save backlog to YAML atomically via tempfile + os.replace."""
backlog_path.parent.mkdir(parents=True, exist_ok=True)
content = yaml.safe_dump(backlog, default_flow_style=False, sort_keys=False, allow_unicode=True)
fd, tmp_path = tempfile.mkstemp(dir=backlog_path.parent, suffix=".tmp", prefix=".backlog_")
try:
with os.fdopen(fd, "w", encoding="utf-8") as f:
f.write(content)
os.replace(tmp_path, backlog_path)
except BaseException:
os.unlink(tmp_path)
raise
logger.info("Backlog saved to %s (%d ideas).", backlog_path, len(backlog.get("ideas", [])))
# ── Main ──
def run(args) -> int:
"""Main entry point for scoring and deduplication."""
project_root = Path(args.project_root).resolve()
candidates_path = Path(args.candidates)
if not candidates_path.is_absolute():
candidates_path = project_root / candidates_path
backlog_path = Path(args.backlog)
if not backlog_path.is_absolute():
backlog_path = project_root / backlog_path
# Load candidates
if not candidates_path.exists():
logger.error("Candidates file not found: %s", candidates_path)
return 1
try:
raw = yaml.safe_load(candidates_path.read_text(encoding="utf-8"))
except (OSError, yaml.YAMLError) as exc:
logger.error("Failed to load candidates: %s", exc)
return 1
candidates = raw.get("candidates", []) if isinstance(raw, dict) else []
if not candidates:
logger.info("No candidates to process.")
return 0
logger.info("Loaded %d candidates from %s.", len(candidates), candidates_path)
# Discover existing skills
existing_skills = list_existing_skills(project_root)
logger.info("Found %d existing skills.", len(existing_skills))
# Load backlog
backlog = load_backlog(backlog_path)
backlog_ideas = backlog.get("ideas", [])
logger.info("Loaded backlog with %d existing ideas.", len(backlog_ideas))
# Deduplication
candidates = find_duplicates(candidates, existing_skills, backlog_ideas)
dup_count = sum(1 for c in candidates if c.get("status") == "duplicate")
logger.info("Marked %d candidates as duplicates.", dup_count)
# LLM scoring
candidates = score_with_llm(candidates, dry_run=args.dry_run)
# Merge into backlog
backlog = merge_into_backlog(backlog, candidates)
save_backlog(backlog_path, backlog)
# Summary
scored = [c for c in candidates if c.get("scores", {}).get("composite", 0) > 0]
logger.info(
"Done. %d scored, %d duplicates, %d total in backlog.",
len(scored),
dup_count,
len(backlog.get("ideas", [])),
)
return 0
def parse_args():
parser = argparse.ArgumentParser(description="Score and deduplicate skill idea candidates")
parser.add_argument(
"--candidates",
default="reports/skill-idea-miner/raw_candidates.yaml",
help="Path to raw_candidates.yaml",
)
parser.add_argument("--project-root", default=".", help="Project root directory")
parser.add_argument(
"--backlog",
default="reports/skill-idea-miner/idea_backlog.yaml",
help="Path to backlog YAML file",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Skip LLM scoring; set zero scores",
)
return parser.parse_args()
def main() -> int:
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(message)s",
)
args = parse_args()
return run(args)
if __name__ == "__main__":
raise SystemExit(main())
```
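
To make the dedup and scoring arithmetic concrete, here is a minimal standalone sketch. `jaccard()` mirrors `normalize_text()`/`jaccard_similarity()` from the script above, and `composite()` uses the same weights (0.3 novelty, 0.3 feasibility, 0.4 trading_value) as `score_with_llm()`; the function names here are illustrative, not from the repository.

```python
import re


def words(text: str) -> set[str]:
    # Lowercase, strip punctuation, split into a word set.
    return set(re.sub(r"[^\w\s]", "", text.lower()).split())


def jaccard(a: str, b: str) -> float:
    # Word-set Jaccard coefficient, as used for duplicate detection.
    sa, sb = words(a), words(b)
    if not sa or not sb:
        return 0.0
    return len(sa & sb) / len(sa | sb)


def composite(novelty: float, feasibility: float, trading_value: float) -> float:
    # Weighted composite: 0.3 * novelty + 0.3 * feasibility + 0.4 * trading_value.
    return round(0.3 * novelty + 0.3 * feasibility + 0.4 * trading_value, 1)


# "pead screener" vs "pead screener skill": 2 shared words of 3 distinct,
# so similarity 2/3 > 0.5 and the candidate would be marked a duplicate.
print(round(jaccard("pead screener", "pead screener skill"), 3))  # 0.667
print(composite(70, 80, 90))  # 81.0
```

Note that the trading_value weight dominates, so two ideas with equal novelty and feasibility are ranked by their usefulness to traders.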
---
## Skill Companion Files
> Additional files collected from the skill directory layout.
### scripts/tests/conftest.py
```python
"""Shared fixtures for skill-idea-miner tests."""
from __future__ import annotations
import sys
from pathlib import Path
# Add scripts dir to path for imports
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
```
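
The test modules below do not rely on this `sys.path` tweak to reach the script under test; they load it explicitly with `importlib.util`, as in the `mine_module` fixture. A standalone sketch of that loading pattern (the `load_script` helper name is ours, not from the repo):

```python
import importlib.util
import sys
from pathlib import Path


def load_script(path: Path):
    # Load a standalone .py file as a module, mirroring the mine_module fixture.
    spec = importlib.util.spec_from_file_location(path.stem, path)
    if spec is None or spec.loader is None:
        raise RuntimeError(f"Failed to load {path}")
    module = importlib.util.module_from_spec(spec)
    # Register before exec_module so imports inside the script resolve.
    sys.modules[spec.name] = module
    spec.loader.exec_module(module)
    return module
```

This keeps each script importable in tests even though `scripts/` is not a package.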
### scripts/tests/test_mine_session_logs.py
```python
"""Tests for mine_session_logs.py."""
from __future__ import annotations
import importlib.util
import json
import os
import sys
import time
from pathlib import Path
import pytest
@pytest.fixture(scope="module")
def mine_module():
"""Load mine_session_logs.py as a module."""
script_path = Path(__file__).resolve().parents[1] / "mine_session_logs.py"
spec = importlib.util.spec_from_file_location("mine_session_logs", script_path)
if spec is None or spec.loader is None:
raise RuntimeError("Failed to load mine_session_logs.py")
module = importlib.util.module_from_spec(spec)
sys.modules[spec.name] = module
spec.loader.exec_module(module)
return module
# ── find_project_dirs ──
def test_find_project_dirs(mine_module, tmp_path: Path):
"""Create mock dirs matching allowlist encoding, verify matches."""
# Simulate ~/.claude/projects/ directory structure
base = tmp_path / "projects"
base.mkdir()
# Directory encoding: dash-separated absolute path
(base / "-Users-alice-PycharmProjects-claude-trading-skills").mkdir()
(base / "-Users-bob-Code-trade-edge-finder").mkdir()
(base / "-Users-carol-Projects-unrelated-project").mkdir()
allowlist = ["claude-trading-skills", "trade-edge-finder"]
result = mine_module.find_project_dirs(base, allowlist)
assert len(result) == 2
names = [name for name, _ in result]
assert "claude-trading-skills" in names
assert "trade-edge-finder" in names
def test_find_project_dirs_no_match(mine_module, tmp_path: Path):
"""No matching dirs returns empty list."""
base = tmp_path / "projects"
base.mkdir()
(base / "-Users-alice-Projects-something-else").mkdir()
result = mine_module.find_project_dirs(base, ["claude-trading-skills"])
assert result == []
# ── list_session_logs ──
def test_list_session_logs_date_filter(mine_module, tmp_path: Path):
"""Create files with recent and old mtime, verify filter."""
proj_dir = tmp_path / "proj"
proj_dir.mkdir()
# Recent file (now)
recent = proj_dir / "recent_session.jsonl"
recent.write_text('{"type":"user"}\n')
# Old file (30 days ago)
old = proj_dir / "old_session.jsonl"
old.write_text('{"type":"user"}\n')
old_time = time.time() - (30 * 86400)
os.utime(old, (old_time, old_time))
project_dirs = [("test-project", proj_dir)]
result = mine_module.list_session_logs(project_dirs, lookback_days=7)
assert len(result) == 1
assert result[0][0] == "test-project"
assert result[0][1].name == "recent_session.jsonl"
# ── parse_session ──
def test_parse_user_messages_str(mine_module, tmp_path: Path):
"""Parse JSONL with string content format."""
log = tmp_path / "session.jsonl"
lines = [
json.dumps(
{
"type": "user",
"message": {"type": "user", "content": "Analyze AAPL"},
"userType": "external",
"timestamp": "2026-02-28T10:00:00+00:00",
}
),
json.dumps(
{
"type": "user",
"message": {"type": "user", "content": "Check breadth"},
"userType": "external",
"timestamp": "2026-02-28T10:01:00+00:00",
}
),
]
log.write_text("\n".join(lines))
result = mine_module.parse_session(log)
assert len(result["user_messages"]) == 2
assert result["user_messages"][0] == "Analyze AAPL"
assert result["user_messages"][1] == "Check breadth"
def test_parse_user_messages_list(mine_module, tmp_path: Path):
"""Parse JSONL with list[{type,text}] content format."""
log = tmp_path / "session.jsonl"
lines = [
json.dumps(
{
"type": "user",
"message": {
"type": "user",
"content": [
{"type": "text", "text": "Create a new skill"},
{"type": "text", "text": "for dividend analysis"},
],
},
"userType": "external",
"timestamp": "2026-02-28T10:00:00+00:00",
}
),
]
log.write_text("\n".join(lines))
result = mine_module.parse_session(log)
assert len(result["user_messages"]) == 2
assert result["user_messages"][0] == "Create a new skill"
assert result["user_messages"][1] == "for dividend analysis"
def test_parse_tool_usage(mine_module, tmp_path: Path):
"""Extract tool_use blocks from assistant messages."""
log = tmp_path / "session.jsonl"
lines = [
json.dumps(
{
"type": "assistant",
"message": {
"type": "assistant",
"content": [
{
"type": "tool_use",
"name": "Bash",
"input": {
"command": "python3 skills/pead-screener/scripts/screen_pead.py"
},
},
{
"type": "tool_use",
"name": "Read",
"input": {"file_path": "/tmp/report.md"},
},
],
},
"timestamp": "2026-02-28T10:00:00+00:00",
}
),
]
log.write_text("\n".join(lines))
result = mine_module.parse_session(log)
tool_uses = [t for t in result["tool_uses"] if not t["name"].startswith("__")]
assert len(tool_uses) == 2
assert tool_uses[0]["name"] == "Bash"
assert tool_uses[1]["name"] == "Read"
def test_parse_malformed_jsonl(mine_module, tmp_path: Path):
"""Bad lines are skipped, good lines parsed."""
log = tmp_path / "session.jsonl"
lines = [
"this is not json",
json.dumps(
{
"type": "user",
"message": {"type": "user", "content": "Valid message"},
"userType": "external",
"timestamp": "2026-02-28T10:00:00+00:00",
}
),
"{broken json",
json.dumps(
{
"type": "user",
"message": {"type": "user", "content": "Another valid one"},
"userType": "external",
"timestamp": "2026-02-28T10:01:00+00:00",
}
),
]
log.write_text("\n".join(lines))
result = mine_module.parse_session(log)
assert len(result["user_messages"]) == 2
assert result["user_messages"][0] == "Valid message"
assert result["user_messages"][1] == "Another valid one"
# ── detect_signals ──
def test_detect_skill_usage(mine_module):
"""Detect skills/ references in tool args."""
tool_uses = [
{
"name": "Bash",
"input": {"command": "python3 skills/earnings-trade-analyzer/scripts/run.py"},
},
{
"name": "Read",
"input": {"file_path": "skills/pead-screener/SKILL.md"},
},
{
"name": "Bash",
"input": {"command": "ls -la"},
},
]
result = mine_module._detect_skill_usage(tool_uses)
assert result["count"] == 2
assert "earnings-trade-analyzer" in result["skills"]
assert "pead-screener" in result["skills"]
def test_detect_errors(mine_module):
"""Detect error patterns in tool results."""
tool_uses = [
{"name": "__tool_result_error__", "output": "Error: API key missing"},
{"name": "__tool_result_error__", "output": "Traceback (most recent call last):\n..."},
{"name": "Bash", "input": {"command": "echo hello"}},
]
result = mine_module._detect_errors(tool_uses)
assert result["count"] == 2
assert len(result["samples"]) == 2
def test_detect_automation_requests(mine_module):
"""Detect automation keywords in user messages."""
messages = [
"Can you create a skill for this?",
"Just run the analysis",
"I want to automate this workflow",
        "スキルを作成してほしい",  # Japanese: "I'd like you to create a skill"
]
result = mine_module._detect_automation_requests(messages)
assert result["count"] == 3
assert len(result["samples"]) == 3
def test_detect_automation_requests_excludes_automated_prompts(mine_module):
"""Claude -p automated prompts are excluded from automation_requests."""
messages = [
"# LLM Skill Review Request\nPlease review this skill...",
"Improve the skill 'backtest-expert' using the review results below.",
"Implement the following plan:\n1. Create skill...",
"Score each skill idea candidate on three dimensions...",
"Can you create a skill for this?", # Real user request
]
result = mine_module._detect_automation_requests(messages)
assert result["count"] == 1
assert "create a skill" in result["samples"][0].lower()
def test_is_automated_prompt(mine_module):
"""_is_automated_prompt correctly identifies automated prompts."""
assert mine_module._is_automated_prompt("# LLM Skill Review Request\nContent...")
assert mine_module._is_automated_prompt("Improve the skill 'x' using...")
assert mine_module._is_automated_prompt("Implement the following plan:\n...")
assert mine_module._is_automated_prompt("Score each skill idea candidate on...")
assert not mine_module._is_automated_prompt("Can you create a skill?")
assert not mine_module._is_automated_prompt("I want to automate this")
# ── _detect_unresolved_requests ──
def test_detect_unresolved_requests_no_gap(mine_module):
"""User message followed by quick assistant response is not unresolved."""
timed_entries = [
{"timestamp": "2026-02-28T10:00:00+00:00", "type": "user"},
{"timestamp": "2026-02-28T10:00:30+00:00", "type": "assistant"},
]
result = mine_module._detect_unresolved_requests(timed_entries)
assert result["count"] == 0
def test_detect_unresolved_requests_with_gap(mine_module):
"""User message followed by 10-min gap before response is unresolved."""
timed_entries = [
{"timestamp": "2026-02-28T10:00:00+00:00", "type": "user"},
{"timestamp": "2026-02-28T10:10:00+00:00", "type": "assistant"},
]
result = mine_module._detect_unresolved_requests(timed_entries)
assert result["count"] == 1
def test_detect_unresolved_requests_user_then_user(mine_module):
"""Consecutive user messages with gap: first is unresolved."""
timed_entries = [
{"timestamp": "2026-02-28T10:00:00+00:00", "type": "user"},
{"timestamp": "2026-02-28T10:06:00+00:00", "type": "user"},
{"timestamp": "2026-02-28T10:06:10+00:00", "type": "assistant"},
]
result = mine_module._detect_unresolved_requests(timed_entries)
# First user: next non-user is assistant at +6:10 (>5min) → unresolved
# Second user: next non-user is assistant at +0:10 (<5min) → resolved
assert result["count"] == 1
# ── _extract_json_from_claude ──
def test_extract_json_from_claude_candidates(mine_module):
"""JSON with candidates key is extracted."""
raw = json.dumps(
{
"candidates": [
{
"name": "test-skill",
"description": "A test",
"rationale": "Because",
"priority": "high",
},
],
}
)
result = mine_module._extract_json_from_claude(raw, ["candidates"])
assert result is not None
assert "candidates" in result
assert len(result["candidates"]) == 1
def test_extract_json_from_claude_wrapped(mine_module):
"""JSON wrapped in claude --output-format json envelope."""
inner = json.dumps(
{
"candidates": [{"name": "x", "description": "y", "rationale": "z", "priority": "low"}],
}
)
wrapper = json.dumps({"result": f"Here are the ideas:\n{inner}\nDone."})
result = mine_module._extract_json_from_claude(wrapper, ["candidates"])
assert result is not None
assert result["candidates"][0]["name"] == "x"
def test_extract_json_from_claude_no_candidates(mine_module):
"""JSON without 'candidates' key returns None."""
raw = '{"score": 85, "summary": "review"}'
result = mine_module._extract_json_from_claude(raw, ["candidates"])
assert result is None
# ── Real session format: entry.type != msg.type ──
def test_parse_assistant_with_message_type(mine_module, tmp_path: Path):
"""Real session format: entry.type=assistant, msg.type=message, msg.role=assistant."""
log = tmp_path / "session.jsonl"
entry = {
"type": "assistant",
"message": {
"type": "message",
"role": "assistant",
"content": [
{
"type": "tool_use",
"name": "Read",
"input": {"file_path": "skills/pead-screener/SKILL.md"},
},
],
},
"timestamp": "2026-02-28T10:00:00+00:00",
}
log.write_text(json.dumps(entry))
result = mine_module.parse_session(log)
tool_uses = [t for t in result["tool_uses"] if not t["name"].startswith("__")]
assert len(tool_uses) == 1
assert tool_uses[0]["name"] == "Read"
def test_timed_entries_correct_types(mine_module, tmp_path: Path):
"""timed_entries records entry-level type, not message-level type."""
log = tmp_path / "session.jsonl"
lines = [
json.dumps(
{
"type": "user",
"message": {"type": "user", "content": "Hello"},
"userType": "external",
"timestamp": "2026-02-28T10:00:00+00:00",
}
),
json.dumps(
{
"type": "assistant",
"message": {
"type": "message",
"role": "assistant",
"content": [{"type": "text", "text": "Hi"}],
},
"timestamp": "2026-02-28T10:00:05+00:00",
}
),
]
log.write_text("\n".join(lines))
result = mine_module.parse_session(log)
assert result["timed_entries"][0]["type"] == "user"
assert result["timed_entries"][1]["type"] == "assistant"
def test_parse_assistant_role_fallback(mine_module, tmp_path: Path):
"""When entry has no type but msg.role=assistant, tool_use blocks are extracted."""
log = tmp_path / "session.jsonl"
entry = {
"message": {
"type": "message",
"role": "assistant",
"content": [
{
"type": "tool_use",
"name": "Bash",
"input": {"command": "ls"},
},
],
},
"timestamp": "2026-02-28T10:00:00+00:00",
}
log.write_text(json.dumps(entry))
result = mine_module.parse_session(log)
tool_uses = [t for t in result["tool_uses"] if not t["name"].startswith("__")]
assert len(tool_uses) == 1
assert tool_uses[0]["name"] == "Bash"
# ── find_project_dirs endswith fix ──
def test_find_project_dirs_no_false_positive(mine_module, tmp_path: Path):
"""Suffix match without dash boundary should not match."""
base = tmp_path / "projects"
base.mkdir()
# "notclaude-trading-skills" ends with "claude-trading-skills" but lacks
# a dash boundary separating the path segment from the project name.
(base / "-Users-alice-notclaude-trading-skills").mkdir()
# This SHOULD match (proper dash boundary)
(base / "-Users-bob-PycharmProjects-claude-trading-skills").mkdir()
result = mine_module.find_project_dirs(base, ["claude-trading-skills"])
assert len(result) == 1
assert result[0][1].name == "-Users-bob-PycharmProjects-claude-trading-skills"
def test_find_project_dirs_exact_name(mine_module, tmp_path: Path):
"""Directory with exact project name matches."""
base = tmp_path / "projects"
base.mkdir()
(base / "claude-trading-skills").mkdir()
result = mine_module.find_project_dirs(base, ["claude-trading-skills"])
assert len(result) == 1
assert result[0][0] == "claude-trading-skills"
# ── Fix A: _parse_timestamp timezone normalization ──
def test_parse_timestamp_naive_gets_utc(mine_module):
"""Naive timestamps are normalized to UTC-aware."""
from datetime import timezone
dt = mine_module._parse_timestamp("2026-03-01T10:00:00")
assert dt is not None
assert dt.tzinfo is not None
assert dt.tzinfo == timezone.utc
def test_parse_timestamp_aware_preserved(mine_module):
"""Aware timestamps keep their original tzinfo."""
from datetime import timedelta
dt = mine_module._parse_timestamp("2026-03-01T10:00:00+09:00")
assert dt is not None
assert dt.utcoffset() == timedelta(hours=9)
def test_unresolved_requests_mixed_tz(mine_module):
"""Naive and aware timestamps can be subtracted without TypeError."""
timed_entries = [
{"timestamp": "2026-03-01T10:00:00", "type": "user"},
{"timestamp": "2026-03-01T10:10:00+00:00", "type": "assistant"},
]
result = mine_module._detect_unresolved_requests(timed_entries)
assert result["count"] == 1 # 10-min gap → unresolved
# ── Fix A2: sidechain contamination ──
def test_sidechain_excluded_from_timed_entries(mine_module, tmp_path: Path):
"""Sidechain messages should not appear in timed_entries."""
log = tmp_path / "session.jsonl"
lines = [
json.dumps(
{
"type": "user",
"message": {"type": "user", "content": "Hello"},
"userType": "external",
"timestamp": "2026-03-01T10:00:00+00:00",
}
),
json.dumps(
{
"type": "assistant",
"message": {"type": "assistant", "content": [{"type": "text", "text": "Hi"}]},
"isSidechain": True,
"timestamp": "2026-03-01T10:00:05+00:00",
}
),
json.dumps(
{
"type": "assistant",
"message": {"type": "assistant", "content": [{"type": "text", "text": "Real"}]},
"timestamp": "2026-03-01T10:00:10+00:00",
}
),
]
log.write_text("\n".join(lines))
result = mine_module.parse_session(log)
# Sidechain entry should NOT be in timed_entries
types = [e["type"] for e in result["timed_entries"]]
assert len(types) == 2 # user + non-sidechain assistant
assert types == ["user", "assistant"]
# ── Fix A3: end-of-session exclusion ──
def test_unresolved_requests_end_of_session(mine_module):
"""User message at end of session (no following non-user entry) is NOT unresolved."""
timed_entries = [
{"timestamp": "2026-03-01T10:00:00+00:00", "type": "user"},
{"timestamp": "2026-03-01T10:00:05+00:00", "type": "assistant"},
{"timestamp": "2026-03-01T10:05:00+00:00", "type": "user"},
# No assistant response after this — end of session
]
result = mine_module._detect_unresolved_requests(timed_entries)
assert result["count"] == 0 # Last user message should not count
# ── Fix D: MAX_ERROR_OUTPUT_LEN truncation ──
def test_error_output_truncated(mine_module, tmp_path: Path):
"""Long error outputs are truncated to MAX_ERROR_OUTPUT_LEN."""
log = tmp_path / "session.jsonl"
long_error = "Error: " + "x" * 1000
entry = {
"type": "tool_result",
"message": {"type": "tool_result", "content": long_error},
"is_error": True,
"timestamp": "2026-03-01T10:00:00+00:00",
}
log.write_text(json.dumps(entry))
result = mine_module.parse_session(log)
error_entries = [t for t in result["tool_uses"] if t["name"] == "__tool_result_error__"]
assert len(error_entries) == 1
assert len(error_entries[0]["output"]) <= mine_module.MAX_ERROR_OUTPUT_LEN
def test_error_pattern_output_truncated(mine_module, tmp_path: Path):
"""Error-pattern path (is_error=False) also truncates long output."""
log = tmp_path / "session.jsonl"
# Not flagged as is_error, but contains an error pattern
long_traceback = "Traceback (most recent call last):\n" + " File x.py\n" * 200
entry = {
"type": "tool_result",
"message": {"type": "tool_result", "content": long_traceback},
"timestamp": "2026-03-01T10:00:00+00:00",
}
log.write_text(json.dumps(entry))
result = mine_module.parse_session(log)
error_entries = [t for t in result["tool_uses"] if t["name"] == "__tool_result_error__"]
assert len(error_entries) == 1
assert len(error_entries[0]["output"]) <= mine_module.MAX_ERROR_OUTPUT_LEN
# ── Z-suffix timestamp handling (Python 3.10 compatibility) ──
def test_parse_timestamp_z_suffix(mine_module):
"""Timestamps ending with Z are parsed correctly on Python 3.10."""
from datetime import timezone
dt = mine_module._parse_timestamp("2026-03-01T10:00:00Z")
assert dt is not None
assert dt.tzinfo == timezone.utc
assert dt.hour == 10
def test_parse_timestamp_z_with_millis(mine_module):
"""Z-suffix with milliseconds is also handled."""
dt = mine_module._parse_timestamp("2026-03-01T10:00:00.123Z")
assert dt is not None
assert dt.microsecond == 123000
def test_unresolved_requests_z_timestamps(mine_module):
"""Real session logs use Z-suffix timestamps; detection must work."""
timed_entries = [
{"timestamp": "2026-03-01T10:00:00Z", "type": "user"},
{"timestamp": "2026-03-01T10:10:00Z", "type": "assistant"},
]
result = mine_module._detect_unresolved_requests(timed_entries)
assert result["count"] == 1 # 10-min gap → unresolved
# ── Response type filtering ──
def test_unresolved_requests_ignores_system_entries(mine_module):
"""system/progress/queue-operation entries do not count as a response."""
timed_entries = [
{"timestamp": "2026-03-01T10:00:00+00:00", "type": "user"},
{"timestamp": "2026-03-01T10:00:01+00:00", "type": "system"},
{"timestamp": "2026-03-01T10:00:02+00:00", "type": "progress"},
{"timestamp": "2026-03-01T10:06:00+00:00", "type": "assistant"},
]
result = mine_module._detect_unresolved_requests(timed_entries)
# system/progress at +1s/+2s are skipped; assistant at +6min is the real response → unresolved
assert result["count"] == 1
def test_unresolved_requests_tool_result_counts_as_response(mine_module):
"""tool_result entries count as a valid response."""
timed_entries = [
{"timestamp": "2026-03-01T10:00:00+00:00", "type": "user"},
{"timestamp": "2026-03-01T10:00:30+00:00", "type": "tool_result"},
]
result = mine_module._detect_unresolved_requests(timed_entries)
assert result["count"] == 0 # tool_result within 5 min → resolved
# ── N4: C1 regression test (name→title conversion + id generation) ──
def test_run_converts_name_to_title_and_adds_id(mine_module, tmp_path: Path):
"""run() converts LLM-returned 'name' to 'title' and generates 'id' for each candidate."""
import types
from unittest.mock import patch
import yaml
output_dir = tmp_path / "out"
output_dir.mkdir()
# Candidates as the LLM might return them (with 'name' instead of 'title')
fake_candidates = [
{"name": "Auto Reporter", "description": "Automated reports", "priority": "high"},
{"title": "Already Titled", "description": "Has title", "priority": "low"},
]
# Build minimal args namespace
args = types.SimpleNamespace(
output_dir=str(output_dir),
project=None,
lookback_days=7,
dry_run=False,
)
# Patch dependencies to isolate the enrichment logic
with (
patch.object(mine_module, "find_project_dirs", return_value=[("proj", tmp_path)]),
patch.object(
mine_module,
"list_session_logs",
return_value=[("proj", tmp_path / "fake.jsonl")],
),
patch.object(
mine_module,
"parse_session",
return_value={
"user_messages": ["hello"],
"tool_uses": [],
"timestamps": [],
"timed_entries": [],
},
),
patch.object(mine_module, "abstract_with_llm", return_value=fake_candidates),
):
rc = mine_module.run(args)
assert rc == 0
# Read the output file
output_path = output_dir / "raw_candidates.yaml"
assert output_path.exists()
data = yaml.safe_load(output_path.read_text(encoding="utf-8"))
candidates = data["candidates"]
assert len(candidates) == 2
# First candidate: 'name' should be converted to 'title'
assert "title" in candidates[0]
assert candidates[0]["title"] == "Auto Reporter"
assert "name" not in candidates[0] # 'name' key removed
# Second candidate: already had 'title', should be untouched
assert candidates[1]["title"] == "Already Titled"
# Both should have 'id' assigned
assert candidates[0]["id"].startswith("raw_")
assert candidates[1]["id"].startswith("raw_")
assert candidates[0]["id"] != candidates[1]["id"]
# ── PROJECT_ALLOWLIST ──
def test_project_allowlist_contains_trading_projects(mine_module):
"""Allowlist includes all expected trading-related projects."""
al = mine_module.PROJECT_ALLOWLIST
assert "claude-trading-skills" in al
assert "weekly-trade-strategy" in al
assert "claude-market-agents" in al
assert "trade-edge-finder" in al
assert "trade-strategy-pipeline" in al
# ── filter_non_trading_candidates ──
def test_filter_rejects_developer_tooling_category(mine_module):
"""Candidates with rejected categories are filtered out."""
candidates = [
{"title": "code-nav", "category": "developer-tooling", "description": ""},
{"title": "trade-tool", "category": "trade-execution", "description": ""},
{"title": "skill-opt", "category": "skill-development", "description": ""},
]
result = mine_module.filter_non_trading_candidates(candidates)
assert len(result) == 1
assert result[0]["title"] == "trade-tool"
def test_filter_rejects_broad_off_domain_categories(mine_module):
"""Non-trading categories beyond developer-tooling are also rejected."""
candidates = [
{"title": "scheduler", "category": "meeting", "description": "Schedule meetings"},
{"title": "support-bot", "category": "customer-support", "description": "Handle tickets"},
{"title": "earnings-tool", "category": "trade-review", "description": "Review trades"},
]
result = mine_module.filter_non_trading_candidates(candidates)
assert len(result) == 1
assert result[0]["title"] == "earnings-tool"
def test_filter_rejects_keyword_in_title(mine_module):
"""Candidates with rejected keywords in title are filtered out."""
candidates = [
{"title": "codebase-navigator", "category": "other", "description": ""},
{"title": "earnings-tracker", "category": "trade-review", "description": ""},
]
result = mine_module.filter_non_trading_candidates(candidates)
assert len(result) == 1
assert result[0]["title"] == "earnings-tracker"
def test_filter_rejects_keyword_in_description(mine_module):
"""Candidates with rejected keywords in description are filtered out."""
candidates = [
{
"title": "some-tool",
"category": "other",
"description": "A git-bulk commit helper",
},
{
"title": "watchlist",
"category": "trading",
"description": "Monitor stock prices",
},
]
result = mine_module.filter_non_trading_candidates(candidates)
assert len(result) == 1
assert result[0]["title"] == "watchlist"
def test_filter_passes_all_trading_candidates(mine_module):
"""All trading-related candidates pass through the filter."""
candidates = [
{"title": "earnings-reviewer", "category": "trade-review", "description": "Review trades"},
{"title": "alert-monitor", "category": "trade-execution", "description": "Price alerts"},
]
result = mine_module.filter_non_trading_candidates(candidates)
assert len(result) == 2
# ── _build_llm_prompt trading_focus ──
def test_build_llm_prompt_trading_focus_true(mine_module):
"""With trading_focus=True, prompt includes trading constraints."""
prompt = mine_module._build_llm_prompt({}, [], "test-project", trading_focus=True)
assert "trading and investing skill library" in prompt
assert "DO NOT propose developer-tooling" in prompt
def test_build_llm_prompt_trading_focus_false(mine_module):
"""With trading_focus=False, prompt is generic (no trading constraints)."""
prompt = mine_module._build_llm_prompt({}, [], "test-project", trading_focus=False)
assert "trading and investing skill library" not in prompt
assert "DO NOT propose developer-tooling" not in prompt
assert "automate or improve" in prompt
def test_filter_handles_null_fields(mine_module):
"""Filter handles None/null values in category, title, description."""
candidates = [
{"title": None, "category": None, "description": None},
{"title": "valid", "category": "trading", "description": "ok"},
]
result = mine_module.filter_non_trading_candidates(candidates)
assert len(result) == 2 # None fields don't match any reject rule
def test_filter_handles_non_string_fields(mine_module):
"""Filter handles non-string values (e.g., list, int) without crashing."""
candidates = [
{"title": 123, "category": ["a"], "description": True},
{"title": "valid", "category": "trading", "description": "ok"},
]
result = mine_module.filter_non_trading_candidates(candidates)
assert len(result) == 2
def test_filter_rejects_keyword_already_in_title(mine_module):
"""Filter rejects candidates whose title matches a rejected keyword."""
candidates = [
{"title": "codebase-navigator", "category": "other", "description": ""},
]
result = mine_module.filter_non_trading_candidates(candidates)
assert len(result) == 0
def test_run_normalizes_null_title_from_name_then_filters(mine_module, tmp_path: Path):
"""run() normalizes title=None + name=X before filter, so keyword rejection works."""
import types
from unittest.mock import patch
import yaml
output_dir = tmp_path / "out"
output_dir.mkdir()
# LLM returns title=None with name containing a rejected keyword
fake_candidates = [
{
"title": None,
"name": "codebase-navigator",
"category": "other",
"description": "Navigate code",
},
{"title": "earnings-tool", "category": "trade-review", "description": "Review trades"},
]
args = types.SimpleNamespace(
output_dir=str(output_dir),
project=None,
lookback_days=7,
dry_run=False,
)
with (
patch.object(mine_module, "find_project_dirs", return_value=[("proj", tmp_path)]),
patch.object(
mine_module,
"list_session_logs",
return_value=[("proj", tmp_path / "fake.jsonl")],
),
patch.object(
mine_module,
"parse_session",
return_value={
"user_messages": ["hello"],
"tool_uses": [],
"timestamps": [],
"timed_entries": [],
},
),
patch.object(mine_module, "abstract_with_llm", return_value=fake_candidates),
):
rc = mine_module.run(args)
assert rc == 0
output_path = output_dir / "raw_candidates.yaml"
data = yaml.safe_load(output_path.read_text(encoding="utf-8"))
candidates = data["candidates"]
# codebase-navigator should be filtered out after name->title normalization
assert len(candidates) == 1
assert candidates[0]["title"] == "earnings-tool"
```
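The filter tests above pin down a small contract: reject by category, then by keyword match in the title or description, and treat `None` or non-string fields as never matching a reject rule. A minimal sketch consistent with those tests follows; `REJECTED_CATEGORIES` and `REJECTED_KEYWORDS` are illustrative stand-ins, and the real module's lists and matching rules may differ.

```python
# Illustrative reject lists; the real module's values will differ.
REJECTED_CATEGORIES = {
    "developer-tooling",
    "skill-development",
    "meeting",
    "customer-support",
}
REJECTED_KEYWORDS = ("codebase", "git-bulk")


def filter_non_trading_candidates(candidates: list[dict]) -> list[dict]:
    """Drop candidates whose category or title/description text is off-domain."""
    kept = []
    for cand in candidates:
        category = cand.get("category")
        if isinstance(category, str) and category in REJECTED_CATEGORIES:
            continue
        # Only string fields participate in keyword matching; None, ints,
        # lists, etc. never match a reject rule.
        text = " ".join(
            value
            for value in (cand.get("title"), cand.get("description"))
            if isinstance(value, str)
        ).lower()
        if any(keyword in text for keyword in REJECTED_KEYWORDS):
            continue
        kept.append(cand)
    return kept
```

The two-stage order matters: a category match short-circuits before any text is assembled, so a candidate with a rejected category is dropped even when its title and description are `None`.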
### scripts/tests/test_score_ideas.py
```python
"""Tests for the skill idea scorer and deduplication script."""
from __future__ import annotations
import importlib.util
import sys
from pathlib import Path
import pytest
import yaml
@pytest.fixture(scope="module")
def score_module():
"""Load score_ideas.py as a module via importlib."""
script_path = Path(__file__).resolve().parents[1] / "score_ideas.py"
spec = importlib.util.spec_from_file_location("score_ideas", script_path)
if spec is None or spec.loader is None:
raise RuntimeError("Failed to load score_ideas.py")
module = importlib.util.module_from_spec(spec)
sys.modules[spec.name] = module
spec.loader.exec_module(module)
return module
# ── Jaccard similarity tests ──
def test_jaccard_identical(score_module):
"""Identical text returns 1.0."""
assert score_module.jaccard_similarity("hello world", "hello world") == 1.0
def test_jaccard_partial(score_module):
"""Partially overlapping words return expected value."""
# "hello world" -> {"hello", "world"}
# "hello there" -> {"hello", "there"}
# intersection = {"hello"}, union = {"hello", "world", "there"}
# Jaccard = 1/3
result = score_module.jaccard_similarity("hello world", "hello there")
assert abs(result - 1.0 / 3.0) < 1e-9
def test_jaccard_disjoint(score_module):
"""No common words returns 0.0."""
assert score_module.jaccard_similarity("alpha beta", "gamma delta") == 0.0
def test_jaccard_empty(score_module):
"""Empty string returns 0.0."""
assert score_module.jaccard_similarity("", "hello") == 0.0
assert score_module.jaccard_similarity("hello", "") == 0.0
assert score_module.jaccard_similarity("", "") == 0.0
# ── list_existing_skills tests ──
def _make_skill(project_root: Path, name: str, description: str = "test") -> None:
"""Create a minimal skill directory with SKILL.md."""
skill_dir = project_root / "skills" / name
skill_dir.mkdir(parents=True, exist_ok=True)
(skill_dir / "SKILL.md").write_text(
f"---\nname: {name}\ndescription: {description}\n---\n# {name}\n",
encoding="utf-8",
)
def test_list_existing_skills(score_module, tmp_path: Path):
"""Create mock skills/*/SKILL.md and verify parsing."""
_make_skill(tmp_path, "alpha-skill", "Alpha skill for testing")
_make_skill(tmp_path, "beta-skill", "Beta skill for testing")
results = score_module.list_existing_skills(tmp_path)
assert len(results) == 2
names = {r["name"] for r in results}
assert "alpha-skill" in names
assert "beta-skill" in names
# Verify descriptions are parsed
alpha = next(r for r in results if r["name"] == "alpha-skill")
assert alpha["description"] == "Alpha skill for testing"
def test_list_existing_skills_skips_nested(score_module, tmp_path: Path):
"""Verify skills/**/SKILL.md nested entries are NOT returned."""
_make_skill(tmp_path, "top-skill", "Top level skill")
# Create a nested SKILL.md that should NOT be picked up
nested_dir = tmp_path / "skills" / "top-skill" / "sub-component"
nested_dir.mkdir(parents=True, exist_ok=True)
(nested_dir / "SKILL.md").write_text(
"---\nname: nested-skill\ndescription: Should not appear\n---\n",
encoding="utf-8",
)
results = score_module.list_existing_skills(tmp_path)
names = {r["name"] for r in results}
assert "top-skill" in names
assert "nested-skill" not in names
assert len(results) == 1
# ── merge_into_backlog tests ──
def test_merge_new_ideas(score_module):
"""New ideas are added to backlog."""
backlog = {"updated_at_utc": "", "ideas": []}
candidates = [
{"id": "idea_001", "title": "New Idea", "description": "desc", "scores": {"composite": 70}},
{"id": "idea_002", "title": "Another", "description": "desc2", "scores": {"composite": 80}},
]
result = score_module.merge_into_backlog(backlog, candidates)
assert len(result["ideas"]) == 2
assert result["ideas"][0]["id"] == "idea_001"
assert result["ideas"][1]["id"] == "idea_002"
assert result["updated_at_utc"] != ""
def test_merge_skip_duplicates(score_module):
"""Ideas with same id are not duplicated in backlog."""
backlog = {
"updated_at_utc": "2026-02-28T06:15:00Z",
"ideas": [
{"id": "idea_001", "title": "Existing Idea", "status": "pending"},
],
}
candidates = [
{"id": "idea_001", "title": "Existing Idea Updated", "scores": {"composite": 90}},
{"id": "idea_002", "title": "New Idea", "scores": {"composite": 80}},
]
result = score_module.merge_into_backlog(backlog, candidates)
assert len(result["ideas"]) == 2
# Original idea unchanged
assert result["ideas"][0]["title"] == "Existing Idea"
# New idea added
assert result["ideas"][1]["id"] == "idea_002"
def test_merge_preserves_status(score_module):
"""Existing idea status is not overwritten by merge."""
backlog = {
"updated_at_utc": "2026-02-28T06:15:00Z",
"ideas": [
{
"id": "idea_001",
"title": "Old Idea",
"status": "attempted",
"scores": {"composite": 60},
},
],
}
# Candidate with same id tries to change status
candidates = [
{
"id": "idea_001",
"title": "Old Idea Revisited",
"status": "pending",
"scores": {"composite": 90},
},
]
result = score_module.merge_into_backlog(backlog, candidates)
assert len(result["ideas"]) == 1
assert result["ideas"][0]["status"] == "attempted"
assert result["ideas"][0]["scores"]["composite"] == 60
# ── find_duplicates tests ──
def test_find_duplicates_marks_similar(score_module):
"""Candidate with high Jaccard similarity is marked as duplicate."""
candidates = [
{
"id": "cand_001",
"title": "Market Breadth Weekly Reporter",
"description": "Weekly market breadth summary reports for trading",
},
]
existing_skills = [
{
"name": "market-breadth-reporter",
"description": "Weekly market breadth summary reports for trading",
},
]
backlog_ideas = []
result = score_module.find_duplicates(candidates, existing_skills, backlog_ideas)
assert result[0].get("status") == "duplicate"
assert "skill:market-breadth-reporter" in result[0].get("duplicate_of", "")
assert result[0].get("jaccard_score", 0) > score_module.JACCARD_THRESHOLD
# ── save_backlog atomic write tests ──
def test_save_backlog_writes_valid_yaml(score_module, tmp_path: Path):
"""save_backlog writes valid YAML that round-trips correctly."""
backlog_path = tmp_path / "backlog.yaml"
backlog = {
"updated_at_utc": "2026-03-01T10:00:00Z",
"ideas": [
{"id": "idea_001", "title": "Test Idea", "scores": {"composite": 75}},
],
}
score_module.save_backlog(backlog_path, backlog)
assert backlog_path.exists()
loaded = yaml.safe_load(backlog_path.read_text(encoding="utf-8"))
assert loaded["ideas"][0]["id"] == "idea_001"
assert loaded["ideas"][0]["scores"]["composite"] == 75
def test_save_backlog_no_temp_files_remain(score_module, tmp_path: Path):
"""After save_backlog, no .tmp files remain in the directory."""
backlog_path = tmp_path / "backlog.yaml"
backlog = {"updated_at_utc": "", "ideas": []}
score_module.save_backlog(backlog_path, backlog)
tmp_files = list(tmp_path.glob(".backlog_*.tmp"))
assert tmp_files == [], f"Temp files should be cleaned up: {tmp_files}"
def test_save_backlog_no_bak_created(score_module, tmp_path: Path):
"""Atomic write replaces .bak strategy; no .bak file is created."""
backlog_path = tmp_path / "backlog.yaml"
backlog = {"updated_at_utc": "", "ideas": [{"id": "a"}]}
# Write twice to ensure overwrite path doesn't create .bak
score_module.save_backlog(backlog_path, backlog)
backlog["ideas"].append({"id": "b"})
score_module.save_backlog(backlog_path, backlog)
bak_files = list(tmp_path.glob("*.bak"))
assert bak_files == [], f"No .bak files should exist: {bak_files}"
loaded = yaml.safe_load(backlog_path.read_text(encoding="utf-8"))
assert len(loaded["ideas"]) == 2
```
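The `save_backlog` tests above imply the standard atomic-write pattern: serialize first, write to a temp file in the same directory as the target, then `os.replace()` it over the destination, so neither `.bak` files nor stray `.tmp` files survive. A minimal sketch of that pattern, assuming the `.backlog_*.tmp` naming checked by `test_save_backlog_no_temp_files_remain`; the real `score_ideas.save_backlog` may differ:

```python
import os
import tempfile
from pathlib import Path


def atomic_write_text(path: Path, text: str) -> None:
    """Write text to path atomically via a same-directory temp file."""
    # mkstemp in the target directory keeps the temp file on the same
    # filesystem, which is what makes os.replace() an atomic rename.
    fd, tmp_name = tempfile.mkstemp(
        prefix=".backlog_", suffix=".tmp", dir=path.parent
    )
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as fh:
            fh.write(text)
        os.replace(tmp_name, path)  # atomic rename; clobbers any old file
    except BaseException:
        os.unlink(tmp_name)  # don't leave .tmp debris behind on failure
        raise


# A save_backlog-style caller would serialize first, then hand off:
#     atomic_write_text(backlog_path, yaml.safe_dump(backlog, sort_keys=False))
```

Because `os.replace()` either fully succeeds or leaves the old file untouched, a crash mid-write can never leave a half-written `backlog.yaml`, which is why no `.bak` copy is needed.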