media-news-digest
Generate media & entertainment industry news digests. Covers Hollywood trades (THR, Deadline, Variety), box office, streaming, awards season, film festivals, and production news. Four-source data collection from RSS feeds, Twitter/X KOLs, Reddit, and web search. Pipeline-based scripts with retry mechanisms and deduplication. Supports Discord and email output with PDF attachments.
Packaged view
This page reorganizes the original catalog entry around fit, installability, and workflow context first. The original raw source lives below.
Install command
npx @skill-hub/cli install openclaw-skills-media-news-digest
Repository
Skill path: skills/dinstein/media-news-digest
Best for
Primary workflow: Analyze Data & AI.
Technical facets: Full Stack, Data / AI.
Target audience: everyone.
License: Unknown.
Original source
Catalog source: SkillHub Club.
Repository owner: openclaw.
This is a mirrored public skill entry. Review the repository before installing it into production workflows.
What it helps with
- Install media-news-digest into Claude Code, Codex CLI, Gemini CLI, or OpenCode workflows
- Review https://github.com/openclaw/skills before adding media-news-digest to shared team environments
- Use media-news-digest for development workflows
Works across
Favorites: 0.
Sub-skills: 0.
Aggregator: No.
Original source / Raw SKILL.md
---
name: media-news-digest
description: Generate media & entertainment industry news digests. Covers Hollywood trades (THR, Deadline, Variety), box office, streaming, awards season, film festivals, and production news. Four-source data collection from RSS feeds, Twitter/X KOLs, Reddit, and web search. Pipeline-based scripts with retry mechanisms and deduplication. Supports Discord and email output with PDF attachments.
version: "2.1.1"
homepage: https://github.com/draco-agent/media-news-digest
source: https://github.com/draco-agent/media-news-digest
metadata:
openclaw:
requires:
bins: ["python3"]
optionalBins: ["mail", "msmtp"]
credentialAccess: >
This skill does NOT read, store, or manage any platform credentials itself.
Email delivery uses send-email.py with system mail (msmtp). Twitter and web search
API keys are passed via environment variables and used only for outbound API calls.
No credentials are written to disk by this skill.
env:
- name: X_BEARER_TOKEN
required: false
description: Twitter/X API v2 bearer token for KOL monitoring (official backend)
- name: TWITTERAPI_IO_KEY
required: false
description: twitterapi.io API key (alternative Twitter backend)
- name: TWITTER_API_BACKEND
required: false
description: "Twitter backend selection: official, twitterapiio, or auto (default: auto)"
- name: BRAVE_API_KEY
required: false
description: Brave Search API key for web search (single key)
- name: BRAVE_API_KEYS
required: false
description: "Comma-separated Brave API keys for multi-key rotation (preferred over BRAVE_API_KEY)"
- name: TAVILY_API_KEY
required: false
description: Tavily Search API key (alternative web search backend)
---
# Media News Digest
Automated media & entertainment industry news digest system. Covers Hollywood trades, box office, streaming platforms, awards season, film festivals, production news, and industry deals.
## Quick Start
1. **Generate Digest** (unified pipeline – runs all 4 sources in parallel):
```bash
python3 scripts/run-pipeline.py \
--defaults <SKILL_DIR>/config/defaults \
--config <WORKSPACE>/config \
--hours 48 --freshness pd \
--archive-dir <WORKSPACE>/archive/media-news-digest/ \
--output /tmp/md-merged.json --verbose --force
```
2. **Use Templates**: Apply Discord or email templates to merged output
## Data Sources (65 total, 64 enabled)
- **RSS Feeds (36, 35 enabled)**: THR, Deadline, Variety, IndieWire, The Wrap, Collider, Vulture, Awards Daily, Gold Derby, Screen Rant, Empire, The Playlist, /Film, Entertainment Weekly, Roger Ebert, CinemaBlend, Den of Geek, The Direct, MovieWeb, CBR, What's on Netflix, Decider, Anime News Network, and more
- **Twitter/X KOLs (18)**: @THR, @DEADLINE, @Variety, @FilmUpdates, @DiscussingFilm, @BoxOfficeMojo, @MattBelloni, @Borys_Kit, @TheAcademy, @letterboxd, @A24, and more
- **Reddit (11)**: r/movies, r/boxoffice, r/television, r/Oscars, r/TrueFilm, r/entertainment, r/netflix, r/marvelstudios, r/DC_Cinematic, r/anime, r/flicks
- **Web Search (9 topics)**: Brave Search / Tavily with freshness filters
## Topics (9 sections)
- 🇨🇳 China / 中国影视 – China mainland box office, Chinese films, Chinese streaming
- 🎬 Production / 制作动态 – New projects, casting, filming updates
- 💰 Deals & Business / 行业交易 – M&A, rights, talent deals
- 🗓️ Upcoming Releases / 北美近期上映 – Theater openings, release dates, trailers
- 🎟️ Box Office / 票房 – NA/global box office, opening weekends
- 📺 Streaming / 流媒体 – Netflix, Disney+, Apple TV+, HBO, viewership
- 🏆 Awards / 颁奖季 – Oscars, Golden Globes, Emmys, BAFTAs
- 🎪 Film Festivals / 电影节 – Cannes, Venice, TIFF, Sundance, Berlin
- ⭐ Reviews & Buzz / 影评口碑 – Critical reception, RT/Metacritic scores
## Scripts Pipeline
### Unified Pipeline
```bash
python3 scripts/run-pipeline.py \
--defaults config/defaults --config workspace/config \
--hours 48 --freshness pd \
--archive-dir workspace/archive/media-news-digest/ \
--output /tmp/md-merged.json --verbose --force
```
- **Features**: Runs all 4 fetch steps in parallel, then merges + deduplicates + scores
- **Output**: Final merged JSON ready for report generation (~30s total)
- **Flags**: `--skip rss,twitter` to skip steps, `--enrich` for full-text enrichment
### Individual Scripts
- `fetch-rss.py` – Parallel RSS fetcher (10 workers, 30s timeout, caching)
- `fetch-twitter.py` – Dual backend: official X API v2 + twitterapi.io (auto fallback, 3-worker concurrency)
- `fetch-web.py` – Web search via Brave (multi-key rotation) or Tavily
- `fetch-reddit.py` – Reddit public JSON API (4 workers, no auth)
- `merge-sources.py` – Quality scoring, URL dedup, multi-source merging
- `summarize-merged.py` – Structured overview sorted by quality_score
- `enrich-articles.py` – Full-text enrichment for top articles
- `generate-pdf.py` – PDF generation with Chinese typography + emoji
- `send-email.py` – MIME email with HTML body + PDF attachment
- `sanitize-html.py` – XSS-safe markdown-to-HTML conversion
- `validate-config.py` – Configuration validator
- `source-health.py` – Source health tracking
- `config_loader.py` – Config overlay loader (defaults + user overrides)
- `test-pipeline.sh` – Pipeline testing with --only/--skip/--twitter-backend filters
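The URL-level deduplication that `merge-sources.py` performs can be sketched as follows. This is illustrative only: the function and field names below are hypothetical, not the script's actual internals.

```python
from urllib.parse import urlsplit, urlunsplit

def canonical_url(url: str) -> str:
    """Normalize a URL so the same article from two feeds collides on one key."""
    parts = urlsplit(url.strip())
    # Drop query string, fragment, and trailing slash (tracking params, anchors).
    return urlunsplit((parts.scheme.lower(), parts.netloc.lower(),
                       parts.path.rstrip("/"), "", ""))

def dedupe_articles(articles):
    """Keep the highest-scored copy of each URL and count the merged sources."""
    best = {}
    for art in articles:
        key = canonical_url(art["url"])
        if key not in best or art["quality_score"] > best[key]["quality_score"]:
            merged = dict(art)
            merged["source_count"] = best.get(key, {}).get("source_count", 0) + 1
            best[key] = merged
        else:
            best[key]["source_count"] += 1
    return sorted(best.values(), key=lambda a: a["quality_score"], reverse=True)
```

The real script additionally applies quality scoring and an archive-based penalty for previously reported stories.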
## Cron Integration
Reference `references/digest-prompt.md` in cron prompts.
### Daily Digest
```
MODE = daily, FRESHNESS = pd, RSS_HOURS = 48
```
### Weekly Digest
```
MODE = weekly, FRESHNESS = pw, RSS_HOURS = 168
```
## Dependencies
All scripts run on the **Python 3.8+ standard library alone**; `feedparser` is optional but recommended.
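An optional dependency like this implies a guarded import with a stdlib fallback. A minimal sketch of that pattern (illustrative; `parse_feed_titles` is a hypothetical helper, and fetch-rss.py's actual fallback logic may differ):

```python
try:
    import feedparser  # richer parsing: tolerates malformed feeds, many date formats
    HAVE_FEEDPARSER = True
except ImportError:
    # Stdlib-only fallback: basic RSS parsing via xml.etree
    import xml.etree.ElementTree as ET
    HAVE_FEEDPARSER = False

def parse_feed_titles(xml_text: str):
    """Return item titles from an RSS document, with or without feedparser."""
    if HAVE_FEEDPARSER:
        return [entry.title for entry in feedparser.parse(xml_text).entries]
    root = ET.fromstring(xml_text)
    # First <title> is the channel title; the rest belong to <item> elements.
    return [el.text or "" for el in root.iter("title")][1:]
```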
---
## Referenced Files
> The following files are referenced in this skill and included for context.
### scripts/run-pipeline.py
```python
#!/usr/bin/env python3
"""
Unified data collection pipeline for media-news-digest.

Runs all 5 fetch steps (RSS, Twitter, GitHub, Reddit, Web) in parallel,
then merges + deduplicates + scores into a single output JSON.

Replaces the agent's sequential 6-step tool-call loop with one command,
eliminating ~60-120s of LLM round-trip overhead.

Usage:
    python3 run-pipeline.py \
        --defaults <SKILL_DIR>/config/defaults \
        --config <WORKSPACE>/config \
        --hours 48 --freshness pd \
        --archive-dir <WORKSPACE>/archive/media-news-digest/ \
        --output /tmp/md-merged.json \
        --verbose
"""
import json
import sys
import os
import subprocess
import time
import argparse
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Dict, Any

SCRIPTS_DIR = Path(__file__).parent
DEFAULT_TIMEOUT = 180  # per-step timeout in seconds


def setup_logging(verbose: bool) -> logging.Logger:
    level = logging.DEBUG if verbose else logging.INFO
    logging.basicConfig(
        level=level,
        format="%(asctime)s [%(levelname)s] %(message)s",
        datefmt="%H:%M:%S",
    )
    return logging.getLogger(__name__)


def run_step(
    name: str,
    script: str,
    args_list: list,
    output_path: Path,
    timeout: int = DEFAULT_TIMEOUT,
    force: bool = False,
) -> Dict[str, Any]:
    """Run a fetch script as a subprocess, return result metadata."""
    t0 = time.time()
    cmd = [sys.executable, str(SCRIPTS_DIR / script)] + args_list + [
        "--output", str(output_path),
    ]
    if force:
        cmd.append("--force")
    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=timeout,
            env=os.environ,
        )
        elapsed = time.time() - t0
        ok = result.returncode == 0
        # Try to read output stats
        count = 0
        if ok and output_path.exists():
            try:
                with open(output_path) as f:
                    data = json.load(f)
                count = (
                    data.get("total_articles")
                    or data.get("total_posts")
                    or data.get("total_releases")
                    or data.get("total_results")
                    or data.get("total")
                    or 0
                )
            except (json.JSONDecodeError, OSError):
                pass
        return {
            "name": name,
            "status": "ok" if ok else "error",
            "elapsed_s": round(elapsed, 1),
            "count": count,
            "stderr_tail": (result.stderr or "").strip().split("\n")[-3:] if not ok else [],
        }
    except subprocess.TimeoutExpired:
        elapsed = time.time() - t0
        return {
            "name": name,
            "status": "timeout",
            "elapsed_s": round(elapsed, 1),
            "count": 0,
            "stderr_tail": [f"Killed after {timeout}s"],
        }
    except Exception as e:
        elapsed = time.time() - t0
        return {
            "name": name,
            "status": "error",
            "elapsed_s": round(elapsed, 1),
            "count": 0,
            "stderr_tail": [str(e)],
        }


def main() -> int:
    parser = argparse.ArgumentParser(
        description="Run the full media-news-digest data pipeline in one shot.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("--defaults", type=Path, required=True, help="Skill defaults config dir")
    parser.add_argument("--config", type=Path, default=None, help="User config overlay dir")
    parser.add_argument("--hours", type=int, default=48, help="Time window in hours")
    parser.add_argument("--freshness", type=str, default="pd", help="Web search freshness (pd/pw/pm)")
    parser.add_argument("--archive-dir", type=Path, default=None, help="Archive dir for dedup penalty")
    parser.add_argument("--output", "-o", type=Path, default=Path("/tmp/md-merged.json"), help="Final merged output")
    parser.add_argument("--step-timeout", type=int, default=DEFAULT_TIMEOUT, help="Per-step timeout (seconds)")
    parser.add_argument("--twitter-backend", choices=["official", "twitterapiio", "auto"], default=None, help="Twitter API backend to use")
    parser.add_argument("--verbose", "-v", action="store_true")
    parser.add_argument("--force", action="store_true", help="Force re-fetch ignoring caches")
    parser.add_argument("--enrich", action="store_true", help="Enable full-text enrichment for top articles")
    parser.add_argument("--skip", type=str, default="", help="Comma-separated list of steps to skip (rss,twitter,github,reddit,web)")
    parser.add_argument("--reuse-dir", type=Path, default=None, help="Reuse existing intermediate directory instead of creating new one")
    args = parser.parse_args()

    logger = setup_logging(args.verbose)

    # Parse --skip into a set
    skip_steps = set(s.strip().lower() for s in args.skip.split(',') if s.strip())

    # Intermediate output paths
    import tempfile
    if args.reuse_dir:
        _run_dir = str(args.reuse_dir)
        os.makedirs(_run_dir, exist_ok=True)
    else:
        _run_dir = tempfile.mkdtemp(prefix="td-pipeline-")
    tmp_rss = Path(_run_dir) / "rss.json"
    tmp_twitter = Path(_run_dir) / "twitter.json"
    tmp_github = Path(_run_dir) / "github.json"
    tmp_trending = Path(_run_dir) / "trending.json"
    tmp_reddit = Path(_run_dir) / "reddit.json"
    tmp_web = Path(_run_dir) / "web.json"
    logger.info(f"Run directory: {_run_dir}")

    # Common args for all fetch scripts
    common = ["--defaults", str(args.defaults)]
    if args.config:
        common += ["--config", str(args.config)]
    common += ["--hours", str(args.hours)]
    verbose_flag = ["--verbose"] if args.verbose else []

    # Define the 5 parallel fetch steps
    steps = [
        ("RSS", "fetch-rss.py", common + verbose_flag, tmp_rss),
        ("Twitter", "fetch-twitter.py", common + verbose_flag + (["--backend", args.twitter_backend] if args.twitter_backend else []), tmp_twitter),
        ("GitHub", "fetch-github.py", common + verbose_flag, tmp_github),
        ("GitHub Trending", "fetch-github.py", ["--trending", "--hours", str(args.hours)] + verbose_flag, tmp_trending),
        ("Reddit", "fetch-reddit.py", common + verbose_flag, tmp_reddit),
        ("Web", "fetch-web.py",
         ["--defaults", str(args.defaults)]
         + (["--config", str(args.config)] if args.config else [])
         + ["--freshness", args.freshness]
         + verbose_flag,
         tmp_web),
    ]

    # Filter steps by --skip and --reuse-dir
    active_steps = []
    for name, script, step_args, out_path in steps:
        step_key = name.lower()
        if step_key in skip_steps:
            logger.info(f"  ⏭️ {name}: skipped (--skip)")
            continue
        if args.reuse_dir and out_path.exists() and not args.force:
            logger.info(f"  ♻️ {name}: reusing existing {out_path}")
            continue
        active_steps.append((name, script, step_args, out_path))

    logger.info(f"Starting pipeline: {len(active_steps)}/{len(steps)} sources, {args.hours}h window, freshness={args.freshness}")
    t_start = time.time()

    # Phase 1: Parallel fetch
    step_results = []
    if active_steps:
        with ThreadPoolExecutor(max_workers=len(active_steps)) as pool:
            futures = {}
            for name, script, step_args, out_path in active_steps:
                f = pool.submit(run_step, name, script, step_args, out_path, args.step_timeout, args.force)
                futures[f] = name
            for future in as_completed(futures):
                res = future.result()
                step_results.append(res)
                status_icon = {"ok": "✅", "error": "❌", "timeout": "⏰"}.get(res["status"], "?")
                logger.info(f"  {status_icon} {res['name']}: {res['count']} items ({res['elapsed_s']}s)")
                if res["status"] != "ok" and res["stderr_tail"]:
                    for line in res["stderr_tail"]:
                        logger.debug(f"    {line}")
    fetch_elapsed = time.time() - t_start
    logger.info(f"Fetch phase done in {fetch_elapsed:.1f}s")

    # Phase 2: Merge
    logger.info("Merging & scoring...")
    merge_args = ["--verbose"] if args.verbose else []
    for flag, path in [("--rss", tmp_rss), ("--twitter", tmp_twitter),
                       ("--github", tmp_github), ("--trending", tmp_trending),
                       ("--reddit", tmp_reddit), ("--web", tmp_web)]:
        if path.exists():
            merge_args += [flag, str(path)]
    if args.archive_dir:
        merge_args += ["--archive-dir", str(args.archive_dir)]
    merge_args += ["--output", str(args.output)]
    merge_result = run_step("Merge", "merge-sources.py", merge_args, args.output, timeout=60, force=False)

    # Phase 3: Enrich high-scoring articles with full text
    if merge_result["status"] == "ok" and args.enrich and "enrich" not in skip_steps:
        logger.info("Enriching top articles with full text...")
        enrich_args = ["--input", str(args.output), "--output", str(args.output)]
        enrich_args += ["--verbose"] if args.verbose else []
        enrich_result = run_step("Enrich", "enrich-articles.py", enrich_args, args.output, timeout=120, force=False)
    else:
        enrich_result = {"name": "Enrich", "status": "skipped", "elapsed_s": 0, "count": 0, "stderr_tail": []}

    total_elapsed = time.time() - t_start

    # Summary
    logger.info(f"{'=' * 50}")
    logger.info(f"Pipeline Summary ({total_elapsed:.1f}s total)")
    for r in step_results:
        logger.info(f"  {r['name']:10s} {r['status']:7s} {r['count']:4d} items {r['elapsed_s']:5.1f}s")
    logger.info(f"  {'Merge':10s} {merge_result['status']:7s} {merge_result.get('count', 0):4d} items {merge_result['elapsed_s']:5.1f}s")
    logger.info(f"  Output: {args.output}")

    if merge_result["status"] != "ok":
        logger.error(f"❌ Merge failed: {merge_result['stderr_tail']}")
        return 1

    # Write pipeline metadata alongside output for agent consumption
    meta = {
        "pipeline_version": "1.0.0",
        "total_elapsed_s": round(total_elapsed, 1),
        "fetch_elapsed_s": round(fetch_elapsed, 1),
        "steps": step_results,
        "merge": merge_result,
        "output": str(args.output),
    }
    meta_path = args.output.with_suffix(".meta.json")
    with open(meta_path, "w") as f:
        json.dump(meta, f, indent=2)

    if not args.reuse_dir:
        import shutil
        try:
            shutil.rmtree(_run_dir)
            logger.debug(f"Cleaned up {_run_dir}")
        except Exception:
            pass

    logger.info(f"✅ Done → {args.output}")
    return 0


if __name__ == "__main__":
    sys.exit(main())
```
### references/digest-prompt.md
```markdown
# Digest Prompt Template
Replace `<...>` placeholders before use. Daily defaults shown; weekly overrides in parentheses.
## Placeholders
| Placeholder | Default | Weekly Override |
|-------------|---------|----------------|
| `<MODE>` | `daily` | `weekly` |
| `<TIME_WINDOW>` | `past 1-2 days` | `past 7 days` |
| `<FRESHNESS>` | `pd` | `pw` |
| `<RSS_HOURS>` | `48` | `168` |
| `<ITEMS_PER_SECTION>` | `3-5` | `5-8` |
| `<BLOG_PICKS_COUNT>` | `2-3` | `3-5` |
| `<EXTRA_SECTIONS>` | *(none)* | `📈 Weekly Trend Summary` |
| `<SUBJECT>` | `Daily Media Digest - YYYY-MM-DD - 🎬 每日影视日报` | `Weekly Media Digest - YYYY-MM-DD - 🎬 每周影视周报` |
| `<WORKSPACE>` | Your workspace path | |
| `<SKILL_DIR>` | Installed skill directory | |
| `<DISCORD_CHANNEL_ID>` | Target channel ID | |
| `<EMAIL>` | *(optional)* Recipient email | |
| `<EMAIL_FROM>` | *(optional)* e.g. `MyBot <[email protected]>` | |
| `<LANGUAGE>` | `Chinese` | |
| `<TEMPLATE>` | `discord` / `email` / `markdown` | |
| `<DATE>` | Today's date YYYY-MM-DD (caller provides) | |
| `<VERSION>` | Read from SKILL.md frontmatter | |
---
Generate the <MODE> media & entertainment digest for **<DATE>**. Use `<DATE>` as the report date; do NOT infer it.
## Configuration
Read config files (workspace overrides take priority over defaults):
1. **Sources**: `<WORKSPACE>/config/sources.json` → fallback `<SKILL_DIR>/config/defaults/sources.json`
2. **Topics**: `<WORKSPACE>/config/topics.json` → fallback `<SKILL_DIR>/config/defaults/topics.json`
## Context: Previous Report
Read the most recent file from `<WORKSPACE>/archive/media-news-digest/` to avoid repeats and follow up on developing stories. Skip if none exists.
## Data Collection Pipeline
**Use the unified pipeline** (runs all 4 sources in parallel, ~30s):
```bash
python3 <SKILL_DIR>/scripts/run-pipeline.py \
--defaults <SKILL_DIR>/config/defaults \
--config <WORKSPACE>/config \
--hours <RSS_HOURS> --freshness <FRESHNESS> \
--archive-dir <WORKSPACE>/archive/media-news-digest/ \
--output /tmp/md-merged.json --verbose --force
```
If it fails, run individual scripts in `<SKILL_DIR>/scripts/` (see each script's `--help`), then merge with `merge-sources.py`.
## Report Generation
Get a structured overview:
```bash
python3 <SKILL_DIR>/scripts/summarize-merged.py --input /tmp/md-merged.json --top <ITEMS_PER_SECTION>
```
Use this output to select articles; **do NOT write ad-hoc Python to parse the JSON**. Apply the template from `<SKILL_DIR>/references/templates/<TEMPLATE>.md`.
Select articles **purely by quality_score regardless of source type**. Articles in the merged JSON are already sorted by quality_score descending within each topic – respect this order. For Reddit posts, append `*[Reddit r/xxx, {{score}}↑]*`.
Each article line must include its quality score with a 🔥 prefix. Format: `🔥{score} | {summary with link}`. This makes scoring transparent and helps readers spot the most important news at a glance.
### Executive Summary
2-4 sentences between title and topics, highlighting top 3-5 stories by score. Concise and punchy, no links. Discord: `> ` blockquote. Email: gray background.
### Topic Sections
From `topics.json`: `emoji` + `label` headers, `<ITEMS_PER_SECTION>` items each.
**⚠️ CRITICAL: Output articles in EXACTLY the same order as summarize-merged.py output (quality_score descending). Do NOT reorder, group by subtopic, or rearrange. The 🔥 scores must appear in strictly decreasing order within each section.**
Every topic **must appear**, even with only 1-2 items. If a section is sparse, add a brief note saying so (e.g. "本日该板块较少").
### Fixed Sections (after topics)
**📢 KOL Updates** – Twitter KOLs. Format:
```
• **Display Name** (@handle) – summary `👁 12.3K | 💬 45 | 🔁 230 | ❤️ 1.2K`
<https://twitter.com/handle/status/ID>
```
Read `display_name` and `metrics` from the merged JSON. Always show all four metrics, use K/M formatting, and wrap the metrics in backticks.
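The K/M formatting rule can be sketched as follows (an illustrative helper, not part of the skill's scripts; the 👁 view-count emoji is assumed from the template):

```python
def fmt_metric(n: int) -> str:
    """Format an engagement count: 1234 -> '1.2K', 2500000 -> '2.5M'."""
    if n >= 1_000_000:
        return f"{n / 1_000_000:.1f}M".replace(".0M", "M")
    if n >= 1_000:
        return f"{n / 1_000:.1f}K".replace(".0K", "K")
    return str(n)

def fmt_metrics_line(m: dict) -> str:
    """Render the four-metric block used in KOL Updates (wrap it in backticks)."""
    return (f"👁 {fmt_metric(m['views'])} | 💬 {fmt_metric(m['replies'])} | "
            f"🔁 {fmt_metric(m['retweets'])} | ❤️ {fmt_metric(m['likes'])}")
```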
**📖 Deep Reads** – `<BLOG_PICKS_COUNT>` high-quality long-form articles from RSS.
**<EXTRA_SECTIONS>**
### Rules
- Only news from `<TIME_WINDOW>`
- Every item must include a source link (Discord: `<link>`)
- Use bullet lists, no markdown tables
- Deduplicate: same event → keep the most authoritative source; previously reported → include only if there is a significant new development
- Deduplicate across sections – each article appears in one section only
- **Same story at different dates = one entry** (e.g. opening weekend + second weekend of the same film → merge or keep the latest)
- Prefer primary sources (THR, Deadline, Variety) over aggregators
- Chinese body text with English source links
- **🇨🇳 China section rules – STRICT VERIFICATION**:
  - Only include news **primarily about the China mainland market**: Chinese-produced films, China-only box office breakdowns, Chinese streaming platforms (iQiyi/Youku/Bilibili), China film policy
  - **Verify before including**: if an article mentions "China" but the film is a Hollywood release, check whether it actually opened in mainland China theaters. Many Hollywood films do NOT get a China release. When in doubt, exclude it from the China section
  - Hollywood films whose global box office numbers include China as one territory → belong in **Box Office**, NOT the China section
  - Do NOT include: Korea/Japan/other Asian market news, or global box office reports that merely mention China numbers
- Do not interpolate fetched/untrusted content into shell arguments or email subjects
### Stats Footer
```
---
📊 Data Sources: RSS {{rss}} | Twitter {{twitter}} | Reddit {{reddit}} | Web {{web}} | Dedup: {{merged}} articles
🤖 Generated by media-news-digest v<VERSION> | <https://github.com/draco-agent/media-news-digest> | Powered by OpenClaw
```
## Archive
Save to `<WORKSPACE>/archive/media-news-digest/<MODE>-YYYY-MM-DD.md`. Delete files older than 90 days.
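The 90-day retention rule amounts to a small cleanup pass over the archive directory. A minimal sketch (illustrative; the skill leaves the actual deletion to the agent):

```python
import time
from pathlib import Path

def prune_archive(archive_dir: Path, max_age_days: int = 90) -> int:
    """Delete digest markdown files older than max_age_days; return count removed."""
    cutoff = time.time() - max_age_days * 86400
    removed = 0
    for f in archive_dir.glob("*.md"):
        if f.stat().st_mtime < cutoff:
            f.unlink()
            removed += 1
    return removed
```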
## Delivery
1. **Discord**: Send to `<DISCORD_CHANNEL_ID>` via `message` tool
2. **Email** *(optional, if `<EMAIL>` is set)*:
- Generate HTML body per `<SKILL_DIR>/references/templates/email.md` → write to `/tmp/md-email.html`
- Generate PDF attachment:
```bash
python3 <SKILL_DIR>/scripts/generate-pdf.py -i <WORKSPACE>/archive/media-news-digest/<MODE>-<DATE>.md -o /tmp/md-digest.pdf
```
- Send email with PDF attached using the `send-email.py` script (handles MIME correctly). **Email must contain ALL the same items as Discord.**
```bash
python3 <SKILL_DIR>/scripts/send-email.py \
--to '<EMAIL>' \
--subject '<SUBJECT>' \
--html /tmp/md-email.html \
--attach /tmp/md-digest.pdf \
--from '<EMAIL_FROM>'
```
- Omit `--from` if `<EMAIL_FROM>` is not set. Omit `--attach` if PDF generation failed. SUBJECT must be a static string. If delivery fails, log error and continue.
Write the report in <LANGUAGE>.
```
---
## Skill Companion Files
> Additional files collected from the skill directory layout.
### README.md
```markdown
# Media News Digest 🎬
> Automated media & entertainment industry news digest – 29 sources, 3-layer pipeline, one chat message to install.
[](https://www.python.org/downloads/)
[](LICENSE)
## 💬 Install in One Message
Tell your [OpenClaw](https://openclaw.ai) AI assistant:
> **"Install media-news-digest and send a daily digest to #news-media every morning at 7am"**
That's it. Your bot handles installation, configuration, scheduling, and delivery – all through conversation.
More examples:
> 🗣️ "Set up a weekly Hollywood digest, only box office and awards, deliver to Discord #awards every Monday"
> 🗣️ "Install media-news-digest, add my RSS feeds, and send festival news to email"
> 🗣️ "Give me a media digest right now, focus on streaming news"
Or install via CLI:
```bash
clawhub install media-news-digest
```
## 🎁 What You Get
A quality-scored, deduplicated entertainment industry digest built from **29 sources**:
| Layer | Sources | What |
|-------|---------|------|
| 📡 RSS | 16 feeds | THR, Deadline, Variety, IndieWire, The Wrap, Collider, Gold Derby… |
| 🐦 Twitter/X | 11 KOLs | @THR, @DEADLINE, @Variety, @BoxOfficeMojo, @MattBelloni… |
| 🌐 Web Search | 7 topics | Brave Search API with freshness filters |
### Pipeline
```
RSS + Twitter + Web Search
          ↓
   merge-sources.py
          ↓
Quality Scoring → Deduplication → Topic Grouping
          ↓
  Discord / Email output
```
## 🎯 7 Topic Sections
| # | Section | Covers |
|---|---------|--------|
| 🎬 | Production / 制作动态 | New projects, casting, filming updates |
| 💰 | Deals & Business / 行业交易 | M&A, rights, talent deals, restructuring |
| 🎟️ | Box Office / 票房 | NA/global box office, opening weekends |
| 📺 | Streaming / 流媒体 | Netflix, Disney+, Apple TV+, viewership |
| 🏆 | Awards / 颁奖季 | Oscars, Golden Globes, Emmys, campaigns |
| 🎪 | Film Festivals / 电影节 | Cannes, Venice, TIFF, Sundance, Berlin |
| ⭐ | Reviews & Buzz / 影评口碑 | Critical reception, RT/Metacritic scores |
## 📡 RSS Sources (16 enabled)
THR · Deadline · Variety · IndieWire · The Wrap · Collider · Awards Daily · Gold Derby · Screen Rant · Empire · The Playlist · /Film · JoBlo · FirstShowing.net · ComingSoon.net · World of Reel
## 🐦 Twitter/X KOLs (11)
@THR · @DEADLINE · @Variety · @FilmUpdates · @DiscussingFilm · @ScottFeinberg · @kristapley · @BoxOfficeMojo · @GiteshPandya · @MattBelloni · @Borys_Kit
## ⚙️ Configuration
### Default configs in `config/defaults/`:
- `sources.json` – RSS feeds, Twitter handles
- `topics.json` – Report sections with search queries
### User overrides in `workspace/config/`:
- Same `id` → user version wins
- New `id` → appended to defaults
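These override rules amount to an id-keyed merge. A minimal sketch, assuming each entry is a JSON object with an `id` field (the real logic lives in `scripts/config_loader.py` and may handle more cases):

```python
def overlay_configs(defaults, overrides):
    """Merge user overrides into defaults: a matching id replaces the default,
    a new id is appended, and an override with enabled=false disables the entry."""
    merged = {item["id"]: item for item in defaults}
    for item in overrides:
        merged[item["id"]] = item  # replace in place, or append if new
    return [item for item in merged.values() if item.get("enabled", True)]
```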
## 🚀 Quick Start
```bash
# Full pipeline
python3 scripts/fetch-rss.py --defaults config/defaults --hours 48 --output /tmp/md-rss.json
python3 scripts/fetch-twitter.py --defaults config/defaults --hours 48 --output /tmp/md-twitter.json
python3 scripts/fetch-web.py --defaults config/defaults --freshness pd --output /tmp/md-web.json
python3 scripts/merge-sources.py --rss /tmp/md-rss.json --twitter /tmp/md-twitter.json --web /tmp/md-web.json --output /tmp/md-merged.json
```
## 📦 Dependencies
```bash
pip install -r requirements.txt
```
All scripts run on the **Python 3.8+ standard library alone**; `feedparser` is optional but recommended.
## 📅 Cron Integration
Reference `references/digest-prompt.md` in OpenClaw cron prompts. See [SKILL.md](SKILL.md) for full documentation.
## License
MIT
```
### _meta.json
```json
{
"owner": "dinstein",
"slug": "media-news-digest",
"displayName": "Media News Digest",
"latest": {
"version": "2.1.1",
"publishedAt": 1772383203855,
"commit": "https://github.com/openclaw/skills/commit/d19246ed40e7cf4d1be0a83d90a8dedc927c8697"
},
"history": [
{
"version": "1.8.1",
"publishedAt": 1771769764440,
"commit": "https://github.com/openclaw/skills/commit/bf67402bec3290462e4ca78b6fe0f7fdd34fcc6c"
},
{
"version": "1.8.0",
"publishedAt": 1771656728335,
"commit": "https://github.com/openclaw/skills/commit/e509b40d4f6b44c00f6c9f465d277aa911295daa"
},
{
"version": "1.7.1",
"publishedAt": 1771470668230,
"commit": "https://github.com/openclaw/skills/commit/184aaec7885bca0e14da5c4377d0aa0f2007315a"
},
{
"version": "1.6.3",
"publishedAt": 1771375582182,
"commit": "https://github.com/openclaw/skills/commit/43e2fd70fbd7b9a09a50458f9144e10c702a15cf"
},
{
"version": "1.6.1",
"publishedAt": 1771329585077,
"commit": "https://github.com/openclaw/skills/commit/e0e0d2efcfb74132dabf647a5b6c8357f1877c66"
},
{
"version": "1.6.0",
"publishedAt": 1771302892917,
"commit": "https://github.com/openclaw/skills/commit/b3025b45d4f97bd911f598bb95c91c70c4c62a3e"
},
{
"version": "1.3.0",
"publishedAt": 1771232597533,
"commit": "https://github.com/openclaw/skills/commit/323487faab39653aee723884764b6c24a15f3014"
}
]
}
```
### references/templates/discord.md
```markdown
# Media Digest Discord Template
Discord-optimized format with bullet points and link suppression.
## Template Structure
```markdown
# 🎬 每日影视日报 – {{DATE}}
> {{EXECUTIVE_SUMMARY}}
{{#topics}}
## {{emoji}} {{label}}
{{#articles}}
• 🔥{{quality_score}} | {{chinese_summary}}
<{{link}}>
{{#multi_source}}*[{{source_count}} sources]*{{/multi_source}}
{{/articles}}
{{/topics}}
## 📢 KOL Updates
{{#kol_tweets}}
• **{{display_name}}** (@{{handle}}) – {{summary}} `👁 {{views}} | 💬 {{replies}} | 🔁 {{retweets}} | ❤️ {{likes}}`
<{{tweet_link}}>
{{/kol_tweets}}
## 📖 Deep Reads
{{#deep_reads}}
• {{title}} – {{description}}
<{{link}}>
{{/deep_reads}}
---
📊 Data Sources: RSS {{rss_count}} | Twitter {{twitter_count}} | Web {{web_count}} | After dedup: {{merged_count}} articles
🤖 Generated by media-news-digest v{{version}} | <https://github.com/draco-agent/media-news-digest> | Powered by OpenClaw
```
## Delivery
- **Default: Channel** β Send to the Discord channel specified by `DISCORD_CHANNEL_ID`
- Use `message` tool with `target` set to the channel ID for channel delivery
- For DM delivery instead, set `target` to a user ID
## Discord-Specific Rules
- **Link suppression**: Wrap links in `<>` to prevent embeds
- **Bullet format**: Use `•` for clean mobile display
- **No tables**: Discord mobile doesn't handle markdown tables well
- **Emoji headers**: Visual hierarchy with topic emojis
- **Character limits**: Discord messages have 2000 char limit, may need splitting
- **Chinese body + English links**: Summary in Chinese, original links preserved
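A long digest will exceed the 2000-character limit, so it must be sent in chunks. A minimal line-boundary splitter (illustrative only; actual delivery is handled by the `message` tool):

```python
def split_message(text: str, limit: int = 2000):
    """Split text into chunks <= limit, breaking at newlines where possible."""
    chunks, current = [], ""
    for line in text.splitlines(keepends=True):
        if len(current) + len(line) > limit:
            if current:
                chunks.append(current)
                current = ""
            while len(line) > limit:  # a single oversized line: hard-split it
                chunks.append(line[:limit])
                line = line[limit:]
        current += line
    if current:
        chunks.append(current)
    return chunks
```

Splitting at line boundaries keeps each bullet and its `<link>` line together far more often than a blind 2000-character cut.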
```
### references/templates/email.md
```markdown
# Media Digest Email Template
HTML email format optimized for Gmail/Outlook rendering.
## Delivery
**Step 1: Sanitize** β convert markdown report to XSS-safe HTML:
```bash
python3 <SKILL_DIR>/scripts/sanitize-html.py --input /tmp/md-report.md --output /tmp/md-email.html
```
**Step 2: Send** via `gog gmail send`:
```bash
gog gmail send --to '<EMAIL>' --subject '<SUBJECT>' --body-html "$(cat /tmp/md-email.html)"
```
⚠️ **Security**: Never manually build HTML from fetched content. Always use sanitize-html.py, which HTML-escapes all text, validates URLs (http/https only), and strips dangerous content.
## Template Structure
```html
<div style="max-width:640px;margin:0 auto;font-family:-apple-system,BlinkMacSystemFont,'Segoe UI',Roboto,sans-serif;color:#1a1a1a;line-height:1.6">
<h1 style="font-size:22px;border-bottom:2px solid #e5e5e5;padding-bottom:8px">
🎬 每日影视日报 – {{DATE}}
</h1>
<p style="color:#555;font-size:14px;background:#f8f9fa;padding:12px;border-radius:6px">
{{EXECUTIVE_SUMMARY}}
</p>
<h2 style="font-size:17px;margin-top:24px;color:#333">{{emoji}} {{label}}</h2>
<ul style="padding-left:20px">
<li style="margin-bottom:10px">
<strong>🔥{{quality_score}} | {{chinese_summary}}</strong> – {{description}}
<br><a href="{{link}}" style="color:#0969da;font-size:13px">{{link}}</a>
</li>
</ul>
<h2 style="font-size:17px;margin-top:24px;color:#333">📢 KOL Updates</h2>
<ul style="padding-left:20px">
<li style="margin-bottom:10px">
<strong>{{display_name}}</strong> (@{{handle}}) – {{summary}}
<br><code style="font-size:12px;color:#888;background:#f4f4f4;padding:2px 6px;border-radius:3px">👁 {{views}} | 💬 {{replies}} | 🔁 {{retweets}} | ❤️ {{likes}}</code>
<br><a href="{{tweet_link}}" style="color:#0969da;font-size:13px">{{tweet_link}}</a>
</li>
</ul>
<hr style="border:none;border-top:1px solid #e5e5e5;margin:24px 0">
<p style="font-size:12px;color:#888">
📊 Data Sources: RSS {{rss_count}} | Twitter {{twitter_count}} | Web {{web_count}} | After dedup: {{merged_count}} articles
<br>🤖 Generated by <a href="https://github.com/draco-agent/media-news-digest" style="color:#0969da">media-news-digest</a> v{{version}} | Powered by <a href="https://openclaw.ai" style="color:#0969da">OpenClaw</a>
</p>
</div>
```
## Style Guidelines
- **Max width**: 640px centered (mobile-friendly)
- **Fonts**: System font stack (no web fonts in email)
- **All styles inline**: Email clients strip `<style>` tags
- **Links**: Use full URLs, styled with `color:#0969da`
- **Headings**: h1 for title (22px), h2 for topics (17px)
- **Lists**: `<ul>` with `<li>`, adequate spacing
- **Footer**: Small gray text with stats + repo link + OpenClaw branding
- **No images**: Pure text/HTML for maximum compatibility
- **No tables for layout**: Use div + inline styles
- **Language**: Chinese body text, English link URLs
```
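The `{{…}}` tokens in the template above are plain string placeholders. A minimal fill step could look like this (the `fill` helper is illustrative; the pipeline's actual rendering step may differ):

```python
# Minimal placeholder fill for the {{TOKEN}} convention used in the
# email template; purely illustrative.
def fill(template: str, values: dict) -> str:
    for key, val in values.items():
        template = template.replace("{{" + key + "}}", str(val))
    return template

snippet = '<h1>Daily Digest - {{DATE}}</h1><p>{{EXECUTIVE_SUMMARY}}</p>'
rendered = fill(snippet, {"DATE": "2025-01-01", "EXECUTIVE_SUMMARY": "Quiet day."})
```

Any token without a matching key is left in place, which makes missing data easy to spot in a test send.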
### scripts/config_loader.py
```python
#!/usr/bin/env python3
"""
Configuration overlay loader for media-news-digest.
Handles loading and merging of default configurations with optional user overlays.
Supports sources.json and topics.json with overlay logic for customization.
"""
import json
import logging
from pathlib import Path
from typing import Dict, List, Optional, Any
logger = logging.getLogger(__name__)
def load_merged_sources(defaults_dir: Path, config_dir: Optional[Path] = None) -> List[Dict[str, Any]]:
"""
Load and merge sources from defaults and optional user config overlay.
Args:
defaults_dir: Path to default configuration directory (skill defaults)
config_dir: Optional path to user configuration directory (overlay)
Returns:
List of merged source configurations
Merge Logic:
1. Load defaults/sources.json as base
2. If config_dir provided and has sources.json, load user overlay
3. For each user source:
- If id matches default source: user version completely replaces default
- If id is new: append to list
- If user source has "enabled": false: disable matching default source
"""
defaults_path = defaults_dir / "sources.json"
# Load default sources
try:
with open(defaults_path, 'r', encoding='utf-8') as f:
defaults_data = json.load(f)
default_sources = defaults_data.get("sources", [])
logger.debug(f"Loaded {len(default_sources)} default sources from {defaults_path}")
except FileNotFoundError:
raise FileNotFoundError(f"Default sources config not found: {defaults_path}")
except json.JSONDecodeError as e:
raise ValueError(f"Invalid JSON in default sources config: {e}")
# Validate required fields
validated = []
required_fields = {"id", "type", "enabled"}
for i, source in enumerate(default_sources):
missing = required_fields - set(source.keys())
if missing:
logger.warning(f"Source #{i} missing required fields {missing}, skipping: {source}")
continue
validated.append(source)
default_sources = validated
# If no user config directory specified, return defaults only
if config_dir is None:
return default_sources
config_path = config_dir / "media-news-digest-sources.json"
# Try to load user overlay
try:
with open(config_path, 'r', encoding='utf-8') as f:
config_data = json.load(f)
user_sources = config_data.get("sources", [])
logger.debug(f"Loaded {len(user_sources)} user sources from {config_path}")
except FileNotFoundError:
logger.debug(f"No user sources config found at {config_path}, using defaults only")
return default_sources
except json.JSONDecodeError as e:
logger.warning(f"Invalid JSON in user sources config {config_path}: {e}, using defaults only")
return default_sources
# Merge logic: create lookup by id for efficient merging
merged_sources = {}
# Start with all default sources
for source in default_sources:
source_id = source.get("id")
if source_id:
merged_sources[source_id] = source.copy()
# Apply user overlay
for user_source in user_sources:
source_id = user_source.get("id")
if not source_id:
continue
if source_id in merged_sources:
# User source overrides default completely
if user_source.get("enabled") is False:
# User explicitly disables this source
merged_sources[source_id]["enabled"] = False
logger.debug(f"User disabled source: {source_id}")
else:
# User replaces entire source config
merged_sources[source_id] = user_source.copy()
logger.debug(f"User overrode source: {source_id}")
else:
# New user source, append
merged_sources[source_id] = user_source.copy()
logger.debug(f"User added new source: {source_id}")
# Convert back to list, maintaining order (defaults first, then user additions)
result = []
# Add default sources (potentially overridden)
for source in default_sources:
source_id = source.get("id")
if source_id and source_id in merged_sources:
result.append(merged_sources[source_id])
# Add new user sources
for user_source in user_sources:
source_id = user_source.get("id")
if source_id and source_id not in [s.get("id") for s in default_sources]:
result.append(merged_sources[source_id])
logger.info(f"Merged configuration: {len(default_sources)} defaults + {len(user_sources)} user = {len(result)} total sources")
return result
def load_merged_topics(defaults_dir: Path, config_dir: Optional[Path] = None) -> List[Dict[str, Any]]:
"""
Load and merge topics from defaults and optional user config overlay.
Args:
defaults_dir: Path to default configuration directory (skill defaults)
config_dir: Optional path to user configuration directory (overlay)
Returns:
List of merged topic configurations
Merge Logic:
1. Load defaults/topics.json as base
2. If config_dir provided and has topics.json, load user overlay
3. For each user topic:
- If id matches default topic: user version completely replaces default
- If id is new: append to list
"""
defaults_path = defaults_dir / "topics.json"
# Load default topics
try:
with open(defaults_path, 'r', encoding='utf-8') as f:
defaults_data = json.load(f)
default_topics = defaults_data.get("topics", [])
logger.debug(f"Loaded {len(default_topics)} default topics from {defaults_path}")
except FileNotFoundError:
raise FileNotFoundError(f"Default topics config not found: {defaults_path}")
except json.JSONDecodeError as e:
raise ValueError(f"Invalid JSON in default topics config: {e}")
# If no user config directory specified, return defaults only
if config_dir is None:
return default_topics
config_path = config_dir / "media-news-digest-topics.json"
# Try to load user overlay
try:
with open(config_path, 'r', encoding='utf-8') as f:
config_data = json.load(f)
user_topics = config_data.get("topics", [])
logger.debug(f"Loaded {len(user_topics)} user topics from {config_path}")
except FileNotFoundError:
logger.debug(f"No user topics config found at {config_path}, using defaults only")
return default_topics
except json.JSONDecodeError as e:
logger.warning(f"Invalid JSON in user topics config {config_path}: {e}, using defaults only")
return default_topics
# Merge logic: create lookup by id for efficient merging
merged_topics = {}
# Start with all default topics
for topic in default_topics:
topic_id = topic.get("id")
if topic_id:
merged_topics[topic_id] = topic.copy()
# Apply user overlay
for user_topic in user_topics:
topic_id = user_topic.get("id")
if not topic_id:
continue
if topic_id in merged_topics:
# User topic overrides default completely
merged_topics[topic_id] = user_topic.copy()
logger.debug(f"User overrode topic: {topic_id}")
else:
# New user topic, append
merged_topics[topic_id] = user_topic.copy()
logger.debug(f"User added new topic: {topic_id}")
# Convert back to list, maintaining order (defaults first, then user additions)
result = []
# Add default topics (potentially overridden)
for topic in default_topics:
topic_id = topic.get("id")
if topic_id and topic_id in merged_topics:
result.append(merged_topics[topic_id])
# Add new user topics
for user_topic in user_topics:
topic_id = user_topic.get("id")
if topic_id and topic_id not in [t.get("id") for t in default_topics]:
result.append(merged_topics[topic_id])
logger.info(f"Merged topics: {len(default_topics)} defaults + {len(user_topics)} user = {len(result)} total topics")
return result
```
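The overlay rule in `load_merged_sources` reduces to a small amount of logic: a user entry whose `id` matches a default either disables it (when `"enabled": false`) or replaces it wholesale; unknown ids are appended after the defaults. A self-contained sketch of just that rule (the `merge_sources` name is ours, for illustration):

```python
# Condensed sketch of the overlay semantics implemented by
# load_merged_sources: disable-in-place, replace-by-id, append-new.
def merge_sources(defaults, overlay):
    merged = {s["id"]: dict(s) for s in defaults}
    for u in overlay:
        sid = u.get("id")
        if not sid:
            continue
        if sid in merged and u.get("enabled") is False:
            merged[sid]["enabled"] = False   # disable, keep the rest of the default
        else:
            merged[sid] = dict(u)            # replace matching id, or add new
    default_ids = {s["id"] for s in defaults}
    order = [s["id"] for s in defaults] + [
        u["id"] for u in overlay if u.get("id") and u["id"] not in default_ids
    ]
    return [merged[i] for i in order]

base = [{"id": "thr", "type": "rss", "enabled": True},
        {"id": "deadline", "type": "rss", "enabled": True}]
user = [{"id": "deadline", "enabled": False},
        {"id": "myfeed", "type": "rss", "enabled": True}]
result = merge_sources(base, user)
```

Note the asymmetry: disabling preserves the default's other fields (URL, topics), while any other override replaces the entry completely, so a user overlay that changes one field must restate the whole source.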
### scripts/enrich-articles.py
```python
#!/usr/bin/env python3
"""
Enrich high-scoring articles with full text content.
Fetches full article text for top articles from merged JSON, using:
1. Cloudflare Markdown for Agents (Accept: text/markdown) β preferred
2. HTML readability extraction β fallback
3. Skip β for paywalled/JS-heavy pages
Usage:
python3 enrich-articles.py --input merged.json --output enriched.json [--min-score 10] [--verbose]
"""
import json
import re
import sys
import os
import argparse
import logging
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timezone
from html.parser import HTMLParser
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple
from urllib.request import urlopen, Request
from urllib.error import HTTPError, URLError
TIMEOUT = 10
MAX_WORKERS = 5
DEFAULT_MIN_SCORE = 10
DEFAULT_MAX_ARTICLES = 15
DEFAULT_MAX_CHARS = 2000
USER_AGENT = "MediaDigest/3.0 (article enrichment)"
SKIP_DOMAINS = {
"twitter.com", "x.com",
"reddit.com", "old.reddit.com",
"github.com",
"youtube.com", "youtu.be",
"nytimes.com", "bloomberg.com", "wsj.com", "ft.com",
"arxiv.org",
}
def setup_logging(verbose=False):
level = logging.DEBUG if verbose else logging.INFO
logging.basicConfig(level=level, format="%(asctime)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S")
return logging.getLogger(__name__)
def get_domain(url):
    try:
        from urllib.parse import urlparse
        netloc = urlparse(url).netloc.lower()
        # str.lstrip("www.") would strip characters, not the prefix
        # (e.g. "web.example.com" -> "eb.example.com"), so do it explicitly
        return netloc[4:] if netloc.startswith("www.") else netloc
    except Exception:
        return ""
class TextExtractor(HTMLParser):
def __init__(self):
super().__init__()
self._text = []
self._skip = False
self._skip_tags = {"script", "style", "nav", "footer", "header", "aside", "noscript"}
def handle_starttag(self, tag, attrs):
if tag in self._skip_tags:
self._skip = True
def handle_endtag(self, tag):
if tag in self._skip_tags:
self._skip = False
if tag in ("p", "br", "div", "h1", "h2", "h3", "h4", "li"):
self._text.append("\n")
def handle_data(self, data):
if not self._skip:
self._text.append(data)
def get_text(self):
raw = "".join(self._text)
raw = re.sub(r"[ \t]+", " ", raw)
raw = re.sub(r"\n{3,}", "\n\n", raw)
return raw.strip()
def extract_readable_text(html):
article_match = re.search(r"<article[^>]*>(.*?)</article>", html, re.DOTALL | re.IGNORECASE)
fragment = article_match.group(1) if article_match else html
extractor = TextExtractor()
try:
extractor.feed(fragment)
except Exception:
return ""
return extractor.get_text()
def fetch_full_text(url, max_chars=DEFAULT_MAX_CHARS):
domain = get_domain(url)
if domain in SKIP_DOMAINS:
return {"text": "", "method": "skipped", "tokens": 0, "error": f"domain {domain} in skip list"}
try:
headers = {"Accept": "text/markdown, text/html;q=0.9", "User-Agent": USER_AGENT}
req = Request(url, headers=headers)
with urlopen(req, timeout=TIMEOUT) as resp:
content_type = resp.headers.get("Content-Type", "")
token_header = resp.headers.get("x-markdown-tokens", "")
raw = resp.read()
if raw[:2] == b"\x1f\x8b":
import gzip
raw = gzip.decompress(raw)
text = raw.decode("utf-8", errors="replace")
if "text/markdown" in content_type:
tokens = int(token_header) if token_header.isdigit() else len(text) // 4
return {"text": text[:max_chars], "method": "cf-markdown", "tokens": tokens, "error": None}
extracted = extract_readable_text(text)
if len(extracted) < 100:
return {"text": "", "method": "html-too-short", "tokens": 0, "error": "extracted text too short"}
return {"text": extracted[:max_chars], "method": "html-extract", "tokens": len(extracted[:max_chars]) // 4, "error": None}
except HTTPError as e:
return {"text": "", "method": "error", "tokens": 0, "error": f"HTTP {e.code}"}
except URLError as e:
return {"text": "", "method": "error", "tokens": 0, "error": f"URL error: {e.reason}"}
except Exception as e:
return {"text": "", "method": "error", "tokens": 0, "error": str(e)[:100]}
def enrich_articles(articles, min_score=DEFAULT_MIN_SCORE, max_articles=DEFAULT_MAX_ARTICLES, max_chars=DEFAULT_MAX_CHARS):
# Eligible: high-score articles OR RSS blog articles (lower threshold for blogs)
blog_domains = {
"simonwillison.net", "overreacted.io", "eli.thegreenplace.net",
"matklad.github.io", "lucumr.pocoo.org", "devblogs.microsoft.com",
"rachelbythebay.com", "xeiaso.net", "pluralistic.net", "lcamtuf.substack.com",
"hillelwayne.com", "dynomight.net", "geoffreylitt.com", "fabiensanglard.net",
"blog.cloudflare.com", "antirez.com", "paulgraham.com", "danluu.com",
"latent.space", "www.latent.space",
}
eligible = []
for a in articles:
if a.get("full_text") or not a.get("link"):
continue
score = a.get("quality_score", 0)
domain = get_domain(a.get("link", ""))
# Blog articles get lower threshold (score >= 3), others use min_score
if score >= min_score or (domain in blog_domains and score >= 3):
eligible.append(a)
seen_urls = {}
unique = []
for a in eligible:
url = a["link"]
if url not in seen_urls:
seen_urls[url] = a
unique.append(a)
unique.sort(key=lambda x: -x.get("quality_score", 0))
to_fetch = unique[:max_articles]
if not to_fetch:
logging.info("No articles eligible for enrichment")
return 0, 0, 0
logging.info(f"Enriching {len(to_fetch)} articles (min_score={min_score})")
attempted = success = cf_count = 0
results = {}
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
futures = {pool.submit(fetch_full_text, a["link"], max_chars): a["link"] for a in to_fetch}
for future in as_completed(futures):
url = futures[future]
attempted += 1
result = future.result()
results[url] = result
if result["text"]:
success += 1
if result["method"] == "cf-markdown":
cf_count += 1
logging.debug(f"  ✅ [{result['method']}] {url[:60]}... ({result['tokens']} tokens)")
else:
logging.debug(f"  ⏭️ [{result['method']}] {url[:60]}... ({result.get('error', '')})")
for a in articles:
url = a.get("link", "")
if url in results and results[url]["text"]:
r = results[url]
a["full_text"] = r["text"]
a["full_text_method"] = r["method"]
a["full_text_tokens"] = r["tokens"]
logging.info(f"Enrichment: {success}/{attempted} enriched ({cf_count} via CF Markdown)")
return attempted, success, cf_count
def main():
parser = argparse.ArgumentParser(description="Enrich articles with full text")
parser.add_argument("--input", "-i", type=Path, required=True, help="Input merged JSON")
parser.add_argument("--output", "-o", type=Path, help="Output enriched JSON (default: overwrite input)")
parser.add_argument("--min-score", type=int, default=DEFAULT_MIN_SCORE)
parser.add_argument("--max-articles", type=int, default=DEFAULT_MAX_ARTICLES)
parser.add_argument("--max-chars", type=int, default=DEFAULT_MAX_CHARS)
parser.add_argument("--verbose", "-v", action="store_true")
parser.add_argument("--force", action="store_true", help="Ignored (pipeline compat)")
args = parser.parse_args()
setup_logging(args.verbose)
if not args.input.exists():
logging.error(f"Input file not found: {args.input}")
return 1
output_path = args.output or args.input
try:
with open(args.input, "r", encoding="utf-8") as f:
data = json.load(f)
all_articles = []
topics = data.get("topics", {})
if isinstance(topics, dict):
for topic_data in topics.values():
if isinstance(topic_data, dict):
all_articles.extend(topic_data.get("articles", []))
elif isinstance(topic_data, list):
all_articles.extend(topic_data)
t0 = time.time()
attempted, success, cf_count = enrich_articles(all_articles, args.min_score, args.max_articles, args.max_chars)
elapsed = time.time() - t0
data["enrichment"] = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"attempted": attempted, "success": success, "cf_markdown": cf_count,
"elapsed_s": round(elapsed, 1), "min_score": args.min_score, "max_chars": args.max_chars,
}
with open(output_path, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, ensure_ascii=False)
logging.info(f"✅ Done: {success}/{attempted} enriched in {elapsed:.1f}s → {output_path}")
return 0
except Exception as e:
logging.error(f"💥 Enrichment failed: {e}")
return 1
if __name__ == "__main__":
sys.exit(main())
```
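The HTML fallback path relies on `TextExtractor`, a stdlib `HTMLParser` that drops script/style/nav chrome and keeps readable text. The behavior can be checked with a trimmed-down version of the same class (`MiniExtractor` here is a condensed stand-in, not the script's class):

```python
# Condensed version of the TextExtractor fallback in enrich-articles.py:
# skip non-content tags, emit newlines at block boundaries, keep the rest.
import re
from html.parser import HTMLParser

class MiniExtractor(HTMLParser):
    SKIP = {"script", "style", "nav", "footer", "header", "aside", "noscript"}

    def __init__(self):
        super().__init__()
        self.parts, self.skip = [], False

    def handle_starttag(self, tag, attrs):
        if tag in self.SKIP:
            self.skip = True

    def handle_endtag(self, tag):
        if tag in self.SKIP:
            self.skip = False
        if tag in ("p", "div", "h1", "h2", "li"):
            self.parts.append("\n")

    def handle_data(self, data):
        if not self.skip:
            self.parts.append(data)

html_doc = ("<article><script>var x=1</script>"
            "<p>Box office up.</p><p>Streaming deal.</p></article>")
ex = MiniExtractor()
ex.feed(html_doc)
text = re.sub(r"\n{3,}", "\n\n", "".join(ex.parts)).strip()
```

Script contents vanish and paragraph boundaries become newlines, which is enough signal for the 100-character "too short" check the full script applies afterwards.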
### scripts/fetch-reddit.py
```python
#!/usr/bin/env python3
"""
Fetch Reddit posts from unified sources configuration.
Reads sources.json, filters Reddit sources, fetches posts via Reddit JSON API,
and outputs structured JSON with posts tagged by topics.
Usage:
python3 fetch-reddit.py [--defaults DEFAULTS_DIR] [--config CONFIG_DIR] [--hours 48] [--output FILE] [--verbose] [--force] [--no-cache]
Environment:
No API key required. Uses Reddit's public JSON API.
"""
import json
import sys
import os
import argparse
import logging
import ssl
import time
import tempfile
from datetime import datetime, timedelta, timezone
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import List, Dict, Any, Optional
from urllib.request import Request, urlopen
_SSL_CTX = ssl.create_default_context()
from urllib.error import HTTPError, URLError
# Constants
MAX_WORKERS = 4
TIMEOUT = 30
RETRY_COUNT = 2
RETRY_DELAY = 3
USER_AGENT = "MediaDigest/1.8 (bot; +https://github.com/draco-agent/media-news-digest)"
RESUME_MAX_AGE_SECONDS = 3600 # 1 hour
def setup_logging(verbose: bool = False) -> logging.Logger:
level = logging.DEBUG if verbose else logging.INFO
logging.basicConfig(
level=level,
format='%(asctime)s [%(levelname)s] %(message)s',
datefmt='%H:%M:%S'
)
return logging.getLogger(__name__)
def load_reddit_sources(defaults_dir: Optional[Path], config_dir: Optional[Path]) -> List[Dict[str, Any]]:
"""Load Reddit sources from config, with user overrides."""
sys.path.insert(0, str(Path(__file__).parent))
from config_loader import load_merged_sources as load_sources
all_sources = load_sources(defaults_dir, config_dir)
reddit_sources = []
for s in all_sources:
if s.get('type') != 'reddit':
continue
if not s.get('enabled', True):
continue
if not s.get('subreddit'):
logging.warning(f"Reddit source {s.get('id')} missing subreddit, skipping")
continue
reddit_sources.append(s)
return reddit_sources
def fetch_subreddit(source: Dict[str, Any], cutoff: datetime) -> Dict[str, Any]:
"""Fetch posts from a subreddit using Reddit's JSON API."""
source_id = source['id']
subreddit = source['subreddit']
sort = source.get('sort', 'hot')
limit = source.get('limit', 25)
min_score = source.get('min_score', 0)
priority = source.get('priority', False)
topics = source.get('topics', [])
name = source.get('name', f'r/{subreddit}')
url = f"https://www.reddit.com/r/{subreddit}/{sort}.json?limit={limit}&raw_json=1"
for attempt in range(RETRY_COUNT + 1):
try:
req = Request(url, headers={
'User-Agent': USER_AGENT,
'Accept': 'text/html,application/json',
'Accept-Language': 'en-US,en;q=0.9',
})
with urlopen(req, timeout=TIMEOUT, context=_SSL_CTX) as resp:
data = json.loads(resp.read().decode('utf-8'))
articles = []
children = data.get('data', {}).get('children', [])
for child in children:
post = child.get('data', {})
if not post:
continue
# Parse timestamp
created_utc = post.get('created_utc', 0)
post_time = datetime.fromtimestamp(created_utc, tz=timezone.utc)
# Filter by time
if post_time < cutoff:
continue
# Filter by score
score = post.get('score', 0)
if score < min_score:
continue
# Skip stickied/pinned posts
if post.get('stickied', False):
continue
# Get the external URL (if it's a link post) vs self post
permalink = f"https://www.reddit.com{post.get('permalink', '')}"
external_url = post.get('url', '')
is_self = post.get('is_self', True)
# If it's a self post or URL points to reddit, use permalink
if is_self or 'reddit.com' in external_url or 'redd.it' in external_url:
link = permalink
external_url = None
else:
link = external_url
title = post.get('title', '').strip()
if not title:
continue
flair = post.get('link_flair_text', '')
num_comments = post.get('num_comments', 0)
upvote_ratio = post.get('upvote_ratio', 0)
articles.append({
"title": title,
"link": link,
"reddit_url": permalink,
"external_url": external_url,
"date": post_time.isoformat(),
"score": score,
"num_comments": num_comments,
"flair": flair,
"is_self": is_self,
"topics": topics[:],
"metrics": {
"score": score,
"num_comments": num_comments,
"upvote_ratio": upvote_ratio
}
})
return {
"source_id": source_id,
"source_type": "reddit",
"name": name,
"subreddit": subreddit,
"sort": sort,
"priority": priority,
"topics": topics,
"status": "ok",
"attempts": attempt + 1,
"count": len(articles),
"articles": articles,
}
except HTTPError as e:
if e.code == 429:
logging.warning(f"Rate limit for r/{subreddit}, attempt {attempt + 1}")
if attempt < RETRY_COUNT:
time.sleep(10)
continue
elif e.code == 403:
logging.warning(f"r/{subreddit} is private or quarantined")
return {
"source_id": source_id,
"source_type": "reddit",
"name": name,
"subreddit": subreddit,
"status": "error",
"error": f"HTTP {e.code}: Forbidden",
"count": 0,
"articles": [],
}
error_msg = f"HTTP {e.code}"
logging.warning(f"Error fetching r/{subreddit}: {error_msg}")
except (URLError, OSError) as e:
error_msg = str(e)
logging.warning(f"Network error for r/{subreddit}: {error_msg}")
except Exception as e:
error_msg = str(e)
logging.error(f"Unexpected error for r/{subreddit}: {error_msg}")
if attempt < RETRY_COUNT:
time.sleep(RETRY_DELAY)
return {
"source_id": source_id,
"source_type": "reddit",
"name": name,
"subreddit": subreddit,
"status": "error",
"error": error_msg,
"count": 0,
"articles": [],
}
def main() -> int:
parser = argparse.ArgumentParser(
description="Fetch Reddit posts from configured subreddits.\n"
"Uses Reddit's public JSON API (no authentication required).",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""\
Examples:
python3 fetch-reddit.py --defaults config/defaults --output /tmp/md-reddit.json --verbose
python3 fetch-reddit.py --defaults config/defaults --config ~/workspace/config --hours 48
"""
)
parser.add_argument('--defaults', type=Path, default=Path('config/defaults'),
help='Default config directory')
parser.add_argument('--config', type=Path, default=None,
help='User config directory (overrides defaults)')
parser.add_argument('--hours', type=int, default=48,
help='How many hours back to fetch (default: 48)')
parser.add_argument('--output', type=Path, default=None,
help='Output JSON file path')
parser.add_argument('--verbose', action='store_true',
help='Enable debug logging')
parser.add_argument('--force', action='store_true',
help='Force fetch even if cached output exists')
parser.add_argument('--no-cache', action='store_true',
help='Disable all caching')
args = parser.parse_args()
logger = setup_logging(args.verbose)
# Auto-generate output path if not specified
if not args.output:
fd, temp_path = tempfile.mkstemp(prefix="media-news-digest-reddit-", suffix=".json")
os.close(fd)
args.output = Path(temp_path)
# Resume support
if not args.force and args.output.exists():
try:
age = time.time() - args.output.stat().st_mtime
if age < RESUME_MAX_AGE_SECONDS:
with open(args.output) as f:
existing = json.load(f)
if existing.get('subreddits'):
logger.info(f"⏭️ Skipping fetch: {args.output} is {age:.0f}s old (< {RESUME_MAX_AGE_SECONDS}s). Use --force to override.")
print(f"Output (cached): {args.output}")
return 0
except (json.JSONDecodeError, KeyError):
pass
try:
cutoff = datetime.now(timezone.utc) - timedelta(hours=args.hours)
# Load sources
# Fallback: if the default dir was left at its placeholder value and does
# not exist, but a user config dir was given, treat the user dir as the
# sole source of sources.json
if args.config and args.defaults == Path("config/defaults") and not args.defaults.exists():
    sources = load_reddit_sources(args.config, None)
else:
    sources = load_reddit_sources(args.defaults, args.config)
if not sources:
logger.warning("No Reddit sources found or all disabled")
output = {
"source": "reddit",
"fetched_at": datetime.now(timezone.utc).isoformat(),
"subreddits": [],
"skipped_reason": "No Reddit sources configured"
}
with open(args.output, "w") as f:
json.dump(output, f, indent=2)
print(f"Output (empty): {args.output}")
return 0
logger.info(f"📡 Fetching {len(sources)} subreddits (cutoff: {cutoff.strftime('%Y-%m-%d %H:%M')} UTC)")
results = []
total_posts = 0
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
futures = {pool.submit(fetch_subreddit, source, cutoff): source for source in sources}
for future in as_completed(futures):
result = future.result()
results.append(result)
total_posts += result.get('count', 0)
ok_count = sum(1 for r in results if r['status'] == 'ok')
output = {
"source": "reddit",
"fetched_at": datetime.now(timezone.utc).isoformat(),
"defaults_dir": str(args.defaults),
"config_dir": str(args.config) if args.config else None,
"hours": args.hours,
"cutoff": cutoff.isoformat(),
"subreddits_total": len(results),
"subreddits_ok": ok_count,
"total_posts": total_posts,
"subreddits": results
}
json_str = json.dumps(output, ensure_ascii=False, indent=2)
with open(args.output, "w", encoding='utf-8') as f:
f.write(json_str)
logger.info(f"✅ Fetched {ok_count}/{len(results)} subreddits, {total_posts} posts")
print(f"Output: {args.output}")
return 0
except Exception as e:
logger.error(f"💥 Reddit fetch failed: {e}")
return 1
if __name__ == "__main__":
sys.exit(main())
```
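One non-obvious rule in `fetch_subreddit` is the link routing: self posts and any reddit-hosted URL resolve to the comment permalink, while true external links pass through (with the permalink kept separately as `reddit_url`). Isolated as a function (`route_link` is our name for illustration):

```python
# Sketch of the link-routing rule in fetch_subreddit: self posts and
# reddit.com/redd.it URLs map to the permalink; external links pass through.
def route_link(post: dict) -> tuple:
    permalink = "https://www.reddit.com" + post.get("permalink", "")
    external = post.get("url", "")
    if post.get("is_self", True) or "reddit.com" in external or "redd.it" in external:
        return permalink, None      # discussion is the content
    return external, external       # link post to an outside site

link, ext = route_link({"permalink": "/r/movies/abc",
                        "url": "https://variety.com/x", "is_self": False})
```

This keeps dedup sane downstream: a trade story submitted to several subreddits collapses on its external URL, while discussion threads stay distinct per subreddit.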
### scripts/fetch-rss.py
```python
#!/usr/bin/env python3
"""
Fetch RSS feeds from unified sources configuration.
Reads sources.json, filters RSS sources, fetches feeds in parallel with retry
mechanism, and outputs structured JSON with articles tagged by topics.
Usage:
python3 fetch-rss.py [--config CONFIG_DIR] [--hours 48] [--output FILE] [--verbose]
"""
import json
import re
import sys
import os
import argparse
import logging
import time
import tempfile
from datetime import datetime, timedelta, timezone
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.request import urlopen, Request
from urllib.error import URLError
from urllib.parse import urljoin
from pathlib import Path
from typing import Dict, List, Any, Optional
# Try to import feedparser, fall back to regex parsing
try:
import feedparser
HAS_FEEDPARSER = True
except ImportError:
HAS_FEEDPARSER = False
logging.warning("feedparser not installed; using basic XML regex parser (may miss some feeds). Install with: pip install feedparser")
TIMEOUT = 30
MAX_WORKERS = 10
MAX_ARTICLES_PER_FEED = 20
RETRY_COUNT = 1
RETRY_DELAY = 2.0 # seconds
RSS_CACHE_PATH = "/tmp/media-news-digest-rss-cache.json"
RSS_CACHE_TTL_HOURS = 24
def setup_logging(verbose: bool) -> logging.Logger:
"""Setup logging configuration."""
level = logging.DEBUG if verbose else logging.INFO
logging.basicConfig(
level=level,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
return logging.getLogger(__name__)
def parse_date_regex(s: str) -> Optional[datetime]:
"""Parse date string using regex patterns (fallback method)."""
if not s:
return None
s = s.strip()
# Common date formats
formats = [
"%a, %d %b %Y %H:%M:%S %z",
"%a, %d %b %Y %H:%M:%S %Z",
"%Y-%m-%dT%H:%M:%S%z",
"%Y-%m-%dT%H:%M:%SZ",
"%Y-%m-%d %H:%M:%S",
"%Y-%m-%d",
]
for fmt in formats:
try:
dt = datetime.strptime(s, fmt)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt
except ValueError:
continue
# ISO fallback
try:
dt = datetime.fromisoformat(s.replace("Z", "+00:00"))
return dt
except (ValueError, AttributeError):
pass
return None
def extract_cdata(text: str) -> str:
"""Extract content from CDATA sections."""
m = re.search(r"<!\[CDATA\[(.*?)\]\]>", text, re.DOTALL)
return m.group(1) if m else text
def strip_tags(html: str) -> str:
"""Remove HTML tags from text."""
return re.sub(r"<[^>]+>", "", html).strip()
def get_tag(xml: str, tag: str) -> str:
"""Extract content from XML tag using regex."""
m = re.search(rf"<{tag}[^>]*>(.*?)</{tag}>", xml, re.DOTALL | re.IGNORECASE)
return extract_cdata(m.group(1)).strip() if m else ""
def validate_article_domain(article_link: str, source: Dict[str, Any]) -> bool:
"""Validate that article links from mirror sources point to expected domains.
Sources with 'expected_domains' field will have their article links checked.
Returns True if valid or if no domain restriction is set.
"""
expected = source.get("expected_domains")
if not expected:
return True
if not article_link:
return False
from urllib.parse import urlparse
domain = urlparse(article_link).hostname or ""
return any(domain == d or domain.endswith("." + d) for d in expected)
def resolve_link(link: str, base_url: str) -> str:
"""Resolve relative links against the feed URL. Rejects non-HTTP(S) schemes."""
if not link:
return link
if link.startswith(("http://", "https://")):
return link
resolved = urljoin(base_url, link)
if not resolved.startswith(("http://", "https://")):
return "" # reject javascript:, data:, etc.
return resolved
def parse_feed_feedparser(content: str, cutoff: datetime, feed_url: str) -> List[Dict[str, Any]]:
"""Parse feed using feedparser library."""
articles = []
try:
feed = feedparser.parse(content)
for entry in feed.entries[:MAX_ARTICLES_PER_FEED]:
title = entry.get('title', '').strip()
link = entry.get('link', '').strip()
# Try multiple date fields
pub_date = None
for date_field in ['published_parsed', 'updated_parsed']:
if hasattr(entry, date_field) and getattr(entry, date_field):
try:
pub_date = datetime(*getattr(entry, date_field)[:6], tzinfo=timezone.utc)
break
except (TypeError, ValueError):
continue
# Fallback to string parsing
if pub_date is None:
for date_field in ['published', 'updated']:
if hasattr(entry, date_field) and getattr(entry, date_field):
pub_date = parse_date_regex(getattr(entry, date_field))
if pub_date:
break
if title and link and pub_date and pub_date >= cutoff:
articles.append({
"title": title[:200],
"link": resolve_link(link, feed_url),
"date": pub_date.isoformat(),
})
except Exception as e:
logging.debug(f"feedparser parsing failed: {e}")
return articles
def parse_feed_regex(content: str, cutoff: datetime, feed_url: str) -> List[Dict[str, Any]]:
"""Parse feed using regex patterns (fallback method)."""
articles = []
# RSS 2.0 items
for item in re.finditer(r"<item[^>]*>(.*?)</item>", content, re.DOTALL):
block = item.group(1)
title = strip_tags(get_tag(block, "title"))
link = resolve_link(get_tag(block, "link"), feed_url)
date_str = get_tag(block, "pubDate") or get_tag(block, "dc:date")
pub = parse_date_regex(date_str)
if title and link and pub and pub >= cutoff:
articles.append({
"title": title[:200],
"link": link,
"date": pub.isoformat(),
})
# Atom entries fallback
if not articles:
for entry in re.finditer(r"<entry[^>]*>(.*?)</entry>", content, re.DOTALL):
block = entry.group(1)
title = strip_tags(get_tag(block, "title"))
link_m = re.search(r'<link[^>]*href=["\']([^"\']+)["\']', block)
if not link_m:
link = get_tag(block, "link")
else:
link = link_m.group(1)
link = resolve_link(link, feed_url)
date_str = get_tag(block, "updated") or get_tag(block, "published")
pub = parse_date_regex(date_str)
if title and link and pub and pub >= cutoff:
articles.append({
"title": title[:200],
"link": link,
"date": pub.isoformat(),
})
return articles[:MAX_ARTICLES_PER_FEED]
def parse_feed(content: str, cutoff: datetime, feed_url: str) -> List[Dict[str, Any]]:
"""Parse feed using best available method."""
if HAS_FEEDPARSER:
articles = parse_feed_feedparser(content, cutoff, feed_url)
if articles:
return articles
logging.debug("feedparser returned no articles, trying regex fallback")
return parse_feed_regex(content, cutoff, feed_url)
def _load_rss_cache() -> Dict[str, Any]:
"""Load RSS ETag/Last-Modified cache."""
try:
with open(RSS_CACHE_PATH, 'r') as f:
return json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
return {}
def _save_rss_cache(cache: Dict[str, Any]) -> None:
"""Save RSS ETag/Last-Modified cache."""
try:
with open(RSS_CACHE_PATH, 'w') as f:
json.dump(cache, f)
except Exception as e:
logging.warning(f"Failed to save RSS cache: {e}")
# Module-level cache, loaded once per run
_rss_cache: Optional[Dict[str, Any]] = None
_rss_cache_dirty = False
def _get_rss_cache(no_cache: bool = False) -> Dict[str, Any]:
global _rss_cache
if _rss_cache is None:
_rss_cache = {} if no_cache else _load_rss_cache()
return _rss_cache
def _flush_rss_cache() -> None:
global _rss_cache_dirty
if _rss_cache_dirty and _rss_cache is not None:
_save_rss_cache(_rss_cache)
_rss_cache_dirty = False
def fetch_feed_with_retry(source: Dict[str, Any], cutoff: datetime, no_cache: bool = False) -> Dict[str, Any]:
"""Fetch RSS feed with retry mechanism and conditional requests."""
source_id = source["id"]
name = source["name"]
url = source["url"]
priority = source["priority"]
topics = source["topics"]
for attempt in range(RETRY_COUNT + 1):
try:
global _rss_cache_dirty
req_headers = {"User-Agent": "MediaDigest/2.0"}
# Add conditional headers from cache
cache = _get_rss_cache(no_cache)
cache_entry = cache.get(url)
now = time.time()
ttl_seconds = RSS_CACHE_TTL_HOURS * 3600
if cache_entry and not no_cache and (now - cache_entry.get("ts", 0)) < ttl_seconds:
if cache_entry.get("etag"):
req_headers["If-None-Match"] = cache_entry["etag"]
if cache_entry.get("last_modified"):
req_headers["If-Modified-Since"] = cache_entry["last_modified"]
req = Request(url, headers=req_headers)
try:
with urlopen(req, timeout=TIMEOUT) as resp:
# Update cache with response headers
etag = resp.headers.get("ETag")
last_mod = resp.headers.get("Last-Modified")
if etag or last_mod:
cache[url] = {"etag": etag, "last_modified": last_mod, "ts": now}
_rss_cache_dirty = True
final_url = resp.url if hasattr(resp, 'url') else url
content = resp.read().decode("utf-8", errors="replace")
except URLError as e:
if hasattr(e, 'code') and e.code == 304:
logging.info(f"✓ {name}: not modified (304)")
return {
"source_id": source_id,
"source_type": "rss",
"name": name,
"url": url,
"priority": priority,
"topics": topics,
"status": "ok",
"attempts": attempt + 1,
"not_modified": True,
"count": 0,
"articles": [],
}
raise
articles = parse_feed(content, cutoff, final_url)
# Tag articles with topics and validate domains
validated_articles = []
for article in articles:
article["topics"] = topics[:]
if validate_article_domain(article.get("link", ""), source):
validated_articles.append(article)
else:
logging.warning(f"⚠️ {name}: rejected article with unexpected domain: {article.get('link', '')}")
articles = validated_articles
return {
"source_id": source_id,
"source_type": "rss",
"name": name,
"url": url,
"priority": priority,
"topics": topics,
"status": "ok",
"attempts": attempt + 1,
"count": len(articles),
"articles": articles,
}
except Exception as e:
error_msg = str(e)[:100]
logging.debug(f"Attempt {attempt + 1} failed for {name}: {error_msg}")
if attempt < RETRY_COUNT:
time.sleep(RETRY_DELAY * (2 ** attempt)) # Exponential backoff
continue
else:
return {
"source_id": source_id,
"source_type": "rss",
"name": name,
"url": url,
"priority": priority,
"topics": topics,
"status": "error",
"attempts": attempt + 1,
"error": error_msg,
"count": 0,
"articles": [],
}
def load_sources(defaults_dir: Path, config_dir: Optional[Path] = None) -> List[Dict[str, Any]]:
"""Load RSS sources from unified configuration with overlay support."""
try:
from config_loader import load_merged_sources
except ImportError:
# Fallback for relative import
import sys
sys.path.append(str(Path(__file__).parent))
from config_loader import load_merged_sources
# Load merged sources from defaults + optional user overlay
all_sources = load_merged_sources(defaults_dir, config_dir)
# Filter RSS sources that are enabled
rss_sources = []
for source in all_sources:
if source.get("type") == "rss" and source.get("enabled", True):
rss_sources.append(source)
logging.info(f"Loaded {len(rss_sources)} enabled RSS sources")
return rss_sources
def main():
"""Main RSS fetching function."""
parser = argparse.ArgumentParser(
description="Parallel RSS/Atom feed fetcher for media-news-digest. "
"Fetches enabled RSS sources from unified configuration, "
"filters by time window, and outputs structured article data.",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
python3 fetch-rss.py
python3 fetch-rss.py --defaults config/defaults --config workspace/config --hours 48 -o results.json
python3 fetch-rss.py --config workspace/config --verbose # backward compatibility
"""
)
parser.add_argument(
"--defaults",
type=Path,
default=Path("config/defaults"),
help="Default configuration directory with skill defaults (default: config/defaults)"
)
parser.add_argument(
"--config",
type=Path,
help="User configuration directory for overlays (optional)"
)
parser.add_argument(
"--hours",
type=int,
default=48,
help="Time window in hours for articles (default: 48)"
)
parser.add_argument(
"--output", "-o",
type=Path,
help="Output JSON path (default: auto-generated temp file)"
)
parser.add_argument(
"--verbose", "-v",
action="store_true",
help="Enable verbose logging"
)
parser.add_argument(
"--no-cache",
action="store_true",
help="Bypass ETag/Last-Modified conditional request cache"
)
parser.add_argument(
"--force",
action="store_true",
help="Force re-fetch even if cached output exists"
)
args = parser.parse_args()
logger = setup_logging(args.verbose)
# Resume support: skip if output exists, is valid JSON, and < 1 hour old
if args.output and args.output.exists() and not args.force:
try:
age_seconds = time.time() - args.output.stat().st_mtime
if age_seconds < 3600:
with open(args.output, 'r') as f:
json.load(f) # validate JSON
logger.info(f"Skipping (cached output exists): {args.output}")
return 0
except (json.JSONDecodeError, OSError):
pass
# Auto-generate unique output path if not specified
if not args.output:
fd, temp_path = tempfile.mkstemp(prefix="media-news-digest-rss-", suffix=".json")
os.close(fd)
args.output = Path(temp_path)
try:
cutoff = datetime.now(timezone.utc) - timedelta(hours=args.hours)
# Backward compatibility: if only --config provided, use old behavior
if args.config and args.defaults == Path("config/defaults") and not args.defaults.exists():
logger.debug("Backward compatibility mode: using --config as sole source")
sources = load_sources(args.config, None)
else:
sources = load_sources(args.defaults, args.config)
if not sources:
logger.warning("No RSS sources found or all disabled")
logger.info(f"Fetching {len(sources)} RSS feeds (window: {args.hours}h)")
# Check feedparser availability
if HAS_FEEDPARSER:
logger.debug("Using feedparser library for parsing")
else:
logger.info("feedparser not available, using regex parsing")
# Initialize cache
_get_rss_cache(no_cache=args.no_cache)
results = []
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
futures = {pool.submit(fetch_feed_with_retry, source, cutoff, args.no_cache): source
for source in sources}
for future in as_completed(futures):
result = future.result()
results.append(result)
if result["status"] == "ok":
logger.debug(f"✅ {result['name']}: {result['count']} articles")
else:
logger.debug(f"❌ {result['name']}: {result['error']}")
# Flush conditional request cache
_flush_rss_cache()
# Sort: priority first, then by article count
results.sort(key=lambda x: (not x.get("priority", False), -x.get("count", 0)))
ok_count = sum(1 for r in results if r["status"] == "ok")
total_articles = sum(r.get("count", 0) for r in results)
output = {
"generated": datetime.now(timezone.utc).isoformat(),
"source_type": "rss",
"defaults_dir": str(args.defaults),
"config_dir": str(args.config) if args.config else None,
"hours": args.hours,
"feedparser_available": HAS_FEEDPARSER,
"sources_total": len(results),
"sources_ok": ok_count,
"total_articles": total_articles,
"sources": results,
}
# Write output
json_str = json.dumps(output, ensure_ascii=False, indent=2)
with open(args.output, "w", encoding='utf-8') as f:
f.write(json_str)
logger.info(f"✅ Done: {ok_count}/{len(results)} feeds ok, "
f"{total_articles} articles → {args.output}")
return 0
except Exception as e:
logger.error(f"💥 RSS fetch failed: {e}")
return 1
if __name__ == "__main__":
sys.exit(main())
```
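`fetch-rss.py` above leans on HTTP conditional requests: it stores each feed's `ETag` and `Last-Modified` validators and replays them as `If-None-Match` / `If-Modified-Since`, so an unchanged feed can answer with a cheap 304 instead of the full body. A minimal standalone sketch of that pattern using only the standard library (the `example/1.0` user agent and the cache shape are illustrative, not taken from the script):

```python
import time
from urllib.request import Request, urlopen
from urllib.error import HTTPError


def build_conditional_headers(entry: dict) -> dict:
    """Turn a cached validator entry into conditional request headers."""
    headers = {"User-Agent": "example/1.0"}
    if entry.get("etag"):
        headers["If-None-Match"] = entry["etag"]
    if entry.get("last_modified"):
        headers["If-Modified-Since"] = entry["last_modified"]
    return headers


def conditional_fetch(url: str, cache: dict, timeout: int = 30):
    """Fetch url; return (body, cache). body is None when the server says 304."""
    req = Request(url, headers=build_conditional_headers(cache.get(url, {})))
    try:
        with urlopen(req, timeout=timeout) as resp:
            # Remember the validators so the next run can send them back.
            cache[url] = {
                "etag": resp.headers.get("ETag"),
                "last_modified": resp.headers.get("Last-Modified"),
                "ts": time.time(),
            }
            return resp.read(), cache
    except HTTPError as e:
        if e.code == 304:
            return None, cache  # feed unchanged since the cached validators
        raise
```

Note that `urllib` surfaces a 304 as an `HTTPError`, which is why the script above handles "not modified" in its error path rather than by reading a status code.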
### scripts/fetch-twitter.py
```python
#!/usr/bin/env python3
"""
Fetch Twitter/X posts from KOL accounts using X API.
Reads sources.json, filters Twitter sources, fetches recent posts using
either the official X API v2 or twitterapi.io, and outputs structured JSON.
Usage:
python3 fetch-twitter.py [--config CONFIG_DIR] [--hours 48] [--output FILE] [--verbose]
python3 fetch-twitter.py --backend twitterapiio # force twitterapi.io backend
Environment:
TWITTER_API_BACKEND - Backend selection: "official", "twitterapiio", or "auto" (default: auto)
X_BEARER_TOKEN - Twitter/X API bearer token (for official backend)
TWITTERAPI_IO_KEY - twitterapi.io API key (for twitterapiio backend)
"""
import json
import sys
import os
import argparse
import logging
import time
import tempfile
import re
import threading
from abc import ABC, abstractmethod
from datetime import datetime, timedelta, timezone
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.request import urlopen, Request
from urllib.error import HTTPError
from urllib.parse import urlencode, quote
from pathlib import Path
from typing import Dict, List, Any, Optional
TIMEOUT = 30
MAX_WORKERS = 5 # Lower for API rate limits
RETRY_COUNT = 2
RETRY_DELAY = 2.0
MAX_TWEETS_PER_USER = 20
ID_CACHE_PATH = "/tmp/media-news-digest-twitter-id-cache.json"
ID_CACHE_TTL_DAYS = 7
# Twitter API v2 endpoints
OFFICIAL_API_BASE = "https://api.x.com/2"
USER_LOOKUP_ENDPOINT = f"{OFFICIAL_API_BASE}/users/by"
# twitterapi.io endpoints
TWITTERAPIIO_BASE = "https://api.twitterapi.io"
def setup_logging(verbose: bool) -> logging.Logger:
"""Setup logging configuration."""
level = logging.DEBUG if verbose else logging.INFO
logging.basicConfig(
level=level,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
return logging.getLogger(__name__)
def clean_tweet_text(text: str) -> str:
"""Clean tweet text for better display."""
# Remove excessive whitespace
text = re.sub(r'\s+', ' ', text).strip()
# Truncate if too long
if len(text) > 280:
text = text[:277] + "..."
return text
# ---------------------------------------------------------------------------
# Rate limiting
# ---------------------------------------------------------------------------
class RateLimiter:
"""Simple token-bucket rate limiter."""
def __init__(self, qps: float):
self._lock = threading.Lock()
self._min_interval = 1.0 / qps
self._last = 0.0
def wait(self):
with self._lock:
now = time.monotonic()
wait_time = self._min_interval - (now - self._last)
if wait_time > 0:
time.sleep(wait_time)
self._last = time.monotonic()
# ---------------------------------------------------------------------------
# Backend abstraction
# ---------------------------------------------------------------------------
class TwitterBackend(ABC):
"""Base class for Twitter API backends."""
@staticmethod
def _make_result(source, articles, attempt):
return {
"source_id": source["id"],
"source_type": "twitter",
"name": source["name"],
"handle": source["handle"].lstrip('@'),
"priority": source["priority"],
"topics": source["topics"],
"status": "ok",
"attempts": attempt + 1,
"count": len(articles),
"articles": articles,
}
@staticmethod
def _make_error(source, error_msg, attempt):
return {
"source_id": source["id"],
"source_type": "twitter",
"name": source["name"],
"handle": source["handle"].lstrip('@'),
"priority": source["priority"],
"topics": source["topics"],
"status": "error",
"attempts": attempt + 1,
"error": error_msg,
"count": 0,
"articles": [],
}
@abstractmethod
def fetch_all(self, sources: List[Dict[str, Any]], cutoff: datetime) -> List[Dict[str, Any]]:
"""Fetch tweets for all sources. Returns list of source result dicts."""
class OfficialBackend(TwitterBackend):
"""Official X API v2 backend (existing logic)."""
def __init__(self, bearer_token: str, no_cache: bool = False):
self.bearer_token = bearer_token
self.no_cache = no_cache
# -- ID cache helpers --
@staticmethod
def _load_id_cache() -> Dict[str, Any]:
try:
with open(ID_CACHE_PATH, 'r') as f:
return json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
return {}
@staticmethod
def _save_id_cache(cache: Dict[str, Any]) -> None:
try:
with open(ID_CACHE_PATH, 'w') as f:
json.dump(cache, f)
except Exception as e:
logging.warning(f"Failed to save ID cache: {e}")
def _batch_resolve_user_ids(self, handles: List[str]) -> Dict[str, str]:
now = time.time()
cache = {} if self.no_cache else self._load_id_cache()
ttl_seconds = ID_CACHE_TTL_DAYS * 86400
result: Dict[str, str] = {}
to_resolve: List[str] = []
for handle in handles:
key = handle.lower()
entry = cache.get(key)
if entry and (now - entry.get("ts", 0)) < ttl_seconds:
result[key] = entry["id"]
else:
to_resolve.append(handle)
if to_resolve:
logging.info(f"Batch resolving {len(to_resolve)} usernames (cached: {len(result)})")
headers = {
"Authorization": f"Bearer {self.bearer_token}",
"User-Agent": "MediaDigest/2.0"
}
for i in range(0, len(to_resolve), 100):
batch = to_resolve[i:i+100]
url = f"{USER_LOOKUP_ENDPOINT}?{urlencode({'usernames': ','.join(batch)})}"
try:
req = Request(url, headers=headers)
with urlopen(req, timeout=TIMEOUT) as resp:
data = json.loads(resp.read().decode())
if 'data' in data:
for user in data['data']:
key = user['username'].lower()
result[key] = user['id']
cache[key] = {"id": user['id'], "ts": now}
if 'errors' in data:
for err in data['errors']:
logging.warning(f"User lookup error: {err.get('detail', err)}")
except Exception as e:
logging.error(f"Batch user lookup failed: {e}")
for handle in batch:
try:
fallback_url = f"{USER_LOOKUP_ENDPOINT}?{urlencode({'usernames': handle})}"
req = Request(fallback_url, headers=headers)
with urlopen(req, timeout=TIMEOUT) as resp:
fallback_data = json.loads(resp.read().decode())
if 'data' in fallback_data and fallback_data['data']:
key = handle.lower()
result[key] = fallback_data['data'][0]['id']
cache[key] = {"id": result[key], "ts": now}
except Exception as e2:
logging.warning(f"Individual lookup failed for @{handle}: {e2}")
if not self.no_cache:
self._save_id_cache(cache)
else:
logging.info(f"All {len(result)} usernames resolved from cache")
return result
@staticmethod
def _parse_date(date_str: str) -> Optional[datetime]:
try:
if date_str.endswith('Z'):
date_str = date_str[:-1] + '+00:00'
return datetime.fromisoformat(date_str)
except (ValueError, TypeError):
logging.debug(f"Failed to parse Twitter date: {date_str}")
return None
def _fetch_user_tweets(self, source: Dict[str, Any], cutoff: datetime,
user_id: Optional[str] = None) -> Dict[str, Any]:
handle = source["handle"].lstrip('@')
topics = source["topics"]
for attempt in range(RETRY_COUNT + 1):
try:
params = {
"max_results": min(MAX_TWEETS_PER_USER, 100),
"tweet.fields": "created_at,public_metrics,context_annotations,referenced_tweets",
"expansions": "author_id",
"user.fields": "verified,public_metrics"
}
if not user_id:
user_url = f"{USER_LOOKUP_ENDPOINT}?{urlencode({'usernames': handle})}"
headers = {
"Authorization": f"Bearer {self.bearer_token}",
"User-Agent": "MediaDigest/2.0"
}
req = Request(user_url, headers=headers)
with urlopen(req, timeout=TIMEOUT) as resp:
user_data = json.loads(resp.read().decode())
if 'data' not in user_data or not user_data['data']:
raise ValueError(f"User not found: {handle}")
user_id = user_data['data'][0]['id']
headers = {
"Authorization": f"Bearer {self.bearer_token}",
"User-Agent": "MediaDigest/2.0"
}
time.sleep(0.3)
tweets_url = f"{OFFICIAL_API_BASE}/users/{user_id}/tweets?{urlencode(params)}"
req = Request(tweets_url, headers=headers)
with urlopen(req, timeout=TIMEOUT) as resp:
tweets_data = json.loads(resp.read().decode())
articles = []
if 'data' in tweets_data:
for tweet in tweets_data['data']:
created_at = self._parse_date(tweet.get('created_at', ''))
if not created_at or created_at < cutoff:
continue
text = tweet.get('text', '')
if text.startswith('RT @'):
continue
referenced = tweet.get('referenced_tweets', [])
if any(ref.get('type') == 'replied_to' for ref in referenced):
continue
articles.append({
"title": clean_tweet_text(text),
"link": f"https://twitter.com/{handle}/status/{tweet['id']}",
"date": created_at.isoformat(),
"topics": topics[:],
"metrics": tweet.get('public_metrics', {}),
"tweet_id": tweet['id']
})
return self._make_result(source, articles, attempt)
except HTTPError as e:
if e.code == 429:
error_msg = "Rate limit exceeded"
logging.warning(f"Rate limit hit for @{handle}, attempt {attempt + 1}")
if attempt < RETRY_COUNT:
time.sleep(60)
continue
else:
error_msg = f"HTTP {e.code}: {e.reason}"
except Exception as e:
error_msg = str(e)[:100]
logging.debug(f"Attempt {attempt + 1} failed for @{handle}: {error_msg}")
if attempt < RETRY_COUNT:
time.sleep(RETRY_DELAY * (2 ** attempt))
continue
return self._make_error(source, error_msg, attempt)
def fetch_all(self, sources: List[Dict[str, Any]], cutoff: datetime) -> List[Dict[str, Any]]:
all_handles = [s["handle"].lstrip('@') for s in sources]
user_id_map = self._batch_resolve_user_ids(all_handles)
results: List[Dict[str, Any]] = []
total = len(sources)
done = 0
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
futures = {}
for source in sources:
handle = source["handle"].lstrip('@')
resolved_id = user_id_map.get(handle.lower())
futures[pool.submit(self._fetch_user_tweets, source, cutoff, resolved_id)] = source
for future in as_completed(futures):
result = future.result()
results.append(result)
done += 1
if result["status"] == "ok":
logging.info(f"[{done}/{total}] ✅ @{result['handle']}: {result['count']} tweets"
+ (f" (top: {result['articles'][0]['metrics']['like_count']}❤️)" if result.get('articles') else ""))
else:
logging.warning(f"[{done}/{total}] ❌ @{result['handle']}: {result.get('error','unknown')}")
return results
class TwitterApiIoBackend(TwitterBackend):
"""twitterapi.io backend."""
def __init__(self, api_key: str):
self.api_key = api_key
self._limiter = RateLimiter(qps=5)
@staticmethod
def _parse_date(date_str: str) -> Optional[datetime]:
"""Parse twitterapi.io date format: 'Tue Dec 10 07:00:30 +0000 2024'."""
try:
return datetime.strptime(date_str, "%a %b %d %H:%M:%S %z %Y")
except (ValueError, TypeError):
logging.debug(f"Failed to parse twitterapi.io date: {date_str}")
return None
def _parse_tweets_page(self, tweets: list, handle: str, topics: list, cutoff: datetime) -> list:
"""Parse a page of tweets into article dicts."""
articles = []
for tweet in tweets:
# Skip retweets
if tweet.get("retweeted_tweet"):
continue
created_at = self._parse_date(tweet.get("createdAt", ""))
if not created_at or created_at < cutoff:
continue
text = tweet.get("text", "")
if text.startswith("RT @"):
continue
tweet_id = tweet.get("id", "")
link = tweet.get("url") or f"https://twitter.com/{handle}/status/{tweet_id}"
articles.append({
"title": clean_tweet_text(text),
"link": link,
"date": created_at.isoformat(),
"topics": topics[:],
"metrics": {
"like_count": tweet.get("likeCount", 0),
"retweet_count": tweet.get("retweetCount", 0),
"reply_count": tweet.get("replyCount", 0),
"quote_count": tweet.get("quoteCount", 0),
"impression_count": tweet.get("viewCount", 0),
},
"tweet_id": tweet_id,
})
return articles
def _fetch_user_tweets(self, source: Dict[str, Any], cutoff: datetime) -> Dict[str, Any]:
handle = source["handle"].lstrip('@')
topics = source["topics"]
for attempt in range(RETRY_COUNT + 1):
try:
params = urlencode({
"userName": handle,
"includeReplies": "false",
})
url = f"{TWITTERAPIIO_BASE}/twitter/user/last_tweets?{params}"
headers = {
"X-API-Key": self.api_key,
"User-Agent": "MediaDigest/2.0",
}
self._limiter.wait()
req = Request(url, headers=headers)
with urlopen(req, timeout=TIMEOUT) as resp:
raw = json.loads(resp.read().decode())
# API wraps response in {"data": {...}} envelope
data = raw.get("data", raw)
articles = self._parse_tweets_page(
data.get("tweets", []), handle, topics, cutoff
)
# Pagination: fetch one more page if available and all tweets still in window
has_next = data.get("has_next_page", False)
next_cursor = data.get("next_cursor")
if has_next and next_cursor and articles:
oldest = min(a["date"] for a in articles)
if oldest >= cutoff.isoformat():
self._limiter.wait()
page2_params = urlencode({
"userName": handle,
"includeReplies": "false",
"cursor": next_cursor,
})
page2_url = f"{TWITTERAPIIO_BASE}/twitter/user/last_tweets?{page2_params}"
req2 = Request(page2_url, headers=headers)
with urlopen(req2, timeout=TIMEOUT) as resp2:
raw2 = json.loads(resp2.read().decode())
data2 = raw2.get("data", raw2)
articles.extend(self._parse_tweets_page(
data2.get("tweets", []), handle, topics, cutoff
))
has_next = data2.get("has_next_page", False)
# Truncation warning
if has_next and articles:
oldest = min(a["date"] for a in articles)
if oldest >= cutoff.isoformat():
logging.warning(f"@{handle}: results may be truncated ({len(articles)} tweets, more available)")
return self._make_result(source, articles, attempt)
except HTTPError as e:
if e.code == 429:
error_msg = "Rate limit exceeded"
logging.warning(f"Rate limit hit for @{handle}, attempt {attempt + 1}")
if attempt < RETRY_COUNT:
time.sleep(5)
continue
else:
error_msg = f"HTTP {e.code}: {e.reason}"
except Exception as e:
error_msg = str(e)[:100]
logging.debug(f"Attempt {attempt + 1} failed for @{handle}: {error_msg}")
if attempt < RETRY_COUNT:
time.sleep(RETRY_DELAY * (2 ** attempt))
continue
return self._make_error(source, error_msg, attempt)
def fetch_all(self, sources: List[Dict[str, Any]], cutoff: datetime) -> List[Dict[str, Any]]:
results: List[Dict[str, Any]] = []
total = len(sources)
done = 0
with ThreadPoolExecutor(max_workers=3) as pool:
futures = {pool.submit(self._fetch_user_tweets, source, cutoff): source
for source in sources}
for future in as_completed(futures):
result = future.result()
results.append(result)
done += 1
if result["status"] == "ok":
logging.info(f"[{done}/{total}] ✅ @{result['handle']}: {result['count']} tweets"
+ (f" (top: {result['articles'][0]['metrics']['like_count']}❤️)" if result['articles'] else ""))
else:
logging.warning(f"[{done}/{total}] ❌ @{result['handle']}: {result['error']}")
return results
# ---------------------------------------------------------------------------
# Backend selection
# ---------------------------------------------------------------------------
def select_backend(backend_name: str, no_cache: bool = False) -> Optional[TwitterBackend]:
"""Select and instantiate the appropriate backend.
Returns None if no credentials are available for the chosen backend.
"""
if backend_name == "twitterapiio":
key = os.getenv("TWITTERAPI_IO_KEY")
if not key:
logging.error("TWITTERAPI_IO_KEY not set (required for twitterapiio backend)")
return None
logging.info("Using twitterapi.io backend")
return TwitterApiIoBackend(key)
if backend_name == "official":
token = os.getenv("X_BEARER_TOKEN")
if not token:
logging.error("X_BEARER_TOKEN not set (required for official backend)")
return None
logging.info("Using official X API v2 backend")
return OfficialBackend(token, no_cache=no_cache)
# auto: try twitterapiio first, then official
if backend_name == "auto":
key = os.getenv("TWITTERAPI_IO_KEY")
if key:
logging.info("Auto-selected twitterapi.io backend (TWITTERAPI_IO_KEY set)")
return TwitterApiIoBackend(key)
token = os.getenv("X_BEARER_TOKEN")
if token:
logging.info("Auto-selected official X API v2 backend (X_BEARER_TOKEN set)")
return OfficialBackend(token, no_cache=no_cache)
logging.warning("No Twitter API credentials found (checked TWITTERAPI_IO_KEY, X_BEARER_TOKEN)")
return None
logging.error(f"Unknown backend: {backend_name}")
return None
# ---------------------------------------------------------------------------
# Source loading
# ---------------------------------------------------------------------------
def load_twitter_sources(defaults_dir: Path, config_dir: Optional[Path] = None) -> List[Dict[str, Any]]:
"""Load Twitter sources from unified configuration with overlay support."""
try:
from config_loader import load_merged_sources
except ImportError:
# Fallback for relative import
import sys
sys.path.append(str(Path(__file__).parent))
from config_loader import load_merged_sources
# Load merged sources from defaults + optional user overlay
all_sources = load_merged_sources(defaults_dir, config_dir)
# Filter Twitter sources that are enabled
twitter_sources = []
for source in all_sources:
if source.get("type") == "twitter" and source.get("enabled", True):
if not source.get("handle"):
logging.warning(f"Twitter source {source.get('id')} missing handle, skipping")
continue
twitter_sources.append(source)
logging.info(f"Loaded {len(twitter_sources)} enabled Twitter sources")
return twitter_sources
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
"""Main Twitter fetching function."""
parser = argparse.ArgumentParser(
description="Fetch recent tweets from Twitter/X KOL accounts. "
"Supports official X API v2 and twitterapi.io backends.",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
export X_BEARER_TOKEN="your_token_here"
python3 fetch-twitter.py
python3 fetch-twitter.py --defaults config/defaults --config workspace/config --hours 24 -o results.json
python3 fetch-twitter.py --backend twitterapiio # use twitterapi.io
python3 fetch-twitter.py --config workspace/config --verbose # backward compatibility
"""
)
parser.add_argument(
"--defaults",
type=Path,
default=Path("config/defaults"),
help="Default configuration directory with skill defaults (default: config/defaults)"
)
parser.add_argument(
"--config",
type=Path,
help="User configuration directory for overlays (optional)"
)
parser.add_argument(
"--hours",
type=int,
default=48,
help="Time window in hours for tweets (default: 48)"
)
parser.add_argument(
"--output", "-o",
type=Path,
help="Output JSON path (default: auto-generated temp file)"
)
parser.add_argument(
"--verbose", "-v",
action="store_true",
help="Enable verbose logging"
)
parser.add_argument(
"--no-cache",
action="store_true",
help="Bypass usernameβID cache (official backend only)"
)
parser.add_argument(
"--force",
action="store_true",
help="Force re-fetch even if cached output exists"
)
parser.add_argument(
"--backend",
choices=["official", "twitterapiio", "auto"],
default=None,
help="Twitter API backend (overrides TWITTER_API_BACKEND env var). "
"auto = twitterapiio if TWITTERAPI_IO_KEY set, else official if X_BEARER_TOKEN set"
)
args = parser.parse_args()
logger = setup_logging(args.verbose)
# Resume support: skip if output exists, is valid JSON, and < 1 hour old
if args.output and args.output.exists() and not args.force:
try:
age_seconds = time.time() - args.output.stat().st_mtime
if age_seconds < 3600:
with open(args.output, 'r') as f:
json.load(f)
logger.info(f"Skipping (cached output exists): {args.output}")
return 0
except (json.JSONDecodeError, OSError):
pass
# Resolve backend: CLI arg > env var > default (auto)
backend_name = args.backend or os.getenv("TWITTER_API_BACKEND", "auto")
backend = select_backend(backend_name, no_cache=args.no_cache)
if not backend:
logger.warning("No Twitter backend available. Writing empty result and skipping Twitter fetch.")
empty_result = {
"generated": datetime.now(timezone.utc).isoformat(),
"source_type": "twitter",
"backend": backend_name,
"hours": args.hours,
"sources_total": 0,
"sources_ok": 0,
"total_articles": 0,
"sources": [],
"skipped_reason": f"No credentials for backend '{backend_name}'"
}
output_path = args.output or Path("/tmp/md-twitter.json")
with open(output_path, "w") as f:
json.dump(empty_result, f, indent=2)
print(f"Output (empty): {output_path}")
return 0
# Auto-generate unique output path if not specified
if not args.output:
fd, temp_path = tempfile.mkstemp(prefix="media-news-digest-twitter-", suffix=".json")
os.close(fd)
args.output = Path(temp_path)
try:
cutoff = datetime.now(timezone.utc) - timedelta(hours=args.hours)
# Backward compatibility: if only --config provided, use old behavior
if args.config and args.defaults == Path("config/defaults") and not args.defaults.exists():
logger.debug("Backward compatibility mode: using --config as sole source")
sources = load_twitter_sources(args.config, None)
else:
sources = load_twitter_sources(args.defaults, args.config)
if not sources:
logger.warning("No Twitter sources found or all disabled")
logger.info(f"Fetching {len(sources)} Twitter accounts (window: {args.hours}h, backend: {backend_name})")
results = backend.fetch_all(sources, cutoff)
# Sort: priority first, then by article count
results.sort(key=lambda x: (not x.get("priority", False), -x.get("count", 0)))
ok_count = sum(1 for r in results if r["status"] == "ok")
total_tweets = sum(r.get("count", 0) for r in results)
output = {
"generated": datetime.now(timezone.utc).isoformat(),
"source_type": "twitter",
"backend": backend_name,
"defaults_dir": str(args.defaults),
"config_dir": str(args.config) if args.config else None,
"hours": args.hours,
"sources_total": len(results),
"sources_ok": ok_count,
"total_articles": total_tweets,
"sources": results,
}
# Write output
json_str = json.dumps(output, ensure_ascii=False, indent=2)
with open(args.output, "w", encoding='utf-8') as f:
f.write(json_str)
logger.info(f"✅ Done: {ok_count}/{len(results)} accounts ok, "
f"{total_tweets} tweets → {args.output}")
return 0
except Exception as e:
logger.error(f"💥 Twitter fetch failed: {e}")
return 1
if __name__ == "__main__":
sys.exit(main())
```
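Both Twitter backends in `fetch-twitter.py` throttle themselves; the `RateLimiter` there serializes callers behind a lock so successive requests are spaced at least `1/qps` seconds apart (a minimum-interval limiter, not a bursty token bucket). A self-contained sketch of that pattern, with illustrative naming:

```python
import threading
import time


class MinIntervalLimiter:
    """Serialize callers so successive calls are at least 1/qps seconds apart."""

    def __init__(self, qps: float):
        self._lock = threading.Lock()
        self._min_interval = 1.0 / qps
        self._last = 0.0

    def wait(self) -> None:
        # The lock both protects _last and queues concurrent callers,
        # so threads drain through at the configured rate.
        with self._lock:
            now = time.monotonic()
            sleep_for = self._min_interval - (now - self._last)
            if sleep_for > 0:
                time.sleep(sleep_for)
            self._last = time.monotonic()
```

Because `_last` starts at `0.0`, the first call never sleeps; only subsequent calls are delayed to keep the minimum spacing.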
### scripts/fetch-web.py
```python
#!/usr/bin/env python3
"""
Fetch web search results for media digest topics.
Reads topics.json, performs web searches for each topic's search queries,
and outputs structured JSON with search results tagged by topics.
Usage:
python3 fetch-web.py [--config CONFIG_DIR] [--freshness 48h] [--output FILE] [--verbose]
Note: This script can use the Brave Search API if BRAVE_API_KEY is set; otherwise
it provides a JSON interface for agents to use the web_search tool.
"""
import json
import sys
import os
import argparse
import logging
import time
import tempfile
import re
from datetime import datetime, timezone
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple
from urllib.request import urlopen, Request
from urllib.error import HTTPError
from urllib.parse import urlencode
TIMEOUT = 30
MAX_RESULTS_PER_QUERY = 5
RETRY_COUNT = 1
RETRY_DELAY = 2.0
# Brave Search API
BRAVE_API_BASE = "https://api.search.brave.com/res/v1/web/search"
TAVILY_API_BASE = "https://api.tavily.com/search"
BRAVE_RATE_LIMIT_CACHE = "/tmp/media-news-digest-brave-rate-limit.json"
def setup_logging(verbose: bool) -> logging.Logger:
"""Setup logging configuration."""
level = logging.DEBUG if verbose else logging.INFO
logging.basicConfig(
level=level,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
return logging.getLogger(__name__)
def get_brave_api_keys() -> List[str]:
"""Get Brave Search API keys from environment.
Supports multiple keys via comma-separated BRAVE_API_KEYS (preferred)
or BRAVE_API_KEY (single key fallback):
export BRAVE_API_KEYS="key1,key2,key3"
export BRAVE_API_KEY="key1" # fallback for single key
"""
raw = os.getenv('BRAVE_API_KEYS', '') or os.getenv('BRAVE_API_KEY', '')
if not raw:
return []
return [k.strip() for k in raw.split(',') if k.strip()]
def get_brave_api_key() -> Optional[str]:
"""Get first available Brave API key (legacy compat)."""
keys = get_brave_api_keys()
return keys[0] if keys else None
def _probe_brave_key(api_key: str) -> Dict[str, Any]:
"""Probe a single Brave API key. Returns {qps, workers, exhausted, error}."""
try:
params = urlencode({'q': 'test', 'count': 1})
url = f"{BRAVE_API_BASE}?{params}"
req = Request(url, headers={
'Accept': 'application/json',
'X-Subscription-Token': api_key,
'User-Agent': 'MediaDigest/2.0'
})
with urlopen(req, timeout=TIMEOUT) as resp:
limit_header = resp.headers.get('x-ratelimit-limit', '1')
remaining = resp.headers.get('x-ratelimit-remaining', '')
per_second = int(limit_header.split(',')[0].strip())
resp.read()
exhausted = False
if remaining.isdigit() and int(remaining) == 0:
exhausted = True
workers = min(per_second // 2, 5) if per_second >= 10 else 1
return {'qps': per_second, 'workers': workers, 'exhausted': exhausted, 'error': None}
except HTTPError as e:
if e.code == 429:
return {'qps': 1, 'workers': 1, 'exhausted': True, 'error': '429 rate limited'}
return {'qps': 1, 'workers': 1, 'exhausted': False, 'error': f'HTTP {e.code}'}
except Exception as e:
return {'qps': 1, 'workers': 1, 'exhausted': False, 'error': str(e)}
def select_brave_key_and_limits(keys: List[str]) -> Tuple[Optional[str], int, int]:
"""Select the best available Brave API key and detect rate limits.
Tries each key in order. Skips exhausted keys (cached for 24h).
Returns (api_key, max_qps, max_workers) or (None, 0, 0) if all exhausted.
"""
if not keys:
return None, 0, 0
# Override via env var
brave_plan = os.getenv('BRAVE_PLAN', '').lower()
plan_qps = None
if brave_plan == 'free':
plan_qps, plan_workers = 1, 1
elif brave_plan == 'pro':
plan_qps, plan_workers = 15, 5
# Load cache
cache = {}
try:
with open(BRAVE_RATE_LIMIT_CACHE, 'r') as f:
cache = json.load(f)
except (FileNotFoundError, json.JSONDecodeError, OSError):
pass
now = time.time()
key_cache = cache.get('keys', {})
for i, key in enumerate(keys):
key_id = f"key_{i}" # Don't log actual keys
cached = key_cache.get(key_id, {})
cache_age = now - cached.get('ts', 0)
# Use cache if fresh (24h)
if cache_age < 86400 and cached.get('exhausted'):
logging.debug(f"Brave {key_id}: exhausted (cached), skipping")
continue
if plan_qps is not None:
logging.info(f"Using Brave {key_id} with BRAVE_PLAN={brave_plan} override: {plan_qps} QPS")
return key, plan_qps, plan_workers
if cache_age < 86400 and 'qps' in cached and not cached.get('exhausted'):
qps = cached['qps']
workers = cached['workers']
logging.info(f"Using Brave {key_id} (cached): {qps} QPS, {workers} workers")
return key, qps, workers
# Probe
result = _probe_brave_key(key)
key_cache[key_id] = {'ts': now, **result}
if result['exhausted']:
logging.warning(f"Brave {key_id}: exhausted ({result.get('error', 'quota reached')}), trying next")
continue
if result['error']:
logging.warning(f"Brave {key_id}: probe error ({result['error']}), trying next")
continue
logging.info(f"Using Brave {key_id}: {result['qps']} QPS, {result['workers']} workers")
# Save cache
try:
cache['keys'] = key_cache
with open(BRAVE_RATE_LIMIT_CACHE, 'w') as f:
json.dump(cache, f)
except OSError:
pass
return key, result['qps'], result['workers']
# All keys exhausted
logging.warning("All Brave API keys exhausted or errored")
# Save cache
try:
cache['keys'] = key_cache
with open(BRAVE_RATE_LIMIT_CACHE, 'w') as f:
json.dump(cache, f)
except OSError:
pass
return None, 0, 0
def detect_brave_rate_limit(api_key: str) -> Tuple[int, int]:
"""Legacy wrapper: detect rate limit for a single key."""
_, qps, workers = select_brave_key_and_limits([api_key])
return max(qps, 1), max(workers, 1)
def search_brave(query: str, api_key: str, freshness: Optional[str] = None) -> Dict[str, Any]:
"""Perform search using Brave Search API."""
params = {
'q': query,
'count': MAX_RESULTS_PER_QUERY,
'search_lang': 'en',
'country': 'ALL',
'safesearch': 'moderate',
'text_decorations': 'false'
}
if freshness:
params['freshness'] = freshness
url = f"{BRAVE_API_BASE}?{urlencode(params)}"
headers = {
'Accept': 'application/json',
'X-Subscription-Token': api_key,
'User-Agent': 'MediaDigest/2.0'
}
try:
req = Request(url, headers=headers)
with urlopen(req, timeout=TIMEOUT) as resp:
raw = resp.read()
# Handle gzip if server sends it anyway
if raw[:2] == b'\x1f\x8b':
import gzip
raw = gzip.decompress(raw)
data = json.loads(raw.decode())
results = []
if 'web' in data and 'results' in data['web']:
for result in data['web']['results']:
results.append({
'title': result.get('title', ''),
'link': result.get('url', ''),
'snippet': result.get('description', ''),
'date': datetime.now(timezone.utc).isoformat() # Search timestamp
})
return {
'status': 'ok',
'query': query,
'results': results,
'total': len(results)
}
except Exception as e:
return {
'status': 'error',
'query': query,
'error': str(e)[:100],
'results': [],
'total': 0
}
def filter_content(text: str, must_include: List[str], exclude: List[str]) -> bool:
"""Check if content matches inclusion/exclusion criteria."""
text_lower = text.lower()
# Check must_include (any match)
if must_include:
has_required = any(keyword.lower() in text_lower for keyword in must_include)
if not has_required:
return False
# Check exclude (any match disqualifies)
if exclude:
has_excluded = any(keyword.lower() in text_lower for keyword in exclude)
if has_excluded:
return False
return True
def search_topic_brave(topic: Dict[str, Any], api_key: str, freshness: Optional[str] = None,
max_workers: int = 1, delay: float = 0.5) -> Dict[str, Any]:
"""Search all queries for a topic using Brave API.
Args:
max_workers: Number of parallel search threads (1 = sequential)
delay: Delay between requests in sequential mode (ignored when parallel)
"""
topic_id = topic["id"]
queries = topic["search"]["queries"]
must_include = topic["search"].get("must_include", [])
exclude = topic["search"].get("exclude", [])
all_results = []
query_stats = []
if max_workers > 1:
with ThreadPoolExecutor(max_workers=max_workers) as pool:
futures = {pool.submit(search_brave, q, api_key, freshness): q for q in queries}
for future in as_completed(futures):
search_result = future.result()
query_stats.append({
'query': search_result['query'],
'status': search_result['status'],
'count': search_result['total']
})
if search_result['status'] == 'ok':
for result in search_result['results']:
combined_text = f"{result['title']} {result['snippet']}"
if filter_content(combined_text, must_include, exclude):
result['topics'] = [topic_id]
all_results.append(result)
else:
for query in queries:
search_result = search_brave(query, api_key, freshness)
query_stats.append({
'query': query,
'status': search_result['status'],
'count': search_result['total']
})
if search_result['status'] == 'ok':
for result in search_result['results']:
combined_text = f"{result['title']} {result['snippet']}"
if filter_content(combined_text, must_include, exclude):
result['topics'] = [topic_id]
all_results.append(result)
time.sleep(delay)
return {
'topic_id': topic_id,
'status': 'ok',
'queries_executed': len(queries),
'queries_ok': sum(1 for q in query_stats if q['status'] == 'ok'),
'query_stats': query_stats,
'count': len(all_results),
'articles': all_results
}
def get_tavily_api_key() -> Optional[str]:
"""Get Tavily API key from environment."""
return os.getenv('TAVILY_API_KEY', '').strip() or None
def search_tavily(query: str, api_key: str, topic: str = "general",
max_results: int = 10, search_depth: str = "basic",
days: Optional[int] = None) -> Dict[str, Any]:
"""Perform search using Tavily Search API.
Args:
topic: 'general' or 'news' (news for real-time updates)
days: Limit results to the last N days (None = no limit)
"""
payload = {
"api_key": api_key,
"query": query,
"search_depth": search_depth,
"topic": topic,
"max_results": max_results,
"include_answer": False,
}
if days is not None:
payload["days"] = days
try:
data = json.dumps(payload).encode()
req = Request(TAVILY_API_BASE, data=data, headers={
"Content-Type": "application/json",
"User-Agent": "MediaDigest/3.0"
}, method="POST")
with urlopen(req, timeout=TIMEOUT) as resp:
result = json.loads(resp.read().decode())
articles = []
for r in result.get("results", []):
articles.append({
"title": r.get("title", ""),
"link": r.get("url", ""),
"snippet": r.get("content", "")[:300],
"date": r.get("published_date", ""),
"source": "tavily",
})
return {
"query": query,
"status": "ok",
"total": len(articles),
"results": articles,
}
except HTTPError as e:
logging.warning(f"Tavily search error for '{query}': HTTP {e.code}")
return {"query": query, "status": "error", "total": 0, "results": [], "error": f"HTTP {e.code}"}
except Exception as e:
logging.warning(f"Tavily search error for '{query}': {e}")
return {"query": query, "status": "error", "total": 0, "results": [], "error": str(e)}
def search_topic_tavily(topic: Dict[str, Any], api_key: str, days: Optional[int] = None) -> Dict[str, Any]:
"""Search all queries for a topic using Tavily API."""
topic_id = topic["id"]
queries = topic["search"]["queries"]
must_include = topic["search"].get("must_include", [])
exclude = topic["search"].get("exclude", [])
all_results = []
query_stats = []
for query in queries:
search_result = search_tavily(query, api_key, topic="news", days=days)
query_stats.append({
"query": search_result["query"],
"status": search_result["status"],
"count": search_result["total"],
})
if search_result["status"] == "ok":
for result in search_result["results"]:
combined_text = f"{result['title']} {result['snippet']}"
if filter_content(combined_text, must_include, exclude):
result["topics"] = [topic_id]
all_results.append(result)
ok_count = sum(1 for s in query_stats if s["status"] == "ok")
return {
"topic": topic_id,
"status": "ok" if ok_count > 0 else "error",
"queries": len(queries),
"queries_ok": ok_count,
"count": len(all_results),
"articles": all_results,
"query_details": query_stats,
}
def generate_search_interface(topic: Dict[str, Any]) -> Dict[str, Any]:
"""Generate JSON interface for agent web search."""
topic_id = topic["id"]
queries = topic["search"]["queries"]
must_include = topic["search"].get("must_include", [])
exclude = topic["search"].get("exclude", [])
return {
'topic_id': topic_id,
'status': 'interface',
'search_required': True,
'queries': queries,
'filters': {
'must_include': must_include,
'exclude': exclude
},
'instructions': [
f"Use web_search tool for each query in 'queries' list",
f"Filter results using 'filters.must_include' and 'filters.exclude'",
f"Tag matching articles with topic: '{topic_id}'",
f"Expected max results per query: {MAX_RESULTS_PER_QUERY}"
],
'count': 0,
'articles': []
}
def load_topics(defaults_dir: Path, config_dir: Optional[Path] = None) -> List[Dict[str, Any]]:
"""Load topics from configuration with overlay support."""
try:
from config_loader import load_merged_topics
except ImportError:
# Fallback for relative import
import sys
sys.path.append(str(Path(__file__).parent))
from config_loader import load_merged_topics
# Load merged topics from defaults + optional user overlay
topics = load_merged_topics(defaults_dir, config_dir)
logging.info(f"Loaded {len(topics)} topics for web search")
return topics
def convert_freshness(hours: int) -> str:
"""Convert hours to Brave API freshness format."""
if hours <= 24:
return "pd" # past day
elif hours <= 168: # 7 days
return "pw" # past week
elif hours <= 720: # 30 days
return "pm" # past month
else:
return "py" # past year
def main():
"""Main web search function."""
parser = argparse.ArgumentParser(
        description="Perform web searches for media digest topics. "
                    "Uses Tavily (TAVILY_API_KEY) or Brave Search (BRAVE_API_KEYS), "
                    "or generates a search interface for agents.",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# With Brave API
export BRAVE_API_KEY="your_key_here"
python3 fetch-web.py --defaults config/defaults --config workspace/config --freshness 24h
# Without API (generates interface)
python3 fetch-web.py --config workspace/config --output web-search-interface.json # backward compatibility
"""
)
parser.add_argument(
"--defaults",
type=Path,
default=Path("config/defaults"),
help="Default configuration directory with skill defaults (default: config/defaults)"
)
parser.add_argument(
"--config",
type=Path,
help="User configuration directory for overlays (optional)"
)
parser.add_argument(
"--freshness",
default="48h",
help="Search freshness: 24h, 48h, 1w, 1m (default: 48h)"
)
parser.add_argument(
"--output", "-o",
type=Path,
help="Output JSON path (default: auto-generated temp file)"
)
parser.add_argument(
"--verbose", "-v",
action="store_true",
help="Enable verbose logging"
)
parser.add_argument(
"--force",
action="store_true",
help="Force re-fetch even if cached output exists"
)
args = parser.parse_args()
logger = setup_logging(args.verbose)
# Resume support: skip if output exists, is valid JSON, and < 1 hour old
if args.output and args.output.exists() and not args.force:
try:
age_seconds = time.time() - args.output.stat().st_mtime
if age_seconds < 3600:
with open(args.output, 'r') as f:
json.load(f)
logger.info(f"Skipping (cached output exists): {args.output}")
return 0
except (json.JSONDecodeError, OSError):
pass
# Auto-generate unique output path if not specified
if not args.output:
fd, temp_path = tempfile.mkstemp(prefix="media-news-digest-web-", suffix=".json")
os.close(fd)
args.output = Path(temp_path)
try:
# Backward compatibility: if only --config provided, use old behavior
if args.config and args.defaults == Path("config/defaults") and not args.defaults.exists():
logger.debug("Backward compatibility mode: using --config as sole source")
topics = load_topics(args.config, None)
else:
topics = load_topics(args.defaults, args.config)
if not topics:
logger.warning("No topics found")
return 1
# Backend selection: WEB_SEARCH_BACKEND env or auto-detect
web_backend = os.getenv('WEB_SEARCH_BACKEND', 'auto').lower()
tavily_key = get_tavily_api_key()
brave_keys = get_brave_api_keys()
use_tavily = False
use_brave = False
api_key = None
max_qps = 1
max_workers = 1
if web_backend == 'tavily' and tavily_key:
use_tavily = True
elif web_backend == 'brave' and brave_keys:
api_key, max_qps, max_workers = select_brave_key_and_limits(brave_keys)
use_brave = bool(api_key)
elif web_backend == 'auto':
if tavily_key:
use_tavily = True
elif brave_keys:
api_key, max_qps, max_workers = select_brave_key_and_limits(brave_keys)
use_brave = bool(api_key)
if use_tavily:
logger.info(f"Using Tavily Search API for {len(topics)} topics")
            # Convert freshness to days for Tavily
            freshness_days = {'pd': 1, 'pw': 7, 'pm': 30, 'py': 365}
            if args.freshness in freshness_days:
                tavily_days = freshness_days[args.freshness]
            else:
                try:
                    tavily_days = max(1, int(args.freshness.rstrip('h')) // 24)
                except (ValueError, AttributeError):
                    tavily_days = 2
results = []
for topic in topics:
if not topic.get("search", {}).get("queries"):
logger.debug(f"Topic {topic['id']} has no search queries, skipping")
continue
logger.debug(f"Searching topic: {topic['id']}")
result = search_topic_tavily(topic, tavily_key, days=tavily_days)
results.append(result)
total_articles = sum(r.get("count", 0) for r in results)
ok_topics = sum(1 for r in results if r["status"] == "ok")
output = {
"generated": datetime.now(timezone.utc).isoformat(),
"source_type": "web",
"defaults_dir": str(args.defaults),
"config_dir": str(args.config) if args.config else None,
"freshness": args.freshness,
"api_used": "tavily",
"topics_total": len(topics),
"topics_ok": ok_topics,
"total_articles": total_articles,
"topics": results,
}
with open(args.output, 'w', encoding='utf-8') as f:
json.dump(output, f, indent=2, ensure_ascii=False)
            logger.info(f"✅ Done: {ok_topics}/{len(topics)} topics ok, {total_articles} articles → {args.output}")
return 0
elif use_brave:
logger.info(f"Using Brave Search API for {len(topics)} topics ({len(brave_keys)} key(s) configured)")
delay = 1.0 / max_qps if max_workers == 1 else 0
# Convert freshness to Brave API format
# Accept both Brave native (pd/pw/pm/py) and human-friendly (24h/48h/1w/1m)
if args.freshness in ('pd', 'pw', 'pm', 'py'):
brave_freshness = args.freshness
else:
freshness_map = {'1w': 168, '1m': 720, '1y': 8760}
if args.freshness in freshness_map:
freshness_hours = freshness_map[args.freshness]
else:
try:
freshness_hours = int(args.freshness.rstrip('h'))
except ValueError:
logger.warning(f"Unrecognized freshness format '{args.freshness}', defaulting to 48h")
freshness_hours = 48
brave_freshness = convert_freshness(freshness_hours)
results = []
for topic in topics:
if not topic.get("search", {}).get("queries"):
logger.debug(f"Topic {topic['id']} has no search queries, skipping")
continue
logger.debug(f"Searching topic: {topic['id']}")
result = search_topic_brave(topic, api_key, brave_freshness,
max_workers=max_workers, delay=delay)
results.append(result)
total_articles = sum(r.get("count", 0) for r in results)
ok_topics = sum(1 for r in results if r["status"] == "ok")
output = {
"generated": datetime.now(timezone.utc).isoformat(),
"source_type": "web",
"defaults_dir": str(args.defaults),
"config_dir": str(args.config) if args.config else None,
"freshness": args.freshness,
"api_used": "brave",
"topics_total": len(results),
"topics_ok": ok_topics,
"total_articles": total_articles,
"topics": results
}
            logger.info(f"✅ Searched {ok_topics}/{len(results)} topics, "
                        f"{total_articles} articles found")
else:
            logger.info("No web search API key found (TAVILY_API_KEY or BRAVE_API_KEYS), generating search interface for agents")
results = []
for topic in topics:
if not topic.get("search", {}).get("queries"):
continue
result = generate_search_interface(topic)
results.append(result)
output = {
"generated": datetime.now(timezone.utc).isoformat(),
"source_type": "web",
"defaults_dir": str(args.defaults),
"config_dir": str(args.config) if args.config else None,
"freshness": args.freshness,
"api_used": "interface",
"topics_total": len(results),
"topics_ok": 0, # Requires manual execution
"total_articles": 0,
"topics": results,
"agent_instructions": [
"This file contains search interface for web_search tool",
"For each topic, execute the queries using web_search",
"Apply the filters (must_include/exclude) to results",
"Tag matching articles with the topic_id",
"Update this file with results for merge-sources.py"
]
}
            logger.info(f"✅ Generated search interface for {len(results)} topics")
# Write output
json_str = json.dumps(output, ensure_ascii=False, indent=2)
with open(args.output, "w", encoding='utf-8') as f:
f.write(json_str)
logger.info(f"Output written to: {args.output}")
return 0
except Exception as e:
        logger.error(f"💥 Web search failed: {e}")
return 1
if __name__ == "__main__":
sys.exit(main())
```
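The keyword-filtering contract in `filter_content` is the gate every search result passes through before being tagged with a topic. A minimal standalone sketch (mirroring the function in fetch-web.py above; the example headlines are made up):

```python
from typing import List

def filter_content(text: str, must_include: List[str], exclude: List[str]) -> bool:
    # Mirrors fetch-web.py: any must_include keyword qualifies the text,
    # any exclude keyword disqualifies it; both checks are case-insensitive.
    text_lower = text.lower()
    if must_include and not any(k.lower() in text_lower for k in must_include):
        return False
    if exclude and any(k.lower() in text_lower for k in exclude):
        return False
    return True

print(filter_content("Netflix orders new limited series", ["netflix", "hbo"], []))  # True
print(filter_content("Netflix casting rumor roundup", ["netflix"], ["rumor"]))      # False
print(filter_content("Box office report for the weekend", ["netflix"], []))         # False
```

Note that an empty `must_include` list means everything qualifies, so a topic can rely on exclusion rules alone.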
### scripts/generate-pdf.py
```python
#!/usr/bin/env python3
"""
Generate styled PDF from markdown digest report.
Converts a media-news-digest markdown report into a professional PDF
with Chinese font support, emoji icons, and clean typography.
Usage:
python3 generate-pdf.py --input /tmp/md-report.md --output /tmp/md-digest.pdf [--verbose]
Requirements:
- weasyprint (pip install weasyprint)
- Noto Sans CJK SC font (apt install fonts-noto-cjk)
"""
import argparse
import html
import re
import sys
import logging
from pathlib import Path
from urllib.parse import urlparse
# ---------------------------------------------------------------------------
# Markdown → HTML conversion (with sanitization)
# ---------------------------------------------------------------------------
def escape(text: str) -> str:
return html.escape(text, quote=True)
def is_safe_url(url: str) -> bool:
try:
parsed = urlparse(url.strip())
return parsed.scheme in ('http', 'https')
except Exception:
return False
def _process_inline(text: str) -> str:
"""Process inline markdown with HTML escaping."""
result = escape(text)
# Bold: **text**
result = re.sub(r'\*\*(.+?)\*\*', r'<strong>\1</strong>', result)
# Inline code: `text`
result = re.sub(
r'`(.+?)`',
r'<code>\1</code>',
result
)
# Angle-bracket links: <https://...>
def restore_link(m):
url = html.unescape(m.group(1))
if is_safe_url(url):
escaped_url = escape(url)
try:
domain = urlparse(url).netloc
return f'<a href="{escaped_url}">{escape(domain)}</a>'
except Exception:
return f'<a href="{escaped_url}">{escaped_url}</a>'
return escape(url)
    # The text is already HTML-escaped, so match the escaped form &lt;...&gt;
    result = re.sub(r'&lt;(https?://[^&]+?)&gt;', restore_link, result)
# Markdown links: [text](url)
def restore_md_link(m):
label = html.unescape(m.group(1))
url = html.unescape(m.group(2))
if is_safe_url(url):
return f'<a href="{escape(url)}">{escape(label)}</a>'
return escape(label)
result = re.sub(r'\[([^\]]+?)\]\(([^)]+?)\)', restore_md_link, result)
return result
def markdown_to_html(md_content: str) -> str:
"""Convert markdown digest to styled HTML for PDF rendering."""
lines = md_content.strip().split('\n')
html_parts = []
in_list = False
for line in lines:
stripped = line.strip()
if not stripped:
if in_list:
html_parts.append('</ul>')
in_list = False
continue
# H1
if stripped.startswith('# '):
title = _process_inline(stripped[2:])
html_parts.append(f'<h1>{title}</h1>')
continue
# H2
if stripped.startswith('## '):
if in_list:
html_parts.append('</ul>')
in_list = False
section = _process_inline(stripped[3:])
html_parts.append(f'<h2>{section}</h2>')
continue
# H3
if stripped.startswith('### '):
if in_list:
html_parts.append('</ul>')
in_list = False
section = _process_inline(stripped[4:])
html_parts.append(f'<h3>{section}</h3>')
continue
# Blockquote
if stripped.startswith('> '):
text = _process_inline(stripped[2:])
html_parts.append(f'<blockquote>{text}</blockquote>')
continue
# Horizontal rule
if stripped == '---':
html_parts.append('<hr>')
continue
# Bullet items
        if stripped.startswith('• ') or stripped.startswith('- '):
if not in_list:
html_parts.append('<ul>')
in_list = True
item_text = stripped[2:]
safe_item = _process_inline(item_text)
html_parts.append(f'<li>{safe_item}</li>')
continue
# Angle-bracket link on its own line (often source URLs)
if stripped.startswith('<http') and in_list:
url = stripped.strip('<> ')
if is_safe_url(url):
escaped_url = escape(url)
try:
domain = urlparse(url).netloc
label = escape(domain)
except Exception:
label = escaped_url
html_parts.append(f'<li class="source-link"><a href="{escaped_url}">{label}</a></li>')
continue
# Stats/footer
        if stripped.startswith('📊') or stripped.startswith('🤖'):
text = _process_inline(stripped)
html_parts.append(f'<p class="footer">{text}</p>')
continue
# Regular paragraph
text = _process_inline(stripped)
html_parts.append(f'<p>{text}</p>')
if in_list:
html_parts.append('</ul>')
return '\n'.join(html_parts)
# ---------------------------------------------------------------------------
# PDF CSS
# ---------------------------------------------------------------------------
PDF_CSS = """
@page {
size: A4;
margin: 2cm 2.5cm;
@top-center {
        content: "Media Digest";
font-size: 9px;
color: #999;
font-family: 'Noto Sans CJK SC', 'Noto Sans SC', sans-serif;
}
@bottom-center {
content: counter(page) " / " counter(pages);
font-size: 9px;
color: #999;
font-family: 'Noto Sans CJK SC', 'Noto Sans SC', sans-serif;
}
}
body {
font-family: 'Noto Sans CJK SC', 'Noto Sans SC', 'PingFang SC',
'Microsoft YaHei', 'Segoe UI', Roboto, sans-serif;
font-size: 11pt;
line-height: 1.7;
color: #1a1a1a;
}
h1 {
font-size: 22pt;
color: #111;
border-bottom: 3px solid #2563eb;
padding-bottom: 8px;
margin-bottom: 20px;
margin-top: 0;
}
h2 {
font-size: 15pt;
color: #1e40af;
margin-top: 28px;
margin-bottom: 12px;
padding-bottom: 4px;
border-bottom: 1px solid #e5e7eb;
}
h3 {
font-size: 13pt;
color: #374151;
margin-top: 20px;
margin-bottom: 8px;
}
blockquote {
background: #f0f4ff;
border-left: 4px solid #2563eb;
padding: 12px 16px;
margin: 16px 0;
color: #374151;
font-size: 10.5pt;
border-radius: 0 6px 6px 0;
}
ul {
padding-left: 20px;
margin: 8px 0;
}
li {
margin-bottom: 10px;
line-height: 1.6;
}
li.source-link {
list-style: none;
margin-bottom: 2px;
margin-top: -6px;
}
li.source-link a {
font-size: 9pt;
}
a {
color: #2563eb;
text-decoration: none;
}
a:hover {
text-decoration: underline;
}
strong {
color: #111;
}
code {
font-family: 'Noto Sans Mono CJK SC', 'SF Mono', 'Fira Code', monospace;
font-size: 9pt;
background: #f3f4f6;
padding: 2px 5px;
border-radius: 3px;
color: #6b7280;
}
hr {
border: none;
border-top: 1px solid #e5e7eb;
margin: 28px 0;
}
p.footer {
font-size: 8.5pt;
color: #9ca3af;
margin-top: 4px;
}
/* First page title area */
h1 + blockquote {
margin-top: 12px;
}
/* Emoji rendering */
body {
-webkit-font-smoothing: antialiased;
}
"""
# ---------------------------------------------------------------------------
# HTML wrapper
# ---------------------------------------------------------------------------
def wrap_html(body: str) -> str:
return f"""<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<style>
{PDF_CSS}
</style>
</head>
<body>
{body}
</body>
</html>"""
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
parser = argparse.ArgumentParser(
description="Generate styled PDF from markdown digest report",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""\
Examples:
python3 generate-pdf.py -i /tmp/md-report.md -o /tmp/md-digest.pdf
python3 generate-pdf.py -i report.md -o digest.pdf --verbose
Requirements:
pip install weasyprint
apt install fonts-noto-cjk (for Chinese support)
"""
)
parser.add_argument("--input", "-i", required=True, help="Input markdown file")
parser.add_argument("--output", "-o", required=True, help="Output PDF file")
parser.add_argument("--verbose", "-v", action="store_true")
args = parser.parse_args()
logging.basicConfig(
level=logging.DEBUG if args.verbose else logging.INFO,
format="%(levelname)s: %(message)s"
)
try:
import weasyprint
except ImportError:
logging.error("weasyprint not installed. Run: pip install weasyprint")
sys.exit(1)
input_path = Path(args.input)
if not input_path.exists():
logging.error(f"Input file not found: {args.input}")
sys.exit(1)
md_content = input_path.read_text(encoding='utf-8')
logging.info(f"Converting {args.input} ({len(md_content)} chars)")
    # Convert markdown → HTML → PDF
body_html = markdown_to_html(md_content)
full_html = wrap_html(body_html)
# Optionally save intermediate HTML for debugging
if args.verbose:
html_debug = Path(args.output).with_suffix('.html')
html_debug.write_text(full_html, encoding='utf-8')
logging.debug(f"Debug HTML saved: {html_debug}")
# Generate PDF
logging.info("Generating PDF...")
doc = weasyprint.HTML(string=full_html)
doc.write_pdf(args.output)
output_size = Path(args.output).stat().st_size
    logging.info(f"✅ PDF generated: {args.output} ({output_size / 1024:.0f} KB)")
if __name__ == "__main__":
main()
```
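The escape-then-restore pattern in `_process_inline` is the core of the sanitizer: the whole line is HTML-escaped first, and safe constructs like `<https://...>` autolinks are then restored from their escaped `&lt;...&gt;` form. A self-contained sketch of just that pass (the URL is illustrative):

```python
import html
import re
from urllib.parse import urlparse

def render_autolink(text: str) -> str:
    # Escape everything first, then restore http(s) autolinks from the
    # escaped form, showing only the domain as the link label.
    escaped = html.escape(text, quote=True)

    def restore(m: re.Match) -> str:
        url = html.unescape(m.group(1))
        if urlparse(url).scheme in ('http', 'https'):
            return (f'<a href="{html.escape(url, quote=True)}">'
                    f'{html.escape(urlparse(url).netloc)}</a>')
        return html.escape(url)  # non-http schemes stay inert text

    return re.sub(r'&lt;(https?://[^&]+?)&gt;', restore, escaped)

print(render_autolink('See <https://deadline.com/story> for details'))
# -> See <a href="https://deadline.com/story">deadline.com</a> for details
```

Escaping before restoring means anything that is not explicitly whitelisted (here, http/https URLs) can never reach the HTML output as live markup.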
### scripts/merge-sources.py
```python
#!/usr/bin/env python3
"""
Merge data from all sources (RSS, Twitter, Web) with quality scoring.
Reads output from fetch-rss.py, fetch-twitter.py, and fetch-web.py,
merges articles, removes duplicates, applies quality scoring, and
groups by topics for final digest output.
Usage:
python3 merge-sources.py [--rss FILE] [--twitter FILE] [--web FILE] [--output FILE] [--verbose]
"""
import json
import sys
import os
import argparse
import logging
import tempfile
import re
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import Dict, List, Any, Optional, Set
from difflib import SequenceMatcher
from urllib.parse import urlparse
# Quality scoring weights
SCORE_MULTI_SOURCE = 5 # Article appears in multiple sources
SCORE_PRIORITY_SOURCE = 3 # From high-priority source
SCORE_RECENT = 2 # Recent article (< 24h)
SCORE_ENGAGEMENT_VIRAL = 5 # Viral tweet (1000+ likes or 500+ RTs)
SCORE_ENGAGEMENT_HIGH = 3 # High engagement (500+ likes or 200+ RTs)
SCORE_ENGAGEMENT_MED = 2 # Medium engagement (100+ likes or 50+ RTs)
SCORE_ENGAGEMENT_LOW = 1 # Some engagement (50+ likes or 20+ RTs)
PENALTY_DUPLICATE = -10 # Duplicate/very similar title
PENALTY_OLD_REPORT = -5 # Already in previous digest
# Deduplication thresholds
TITLE_SIMILARITY_THRESHOLD = 0.85
DOMAIN_DUPLICATE_THRESHOLD = 0.95
def setup_logging(verbose: bool) -> logging.Logger:
"""Setup logging configuration."""
level = logging.DEBUG if verbose else logging.INFO
logging.basicConfig(
level=level,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
return logging.getLogger(__name__)
def load_source_data(file_path: Optional[Path]) -> Dict[str, Any]:
"""Load source data from JSON file."""
if not file_path or not file_path.exists():
return {"sources": [], "total_articles": 0}
try:
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
return data
except Exception as e:
logging.warning(f"Failed to load {file_path}: {e}")
return {"sources": [], "total_articles": 0}
def normalize_title(title: str) -> str:
"""Normalize title for comparison."""
# Remove common prefixes/suffixes
title = re.sub(r'^(RT\s+@\w+:\s*)', '', title, flags=re.IGNORECASE)
    title = re.sub(r'\s*[|\-–—]\s*[^|]*$', '', title)  # Remove " | Site Name" endings
# Normalize whitespace and punctuation
title = re.sub(r'\s+', ' ', title).strip()
title = re.sub(r'[^\w\s]', '', title.lower())
return title
def calculate_title_similarity(title1: str, title2: str) -> float:
"""Calculate similarity between two titles."""
norm1 = normalize_title(title1)
norm2 = normalize_title(title2)
if not norm1 or not norm2:
return 0.0
return SequenceMatcher(None, norm1, norm2).ratio()
def get_domain(url: str) -> str:
"""Extract domain from URL."""
try:
return urlparse(url).netloc.lower().replace('www.', '')
except Exception:
return ''
def normalize_url(url: str) -> str:
"""Normalize URL for dedup comparison (strip query, fragment, trailing slash, www.)."""
try:
parsed = urlparse(url)
domain = parsed.netloc.lower().replace('www.', '')
path = parsed.path.rstrip('/')
return f"{domain}{path}"
except Exception:
return url
def calculate_base_score(article: Dict[str, Any], source: Dict[str, Any]) -> float:
"""Calculate base quality score for an article."""
score = 0.0
# Priority source bonus
if source.get("priority", False):
score += SCORE_PRIORITY_SOURCE
# Recency bonus (< 24 hours)
try:
article_date = datetime.fromisoformat(article["date"].replace('Z', '+00:00'))
hours_old = (datetime.now(timezone.utc) - article_date).total_seconds() / 3600
if hours_old < 24:
score += SCORE_RECENT
except Exception:
pass
# Twitter engagement bonus (tiered)
if source.get("source_type") == "twitter" and "metrics" in article:
metrics = article["metrics"]
likes = metrics.get("like_count", 0)
retweets = metrics.get("retweet_count", 0)
if likes >= 1000 or retweets >= 500:
score += SCORE_ENGAGEMENT_VIRAL
elif likes >= 500 or retweets >= 200:
score += SCORE_ENGAGEMENT_HIGH
elif likes >= 100 or retweets >= 50:
score += SCORE_ENGAGEMENT_MED
elif likes >= 50 or retweets >= 20:
score += SCORE_ENGAGEMENT_LOW
# RSS from priority sources get extra weight (official blogs, research papers)
if source.get("source_type") == "rss" and source.get("priority", False):
score += 2 # Extra priority RSS bonus
return score
def _extract_tokens(title: str) -> Set[str]:
"""Extract significant tokens from a normalized title for bucketing."""
norm = normalize_title(title)
# Split into tokens, filter short/common words
stopwords = {'the', 'a', 'an', 'is', 'are', 'was', 'were', 'in', 'on', 'at',
'to', 'for', 'of', 'and', 'or', 'with', 'by', 'from', 'as', 'it',
'its', 'that', 'this', 'be', 'has', 'had', 'have', 'not', 'but',
'what', 'how', 'new', 'will', 'can', 'do', 'does', 'did'}
tokens = set()
for word in norm.split():
if len(word) >= 3 and word not in stopwords:
tokens.add(word)
return tokens
def _build_token_buckets(articles: List[Dict[str, Any]]) -> Dict[int, Set[int]]:
"""Build token-based buckets mapping each article index to candidate duplicate indices.
Two articles are candidates if they share 2+ significant tokens.
Returns dict: article_index -> set of candidate article indices to compare against.
"""
from collections import defaultdict
# token -> list of article indices
token_to_indices: Dict[str, List[int]] = defaultdict(list)
article_tokens: List[Set[str]] = []
for i, article in enumerate(articles):
tokens = _extract_tokens(article.get("title", ""))
article_tokens.append(tokens)
for token in tokens:
token_to_indices[token].append(i)
# For each article, find candidates sharing 2+ tokens
candidates: Dict[int, Set[int]] = defaultdict(set)
for i, tokens in enumerate(article_tokens):
# Count how many tokens each other article shares with this one
overlap_count: Dict[int, int] = defaultdict(int)
for token in tokens:
for j in token_to_indices[token]:
if j != i:
overlap_count[j] += 1
for j, count in overlap_count.items():
if count >= 2:
candidates[i].add(j)
return candidates
def deduplicate_articles(articles: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Remove duplicate articles based on title similarity.
    Uses token-based bucketing to avoid O(n²) SequenceMatcher comparisons.
Only articles sharing 2+ significant title tokens are compared.
Domain saturation is handled separately per-topic after grouping.
"""
if not articles:
return articles
# Sort by quality score (highest first) to keep best versions
articles.sort(key=lambda x: x.get("quality_score", 0), reverse=True)
# Phase 1: URL dedup (exact URL match after normalization)
url_seen: Dict[str, int] = {} # normalized_url -> index in articles
url_duplicates: Set[int] = set()
for i, article in enumerate(articles):
url = article.get("link", "")
if not url:
continue
norm_url = normalize_url(url)
if norm_url in url_seen:
# Keep the one with higher quality_score (articles already sorted by score)
url_duplicates.add(i)
logging.debug(f"URL duplicate: {url} ~= {articles[url_seen[norm_url]].get('link','')}")
else:
url_seen[norm_url] = i
if url_duplicates:
articles = [a for i, a in enumerate(articles) if i not in url_duplicates]
logging.info(f"URL dedup: removed {len(url_duplicates)} duplicates")
# Phase 2: Title similarity dedup
deduplicated = []
# Build token buckets for candidate pairs
candidates = _build_token_buckets(articles)
# Track which indices have been marked as duplicates
duplicate_indices: Set[int] = set()
for i, article in enumerate(articles):
if i in duplicate_indices:
continue
title = article.get("title", "")
# Mark future candidates as duplicates using SequenceMatcher (only within bucket)
for j in candidates.get(i, set()):
if j > i and j not in duplicate_indices:
other_title = articles[j].get("title", "")
# Quick length check: titles with >30% length difference are unlikely duplicates
norm_i = normalize_title(title)
norm_j = normalize_title(other_title)
if abs(len(norm_i) - len(norm_j)) > 0.3 * max(len(norm_i), len(norm_j), 1):
continue
similarity = calculate_title_similarity(title, other_title)
if similarity >= TITLE_SIMILARITY_THRESHOLD:
logging.debug(f"Title duplicate: '{other_title}' ~= '{title}' ({similarity:.2f})")
duplicate_indices.add(j)
deduplicated.append(article)
logging.info(f"Deduplication: {len(articles)} → {len(deduplicated)} articles")
return deduplicated
# Domains exempt from per-topic limits (multi-author platforms)
DOMAIN_LIMIT_EXEMPT = {"x.com", "twitter.com", "github.com", "reddit.com"}
def apply_domain_limits(articles: List[Dict[str, Any]], max_per_domain: int = 3) -> List[Dict[str, Any]]:
"""Limit articles per domain within a single topic group.
Should be called per-topic after group_by_topics() to ensure
each topic gets its own domain budget.
"""
domain_counts: Dict[str, int] = {}
result = []
for article in articles:
domain = get_domain(article.get("link", ""))
if domain and domain not in DOMAIN_LIMIT_EXEMPT:
count = domain_counts.get(domain, 0)
if count >= max_per_domain:
logging.debug(f"Domain limit ({max_per_domain}): skipping {domain} article in topic")
continue
domain_counts[domain] = count + 1
result.append(article)
return result
def merge_article_sources(articles: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Merge articles that appear from multiple sources."""
if not articles:
return articles
# Group articles by normalized title
title_groups = {}
for article in articles:
norm_title = normalize_title(article.get("title", ""))
if norm_title not in title_groups:
title_groups[norm_title] = []
title_groups[norm_title].append(article)
merged = []
for group in title_groups.values():
if len(group) == 1:
merged.append(group[0])
else:
# Multiple sources for same story - merge and boost score
primary = max(group, key=lambda x: x.get("quality_score", 0))
# Collect all source types
source_types = set(article.get("source_type", "") for article in group)
source_names = [article.get("source_name", "") for article in group]
# Multi-source bonus
multi_source_bonus = len(source_types) * SCORE_MULTI_SOURCE
primary["quality_score"] = primary.get("quality_score", 0) + multi_source_bonus
# Add metadata about multiple sources
primary["multi_source"] = True
primary["source_count"] = len(group)
primary["all_sources"] = source_names[:3] # Limit to avoid bloat
logging.debug(f"Merged {len(group)} sources for: '{primary['title'][:50]}...'")
merged.append(primary)
return merged
def load_previous_digests(archive_dir: Path, days: int = 7) -> Set[str]:
"""Load titles from previous digests to avoid repeats."""
if not archive_dir.exists():
return set()
seen_titles = set()
cutoff = datetime.now() - timedelta(days=days)
try:
for file_path in archive_dir.glob("*.md"):
# Extract date from filename
match = re.search(r'(\d{4}-\d{2}-\d{2})', file_path.name)
if match:
try:
file_date = datetime.strptime(match.group(1), "%Y-%m-%d")
if file_date < cutoff:
continue
except ValueError:
continue
# Extract titles from markdown
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
# Simple title extraction (assumes format like "- [Title](link)")
for match in re.finditer(r'-\s*\[([^\]]+)\]', content):
title = normalize_title(match.group(1))
if title:
seen_titles.add(title)
except Exception as e:
logging.debug(f"Failed to load previous digests: {e}")
logging.info(f"Loaded {len(seen_titles)} titles from previous {days} days")
return seen_titles
def apply_previous_digest_penalty(articles: List[Dict[str, Any]],
previous_titles: Set[str]) -> List[Dict[str, Any]]:
"""Apply penalty to articles that appeared in previous digests."""
if not previous_titles:
return articles
penalized_count = 0
for article in articles:
norm_title = normalize_title(article.get("title", ""))
if norm_title in previous_titles:
article["quality_score"] = article.get("quality_score", 0) + PENALTY_OLD_REPORT
article["in_previous_digest"] = True
penalized_count += 1
logging.info(f"Applied previous digest penalty to {penalized_count} articles")
return articles
def group_by_topics(articles: List[Dict[str, Any]]) -> Dict[str, List[Dict[str, Any]]]:
"""Group articles by their topics."""
topic_groups = {}
for article in articles:
topics = article.get("topics", [])
if not topics:
topics = ["uncategorized"]
for topic in topics:
if topic not in topic_groups:
topic_groups[topic] = []
# Add copy with single topic for cleaner grouping
article_copy = article.copy()
article_copy["primary_topic"] = topic
topic_groups[topic].append(article_copy)
# Sort articles within each topic by quality score
for topic in topic_groups:
topic_groups[topic].sort(key=lambda x: x.get("quality_score", 0), reverse=True)
return topic_groups
def main():
"""Main merge and scoring function."""
parser = argparse.ArgumentParser(
description="Merge articles from all sources with quality scoring and deduplication.",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
python3 merge-sources.py --rss rss.json --twitter twitter.json --web web.json
python3 merge-sources.py --rss rss.json --output merged.json --verbose
python3 merge-sources.py --archive-dir workspace/archive/media-digest
"""
)
parser.add_argument(
"--rss",
type=Path,
help="RSS fetch results JSON file"
)
parser.add_argument(
"--twitter",
type=Path,
help="Twitter fetch results JSON file"
)
parser.add_argument(
"--web",
type=Path,
help="Web search results JSON file"
)
parser.add_argument(
"--github",
type=Path,
help="GitHub releases results JSON file"
)
parser.add_argument(
"--trending",
type=Path,
help="GitHub trending repos JSON file"
)
parser.add_argument(
"--reddit",
type=Path,
help="Reddit posts results JSON file"
)
parser.add_argument(
"--output", "-o",
type=Path,
help="Output JSON path (default: auto-generated temp file)"
)
parser.add_argument(
"--archive-dir",
type=Path,
help="Archive directory for previous digest penalty"
)
parser.add_argument(
"--verbose", "-v",
action="store_true",
help="Enable verbose logging"
)
args = parser.parse_args()
logger = setup_logging(args.verbose)
# Auto-generate unique output path if not specified
if not args.output:
fd, temp_path = tempfile.mkstemp(prefix="media-news-digest-merged-", suffix=".json")
os.close(fd)
args.output = Path(temp_path)
try:
# Load source data
rss_data = load_source_data(args.rss)
twitter_data = load_source_data(args.twitter)
web_data = load_source_data(args.web)
github_data = load_source_data(args.github)
trending_data = load_source_data(args.trending) if args.trending else None
reddit_data = load_source_data(args.reddit)
logger.info(f"Loaded sources - RSS: {rss_data.get('total_articles', 0)}, "
f"Twitter: {twitter_data.get('total_articles', 0)}, "
f"Web: {web_data.get('total_articles', 0)}, "
f"GitHub: {github_data.get('total_articles', 0)} releases + {trending_data.get('total', 0) if trending_data else 0} trending, "
f"Reddit: {reddit_data.get('total_posts', 0)}")
# Collect all articles with source context
all_articles = []
# Process RSS articles
for source in rss_data.get("sources", []):
for article in source.get("articles", []):
article["source_type"] = "rss"
article["source_name"] = source.get("name", "")
article["source_id"] = source.get("source_id", "")
article["quality_score"] = calculate_base_score(article, source)
all_articles.append(article)
# Process Twitter articles
for source in twitter_data.get("sources", []):
for article in source.get("articles", []):
article["source_type"] = "twitter"
article["source_name"] = f"@{source.get('handle', '')}"
article["display_name"] = source.get("name", "")
article["source_id"] = source.get("source_id", "")
article["quality_score"] = calculate_base_score(article, source)
all_articles.append(article)
# Process Web articles
for topic_result in web_data.get("topics", []):
for article in topic_result.get("articles", []):
article["source_type"] = "web"
article["source_name"] = "Web Search"
article["source_id"] = f"web-{topic_result.get('topic_id', '')}"
# Build a minimal source dict so web articles go through the same scoring
web_source = {
"source_type": "web",
"priority": False,
}
article["quality_score"] = calculate_base_score(article, web_source)
all_articles.append(article)
# Process GitHub articles
for source in github_data.get("sources", []):
for article in source.get("articles", []):
article["source_type"] = "github"
article["source_name"] = source.get("name", "")
article["source_id"] = source.get("source_id", "")
article["quality_score"] = calculate_base_score(article, source)
all_articles.append(article)
# Process Reddit articles
for source in reddit_data.get("subreddits", []):
for article in source.get("articles", []):
article["source_type"] = "reddit"
article["source_name"] = f"r/{source.get('subreddit', '')}"
article["source_id"] = source.get("source_id", "")
reddit_source = {
"source_type": "reddit",
"priority": source.get("priority", False),
}
article["quality_score"] = calculate_base_score(article, reddit_source)
# Reddit score bonus
score = article.get("score", 0)
if score > 500:
article["quality_score"] += 5
elif score > 200:
article["quality_score"] += 3
elif score > 100:
article["quality_score"] += 1
all_articles.append(article)
# Load GitHub trending repos
if trending_data:
for repo in trending_data.get("repos", []):
article = {
"title": f"{repo['repo']}: {repo['description']}" if repo.get('description') else repo['repo'],
"link": repo.get("url", f"https://github.com/{repo['repo']}"),
"snippet": repo.get("description", ""),
"date": repo.get("pushed_at", ""),
"source": "github-trending",
"source_type": "github_trending",
"topics": repo.get("topics", []),
"stars": repo.get("stars", 0),
"daily_stars_est": repo.get("daily_stars_est", 0),
"forks": repo.get("forks", 0),
"language": repo.get("language", ""),
"quality_score": 5 + min(10, repo.get("daily_stars_est", 0) // 10),
}
all_articles.append(article)
total_collected = len(all_articles)
logger.info(f"Total articles collected: {total_collected}")
# Load previous digest titles for penalty
previous_titles = set()
if args.archive_dir:
previous_titles = load_previous_digests(args.archive_dir)
# Apply previous digest penalty
all_articles = apply_previous_digest_penalty(all_articles, previous_titles)
# Merge multi-source articles
all_articles = merge_article_sources(all_articles)
logger.info(f"After merging multi-source: {len(all_articles)}")
# Deduplicate articles
all_articles = deduplicate_articles(all_articles)
# Group by topics
topic_groups = group_by_topics(all_articles)
# Apply per-topic domain limits (max 3 articles per domain per topic)
for topic in topic_groups:
before = len(topic_groups[topic])
topic_groups[topic] = apply_domain_limits(topic_groups[topic])
after = len(topic_groups[topic])
if before != after:
logger.info(f"Domain limits ({topic}): {before} → {after}")
# Recalculate total after domain limits
total_after_domain_limits = sum(len(articles) for articles in topic_groups.values())
topic_counts = {topic: len(articles) for topic, articles in topic_groups.items()}
output = {
"generated": datetime.now(timezone.utc).isoformat(),
"input_sources": {
"rss_articles": rss_data.get("total_articles", 0),
"twitter_articles": twitter_data.get("total_articles", 0),
"web_articles": web_data.get("total_articles", 0),
"github_articles": github_data.get("total_articles", 0),
"github_trending": trending_data.get("total", 0) if trending_data else 0,
"reddit_posts": reddit_data.get("total_posts", 0),
"total_input": total_collected
},
"processing": {
"deduplication_applied": True,
"multi_source_merging": True,
"previous_digest_penalty": len(previous_titles) > 0,
"quality_scoring": True
},
"output_stats": {
"total_articles": total_after_domain_limits,
"topics_count": len(topic_groups),
"topic_distribution": topic_counts
},
"topics": {
topic: {
"count": len(articles),
"articles": articles
} for topic, articles in topic_groups.items()
}
}
# Write output
json_str = json.dumps(output, ensure_ascii=False, indent=2)
with open(args.output, "w", encoding='utf-8') as f:
f.write(json_str)
logger.info("✅ Merged and scored articles:")
logger.info(f" Input: {total_collected} articles")
logger.info(f" Output: {total_after_domain_limits} articles across {len(topic_groups)} topics")
logger.info(f" File: {args.output}")
return 0
except Exception as e:
logger.error(f"💥 Merge failed: {e}")
return 1
if __name__ == "__main__":
sys.exit(main())
```
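The dedup phase above hinges on token bucketing: only titles that share two or more significant tokens are ever compared with `SequenceMatcher`, so most pairs are never touched. A standalone sketch of that idea (the stopword list and 0.85 threshold here are illustrative, not the script's exact constants):

```python
# Sketch: token-bucket dedup. Titles sharing <2 significant tokens skip the
# expensive similarity check entirely.
from collections import defaultdict
from difflib import SequenceMatcher

STOPWORDS = {"the", "a", "an", "to", "for", "of", "and", "in", "on"}

def tokens(title):
    """Significant lowercase tokens: >=3 chars, not a stopword."""
    return {w for w in title.lower().split() if len(w) >= 3 and w not in STOPWORDS}

def dedup(titles, threshold=0.85):
    tok = [tokens(t) for t in titles]
    index = defaultdict(list)          # token -> indices of titles containing it
    for i, ts in enumerate(tok):
        for t in ts:
            index[t].append(i)
    dropped = set()
    for i, ts in enumerate(tok):
        if i in dropped:
            continue
        overlap = defaultdict(int)     # candidate index -> shared-token count
        for t in ts:
            for j in index[t]:
                if j > i:
                    overlap[j] += 1
        for j, n in overlap.items():
            if n >= 2 and j not in dropped:
                ratio = SequenceMatcher(None, titles[i].lower(), titles[j].lower()).ratio()
                if ratio >= threshold:
                    dropped.add(j)     # keep the earlier (higher-ranked) title
    return [t for i, t in enumerate(titles) if i not in dropped]
```

Because the input is sorted by quality score first, "keep the earlier index" keeps the best-scored version of each story, mirroring the script's behavior.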
### scripts/sanitize-html.py
```python
#!/usr/bin/env python3
"""
Sanitize media digest markdown report into safe HTML email.
Reads a markdown report file, escapes all text content to prevent XSS,
and outputs a styled HTML email body safe for injection into email clients.
Usage:
python3 sanitize-html.py --input /tmp/md-report.md --output /tmp/md-email.html [--verbose]
Security:
- All text content is HTML-escaped (prevents XSS from malicious RSS/Twitter/web content)
- Only whitelisted tags/attributes are allowed
- URLs are validated (must be http/https)
- No JavaScript, event handlers, or data: URIs allowed
"""
import argparse
import html
import re
import sys
import logging
from urllib.parse import urlparse
def escape(text: str) -> str:
"""HTML-escape text content."""
return html.escape(text, quote=True)
def is_safe_url(url: str) -> bool:
"""Validate URL is http(s) only; no javascript:, data:, etc."""
try:
parsed = urlparse(url.strip())
return parsed.scheme in ('http', 'https')
except Exception:
return False
def safe_link(url: str, label: str = None, style: str = "color:#0969da;font-size:13px") -> str:
"""Generate a safe HTML link with escaped content."""
url = url.strip()
if not is_safe_url(url):
return escape(label or url)
escaped_url = escape(url)
escaped_label = escape(label or url)
return f'<a href="{escaped_url}" style="{style}">{escaped_label}</a>'
def markdown_to_safe_html(md_content: str) -> str:
"""Convert markdown digest report to sanitized HTML email."""
lines = md_content.strip().split('\n')
html_parts = []
# Email wrapper open
html_parts.append(
'<div style="max-width:640px;margin:0 auto;font-family:'
'-apple-system,BlinkMacSystemFont,\'Segoe UI\',Roboto,sans-serif;'
'color:#1a1a1a;line-height:1.6">'
)
in_list = False
for line in lines:
stripped = line.strip()
# Skip empty lines
if not stripped:
if in_list:
html_parts.append('</ul>')
in_list = False
continue
# H1: # Title
if stripped.startswith('# '):
title = escape(stripped[2:])
html_parts.append(
f'<h1 style="font-size:22px;border-bottom:2px solid #e5e5e5;'
f'padding-bottom:8px">{title}</h1>'
)
continue
# H2: ## Section
if stripped.startswith('## '):
if in_list:
html_parts.append('</ul>')
in_list = False
section = escape(stripped[3:])
html_parts.append(
f'<h2 style="font-size:17px;margin-top:24px;color:#333">{section}</h2>'
)
continue
# Blockquote: > executive summary
if stripped.startswith('> '):
text = escape(stripped[2:])
html_parts.append(
f'<p style="color:#555;font-size:14px;background:#f8f9fa;'
f'padding:12px;border-radius:6px">{text}</p>'
)
continue
# Horizontal rule
if stripped == '---':
html_parts.append('<hr style="border:none;border-top:1px solid #e5e5e5;margin:24px 0">')
continue
# Bullet items: • or -
if stripped.startswith('• ') or stripped.startswith('- '):
if not in_list:
html_parts.append('<ul style="padding-left:20px">')
in_list = True
item_text = stripped[2:]
safe_item = _process_inline(item_text)
html_parts.append(f'<li style="margin-bottom:10px">{safe_item}</li>')
continue
# Continuation of bullet (indented line with link)
if stripped.startswith('<http') and in_list:
url = stripped.strip('<> ')
link = safe_link(url)
html_parts.append(f'<li style="margin-bottom:2px;list-style:none">{link}</li>')
continue
# Stats/footer line
if stripped.startswith('📊') or stripped.startswith('🤖'):
text = _process_inline(stripped)
html_parts.append(f'<p style="font-size:12px;color:#888">{text}</p>')
continue
# Regular paragraph
text = _process_inline(stripped)
html_parts.append(f'<p>{text}</p>')
if in_list:
html_parts.append('</ul>')
html_parts.append('</div>')
return '\n'.join(html_parts)
def _process_inline(text: str) -> str:
"""Process inline markdown (bold, links, code) with HTML escaping."""
# First escape everything
result = escape(text)
# Restore bold: **text** → <strong>text</strong>
result = re.sub(
r'\*\*(.+?)\*\*',
r'<strong>\1</strong>',
result
)
# Restore inline code: `text` → <code>text</code>
result = re.sub(
r'`(.+?)`',
lambda m: f'<code style="font-size:12px;color:#888;background:#f4f4f4;'
f'padding:2px 6px;border-radius:3px">{m.group(1)}</code>',
result
)
# Restore angle-bracket links: <https://...> → <a href>
def restore_link(m):
url = html.unescape(m.group(1))
if is_safe_url(url):
escaped_url = escape(url)
# Show shortened domain
try:
domain = urlparse(url).netloc
return f'<a href="{escaped_url}" style="color:#0969da;font-size:13px">{escape(domain)}</a>'
except Exception:
return f'<a href="{escaped_url}" style="color:#0969da;font-size:13px">{escaped_url}</a>'
return escape(url)
# The text is already escaped, so match the escaped angle brackets
result = re.sub(r'&lt;(https?://[^&]+?)&gt;', restore_link, result)
# Restore markdown links: [text](url) - already escaped, need to unescape for parsing
def restore_md_link(m):
label = html.unescape(m.group(1))
url = html.unescape(m.group(2))
if is_safe_url(url):
return f'<a href="{escape(url)}" style="color:#0969da">{escape(label)}</a>'
return escape(label)
result = re.sub(r'\[([^\]]+?)\]\(([^)]+?)\)', restore_md_link, result)
return result
def main():
parser = argparse.ArgumentParser(
description="Convert markdown digest to sanitized HTML email"
)
parser.add_argument("--input", "-i", required=True, help="Input markdown file")
parser.add_argument("--output", "-o", required=True, help="Output HTML file")
parser.add_argument("--verbose", "-v", action="store_true")
args = parser.parse_args()
logging.basicConfig(
level=logging.DEBUG if args.verbose else logging.INFO,
format="%(levelname)s: %(message)s"
)
try:
with open(args.input, 'r') as f:
md_content = f.read()
except FileNotFoundError:
logging.error(f"Input file not found: {args.input}")
sys.exit(1)
logging.info(f"Converting {args.input} ({len(md_content)} chars)")
html_output = markdown_to_safe_html(md_content)
with open(args.output, 'w') as f:
f.write(html_output)
logging.info(f"Wrote sanitized HTML to {args.output} ({len(html_output)} chars)")
if __name__ == "__main__":
main()
```
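The sanitizer's core pattern is escape-first, restore-whitelisted: every character is HTML-escaped up front, then only known-safe constructs (bold, scheme-validated links) are reintroduced. A minimal sketch of that pattern, with hypothetical helper names:

```python
# Sketch: escape everything, then selectively restore whitelisted markup.
import html
import re
from urllib.parse import urlparse

def is_safe(url):
    """Only http(s) URLs survive; javascript:, data:, etc. are rejected."""
    return urlparse(url).scheme in ("http", "https")

def render_line(text):
    out = html.escape(text, quote=True)                          # neutralize all markup
    out = re.sub(r"\*\*(.+?)\*\*", r"<strong>\1</strong>", out)  # restore bold

    def link(m):
        label, url = html.unescape(m.group(1)), html.unescape(m.group(2))
        if is_safe(url):
            return f'<a href="{html.escape(url)}">{html.escape(label)}</a>'
        return html.escape(label)                                # drop unsafe schemes

    # Markdown links: brackets/parens are untouched by html.escape
    return re.sub(r"\[([^\]]+?)\]\(([^)]+?)\)", link, out)
```

The ordering matters: escaping first means any `<script>` in feed content arrives as inert `&lt;script&gt;`, and only the restore steps can produce real tags.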
### scripts/send-email.py
```python
#!/usr/bin/env python3
"""
Send HTML email with optional PDF attachment via msmtp or sendmail.
Properly constructs MIME multipart message so HTML body renders correctly
even when attachments are included.
Usage:
python3 send-email.py --to [email protected] --subject "Daily Digest" \
--html /tmp/md-email.html [--attach /tmp/md-digest.pdf] [--from "Bot <[email protected]>"]
"""
import argparse
import base64
import subprocess
import sys
import logging
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.application import MIMEApplication
from email.utils import formatdate
from pathlib import Path
def build_message(subject: str, from_addr: str, to_addrs: list,
html_path: Path, attach_path: Path = None) -> str:
"""Build a proper MIME message with HTML body and optional attachment."""
html_content = html_path.read_text(encoding='utf-8')
if attach_path and attach_path.exists():
# Multipart mixed: HTML body + attachment
msg = MIMEMultipart('mixed')
html_part = MIMEText(html_content, 'html', 'utf-8')
msg.attach(html_part)
pdf_data = attach_path.read_bytes()
pdf_part = MIMEApplication(pdf_data, _subtype='pdf')
pdf_part.add_header('Content-Disposition', 'attachment',
filename=attach_path.name)
msg.attach(pdf_part)
else:
# Simple HTML message
msg = MIMEText(html_content, 'html', 'utf-8')
msg['Subject'] = subject
msg['From'] = from_addr
msg['To'] = ', '.join(to_addrs)
msg['Date'] = formatdate(localtime=True)
return msg.as_string()
def send_via_msmtp(message: str, to_addrs: list) -> bool:
"""Send via msmtp (preferred)."""
try:
result = subprocess.run(
['msmtp', '--read-envelope-from'] + to_addrs,
input=message.encode('utf-8'),
capture_output=True,
timeout=30
)
if result.returncode == 0:
return True
logging.error(f"msmtp failed: {result.stderr.decode()}")
return False
except FileNotFoundError:
logging.debug("msmtp not found")
return False
except Exception as e:
logging.error(f"msmtp error: {e}")
return False
def send_via_sendmail(message: str, to_addrs: list) -> bool:
"""Send via sendmail (fallback)."""
for cmd in ['sendmail', '/usr/sbin/sendmail']:
try:
result = subprocess.run(
[cmd, '-t'] + to_addrs,
input=message.encode('utf-8'),
capture_output=True,
timeout=30
)
if result.returncode == 0:
return True
logging.error(f"{cmd} failed: {result.stderr.decode()}")
except FileNotFoundError:
continue
except Exception as e:
logging.error(f"{cmd} error: {e}")
return False
def main():
parser = argparse.ArgumentParser(
description="Send HTML email with optional PDF attachment",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""\
Examples:
python3 send-email.py --to [email protected] --subject "Daily Digest" --html /tmp/md-email.html
python3 send-email.py --to [email protected] --to [email protected] --subject "Weekly" --html body.html --attach digest.pdf
python3 send-email.py --to [email protected] --subject "Test" --html body.html --from "Bot <[email protected]>"
"""
)
parser.add_argument('--to', action='append', required=True, help='Recipient email (repeatable)')
parser.add_argument('--subject', '-s', required=True, help='Email subject')
parser.add_argument('--html', required=True, type=Path, help='HTML body file')
parser.add_argument('--attach', type=Path, default=None, help='PDF attachment file')
parser.add_argument('--from', dest='from_addr', default=None, help='From address')
parser.add_argument('--verbose', '-v', action='store_true')
args = parser.parse_args()
logging.basicConfig(
level=logging.DEBUG if args.verbose else logging.INFO,
format="%(levelname)s: %(message)s"
)
if not args.html.exists():
logging.error(f"HTML file not found: {args.html}")
sys.exit(1)
# Expand comma-separated addresses
to_addrs = []
for addr in args.to:
to_addrs.extend([a.strip() for a in addr.split(',') if a.strip()])
from_addr = args.from_addr or 'noreply@localhost'
logging.info(f"Building email: {args.subject} → {', '.join(to_addrs)}")
if args.attach:
logging.info(f"Attachment: {args.attach} ({'exists' if args.attach.exists() else 'MISSING'})")
message = build_message(args.subject, from_addr, to_addrs, args.html, args.attach)
# Try msmtp first, then sendmail
if send_via_msmtp(message, to_addrs):
logging.info("β
Sent via msmtp")
return 0
if send_via_sendmail(message, to_addrs):
logging.info("β
Sent via sendmail")
return 0
logging.error("❌ All send methods failed")
return 1
if __name__ == "__main__":
sys.exit(main())
```
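The MIME construction above can be exercised without any mail transport. This sketch (placeholder addresses, fake PDF bytes) shows the same multipart/mixed vs plain-HTML branch:

```python
# Sketch: build an HTML email, optionally wrapping body + PDF in multipart/mixed.
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.application import MIMEApplication

def build(subject, from_addr, to_addrs, html_body, pdf_bytes=None, pdf_name="digest.pdf"):
    if pdf_bytes:
        msg = MIMEMultipart("mixed")                 # container: body + attachment
        msg.attach(MIMEText(html_body, "html", "utf-8"))
        part = MIMEApplication(pdf_bytes, _subtype="pdf")
        part.add_header("Content-Disposition", "attachment", filename=pdf_name)
        msg.attach(part)
    else:
        msg = MIMEText(html_body, "html", "utf-8")   # simple single-part HTML
    # Headers go on the outermost message either way
    msg["Subject"] = subject
    msg["From"] = from_addr
    msg["To"] = ", ".join(to_addrs)
    return msg.as_string()
```

Setting headers on the outermost object after the branch is why the script's version renders correctly in clients whether or not a PDF is attached.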
### scripts/source-health.py
```python
#!/usr/bin/env python3
"""
Source health monitoring for media-news-digest pipeline.
Tracks per-source success/failure history and reports unhealthy sources.
Usage:
python3 source-health.py --rss rss.json --twitter twitter.json --github github.json
"""
import json
import sys
import argparse
import logging
import time
from pathlib import Path
from typing import Dict, Any, Optional
from datetime import datetime
HEALTH_FILE = "/tmp/media-news-digest-source-health.json"
HISTORY_DAYS = 7
FAILURE_THRESHOLD = 0.5 # >50% failure rate triggers warning
def setup_logging(verbose: bool) -> logging.Logger:
level = logging.DEBUG if verbose else logging.INFO
logging.basicConfig(level=level, format='%(asctime)s - %(levelname)s - %(message)s')
return logging.getLogger(__name__)
def load_health_data() -> Dict[str, Any]:
try:
with open(HEALTH_FILE, 'r') as f:
return json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
return {}
def save_health_data(data: Dict[str, Any]) -> None:
with open(HEALTH_FILE, 'w') as f:
json.dump(data, f, indent=2)
def load_source_file(path: Optional[Path]) -> list:
if not path or not path.exists():
return []
try:
with open(path, 'r') as f:
data = json.load(f)
return data.get("sources", [])
except (json.JSONDecodeError, OSError):
return []
def load_source_file_flexible(path: Optional[Path]) -> list:
"""Load sources from a JSON file, trying 'sources', 'subreddits', and 'topics' keys."""
if not path or not path.exists():
return []
try:
with open(path, 'r') as f:
data = json.load(f)
# Try standard keys
if "sources" in data:
return data["sources"]
if "subreddits" in data:
return data["subreddits"]
if "topics" in data:
# Create synthetic sources from topic results
synthetic = []
for topic in data["topics"]:
synthetic.append({
"source_id": f"web-{topic.get('topic_id', 'unknown')}",
"name": f"Web: {topic.get('topic_id', 'unknown')}",
"status": topic.get("status", "ok"),
"articles": topic.get("articles", []),
})
return synthetic
return []
except (json.JSONDecodeError, OSError):
return []
def update_health(health: Dict[str, Any], sources: list, now: float) -> None:
cutoff = now - HISTORY_DAYS * 86400
for source in sources:
sid = source.get("source_id", source.get("id", "unknown"))
if sid not in health:
health[sid] = {"name": source.get("name", sid), "checks": []}
# Prune old entries
health[sid]["checks"] = [c for c in health[sid]["checks"] if c["ts"] > cutoff]
health[sid]["checks"].append({
"ts": now,
"ok": source.get("status") == "ok",
})
def report_unhealthy(health: Dict[str, Any], logger: logging.Logger) -> int:
unhealthy = 0
for sid, info in health.items():
checks = info.get("checks", [])
if len(checks) < 2:
continue
failures = sum(1 for c in checks if not c["ok"])
rate = failures / len(checks)
if rate > FAILURE_THRESHOLD:
logger.warning(f"⚠️ Unhealthy source: {info.get('name', sid)} "
f"({failures}/{len(checks)} failures, {rate:.0%} failure rate)")
unhealthy += 1
return unhealthy
def main():
parser = argparse.ArgumentParser(description="Track source health for media-news-digest pipeline.")
parser.add_argument("--rss", type=Path, help="RSS output JSON")
parser.add_argument("--twitter", type=Path, help="Twitter output JSON")
parser.add_argument("--github", type=Path, help="GitHub output JSON")
parser.add_argument("--reddit", type=Path, help="Reddit output JSON")
parser.add_argument("--web", type=Path, help="Web search output JSON")
parser.add_argument("--verbose", "-v", action="store_true")
args = parser.parse_args()
logger = setup_logging(args.verbose)
health = load_health_data()
now = time.time()
# Standard sources (use 'sources' key)
for path in [args.rss, args.twitter, args.github]:
sources = load_source_file(path)
if sources:
update_health(health, sources, now)
# Reddit and Web use flexible loading (subreddits/topics keys)
for path in [args.reddit, args.web]:
sources = load_source_file_flexible(path)
if sources:
update_health(health, sources, now)
save_health_data(health)
unhealthy = report_unhealthy(health, logger)
total = len(health)
logger.info(f"📊 Health check: {total} sources tracked, {unhealthy} unhealthy")
return 0
if __name__ == "__main__":
sys.exit(main())
```
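The health tracker boils down to a rolling window of per-source checks plus a failure-rate threshold. A standalone sketch using the same 7-day window and 50% threshold (function names here are illustrative):

```python
# Sketch: rolling source-health window. Old checks are pruned on each record,
# and sources with too few checks are never flagged.
import time

WINDOW = 7 * 86400      # keep 7 days of history
THRESHOLD = 0.5         # >50% failures -> unhealthy

def record(health, source_id, ok, now=None):
    now = now or time.time()
    # Prune entries older than the window, then append the new check
    checks = [c for c in health.setdefault(source_id, []) if c["ts"] > now - WINDOW]
    checks.append({"ts": now, "ok": ok})
    health[source_id] = checks

def unhealthy(health):
    bad = []
    for sid, checks in health.items():
        if len(checks) < 2:
            continue  # too little history to judge
        failures = sum(1 for c in checks if not c["ok"])
        if failures / len(checks) > THRESHOLD:
            bad.append(sid)
    return bad
```

The minimum-history guard matters: a source seen only once (e.g. newly added) never triggers a warning off a single bad fetch.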
### scripts/summarize-merged.py
```python
#!/usr/bin/env python3
"""
Print a human-readable summary of merged JSON data for LLM consumption.
Usage:
python3 summarize-merged.py [--input /tmp/md-merged.json] [--top N] [--topic TOPIC]
"""
import json
import argparse
from pathlib import Path
def summarize(data: dict, top_n: int = 10, topic_filter: str = None):
"""Print structured summary of merged data."""
# Metadata
meta = data.get("output_stats", {})
print(f"=== Merged Data Summary ===")
print(f"Total articles: {meta.get('total_articles', '?')}")
print(f"Topics: {', '.join(data.get('topics', {}).keys())}")
print()
topics = data.get("topics", {})
for topic_id, topic_data in topics.items():
if topic_filter and topic_id != topic_filter:
continue
articles = topic_data.get("articles", [])
if not isinstance(articles, list):
continue
print(f"=== {topic_id} ({len(articles)} articles) ===")
# Sort by quality_score descending
sorted_articles = sorted(
[a for a in articles if isinstance(a, dict)],
key=lambda a: a.get("quality_score", 0),
reverse=True
)
for i, a in enumerate(sorted_articles[:top_n]):
title = a.get("title", "?")[:100]
source = a.get("source_name", "?")
source_type = a.get("source_type", "?")
qs = a.get("quality_score", 0)
link = a.get("link") or a.get("reddit_url") or a.get("external_url", "")
snippet = (a.get("snippet") or a.get("summary") or "")[:150]
# Metrics for Twitter
metrics = a.get("metrics", {})
display_name = a.get("display_name", "")
print(f"\n [{i+1}] ({qs:.0f}pts) [{source_type}] {title}")
print(f" Source: {source}", end="")
if display_name:
print(f" ({display_name})", end="")
print()
if link:
print(f" Link: {link}")
if snippet:
print(f" Snippet: {snippet}")
if metrics:
parts = []
for k, v in metrics.items():
if v and v > 0:
parts.append(f"{k}={v}")
if parts:
print(f" Metrics: {', '.join(parts)}")
# Reddit-specific
reddit_score = a.get("score")
num_comments = a.get("num_comments")
if reddit_score is not None:
print(f" Reddit: {reddit_score}↑", end="")
if num_comments:
print(f" · {num_comments} comments", end="")
print()
print()
def main():
parser = argparse.ArgumentParser(description="Summarize merged JSON for LLM consumption")
parser.add_argument("--input", "-i", type=Path, default=Path("/tmp/md-merged.json"))
parser.add_argument("--top", "-n", type=int, default=10, help="Top N articles per topic")
parser.add_argument("--topic", "-t", type=str, default=None, help="Filter to specific topic")
args = parser.parse_args()
if not args.input.exists():
print(f"Error: {args.input} not found. Run the pipeline first.")
return
with open(args.input) as f:
data = json.load(f)
summarize(data, top_n=args.top, topic_filter=args.topic)
if __name__ == "__main__":
main()
```
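The summarizer consumes the `topics -> {count, articles}` shape that merge-sources.py emits. A small sketch of pulling the top-N titles per topic from that structure (sample data invented):

```python
# Sketch: extract top-N article titles per topic from the merged JSON shape.
def top_per_topic(merged, n=3):
    out = {}
    for topic, info in merged.get("topics", {}).items():
        # Highest quality_score first, mirroring the summarizer's sort
        arts = sorted(info.get("articles", []),
                      key=lambda a: a.get("quality_score", 0), reverse=True)
        out[topic] = [a.get("title", "?") for a in arts[:n]]
    return out
```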
### scripts/test-pipeline.sh
```bash
#!/bin/bash
# Pipeline smoke test - runs fetch steps with filtering, validates outputs
# Usage:
# ./test-pipeline.sh # run all sources
# ./test-pipeline.sh --only twitter,rss # only these source types
# ./test-pipeline.sh --skip web # skip web search
# ./test-pipeline.sh --topics crypto # only sources with these topics
# ./test-pipeline.sh --ids sama-twitter,openai-rss # specific source IDs
# ./test-pipeline.sh --hours 12 # custom time window
# ./test-pipeline.sh --keep # keep output dir after test
# ./test-pipeline.sh --twitter-backend twitterapiio # force twitter backend
set -e
SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
DEFAULTS="$SCRIPT_DIR/../config/defaults"
OUTDIR=$(mktemp -d /tmp/media-digest-test-XXXXXX)
PASSED=0
SKIPPED=0
FAILED=0
HOURS=24
KEEP=false
ONLY=""
SKIP=""
TOPICS=""
IDS=""
TWITTER_BACKEND=""
VERBOSE=""
CONFIG=""
# ── Parse args ──
while [[ $# -gt 0 ]]; do
case "$1" in
--only) ONLY="$2"; shift 2 ;;
--skip) SKIP="$2"; shift 2 ;;
--topics) TOPICS="$2"; shift 2 ;;
--ids) IDS="$2"; shift 2 ;;
--hours) HOURS="$2"; shift 2 ;;
--keep) KEEP=true; shift ;;
--twitter-backend|--backend) TWITTER_BACKEND="$2"; shift 2 ;;
--config) CONFIG="$2"; shift 2 ;;
--verbose|-v) VERBOSE="--verbose"; shift ;;
--help|-h)
cat <<'HELP'
Pipeline smoke test β runs fetch steps with filtering, merges, and validates outputs.
USAGE:
./test-pipeline.sh [OPTIONS]
OPTIONS:
--only TYPES Only run these source types (comma-separated)
Values: rss, twitter, github, reddit, web
Example: --only twitter,rss
--skip TYPES Skip these source types (comma-separated)
Values: rss, twitter, github, reddit, web
Example: --skip web,reddit
--topics TOPICS Only include sources matching these topics (comma-separated)
Values: llm, ai-agent, frontier-tech, crypto
Example: --topics crypto,llm
--ids IDS Only include specific source IDs (comma-separated)
IDs are defined in config/defaults/sources.json
Example: --ids sama-twitter,openai-rss,vitalik-twitter
--hours N Time window for fetching articles (default: 24)
Example: --hours 48
--twitter-backend NAME
Force a specific Twitter API backend
Values: official, twitterapiio, auto
official = X API v2 (needs X_BEARER_TOKEN)
twitterapiio = twitterapi.io (needs TWITTERAPI_IO_KEY)
auto = try twitterapiio first, fallback to official
--config DIR User config overlay directory (optional)
Example: --config workspace/config
--verbose, -v Enable verbose logging for fetch scripts
--keep Keep output directory after test (default: clean up on success)
--help, -h Show this help message
EXAMPLES:
./test-pipeline.sh # full pipeline, all sources
./test-pipeline.sh --only twitter --twitter-backend twitterapiio # twitter only via twitterapi.io
./test-pipeline.sh --topics crypto --hours 48 --keep # crypto sources, 48h window
./test-pipeline.sh --skip web,reddit -v # skip web+reddit, verbose
./test-pipeline.sh --ids sama-twitter,karpathy-twitter --only twitter
ENVIRONMENT:
X_BEARER_TOKEN Official X API v2 bearer token (for --backend official)
TWITTERAPI_IO_KEY twitterapi.io API key (for --backend twitterapiio)
TWITTER_API_BACKEND Default twitter backend if --backend not given (official|twitterapiio|auto)
BRAVE_API_KEY Brave Search API key (for web fetch)
GITHUB_TOKEN GitHub token (optional, increases GitHub API rate limits)
HELP
exit 0
;;
*) echo "Unknown option: $1"; exit 1 ;;
esac
done
# ── Helpers ──
should_run() {
  local type="$1"
  # Check --only filter
  if [ -n "$ONLY" ]; then
    echo ",$ONLY," | grep -qi ",$type," || return 1
  fi
  # Check --skip filter
  if [ -n "$SKIP" ]; then
    echo ",$SKIP," | grep -qi ",$type," && return 1
  fi
  return 0
}

run_step() {
  local name="$1"; shift
  local start=$(date +%s)
  if "$@" 2>&1; then
    local elapsed=$(( $(date +%s) - start ))
    echo "✅ $name (${elapsed}s)"
    PASSED=$((PASSED + 1))
  else
    local code=$?
    local elapsed=$(( $(date +%s) - start ))
    echo "❌ $name (exit $code, ${elapsed}s)"
    FAILED=$((FAILED + 1))
  fi
}

validate_json() {
  local file="$1" name="$2"
  if [ -f "$file" ] && python3 -c "
import json, sys
d = json.load(open(sys.argv[1]))
# Print summary stats
if 'sources' in d and isinstance(d['sources'], list):
    ok = sum(1 for s in d['sources'] if s.get('status') == 'ok')
    total = len(d['sources'])
    articles = sum(s.get('count', len(s.get('articles', []))) for s in d['sources'])
    print(f' 📊 {ok}/{total} sources ok, {articles} articles')
elif 'topics' in d:
    topics = d['topics']
    if isinstance(topics, dict):
        total = sum(len(t.get('articles', [])) for t in topics.values())
        print(f' 📊 {len(topics)} topics, {total} articles')
    elif isinstance(topics, list):
        total = sum(len(t.get('articles', [])) for t in topics)
        print(f' 📊 {len(topics)} topics, {total} articles')
" "$file" 2>/dev/null; then
    echo "✅ $name JSON valid"
    PASSED=$((PASSED + 1))
  else
    echo "❌ $name JSON invalid or missing"
    FAILED=$((FAILED + 1))
  fi
}
# ── Generate filtered sources if --topics or --ids specified ──
EXTRA_ARGS=()
if [ -n "$TOPICS" ] || [ -n "$IDS" ]; then
  FILTER_CONFIG="$OUTDIR/filter-config"
  mkdir -p "$FILTER_CONFIG"
  python3 -c "
import json, sys
topics_filter = '${TOPICS}'.split(',') if '${TOPICS}' else []
ids_filter = '${IDS}'.split(',') if '${IDS}' else []
d = json.load(open('${DEFAULTS}/sources.json'))
filtered = []
for s in d['sources']:
    if ids_filter and s['id'] not in ids_filter:
        continue
    if topics_filter and not any(t in s.get('topics', []) for t in topics_filter):
        continue
    filtered.append(s)
d['sources'] = filtered
print(f'Filtered: {len(filtered)} sources', file=sys.stderr)
json.dump(d, open('${FILTER_CONFIG}/sources.json', 'w'), indent=2)
" 2>&1
  DEFAULTS="$FILTER_CONFIG"
fi

if [ -n "$CONFIG" ]; then
  EXTRA_ARGS+=("--config" "$CONFIG")
fi
if [ -n "$VERBOSE" ]; then
  EXTRA_ARGS+=("$VERBOSE")
fi

echo "🧪 Pipeline Test (hours=$HOURS, outdir=$OUTDIR)"
echo " Sources: $(python3 -c "import json; d=json.load(open('${DEFAULTS}/sources.json')); types={}
for s in d['sources']: t=s['type']; types[t]=types.get(t,0)+1
print(' | '.join(f'{t}:{n}' for t,n in sorted(types.items())))" 2>/dev/null)"
echo ""
# ── Fetch steps ──

# RSS
if should_run "rss"; then
  run_step "fetch-rss" python3 "$SCRIPT_DIR/fetch-rss.py" --defaults "$DEFAULTS" --hours "$HOURS" --output "$OUTDIR/rss.json" --force "${EXTRA_ARGS[@]}"
  validate_json "$OUTDIR/rss.json" "rss"
else
  echo "⏭️ fetch-rss (skipped)"
  SKIPPED=$((SKIPPED + 1))
fi

# GitHub
if should_run "github"; then
  run_step "fetch-github" python3 "$SCRIPT_DIR/fetch-github.py" --defaults "$DEFAULTS" --hours "$HOURS" --output "$OUTDIR/github.json" --force "${EXTRA_ARGS[@]}"
  validate_json "$OUTDIR/github.json" "github"
else
  echo "⏭️ fetch-github (skipped)"
  SKIPPED=$((SKIPPED + 1))
fi

# Twitter
if should_run "twitter"; then
  TWITTER_ARGS=("--defaults" "$DEFAULTS" "--hours" "$HOURS" "--output" "$OUTDIR/twitter.json" "--force" "${EXTRA_ARGS[@]}")
  [ -n "$TWITTER_BACKEND" ] && TWITTER_ARGS+=("--backend" "$TWITTER_BACKEND")
  if [ -n "$X_BEARER_TOKEN" ] || [ -n "$TWITTERAPI_IO_KEY" ]; then
    run_step "fetch-twitter" python3 "$SCRIPT_DIR/fetch-twitter.py" "${TWITTER_ARGS[@]}"
    validate_json "$OUTDIR/twitter.json" "twitter"
  else
    echo "⏭️ fetch-twitter (no X_BEARER_TOKEN or TWITTERAPI_IO_KEY)"
    SKIPPED=$((SKIPPED + 1))
  fi
else
  echo "⏭️ fetch-twitter (skipped)"
  SKIPPED=$((SKIPPED + 1))
fi

# Reddit
if should_run "reddit"; then
  if [ -f "$SCRIPT_DIR/fetch-reddit.py" ]; then
    run_step "fetch-reddit" python3 "$SCRIPT_DIR/fetch-reddit.py" --defaults "$DEFAULTS" --hours "$HOURS" --output "$OUTDIR/reddit.json" --force "${EXTRA_ARGS[@]}"
    validate_json "$OUTDIR/reddit.json" "reddit"
  else
    echo "⏭️ fetch-reddit (script not found)"
    SKIPPED=$((SKIPPED + 1))
  fi
else
  echo "⏭️ fetch-reddit (skipped)"
  SKIPPED=$((SKIPPED + 1))
fi

# Web search
if should_run "web"; then
  if [ -n "$BRAVE_API_KEY" ]; then
    run_step "fetch-web" python3 "$SCRIPT_DIR/fetch-web.py" --defaults "$DEFAULTS" --freshness pd --output "$OUTDIR/web.json" --force "${EXTRA_ARGS[@]}"
    validate_json "$OUTDIR/web.json" "web"
  else
    echo "⏭️ fetch-web (no BRAVE_API_KEY)"
    SKIPPED=$((SKIPPED + 1))
  fi
else
  echo "⏭️ fetch-web (skipped)"
  SKIPPED=$((SKIPPED + 1))
fi
# ── Merge ──
MERGE_ARGS=("--output" "$OUTDIR/merged.json")
[ -f "$OUTDIR/rss.json" ] && MERGE_ARGS+=("--rss" "$OUTDIR/rss.json")
[ -f "$OUTDIR/twitter.json" ] && MERGE_ARGS+=("--twitter" "$OUTDIR/twitter.json")
[ -f "$OUTDIR/web.json" ] && MERGE_ARGS+=("--web" "$OUTDIR/web.json")
[ -f "$OUTDIR/github.json" ] && MERGE_ARGS+=("--github" "$OUTDIR/github.json")
[ -f "$OUTDIR/reddit.json" ] && MERGE_ARGS+=("--reddit" "$OUTDIR/reddit.json")

if [ ${#MERGE_ARGS[@]} -gt 2 ]; then
  run_step "merge-sources" python3 "$SCRIPT_DIR/merge-sources.py" "${MERGE_ARGS[@]}"
  validate_json "$OUTDIR/merged.json" "merged"
  # Validate merged structure
  if python3 -c "
import json, sys
d = json.load(open(sys.argv[1]))
assert 'topics' in d and 'output_stats' in d
stats = d['output_stats']
print(f' 📊 Merged: {stats.get(\"total_articles\", \"?\")} articles across {len(d[\"topics\"])} topics')
" "$OUTDIR/merged.json" 2>/dev/null; then
    echo "✅ merged structure valid"
    PASSED=$((PASSED + 1))
  else
    echo "❌ merged structure invalid"
    FAILED=$((FAILED + 1))
  fi
else
  echo "⏭️ merge (no source files to merge)"
  SKIPPED=$((SKIPPED + 1))
fi
# ── Summary ──
echo ""
echo "──────────────────────────────────"
echo "📊 Results: $PASSED passed, $FAILED failed, $SKIPPED skipped"
echo " Output: $OUTDIR"
if [ "$KEEP" = false ] && [ "$FAILED" -eq 0 ]; then
  rm -rf "$OUTDIR"
  echo " (cleaned up - use --keep to preserve)"
fi
[ "$FAILED" -eq 0 ] && exit 0 || exit 1
```
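The merged-structure step above asserts only two keys, `topics` and `output_stats`. For reference, a minimal document that satisfies both the `validate_json` topics branch and the structure assertion can be sketched like this (field names beyond those two keys are illustrative, not the pipeline's full schema):

```python
import json
import os
import tempfile

# Minimal document with the two keys the structure check asserts on.
merged = {
    "topics": {
        "box-office": {"articles": [{"title": "Weekend grosses"}]},
    },
    "output_stats": {"total_articles": 1},
}

path = os.path.join(tempfile.mkdtemp(), "merged.json")
with open(path, "w") as f:
    json.dump(merged, f)

# Same assertions the shell step runs via python3 -c.
d = json.load(open(path))
assert "topics" in d and "output_stats" in d
total = sum(len(t.get("articles", [])) for t in d["topics"].values())
print(f"{d['output_stats']['total_articles']} articles across {len(d['topics'])} topics")
# 1 articles across 1 topics
```

Anything missing either key makes the `python3 -c` assertion fail, so the test prints "merged structure invalid" and counts a failure.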
### scripts/validate-config.py
```python
#!/usr/bin/env python3
"""
Configuration validation script for media-news-digest.
Validates sources.json and topics.json against JSON Schema and performs
additional consistency checks.
Usage:
python3 validate-config.py [--defaults DEFAULTS_DIR] [--config CONFIG_DIR] [--verbose]
"""
import json
import argparse
import logging
import sys
import os
from pathlib import Path
from typing import Dict, Any, Set
try:
    import jsonschema
    from jsonschema import validate, ValidationError
    HAS_JSONSCHEMA = True
except ImportError:
    HAS_JSONSCHEMA = False


def setup_logging(verbose: bool) -> logging.Logger:
    """Setup logging configuration."""
    level = logging.DEBUG if verbose else logging.INFO
    logging.basicConfig(
        level=level,
        format='%(asctime)s - %(levelname)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )
    return logging.getLogger(__name__)
def load_json_file(file_path: Path) -> Dict[str, Any]:
    """Load and parse JSON file."""
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            return json.load(f)
    except FileNotFoundError:
        raise FileNotFoundError(f"Config file not found: {file_path}")
    except json.JSONDecodeError as e:
        raise ValueError(f"Invalid JSON in {file_path}: {e}")
def validate_against_schema(data: Dict[str, Any], schema: Dict[str, Any],
                            config_type: str) -> bool:
    """Validate data against JSON schema."""
    if not HAS_JSONSCHEMA:
        logging.warning("jsonschema not available, skipping schema validation")
        return True
    try:
        # Extract the relevant schema definition
        if config_type == "sources":
            schema_def = {
                "type": "object",
                "required": ["sources"],
                "properties": {
                    "sources": {
                        "type": "array",
                        "items": schema["definitions"]["source"]
                    }
                }
            }
        elif config_type == "topics":
            schema_def = {
                "type": "object",
                "required": ["topics"],
                "properties": {
                    "topics": {
                        "type": "array",
                        "items": schema["definitions"]["topic"]
                    }
                }
            }
        else:
            raise ValueError(f"Unknown config type: {config_type}")
        validate(instance=data, schema=schema_def)
        logging.info(f"✅ {config_type}.json passed schema validation")
        return True
    except ValidationError as e:
        logging.error(f"❌ Schema validation failed for {config_type}.json:")
        logging.error(f" Path: {' -> '.join(str(p) for p in e.absolute_path)}")
        logging.error(f" Error: {e.message}")
        return False
def validate_sources_consistency(sources_data: Dict[str, Any],
                                 topics_data: Dict[str, Any]) -> bool:
    """Validate consistency between sources and topics."""
    errors = []

    # Get valid topic IDs
    valid_topics = {topic["id"] for topic in topics_data["topics"]}
    logging.debug(f"Valid topic IDs: {valid_topics}")

    # Check source topic references
    for source in sources_data["sources"]:
        source_id = source.get("id", "unknown")
        source_topics = set(source.get("topics", []))

        # Check for invalid topic references
        invalid_topics = source_topics - valid_topics
        if invalid_topics:
            errors.append(f"Source '{source_id}' references invalid topics: {invalid_topics}")

        # Check for empty topic lists
        if not source_topics:
            errors.append(f"Source '{source_id}' has no topics assigned")

    # Check for duplicate source IDs
    source_ids = [source.get("id") for source in sources_data["sources"]]
    duplicates = {id for id in source_ids if source_ids.count(id) > 1}
    if duplicates:
        errors.append(f"Duplicate source IDs found: {duplicates}")

    # Check for duplicate topic IDs
    topic_ids = [topic.get("id") for topic in topics_data["topics"]]
    duplicates = {id for id in topic_ids if topic_ids.count(id) > 1}
    if duplicates:
        errors.append(f"Duplicate topic IDs found: {duplicates}")

    if errors:
        logging.error("❌ Consistency validation failed:")
        for error in errors:
            logging.error(f" {error}")
        return False
    else:
        logging.info("✅ Consistency validation passed")
        return True
def validate_source_types(sources_data: Dict[str, Any]) -> bool:
    """Validate source-type specific requirements."""
    errors = []
    for source in sources_data["sources"]:
        source_id = source.get("id", "unknown")
        source_type = source.get("type")
        if source_type == "rss":
            if not source.get("url"):
                errors.append(f"RSS source '{source_id}' missing required 'url' field")
        elif source_type == "twitter":
            if not source.get("handle"):
                errors.append(f"Twitter source '{source_id}' missing required 'handle' field")
        elif source_type == "github":
            if not source.get("repo"):
                errors.append(f"GitHub source '{source_id}' missing required 'repo' field")
        elif source_type == "reddit":
            if not source.get("subreddit"):
                errors.append(f"Reddit source '{source_id}' missing required 'subreddit' field")
        elif source_type == "web":
            # Web sources are handled by topics, no specific validation needed
            pass
        else:
            errors.append(f"Source '{source_id}' has invalid type: {source_type}")

    if errors:
        logging.error("❌ Source type validation failed:")
        for error in errors:
            logging.error(f" {error}")
        return False
    else:
        logging.info("✅ Source type validation passed")
        return True
def main():
    """Main validation function."""
    parser = argparse.ArgumentParser(
        description="Validate media-news-digest configuration files",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python3 validate-config.py
  python3 validate-config.py --defaults config/defaults --config workspace/config --verbose
  python3 validate-config.py --config workspace/config --verbose  # backward compatibility
"""
    )
    parser.add_argument(
        "--defaults",
        type=Path,
        default=Path("config/defaults"),
        help="Default configuration directory with skill defaults (default: config/defaults)"
    )
    parser.add_argument(
        "--config",
        type=Path,
        help="User configuration directory for overlays (optional)"
    )
    parser.add_argument(
        "--verbose", "-v",
        action="store_true",
        help="Enable verbose logging"
    )
    args = parser.parse_args()
    logger = setup_logging(args.verbose)

    # Load config_loader for merged configurations
    try:
        from config_loader import load_merged_sources, load_merged_topics
    except ImportError:
        # Fallback: make the script's own directory importable
        sys.path.append(str(Path(__file__).parent))
        from config_loader import load_merged_sources, load_merged_topics

    # File paths
    schema_path = Path("config/schema.json")
    if args.config:
        logger.info(f"Validating merged configuration: defaults={args.defaults}, config={args.config}")
    else:
        logger.info(f"Validating default configuration: {args.defaults}")

    try:
        # Backward compatibility: if only --config provided, use old behavior
        if args.config and args.defaults == Path("config/defaults") and not args.defaults.exists():
            logger.debug("Backward compatibility mode: using --config as sole source")
            defaults_dir = args.config
            config_dir = None
        else:
            defaults_dir = args.defaults
            config_dir = args.config

        # Load schema
        schema = load_json_file(schema_path)
        logger.debug("Loaded schema.json")

        # Load merged configuration data
        merged_sources = load_merged_sources(defaults_dir, config_dir)
        merged_topics = load_merged_topics(defaults_dir, config_dir)

        # Convert to the format expected by validation functions
        sources_data = {"sources": merged_sources}
        topics_data = {"topics": merged_topics}
        logger.debug(f"Loaded {len(merged_sources)} merged sources, {len(merged_topics)} merged topics")

        # Perform validations
        all_valid = True

        # Schema validation
        all_valid &= validate_against_schema(sources_data, schema, "sources")
        all_valid &= validate_against_schema(topics_data, schema, "topics")

        # Consistency validation
        all_valid &= validate_sources_consistency(sources_data, topics_data)

        # Source type validation
        all_valid &= validate_source_types(sources_data)

        # Summary
        if all_valid:
            logger.info("🎉 All validations passed!")
            return 0
        else:
            logger.error("💥 Validation failed!")
            return 1
    except Exception as e:
        logger.error(f"💥 Validation error: {e}")
        return 1


if __name__ == "__main__":
    sys.exit(main())
```
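To illustrate the cross-reference rule validate-config.py enforces, here is the same check run on a minimal config pair. The source and topic IDs are hypothetical examples, not entries from the shipped `config/defaults`:

```python
# Hypothetical configs: one source references a topic that does not exist.
topics_data = {"topics": [{"id": "box-office"}, {"id": "streaming"}]}
sources_data = {"sources": [
    {"id": "thr-rss", "type": "rss", "url": "https://example.com/feed",
     "topics": ["box-office"]},
    {"id": "bad-src", "type": "rss", "url": "https://example.com/feed2",
     "topics": ["awards"]},  # "awards" is not defined in topics_data
]}

# Same logic as validate_sources_consistency: set difference against valid IDs.
valid_topics = {t["id"] for t in topics_data["topics"]}
for s in sources_data["sources"]:
    bad = set(s["topics"]) - valid_topics
    if bad:
        print(f"Source '{s['id']}' references invalid topics: {sorted(bad)}")
# Source 'bad-src' references invalid topics: ['awards']
```

Because the check is a plain set difference, adding an `{"id": "awards"}` entry to topics.json (or removing the stray reference) is all it takes to make validation pass.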