finance-ux-observer
Always-on finance UX research. Silently observes session transcripts for finance-related usage patterns every 30 minutes, compiles daily insights reports, and redacts PII before review. Nothing leaves the machine automatically.
Packaged view
This page reorganizes the original catalog entry around fit, installability, and workflow context first. The original raw source lives below.
Install command
npx @skill-hub/cli install openclaw-skills-finance-ethnographer
Repository
Skill path: skills/dflam1/finance-ethnographer
Open repository
Best for
Primary workflow: Research & Ops.
Technical facets: Full Stack, Designer.
Target audience: everyone.
License: Unknown.
Original source
Catalog source: SkillHub Club.
Repository owner: openclaw.
This is a mirrored public skill entry. Review the repository before installing it into production workflows.
What it helps with
- Install finance-ux-observer into Claude Code, Codex CLI, Gemini CLI, or OpenCode workflows
- Review https://github.com/openclaw/skills before adding finance-ux-observer to shared team environments
- Use finance-ux-observer for development workflows
Works across
Favorites: 0.
Sub-skills: 0.
Aggregator: No.
Original source / Raw SKILL.md
---
name: finance-ux-observer
description: Always-on finance UX research. Silently observes session transcripts for finance-related usage patterns every 30 minutes, compiles daily insights reports, and redacts PII before review. Nothing leaves the machine automatically.
metadata.openclaw: {"always": true, "emoji": "🔬", "requires": {"bins": ["python3"]}}
---
## Finance UX Observer 🔬
You are also acting as a passive Finance UX Research Observer for this user.
### What this skill does
Three Python scripts run on a schedule via system cron (set up once with `setup_cron.py`):
| Script | Schedule | Purpose |
|--------|----------|---------|
| `observe_finance_usage.py` | Every 30 min | Reads `~/.openclaw/agents/*/sessions/*.jsonl`, extracts finance behavioral signals, appends to daily JSONL log |
| `daily_synthesize.py` | 23:55 America/Los_Angeles | Compiles `raw_observations.md` + `insights.md`, then runs redaction |
| `redact_reports.py` | 06:00 America/Los_Angeles | Validates that all reports have clean REDACTED versions |
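The schedule above corresponds to crontab entries roughly like the following (illustrative only – the real entries are written by `setup_cron.py`, and the `CRON_TZ` variable is a cron extension that not every implementation supports):

```
CRON_TZ=America/Los_Angeles
*/30 * * * * python3 ~/.openclaw/skills/finance-ux-observer/scripts/observe_finance_usage.py
55 23 * * *  python3 ~/.openclaw/skills/finance-ux-observer/scripts/daily_synthesize.py
0 6 * * *    python3 ~/.openclaw/skills/finance-ux-observer/scripts/redact_reports.py --validate-only
```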
### Key paths
- Observations: `~/.openclaw/skills/finance-ux-observer/data/observations/YYYY-MM-DD.jsonl`
- Reports: `~/.openclaw/skills/finance-ux-observer/reports/YYYY-MM-DD/`
- Logs: `~/.openclaw/skills/finance-ux-observer/logs/`
- Scripts: `~/.openclaw/skills/finance-ux-observer/scripts/`
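Each observations file is JSONL: one JSON object per 30-minute window that produced a finance or UX signal. A sketch of a single record, using the field names that `observe_finance_usage.py` (below) writes – all values here are invented:

```python
import json

# Hypothetical observation record; field names match observe_finance_usage.py,
# values are made up for illustration.
record = {
    "timestamp": "2026-02-03T18:30:00+00:00",
    "observation_id": "obs_20260203_183000",
    "session_key": "agent-main/1234abcd",
    "channel": "main",
    "what_user_tried": 'User explored budgeting. First message: "help me plan a monthly budget"',
    "finance_topic_tags": ["budgeting"],
    "tools_actions_observed": ["exec"],
    "notable_quotes": ["help me plan a monthly budget"],
    "ux_signals": ["confusion"],
    "researcher_notes": "Finance: budgeting. Signals: confusion.",
}

# One record per line in YYYY-MM-DD.jsonl:
line = json.dumps(record, ensure_ascii=False)
print(json.loads(line)["finance_topic_tags"])  # ['budgeting']
```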
### First-time setup
```bash
python3 ~/.openclaw/skills/finance-ux-observer/scripts/setup_cron.py
```
### Your role as observer
- When the user asks about their finance usage patterns, check if today's observation file exists and summarize the top finance topics and UX signals detected.
- When the user asks to see reports, remind them to open the `*.REDACTED.md` versions only; never share the non-redacted originals.
- When the user asks to disable or uninstall, run `setup_cron.py --remove`.
- Do not proactively announce that you are observing during normal conversation. Only surface observations when asked.
### Finance topics tracked
`investing` · `savings` · `budgeting` · `retirement` · `household_budgeting` · `spending` · `shopping` · `crypto` · `taxes` · `financial_advice` · `scenario_planning` · `social_spending` · `debt` · `insurance` · `estate_planning`
### UX signals tracked
`confusion` ยท `friction` ยท `delight` ยท `workaround` ยท `abandonment`
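Both lists are detected with plain keyword and regex matching over session text, as `observe_finance_usage.py` (below) implements. A minimal standalone sketch of that approach, with invented keyword lists:

```python
import re

# Toy keyword lists for illustration only; the real ones live in
# data/finance_taxonomy.yml and in _UX_PATTERNS inside observe_finance_usage.py.
TOPICS = {"budgeting": ["budget", "expenses"], "crypto": ["bitcoin", "ethereum"]}
SIGNALS = {"confusion": [r"i['\u2019]?m confused", r"\berror\b"],
           "delight": [r"\bperfect\b"]}

def classify(text: str):
    low = text.lower()
    # A topic/signal matches if any of its keywords or patterns appears.
    topics = sorted(t for t, kws in TOPICS.items()
                    if any(re.search(r"\b" + re.escape(k) + r"\b", low) for k in kws))
    signals = sorted(s for s, pats in SIGNALS.items()
                     if any(re.search(p, low) for p in pats))
    return topics, signals

print(classify("I'm confused about my monthly budget"))  # (['budgeting'], ['confusion'])
```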
### Privacy rules (always enforce)
- All data is local only; nothing is transmitted automatically.
- Reports must be reviewed by the user before sharing.
- Only `*.REDACTED.md` files may be shared externally.
- If the user asks you to email or upload report data, first confirm they have reviewed the redacted version.
### Troubleshooting
```bash
# Check cron jobs are registered
crontab -l | grep finance-ux-observer
# Check today's observations
cat ~/.openclaw/skills/finance-ux-observer/data/observations/$(date +%Y-%m-%d).jsonl
# Run observer manually
python3 ~/.openclaw/skills/finance-ux-observer/scripts/observe_finance_usage.py --dry-run
# Run synthesis manually
python3 ~/.openclaw/skills/finance-ux-observer/scripts/daily_synthesize.py
# Validate redaction
python3 ~/.openclaw/skills/finance-ux-observer/scripts/redact_reports.py --validate-only
# Remove cron jobs
python3 ~/.openclaw/skills/finance-ux-observer/scripts/setup_cron.py --remove
```
---
## Skill Companion Files
> Additional files collected from the skill directory layout.
### _meta.json
```json
{
"owner": "dflam1",
"slug": "finance-ethnographer",
"displayName": "finance-ethnographer",
"latest": {
"version": "1.0.0",
"publishedAt": 1772668144092,
"commit": "https://github.com/openclaw/skills/commit/be84f6276e0572f4a3bd1388f4c4bfcfb4597c92"
},
"history": []
}
```
### scripts/daily_synthesize.py
```python
#!/usr/bin/env python3
"""
daily_synthesize.py
Finance UX Daily Synthesizer – runs at 23:55 America/Los_Angeles via cron.
Reads: ~/.openclaw/skills/finance-ux-observer/data/observations/YYYY-MM-DD.jsonl
Writes: ~/.openclaw/skills/finance-ux-observer/reports/YYYY-MM-DD/
raw_observations.md
insights.md
Then calls redact_reports.py to produce *.REDACTED.md versions.
"""
import json
import subprocess
import sys
from collections import Counter, defaultdict
from datetime import date, datetime, timezone
from pathlib import Path
from typing import Any, Dict, List
SKILL_DIR = Path(__file__).resolve().parent.parent
SKILL_DATA = SKILL_DIR / "data"
OBSERVATIONS_DIR = SKILL_DATA / "observations"
REPORTS_BASE = SKILL_DIR / "reports"
# ── Load ──────────────────────────────────────────────────────────────────────
def load_observations(target_date: str) -> List[Dict[str, Any]]:
obs_file = OBSERVATIONS_DIR / f"{target_date}.jsonl"
if not obs_file.exists():
return []
rows = []
with open(obs_file, errors="replace") as f:
for line in f:
line = line.strip()
if line:
try:
rows.append(json.loads(line))
except json.JSONDecodeError:
pass
return rows
# ── Raw observations report ───────────────────────────────────────────────────
def build_raw_md(observations: List[Dict[str, Any]], target_date: str) -> str:
now = datetime.now(tz=timezone.utc).isoformat()
lines = [
        "# Finance UX Observer – Raw Observations",
"",
f"**Date:** {target_date} ",
f"**Generated:** {now} ",
f"**Total observations:** {len(observations)} ",
"",
        "> ⚠️ DO NOT SHARE THIS FILE. Share only: `raw_observations.REDACTED.md`",
"",
"---",
"",
]
if not observations:
lines.append("_No observations recorded for this date._")
return "\n".join(lines)
for i, obs in enumerate(observations, 1):
tags = obs.get("finance_topic_tags", [])
tools = obs.get("tools_actions_observed", [])
sigs = obs.get("ux_signals", [])
quotes = obs.get("notable_quotes", [])
lines += [
            f"## Observation {i} – `{obs.get('observation_id', f'obs_{i:04d}')}`",
"",
f"| | |",
f"|---|---|",
f"| **Timestamp** | `{obs.get('timestamp', 'n/a')}` |",
f"| **Session** | `{obs.get('session_key', 'unknown')}` |",
f"| **Channel** | {obs.get('channel', 'main')} |",
"",
"### What the user tried",
obs.get("what_user_tried", "_not recorded_"),
"",
"### Finance topic tags",
", ".join(f"`{t}`" for t in tags) if tags else "_none_",
"",
"### Tools observed",
", ".join(f"`{t}`" for t in tools) if tools else "_none_",
"",
"### UX signals",
", ".join(f"`{s}`" for s in sigs) if sigs else "_none_",
"",
"### Notable quotes",
]
if quotes:
lines.extend(f"> {q}" for q in quotes)
else:
lines.append("_none_")
lines += [
"",
"### Researcher notes",
obs.get("researcher_notes", "_none_"),
"",
"---",
"",
]
return "\n".join(lines)
# ── Insights report ───────────────────────────────────────────────────────────
def build_insights_md(observations: List[Dict[str, Any]], target_date: str) -> str:
now = datetime.now(tz=timezone.utc).isoformat()
if not observations:
return "\n".join([
            "# Finance UX Observer – Daily Insights",
"",
f"**Date:** {target_date} ",
f"**Generated:** {now} ",
"",
"_No observations recorded. Verify the observer cron job is running._",
"",
"```bash",
"crontab -l | grep finance-ux-observer",
"```",
])
# Aggregates
all_topics: List[str] = []
all_signals: List[str] = []
all_tools: List[str] = []
sessions: set = set()
for obs in observations:
all_topics.extend(obs.get("finance_topic_tags", []))
all_signals.extend(obs.get("ux_signals", []))
all_tools.extend(obs.get("tools_actions_observed", []))
sessions.add(obs.get("session_key", ""))
topic_counts = Counter(all_topics)
signal_counts = Counter(all_signals)
tool_counts = Counter(all_tools)
def by_signal(sig: str) -> List[Dict[str, Any]]:
return [o for o in observations if sig in o.get("ux_signals", [])]
friction_obs = by_signal("friction")
confusion_obs = by_signal("confusion")
delight_obs = by_signal("delight")
workaround_obs = by_signal("workaround")
abandonment_obs = by_signal("abandonment")
pain_obs = friction_obs + confusion_obs
topic_to_obs: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
for obs in observations:
for tag in obs.get("finance_topic_tags", []):
topic_to_obs[tag].append(obs)
lines = [
        "# Finance UX Observer – Daily Insights",
"",
f"**Date:** {target_date} ",
f"**Generated:** {now} ",
f"**Observation windows:** {len(observations)} ",
f"**Unique sessions:** {len(sessions)} ",
"",
        "> ⚠️ DO NOT SHARE THIS FILE. Share only: `insights.REDACTED.md`",
"",
"---",
"",
"## Executive Summary",
"",
]
if topic_counts:
top3 = [f"**{t}**" for t, _ in topic_counts.most_common(3)]
lines.append("Dominant finance topics today: " + ", ".join(top3) + ".")
if signal_counts:
dom, cnt = signal_counts.most_common(1)[0]
lines.append(f"Most common UX signal: **{dom}** ({cnt} instance{'s' if cnt != 1 else ''}).")
lines += ["", "---", "", "## Finance Topics Observed", ""]
if topic_counts:
total = sum(topic_counts.values())
lines += ["| Topic | Count | % |", "|-------|-------|---|"]
for topic, count in topic_counts.most_common():
lines.append(f"| `{topic}` | {count} | {count/total*100:.0f}% |")
else:
lines.append("_None detected._")
SIGNAL_INTERP = {
"confusion": "User expressed confusion or asked clarifying questions",
"friction": "User encountered difficulty or repeated failures",
"delight": "User expressed satisfaction or success",
"workaround": "User found an alternative path around a blocked flow",
"abandonment": "User gave up on a task or abruptly changed direction",
}
lines += ["", "---", "", "## UX Signal Summary", ""]
if signal_counts:
lines += ["| Signal | Count | Interpretation |", "|--------|-------|----------------|"]
for sig, cnt in signal_counts.most_common():
lines.append(f"| `{sig}` | {cnt} | {SIGNAL_INTERP.get(sig, '')} |")
else:
lines.append("_None detected._")
lines += ["", "---", "", "## Behavioral Patterns by Topic", ""]
for topic, obs_list in sorted(topic_to_obs.items(), key=lambda x: -len(x[1])):
lines += [f"### {topic.replace('_', ' ').title()}", ""]
lines.append(f"**{len(obs_list)} window(s)**")
for obs in obs_list[:4]:
what = obs.get("what_user_tried", "")[:150]
sigs = obs.get("ux_signals", [])
sig_str = f" _{', '.join(sigs)}_" if sigs else ""
lines.append(f"- {what}{sig_str}")
lines.append("")
lines += ["---", "", "## Pain Points & Friction", ""]
if pain_obs:
for obs in pain_obs[:6]:
topics_str = ", ".join(obs.get("finance_topic_tags", ["general"]))
sigs_str = ", ".join(obs.get("ux_signals", []))
lines.append(f"- **[{topics_str}]** `{sigs_str}` ")
lines.append(f" {obs.get('what_user_tried', '')[:150]}")
else:
lines.append("_None detected._")
lines += ["", "---", "", "## Delight Moments", ""]
if delight_obs:
for obs in delight_obs[:4]:
topics_str = ", ".join(obs.get("finance_topic_tags", ["general"]))
lines.append(f"- **[{topics_str}]** {obs.get('what_user_tried', '')[:150]}")
else:
lines.append("_None detected._")
lines += ["", "---", "", "## Workarounds", ""]
if workaround_obs:
for obs in workaround_obs[:4]:
topics_str = ", ".join(obs.get("finance_topic_tags", ["general"]))
lines.append(f"- **[{topics_str}]** {obs.get('what_user_tried', '')[:150]}")
lines.append("\n_Each workaround is a feature request in disguise._")
else:
lines.append("_None detected._")
lines += ["", "---", "", "## Abandonment Signals", ""]
if abandonment_obs:
for obs in abandonment_obs[:4]:
topics_str = ", ".join(obs.get("finance_topic_tags", ["general"]))
lines.append(f"- **[{topics_str}]** {obs.get('what_user_tried', '')[:150]}")
else:
lines.append("_None detected._")
UNMET_NEEDS = {
"budgeting": "Guided budget setup wizard or ready-made templates",
"investing": "Jargon-free investment explanations with comparison tools",
"retirement": "Step-by-step retirement planning workflow",
"taxes": "Tax estimation, scenario modeling, or filing checklist",
"crypto": "Plain-language crypto risk/reward education",
"scenario_planning": "Interactive financial modeling with what-if sliders",
"savings": "Goal-based savings tracking with progress visualization",
"household_budgeting":"Household expense categorization with shared access",
"debt": "Side-by-side debt payoff strategy comparison",
"insurance": "Coverage gap analysis and plain-English policy comparison",
"estate_planning": "Step-by-step estate checklist with beneficiary management",
"social_spending": "Group expense splitting with settlement suggestions",
"spending": "Automated spending pattern detection and anomaly alerts",
"shopping": "Price history tracking and deal alerting",
"financial_advice": "Personalized financial health score with next steps",
}
pain_topics = []
for obs in pain_obs + abandonment_obs + workaround_obs:
pain_topics.extend(obs.get("finance_topic_tags", []))
lines += ["", "---", "", "## Inferred Unmet Needs", "",
"_Derived from friction + confusion + abandonment observations:_", ""]
printed = set()
for topic in sorted(set(pain_topics)):
if topic in UNMET_NEEDS and topic not in printed:
lines.append(f"- **{topic.replace('_', ' ').title()}:** {UNMET_NEEDS[topic]}")
printed.add(topic)
if not printed:
lines.append("_Insufficient signal today._")
lines += ["", "---", "", "## Tools & Actions Observed", ""]
if tool_counts:
lines += ["| Tool | Uses |", "|------|------|"]
for tool, count in tool_counts.most_common(12):
lines.append(f"| `{tool}` | {count} |")
else:
lines.append("_None recorded._")
# Recommendations
recs = []
if signal_counts.get("confusion", 0) >= 2:
recs.append("**Improve clarity** for the most-confused finance workflows.")
if signal_counts.get("friction", 0) >= 2:
recs.append("**Streamline friction-heavy flows** (see Pain Points above).")
if workaround_obs:
        recs.append("**Investigate workarounds** – each one is a hidden feature gap.")
if abandonment_obs:
recs.append("**Analyze abandonment triggers** to find where motivation breaks down.")
if not recs:
        recs.append("_No specific recommendations today – usage appeared smooth._")
lines += ["", "---", "", "## Recommendations", ""]
for i, rec in enumerate(recs, 1):
lines.append(f"{i}. {rec}")
lines += [
"",
"---",
"",
"## Pre-Share Checklist",
"",
"- [ ] Reviewed `insights.REDACTED.md` (not this file)",
"- [ ] Verified no PII in the redacted version",
"- [ ] Attaching only `*.REDACTED.md` files",
"",
]
return "\n".join(lines)
# ── Write + redact ────────────────────────────────────────────────────────────
def write_reports(target_date: str, raw_md: str, insights_md: str) -> Path:
report_dir = REPORTS_BASE / target_date
report_dir.mkdir(parents=True, exist_ok=True)
(report_dir / "raw_observations.md").write_text(raw_md, encoding="utf-8")
(report_dir / "insights.md").write_text(insights_md, encoding="utf-8")
return report_dir
def trigger_redaction(report_dir: Path) -> bool:
redact_script = SKILL_DIR / "scripts" / "redact_reports.py"
try:
r = subprocess.run(
[sys.executable, str(redact_script), str(report_dir)],
capture_output=True, text=True, timeout=120,
)
return r.returncode == 0
except (subprocess.TimeoutExpired, OSError):
return False
# ── Main ──────────────────────────────────────────────────────────────────────
def main() -> None:
target_date = date.today().isoformat()
observations = load_observations(target_date)
topic_counts = Counter(t for o in observations for t in o.get("finance_topic_tags", []))
signal_counts = Counter(s for o in observations for s in o.get("ux_signals", []))
raw_md = build_raw_md(observations, target_date)
insights_md = build_insights_md(observations, target_date)
report_dir = write_reports(target_date, raw_md, insights_md)
redacted_ok = trigger_redaction(report_dir)
top_topics = [t for t, _ in topic_counts.most_common(3)]
top_signals = [s for s, _ in signal_counts.most_common(2)]
    redact_msg = "✅ Complete" if redacted_ok else "⚠️ Run `redact_reports.py` manually"
    print(f"""## 🔬 Finance UX Observer – Daily Report Ready
**Date:** {target_date}
**Observations:** {len(observations)}
**Top topics:** {', '.join(f'`{t}`' for t in top_topics) or '_none_'}
**Top signals:** {', '.join(f'`{s}`' for s in top_signals) or '_none_'}
**Redaction:** {redact_msg}
### Reports saved to
`{report_dir}/`
| File | Share? |
|------|--------|
| `raw_observations.md` | ❌ No – may contain PII |
| `raw_observations.REDACTED.md` | ✅ Yes |
| `insights.md` | ❌ No – may contain PII |
| `insights.REDACTED.md` | ✅ Yes |
**Next steps:** Review the two REDACTED files, then use the email template in SKILL.md to share with your research team.
""")
if __name__ == "__main__":
main()
```
### scripts/observe_finance_usage.py
```python
#!/usr/bin/env python3
"""
observe_finance_usage.py
Finance UX Observer – runs every 30 minutes via system cron.
Reads new lines from:
~/.openclaw/agents/<agentId>/sessions/<sessionId>.jsonl
Appends structured observations to:
~/.openclaw/skills/finance-ux-observer/data/observations/YYYY-MM-DD.jsonl
Tracks progress in:
~/.openclaw/skills/finance-ux-observer/data/checkpoint.json
Usage:
python3 observe_finance_usage.py # normal cron run (no output)
python3 observe_finance_usage.py --dry-run # print observations, do not write
"""
import argparse
import json
import re
import sys
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
# ── Paths ─────────────────────────────────────────────────────────────────────
SKILL_DIR = Path(__file__).resolve().parent.parent
AGENTS_BASE = Path.home() / ".openclaw" / "agents"
SKILL_DATA = SKILL_DIR / "data"
CHECKPOINT_FILE = SKILL_DATA / "checkpoint.json"
OBSERVATIONS_DIR = SKILL_DATA / "observations"
TAXONOMY_FILE = SKILL_DATA / "finance_taxonomy.yml"
# ── Taxonomy loader ───────────────────────────────────────────────────────────
def load_taxonomy() -> Dict[str, List[str]]:
"""Parse finance_taxonomy.yml without PyYAML."""
taxonomy: Dict[str, List[str]] = {}
if not TAXONOMY_FILE.exists():
return taxonomy
current: Optional[str] = None
with open(TAXONOMY_FILE, errors="replace") as f:
for line in f:
line = line.rstrip()
if not line or line.lstrip().startswith("#"):
continue
if re.match(r"^[a-z_]+:\s*$", line):
current = line.rstrip(":").strip()
taxonomy[current] = []
elif line.strip().startswith("- ") and current is not None:
kw = line.strip()[2:].strip().lower()
if kw:
taxonomy[current].append(kw)
return taxonomy
_TAXONOMY: Dict[str, List[str]] = {}
def get_taxonomy() -> Dict[str, List[str]]:
global _TAXONOMY
if not _TAXONOMY:
_TAXONOMY = load_taxonomy()
return _TAXONOMY
# ── Checkpoint ────────────────────────────────────────────────────────────────
def load_checkpoint() -> Dict[str, Any]:
if CHECKPOINT_FILE.exists():
try:
with open(CHECKPOINT_FILE) as f:
return json.load(f)
except (json.JSONDecodeError, OSError):
pass
return {"last_run": None, "processed_sessions": {}}
def save_checkpoint(cp: Dict[str, Any]) -> None:
CHECKPOINT_FILE.parent.mkdir(parents=True, exist_ok=True)
with open(CHECKPOINT_FILE, "w") as f:
json.dump(cp, f, indent=2)
# ── Session discovery ─────────────────────────────────────────────────────────
def find_sessions_modified_since(
since: Optional[datetime],
) -> List[Tuple[str, str, Path]]:
"""Return (agentId, sessionId, path) for session files modified after `since`."""
results = []
if not AGENTS_BASE.exists():
return results
for agent_dir in sorted(AGENTS_BASE.iterdir()):
if not agent_dir.is_dir():
continue
sessions_dir = agent_dir / "sessions"
if not sessions_dir.is_dir():
continue
for session_file in sorted(sessions_dir.glob("*.jsonl")):
if since is not None:
mtime = datetime.fromtimestamp(
session_file.stat().st_mtime, tz=timezone.utc
)
if mtime <= since:
continue
results.append((agent_dir.name, session_file.stem, session_file))
return results
# ── Session parsing ───────────────────────────────────────────────────────────
def read_new_lines(path: Path, from_line: int) -> List[Dict[str, Any]]:
"""Parse only the lines after `from_line` in a session JSONL file."""
messages: List[Dict[str, Any]] = []
try:
with open(path, errors="replace") as f:
for i, raw in enumerate(f):
if i < from_line:
continue
raw = raw.strip()
if not raw:
continue
try:
messages.append(json.loads(raw))
except json.JSONDecodeError:
pass
except (OSError, PermissionError):
pass
return messages
def extract_text(msg: Dict[str, Any]) -> str:
"""Pull readable text out of a message dict (handles several content shapes)."""
for field in ("content", "text", "message", "value", "output"):
val = msg.get(field)
if isinstance(val, str) and val.strip():
return val.strip()
if isinstance(val, list):
parts = []
for block in val:
if isinstance(block, str):
parts.append(block)
elif isinstance(block, dict):
for k in ("text", "content", "value"):
v = block.get(k)
if isinstance(v, str) and v.strip():
parts.append(v.strip())
break
joined = " ".join(parts).strip()
if joined:
return joined
return ""
def is_user_message(msg: Dict[str, Any]) -> bool:
role = str(msg.get("role", "")).lower()
mtype = str(msg.get("type", "")).lower()
return role in ("user", "human") or mtype in ("user_message", "human_turn")
def extract_tool_names(messages: List[Dict[str, Any]]) -> List[str]:
names: set = set()
for msg in messages:
mtype = str(msg.get("type", "")).lower()
if mtype in ("tool_call", "tool_use", "tool_result", "action", "function_call"):
name = (
msg.get("name")
or msg.get("tool")
or (msg.get("function") or {}).get("name")
)
if name:
names.add(str(name))
for field in ("content", "parts"):
val = msg.get(field)
if isinstance(val, list):
for block in val:
if isinstance(block, dict):
btype = str(block.get("type", "")).lower()
if btype in ("tool_use", "tool_call", "function_call"):
n = block.get("name") or (block.get("function") or {}).get("name")
if n:
names.add(str(n))
return sorted(names)
# ── Finance classification ────────────────────────────────────────────────────
def classify_finance_topics(texts: List[str]) -> List[str]:
combined = " ".join(texts).lower()
matched = []
for category, keywords in get_taxonomy().items():
for kw in keywords:
if re.search(r"\b" + re.escape(kw) + r"\b", combined):
matched.append(category)
break
return sorted(set(matched))
# ── UX signal detection ───────────────────────────────────────────────────────
_UX_PATTERNS: Dict[str, List[str]] = {
"confusion": [
r"don['\u2019]?t understand", r"what do you mean", r"i['\u2019]?m confused",
r"not what i (wanted|expected|meant|asked)", r"why (is|does|won['\u2019]?t|can['\u2019]?t)",
r"how (do|does|can|should) i\b", r"still not (working|right)", r"\berror\b",
r"\bbroken\b", r"doesn['\u2019]?t make sense",
],
"delight": [
r"\bperfect\b", r"exactly what i (needed|wanted)", r"\bawesome\b",
r"thank(s| you)\b", r"that['\u2019]?s (great|perfect|amazing|helpful)",
r"\bwow\b", r"works (perfectly|great)",
],
"friction": [
r"this is (confusing|complicated|hard|unclear|annoying)", r"keeps (failing|crashing|breaking)",
r"doesn['\u2019]?t work", r"won['\u2019]?t (let|allow|work)",
r"can['\u2019]?t (seem to|figure out)", r"\bstuck\b", r"takes (too long|forever)",
],
"workaround": [
r"\binstead\b", r"another way", r"\bmanually\b", r"copy[- ]paste",
r"different approach", r"\bworkaround\b", r"let me try", r"what if i",
],
"abandonment": [
r"\bforget it\b", r"\bnever mind\b", r"give up", r"doesn['\u2019]?t matter",
r"not worth", r"move on", r"skip (this|it|that)",
],
}
_COMPILED_UX = {
sig: [re.compile(p, re.IGNORECASE) for p in patterns]
for sig, patterns in _UX_PATTERNS.items()
}
def detect_ux_signals(user_texts: List[str]) -> List[str]:
combined = " ".join(user_texts).lower()
signals = []
for signal, patterns in _COMPILED_UX.items():
for pat in patterns:
if pat.search(combined):
signals.append(signal)
break
return signals
# ── Observation builder ───────────────────────────────────────────────────────
def build_summary(user_texts: List[str], topics: List[str]) -> str:
if not user_texts:
return "No user messages in this window."
anchor = user_texts[0][:80].rstrip()
if len(user_texts[0]) > 80:
        anchor += "…"
topic_str = ", ".join(topics[:3]) if topics else "general"
return f'User explored {topic_str}. First message: "{anchor}"'[:200]
def pick_quotes(user_texts: List[str]) -> List[str]:
quotes = []
for text in user_texts:
text = text.strip()
if len(text) < 15:
continue
        quotes.append(text[:117] + "…" if len(text) > 120 else text)
if len(quotes) >= 3:
break
return quotes
def append_observation(obs: Dict[str, Any]) -> None:
OBSERVATIONS_DIR.mkdir(parents=True, exist_ok=True)
date_str = datetime.now(tz=timezone.utc).strftime("%Y-%m-%d")
obs_file = OBSERVATIONS_DIR / f"{date_str}.jsonl"
with open(obs_file, "a") as f:
f.write(json.dumps(obs, ensure_ascii=False) + "\n")
# ── Main pass ─────────────────────────────────────────────────────────────────
def run(dry_run: bool = False) -> int:
now = datetime.now(tz=timezone.utc)
cp = load_checkpoint()
last_run_str = cp.get("last_run")
if last_run_str:
try:
since: Optional[datetime] = datetime.fromisoformat(last_run_str)
except ValueError:
since = now - timedelta(hours=1)
else:
since = now - timedelta(minutes=30)
sessions = find_sessions_modified_since(since)
processed: Dict[str, int] = cp.get("processed_sessions", {})
count = 0
for agent_id, session_id, path in sessions:
key = f"{agent_id}/{session_id}"
from_line = processed.get(key, 0)
messages = read_new_lines(path, from_line)
if not messages:
continue
all_texts = [t for m in messages for t in [extract_text(m)] if t]
user_texts = [extract_text(m) for m in messages if is_user_message(m)]
user_texts = [t for t in user_texts if t]
topics = classify_finance_topics(all_texts)
signals = detect_ux_signals(user_texts)
# Only log if there's at least one finance or UX signal
if not topics and not signals:
processed[key] = from_line + len(messages)
continue
tools = extract_tool_names(messages)
summary = build_summary(user_texts, topics)
quotes = pick_quotes(user_texts)
notes_parts = []
if topics:
notes_parts.append(f"Finance: {', '.join(topics)}.")
if signals:
notes_parts.append(f"Signals: {', '.join(signals)}.")
if tools:
notes_parts.append(f"Tools: {', '.join(tools[:4])}.")
obs: Dict[str, Any] = {
"timestamp": now.isoformat(),
"observation_id": "obs_" + now.strftime("%Y%m%d_%H%M%S"),
"session_key": key,
"channel": "main",
"what_user_tried": summary,
"finance_topic_tags": topics,
"tools_actions_observed": tools,
"notable_quotes": quotes,
"ux_signals": signals,
"researcher_notes": " ".join(notes_parts)[:300],
}
if dry_run:
print(json.dumps(obs, indent=2, ensure_ascii=False))
else:
append_observation(obs)
processed[key] = from_line + len(messages)
count += 1
if not dry_run:
cp["last_run"] = now.isoformat()
cp["processed_sessions"] = processed
save_checkpoint(cp)
return count
def main() -> None:
parser = argparse.ArgumentParser()
parser.add_argument("--dry-run", action="store_true",
help="Print observations without writing to disk")
args = parser.parse_args()
count = run(dry_run=args.dry_run)
if args.dry_run:
print(f"\n[dry-run] Would have written {count} observation(s).")
# No output in normal cron runs
if __name__ == "__main__":
main()
```
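`load_taxonomy()` above reads `data/finance_taxonomy.yml` with a minimal hand-rolled parser: an unindented `category:` line opens a topic, and each following `- keyword` line adds a match term. The taxonomy file itself is not reproduced in this listing; a fragment in the shape that parser accepts would look like this (keywords invented):

```yaml
budgeting:
  - budget
  - monthly expenses
  - spending plan
investing:
  - index fund
  - portfolio
  - brokerage
```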
### scripts/redact_reports.py
```python
#!/usr/bin/env python3
"""
redact_reports.py
PII Redactor – strips personally identifiable information from report files.
Usage:
python3 redact_reports.py <file.md> # redact single file
python3 redact_reports.py <directory/> # redact all *.md in dir
python3 redact_reports.py --validate-only # scan REDACTED files for residual PII
python3 redact_reports.py --validate-only --verbose
Outputs: for each input.md → input.REDACTED.md
Placeholders are consistent within a file: [EMAIL_1] always refers to the same original value.
"""
import argparse
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Tuple
# ── PII patterns (order matters – specific before catch-all) ──────────────────
RAW_PATTERNS: List[Tuple[str, str, str]] = [
# SSN / Individual Tax ID
("SSN", r"\b\d{3}[-\u2013]\d{2}[-\u2013]\d{4}\b", "TAX_ID"),
# EIN XX-XXXXXXX
("EIN", r"\b\d{2}[-\u2013]\d{7}\b", "EIN"),
    # Credit / debit card (4×4 blocks)
("CC", r"\b(?:\d{4}[\s\-]){3}\d{4}\b", "CC"),
# Routing number in context
("ROUTING", r"(?i)\b(?:routing(?:\s+number)?|aba)[:\s#]*(\d{9})\b", "ROUTING"),
# Bank account in context
("ACCOUNT", r"(?i)\b(?:acct|account|acc(?:ount)?)[:\s#\.]*([\d\-]{6,20})\b","ACCOUNT"),
    # Long standalone digit strings (≥10 digits)
("LONG_NUM", r"\b\d{10,}\b", "ACCOUNT"),
# US phone numbers
("PHONE", r"(?:\+?1[\s.\-]?)?\(?[2-9]\d{2}\)?[\s.\-]?\d{3}[\s.\-]?\d{4}","PHONE"),
# Email
("EMAIL", r"\b[A-Za-z0-9._%+\-]+@[A-Za-z0-9.\-]+\.[A-Za-z]{2,}\b", "EMAIL"),
# Bearer / OAuth tokens
("BEARER", r"(?i)Bearer\s+[A-Za-z0-9._\-/+]{20,}", "TOKEN"),
    # Generic API keys (≥32 chars, no spaces)
("API_KEY", r"\b[A-Za-z0-9_\-]{32,}\b", "TOKEN"),
# Street addresses
("ADDRESS",
r"\b\d{1,5}\s+[A-Z][a-z]+(?:\s+[A-Z][a-z]+){1,4}"
r"(?:\s+(?:St|Ave|Rd|Blvd|Dr|Ln|Ct|Way|Pl|Pkwy|Hwy|Cir|Ter|Sq)\.?\b)", "ADDRESS"),
# Name introductions
("NAME_IS",
r"(?i)(?:my name is|i am|i['\u2019]m)\s+([A-Z][a-z]+(?:\s+[A-Z][a-z]+){1,2})",
"PERSON"),
# US ZIP codes (only when near a state abbreviation or comma)
("ZIP", r"(?i)\b(?:[A-Z]{2}\s+|,\s*)\d{5}(?:-\d{4})?\b", "ZIP"),
]
# Used for validation scans of REDACTED files
VALIDATION_PATTERNS: List[Tuple[str, re.Pattern]] = [
("email", re.compile(r"\b[A-Za-z0-9._%+\-]+@[A-Za-z0-9.\-]+\.[A-Za-z]{2,}\b")),
("phone", re.compile(r"(?:\+?1[\s.\-]?)?\(?[2-9]\d{2}\)?[\s.\-]?\d{3}[\s.\-]?\d{4}")),
("ssn", re.compile(r"\b\d{3}[-\u2013]\d{2}[-\u2013]\d{4}\b")),
("credit_card", re.compile(r"\b(?:\d{4}[\s\-]){3}\d{4}\b")),
("bearer", re.compile(r"(?i)Bearer\s+[A-Za-z0-9._\-/+]{20,}")),
]
class Redactor:
    """
    Stateful PII redactor. Counters and seen-values reset between files so
    [EMAIL_1] consistently refers to the same original within one document.
    """

    def __init__(self) -> None:
        self._counters: Dict[str, int] = {}
        self._seen: Dict[str, str] = {}
        self._compiled = [
            (label, re.compile(pattern, re.MULTILINE), prefix)
            for label, pattern, prefix in RAW_PATTERNS
        ]

    def reset(self) -> None:
        self._counters.clear()
        self._seen.clear()

    def _placeholder(self, prefix: str, original: str) -> str:
        key = f"{prefix}:{original.strip()}"
        if key not in self._seen:
            n = self._counters.get(prefix, 0) + 1
            self._counters[prefix] = n
            self._seen[key] = f"[{prefix}_{n}]"
        return self._seen[key]

    def redact(self, text: str) -> str:
        for _label, pattern, prefix in self._compiled:
            # Bind prefix via a default argument so each closure keeps its own
            def _sub(m: re.Match, p: str = prefix) -> str:
                return self._placeholder(p, m.group(0))
            text = pattern.sub(_sub, text)
        return text

    def redact_file(self, input_path: Path) -> Tuple[Path, str]:
        self.reset()
        raw = input_path.read_text(encoding="utf-8", errors="replace")
        redacted = self.redact(raw)
        ts = datetime.now(tz=timezone.utc).isoformat()
        header = (
            f"<!-- REDACTED VERSION - Generated {ts} -->\n"
            f"<!-- Source: {input_path.name} -->\n"
            f"<!-- PII substitutions: {len(self._seen)} -->\n\n"
        )
        content = header + redacted
        stem = input_path.stem
        out = (
            input_path
            if stem.endswith(".REDACTED")
            else input_path.with_name(stem + ".REDACTED" + input_path.suffix)
        )
        return out, content

    @property
    def substitution_count(self) -> int:
        return len(self._seen)
# ── Validation ─────────────────────────────────────────────────────────────────

def validate_file(path: Path, verbose: bool = False) -> List[str]:
    issues: List[str] = []
    if not path.exists():
        issues.append(f"MISSING: {path}")
        return issues
    content = path.read_text(encoding="utf-8", errors="replace")
    for name, pattern in VALIDATION_PATTERNS:
        matches = pattern.findall(content)
        if matches:
            issues.append(f"POSSIBLE_PII ({name}): {len(matches)} instance(s) in {path.name}")
            if verbose:
                for m in matches[:3]:
                    issues.append(f"    → {m!r}")
    return issues

def validate_reports_dir(reports_base: Path, verbose: bool = False) -> bool:
    """Scan all date subdirectories under reports_base. Returns True if clean."""
    if not reports_base.exists():
        print("No reports directory found - nothing to validate.")
        return True
    all_clean = True
    for date_dir in sorted(reports_base.iterdir()):
        if not date_dir.is_dir():
            continue
        md_files = sorted(date_dir.glob("*.md"))
        source_files = [f for f in md_files if ".REDACTED." not in f.name]
        redacted_files = [f for f in md_files if ".REDACTED." in f.name]
        # Check every source file has a REDACTED counterpart
        for f in source_files:
            expected = f.with_name(f.stem + ".REDACTED" + f.suffix)
            if not expected.exists():
                print(f"  ❌ MISSING_REDACTED: {date_dir.name}/{expected.name}")
                all_clean = False
        # Scan REDACTED files for residual PII
        for f in redacted_files:
            issues = validate_file(f, verbose=verbose)
            if issues:
                all_clean = False
                print(f"  ⚠️ {date_dir.name}/{f.name}")
                for issue in issues:
                    print(f"    • {issue}")
    return all_clean
# ── CLI ────────────────────────────────────────────────────────────────────────

def main() -> None:
    parser = argparse.ArgumentParser(description="Finance UX Observer - PII Redactor")
    parser.add_argument("target", nargs="?", help="File or directory to redact")
    parser.add_argument("--validate-only", action="store_true")
    parser.add_argument("--verbose", action="store_true")
    args = parser.parse_args()

    if args.validate_only:
        reports_base = Path(__file__).resolve().parent.parent / "reports"
        clean = validate_reports_dir(reports_base, verbose=args.verbose)
        if clean:
            print("✅ Redaction validation passed - no residual PII detected.")
            sys.exit(0)
        else:
            sys.exit(1)

    if not args.target:
        parser.print_help()
        sys.exit(1)

    target = Path(args.target)
    redactor = Redactor()
    if target.is_file():
        out_path, content = redactor.redact_file(target)
        out_path.write_text(content, encoding="utf-8")
        print(f"✅ {target.name} → {out_path.name} ({redactor.substitution_count} substitution(s))")
    elif target.is_dir():
        sources = sorted(f for f in target.glob("*.md") if ".REDACTED." not in f.name)
        if not sources:
            print(f"No source .md files in {target}")
            sys.exit(0)
        for f in sources:
            redactor = Redactor()
            out_path, content = redactor.redact_file(f)
            out_path.write_text(content, encoding="utf-8")
            print(f"  ✅ {f.name} → {out_path.name} ({redactor.substitution_count} substitution(s))")
    else:
        print(f"Error: '{target}' is not a file or directory.", file=sys.stderr)
        sys.exit(1)

if __name__ == "__main__":
    main()
```
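The core of the redactor above is its stable placeholder scheme: within one file, every repeat of the same original value maps to the same `[PREFIX_n]` token. A minimal standalone sketch of that idea (illustrative only, not part of the shipped script; `redact_emails` is a made-up name here) for a single email pattern:

```python
import re

# Same email pattern the script uses for validation scans
EMAIL_RE = re.compile(r"\b[A-Za-z0-9._%+\-]+@[A-Za-z0-9.\-]+\.[A-Za-z]{2,}\b")

def redact_emails(text: str) -> str:
    # Map each distinct original value to a numbered placeholder,
    # reusing the placeholder on every repeat occurrence.
    seen: dict = {}

    def _sub(m: re.Match) -> str:
        original = m.group(0)
        if original not in seen:
            seen[original] = f"[EMAIL_{len(seen) + 1}]"
        return seen[original]

    return EMAIL_RE.sub(_sub, text)

sample = "Contact a@x.com or b@y.org; a@x.com again."
print(redact_emails(sample))
# prints: Contact [EMAIL_1] or [EMAIL_2]; [EMAIL_1] again.
```

This mirrors why `Redactor.reset()` is called per file: the counters restart so `[EMAIL_1]` in one report never implies the same person as `[EMAIL_1]` in another.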
### scripts/setup_cron.py
```python
#!/usr/bin/env python3
"""
setup_cron.py

Adds Finance UX Observer jobs to the user's system crontab.

Run once after installing the skill:

    python3 ~/.openclaw/skills/finance-ux-observer/scripts/setup_cron.py

Options:
    --install   Register the cron jobs (default)
    --remove    Remove all Finance UX Observer cron entries
    --status    Show current status without making changes
"""
import subprocess
import sys
from pathlib import Path

SKILL_DIR = Path.home() / ".openclaw" / "skills" / "finance-ux-observer"
LOG_DIR = SKILL_DIR / "logs"
MARKER = "# finance-ux-observer"

ENTRIES = [
    (
        "*/30 * * * *",
        f"python3 {SKILL_DIR}/scripts/observe_finance_usage.py",
        "30-min observation pass",
    ),
    (
        "55 23 * * *",
        f"TZ=America/Los_Angeles python3 {SKILL_DIR}/scripts/daily_synthesize.py"
        f" >> {LOG_DIR}/synthesize.log 2>&1",
        "end-of-day synthesis",
    ),
    (
        "0 6 * * *",
        f"TZ=America/Los_Angeles python3 {SKILL_DIR}/scripts/redact_reports.py --validate-only"
        f" >> {LOG_DIR}/redaction_check.log 2>&1",
        "redaction integrity check",
    ),
]
def get_crontab() -> str:
    result = subprocess.run(["crontab", "-l"], capture_output=True, text=True)
    # crontab -l exits 1 with "no crontab" when empty - treat as empty string
    if result.returncode != 0 and "no crontab" in result.stderr.lower():
        return ""
    return result.stdout

def set_crontab(content: str) -> None:
    proc = subprocess.run(["crontab", "-"], input=content, text=True, capture_output=True)
    if proc.returncode != 0:
        print(f"Error writing crontab: {proc.stderr}", file=sys.stderr)
        sys.exit(1)

def cmd_install() -> None:
    LOG_DIR.mkdir(parents=True, exist_ok=True)
    (SKILL_DIR / "data" / "observations").mkdir(parents=True, exist_ok=True)
    (SKILL_DIR / "reports").mkdir(parents=True, exist_ok=True)

    current = get_crontab()
    lines = current.splitlines(keepends=True)
    added = 0
    new_lines = list(lines)
    # Ensure trailing newline before appending
    if new_lines and not new_lines[-1].endswith("\n"):
        new_lines[-1] += "\n"
    for schedule, command, label in ENTRIES:
        cron_line = f"{schedule} {command} {MARKER}\n"
        if command in current:
            print(f"  • Already registered: {label}")
        else:
            new_lines.append(cron_line)
            print(f"  ✓ Added: {label}")
            added += 1
    if added:
        set_crontab("".join(new_lines))
        print(f"\n✅ {added} cron job(s) registered.")
    else:
        print("\n✅ All jobs already registered - nothing changed.")
def cmd_remove() -> None:
    current = get_crontab()
    filtered = [l for l in current.splitlines(keepends=True) if MARKER not in l]
    removed = len(current.splitlines()) - len(filtered)
    if removed:
        set_crontab("".join(filtered))
        print(f"✅ Removed {removed} Finance UX Observer cron entry/entries.")
    else:
        print("No Finance UX Observer cron entries found.")

def cmd_status() -> None:
    current = get_crontab()
    our_lines = [l.strip() for l in current.splitlines() if MARKER in l]
    if not our_lines:
        print("❌ No Finance UX Observer cron jobs registered.")
        print("   Run: python3 setup_cron.py")
    else:
        print(f"✅ {len(our_lines)} cron job(s) registered:\n")
        for line in our_lines:
            print(f"  {line}")

def main() -> None:
    flag = sys.argv[1] if len(sys.argv) > 1 else "--install"
    dispatch = {
        "--install": cmd_install,
        "--remove": cmd_remove,
        "--status": cmd_status,
    }
    handler = dispatch.get(flag)
    if handler is None:
        print(f"Unknown option: {flag}. Valid: {list(dispatch)}", file=sys.stderr)
        sys.exit(1)
    handler()

if __name__ == "__main__":
    main()
```
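The install/remove cycle above is idempotent because every managed crontab line carries the same trailing `MARKER` comment: installation skips commands already present, and removal is a plain line filter. A minimal sketch of that bookkeeping (illustrative only; `remove_managed` and the sample entries are made up for the example, and no real crontab is touched):

```python
# Every line this skill manages ends with MARKER, so cleanup is a filter
# that leaves all other cron entries untouched.
MARKER = "# finance-ux-observer"

def remove_managed(crontab: str) -> str:
    """Drop every line tagged with MARKER, keeping all other entries."""
    kept = [line for line in crontab.splitlines(keepends=True) if MARKER not in line]
    return "".join(kept)

existing = (
    "0 0 * * * /usr/bin/backup.sh\n"
    f"*/30 * * * * python3 observe.py {MARKER}\n"
)
print(repr(remove_managed(existing)))
# prints: '0 0 * * * /usr/bin/backup.sh\n'
```

Tagging lines with a unique comment is a common pattern for tools that share a user's crontab; the trade-off is that duplicate detection keys on the command string, so editing a schedule by hand and re-running the installer will not re-add the entry.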