
media-news-digest

Generate media & entertainment industry news digests. Covers Hollywood trades (THR, Deadline, Variety), box office, streaming, awards season, film festivals, and production news. Four-source data collection from RSS feeds, Twitter/X KOLs, Reddit, and web search. Pipeline-based scripts with retry mechanisms and deduplication. Supports Discord and email output with PDF attachments.

Packaged view

This page reorganizes the original catalog entry around fit, installability, and workflow context first. The original raw source lives below.

Stars
3,087
Hot score
99
Updated
March 20, 2026
Overall rating
C (4.0)
Composite score
4.0
Best-practice grade
C (64.8)

Install command

npx @skill-hub/cli install openclaw-skills-media-news-digest

Repository

openclaw/skills

Skill path: skills/dinstein/media-news-digest


Open repository

Best for

Primary workflow: Analyze Data & AI.

Technical facets: Full Stack, Data / AI.

Target audience: everyone.

License: Unknown.

Original source

Catalog source: SkillHub Club.

Repository owner: openclaw.

This is a mirrored public skill entry. Review the repository before installing it into production workflows.

What it helps with

  • Install media-news-digest into Claude Code, Codex CLI, Gemini CLI, or OpenCode workflows
  • Review https://github.com/openclaw/skills before adding media-news-digest to shared team environments
  • Use media-news-digest for development workflows

Works across

Claude Code Β· Codex CLI Β· Gemini CLI Β· OpenCode

Favorites: 0.

Sub-skills: 0.

Aggregator: No.

Original source / Raw SKILL.md

---
name: media-news-digest
description: Generate media & entertainment industry news digests. Covers Hollywood trades (THR, Deadline, Variety), box office, streaming, awards season, film festivals, and production news. Four-source data collection from RSS feeds, Twitter/X KOLs, Reddit, and web search. Pipeline-based scripts with retry mechanisms and deduplication. Supports Discord and email output with PDF attachments.
version: "2.1.1"
homepage: https://github.com/draco-agent/media-news-digest
source: https://github.com/draco-agent/media-news-digest
metadata:
  openclaw:
    requires:
      bins: ["python3"]
    optionalBins: ["mail", "msmtp"]
    credentialAccess: >
      This skill does NOT read, store, or manage any platform credentials itself.
      Email delivery uses send-email.py with system mail (msmtp). Twitter and web search
      API keys are passed via environment variables and used only for outbound API calls.
      No credentials are written to disk by this skill.
env:
  - name: X_BEARER_TOKEN
    required: false
    description: Twitter/X API v2 bearer token for KOL monitoring (official backend)
  - name: TWITTERAPI_IO_KEY
    required: false
    description: twitterapi.io API key (alternative Twitter backend)
  - name: TWITTER_API_BACKEND
    required: false
    description: "Twitter backend selection: official, twitterapiio, or auto (default: auto)"
  - name: BRAVE_API_KEY
    required: false
    description: Brave Search API key for web search (single key)
  - name: BRAVE_API_KEYS
    required: false
    description: "Comma-separated Brave API keys for multi-key rotation (preferred over BRAVE_API_KEY)"
  - name: TAVILY_API_KEY
    required: false
    description: Tavily Search API key (alternative web search backend)
---

# Media News Digest

Automated media & entertainment industry news digest system. Covers Hollywood trades, box office, streaming platforms, awards season, film festivals, production news, and industry deals.

## Quick Start

1. **Generate Digest** (unified pipeline β€” runs all 4 sources in parallel):
   ```bash
   python3 scripts/run-pipeline.py \
     --defaults <SKILL_DIR>/config/defaults \
     --config <WORKSPACE>/config \
     --hours 48 --freshness pd \
     --archive-dir <WORKSPACE>/archive/media-news-digest/ \
     --output /tmp/md-merged.json --verbose --force
   ```

2. **Use Templates**: Apply Discord or email templates to merged output

## Data Sources (65 total, 64 enabled)

- **RSS Feeds (36, 35 enabled)**: THR, Deadline, Variety, IndieWire, The Wrap, Collider, Vulture, Awards Daily, Gold Derby, Screen Rant, Empire, The Playlist, /Film, Entertainment Weekly, Roger Ebert, CinemaBlend, Den of Geek, The Direct, MovieWeb, CBR, What's on Netflix, Decider, Anime News Network, and more
- **Twitter/X KOLs (18)**: @THR, @DEADLINE, @Variety, @FilmUpdates, @DiscussingFilm, @BoxOfficeMojo, @MattBelloni, @Borys_Kit, @TheAcademy, @letterboxd, @A24, and more
- **Reddit (11)**: r/movies, r/boxoffice, r/television, r/Oscars, r/TrueFilm, r/entertainment, r/netflix, r/marvelstudios, r/DC_Cinematic, r/anime, r/flicks
- **Web Search (9 topics)**: Brave Search / Tavily with freshness filters
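The four feeds are merged into a single ranked list with URL-based deduplication (see `merge-sources.py` below). The core idea can be sketched as follows; the field names mirror the merged JSON, but the exact URL normalization is an assumption, not the script's actual logic:

```python
def dedupe_by_url(articles):
    """Keep the highest-scoring article per normalized URL."""
    best = {}
    for article in articles:
        # Normalize: drop the query string, trailing slash, and case.
        key = article["link"].split("?", 1)[0].rstrip("/").lower()
        if key not in best or article["quality_score"] > best[key]["quality_score"]:
            best[key] = article
    # Return the survivors sorted by quality_score, highest first.
    return sorted(best.values(), key=lambda a: a["quality_score"], reverse=True)
```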

## Topics (9 sections)

- πŸ‡¨πŸ‡³ China / 中国影视 β€” China mainland box office, Chinese films, Chinese streaming
- 🎬 Production / εˆΆδ½œεŠ¨ζ€ β€” New projects, casting, filming updates
- πŸ’° Deals & Business / θ‘ŒδΈšδΊ€ζ˜“ β€” M&A, rights, talent deals
- 🎞️ Upcoming Releases / εŒ—ηΎŽθΏ‘ζœŸδΈŠζ˜  β€” Theater openings, release dates, trailers
- 🎟️ Box Office / η₯¨ζˆΏ β€” NA/global box office, opening weekends
- πŸ“Ί Streaming / 桁εͺ’体 β€” Netflix, Disney+, Apple TV+, HBO, viewership
- πŸ† Awards / 钁ε₯–ε­£ β€” Oscars, Golden Globes, Emmys, BAFTAs
- πŸŽͺ Film Festivals / η”΅ε½±θŠ‚ β€” Cannes, Venice, TIFF, Sundance, Berlin
- ⭐ Reviews & Buzz / 影评口璑 β€” Critical reception, RT/Metacritic scores

## Scripts Pipeline

### Unified Pipeline
```bash
python3 scripts/run-pipeline.py \
  --defaults config/defaults --config workspace/config \
  --hours 48 --freshness pd \
  --archive-dir workspace/archive/media-news-digest/ \
  --output /tmp/md-merged.json --verbose --force
```
- **Features**: Runs all 4 fetch steps in parallel, then merges + deduplicates + scores
- **Output**: Final merged JSON ready for report generation (~30s total)
- **Flags**: `--skip rss,twitter` to skip steps, `--enrich` for full-text enrichment

### Individual Scripts
- `fetch-rss.py` β€” Parallel RSS fetcher (10 workers, 30s timeout, caching)
- `fetch-twitter.py` β€” Dual backend: official X API v2 + twitterapi.io (auto fallback, 3-worker concurrency)
- `fetch-web.py` β€” Web search via Brave (multi-key rotation) or Tavily
- `fetch-reddit.py` β€” Reddit public JSON API (4 workers, no auth)
- `merge-sources.py` β€” Quality scoring, URL dedup, multi-source merging
- `summarize-merged.py` β€” Structured overview sorted by quality_score
- `enrich-articles.py` β€” Full-text enrichment for top articles
- `generate-pdf.py` β€” PDF generation with Chinese typography + emoji
- `send-email.py` β€” MIME email with HTML body + PDF attachment
- `sanitize-html.py` β€” XSS-safe markdown to HTML conversion
- `validate-config.py` β€” Configuration validator
- `source-health.py` β€” Source health tracking
- `config_loader.py` β€” Config overlay loader (defaults + user overrides)
- `test-pipeline.sh` β€” Pipeline testing with --only/--skip/--twitter-backend filters
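The overlay behavior of `config_loader.py` (user entries replace defaults with the same `id`; new `id`s are appended) can be sketched like this. The function name and list-of-dicts shape are illustrative, not the script's actual API:

```python
def overlay_by_id(defaults, overrides):
    """Merge two lists of config entries keyed by 'id'.

    An override with an id already in defaults replaces it in place;
    an override with a new id is appended after the defaults.
    """
    merged = {entry["id"]: entry for entry in defaults}
    for entry in overrides:
        merged[entry["id"]] = entry  # same id -> user version wins
    return list(merged.values())  # dicts preserve insertion order (3.7+)
```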

## Cron Integration

Reference `references/digest-prompt.md` in cron prompts.

### Daily Digest
```
MODE = daily, FRESHNESS = pd, RSS_HOURS = 48
```

### Weekly Digest
```
MODE = weekly, FRESHNESS = pw, RSS_HOURS = 168
```
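The two cron modes map onto `run-pipeline.py` flags roughly as below. This is a sketch of the mapping only; the real cron setup drives these values through the placeholders in `references/digest-prompt.md`:

```python
# Map digest MODE to the pipeline flags shown above.
MODES = {
    "daily":  {"freshness": "pd", "rss_hours": 48},
    "weekly": {"freshness": "pw", "rss_hours": 168},
}

def pipeline_args(mode, skill_dir, workspace):
    """Build the run-pipeline.py argv for a given digest mode."""
    cfg = MODES[mode]
    return [
        "python3", f"{skill_dir}/scripts/run-pipeline.py",
        "--defaults", f"{skill_dir}/config/defaults",
        "--config", f"{workspace}/config",
        "--hours", str(cfg["rss_hours"]),
        "--freshness", cfg["freshness"],
        "--archive-dir", f"{workspace}/archive/media-news-digest/",
        "--output", "/tmp/md-merged.json",
    ]
```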

## Dependencies

All scripts work with the **Python 3.8+ standard library only**. `feedparser` is optional but recommended.
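A guarded import is the usual pattern for an optional dependency like this. A minimal sketch (not the fetchers' actual code; the fallback parser here is deliberately crude):

```python
# Guarded import: degrade gracefully when feedparser is not installed.
try:
    import feedparser
    HAVE_FEEDPARSER = True
except ImportError:
    feedparser = None
    HAVE_FEEDPARSER = False

def parse_feed(raw_xml):
    """Return feed entries, using feedparser when available."""
    if HAVE_FEEDPARSER:
        return feedparser.parse(raw_xml).entries
    # Stdlib fallback: pull every <title> element out of the XML.
    import xml.etree.ElementTree as ET
    root = ET.fromstring(raw_xml)
    return [{"title": t.text} for t in root.iter("title")]
```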


---

## Referenced Files

> The following files are referenced in this skill and included for context.

### scripts/run-pipeline.py

```python
#!/usr/bin/env python3
"""
Unified data collection pipeline for media-news-digest.

Runs all 5 fetch steps (RSS, Twitter, GitHub, Reddit, Web) in parallel,
then merges + deduplicates + scores into a single output JSON.

Replaces the agent's sequential 6-step tool-call loop with one command,
eliminating ~60-120s of LLM round-trip overhead.

Usage:
    python3 run-pipeline.py \
      --defaults <SKILL_DIR>/config/defaults \
      --config <WORKSPACE>/config \
      --hours 48 --freshness pd \
      --archive-dir <WORKSPACE>/archive/media-news-digest/ \
      --output /tmp/md-merged.json \
      --verbose
"""

import json
import sys
import os
import subprocess
import time
import argparse
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Dict, Any

SCRIPTS_DIR = Path(__file__).parent
DEFAULT_TIMEOUT = 180  # per-step timeout in seconds


def setup_logging(verbose: bool) -> logging.Logger:
    level = logging.DEBUG if verbose else logging.INFO
    logging.basicConfig(
        level=level,
        format="%(asctime)s [%(levelname)s] %(message)s",
        datefmt="%H:%M:%S",
    )
    return logging.getLogger(__name__)


def run_step(
    name: str,
    script: str,
    args_list: list,
    output_path: Path,
    timeout: int = DEFAULT_TIMEOUT,
    force: bool = False,
) -> Dict[str, Any]:
    """Run a fetch script as a subprocess, return result metadata."""
    t0 = time.time()
    cmd = [sys.executable, str(SCRIPTS_DIR / script)] + args_list + [
        "--output", str(output_path),
    ]
    if force:
        cmd.append("--force")

    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=timeout,
            env=os.environ,
        )
        elapsed = time.time() - t0
        ok = result.returncode == 0

        # Try to read output stats
        count = 0
        if ok and output_path.exists():
            try:
                with open(output_path) as f:
                    data = json.load(f)
                count = (
                    data.get("total_articles")
                    or data.get("total_posts")
                    or data.get("total_releases")
                    or data.get("total_results")
                    or data.get("total")
                    or 0
                )
            except (json.JSONDecodeError, OSError):
                pass

        return {
            "name": name,
            "status": "ok" if ok else "error",
            "elapsed_s": round(elapsed, 1),
            "count": count,
            "stderr_tail": (result.stderr or "").strip().split("\n")[-3:] if not ok else [],
        }

    except subprocess.TimeoutExpired:
        elapsed = time.time() - t0
        return {
            "name": name,
            "status": "timeout",
            "elapsed_s": round(elapsed, 1),
            "count": 0,
            "stderr_tail": [f"Killed after {timeout}s"],
        }
    except Exception as e:
        elapsed = time.time() - t0
        return {
            "name": name,
            "status": "error",
            "elapsed_s": round(elapsed, 1),
            "count": 0,
            "stderr_tail": [str(e)],
        }


def main() -> int:
    parser = argparse.ArgumentParser(
        description="Run the full media-news-digest data pipeline in one shot.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("--defaults", type=Path, required=True, help="Skill defaults config dir")
    parser.add_argument("--config", type=Path, default=None, help="User config overlay dir")
    parser.add_argument("--hours", type=int, default=48, help="Time window in hours")
    parser.add_argument("--freshness", type=str, default="pd", help="Web search freshness (pd/pw/pm)")
    parser.add_argument("--archive-dir", type=Path, default=None, help="Archive dir for dedup penalty")
    parser.add_argument("--output", "-o", type=Path, default=Path("/tmp/md-merged.json"), help="Final merged output")
    parser.add_argument("--step-timeout", type=int, default=DEFAULT_TIMEOUT, help="Per-step timeout (seconds)")
    parser.add_argument("--twitter-backend", choices=["official", "twitterapiio", "auto"], default=None, help="Twitter API backend to use")
    parser.add_argument("--verbose", "-v", action="store_true")
    parser.add_argument("--force", action="store_true", help="Force re-fetch ignoring caches")
    parser.add_argument("--enrich", action="store_true", help="Enable full-text enrichment for top articles")
    parser.add_argument("--skip", type=str, default="", help="Comma-separated list of steps to skip (rss,twitter,github,reddit,web)")
    parser.add_argument("--reuse-dir", type=Path, default=None, help="Reuse existing intermediate directory instead of creating new one")

    args = parser.parse_args()
    logger = setup_logging(args.verbose)

    # Parse --skip into a set
    skip_steps = set(s.strip().lower() for s in args.skip.split(',') if s.strip())

    # Intermediate output paths
    import tempfile
    if args.reuse_dir:
        _run_dir = str(args.reuse_dir)
        os.makedirs(_run_dir, exist_ok=True)
    else:
        _run_dir = tempfile.mkdtemp(prefix="td-pipeline-")
    tmp_rss = Path(_run_dir) / "rss.json"
    tmp_twitter = Path(_run_dir) / "twitter.json"
    tmp_github = Path(_run_dir) / "github.json"
    tmp_trending = Path(_run_dir) / "trending.json"
    tmp_reddit = Path(_run_dir) / "reddit.json"
    tmp_web = Path(_run_dir) / "web.json"
    logger.info(f"πŸ“ Run directory: {_run_dir}")

    # Common args for all fetch scripts
    common = ["--defaults", str(args.defaults)]
    if args.config:
        common += ["--config", str(args.config)]
    common += ["--hours", str(args.hours)]
    verbose_flag = ["--verbose"] if args.verbose else []

    # Define the 5 parallel fetch steps
    steps = [
        ("RSS", "fetch-rss.py", common + verbose_flag, tmp_rss),
        ("Twitter", "fetch-twitter.py", common + verbose_flag + (["--backend", args.twitter_backend] if args.twitter_backend else []), tmp_twitter),
        ("GitHub", "fetch-github.py", common + verbose_flag, tmp_github),
        ("GitHub Trending", "fetch-github.py", ["--trending", "--hours", str(args.hours)] + verbose_flag, tmp_trending),
        ("Reddit", "fetch-reddit.py", common + verbose_flag, tmp_reddit),
        ("Web", "fetch-web.py",
         ["--defaults", str(args.defaults)]
         + (["--config", str(args.config)] if args.config else [])
         + ["--freshness", args.freshness]
         + verbose_flag,
         tmp_web),
    ]

    # Filter steps by --skip and --reuse-dir
    active_steps = []
    for name, script, step_args, out_path in steps:
        step_key = name.lower()
        if step_key in skip_steps:
            logger.info(f"  ⏭️  {name}: skipped (--skip)")
            continue
        if args.reuse_dir and out_path.exists() and not args.force:
            logger.info(f"  ♻️  {name}: reusing existing {out_path}")
            continue
        active_steps.append((name, script, step_args, out_path))

    logger.info(f"πŸš€ Starting pipeline: {len(active_steps)}/{len(steps)} sources, {args.hours}h window, freshness={args.freshness}")
    t_start = time.time()

    # Phase 1: Parallel fetch
    step_results = []
    if active_steps:
        with ThreadPoolExecutor(max_workers=len(active_steps)) as pool:
            futures = {}
            for name, script, step_args, out_path in active_steps:
                f = pool.submit(run_step, name, script, step_args, out_path, args.step_timeout, args.force)
                futures[f] = name

            for future in as_completed(futures):
                res = future.result()
                step_results.append(res)
                status_icon = {"ok": "βœ…", "error": "❌", "timeout": "⏰"}.get(res["status"], "?")
                logger.info(f"  {status_icon} {res['name']}: {res['count']} items ({res['elapsed_s']}s)")
                if res["status"] != "ok" and res["stderr_tail"]:
                    for line in res["stderr_tail"]:
                        logger.debug(f"    {line}")

    fetch_elapsed = time.time() - t_start
    logger.info(f"πŸ“‘ Fetch phase done in {fetch_elapsed:.1f}s")

    # Phase 2: Merge
    logger.info("πŸ”€ Merging & scoring...")
    merge_args = ["--verbose"] if args.verbose else []
    for flag, path in [("--rss", tmp_rss), ("--twitter", tmp_twitter),
                       ("--github", tmp_github), ("--trending", tmp_trending), ("--reddit", tmp_reddit),
                       ("--web", tmp_web)]:
        if path.exists():
            merge_args += [flag, str(path)]
    if args.archive_dir:
        merge_args += ["--archive-dir", str(args.archive_dir)]
    merge_args += ["--output", str(args.output)]

    merge_result = run_step("Merge", "merge-sources.py", merge_args, args.output, timeout=60, force=False)

    # Phase 3: Enrich high-scoring articles with full text
    if merge_result["status"] == "ok" and args.enrich and "enrich" not in skip_steps:
        logger.info("πŸ“° Enriching top articles with full text...")
        enrich_args = ["--input", str(args.output), "--output", str(args.output)]
        enrich_args += ["--verbose"] if args.verbose else []
        enrich_result = run_step("Enrich", "enrich-articles.py", enrich_args, args.output, timeout=120, force=False)
    else:
        enrich_result = {"name": "Enrich", "status": "skipped", "elapsed_s": 0, "count": 0, "stderr_tail": []}

    total_elapsed = time.time() - t_start

    # Summary
    logger.info(f"{'=' * 50}")
    logger.info(f"πŸ“Š Pipeline Summary ({total_elapsed:.1f}s total)")
    for r in step_results:
        logger.info(f"   {r['name']:10s} {r['status']:7s} {r['count']:4d} items  {r['elapsed_s']:5.1f}s")
    logger.info(f"   {'Merge':10s} {merge_result['status']:7s} {merge_result.get('count',0):4d} items  {merge_result['elapsed_s']:5.1f}s")
    logger.info(f"   Output: {args.output}")

    if merge_result["status"] != "ok":
        logger.error(f"❌ Merge failed: {merge_result['stderr_tail']}")
        return 1

    # Write pipeline metadata alongside output for agent consumption
    meta = {
        "pipeline_version": "1.0.0",
        "total_elapsed_s": round(total_elapsed, 1),
        "fetch_elapsed_s": round(fetch_elapsed, 1),
        "steps": step_results,
        "merge": merge_result,
        "output": str(args.output),
    }
    meta_path = args.output.with_suffix(".meta.json")
    with open(meta_path, "w") as f:
        json.dump(meta, f, indent=2)

    if not args.reuse_dir:
        import shutil
        try:
            shutil.rmtree(_run_dir)
            logger.debug(f"Cleaned up {_run_dir}")
        except Exception:
            pass

    logger.info(f"βœ… Done β†’ {args.output}")
    return 0


if __name__ == "__main__":
    sys.exit(main())

```

### references/digest-prompt.md

```markdown
# Digest Prompt Template

Replace `<...>` placeholders before use. Daily defaults shown; weekly overrides listed in the table below.

## Placeholders

| Placeholder | Default | Weekly Override |
|-------------|---------|----------------|
| `<MODE>` | `daily` | `weekly` |
| `<TIME_WINDOW>` | `past 1-2 days` | `past 7 days` |
| `<FRESHNESS>` | `pd` | `pw` |
| `<RSS_HOURS>` | `48` | `168` |
| `<ITEMS_PER_SECTION>` | `3-5` | `5-8` |
| `<BLOG_PICKS_COUNT>` | `2-3` | `3-5` |
| `<EXTRA_SECTIONS>` | *(none)* | `πŸ“Š Weekly Trend Summary` |
| `<SUBJECT>` | `Daily Media Digest - YYYY-MM-DD - 🎬 每ζ—₯影视ζ—₯ζŠ₯` | `Weekly Media Digest - YYYY-MM-DD - 🎬 每周影视周ζŠ₯` |
| `<WORKSPACE>` | Your workspace path | |
| `<SKILL_DIR>` | Installed skill directory | |
| `<DISCORD_CHANNEL_ID>` | Target channel ID | |
| `<EMAIL>` | *(optional)* Recipient email | |
| `<EMAIL_FROM>` | *(optional)* e.g. `MyBot <[email protected]>` | |
| `<LANGUAGE>` | `Chinese` | |
| `<TEMPLATE>` | `discord` / `email` / `markdown` | |
| `<DATE>` | Today's date YYYY-MM-DD (caller provides) | |
| `<VERSION>` | Read from SKILL.md frontmatter | |

---

Generate the <MODE> media & entertainment digest for **<DATE>**. Use `<DATE>` as the report date β€” do NOT infer it.

## Configuration

Read config files (workspace overrides take priority over defaults):
1. **Sources**: `<WORKSPACE>/config/sources.json` β†’ fallback `<SKILL_DIR>/config/defaults/sources.json`
2. **Topics**: `<WORKSPACE>/config/topics.json` β†’ fallback `<SKILL_DIR>/config/defaults/topics.json`

## Context: Previous Report

Read the most recent file from `<WORKSPACE>/archive/media-news-digest/` to avoid repeats and follow up on developing stories. Skip if none exists.

## Data Collection Pipeline

**Use the unified pipeline** (runs all 4 sources in parallel, ~30s):
```bash
python3 <SKILL_DIR>/scripts/run-pipeline.py \
  --defaults <SKILL_DIR>/config/defaults \
  --config <WORKSPACE>/config \
  --hours <RSS_HOURS> --freshness <FRESHNESS> \
  --archive-dir <WORKSPACE>/archive/media-news-digest/ \
  --output /tmp/md-merged.json --verbose --force
```

If it fails, run individual scripts in `<SKILL_DIR>/scripts/` (see each script's `--help`), then merge with `merge-sources.py`.

## Report Generation

Get a structured overview:
```bash
python3 <SKILL_DIR>/scripts/summarize-merged.py --input /tmp/md-merged.json --top <ITEMS_PER_SECTION>
```
Use this output to select articles β€” **do NOT write ad-hoc Python to parse the JSON**. Apply the template from `<SKILL_DIR>/references/templates/<TEMPLATE>.md`.

Select articles **purely by quality_score regardless of source type**. Articles in merged JSON are already sorted by quality_score descending within each topic β€” respect this order. For Reddit posts, append `*[Reddit r/xxx, {{score}}↑]*`.

Each article line must include its quality score using πŸ”₯ prefix. Format: `πŸ”₯{score} | {summary with link}`. This makes scoring transparent and helps readers identify the most important news at a glance.

### Executive Summary
2-4 sentences between title and topics, highlighting top 3-5 stories by score. Concise and punchy, no links. Discord: `> ` blockquote. Email: gray background.

### Topic Sections
From `topics.json`: `emoji` + `label` headers, `<ITEMS_PER_SECTION>` items each.

**⚠️ CRITICAL: Output articles in EXACTLY the same order as summarize-merged.py output (quality_score descending). Do NOT reorder, group by subtopic, or rearrange. The πŸ”₯ scores must appear in strictly decreasing order within each section.**

Every topic **must appear** β€” even with 1-2 items. If sparse, note "本ζ—₯θ―₯板块较少".

### Fixed Sections (after topics)

**πŸ“’ KOL Updates** β€” Twitter KOLs. Format:
```
β€’ **Display Name** (@handle) β€” summary `πŸ‘ 12.3K | πŸ’¬ 45 | πŸ” 230 | ❀️ 1.2K`
  <https://twitter.com/handle/status/ID>
```
Read `display_name` and `metrics` from merged JSON. Always show all 4 metrics, use K/M formatting, wrap in backticks.

**πŸ“ Deep Reads** β€” `<BLOG_PICKS_COUNT>` high-quality long-form articles from RSS.

**<EXTRA_SECTIONS>**

### Rules
- Only news from `<TIME_WINDOW>`
- Every item must include a source link (Discord: `<link>`)
- Use bullet lists, no markdown tables
- Deduplicate: same event β†’ most authoritative source; previously reported β†’ only if significant new development
- Deduplicate across sections β€” each article in one section only
- **Same story at different dates = one entry** (e.g. opening weekend + second weekend of same film β†’ merge or pick latest)
- Prefer primary sources (THR, Deadline, Variety) over aggregators
- Chinese body text with English source links
- **πŸ‡¨πŸ‡³ China section rules β€” STRICT VERIFICATION**:
  - Only include news **primarily about China mainland market**: Chinese-produced films, China-only box office breakdowns, Chinese streaming platforms (iQiyi/Youku/Bilibili), China film policy
  - **Verify before including**: If an article mentions "China" but the film is a Hollywood release, check whether it actually released in mainland China theaters. Many Hollywood films do NOT get China release. When in doubt, exclude from China section
  - Hollywood films with global box office numbers that include China as one territory β†’ belongs in **Box Office**, NOT China section
  - Do NOT include: Korea/Japan/other Asian market news, global box office reports that merely mention China numbers
- Do not interpolate fetched/untrusted content into shell arguments or email subjects

### Stats Footer
```
---
πŸ“Š Data Sources: RSS {{rss}} | Twitter {{twitter}} | Reddit {{reddit}} | Web {{web}} | Dedup: {{merged}} articles
πŸ€– Generated by media-news-digest v<VERSION> | <https://github.com/draco-agent/media-news-digest> | Powered by OpenClaw
```

## Archive
Save to `<WORKSPACE>/archive/media-news-digest/<MODE>-YYYY-MM-DD.md`. Delete files older than 90 days.

## Delivery

1. **Discord**: Send to `<DISCORD_CHANNEL_ID>` via `message` tool
2. **Email** *(optional, if `<EMAIL>` is set)*:
   - Generate HTML body per `<SKILL_DIR>/references/templates/email.md` β†’ write to `/tmp/md-email.html`
   - Generate PDF attachment:
     ```bash
     python3 <SKILL_DIR>/scripts/generate-pdf.py -i <WORKSPACE>/archive/media-news-digest/<MODE>-<DATE>.md -o /tmp/md-digest.pdf
     ```
   - Send email with PDF attached using the `send-email.py` script (handles MIME correctly). **Email must contain ALL the same items as Discord.**
     ```bash
     python3 <SKILL_DIR>/scripts/send-email.py \
       --to '<EMAIL>' \
       --subject '<SUBJECT>' \
       --html /tmp/md-email.html \
       --attach /tmp/md-digest.pdf \
       --from '<EMAIL_FROM>'
     ```
   - Omit `--from` if `<EMAIL_FROM>` is not set. Omit `--attach` if PDF generation failed. SUBJECT must be a static string. If delivery fails, log error and continue.

Write the report in <LANGUAGE>.

```
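The `πŸ‘ 12.3K`-style engagement metrics in the prompt's KOL Updates section rely on K/M abbreviation. A minimal formatter sketch (both helper names are hypothetical, not part of the skill's scripts):

```python
def format_count(n):
    """Abbreviate a count: 45 -> '45', 12300 -> '12.3K', 1200000 -> '1.2M'."""
    for divisor, suffix in ((1_000_000, "M"), (1_000, "K")):
        if n >= divisor:
            # One decimal place, with a trailing '.0' trimmed (1000 -> '1K').
            value = f"{n / divisor:.1f}".rstrip("0").rstrip(".")
            return value + suffix
    return str(n)

def metrics_line(views, replies, retweets, likes):
    # Matches the template: `πŸ‘ 12.3K | πŸ’¬ 45 | πŸ” 230 | ❀️ 1.2K`
    parts = zip("πŸ‘ πŸ’¬ πŸ” ❀️".split(), (views, replies, retweets, likes))
    return " | ".join(f"{icon} {format_count(n)}" for icon, n in parts)
```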



---

## Skill Companion Files

> Additional files collected from the skill directory layout.

### README.md

```markdown
# Media News Digest 🎬

> Automated media & entertainment industry news digest β€” 29 sources, 3-layer pipeline, one chat message to install.

[![Python 3.8+](https://img.shields.io/badge/python-3.8+-blue.svg)](https://www.python.org/downloads/)
[![MIT License](https://img.shields.io/badge/license-MIT-green.svg)](LICENSE)

## πŸ’¬ Install in One Message

Tell your [OpenClaw](https://openclaw.ai) AI assistant:

> **"Install media-news-digest and send a daily digest to #news-media every morning at 7am"**

That's it. Your bot handles installation, configuration, scheduling, and delivery β€” all through conversation.

More examples:

> πŸ—£οΈ "Set up a weekly Hollywood digest, only box office and awards, deliver to Discord #awards every Monday"

> πŸ—£οΈ "Install media-news-digest, add my RSS feeds, and send festival news to email"

> πŸ—£οΈ "Give me a media digest right now, focus on streaming news"

Or install via CLI:
```bash
clawhub install media-news-digest
```

## πŸ“Š What You Get

A quality-scored, deduplicated entertainment industry digest built from **29 sources**:

| Layer | Sources | What |
|-------|---------|------|
| πŸ“‘ RSS | 16 feeds | THR, Deadline, Variety, IndieWire, The Wrap, Collider, Gold Derby… |
| 🐦 Twitter/X | 11 KOLs | @THR, @DEADLINE, @Variety, @BoxOfficeMojo, @MattBelloni… |
| πŸ” Web Search | 7 topics | Brave Search API with freshness filters |

### Pipeline

```
RSS + Twitter + Web Search
         ↓
   merge-sources.py
         ↓
Quality Scoring β†’ Deduplication β†’ Topic Grouping
         ↓
  Discord / Email output
```

## 🎯 7 Topic Sections

| # | Section | Covers |
|---|---------|--------|
| 🎬 | Production / εˆΆδ½œεŠ¨ζ€ | New projects, casting, filming updates |
| πŸ’° | Deals & Business / θ‘ŒδΈšδΊ€ζ˜“ | M&A, rights, talent deals, restructuring |
| 🎟️ | Box Office / η₯¨ζˆΏ | NA/global box office, opening weekends |
| πŸ“Ί | Streaming / 桁εͺ’体 | Netflix, Disney+, Apple TV+, viewership |
| πŸ† | Awards / 钁ε₯–ε­£ | Oscars, Golden Globes, Emmys, campaigns |
| πŸŽͺ | Film Festivals / η”΅ε½±θŠ‚ | Cannes, Venice, TIFF, Sundance, Berlin |
| ⭐ | Reviews & Buzz / 影评口璑 | Critical reception, RT/Metacritic scores |

## πŸ“‘ RSS Sources (16 enabled)

THR Β· Deadline Β· Variety Β· IndieWire Β· The Wrap Β· Collider Β· Awards Daily Β· Gold Derby Β· Screen Rant Β· Empire Β· The Playlist Β· /Film Β· JoBlo Β· FirstShowing.net Β· ComingSoon.net Β· World of Reel

## 🐦 Twitter/X KOLs (11)

@THR Β· @DEADLINE Β· @Variety Β· @FilmUpdates Β· @DiscussingFilm Β· @ScottFeinberg Β· @kristapley Β· @BoxOfficeMojo Β· @GiteshPandya Β· @MattBelloni Β· @Borys_Kit

## βš™οΈ Configuration

### Default configs in `config/defaults/`:
- `sources.json` β€” RSS feeds, Twitter handles
- `topics.json` β€” Report sections with search queries

### User overrides in `workspace/config/`:
- Same `id` β†’ user version wins
- New `id` β†’ appended to defaults

## πŸš€ Quick Start

```bash
# Full pipeline
python3 scripts/fetch-rss.py --defaults config/defaults --hours 48 --output /tmp/md-rss.json
python3 scripts/fetch-twitter.py --defaults config/defaults --hours 48 --output /tmp/md-twitter.json
python3 scripts/fetch-web.py --defaults config/defaults --freshness pd --output /tmp/md-web.json
python3 scripts/merge-sources.py --rss /tmp/md-rss.json --twitter /tmp/md-twitter.json --web /tmp/md-web.json --output /tmp/md-merged.json
```

## πŸ“¦ Dependencies

```bash
pip install -r requirements.txt
```

All scripts work with the **Python 3.8+ standard library only**. `feedparser` is optional but recommended.

## πŸ“‹ Cron Integration

Reference `references/digest-prompt.md` in OpenClaw cron prompts. See [SKILL.md](SKILL.md) for full documentation.

## License

MIT

```

### _meta.json

```json
{
  "owner": "dinstein",
  "slug": "media-news-digest",
  "displayName": "Media News Digest",
  "latest": {
    "version": "2.1.1",
    "publishedAt": 1772383203855,
    "commit": "https://github.com/openclaw/skills/commit/d19246ed40e7cf4d1be0a83d90a8dedc927c8697"
  },
  "history": [
    {
      "version": "1.8.1",
      "publishedAt": 1771769764440,
      "commit": "https://github.com/openclaw/skills/commit/bf67402bec3290462e4ca78b6fe0f7fdd34fcc6c"
    },
    {
      "version": "1.8.0",
      "publishedAt": 1771656728335,
      "commit": "https://github.com/openclaw/skills/commit/e509b40d4f6b44c00f6c9f465d277aa911295daa"
    },
    {
      "version": "1.7.1",
      "publishedAt": 1771470668230,
      "commit": "https://github.com/openclaw/skills/commit/184aaec7885bca0e14da5c4377d0aa0f2007315a"
    },
    {
      "version": "1.6.3",
      "publishedAt": 1771375582182,
      "commit": "https://github.com/openclaw/skills/commit/43e2fd70fbd7b9a09a50458f9144e10c702a15cf"
    },
    {
      "version": "1.6.1",
      "publishedAt": 1771329585077,
      "commit": "https://github.com/openclaw/skills/commit/e0e0d2efcfb74132dabf647a5b6c8357f1877c66"
    },
    {
      "version": "1.6.0",
      "publishedAt": 1771302892917,
      "commit": "https://github.com/openclaw/skills/commit/b3025b45d4f97bd911f598bb95c91c70c4c62a3e"
    },
    {
      "version": "1.3.0",
      "publishedAt": 1771232597533,
      "commit": "https://github.com/openclaw/skills/commit/323487faab39653aee723884764b6c24a15f3014"
    }
  ]
}

```

### references/templates/discord.md

```markdown
# Media Digest Discord Template

Discord-optimized format with bullet points and link suppression.

## Template Structure

```markdown
# 🎬 每ζ—₯影视ζ—₯ζŠ₯ β€” {{DATE}}

> {{EXECUTIVE_SUMMARY}}

{{#topics}}
## {{emoji}} {{label}}

{{#articles}}
β€’ πŸ”₯{{quality_score}} | {{chinese_summary}}
  <{{link}}>
  {{#multi_source}}*[{{source_count}} sources]*{{/multi_source}}

{{/articles}}
{{/topics}}

## πŸ“’ KOL Updates

{{#kol_tweets}}
β€’ **{{display_name}}** (@{{handle}}) β€” {{summary}} `πŸ‘ {{views}} | πŸ’¬ {{replies}} | πŸ” {{retweets}} | ❀️ {{likes}}`
  <{{tweet_link}}>
{{/kol_tweets}}

## πŸ“ Deep Reads

{{#deep_reads}}
β€’ {{title}} β€” {{description}}
  <{{link}}>
{{/deep_reads}}

---
πŸ“Š Data Sources: RSS {{rss_count}} | Twitter {{twitter_count}} | Web {{web_count}} | After dedup: {{merged_count}} articles
πŸ€– Generated by media-news-digest v{{version}} | <https://github.com/draco-agent/media-news-digest> | Powered by OpenClaw
```

## Delivery

- **Default: Channel** β€” Send to the Discord channel specified by `DISCORD_CHANNEL_ID`
- Use `message` tool with `target` set to the channel ID for channel delivery
- For DM delivery instead, set `target` to a user ID

## Discord-Specific Rules

- **Link suppression**: Wrap links in `<>` to prevent embeds
- **Bullet format**: Use `β€’` for clean mobile display
- **No tables**: Discord doesn't render Markdown tables; they display as raw text, especially poorly on mobile
- **Emoji headers**: Visual hierarchy with topic emojis
- **Character limits**: Discord messages are capped at 2,000 characters; long digests must be split into multiple messages
- **Chinese body + English links**: Summary in Chinese, original links preserved

```
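
The character-limit rule above can be handled with a simple splitter. This is a sketch, not the skill's actual sending logic (which is not shown in this entry); it breaks at line boundaries and hard-splits only oversized single lines:

```python
from typing import List

def split_for_discord(text: str, limit: int = 2000) -> List[str]:
    """Split a digest into chunks of at most `limit` chars, breaking at newlines."""
    chunks, current = [], ""
    for line in text.splitlines(keepends=True):
        # A single oversized line is hard-split as a last resort.
        while len(line) > limit:
            chunks.append(line[:limit])
            line = line[limit:]
        if len(current) + len(line) > limit:
            chunks.append(current)
            current = line
        else:
            current += line
    if current:
        chunks.append(current)
    return chunks
```

Splitting at newlines keeps bullets and their `<link>` lines together, which matters because Discord's `<>` link suppression only works when the angle brackets stay in the same message.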

### references/templates/email.md

```markdown
# Media Digest Email Template

HTML email format optimized for Gmail/Outlook rendering.

## Delivery

**Step 1: Sanitize** β€” convert markdown report to XSS-safe HTML:
```bash
python3 <SKILL_DIR>/scripts/sanitize-html.py --input /tmp/md-report.md --output /tmp/md-email.html
```

**Step 2: Send** via `gog gmail send`:
```bash
gog gmail send --to '<EMAIL>' --subject '<SUBJECT>' --body-html "$(cat /tmp/md-email.html)"
```

⚠️ **Security**: Never build HTML from fetched content by hand. Always run it through sanitize-html.py, which HTML-escapes all text, allows only http/https URLs, and strips dangerous content.

## Template Structure

```html
<div style="max-width:640px;margin:0 auto;font-family:-apple-system,BlinkMacSystemFont,'Segoe UI',Roboto,sans-serif;color:#1a1a1a;line-height:1.6">

  <h1 style="font-size:22px;border-bottom:2px solid #e5e5e5;padding-bottom:8px">
    🎬 每ζ—₯影视ζ—₯ζŠ₯ β€” {{DATE}}
  </h1>

  <p style="color:#555;font-size:14px;background:#f8f9fa;padding:12px;border-radius:6px">
    {{EXECUTIVE_SUMMARY}}
  </p>

  <h2 style="font-size:17px;margin-top:24px;color:#333">{{emoji}} {{label}}</h2>
  <ul style="padding-left:20px">
    <li style="margin-bottom:10px">
      <strong>πŸ”₯{{quality_score}} | {{chinese_summary}}</strong> β€” {{description}}
      <br><a href="{{link}}" style="color:#0969da;font-size:13px">{{link}}</a>
    </li>
  </ul>

  <h2 style="font-size:17px;margin-top:24px;color:#333">πŸ“’ KOL Updates</h2>
  <ul style="padding-left:20px">
    <li style="margin-bottom:10px">
      <strong>{{display_name}}</strong> (@{{handle}}) β€” {{summary}}
      <br><code style="font-size:12px;color:#888;background:#f4f4f4;padding:2px 6px;border-radius:3px">πŸ‘ {{views}} | πŸ’¬ {{replies}} | πŸ” {{retweets}} | ❀️ {{likes}}</code>
      <br><a href="{{tweet_link}}" style="color:#0969da;font-size:13px">{{tweet_link}}</a>
    </li>
  </ul>

  <hr style="border:none;border-top:1px solid #e5e5e5;margin:24px 0">
  <p style="font-size:12px;color:#888">
    πŸ“Š Data Sources: RSS {{rss_count}} | Twitter {{twitter_count}} | Web {{web_count}} | After dedup: {{merged_count}} articles
    <br>πŸ€– Generated by <a href="https://github.com/draco-agent/media-news-digest" style="color:#0969da">media-news-digest</a> v{{version}} | Powered by <a href="https://openclaw.ai" style="color:#0969da">OpenClaw</a>
  </p>

</div>
```

## Style Guidelines

- **Max width**: 640px centered (mobile-friendly)
- **Fonts**: System font stack (no web fonts in email)
- **All styles inline**: many email clients strip or ignore `<style>` blocks
- **Links**: Use full URLs, styled with `color:#0969da`
- **Headings**: h1 for title (22px), h2 for topics (17px)
- **Lists**: `<ul>` with `<li>`, adequate spacing
- **Footer**: Small gray text with stats + repo link + OpenClaw branding
- **No images**: Pure text/HTML for maximum compatibility
- **No tables for layout**: Use div + inline styles
- **Chinese body + English links**

```
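
The security rule above can be illustrated with a minimal sketch of the two core checks (escape all text, allow only http/https links). The real logic lives in `scripts/sanitize-html.py`, which is not reproduced in this entry, so treat this as an approximation rather than the script's actual behavior:

```python
import html
from typing import Optional
from urllib.parse import urlparse

def safe_text(s: str) -> str:
    """HTML-escape fetched text before embedding it in the email body."""
    return html.escape(s, quote=True)

def safe_link(url: str) -> Optional[str]:
    """Allow only absolute http/https URLs; reject javascript:, data:, etc."""
    parsed = urlparse(url.strip())
    if parsed.scheme in ("http", "https") and parsed.netloc:
        return html.escape(url.strip(), quote=True)
    return None
```

Checking `parsed.scheme` rather than string prefixes is what blocks payloads like `javascript:alert(1)` or `data:text/html,...` from ending up inside an `href`.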

### scripts/config_loader.py

```python
#!/usr/bin/env python3
"""
Configuration overlay loader for media-news-digest.

Handles loading and merging of default configurations with optional user overlays.
Supports sources.json and topics.json with overlay logic for customization.
"""

import json
import logging
from pathlib import Path
from typing import Dict, List, Optional, Any

logger = logging.getLogger(__name__)


def load_merged_sources(defaults_dir: Path, config_dir: Optional[Path] = None) -> List[Dict[str, Any]]:
    """
    Load and merge sources from defaults and optional user config overlay.
    
    Args:
        defaults_dir: Path to default configuration directory (skill defaults)
        config_dir: Optional path to user configuration directory (overlay)
    
    Returns:
        List of merged source configurations
        
    Merge Logic:
        1. Load defaults/sources.json as base
        2. If config_dir provided and has sources.json, load user overlay
        3. For each user source:
           - If id matches default source: user version completely replaces default
           - If id is new: append to list
           - If user source has "enabled": false: disable matching default source
    """
    defaults_path = defaults_dir / "sources.json"
    
    # Load default sources
    try:
        with open(defaults_path, 'r', encoding='utf-8') as f:
            defaults_data = json.load(f)
        default_sources = defaults_data.get("sources", [])
        logger.debug(f"Loaded {len(default_sources)} default sources from {defaults_path}")
    except FileNotFoundError:
        raise FileNotFoundError(f"Default sources config not found: {defaults_path}")
    except json.JSONDecodeError as e:
        raise ValueError(f"Invalid JSON in default sources config: {e}")
    
    # Validate required fields
    validated = []
    required_fields = {"id", "type", "enabled"}
    for i, source in enumerate(default_sources):
        missing = required_fields - set(source.keys())
        if missing:
            logger.warning(f"Source #{i} missing required fields {missing}, skipping: {source}")
            continue
        validated.append(source)
    default_sources = validated

    # If no user config directory specified, return defaults only
    if config_dir is None:
        return default_sources
        
    config_path = config_dir / "media-news-digest-sources.json"
    
    # Try to load user overlay
    try:
        with open(config_path, 'r', encoding='utf-8') as f:
            config_data = json.load(f)
        user_sources = config_data.get("sources", [])
        logger.debug(f"Loaded {len(user_sources)} user sources from {config_path}")
    except FileNotFoundError:
        logger.debug(f"No user sources config found at {config_path}, using defaults only")
        return default_sources
    except json.JSONDecodeError as e:
        logger.warning(f"Invalid JSON in user sources config {config_path}: {e}, using defaults only")
        return default_sources
    
    # Merge logic: create lookup by id for efficient merging
    merged_sources = {}
    
    # Start with all default sources
    for source in default_sources:
        source_id = source.get("id")
        if source_id:
            merged_sources[source_id] = source.copy()
    
    # Apply user overlay
    for user_source in user_sources:
        source_id = user_source.get("id")
        if not source_id:
            continue
            
        if source_id in merged_sources:
            # User source overrides default completely
            if user_source.get("enabled") is False:
                # User explicitly disables this source
                merged_sources[source_id]["enabled"] = False
                logger.debug(f"User disabled source: {source_id}")
            else:
                # User replaces entire source config
                merged_sources[source_id] = user_source.copy()
                logger.debug(f"User overrode source: {source_id}")
        else:
            # New user source, append
            merged_sources[source_id] = user_source.copy()
            logger.debug(f"User added new source: {source_id}")
    
    # Convert back to list, maintaining order (defaults first, then user additions)
    result = []
    
    # Add default sources (potentially overridden)
    for source in default_sources:
        source_id = source.get("id")
        if source_id and source_id in merged_sources:
            result.append(merged_sources[source_id])
    
    # Add new user sources (precompute default ids to avoid re-scanning per source)
    default_ids = {s.get("id") for s in default_sources}
    for user_source in user_sources:
        source_id = user_source.get("id")
        if source_id and source_id not in default_ids:
            result.append(merged_sources[source_id])
    
    logger.info(f"Merged configuration: {len(default_sources)} defaults + {len(user_sources)} user = {len(result)} total sources")
    return result


def load_merged_topics(defaults_dir: Path, config_dir: Optional[Path] = None) -> List[Dict[str, Any]]:
    """
    Load and merge topics from defaults and optional user config overlay.
    
    Args:
        defaults_dir: Path to default configuration directory (skill defaults)
        config_dir: Optional path to user configuration directory (overlay)
    
    Returns:
        List of merged topic configurations
        
    Merge Logic:
        1. Load defaults/topics.json as base
        2. If config_dir provided and has topics.json, load user overlay
        3. For each user topic:
           - If id matches default topic: user version completely replaces default
           - If id is new: append to list
    """
    defaults_path = defaults_dir / "topics.json"
    
    # Load default topics
    try:
        with open(defaults_path, 'r', encoding='utf-8') as f:
            defaults_data = json.load(f)
        default_topics = defaults_data.get("topics", [])
        logger.debug(f"Loaded {len(default_topics)} default topics from {defaults_path}")
    except FileNotFoundError:
        raise FileNotFoundError(f"Default topics config not found: {defaults_path}")
    except json.JSONDecodeError as e:
        raise ValueError(f"Invalid JSON in default topics config: {e}")
    
    # If no user config directory specified, return defaults only
    if config_dir is None:
        return default_topics
        
    config_path = config_dir / "media-news-digest-topics.json"
    
    # Try to load user overlay
    try:
        with open(config_path, 'r', encoding='utf-8') as f:
            config_data = json.load(f)
        user_topics = config_data.get("topics", [])
        logger.debug(f"Loaded {len(user_topics)} user topics from {config_path}")
    except FileNotFoundError:
        logger.debug(f"No user topics config found at {config_path}, using defaults only")
        return default_topics
    except json.JSONDecodeError as e:
        logger.warning(f"Invalid JSON in user topics config {config_path}: {e}, using defaults only")
        return default_topics
    
    # Merge logic: create lookup by id for efficient merging
    merged_topics = {}
    
    # Start with all default topics
    for topic in default_topics:
        topic_id = topic.get("id")
        if topic_id:
            merged_topics[topic_id] = topic.copy()
    
    # Apply user overlay
    for user_topic in user_topics:
        topic_id = user_topic.get("id")
        if not topic_id:
            continue
            
        if topic_id in merged_topics:
            # User topic overrides default completely
            merged_topics[topic_id] = user_topic.copy()
            logger.debug(f"User overrode topic: {topic_id}")
        else:
            # New user topic, append
            merged_topics[topic_id] = user_topic.copy()
            logger.debug(f"User added new topic: {topic_id}")
    
    # Convert back to list, maintaining order (defaults first, then user additions)
    result = []
    
    # Add default topics (potentially overridden)
    for topic in default_topics:
        topic_id = topic.get("id")
        if topic_id and topic_id in merged_topics:
            result.append(merged_topics[topic_id])
    
    # Add new user topics (precompute default ids to avoid re-scanning per topic)
    default_ids = {t.get("id") for t in default_topics}
    for user_topic in user_topics:
        topic_id = user_topic.get("id")
        if topic_id and topic_id not in default_ids:
            result.append(merged_topics[topic_id])
    
    logger.info(f"Merged topics: {len(default_topics)} defaults + {len(user_topics)} user = {len(result)} total topics")
    return result



```
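
The overlay semantics documented above can be demonstrated without file I/O. The following standalone sketch mirrors the `load_merged_sources` merge rules (the sample entries and feed URLs are illustrative, not the skill's real defaults):

```python
from typing import Dict, List

def merge_by_id(defaults: List[Dict], overlay: List[Dict]) -> List[Dict]:
    """Defaults first (possibly overridden), then new overlay entries, in order."""
    merged = {d["id"]: dict(d) for d in defaults}
    for o in overlay:
        oid = o.get("id")
        if not oid:
            continue
        if oid in merged and o.get("enabled") is False:
            merged[oid]["enabled"] = False  # disable, but keep the default's config
        else:
            merged[oid] = dict(o)           # full replacement, or new entry
    default_ids = [d["id"] for d in defaults]
    order = default_ids + [o["id"] for o in overlay if o.get("id") not in default_ids]
    return [merged[i] for i in order]

# Illustrative data: disable one default, add one new source.
defaults = [
    {"id": "thr", "type": "rss", "enabled": True, "url": "https://example.com/thr-feed"},
    {"id": "deadline", "type": "rss", "enabled": True, "url": "https://example.com/dl-feed"},
]
overlay = [
    {"id": "deadline", "enabled": False},
    {"id": "my-blog", "type": "rss", "enabled": True, "url": "https://example.com/blog-feed"},
]
merged = merge_by_id(defaults, overlay)
```

Note the asymmetry: an overlay entry with `"enabled": false` only flips the flag and preserves the default's other fields, while any other matching overlay entry replaces the default wholesale.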

### scripts/enrich-articles.py

```python
#!/usr/bin/env python3
"""
Enrich high-scoring articles with full text content.

Fetches full article text for top articles from merged JSON, using:
1. Cloudflare Markdown for Agents (Accept: text/markdown) β€” preferred
2. HTML readability extraction β€” fallback
3. Skip β€” for paywalled/JS-heavy pages

Usage:
    python3 enrich-articles.py --input merged.json --output enriched.json [--min-score 10] [--verbose]
"""

import json
import re
import sys
import os
import argparse
import logging
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timezone
from html.parser import HTMLParser
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple
from urllib.request import urlopen, Request
from urllib.error import HTTPError, URLError

TIMEOUT = 10
MAX_WORKERS = 5
DEFAULT_MIN_SCORE = 10
DEFAULT_MAX_ARTICLES = 15
DEFAULT_MAX_CHARS = 2000
USER_AGENT = "MediaDigest/3.0 (article enrichment)"

SKIP_DOMAINS = {
    "twitter.com", "x.com",
    "reddit.com", "old.reddit.com",
    "github.com",
    "youtube.com", "youtu.be",
    "nytimes.com", "bloomberg.com", "wsj.com", "ft.com",
    "arxiv.org",
}


def setup_logging(verbose=False):
    level = logging.DEBUG if verbose else logging.INFO
    logging.basicConfig(level=level, format="%(asctime)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S")
    return logging.getLogger(__name__)


def get_domain(url):
    try:
        from urllib.parse import urlparse
        netloc = urlparse(url).netloc.lower()
        # Note: lstrip("www.") would strip any leading 'w'/'.' characters
        # (e.g. "wsj.com" -> "sj.com"), so strip the literal prefix instead.
        return netloc[4:] if netloc.startswith("www.") else netloc
    except Exception:
        return ""


class TextExtractor(HTMLParser):
    def __init__(self):
        super().__init__()
        self._text = []
        self._skip = False
        self._skip_tags = {"script", "style", "nav", "footer", "header", "aside", "noscript"}

    def handle_starttag(self, tag, attrs):
        if tag in self._skip_tags:
            self._skip = True

    def handle_endtag(self, tag):
        if tag in self._skip_tags:
            self._skip = False
        if tag in ("p", "br", "div", "h1", "h2", "h3", "h4", "li"):
            self._text.append("\n")

    def handle_data(self, data):
        if not self._skip:
            self._text.append(data)

    def get_text(self):
        raw = "".join(self._text)
        raw = re.sub(r"[ \t]+", " ", raw)
        raw = re.sub(r"\n{3,}", "\n\n", raw)
        return raw.strip()


def extract_readable_text(html):
    article_match = re.search(r"<article[^>]*>(.*?)</article>", html, re.DOTALL | re.IGNORECASE)
    fragment = article_match.group(1) if article_match else html
    extractor = TextExtractor()
    try:
        extractor.feed(fragment)
    except Exception:
        return ""
    return extractor.get_text()


def fetch_full_text(url, max_chars=DEFAULT_MAX_CHARS):
    domain = get_domain(url)
    if domain in SKIP_DOMAINS:
        return {"text": "", "method": "skipped", "tokens": 0, "error": f"domain {domain} in skip list"}

    try:
        headers = {"Accept": "text/markdown, text/html;q=0.9", "User-Agent": USER_AGENT}
        req = Request(url, headers=headers)
        with urlopen(req, timeout=TIMEOUT) as resp:
            content_type = resp.headers.get("Content-Type", "")
            token_header = resp.headers.get("x-markdown-tokens", "")
            raw = resp.read()

            if raw[:2] == b"\x1f\x8b":
                import gzip
                raw = gzip.decompress(raw)

            text = raw.decode("utf-8", errors="replace")

            if "text/markdown" in content_type:
                tokens = int(token_header) if token_header.isdigit() else len(text) // 4
                return {"text": text[:max_chars], "method": "cf-markdown", "tokens": tokens, "error": None}

            extracted = extract_readable_text(text)
            if len(extracted) < 100:
                return {"text": "", "method": "html-too-short", "tokens": 0, "error": "extracted text too short"}

            return {"text": extracted[:max_chars], "method": "html-extract", "tokens": len(extracted[:max_chars]) // 4, "error": None}

    except HTTPError as e:
        return {"text": "", "method": "error", "tokens": 0, "error": f"HTTP {e.code}"}
    except URLError as e:
        return {"text": "", "method": "error", "tokens": 0, "error": f"URL error: {e.reason}"}
    except Exception as e:
        return {"text": "", "method": "error", "tokens": 0, "error": str(e)[:100]}


def enrich_articles(articles, min_score=DEFAULT_MIN_SCORE, max_articles=DEFAULT_MAX_ARTICLES, max_chars=DEFAULT_MAX_CHARS):
    # Eligible: high-score articles OR RSS blog articles (lower threshold for blogs)
    blog_domains = {
        "simonwillison.net", "overreacted.io", "eli.thegreenplace.net",
        "matklad.github.io", "lucumr.pocoo.org", "devblogs.microsoft.com",
        "rachelbythebay.com", "xeiaso.net", "pluralistic.net", "lcamtuf.substack.com",
        "hillelwayne.com", "dynomight.net", "geoffreylitt.com", "fabiensanglard.net",
        "blog.cloudflare.com", "antirez.com", "paulgraham.com", "danluu.com",
        "latent.space", "www.latent.space",
    }
    eligible = []
    for a in articles:
        if a.get("full_text") or not a.get("link"):
            continue
        score = a.get("quality_score", 0)
        domain = get_domain(a.get("link", ""))
        # Blog articles get lower threshold (score >= 3), others use min_score
        if score >= min_score or (domain in blog_domains and score >= 3):
            eligible.append(a)

    seen_urls = {}
    unique = []
    for a in eligible:
        url = a["link"]
        if url not in seen_urls:
            seen_urls[url] = a
            unique.append(a)

    unique.sort(key=lambda x: -x.get("quality_score", 0))
    to_fetch = unique[:max_articles]

    if not to_fetch:
        logging.info("No articles eligible for enrichment")
        return 0, 0, 0

    logging.info(f"Enriching {len(to_fetch)} articles (min_score={min_score})")

    attempted = success = cf_count = 0
    results = {}

    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
        futures = {pool.submit(fetch_full_text, a["link"], max_chars): a["link"] for a in to_fetch}
        for future in as_completed(futures):
            url = futures[future]
            attempted += 1
            result = future.result()
            results[url] = result
            if result["text"]:
                success += 1
                if result["method"] == "cf-markdown":
                    cf_count += 1
                logging.debug(f"  βœ… [{result['method']}] {url[:60]}... ({result['tokens']} tokens)")
            else:
                logging.debug(f"  ⏭️ [{result['method']}] {url[:60]}... ({result.get('error', '')})")

    for a in articles:
        url = a.get("link", "")
        if url in results and results[url]["text"]:
            r = results[url]
            a["full_text"] = r["text"]
            a["full_text_method"] = r["method"]
            a["full_text_tokens"] = r["tokens"]

    logging.info(f"Enrichment: {success}/{attempted} enriched ({cf_count} via CF Markdown)")
    return attempted, success, cf_count


def main():
    parser = argparse.ArgumentParser(description="Enrich articles with full text")
    parser.add_argument("--input", "-i", type=Path, required=True, help="Input merged JSON")
    parser.add_argument("--output", "-o", type=Path, help="Output enriched JSON (default: overwrite input)")
    parser.add_argument("--min-score", type=int, default=DEFAULT_MIN_SCORE)
    parser.add_argument("--max-articles", type=int, default=DEFAULT_MAX_ARTICLES)
    parser.add_argument("--max-chars", type=int, default=DEFAULT_MAX_CHARS)
    parser.add_argument("--verbose", "-v", action="store_true")
    parser.add_argument("--force", action="store_true", help="Ignored (pipeline compat)")
    args = parser.parse_args()
    setup_logging(args.verbose)

    if not args.input.exists():
        logging.error(f"Input file not found: {args.input}")
        return 1

    output_path = args.output or args.input

    try:
        with open(args.input, "r", encoding="utf-8") as f:
            data = json.load(f)

        all_articles = []
        topics = data.get("topics", {})
        if isinstance(topics, dict):
            for topic_data in topics.values():
                if isinstance(topic_data, dict):
                    all_articles.extend(topic_data.get("articles", []))
                elif isinstance(topic_data, list):
                    all_articles.extend(topic_data)

        t0 = time.time()
        attempted, success, cf_count = enrich_articles(all_articles, args.min_score, args.max_articles, args.max_chars)
        elapsed = time.time() - t0

        data["enrichment"] = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "attempted": attempted, "success": success, "cf_markdown": cf_count,
            "elapsed_s": round(elapsed, 1), "min_score": args.min_score, "max_chars": args.max_chars,
        }

        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2, ensure_ascii=False)

        logging.info(f"βœ… Done: {success}/{attempted} enriched in {elapsed:.1f}s β†’ {output_path}")
        return 0

    except Exception as e:
        logging.error(f"πŸ’₯ Enrichment failed: {e}")
        return 1


if __name__ == "__main__":
    sys.exit(main())

```
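
The HTML readability fallback above can be exercised on a small snippet. This compact re-statement is not the script itself; it keeps the same approach (prefer `<article>`, drop script/nav chrome, collapse whitespace) but uses a depth counter instead of the script's boolean `_skip` flag so nested skip tags don't re-enable output early:

```python
import re
from html.parser import HTMLParser

class MiniExtractor(HTMLParser):
    SKIP = {"script", "style", "nav", "footer", "header", "aside", "noscript"}

    def __init__(self):
        super().__init__()
        self.parts, self.depth = [], 0  # depth > 0 means we're inside a skipped tag

    def handle_starttag(self, tag, attrs):
        if tag in self.SKIP:
            self.depth += 1

    def handle_endtag(self, tag):
        if tag in self.SKIP and self.depth:
            self.depth -= 1
        elif tag in ("p", "br", "div", "h1", "h2", "h3", "li"):
            self.parts.append("\n")

    def handle_data(self, data):
        if not self.depth:
            self.parts.append(data)

def extract(html_text: str) -> str:
    # Prefer the <article> body when present, like extract_readable_text().
    m = re.search(r"<article[^>]*>(.*?)</article>", html_text, re.DOTALL | re.IGNORECASE)
    p = MiniExtractor()
    p.feed(m.group(1) if m else html_text)
    text = re.sub(r"[ \t]+", " ", "".join(p.parts))
    return re.sub(r"\n{3,}", "\n\n", text).strip()

page = ("<html><nav>menu</nav><article><p>Box office was up.</p>"
        "<script>track()</script><p>Streaming held steady.</p></article></html>")
```

Navigation and script content vanish while paragraph text survives with line breaks, which is why the script's 100-character minimum is a reasonable signal that extraction actually found article body text.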

### scripts/fetch-reddit.py

```python
#!/usr/bin/env python3
"""
Fetch Reddit posts from unified sources configuration.

Reads sources.json, filters Reddit sources, fetches posts via Reddit JSON API,
and outputs structured JSON with posts tagged by topics.

Usage:
    python3 fetch-reddit.py [--defaults DEFAULTS_DIR] [--config CONFIG_DIR] [--hours 48] [--output FILE] [--verbose] [--force] [--no-cache]

Environment:
    No API key required. Uses Reddit's public JSON API.
"""

import json
import sys
import os
import argparse
import logging
import ssl
import time
import tempfile
from datetime import datetime, timedelta, timezone
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import List, Dict, Any, Optional
from urllib.request import Request, urlopen
from urllib.error import HTTPError, URLError

_SSL_CTX = ssl.create_default_context()

# Constants
MAX_WORKERS = 4
TIMEOUT = 30
RETRY_COUNT = 2
RETRY_DELAY = 3
USER_AGENT = "MediaDigest/1.8 (bot; +https://github.com/draco-agent/media-news-digest)"
RESUME_MAX_AGE_SECONDS = 3600  # 1 hour


def setup_logging(verbose: bool = False) -> logging.Logger:
    level = logging.DEBUG if verbose else logging.INFO
    logging.basicConfig(
        level=level,
        format='%(asctime)s [%(levelname)s] %(message)s',
        datefmt='%H:%M:%S'
    )
    return logging.getLogger(__name__)


def load_reddit_sources(defaults_dir: Optional[Path], config_dir: Optional[Path]) -> List[Dict[str, Any]]:
    """Load Reddit sources from config, with user overrides."""
    sys.path.insert(0, str(Path(__file__).parent))
    from config_loader import load_merged_sources as load_sources
    
    all_sources = load_sources(defaults_dir, config_dir)
    reddit_sources = []
    for s in all_sources:
        if s.get('type') != 'reddit':
            continue
        if not s.get('enabled', True):
            continue
        if not s.get('subreddit'):
            logging.warning(f"Reddit source {s.get('id')} missing subreddit, skipping")
            continue
        reddit_sources.append(s)
    
    return reddit_sources


def fetch_subreddit(source: Dict[str, Any], cutoff: datetime) -> Dict[str, Any]:
    """Fetch posts from a subreddit using Reddit's JSON API."""
    source_id = source['id']
    subreddit = source['subreddit']
    sort = source.get('sort', 'hot')
    limit = source.get('limit', 25)
    min_score = source.get('min_score', 0)
    priority = source.get('priority', False)
    topics = source.get('topics', [])
    name = source.get('name', f'r/{subreddit}')
    
    url = f"https://www.reddit.com/r/{subreddit}/{sort}.json?limit={limit}&raw_json=1"
    
    error_msg = "unknown error"  # fallback for the post-loop error return
    for attempt in range(RETRY_COUNT + 1):
        try:
            req = Request(url, headers={
                'User-Agent': USER_AGENT,
                'Accept': 'text/html,application/json',
                'Accept-Language': 'en-US,en;q=0.9',
            })
            
            with urlopen(req, timeout=TIMEOUT, context=_SSL_CTX) as resp:
                data = json.loads(resp.read().decode('utf-8'))
            
            articles = []
            children = data.get('data', {}).get('children', [])
            
            for child in children:
                post = child.get('data', {})
                if not post:
                    continue
                
                # Parse timestamp
                created_utc = post.get('created_utc', 0)
                post_time = datetime.fromtimestamp(created_utc, tz=timezone.utc)
                
                # Filter by time
                if post_time < cutoff:
                    continue
                
                # Filter by score
                score = post.get('score', 0)
                if score < min_score:
                    continue
                
                # Skip stickied/pinned posts
                if post.get('stickied', False):
                    continue
                
                # Get the external URL (if it's a link post) vs self post
                permalink = f"https://www.reddit.com{post.get('permalink', '')}"
                external_url = post.get('url', '')
                is_self = post.get('is_self', True)
                
                # If it's a self post or URL points to reddit, use permalink
                if is_self or 'reddit.com' in external_url or 'redd.it' in external_url:
                    link = permalink
                    external_url = None
                else:
                    link = external_url
                
                title = post.get('title', '').strip()
                if not title:
                    continue
                
                flair = post.get('link_flair_text', '')
                num_comments = post.get('num_comments', 0)
                upvote_ratio = post.get('upvote_ratio', 0)
                
                articles.append({
                    "title": title,
                    "link": link,
                    "reddit_url": permalink,
                    "external_url": external_url,
                    "date": post_time.isoformat(),
                    "score": score,
                    "num_comments": num_comments,
                    "flair": flair,
                    "is_self": is_self,
                    "topics": topics[:],
                    "metrics": {
                        "score": score,
                        "num_comments": num_comments,
                        "upvote_ratio": upvote_ratio
                    }
                })
            
            return {
                "source_id": source_id,
                "source_type": "reddit",
                "name": name,
                "subreddit": subreddit,
                "sort": sort,
                "priority": priority,
                "topics": topics,
                "status": "ok",
                "attempts": attempt + 1,
                "count": len(articles),
                "articles": articles,
            }
        
        except HTTPError as e:
            if e.code == 429:
                logging.warning(f"Rate limit for r/{subreddit}, attempt {attempt + 1}")
                if attempt < RETRY_COUNT:
                    time.sleep(10)
                    continue
            elif e.code == 403:
                logging.warning(f"r/{subreddit} is private or quarantined")
                return {
                    "source_id": source_id,
                    "source_type": "reddit",
                    "name": name,
                    "subreddit": subreddit,
                    "status": "error",
                    "error": f"HTTP {e.code}: Forbidden",
                    "count": 0,
                    "articles": [],
                }
            error_msg = f"HTTP {e.code}"
            logging.warning(f"Error fetching r/{subreddit}: {error_msg}")
        except (URLError, OSError) as e:
            error_msg = str(e)
            logging.warning(f"Network error for r/{subreddit}: {error_msg}")
        except Exception as e:
            error_msg = str(e)
            logging.error(f"Unexpected error for r/{subreddit}: {error_msg}")
        
        if attempt < RETRY_COUNT:
            time.sleep(RETRY_DELAY)
    
    return {
        "source_id": source_id,
        "source_type": "reddit",
        "name": name,
        "subreddit": subreddit,
        "status": "error",
        "error": error_msg,
        "count": 0,
        "articles": [],
    }


def main() -> int:
    parser = argparse.ArgumentParser(
        description="Fetch Reddit posts from configured subreddits.\n"
                    "Uses Reddit's public JSON API (no authentication required).",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""\
Examples:
    python3 fetch-reddit.py --defaults config/defaults --output /tmp/md-reddit.json --verbose
    python3 fetch-reddit.py --defaults config/defaults --config ~/workspace/config --hours 48
    """
    )
    parser.add_argument('--defaults', type=Path, default=Path('config/defaults'),
                       help='Default config directory')
    parser.add_argument('--config', type=Path, default=None,
                       help='User config directory (overrides defaults)')
    parser.add_argument('--hours', type=int, default=48,
                       help='How many hours back to fetch (default: 48)')
    parser.add_argument('--output', type=Path, default=None,
                       help='Output JSON file path')
    parser.add_argument('--verbose', action='store_true',
                       help='Enable debug logging')
    parser.add_argument('--force', action='store_true',
                       help='Force fetch even if cached output exists')
    parser.add_argument('--no-cache', action='store_true',
                       help='Disable all caching')
    
    args = parser.parse_args()
    logger = setup_logging(args.verbose)
    
    # Auto-generate output path if not specified
    if not args.output:
        fd, temp_path = tempfile.mkstemp(prefix="media-news-digest-reddit-", suffix=".json")
        os.close(fd)
        args.output = Path(temp_path)
    
    # Resume support
    if not args.force and args.output.exists():
        try:
            age = time.time() - args.output.stat().st_mtime
            if age < RESUME_MAX_AGE_SECONDS:
                with open(args.output) as f:
                    existing = json.load(f)
                if existing.get('subreddits'):
                    logger.info(f"⏭️  Skipping fetch: {args.output} is {age:.0f}s old (< {RESUME_MAX_AGE_SECONDS}s). Use --force to override.")
                    print(f"Output (cached): {args.output}")
                    return 0
        except (json.JSONDecodeError, KeyError):
            pass
    
    try:
        cutoff = datetime.now(timezone.utc) - timedelta(hours=args.hours)
        
        # Load sources. If --defaults was left at its default value but the
        # directory doesn't exist (e.g. running outside the skill directory),
        # fall back to treating the user config dir as the defaults dir.
        if args.config and args.defaults == Path("config/defaults") and not args.defaults.exists():
            sources = load_reddit_sources(args.config, None)
        else:
            sources = load_reddit_sources(args.defaults, args.config)
        
        if not sources:
            logger.warning("No Reddit sources found or all disabled")
            output = {
                "source": "reddit",
                "fetched_at": datetime.now(timezone.utc).isoformat(),
                "subreddits": [],
                "skipped_reason": "No Reddit sources configured"
            }
            with open(args.output, "w") as f:
                json.dump(output, f, indent=2)
            print(f"Output (empty): {args.output}")
            return 0
        
        logger.info(f"πŸ“‘ Fetching {len(sources)} subreddits (cutoff: {cutoff.strftime('%Y-%m-%d %H:%M')} UTC)")
        
        results = []
        total_posts = 0
        
        with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
            futures = {pool.submit(fetch_subreddit, source, cutoff): source for source in sources}
            for future in as_completed(futures):
                result = future.result()
                results.append(result)
                total_posts += result.get('count', 0)
        
        ok_count = sum(1 for r in results if r['status'] == 'ok')
        
        output = {
            "source": "reddit",
            "fetched_at": datetime.now(timezone.utc).isoformat(),
            "defaults_dir": str(args.defaults),
            "config_dir": str(args.config) if args.config else None,
            "hours": args.hours,
            "cutoff": cutoff.isoformat(),
            "subreddits_total": len(results),
            "subreddits_ok": ok_count,
            "total_posts": total_posts,
            "subreddits": results
        }
        
        json_str = json.dumps(output, ensure_ascii=False, indent=2)
        with open(args.output, "w", encoding='utf-8') as f:
            f.write(json_str)
        
        logger.info(f"βœ… Fetched {ok_count}/{len(results)} subreddits, {total_posts} posts")
        print(f"Output: {args.output}")
        return 0
    
    except Exception as e:
        logger.error(f"πŸ’₯ Reddit fetch failed: {e}")
        return 1


if __name__ == "__main__":
    sys.exit(main())

```
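
The resume logic above — skip a re-fetch when the output file is fresh, parseable, and non-empty — can be isolated into a small predicate. This is an illustrative sketch, not a helper the script actually defines (it inlines the same checks in `main()`); the function name `is_fresh_output` is ours:

```python
import json
import os
import tempfile
import time
from pathlib import Path

RESUME_MAX_AGE_SECONDS = 3600  # illustrative; the script defines its own constant

def is_fresh_output(path: Path, max_age: float = RESUME_MAX_AGE_SECONDS) -> bool:
    """True if `path` holds valid JSON with a non-empty 'subreddits' key
    and was written less than `max_age` seconds ago."""
    if not path.exists():
        return False
    if time.time() - path.stat().st_mtime >= max_age:
        return False
    try:
        with open(path) as f:
            data = json.load(f)
    except (json.JSONDecodeError, OSError):
        return False
    return bool(data.get("subreddits"))

# Demonstration: a corrupt file is never a cache hit, a fresh valid one is.
fd, tmp = tempfile.mkstemp(suffix=".json")
os.close(fd)
path = Path(tmp)
path.write_text("not json")
assert is_fresh_output(path) is False
path.write_text(json.dumps({"subreddits": [{"name": "movies"}]}))
assert is_fresh_output(path) is True
path.unlink()
```

Treating any parse failure as a cache miss is the key design choice: a half-written or corrupt output file silently falls through to a normal fetch instead of aborting the pipeline.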

### scripts/fetch-rss.py

```python
#!/usr/bin/env python3
"""
Fetch RSS feeds from unified sources configuration.

Reads sources.json, filters RSS sources, fetches feeds in parallel with retry
mechanism, and outputs structured JSON with articles tagged by topics.

Usage:
    python3 fetch-rss.py [--defaults DIR] [--config CONFIG_DIR] [--hours 48] [--output FILE] [--verbose]
"""

import json
import re
import sys
import os
import argparse
import logging
import time
import tempfile
from datetime import datetime, timedelta, timezone
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.request import urlopen, Request
from urllib.error import URLError
from urllib.parse import urljoin
from pathlib import Path
from typing import Dict, List, Any, Optional

# Try to import feedparser, fall back to regex parsing
try:
    import feedparser
    HAS_FEEDPARSER = True
except ImportError:
    HAS_FEEDPARSER = False
    logging.warning("feedparser not installed β€” using basic XML regex parser (may miss some feeds). Install with: pip install feedparser")

TIMEOUT = 30
MAX_WORKERS = 10  # parallel feed fetches
MAX_ARTICLES_PER_FEED = 20
RETRY_COUNT = 1
RETRY_DELAY = 2.0  # seconds
RSS_CACHE_PATH = "/tmp/media-news-digest-rss-cache.json"
RSS_CACHE_TTL_HOURS = 24


def setup_logging(verbose: bool) -> logging.Logger:
    """Setup logging configuration."""
    level = logging.DEBUG if verbose else logging.INFO
    logging.basicConfig(
        level=level,
        format='%(asctime)s - %(levelname)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )
    return logging.getLogger(__name__)


def parse_date_regex(s: str) -> Optional[datetime]:
    """Parse date string using regex patterns (fallback method)."""
    if not s:
        return None
    s = s.strip()
    
    # Common date formats
    formats = [
        "%a, %d %b %Y %H:%M:%S %z",
        "%a, %d %b %Y %H:%M:%S %Z", 
        "%Y-%m-%dT%H:%M:%S%z",
        "%Y-%m-%dT%H:%M:%SZ",
        "%Y-%m-%d %H:%M:%S",
        "%Y-%m-%d",
    ]
    
    for fmt in formats:
        try:
            dt = datetime.strptime(s, fmt)
            if dt.tzinfo is None:
                dt = dt.replace(tzinfo=timezone.utc)
            return dt
        except ValueError:
            continue
            
    # ISO fallback
    try:
        dt = datetime.fromisoformat(s.replace("Z", "+00:00"))
        return dt
    except (ValueError, AttributeError):
        pass
        
    return None


def extract_cdata(text: str) -> str:
    """Extract content from CDATA sections."""
    m = re.search(r"<!\[CDATA\[(.*?)\]\]>", text, re.DOTALL)
    return m.group(1) if m else text


def strip_tags(html: str) -> str:
    """Remove HTML tags from text."""
    return re.sub(r"<[^>]+>", "", html).strip()


def get_tag(xml: str, tag: str) -> str:
    """Extract content from XML tag using regex."""
    m = re.search(rf"<{tag}[^>]*>(.*?)</{tag}>", xml, re.DOTALL | re.IGNORECASE)
    return extract_cdata(m.group(1)).strip() if m else ""


def validate_article_domain(article_link: str, source: Dict[str, Any]) -> bool:
    """Validate that article links from mirror sources point to expected domains.
    
    Sources with 'expected_domains' field will have their article links checked.
    Returns True if valid or if no domain restriction is set.
    """
    expected = source.get("expected_domains")
    if not expected:
        return True
    if not article_link:
        return False
    from urllib.parse import urlparse
    domain = urlparse(article_link).hostname or ""
    return any(domain == d or domain.endswith("." + d) for d in expected)


def resolve_link(link: str, base_url: str) -> str:
    """Resolve relative links against the feed URL. Rejects non-HTTP(S) schemes."""
    if not link:
        return link
    if link.startswith(("http://", "https://")):
        return link
    resolved = urljoin(base_url, link)
    if not resolved.startswith(("http://", "https://")):
        return ""  # reject javascript:, data:, etc.
    return resolved


def parse_feed_feedparser(content: str, cutoff: datetime, feed_url: str) -> List[Dict[str, Any]]:
    """Parse feed using feedparser library."""
    articles = []
    
    try:
        feed = feedparser.parse(content)
        
        for entry in feed.entries[:MAX_ARTICLES_PER_FEED]:
            title = entry.get('title', '').strip()
            link = entry.get('link', '').strip()
            
            # Try multiple date fields
            pub_date = None
            for date_field in ['published_parsed', 'updated_parsed']:
                if hasattr(entry, date_field) and getattr(entry, date_field):
                    try:
                        pub_date = datetime(*getattr(entry, date_field)[:6], tzinfo=timezone.utc)
                        break
                    except (TypeError, ValueError):
                        continue
                        
            # Fallback to string parsing
            if pub_date is None:
                for date_field in ['published', 'updated']:
                    if hasattr(entry, date_field) and getattr(entry, date_field):
                        pub_date = parse_date_regex(getattr(entry, date_field))
                        if pub_date:
                            break
                            
            if title and link and pub_date and pub_date >= cutoff:
                articles.append({
                    "title": title[:200],
                    "link": resolve_link(link, feed_url),
                    "date": pub_date.isoformat(),
                })
                
    except Exception as e:
        logging.debug(f"feedparser parsing failed: {e}")
        
    return articles


def parse_feed_regex(content: str, cutoff: datetime, feed_url: str) -> List[Dict[str, Any]]:
    """Parse feed using regex patterns (fallback method)."""
    articles = []

    # RSS 2.0 items
    for item in re.finditer(r"<item[^>]*>(.*?)</item>", content, re.DOTALL):
        block = item.group(1)
        title = strip_tags(get_tag(block, "title"))
        link = resolve_link(get_tag(block, "link"), feed_url)
        date_str = get_tag(block, "pubDate") or get_tag(block, "dc:date")
        pub = parse_date_regex(date_str)

        if title and link and pub and pub >= cutoff:
            articles.append({
                "title": title[:200],
                "link": link,
                "date": pub.isoformat(),
            })

    # Atom entries fallback
    if not articles:
        for entry in re.finditer(r"<entry[^>]*>(.*?)</entry>", content, re.DOTALL):
            block = entry.group(1)
            title = strip_tags(get_tag(block, "title"))
            link_m = re.search(r'<link[^>]*href=["\']([^"\']+)["\']', block)
            if not link_m:
                link = get_tag(block, "link")
            else:
                link = link_m.group(1)
            link = resolve_link(link, feed_url)
            date_str = get_tag(block, "updated") or get_tag(block, "published")
            pub = parse_date_regex(date_str)

            if title and link and pub and pub >= cutoff:
                articles.append({
                    "title": title[:200],
                    "link": link,
                    "date": pub.isoformat(),
                })

    return articles[:MAX_ARTICLES_PER_FEED]


def parse_feed(content: str, cutoff: datetime, feed_url: str) -> List[Dict[str, Any]]:
    """Parse feed using best available method."""
    if HAS_FEEDPARSER:
        articles = parse_feed_feedparser(content, cutoff, feed_url)
        if articles:
            return articles
        logging.debug("feedparser returned no articles, trying regex fallback")
        
    return parse_feed_regex(content, cutoff, feed_url)


def _load_rss_cache() -> Dict[str, Any]:
    """Load RSS ETag/Last-Modified cache."""
    try:
        with open(RSS_CACHE_PATH, 'r') as f:
            return json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        return {}


def _save_rss_cache(cache: Dict[str, Any]) -> None:
    """Save RSS ETag/Last-Modified cache."""
    try:
        with open(RSS_CACHE_PATH, 'w') as f:
            json.dump(cache, f)
    except Exception as e:
        logging.warning(f"Failed to save RSS cache: {e}")


# Module-level cache, loaded once per run
_rss_cache: Optional[Dict[str, Any]] = None
_rss_cache_dirty = False


def _get_rss_cache(no_cache: bool = False) -> Dict[str, Any]:
    global _rss_cache
    if _rss_cache is None:
        _rss_cache = {} if no_cache else _load_rss_cache()
    return _rss_cache


def _flush_rss_cache() -> None:
    global _rss_cache_dirty
    if _rss_cache_dirty and _rss_cache is not None:
        _save_rss_cache(_rss_cache)
        _rss_cache_dirty = False


def fetch_feed_with_retry(source: Dict[str, Any], cutoff: datetime, no_cache: bool = False) -> Dict[str, Any]:
    """Fetch RSS feed with retry mechanism and conditional requests."""
    source_id = source["id"]
    name = source["name"]
    url = source["url"]
    priority = source["priority"]
    topics = source["topics"]
    
    for attempt in range(RETRY_COUNT + 1):
        try:
            global _rss_cache_dirty
            req_headers = {"User-Agent": "MediaDigest/2.0"}
            
            # Add conditional headers from cache
            cache = _get_rss_cache(no_cache)
            cache_entry = cache.get(url)
            now = time.time()
            ttl_seconds = RSS_CACHE_TTL_HOURS * 3600
            
            if cache_entry and not no_cache and (now - cache_entry.get("ts", 0)) < ttl_seconds:
                if cache_entry.get("etag"):
                    req_headers["If-None-Match"] = cache_entry["etag"]
                if cache_entry.get("last_modified"):
                    req_headers["If-Modified-Since"] = cache_entry["last_modified"]
            
            req = Request(url, headers=req_headers)
            try:
                with urlopen(req, timeout=TIMEOUT) as resp:
                    # Update cache with response headers
                    etag = resp.headers.get("ETag")
                    last_mod = resp.headers.get("Last-Modified")
                    if etag or last_mod:
                        cache[url] = {"etag": etag, "last_modified": last_mod, "ts": now}
                        _rss_cache_dirty = True
                    
                    final_url = resp.url if hasattr(resp, 'url') else url
                    content = resp.read().decode("utf-8", errors="replace")
            except URLError as e:
                if hasattr(e, 'code') and e.code == 304:
                    logging.info(f"⏭ {name}: not modified (304)")
                    return {
                        "source_id": source_id,
                        "source_type": "rss",
                        "name": name,
                        "url": url,
                        "priority": priority,
                        "topics": topics,
                        "status": "ok",
                        "attempts": attempt + 1,
                        "not_modified": True,
                        "count": 0,
                        "articles": [],
                    }
                raise
                
            articles = parse_feed(content, cutoff, final_url)
            
            # Tag articles with topics and validate domains
            validated_articles = []
            for article in articles:
                article["topics"] = topics[:]
                if validate_article_domain(article.get("link", ""), source):
                    validated_articles.append(article)
                else:
                    logging.warning(f"⚠️ {name}: rejected article with unexpected domain: {article.get('link', '')}")
            articles = validated_articles
            
            return {
                "source_id": source_id,
                "source_type": "rss",
                "name": name,
                "url": url,
                "priority": priority,
                "topics": topics,
                "status": "ok",
                "attempts": attempt + 1,
                "count": len(articles),
                "articles": articles,
            }
            
        except Exception as e:
            error_msg = str(e)[:100]
            logging.debug(f"Attempt {attempt + 1} failed for {name}: {error_msg}")
            
            if attempt < RETRY_COUNT:
                time.sleep(RETRY_DELAY * (2 ** attempt))  # Exponential backoff
                continue
            else:
                return {
                    "source_id": source_id,
                    "source_type": "rss",
                    "name": name,
                    "url": url,
                    "priority": priority,
                    "topics": topics,
                    "status": "error",
                    "attempts": attempt + 1,
                    "error": error_msg,
                    "count": 0,
                    "articles": [],
                }


def load_sources(defaults_dir: Path, config_dir: Optional[Path] = None) -> List[Dict[str, Any]]:
    """Load RSS sources from unified configuration with overlay support."""
    try:
        from config_loader import load_merged_sources
    except ImportError:
        # Script run directly: add its directory to sys.path and retry
        sys.path.append(str(Path(__file__).parent))
        from config_loader import load_merged_sources
    
    # Load merged sources from defaults + optional user overlay
    all_sources = load_merged_sources(defaults_dir, config_dir)
    
    # Filter RSS sources that are enabled
    rss_sources = []
    for source in all_sources:
        if source.get("type") == "rss" and source.get("enabled", True):
            rss_sources.append(source)
            
    logging.info(f"Loaded {len(rss_sources)} enabled RSS sources")
    return rss_sources


def main():
    """Main RSS fetching function."""
    parser = argparse.ArgumentParser(
        description="Parallel RSS/Atom feed fetcher for media-news-digest. "
                   "Fetches enabled RSS sources from unified configuration, "
                   "filters by time window, and outputs structured article data.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
    python3 fetch-rss.py
    python3 fetch-rss.py --defaults config/defaults --config workspace/config --hours 48 -o results.json
    python3 fetch-rss.py --config workspace/config --verbose  # backward compatibility
        """
    )
    
    parser.add_argument(
        "--defaults",
        type=Path,
        default=Path("config/defaults"),
        help="Default configuration directory with skill defaults (default: config/defaults)"
    )
    
    parser.add_argument(
        "--config",
        type=Path,
        help="User configuration directory for overlays (optional)"
    )
    
    parser.add_argument(
        "--hours",
        type=int,
        default=48,
        help="Time window in hours for articles (default: 48)"
    )
    
    parser.add_argument(
        "--output", "-o",
        type=Path,
        help="Output JSON path (default: auto-generated temp file)"
    )
    
    parser.add_argument(
        "--verbose", "-v",
        action="store_true",
        help="Enable verbose logging"
    )
    
    parser.add_argument(
        "--no-cache",
        action="store_true",
        help="Bypass ETag/Last-Modified conditional request cache"
    )
    
    parser.add_argument(
        "--force",
        action="store_true",
        help="Force re-fetch even if cached output exists"
    )
    
    args = parser.parse_args()
    logger = setup_logging(args.verbose)
    
    # Resume support: skip if output exists, is valid JSON, and < 1 hour old
    if args.output and args.output.exists() and not args.force:
        try:
            age_seconds = time.time() - args.output.stat().st_mtime
            if age_seconds < 3600:
                with open(args.output, 'r') as f:
                    json.load(f)  # validate JSON
                logger.info(f"Skipping (cached output exists): {args.output}")
                return 0
        except (json.JSONDecodeError, OSError):
            pass
    
    # Auto-generate unique output path if not specified
    if not args.output:
        fd, temp_path = tempfile.mkstemp(prefix="media-news-digest-rss-", suffix=".json")
        os.close(fd)
        args.output = Path(temp_path)
    
    try:
        cutoff = datetime.now(timezone.utc) - timedelta(hours=args.hours)
        
        # Backward compatibility: if only --config provided, use old behavior
        if args.config and args.defaults == Path("config/defaults") and not args.defaults.exists():
            logger.debug("Backward compatibility mode: using --config as sole source")
            sources = load_sources(args.config, None)
        else:
            sources = load_sources(args.defaults, args.config)
        
        if not sources:
            logger.warning("No RSS sources found or all disabled")
            
        logger.info(f"Fetching {len(sources)} RSS feeds (window: {args.hours}h)")
        
        # Check feedparser availability
        if HAS_FEEDPARSER:
            logger.debug("Using feedparser library for parsing")
        else:
            logger.info("feedparser not available, using regex parsing")
        
        # Initialize cache
        _get_rss_cache(no_cache=args.no_cache)
        
        results = []
        with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
            futures = {pool.submit(fetch_feed_with_retry, source, cutoff, args.no_cache): source 
                      for source in sources}
            
            for future in as_completed(futures):
                result = future.result()
                results.append(result)
                
                if result["status"] == "ok":
                    logger.debug(f"βœ… {result['name']}: {result['count']} articles")
                else:
                    logger.debug(f"❌ {result['name']}: {result['error']}")

        # Flush conditional request cache
        _flush_rss_cache()
        
        # Sort: priority first, then by article count
        results.sort(key=lambda x: (not x.get("priority", False), -x.get("count", 0)))

        ok_count = sum(1 for r in results if r["status"] == "ok")
        total_articles = sum(r.get("count", 0) for r in results)

        output = {
            "generated": datetime.now(timezone.utc).isoformat(),
            "source_type": "rss",
            "defaults_dir": str(args.defaults),
            "config_dir": str(args.config) if args.config else None,
            "hours": args.hours,
            "feedparser_available": HAS_FEEDPARSER,
            "sources_total": len(results),
            "sources_ok": ok_count,
            "total_articles": total_articles,
            "sources": results,
        }

        # Write output
        json_str = json.dumps(output, ensure_ascii=False, indent=2)
        with open(args.output, "w", encoding='utf-8') as f:
            f.write(json_str)

        logger.info(f"βœ… Done: {ok_count}/{len(results)} feeds ok, "
                   f"{total_articles} articles β†’ {args.output}")
        
        return 0
        
    except Exception as e:
        logger.error(f"πŸ’₯ RSS fetch failed: {e}")
        return 1


if __name__ == "__main__":
    sys.exit(main())
```
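
The conditional-request handling in `fetch_feed_with_retry` is worth seeing in isolation: only a cache entry younger than the TTL contributes `If-None-Match` / `If-Modified-Since` headers, so a 304 response can short-circuit the download. The script keeps this logic inline; the standalone function below (`conditional_headers` is our name, not the script's) is a sketch of the same step:

```python
import time
from typing import Any, Dict, Optional

RSS_CACHE_TTL_HOURS = 24  # mirrors the script's constant

def conditional_headers(cache_entry: Optional[Dict[str, Any]],
                        now: Optional[float] = None,
                        ttl_hours: int = RSS_CACHE_TTL_HOURS) -> Dict[str, str]:
    """Build If-None-Match / If-Modified-Since headers from a cache entry.

    A missing or expired entry yields no conditional headers, forcing a
    full fetch; with them present, the server may answer 304 Not Modified.
    """
    headers = {"User-Agent": "MediaDigest/2.0"}
    if not cache_entry:
        return headers
    now = time.time() if now is None else now
    if now - cache_entry.get("ts", 0) >= ttl_hours * 3600:
        return headers  # entry too old: revalidate with a full request
    if cache_entry.get("etag"):
        headers["If-None-Match"] = cache_entry["etag"]
    if cache_entry.get("last_modified"):
        headers["If-Modified-Since"] = cache_entry["last_modified"]
    return headers

fresh = {"etag": '"abc123"', "last_modified": "Mon, 01 Jan 2024 00:00:00 GMT",
         "ts": time.time()}
stale = dict(fresh, ts=time.time() - 48 * 3600)
assert "If-None-Match" in conditional_headers(fresh)
assert "If-None-Match" not in conditional_headers(stale)
```

Note the TTL guard: even with an ETag on file, an entry past `RSS_CACHE_TTL_HOURS` is revalidated with an unconditional request, bounding how long a wrongly-cached validator can suppress fresh content.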

### scripts/fetch-twitter.py

```python
#!/usr/bin/env python3
"""
Fetch Twitter/X posts from KOL accounts using X API.

Reads sources.json, filters Twitter sources, fetches recent posts using
either the official X API v2 or twitterapi.io, and outputs structured JSON.

Usage:
    python3 fetch-twitter.py [--config CONFIG_DIR] [--hours 48] [--output FILE] [--verbose]
    python3 fetch-twitter.py --backend twitterapiio  # force twitterapi.io backend

Environment:
    TWITTER_API_BACKEND - Backend selection: "official", "twitterapiio", or "auto" (default: auto)
    X_BEARER_TOKEN      - Twitter/X API bearer token (for official backend)
    TWITTERAPI_IO_KEY   - twitterapi.io API key (for twitterapiio backend)
"""

import json
import sys
import os
import argparse
import logging
import time
import tempfile
import re
import threading
from abc import ABC, abstractmethod
from datetime import datetime, timedelta, timezone
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.request import urlopen, Request
from urllib.error import HTTPError
from urllib.parse import urlencode, quote
from pathlib import Path
from typing import Dict, List, Any, Optional

TIMEOUT = 30
MAX_WORKERS = 5  # Lower for API rate limits
RETRY_COUNT = 2
RETRY_DELAY = 2.0
MAX_TWEETS_PER_USER = 20
ID_CACHE_PATH = "/tmp/media-news-digest-twitter-id-cache.json"
ID_CACHE_TTL_DAYS = 7

# Twitter API v2 endpoints
OFFICIAL_API_BASE = "https://api.x.com/2"
USER_LOOKUP_ENDPOINT = f"{OFFICIAL_API_BASE}/users/by"

# twitterapi.io endpoints
TWITTERAPIIO_BASE = "https://api.twitterapi.io"


def setup_logging(verbose: bool) -> logging.Logger:
    """Setup logging configuration."""
    level = logging.DEBUG if verbose else logging.INFO
    logging.basicConfig(
        level=level,
        format='%(asctime)s - %(levelname)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )
    return logging.getLogger(__name__)


def clean_tweet_text(text: str) -> str:
    """Clean tweet text for better display."""
    # Remove excessive whitespace
    text = re.sub(r'\s+', ' ', text).strip()
    # Truncate if too long
    if len(text) > 280:
        text = text[:277] + "..."
    return text


# ---------------------------------------------------------------------------
# Rate limiting
# ---------------------------------------------------------------------------

class RateLimiter:
    """Simple token-bucket rate limiter."""
    def __init__(self, qps: float):
        self._lock = threading.Lock()
        self._min_interval = 1.0 / qps
        self._last = 0.0

    def wait(self):
        with self._lock:
            now = time.monotonic()
            wait_time = self._min_interval - (now - self._last)
            if wait_time > 0:
                time.sleep(wait_time)
            self._last = time.monotonic()


# ---------------------------------------------------------------------------
# Backend abstraction
# ---------------------------------------------------------------------------

class TwitterBackend(ABC):
    """Base class for Twitter API backends."""

    @staticmethod
    def _make_result(source, articles, attempt):
        return {
            "source_id": source["id"],
            "source_type": "twitter",
            "name": source["name"],
            "handle": source["handle"].lstrip('@'),
            "priority": source["priority"],
            "topics": source["topics"],
            "status": "ok",
            "attempts": attempt + 1,
            "count": len(articles),
            "articles": articles,
        }

    @staticmethod
    def _make_error(source, error_msg, attempt):
        return {
            "source_id": source["id"],
            "source_type": "twitter",
            "name": source["name"],
            "handle": source["handle"].lstrip('@'),
            "priority": source["priority"],
            "topics": source["topics"],
            "status": "error",
            "attempts": attempt + 1,
            "error": error_msg,
            "count": 0,
            "articles": [],
        }

    @abstractmethod
    def fetch_all(self, sources: List[Dict[str, Any]], cutoff: datetime) -> List[Dict[str, Any]]:
        """Fetch tweets for all sources. Returns list of source result dicts."""


class OfficialBackend(TwitterBackend):
    """Official X API v2 backend (existing logic)."""

    def __init__(self, bearer_token: str, no_cache: bool = False):
        self.bearer_token = bearer_token
        self.no_cache = no_cache

    # -- ID cache helpers --

    @staticmethod
    def _load_id_cache() -> Dict[str, Any]:
        try:
            with open(ID_CACHE_PATH, 'r') as f:
                return json.load(f)
        except (FileNotFoundError, json.JSONDecodeError):
            return {}

    @staticmethod
    def _save_id_cache(cache: Dict[str, Any]) -> None:
        try:
            with open(ID_CACHE_PATH, 'w') as f:
                json.dump(cache, f)
        except Exception as e:
            logging.warning(f"Failed to save ID cache: {e}")

    def _batch_resolve_user_ids(self, handles: List[str]) -> Dict[str, str]:
        now = time.time()
        cache = {} if self.no_cache else self._load_id_cache()
        ttl_seconds = ID_CACHE_TTL_DAYS * 86400

        result: Dict[str, str] = {}
        to_resolve: List[str] = []
        for handle in handles:
            key = handle.lower()
            entry = cache.get(key)
            if entry and (now - entry.get("ts", 0)) < ttl_seconds:
                result[key] = entry["id"]
            else:
                to_resolve.append(handle)

        if to_resolve:
            logging.info(f"Batch resolving {len(to_resolve)} usernames (cached: {len(result)})")
            headers = {
                "Authorization": f"Bearer {self.bearer_token}",
                "User-Agent": "MediaDigest/2.0"
            }
            for i in range(0, len(to_resolve), 100):
                batch = to_resolve[i:i+100]
                url = f"{USER_LOOKUP_ENDPOINT}?{urlencode({'usernames': ','.join(batch)})}"
                try:
                    req = Request(url, headers=headers)
                    with urlopen(req, timeout=TIMEOUT) as resp:
                        data = json.loads(resp.read().decode())

                    if 'data' in data:
                        for user in data['data']:
                            key = user['username'].lower()
                            result[key] = user['id']
                            cache[key] = {"id": user['id'], "ts": now}

                    if 'errors' in data:
                        for err in data['errors']:
                            logging.warning(f"User lookup error: {err.get('detail', err)}")

                except Exception as e:
                    logging.error(f"Batch user lookup failed: {e}")
                    for handle in batch:
                        try:
                            fallback_url = f"{USER_LOOKUP_ENDPOINT}?{urlencode({'usernames': handle})}"
                            req = Request(fallback_url, headers=headers)
                            with urlopen(req, timeout=TIMEOUT) as resp:
                                fallback_data = json.loads(resp.read().decode())
                            if 'data' in fallback_data and fallback_data['data']:
                                key = handle.lower()
                                result[key] = fallback_data['data'][0]['id']
                                cache[key] = {"id": result[key], "ts": now}
                        except Exception as e2:
                            logging.warning(f"Individual lookup failed for @{handle}: {e2}")

            if not self.no_cache:
                self._save_id_cache(cache)
        else:
            logging.info(f"All {len(result)} usernames resolved from cache")

        return result

    @staticmethod
    def _parse_date(date_str: str) -> Optional[datetime]:
        try:
            if date_str.endswith('Z'):
                date_str = date_str[:-1] + '+00:00'
            return datetime.fromisoformat(date_str)
        except (ValueError, TypeError):
            logging.debug(f"Failed to parse Twitter date: {date_str}")
            return None

    def _fetch_user_tweets(self, source: Dict[str, Any], cutoff: datetime,
                           user_id: Optional[str] = None) -> Dict[str, Any]:
        handle = source["handle"].lstrip('@')
        topics = source["topics"]

        for attempt in range(RETRY_COUNT + 1):
            try:
                params = {
                    "max_results": min(MAX_TWEETS_PER_USER, 100),
                    "tweet.fields": "created_at,public_metrics,context_annotations,referenced_tweets",
                    "expansions": "author_id",
                    "user.fields": "verified,public_metrics"
                }

                if not user_id:
                    user_url = f"{USER_LOOKUP_ENDPOINT}?{urlencode({'usernames': handle})}"
                    headers = {
                        "Authorization": f"Bearer {self.bearer_token}",
                        "User-Agent": "MediaDigest/2.0"
                    }
                    req = Request(user_url, headers=headers)
                    with urlopen(req, timeout=TIMEOUT) as resp:
                        user_data = json.loads(resp.read().decode())
                    if 'data' not in user_data or not user_data['data']:
                        raise ValueError(f"User not found: {handle}")
                    user_id = user_data['data'][0]['id']

                headers = {
                    "Authorization": f"Bearer {self.bearer_token}",
                    "User-Agent": "MediaDigest/2.0"
                }

                time.sleep(0.3)  # brief pause to smooth the per-user request rate

                tweets_url = f"{OFFICIAL_API_BASE}/users/{user_id}/tweets?{urlencode(params)}"
                req = Request(tweets_url, headers=headers)

                with urlopen(req, timeout=TIMEOUT) as resp:
                    tweets_data = json.loads(resp.read().decode())

                articles = []
                if 'data' in tweets_data:
                    for tweet in tweets_data['data']:
                        created_at = self._parse_date(tweet.get('created_at', ''))
                        if not created_at or created_at < cutoff:
                            continue

                        text = tweet.get('text', '')
                        if text.startswith('RT @'):
                            continue
                        referenced = tweet.get('referenced_tweets', [])
                        if any(ref.get('type') == 'replied_to' for ref in referenced):
                            continue

                        articles.append({
                            "title": clean_tweet_text(text),
                            "link": f"https://twitter.com/{handle}/status/{tweet['id']}",
                            "date": created_at.isoformat(),
                            "topics": topics[:],
                            "metrics": tweet.get('public_metrics', {}),
                            "tweet_id": tweet['id']
                        })

                return self._make_result(source, articles, attempt)

            except HTTPError as e:
                if e.code == 429:
                    error_msg = "Rate limit exceeded"
                    logging.warning(f"Rate limit hit for @{handle}, attempt {attempt + 1}")
                    if attempt < RETRY_COUNT:
                        time.sleep(60)
                        continue
                else:
                    error_msg = f"HTTP {e.code}: {e.reason}"

            except Exception as e:
                error_msg = str(e)[:100]
                logging.debug(f"Attempt {attempt + 1} failed for @{handle}: {error_msg}")

            if attempt < RETRY_COUNT:
                time.sleep(RETRY_DELAY * (2 ** attempt))
                continue

            return self._make_error(source, error_msg, attempt)

    def fetch_all(self, sources: List[Dict[str, Any]], cutoff: datetime) -> List[Dict[str, Any]]:
        all_handles = [s["handle"].lstrip('@') for s in sources]
        user_id_map = self._batch_resolve_user_ids(all_handles)

        results: List[Dict[str, Any]] = []
        total = len(sources)
        done = 0
        with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
            futures = {}
            for source in sources:
                handle = source["handle"].lstrip('@')
                resolved_id = user_id_map.get(handle.lower())
                futures[pool.submit(self._fetch_user_tweets, source, cutoff, resolved_id)] = source

            for future in as_completed(futures):
                result = future.result()
                results.append(result)
                done += 1
                if result["status"] == "ok":
                    logging.info(f"[{done}/{total}] ✅ @{result['handle']}: {result['count']} tweets"
                                 + (f" (top: {result['articles'][0]['metrics']['like_count']}❤️)" if result.get('articles') else ""))
                else:
                    logging.warning(f"[{done}/{total}] ❌ @{result['handle']}: {result.get('error','unknown')}")

        return results


class TwitterApiIoBackend(TwitterBackend):
    """twitterapi.io backend."""

    def __init__(self, api_key: str):
        self.api_key = api_key
        self._limiter = RateLimiter(qps=5)

    @staticmethod
    def _parse_date(date_str: str) -> Optional[datetime]:
        """Parse twitterapi.io date format: 'Tue Dec 10 07:00:30 +0000 2024'."""
        try:
            return datetime.strptime(date_str, "%a %b %d %H:%M:%S %z %Y")
        except (ValueError, TypeError):
            logging.debug(f"Failed to parse twitterapi.io date: {date_str}")
            return None

    def _parse_tweets_page(self, tweets: list, handle: str, topics: list, cutoff: datetime) -> list:
        """Parse a page of tweets into article dicts."""
        articles = []
        for tweet in tweets:
            # Skip retweets
            if tweet.get("retweeted_tweet"):
                continue
            created_at = self._parse_date(tweet.get("createdAt", ""))
            if not created_at or created_at < cutoff:
                continue

            text = tweet.get("text", "")
            if text.startswith("RT @"):
                continue

            tweet_id = tweet.get("id", "")
            link = tweet.get("url") or f"https://twitter.com/{handle}/status/{tweet_id}"

            articles.append({
                "title": clean_tweet_text(text),
                "link": link,
                "date": created_at.isoformat(),
                "topics": topics[:],
                "metrics": {
                    "like_count": tweet.get("likeCount", 0),
                    "retweet_count": tweet.get("retweetCount", 0),
                    "reply_count": tweet.get("replyCount", 0),
                    "quote_count": tweet.get("quoteCount", 0),
                    "impression_count": tweet.get("viewCount", 0),
                },
                "tweet_id": tweet_id,
            })
        return articles

    def _fetch_user_tweets(self, source: Dict[str, Any], cutoff: datetime) -> Dict[str, Any]:
        handle = source["handle"].lstrip('@')
        topics = source["topics"]

        for attempt in range(RETRY_COUNT + 1):
            try:
                params = urlencode({
                    "userName": handle,
                    "includeReplies": "false",
                })
                url = f"{TWITTERAPIIO_BASE}/twitter/user/last_tweets?{params}"
                headers = {
                    "X-API-Key": self.api_key,
                    "User-Agent": "MediaDigest/2.0",
                }

                self._limiter.wait()

                req = Request(url, headers=headers)
                with urlopen(req, timeout=TIMEOUT) as resp:
                    raw = json.loads(resp.read().decode())

                # API wraps response in {"data": {...}} envelope
                data = raw.get("data", raw)

                articles = self._parse_tweets_page(
                    data.get("tweets", []), handle, topics, cutoff
                )

                # Pagination: fetch one more page if available and all tweets still in window
                has_next = data.get("has_next_page", False)
                next_cursor = data.get("next_cursor")
                if has_next and next_cursor and articles:
                    # Article dates and cutoff are both ISO-8601 UTC strings,
                    # so lexicographic comparison is safe here
                    oldest = min(a["date"] for a in articles)
                    if oldest >= cutoff.isoformat():
                        self._limiter.wait()
                        page2_params = urlencode({
                            "userName": handle,
                            "includeReplies": "false",
                            "cursor": next_cursor,
                        })
                        page2_url = f"{TWITTERAPIIO_BASE}/twitter/user/last_tweets?{page2_params}"
                        req2 = Request(page2_url, headers=headers)
                        with urlopen(req2, timeout=TIMEOUT) as resp2:
                            raw2 = json.loads(resp2.read().decode())
                        data2 = raw2.get("data", raw2)
                        articles.extend(self._parse_tweets_page(
                            data2.get("tweets", []), handle, topics, cutoff
                        ))
                        has_next = data2.get("has_next_page", False)

                # Truncation warning
                if has_next and articles:
                    oldest = min(a["date"] for a in articles)
                    if oldest >= cutoff.isoformat():
                        logging.warning(f"@{handle}: results may be truncated ({len(articles)} tweets, more available)")

                return self._make_result(source, articles, attempt)

            except HTTPError as e:
                if e.code == 429:
                    error_msg = "Rate limit exceeded"
                    logging.warning(f"Rate limit hit for @{handle}, attempt {attempt + 1}")
                    if attempt < RETRY_COUNT:
                        time.sleep(5)
                        continue
                else:
                    error_msg = f"HTTP {e.code}: {e.reason}"

            except Exception as e:
                error_msg = str(e)[:100]
                logging.debug(f"Attempt {attempt + 1} failed for @{handle}: {error_msg}")

            if attempt < RETRY_COUNT:
                time.sleep(RETRY_DELAY * (2 ** attempt))
                continue

            return self._make_error(source, error_msg, attempt)

    def fetch_all(self, sources: List[Dict[str, Any]], cutoff: datetime) -> List[Dict[str, Any]]:
        results: List[Dict[str, Any]] = []
        total = len(sources)
        done = 0
        with ThreadPoolExecutor(max_workers=3) as pool:
            futures = {pool.submit(self._fetch_user_tweets, source, cutoff): source
                       for source in sources}
            for future in as_completed(futures):
                result = future.result()
                results.append(result)
                done += 1
                if result["status"] == "ok":
                    logging.info(f"[{done}/{total}] ✅ @{result['handle']}: {result['count']} tweets"
                                 + (f" (top: {result['articles'][0]['metrics']['like_count']}❤️)" if result.get('articles') else ""))
                else:
                    logging.warning(f"[{done}/{total}] ❌ @{result['handle']}: {result.get('error', 'unknown')}")

        return results


# ---------------------------------------------------------------------------
# Backend selection
# ---------------------------------------------------------------------------

def select_backend(backend_name: str, no_cache: bool = False) -> Optional[TwitterBackend]:
    """Select and instantiate the appropriate backend.

    Returns None if no credentials are available for the chosen backend.
    """
    if backend_name == "twitterapiio":
        key = os.getenv("TWITTERAPI_IO_KEY")
        if not key:
            logging.error("TWITTERAPI_IO_KEY not set (required for twitterapiio backend)")
            return None
        logging.info("Using twitterapi.io backend")
        return TwitterApiIoBackend(key)

    if backend_name == "official":
        token = os.getenv("X_BEARER_TOKEN")
        if not token:
            logging.error("X_BEARER_TOKEN not set (required for official backend)")
            return None
        logging.info("Using official X API v2 backend")
        return OfficialBackend(token, no_cache=no_cache)

    # auto: try twitterapiio first, then official
    if backend_name == "auto":
        key = os.getenv("TWITTERAPI_IO_KEY")
        if key:
            logging.info("Auto-selected twitterapi.io backend (TWITTERAPI_IO_KEY set)")
            return TwitterApiIoBackend(key)
        token = os.getenv("X_BEARER_TOKEN")
        if token:
            logging.info("Auto-selected official X API v2 backend (X_BEARER_TOKEN set)")
            return OfficialBackend(token, no_cache=no_cache)
        logging.warning("No Twitter API credentials found (checked TWITTERAPI_IO_KEY, X_BEARER_TOKEN)")
        return None

    logging.error(f"Unknown backend: {backend_name}")
    return None


# ---------------------------------------------------------------------------
# Source loading
# ---------------------------------------------------------------------------

def load_twitter_sources(defaults_dir: Path, config_dir: Optional[Path] = None) -> List[Dict[str, Any]]:
    """Load Twitter sources from unified configuration with overlay support."""
    try:
        from config_loader import load_merged_sources
    except ImportError:
        # Fallback for relative import
        import sys
        sys.path.append(str(Path(__file__).parent))
        from config_loader import load_merged_sources

    # Load merged sources from defaults + optional user overlay
    all_sources = load_merged_sources(defaults_dir, config_dir)

    # Filter Twitter sources that are enabled
    twitter_sources = []
    for source in all_sources:
        if source.get("type") == "twitter" and source.get("enabled", True):
            if not source.get("handle"):
                logging.warning(f"Twitter source {source.get('id')} missing handle, skipping")
                continue
            twitter_sources.append(source)

    logging.info(f"Loaded {len(twitter_sources)} enabled Twitter sources")
    return twitter_sources


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

def main():
    """Main Twitter fetching function."""
    parser = argparse.ArgumentParser(
        description="Fetch recent tweets from Twitter/X KOL accounts. "
                   "Supports official X API v2 and twitterapi.io backends.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
    export X_BEARER_TOKEN="your_token_here"
    python3 fetch-twitter.py
    python3 fetch-twitter.py --defaults config/defaults --config workspace/config --hours 24 -o results.json
    python3 fetch-twitter.py --backend twitterapiio  # use twitterapi.io
    python3 fetch-twitter.py --config workspace/config --verbose  # backward compatibility
        """
    )

    parser.add_argument(
        "--defaults",
        type=Path,
        default=Path("config/defaults"),
        help="Default configuration directory with skill defaults (default: config/defaults)"
    )

    parser.add_argument(
        "--config",
        type=Path,
        help="User configuration directory for overlays (optional)"
    )

    parser.add_argument(
        "--hours",
        type=int,
        default=48,
        help="Time window in hours for tweets (default: 48)"
    )

    parser.add_argument(
        "--output", "-o",
        type=Path,
        help="Output JSON path (default: auto-generated temp file)"
    )

    parser.add_argument(
        "--verbose", "-v",
        action="store_true",
        help="Enable verbose logging"
    )

    parser.add_argument(
        "--no-cache",
        action="store_true",
        help="Bypass username→ID cache (official backend only)"
    )

    parser.add_argument(
        "--force",
        action="store_true",
        help="Force re-fetch even if cached output exists"
    )

    parser.add_argument(
        "--backend",
        choices=["official", "twitterapiio", "auto"],
        default=None,
        help="Twitter API backend (overrides TWITTER_API_BACKEND env var). "
             "auto = twitterapiio if TWITTERAPI_IO_KEY set, else official if X_BEARER_TOKEN set"
    )

    args = parser.parse_args()
    logger = setup_logging(args.verbose)

    # Resume support: skip if output exists, is valid JSON, and < 1 hour old
    if args.output and args.output.exists() and not args.force:
        try:
            age_seconds = time.time() - args.output.stat().st_mtime
            if age_seconds < 3600:
                with open(args.output, 'r') as f:
                    json.load(f)
                logger.info(f"Skipping (cached output exists): {args.output}")
                return 0
        except (json.JSONDecodeError, OSError):
            pass

    # Resolve backend: CLI arg > env var > default (auto)
    backend_name = args.backend or os.getenv("TWITTER_API_BACKEND", "auto")

    backend = select_backend(backend_name, no_cache=args.no_cache)
    if not backend:
        logger.warning("No Twitter backend available. Writing empty result and skipping Twitter fetch.")
        empty_result = {
            "generated": datetime.now(timezone.utc).isoformat(),
            "source_type": "twitter",
            "backend": backend_name,
            "hours": args.hours,
            "sources_total": 0,
            "sources_ok": 0,
            "total_articles": 0,
            "sources": [],
            "skipped_reason": f"No credentials for backend '{backend_name}'"
        }
        output_path = args.output or Path("/tmp/md-twitter.json")
        with open(output_path, "w") as f:
            json.dump(empty_result, f, indent=2)
        print(f"Output (empty): {output_path}")
        return 0

    # Auto-generate unique output path if not specified
    if not args.output:
        fd, temp_path = tempfile.mkstemp(prefix="media-news-digest-twitter-", suffix=".json")
        os.close(fd)
        args.output = Path(temp_path)

    try:
        cutoff = datetime.now(timezone.utc) - timedelta(hours=args.hours)

        # Backward compatibility: if only --config provided, use old behavior
        if args.config and args.defaults == Path("config/defaults") and not args.defaults.exists():
            logger.debug("Backward compatibility mode: using --config as sole source")
            sources = load_twitter_sources(args.config, None)
        else:
            sources = load_twitter_sources(args.defaults, args.config)

        if not sources:
            logger.warning("No Twitter sources found or all disabled")

        logger.info(f"Fetching {len(sources)} Twitter accounts (window: {args.hours}h, backend: {backend_name})")

        results = backend.fetch_all(sources, cutoff)

        # Sort: priority first, then by article count
        results.sort(key=lambda x: (not x.get("priority", False), -x.get("count", 0)))

        ok_count = sum(1 for r in results if r["status"] == "ok")
        total_tweets = sum(r.get("count", 0) for r in results)

        output = {
            "generated": datetime.now(timezone.utc).isoformat(),
            "source_type": "twitter",
            "backend": backend_name,
            "defaults_dir": str(args.defaults),
            "config_dir": str(args.config) if args.config else None,
            "hours": args.hours,
            "sources_total": len(results),
            "sources_ok": ok_count,
            "total_articles": total_tweets,
            "sources": results,
        }

        # Write output
        json_str = json.dumps(output, ensure_ascii=False, indent=2)
        with open(args.output, "w", encoding='utf-8') as f:
            f.write(json_str)

        logger.info(f"✅ Done: {ok_count}/{len(results)} accounts ok, "
                   f"{total_tweets} tweets → {args.output}")

        return 0

    except Exception as e:
        logger.error(f"💥 Twitter fetch failed: {e}")
        return 1


if __name__ == "__main__":
    sys.exit(main())

```
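
Both backend classes above share the same retry shape: each attempt runs inside a `try`, an HTTP 429 gets a fixed sleep before retrying, and any other failure backs off exponentially via `RETRY_DELAY * (2 ** attempt)`. A minimal sketch of that pattern in isolation (the `fetch_with_backoff` helper and `flaky` stub are illustrative names, not part of the script):

```python
import time

RETRY_COUNT = 1      # retries after the first attempt
RETRY_DELAY = 2.0    # base delay in seconds


def fetch_with_backoff(fetch, retry_count=RETRY_COUNT, retry_delay=RETRY_DELAY,
                       sleep=time.sleep):
    """Run fetch() with the retry/backoff shape used by both backends."""
    error_msg = "unknown"
    for attempt in range(retry_count + 1):
        try:
            return {"status": "ok", "value": fetch(), "attempts": attempt + 1}
        except Exception as e:
            error_msg = str(e)[:100]
        if attempt < retry_count:
            sleep(retry_delay * (2 ** attempt))  # 2s, 4s, 8s, ...
    return {"status": "error", "error": error_msg, "attempts": retry_count + 1}


# A fetch that fails once, then succeeds on the retry:
calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] == 1:
        raise RuntimeError("transient")
    return 42

result = fetch_with_backoff(flaky, sleep=lambda s: None)
```

Injecting `sleep` keeps the backoff testable without real delays; the real script keeps the 429 sleep separate from this exponential tail because rate-limit windows need a longer, fixed wait.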

### scripts/fetch-web.py

```python
#!/usr/bin/env python3
"""
Fetch web search results for media digest topics.

Reads topics.json, performs web searches for each topic's search queries,
and outputs structured JSON with search results tagged by topics.

Usage:
    python3 fetch-web.py [--config CONFIG_DIR] [--freshness 48h] [--output FILE] [--verbose]

Note: This script can use Brave Search API if BRAVE_API_KEY is set, otherwise
it provides a JSON interface for agents to use web_search tool.
"""

import json
import sys
import os
import argparse
import logging
import time
import tempfile
import re
from datetime import datetime, timezone
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple
from urllib.request import urlopen, Request
from urllib.error import HTTPError
from urllib.parse import urlencode

TIMEOUT = 30
MAX_RESULTS_PER_QUERY = 5
RETRY_COUNT = 1
RETRY_DELAY = 2.0

# Brave Search API
BRAVE_API_BASE = "https://api.search.brave.com/res/v1/web/search"
TAVILY_API_BASE = "https://api.tavily.com/search"
BRAVE_RATE_LIMIT_CACHE = "/tmp/media-news-digest-brave-rate-limit.json"


def setup_logging(verbose: bool) -> logging.Logger:
    """Setup logging configuration."""
    level = logging.DEBUG if verbose else logging.INFO
    logging.basicConfig(
        level=level,
        format='%(asctime)s - %(levelname)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )
    return logging.getLogger(__name__)


def get_brave_api_keys() -> List[str]:
    """Get Brave Search API keys from environment.
    
    Supports multiple keys via comma-separated BRAVE_API_KEYS (preferred)
    or BRAVE_API_KEY (single key fallback):
        export BRAVE_API_KEYS="key1,key2,key3"
        export BRAVE_API_KEY="key1"  # fallback for single key
    """
    raw = os.getenv('BRAVE_API_KEYS', '') or os.getenv('BRAVE_API_KEY', '')
    if not raw:
        return []
    return [k.strip() for k in raw.split(',') if k.strip()]


def get_brave_api_key() -> Optional[str]:
    """Get first available Brave API key (legacy compat)."""
    keys = get_brave_api_keys()
    return keys[0] if keys else None


def _probe_brave_key(api_key: str) -> Dict[str, Any]:
    """Probe a single Brave API key. Returns {qps, workers, exhausted, error}."""
    try:
        params = urlencode({'q': 'test', 'count': 1})
        url = f"{BRAVE_API_BASE}?{params}"
        req = Request(url, headers={
            'Accept': 'application/json',
            'X-Subscription-Token': api_key,
            'User-Agent': 'MediaDigest/2.0'
        })
        with urlopen(req, timeout=TIMEOUT) as resp:
            limit_header = resp.headers.get('x-ratelimit-limit', '1')
            # Header may be comma-separated like the limit header; take the first field
            remaining = resp.headers.get('x-ratelimit-remaining', '').split(',')[0].strip()
            per_second = int(limit_header.split(',')[0].strip())
            resp.read()

        exhausted = False
        if remaining.isdigit() and int(remaining) == 0:
            exhausted = True

        workers = min(per_second // 2, 5) if per_second >= 10 else 1
        return {'qps': per_second, 'workers': workers, 'exhausted': exhausted, 'error': None}
    except HTTPError as e:
        if e.code == 429:
            return {'qps': 1, 'workers': 1, 'exhausted': True, 'error': '429 rate limited'}
        return {'qps': 1, 'workers': 1, 'exhausted': False, 'error': f'HTTP {e.code}'}
    except Exception as e:
        return {'qps': 1, 'workers': 1, 'exhausted': False, 'error': str(e)}


def select_brave_key_and_limits(keys: List[str]) -> Tuple[Optional[str], int, int]:
    """Select the best available Brave API key and detect rate limits.
    
    Tries each key in order. Skips exhausted keys (cached for 24h).
    Returns (api_key, max_qps, max_workers) or (None, 0, 0) if all exhausted.
    """
    if not keys:
        return None, 0, 0

    # Override via env var
    brave_plan = os.getenv('BRAVE_PLAN', '').lower()
    plan_qps = None
    if brave_plan == 'free':
        plan_qps, plan_workers = 1, 1
    elif brave_plan == 'pro':
        plan_qps, plan_workers = 15, 5

    # Load cache
    cache = {}
    try:
        with open(BRAVE_RATE_LIMIT_CACHE, 'r') as f:
            cache = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError, OSError):
        pass

    now = time.time()
    key_cache = cache.get('keys', {})

    for i, key in enumerate(keys):
        key_id = f"key_{i}"  # Don't log actual keys
        cached = key_cache.get(key_id, {})
        cache_age = now - cached.get('ts', 0)

        # Use cache if fresh (24h)
        if cache_age < 86400 and cached.get('exhausted'):
            logging.debug(f"Brave {key_id}: exhausted (cached), skipping")
            continue

        if plan_qps is not None:
            logging.info(f"Using Brave {key_id} with BRAVE_PLAN={brave_plan} override: {plan_qps} QPS")
            return key, plan_qps, plan_workers

        if cache_age < 86400 and 'qps' in cached and not cached.get('exhausted'):
            qps = cached['qps']
            workers = cached['workers']
            logging.info(f"Using Brave {key_id} (cached): {qps} QPS, {workers} workers")
            return key, qps, workers

        # Probe
        result = _probe_brave_key(key)
        key_cache[key_id] = {'ts': now, **result}

        if result['exhausted']:
            logging.warning(f"Brave {key_id}: exhausted ({result.get('error', 'quota reached')}), trying next")
            continue

        if result['error']:
            logging.warning(f"Brave {key_id}: probe error ({result['error']}), trying next")
            continue

        logging.info(f"Using Brave {key_id}: {result['qps']} QPS, {result['workers']} workers")
        # Save cache
        try:
            cache['keys'] = key_cache
            with open(BRAVE_RATE_LIMIT_CACHE, 'w') as f:
                json.dump(cache, f)
        except OSError:
            pass
        return key, result['qps'], result['workers']

    # All keys exhausted
    logging.warning("All Brave API keys exhausted or errored")
    # Save cache
    try:
        cache['keys'] = key_cache
        with open(BRAVE_RATE_LIMIT_CACHE, 'w') as f:
            json.dump(cache, f)
    except OSError:
        pass
    return None, 0, 0


def detect_brave_rate_limit(api_key: str) -> Tuple[int, int]:
    """Legacy wrapper: detect rate limit for a single key."""
    _, qps, workers = select_brave_key_and_limits([api_key])
    return max(qps, 1), max(workers, 1)


def search_brave(query: str, api_key: str, freshness: Optional[str] = None) -> Dict[str, Any]:
    """Perform search using Brave Search API."""
    params = {
        'q': query,
        'count': MAX_RESULTS_PER_QUERY,
        'search_lang': 'en',
        'country': 'ALL',
        'safesearch': 'moderate',
        'text_decorations': 'false'
    }
    
    if freshness:
        params['freshness'] = freshness
    
    url = f"{BRAVE_API_BASE}?{urlencode(params)}"
    headers = {
        'Accept': 'application/json',
        'X-Subscription-Token': api_key,
        'User-Agent': 'MediaDigest/2.0'
    }
    
    try:
        req = Request(url, headers=headers)
        with urlopen(req, timeout=TIMEOUT) as resp:
            raw = resp.read()
            # Handle gzip if server sends it anyway
            if raw[:2] == b'\x1f\x8b':
                import gzip
                raw = gzip.decompress(raw)
            data = json.loads(raw.decode())
            
        results = []
        if 'web' in data and 'results' in data['web']:
            for result in data['web']['results']:
                results.append({
                    'title': result.get('title', ''),
                    'link': result.get('url', ''),
                    'snippet': result.get('description', ''),
                    'date': datetime.now(timezone.utc).isoformat()  # Search timestamp
                })
                
        return {
            'status': 'ok',
            'query': query,
            'results': results,
            'total': len(results)
        }
        
    except Exception as e:
        return {
            'status': 'error',
            'query': query,
            'error': str(e)[:100],
            'results': [],
            'total': 0
        }


def filter_content(text: str, must_include: List[str], exclude: List[str]) -> bool:
    """Check if content matches inclusion/exclusion criteria."""
    text_lower = text.lower()

    # Check must_include (any match)
    if must_include:
        has_required = any(keyword.lower() in text_lower for keyword in must_include)
        if not has_required:
            return False

    # Check exclude (any match disqualifies)
    if exclude:
        has_excluded = any(keyword.lower() in text_lower for keyword in exclude)
        if has_excluded:
            return False
            
    return True


def search_topic_brave(topic: Dict[str, Any], api_key: str, freshness: Optional[str] = None,
                       max_workers: int = 1, delay: float = 0.5) -> Dict[str, Any]:
    """Search all queries for a topic using Brave API.
    
    Args:
        max_workers: Number of parallel search threads (1 = sequential)
        delay: Delay between requests in sequential mode (ignored when parallel)
    """
    topic_id = topic["id"]
    queries = topic["search"]["queries"]
    must_include = topic["search"].get("must_include", [])
    exclude = topic["search"].get("exclude", [])
    
    all_results = []
    query_stats = []
    
    if max_workers > 1:
        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            futures = {pool.submit(search_brave, q, api_key, freshness): q for q in queries}
            for future in as_completed(futures):
                search_result = future.result()
                query_stats.append({
                    'query': search_result['query'],
                    'status': search_result['status'],
                    'count': search_result['total']
                })
                if search_result['status'] == 'ok':
                    for result in search_result['results']:
                        combined_text = f"{result['title']} {result['snippet']}"
                        if filter_content(combined_text, must_include, exclude):
                            result['topics'] = [topic_id]
                            all_results.append(result)
    else:
        for query in queries:
            search_result = search_brave(query, api_key, freshness)
            query_stats.append({
                'query': query,
                'status': search_result['status'],
                'count': search_result['total']
            })
            if search_result['status'] == 'ok':
                for result in search_result['results']:
                    combined_text = f"{result['title']} {result['snippet']}"
                    if filter_content(combined_text, must_include, exclude):
                        result['topics'] = [topic_id]
                        all_results.append(result)
            time.sleep(delay)
    
    return {
        'topic_id': topic_id,
        'status': 'ok',
        'queries_executed': len(queries),
        'queries_ok': sum(1 for q in query_stats if q['status'] == 'ok'),
        'query_stats': query_stats,
        'count': len(all_results),
        'articles': all_results
    }


def get_tavily_api_key() -> Optional[str]:
    """Get Tavily API key from environment."""
    return os.getenv('TAVILY_API_KEY', '').strip() or None


def search_tavily(query: str, api_key: str, topic: str = "general",
                  max_results: int = 10, search_depth: str = "basic",
                  days: Optional[int] = None) -> Dict[str, Any]:
    """Perform search using Tavily Search API.
    
    Args:
        topic: 'general' or 'news' (news for real-time updates)
        days: Limit results to the last N days (None = no limit)
    """
    payload = {
        "api_key": api_key,
        "query": query,
        "search_depth": search_depth,
        "topic": topic,
        "max_results": max_results,
        "include_answer": False,
    }
    if days is not None:
        payload["days"] = days

    try:
        data = json.dumps(payload).encode()
        req = Request(TAVILY_API_BASE, data=data, headers={
            "Content-Type": "application/json",
            "User-Agent": "MediaDigest/3.0"
        }, method="POST")
        with urlopen(req, timeout=TIMEOUT) as resp:
            result = json.loads(resp.read().decode())

        articles = []
        for r in result.get("results", []):
            articles.append({
                "title": r.get("title", ""),
                "link": r.get("url", ""),
                "snippet": r.get("content", "")[:300],
                "date": r.get("published_date", ""),
                "source": "tavily",
            })

        return {
            "query": query,
            "status": "ok",
            "total": len(articles),
            "results": articles,
        }
    except HTTPError as e:
        logging.warning(f"Tavily search error for '{query}': HTTP {e.code}")
        return {"query": query, "status": "error", "total": 0, "results": [], "error": f"HTTP {e.code}"}
    except Exception as e:
        logging.warning(f"Tavily search error for '{query}': {e}")
        return {"query": query, "status": "error", "total": 0, "results": [], "error": str(e)}


def search_topic_tavily(topic: Dict[str, Any], api_key: str, days: Optional[int] = None) -> Dict[str, Any]:
    """Search all queries for a topic using Tavily API."""
    topic_id = topic["id"]
    queries = topic["search"]["queries"]
    must_include = topic["search"].get("must_include", [])
    exclude = topic["search"].get("exclude", [])

    all_results = []
    query_stats = []

    for query in queries:
        search_result = search_tavily(query, api_key, topic="news", days=days)
        query_stats.append({
            "query": search_result["query"],
            "status": search_result["status"],
            "count": search_result["total"],
        })
        if search_result["status"] == "ok":
            for result in search_result["results"]:
                combined_text = f"{result['title']} {result['snippet']}"
                if filter_content(combined_text, must_include, exclude):
                    result["topics"] = [topic_id]
                    all_results.append(result)

    ok_count = sum(1 for s in query_stats if s["status"] == "ok")
    return {
        "topic": topic_id,
        "status": "ok" if ok_count > 0 else "error",
        "queries": len(queries),
        "queries_ok": ok_count,
        "count": len(all_results),
        "articles": all_results,
        "query_details": query_stats,
    }


def generate_search_interface(topic: Dict[str, Any]) -> Dict[str, Any]:
    """Generate JSON interface for agent web search."""
    topic_id = topic["id"]
    queries = topic["search"]["queries"]
    must_include = topic["search"].get("must_include", [])
    exclude = topic["search"].get("exclude", [])
    
    return {
        'topic_id': topic_id,
        'status': 'interface',
        'search_required': True,
        'queries': queries,
        'filters': {
            'must_include': must_include,
            'exclude': exclude
        },
        'instructions': [
            "Use web_search tool for each query in 'queries' list",
            "Filter results using 'filters.must_include' and 'filters.exclude'",
            f"Tag matching articles with topic: '{topic_id}'",
            f"Expected max results per query: {MAX_RESULTS_PER_QUERY}"
        ],
        'count': 0,
        'articles': []
    }


def load_topics(defaults_dir: Path, config_dir: Optional[Path] = None) -> List[Dict[str, Any]]:
    """Load topics from configuration with overlay support."""
    try:
        from config_loader import load_merged_topics
    except ImportError:
        # Fallback for relative import
        import sys
        sys.path.append(str(Path(__file__).parent))
        from config_loader import load_merged_topics
    
    # Load merged topics from defaults + optional user overlay
    topics = load_merged_topics(defaults_dir, config_dir)
    logging.info(f"Loaded {len(topics)} topics for web search")
    return topics


def convert_freshness(hours: int) -> str:
    """Convert hours to Brave API freshness format."""
    if hours <= 24:
        return "pd"  # past day
    elif hours <= 168:  # 7 days
        return "pw"  # past week
    elif hours <= 720:  # 30 days
        return "pm"  # past month
    else:
        return "py"  # past year


def main():
    """Main web search function."""
    parser = argparse.ArgumentParser(
        description="Perform web searches for media digest topics. "
                   "Can use Tavily (TAVILY_API_KEY) or Brave Search API (BRAVE_API_KEY), "
                   "or generate an interface for agents.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
    # With a search API key (Tavily preferred when both are set)
    export TAVILY_API_KEY="your_key_here"   # or BRAVE_API_KEY
    python3 fetch-web.py --defaults config/defaults --config workspace/config --freshness 24h
    
    # Without API (generates interface)
    python3 fetch-web.py --config workspace/config --output web-search-interface.json  # backward compatibility
        """
    )
    
    parser.add_argument(
        "--defaults",
        type=Path,
        default=Path("config/defaults"),
        help="Default configuration directory with skill defaults (default: config/defaults)"
    )
    
    parser.add_argument(
        "--config",
        type=Path,
        help="User configuration directory for overlays (optional)"
    )
    
    parser.add_argument(
        "--freshness",
        default="48h",
        help="Search freshness: 24h, 48h, 1w, 1m (default: 48h)"
    )
    
    parser.add_argument(
        "--output", "-o",
        type=Path,
        help="Output JSON path (default: auto-generated temp file)"
    )
    
    parser.add_argument(
        "--verbose", "-v",
        action="store_true",
        help="Enable verbose logging"
    )
    
    parser.add_argument(
        "--force",
        action="store_true",
        help="Force re-fetch even if cached output exists"
    )
    
    args = parser.parse_args()
    logger = setup_logging(args.verbose)
    
    # Resume support: skip if output exists, is valid JSON, and < 1 hour old
    if args.output and args.output.exists() and not args.force:
        try:
            age_seconds = time.time() - args.output.stat().st_mtime
            if age_seconds < 3600:
                with open(args.output, 'r') as f:
                    json.load(f)
                logger.info(f"Skipping (cached output exists): {args.output}")
                return 0
        except (json.JSONDecodeError, OSError):
            pass
    
    # Auto-generate unique output path if not specified
    if not args.output:
        fd, temp_path = tempfile.mkstemp(prefix="media-news-digest-web-", suffix=".json")
        os.close(fd)
        args.output = Path(temp_path)
    
    try:
        # Backward compatibility: if only --config provided, use old behavior
        if args.config and args.defaults == Path("config/defaults") and not args.defaults.exists():
            logger.debug("Backward compatibility mode: using --config as sole source")
            topics = load_topics(args.config, None)
        else:
            topics = load_topics(args.defaults, args.config)
        
        if not topics:
            logger.warning("No topics found")
            return 1
            
        # Backend selection: WEB_SEARCH_BACKEND env or auto-detect
        web_backend = os.getenv('WEB_SEARCH_BACKEND', 'auto').lower()
        tavily_key = get_tavily_api_key()
        brave_keys = get_brave_api_keys()
        
        use_tavily = False
        use_brave = False
        api_key = None
        max_qps = 1
        max_workers = 1
        
        if web_backend == 'tavily' and tavily_key:
            use_tavily = True
        elif web_backend == 'brave' and brave_keys:
            api_key, max_qps, max_workers = select_brave_key_and_limits(brave_keys)
            use_brave = bool(api_key)
        elif web_backend == 'auto':
            if tavily_key:
                use_tavily = True
            elif brave_keys:
                api_key, max_qps, max_workers = select_brave_key_and_limits(brave_keys)
                use_brave = bool(api_key)
        
        if use_tavily:
            logger.info(f"Using Tavily Search API for {len(topics)} topics")
            
            # Convert freshness to days for Tavily
            # Accept both Brave native (pd/pw/pm/py) and human-friendly (24h/48h/1w/1m)
            freshness_days = {'pd': 1, 'pw': 7, 'pm': 30, 'py': 365,
                              '1w': 7, '1m': 30, '1y': 365}
            if args.freshness in freshness_days:
                tavily_days = freshness_days[args.freshness]
            else:
                try:
                    tavily_days = max(1, int(args.freshness.rstrip('h')) // 24)
                except (ValueError, AttributeError):
                    logger.warning(f"Unrecognized freshness format '{args.freshness}', defaulting to 2 days")
                    tavily_days = 2
            
            results = []
            for topic in topics:
                if not topic.get("search", {}).get("queries"):
                    logger.debug(f"Topic {topic['id']} has no search queries, skipping")
                    continue
                logger.debug(f"Searching topic: {topic['id']}")
                result = search_topic_tavily(topic, tavily_key, days=tavily_days)
                results.append(result)
            
            total_articles = sum(r.get("count", 0) for r in results)
            ok_topics = sum(1 for r in results if r["status"] == "ok")
            
            output = {
                "generated": datetime.now(timezone.utc).isoformat(),
                "source_type": "web",
                "defaults_dir": str(args.defaults),
                "config_dir": str(args.config) if args.config else None,
                "freshness": args.freshness,
                "api_used": "tavily",
                "topics_total": len(topics),
                "topics_ok": ok_topics,
                "total_articles": total_articles,
                "topics": results,
            }
            
            with open(args.output, 'w', encoding='utf-8') as f:
                json.dump(output, f, indent=2, ensure_ascii=False)
            
            logger.info(f"\u2705 Done: {ok_topics}/{len(topics)} topics ok, {total_articles} articles β†’ {args.output}")
            return 0
        
        elif use_brave:
            logger.info(f"Using Brave Search API for {len(topics)} topics ({len(brave_keys)} key(s) configured)")
            
            delay = 1.0 / max_qps if max_workers == 1 else 0
            
            # Convert freshness to Brave API format
            # Accept both Brave native (pd/pw/pm/py) and human-friendly (24h/48h/1w/1m)
            if args.freshness in ('pd', 'pw', 'pm', 'py'):
                brave_freshness = args.freshness
            else:
                freshness_map = {'1w': 168, '1m': 720, '1y': 8760}
                if args.freshness in freshness_map:
                    freshness_hours = freshness_map[args.freshness]
                else:
                    try:
                        freshness_hours = int(args.freshness.rstrip('h'))
                    except ValueError:
                        logger.warning(f"Unrecognized freshness format '{args.freshness}', defaulting to 48h")
                        freshness_hours = 48
                brave_freshness = convert_freshness(freshness_hours)
            
            results = []
            for topic in topics:
                if not topic.get("search", {}).get("queries"):
                    logger.debug(f"Topic {topic['id']} has no search queries, skipping")
                    continue
                    
                logger.debug(f"Searching topic: {topic['id']}")
                result = search_topic_brave(topic, api_key, brave_freshness,
                                           max_workers=max_workers, delay=delay)
                results.append(result)
            
            total_articles = sum(r.get("count", 0) for r in results)
            ok_topics = sum(1 for r in results if r["status"] == "ok")
            
            output = {
                "generated": datetime.now(timezone.utc).isoformat(),
                "source_type": "web",
                "defaults_dir": str(args.defaults),
                "config_dir": str(args.config) if args.config else None,
                "freshness": args.freshness,
                "api_used": "brave",
                "topics_total": len(results),
                "topics_ok": ok_topics,
                "total_articles": total_articles,
                "topics": results
            }
            
            logger.info(f"✅ Searched {ok_topics}/{len(results)} topics, "
                       f"{total_articles} articles found")
            
        else:
            logger.info("No TAVILY_API_KEY or BRAVE_API_KEY found, generating search interface for agents")
            
            results = []
            for topic in topics:
                if not topic.get("search", {}).get("queries"):
                    continue
                result = generate_search_interface(topic)
                results.append(result)
            
            output = {
                "generated": datetime.now(timezone.utc).isoformat(),
                "source_type": "web",
                "defaults_dir": str(args.defaults),
                "config_dir": str(args.config) if args.config else None,
                "freshness": args.freshness,
                "api_used": "interface",
                "topics_total": len(results),
                "topics_ok": 0,  # Requires manual execution
                "total_articles": 0,
                "topics": results,
                "agent_instructions": [
                    "This file contains search interface for web_search tool",
                    "For each topic, execute the queries using web_search",
                    "Apply the filters (must_include/exclude) to results",
                    "Tag matching articles with the topic_id",
                    "Update this file with results for merge-sources.py"
                ]
            }
            
            logger.info(f"✅ Generated search interface for {len(results)} topics")

        # Write output
        json_str = json.dumps(output, ensure_ascii=False, indent=2)
        with open(args.output, "w", encoding='utf-8') as f:
            f.write(json_str)

        logger.info(f"Output written to: {args.output}")
        return 0
        
    except Exception as e:
        logger.error(f"💥 Web search failed: {e}")
        return 1


if __name__ == "__main__":
    sys.exit(main())
```
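For orientation, the no-API fallback above emits one interface object per topic for an agent to execute with its own `web_search` tool. A minimal standalone sketch of that shape (the `box-office` topic and its query are invented for illustration; the real function also embeds per-topic instructions and a `MAX_RESULTS_PER_QUERY` hint):

```python
import json

def generate_search_interface(topic):
    """Build the agent-facing search interface, as fetch-web.py does."""
    search = topic["search"]
    return {
        "topic_id": topic["id"],
        "status": "interface",
        "search_required": True,
        "queries": search["queries"],
        "filters": {
            "must_include": search.get("must_include", []),
            "exclude": search.get("exclude", []),
        },
        "count": 0,       # filled in by the agent after searching
        "articles": [],   # filled in by the agent after filtering
    }

# Hypothetical topic config; field names match the loader's schema above.
topic = {
    "id": "box-office",
    "search": {"queries": ["weekend box office results"],
               "must_include": ["box office"]},
}
interface = generate_search_interface(topic)
print(json.dumps(interface, indent=2))
```

An agent consuming this file runs each entry in `queries`, keeps results passing the `filters`, and writes them back into `articles` so merge-sources.py can pick them up.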

### scripts/generate-pdf.py

```python
#!/usr/bin/env python3
"""
Generate styled PDF from markdown digest report.

Converts a media-news-digest markdown report into a professional PDF
with Chinese font support, emoji icons, and clean typography.

Usage:
    python3 generate-pdf.py --input /tmp/md-report.md --output /tmp/md-digest.pdf [--verbose]

Requirements:
    - weasyprint (pip install weasyprint)
    - Noto Sans CJK SC font (apt install fonts-noto-cjk)
"""

import argparse
import html
import re
import sys
import logging
from pathlib import Path
from urllib.parse import urlparse


# ---------------------------------------------------------------------------
# Markdown β†’ HTML conversion (with sanitization)
# ---------------------------------------------------------------------------

def escape(text: str) -> str:
    return html.escape(text, quote=True)


def is_safe_url(url: str) -> bool:
    try:
        parsed = urlparse(url.strip())
        return parsed.scheme in ('http', 'https')
    except Exception:
        return False


def _process_inline(text: str) -> str:
    """Process inline markdown with HTML escaping."""
    result = escape(text)

    # Bold: **text**
    result = re.sub(r'\*\*(.+?)\*\*', r'<strong>\1</strong>', result)

    # Inline code: `text`
    result = re.sub(
        r'`(.+?)`',
        r'<code>\1</code>',
        result
    )

    # Angle-bracket links: <https://...>
    def restore_link(m):
        url = html.unescape(m.group(1))
        if is_safe_url(url):
            escaped_url = escape(url)
            try:
                domain = urlparse(url).netloc
                return f'<a href="{escaped_url}">{escape(domain)}</a>'
            except Exception:
                return f'<a href="{escaped_url}">{escaped_url}</a>'
        return escape(url)

    result = re.sub(r'&lt;(https?://[^&]+?)&gt;', restore_link, result)

    # Markdown links: [text](url)
    def restore_md_link(m):
        label = html.unescape(m.group(1))
        url = html.unescape(m.group(2))
        if is_safe_url(url):
            return f'<a href="{escape(url)}">{escape(label)}</a>'
        return escape(label)

    result = re.sub(r'\[([^\]]+?)\]\(([^)]+?)\)', restore_md_link, result)

    return result


def markdown_to_html(md_content: str) -> str:
    """Convert markdown digest to styled HTML for PDF rendering."""
    lines = md_content.strip().split('\n')
    html_parts = []
    in_list = False

    for line in lines:
        stripped = line.strip()

        if not stripped:
            if in_list:
                html_parts.append('</ul>')
                in_list = False
            continue

        # H1
        if stripped.startswith('# '):
            title = _process_inline(stripped[2:])
            html_parts.append(f'<h1>{title}</h1>')
            continue

        # H2
        if stripped.startswith('## '):
            if in_list:
                html_parts.append('</ul>')
                in_list = False
            section = _process_inline(stripped[3:])
            html_parts.append(f'<h2>{section}</h2>')
            continue

        # H3
        if stripped.startswith('### '):
            if in_list:
                html_parts.append('</ul>')
                in_list = False
            section = _process_inline(stripped[4:])
            html_parts.append(f'<h3>{section}</h3>')
            continue

        # Blockquote
        if stripped.startswith('> '):
            text = _process_inline(stripped[2:])
            html_parts.append(f'<blockquote>{text}</blockquote>')
            continue

        # Horizontal rule
        if stripped == '---':
            html_parts.append('<hr>')
            continue

        # Bullet items
        if stripped.startswith('• ') or stripped.startswith('- '):
            if not in_list:
                html_parts.append('<ul>')
                in_list = True
            item_text = stripped[2:]
            safe_item = _process_inline(item_text)
            html_parts.append(f'<li>{safe_item}</li>')
            continue

        # Angle-bracket link on its own line (often source URLs)
        if stripped.startswith('<http') and in_list:
            url = stripped.strip('<> ')
            if is_safe_url(url):
                escaped_url = escape(url)
                try:
                    domain = urlparse(url).netloc
                    label = escape(domain)
                except Exception:
                    label = escaped_url
                html_parts.append(f'<li class="source-link"><a href="{escaped_url}">{label}</a></li>')
            continue

        # Stats/footer
        if stripped.startswith('📊') or stripped.startswith('🤖'):
            text = _process_inline(stripped)
            html_parts.append(f'<p class="footer">{text}</p>')
            continue

        # Regular paragraph
        text = _process_inline(stripped)
        html_parts.append(f'<p>{text}</p>')

    if in_list:
        html_parts.append('</ul>')

    return '\n'.join(html_parts)


# ---------------------------------------------------------------------------
# PDF CSS
# ---------------------------------------------------------------------------

PDF_CSS = """
@page {
    size: A4;
    margin: 2cm 2.5cm;
    @top-center {
        content: "Media Digest";
        font-size: 9px;
        color: #999;
        font-family: 'Noto Sans CJK SC', 'Noto Sans SC', sans-serif;
    }
    @bottom-center {
        content: counter(page) " / " counter(pages);
        font-size: 9px;
        color: #999;
        font-family: 'Noto Sans CJK SC', 'Noto Sans SC', sans-serif;
    }
}

body {
    font-family: 'Noto Sans CJK SC', 'Noto Sans SC', 'PingFang SC',
                 'Microsoft YaHei', 'Segoe UI', Roboto, sans-serif;
    font-size: 11pt;
    line-height: 1.7;
    color: #1a1a1a;
}

h1 {
    font-size: 22pt;
    color: #111;
    border-bottom: 3px solid #2563eb;
    padding-bottom: 8px;
    margin-bottom: 20px;
    margin-top: 0;
}

h2 {
    font-size: 15pt;
    color: #1e40af;
    margin-top: 28px;
    margin-bottom: 12px;
    padding-bottom: 4px;
    border-bottom: 1px solid #e5e7eb;
}

h3 {
    font-size: 13pt;
    color: #374151;
    margin-top: 20px;
    margin-bottom: 8px;
}

blockquote {
    background: #f0f4ff;
    border-left: 4px solid #2563eb;
    padding: 12px 16px;
    margin: 16px 0;
    color: #374151;
    font-size: 10.5pt;
    border-radius: 0 6px 6px 0;
}

ul {
    padding-left: 20px;
    margin: 8px 0;
}

li {
    margin-bottom: 10px;
    line-height: 1.6;
}

li.source-link {
    list-style: none;
    margin-bottom: 2px;
    margin-top: -6px;
}

li.source-link a {
    font-size: 9pt;
}

a {
    color: #2563eb;
    text-decoration: none;
}

a:hover {
    text-decoration: underline;
}

strong {
    color: #111;
}

code {
    font-family: 'Noto Sans Mono CJK SC', 'SF Mono', 'Fira Code', monospace;
    font-size: 9pt;
    background: #f3f4f6;
    padding: 2px 5px;
    border-radius: 3px;
    color: #6b7280;
}

hr {
    border: none;
    border-top: 1px solid #e5e7eb;
    margin: 28px 0;
}

p.footer {
    font-size: 8.5pt;
    color: #9ca3af;
    margin-top: 4px;
}

/* First page title area */
h1 + blockquote {
    margin-top: 12px;
}

/* Emoji rendering */
body {
    -webkit-font-smoothing: antialiased;
}
"""


# ---------------------------------------------------------------------------
# HTML wrapper
# ---------------------------------------------------------------------------

def wrap_html(body: str) -> str:
    return f"""<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<style>
{PDF_CSS}
</style>
</head>
<body>
{body}
</body>
</html>"""


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

def main():
    parser = argparse.ArgumentParser(
        description="Generate styled PDF from markdown digest report",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""\
Examples:
    python3 generate-pdf.py -i /tmp/md-report.md -o /tmp/md-digest.pdf
    python3 generate-pdf.py -i report.md -o digest.pdf --verbose

Requirements:
    pip install weasyprint
    apt install fonts-noto-cjk  (for Chinese support)
"""
    )
    parser.add_argument("--input", "-i", required=True, help="Input markdown file")
    parser.add_argument("--output", "-o", required=True, help="Output PDF file")
    parser.add_argument("--verbose", "-v", action="store_true")
    args = parser.parse_args()

    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.INFO,
        format="%(levelname)s: %(message)s"
    )

    try:
        import weasyprint
    except ImportError:
        logging.error("weasyprint not installed. Run: pip install weasyprint")
        sys.exit(1)

    input_path = Path(args.input)
    if not input_path.exists():
        logging.error(f"Input file not found: {args.input}")
        sys.exit(1)

    md_content = input_path.read_text(encoding='utf-8')
    logging.info(f"Converting {args.input} ({len(md_content)} chars)")

    # Convert markdown β†’ HTML β†’ PDF
    body_html = markdown_to_html(md_content)
    full_html = wrap_html(body_html)

    # Optionally save intermediate HTML for debugging
    if args.verbose:
        html_debug = Path(args.output).with_suffix('.html')
        html_debug.write_text(full_html, encoding='utf-8')
        logging.debug(f"Debug HTML saved: {html_debug}")

    # Generate PDF
    logging.info("Generating PDF...")
    doc = weasyprint.HTML(string=full_html)
    doc.write_pdf(args.output)

    output_size = Path(args.output).stat().st_size
    logging.info(f"✅ PDF generated: {args.output} ({output_size / 1024:.0f} KB)")


if __name__ == "__main__":
    main()

```
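The converter's key safety property is that only `http`/`https` URLs survive as anchors; any other scheme is rendered as plain escaped text. A simplified standalone sketch of that check (the real `_process_inline` escapes the whole line first and unescapes captured groups before validating, which this sketch omits):

```python
import html
import re
from urllib.parse import urlparse

def is_safe_url(url: str) -> bool:
    # Accept only http(s); anything else (javascript:, data:, file:) is rejected.
    try:
        return urlparse(url.strip()).scheme in ("http", "https")
    except Exception:
        return False

def render_md_link(m: re.Match) -> str:
    label, url = m.group(1), m.group(2)
    if is_safe_url(url):
        return f'<a href="{html.escape(url, quote=True)}">{html.escape(label)}</a>'
    return html.escape(label)  # drop the unsafe URL, keep the visible text

text = "[THR](https://hollywoodreporter.com) and [bad](javascript:alert(1))"
out = re.sub(r'\[([^\]]+?)\]\(([^)]+?)\)', render_md_link, text)
print(out)
```

The safe link becomes an `<a>` tag while the `javascript:` link is reduced to its escaped label, so nothing executable reaches the rendered PDF.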

### scripts/merge-sources.py

```python
#!/usr/bin/env python3
"""
Merge data from all sources (RSS, Twitter, Web) with quality scoring.

Reads output from fetch-rss.py, fetch-twitter.py, and fetch-web.py,
merges articles, removes duplicates, applies quality scoring, and
groups by topics for final digest output.

Usage:
    python3 merge-sources.py [--rss FILE] [--twitter FILE] [--web FILE] [--output FILE] [--verbose]
"""

import json
import sys
import os
import argparse
import logging
import tempfile
import re
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import Dict, List, Any, Optional, Set
from difflib import SequenceMatcher
from urllib.parse import urlparse

# Quality scoring weights
SCORE_MULTI_SOURCE = 5      # Article appears in multiple sources
SCORE_PRIORITY_SOURCE = 3   # From high-priority source
SCORE_RECENT = 2            # Recent article (< 24h)
SCORE_ENGAGEMENT_VIRAL = 5   # Viral tweet (1000+ likes or 500+ RTs)
SCORE_ENGAGEMENT_HIGH = 3    # High engagement (500+ likes or 200+ RTs)
SCORE_ENGAGEMENT_MED = 2     # Medium engagement (100+ likes or 50+ RTs)
SCORE_ENGAGEMENT_LOW = 1     # Some engagement (50+ likes or 20+ RTs)
PENALTY_DUPLICATE = -10     # Duplicate/very similar title
PENALTY_OLD_REPORT = -5     # Already in previous digest

# Deduplication thresholds
TITLE_SIMILARITY_THRESHOLD = 0.85
DOMAIN_DUPLICATE_THRESHOLD = 0.95


def setup_logging(verbose: bool) -> logging.Logger:
    """Setup logging configuration."""
    level = logging.DEBUG if verbose else logging.INFO
    logging.basicConfig(
        level=level,
        format='%(asctime)s - %(levelname)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )
    return logging.getLogger(__name__)


def load_source_data(file_path: Optional[Path]) -> Dict[str, Any]:
    """Load source data from JSON file."""
    if not file_path or not file_path.exists():
        return {"sources": [], "total_articles": 0}
        
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        return data
    except Exception as e:
        logging.warning(f"Failed to load {file_path}: {e}")
        return {"sources": [], "total_articles": 0}


def normalize_title(title: str) -> str:
    """Normalize title for comparison."""
    # Remove common prefixes/suffixes
    title = re.sub(r'^(RT\s+@\w+:\s*)', '', title, flags=re.IGNORECASE)
    title = re.sub(r'\s+[|\-–]\s+[^|]*$', '', title)  # Remove " | Site Name" endings (spaces required, so hyphenated words survive)
    
    # Normalize whitespace and punctuation
    title = re.sub(r'\s+', ' ', title).strip()
    title = re.sub(r'[^\w\s]', '', title.lower())
    
    return title


def calculate_title_similarity(title1: str, title2: str) -> float:
    """Calculate similarity between two titles."""
    norm1 = normalize_title(title1)
    norm2 = normalize_title(title2)
    
    if not norm1 or not norm2:
        return 0.0
        
    return SequenceMatcher(None, norm1, norm2).ratio()


def get_domain(url: str) -> str:
    """Extract domain from URL."""
    try:
        return urlparse(url).netloc.lower().replace('www.', '')
    except Exception:
        return ''


def normalize_url(url: str) -> str:
    """Normalize URL for dedup comparison (strip query, fragment, trailing slash, www.)."""
    try:
        parsed = urlparse(url)
        domain = parsed.netloc.lower().replace('www.', '')
        path = parsed.path.rstrip('/')
        return f"{domain}{path}"
    except Exception:
        return url


def calculate_base_score(article: Dict[str, Any], source: Dict[str, Any]) -> float:
    """Calculate base quality score for an article."""
    score = 0.0
    
    # Priority source bonus
    if source.get("priority", False):
        score += SCORE_PRIORITY_SOURCE
        
    # Recency bonus (< 24 hours)
    try:
        article_date = datetime.fromisoformat(article["date"].replace('Z', '+00:00'))
        hours_old = (datetime.now(timezone.utc) - article_date).total_seconds() / 3600
        if hours_old < 24:
            score += SCORE_RECENT
    except Exception:
        pass
    
    # Twitter engagement bonus (tiered)
    if source.get("source_type") == "twitter" and "metrics" in article:
        metrics = article["metrics"]
        likes = metrics.get("like_count", 0)
        retweets = metrics.get("retweet_count", 0)
        
        if likes >= 1000 or retweets >= 500:
            score += SCORE_ENGAGEMENT_VIRAL
        elif likes >= 500 or retweets >= 200:
            score += SCORE_ENGAGEMENT_HIGH
        elif likes >= 100 or retweets >= 50:
            score += SCORE_ENGAGEMENT_MED
        elif likes >= 50 or retweets >= 20:
            score += SCORE_ENGAGEMENT_LOW

    # RSS from priority sources get extra weight (official blogs, research papers)
    if source.get("source_type") == "rss" and source.get("priority", False):
        score += 2  # Extra priority RSS bonus

    return score


def _extract_tokens(title: str) -> Set[str]:
    """Extract significant tokens from a normalized title for bucketing."""
    norm = normalize_title(title)
    # Split into tokens, filter short/common words
    stopwords = {'the', 'a', 'an', 'is', 'are', 'was', 'were', 'in', 'on', 'at',
                 'to', 'for', 'of', 'and', 'or', 'with', 'by', 'from', 'as', 'it',
                 'its', 'that', 'this', 'be', 'has', 'had', 'have', 'not', 'but',
                 'what', 'how', 'new', 'will', 'can', 'do', 'does', 'did'}
    tokens = set()
    for word in norm.split():
        if len(word) >= 3 and word not in stopwords:
            tokens.add(word)
    return tokens


def _build_token_buckets(articles: List[Dict[str, Any]]) -> Dict[int, Set[int]]:
    """Build token-based buckets mapping each article index to candidate duplicate indices.
    
    Two articles are candidates if they share 2+ significant tokens.
    Returns dict: article_index -> set of candidate article indices to compare against.
    """
    from collections import defaultdict
    
    # token -> list of article indices
    token_to_indices: Dict[str, List[int]] = defaultdict(list)
    article_tokens: List[Set[str]] = []
    
    for i, article in enumerate(articles):
        tokens = _extract_tokens(article.get("title", ""))
        article_tokens.append(tokens)
        for token in tokens:
            token_to_indices[token].append(i)
    
    # For each article, find candidates sharing 2+ tokens
    candidates: Dict[int, Set[int]] = defaultdict(set)
    for i, tokens in enumerate(article_tokens):
        # Count how many tokens each other article shares with this one
        overlap_count: Dict[int, int] = defaultdict(int)
        for token in tokens:
            for j in token_to_indices[token]:
                if j != i:
                    overlap_count[j] += 1
        for j, count in overlap_count.items():
            if count >= 2:
                candidates[i].add(j)
    
    return candidates


def deduplicate_articles(articles: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Remove duplicate articles based on title similarity.
    
    Uses token-based bucketing to avoid O(nΒ²) SequenceMatcher comparisons.
    Only articles sharing 2+ significant title tokens are compared.
    Domain saturation is handled separately per-topic after grouping.
    """
    if not articles:
        return articles
        
    # Sort by quality score (highest first) to keep best versions
    articles.sort(key=lambda x: x.get("quality_score", 0), reverse=True)

    # Phase 1: URL dedup (exact URL match after normalization)
    url_seen: Dict[str, int] = {}  # normalized_url -> index in articles
    url_duplicates: Set[int] = set()
    for i, article in enumerate(articles):
        url = article.get("link", "")
        if not url:
            continue
        norm_url = normalize_url(url)
        if norm_url in url_seen:
            # Keep the one with higher quality_score (articles already sorted by score)
            url_duplicates.add(i)
            logging.debug(f"URL duplicate: {url} ~= {articles[url_seen[norm_url]].get('link','')}")
        else:
            url_seen[norm_url] = i

    if url_duplicates:
        articles = [a for i, a in enumerate(articles) if i not in url_duplicates]
        logging.info(f"URL dedup: removed {len(url_duplicates)} duplicates")

    # Phase 2: Title similarity dedup
    deduplicated = []

    # Build token buckets for candidate pairs
    candidates = _build_token_buckets(articles)
    
    # Track which indices have been marked as duplicates
    duplicate_indices: Set[int] = set()
    
    for i, article in enumerate(articles):
        if i in duplicate_indices:
            continue
        
        title = article.get("title", "")
        
        # Mark future candidates as duplicates using SequenceMatcher (only within bucket)
        for j in candidates.get(i, set()):
            if j > i and j not in duplicate_indices:
                other_title = articles[j].get("title", "")
                # Quick length check β€” titles with >30% length difference are unlikely duplicates
                norm_i = normalize_title(title)
                norm_j = normalize_title(other_title)
                if abs(len(norm_i) - len(norm_j)) > 0.3 * max(len(norm_i), len(norm_j), 1):
                    continue
                similarity = calculate_title_similarity(title, other_title)
                if similarity >= TITLE_SIMILARITY_THRESHOLD:
                    logging.debug(f"Title duplicate: '{other_title}' ~= '{title}' ({similarity:.2f})")
                    duplicate_indices.add(j)
            
        deduplicated.append(article)
        
    logging.info(f"Deduplication: {len(articles)} β†’ {len(deduplicated)} articles")
    return deduplicated


# Domains exempt from per-topic limits (multi-author platforms)
DOMAIN_LIMIT_EXEMPT = {"x.com", "twitter.com", "github.com", "reddit.com"}

def apply_domain_limits(articles: List[Dict[str, Any]], max_per_domain: int = 3) -> List[Dict[str, Any]]:
    """Limit articles per domain within a single topic group.
    
    Should be called per-topic after group_by_topics() to ensure
    each topic gets its own domain budget.
    """
    domain_counts: Dict[str, int] = {}
    result = []
    for article in articles:
        domain = get_domain(article.get("link", ""))
        if domain and domain not in DOMAIN_LIMIT_EXEMPT:
            count = domain_counts.get(domain, 0)
            if count >= max_per_domain:
                logging.debug(f"Domain limit ({max_per_domain}): skipping {domain} article in topic")
                continue
            domain_counts[domain] = count + 1
        result.append(article)
    return result


def merge_article_sources(articles: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Merge duplicate stories reported by multiple sources and boost their score."""
    if not articles:
        return articles
        
    # Group articles by normalized title
    title_groups = {}
    for article in articles:
        norm_title = normalize_title(article.get("title", ""))
        if norm_title not in title_groups:
            title_groups[norm_title] = []
        title_groups[norm_title].append(article)
    
    merged = []
    for group in title_groups.values():
        if len(group) == 1:
            merged.append(group[0])
        else:
            # Multiple sources for same story - merge and boost score
            primary = max(group, key=lambda x: x.get("quality_score", 0))
            
            # Collect all source types
            source_types = set(article.get("source_type", "") for article in group)
            source_names = [article.get("source_name", "") for article in group]
            
            # Multi-source bonus
            multi_source_bonus = len(source_types) * SCORE_MULTI_SOURCE
            primary["quality_score"] = primary.get("quality_score", 0) + multi_source_bonus
            
            # Add metadata about multiple sources
            primary["multi_source"] = True
            primary["source_count"] = len(group)
            primary["all_sources"] = source_names[:3]  # Limit to avoid bloat
            
            logging.debug(f"Merged {len(group)} sources for: '{primary['title'][:50]}...'")
            merged.append(primary)
            
    return merged


def load_previous_digests(archive_dir: Path, days: int = 7) -> Set[str]:
    """Load titles from previous digests to avoid repeats."""
    if not archive_dir.exists():
        return set()
        
    seen_titles = set()
    cutoff = datetime.now() - timedelta(days=days)
    
    try:
        for file_path in archive_dir.glob("*.md"):
            # Extract date from filename
            match = re.search(r'(\d{4}-\d{2}-\d{2})', file_path.name)
            if match:
                try:
                    file_date = datetime.strptime(match.group(1), "%Y-%m-%d")
                    if file_date < cutoff:
                        continue
                except ValueError:
                    continue
                    
            # Extract titles from markdown
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()
                
            # Simple title extraction (assumes format like "- [Title](link)")
            for match in re.finditer(r'-\s*\[([^\]]+)\]', content):
                title = normalize_title(match.group(1))
                if title:
                    seen_titles.add(title)
                    
    except Exception as e:
        logging.debug(f"Failed to load previous digests: {e}")
        
    logging.info(f"Loaded {len(seen_titles)} titles from previous {days} days")
    return seen_titles


def apply_previous_digest_penalty(articles: List[Dict[str, Any]], 
                                previous_titles: Set[str]) -> List[Dict[str, Any]]:
    """Apply penalty to articles that appeared in previous digests."""
    if not previous_titles:
        return articles
        
    penalized_count = 0
    for article in articles:
        norm_title = normalize_title(article.get("title", ""))
        if norm_title in previous_titles:
            article["quality_score"] = article.get("quality_score", 0) + PENALTY_OLD_REPORT
            article["in_previous_digest"] = True
            penalized_count += 1
            
    logging.info(f"Applied previous digest penalty to {penalized_count} articles")
    return articles


def group_by_topics(articles: List[Dict[str, Any]]) -> Dict[str, List[Dict[str, Any]]]:
    """Group articles by their topics."""
    topic_groups = {}
    
    for article in articles:
        topics = article.get("topics", [])
        if not topics:
            topics = ["uncategorized"]
            
        for topic in topics:
            if topic not in topic_groups:
                topic_groups[topic] = []
            
            # Add copy with single topic for cleaner grouping
            article_copy = article.copy()
            article_copy["primary_topic"] = topic
            topic_groups[topic].append(article_copy)
    
    # Sort articles within each topic by quality score
    for topic in topic_groups:
        topic_groups[topic].sort(key=lambda x: x.get("quality_score", 0), reverse=True)
        
    return topic_groups


def main():
    """Main merge and scoring function."""
    parser = argparse.ArgumentParser(
        description="Merge articles from all sources with quality scoring and deduplication.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
    python3 merge-sources.py --rss rss.json --twitter twitter.json --web web.json
    python3 merge-sources.py --rss rss.json --output merged.json --verbose
    python3 merge-sources.py --archive-dir workspace/archive/media-digest
        """
    )
    
    parser.add_argument(
        "--rss",
        type=Path,
        help="RSS fetch results JSON file"
    )
    
    parser.add_argument(
        "--twitter",
        type=Path,
        help="Twitter fetch results JSON file"
    )
    
    parser.add_argument(
        "--web",
        type=Path,
        help="Web search results JSON file"
    )
    
    parser.add_argument(
        "--github",
        type=Path,
        help="GitHub releases results JSON file"
    )
    
    parser.add_argument(
        "--trending",
        type=Path,
        help="GitHub trending repos JSON file"
    )
    
    parser.add_argument(
        "--reddit",
        type=Path,
        help="Reddit posts results JSON file"
    )
    
    parser.add_argument(
        "--output", "-o",
        type=Path,
        help="Output JSON path (default: auto-generated temp file)"
    )
    
    parser.add_argument(
        "--archive-dir",
        type=Path,
        help="Archive directory for previous digest penalty"
    )
    
    parser.add_argument(
        "--verbose", "-v",
        action="store_true",
        help="Enable verbose logging"
    )
    
    args = parser.parse_args()
    logger = setup_logging(args.verbose)
    
    # Auto-generate unique output path if not specified
    if not args.output:
        fd, temp_path = tempfile.mkstemp(prefix="media-news-digest-merged-", suffix=".json")
        os.close(fd)
        args.output = Path(temp_path)
    
    try:
        # Load source data
        rss_data = load_source_data(args.rss)
        twitter_data = load_source_data(args.twitter)
        web_data = load_source_data(args.web)
        github_data = load_source_data(args.github)
        # hasattr(args, "trending") is always True once the argument is defined;
        # check the value instead so a missing file yields None.
        trending_data = load_source_data(args.trending) if args.trending else None
        reddit_data = load_source_data(args.reddit)
        
        logger.info(f"Loaded sources - RSS: {rss_data.get('total_articles', 0)}, "
                   f"Twitter: {twitter_data.get('total_articles', 0)}, "
                   f"Web: {web_data.get('total_articles', 0)}, "
                   f"GitHub: {github_data.get('total_articles', 0)} releases + {trending_data.get('total', 0) if trending_data else 0} trending, "
                   f"Reddit: {reddit_data.get('total_posts', 0)}")
        
        # Collect all articles with source context
        all_articles = []
        
        # Process RSS articles
        for source in rss_data.get("sources", []):
            for article in source.get("articles", []):
                article["source_type"] = "rss"
                article["source_name"] = source.get("name", "")
                article["source_id"] = source.get("source_id", "")
                article["quality_score"] = calculate_base_score(article, source)
                all_articles.append(article)
        
        # Process Twitter articles
        for source in twitter_data.get("sources", []):
            for article in source.get("articles", []):
                article["source_type"] = "twitter"
                article["source_name"] = f"@{source.get('handle', '')}"
                article["display_name"] = source.get("name", "")
                article["source_id"] = source.get("source_id", "")
                article["quality_score"] = calculate_base_score(article, source)
                all_articles.append(article)
        
        # Process Web articles
        for topic_result in web_data.get("topics", []):
            for article in topic_result.get("articles", []):
                article["source_type"] = "web"
                article["source_name"] = "Web Search"
                article["source_id"] = f"web-{topic_result.get('topic_id', '')}"
                # Build a minimal source dict so web articles go through the same scoring
                web_source = {
                    "source_type": "web",
                    "priority": False,
                }
                article["quality_score"] = calculate_base_score(article, web_source)
                all_articles.append(article)
        
        # Process GitHub articles
        for source in github_data.get("sources", []):
            for article in source.get("articles", []):
                article["source_type"] = "github"
                article["source_name"] = source.get("name", "")
                article["source_id"] = source.get("source_id", "")
                article["quality_score"] = calculate_base_score(article, source)
                all_articles.append(article)
        
        # Process Reddit articles
        for source in reddit_data.get("subreddits", []):
            for article in source.get("articles", []):
                article["source_type"] = "reddit"
                article["source_name"] = f"r/{source.get('subreddit', '')}"
                article["source_id"] = source.get("source_id", "")
                reddit_source = {
                    "source_type": "reddit",
                    "priority": source.get("priority", False),
                }
                article["quality_score"] = calculate_base_score(article, reddit_source)
                # Reddit score bonus
                score = article.get("score", 0)
                if score > 500:
                    article["quality_score"] += 5
                elif score > 200:
                    article["quality_score"] += 3
                elif score > 100:
                    article["quality_score"] += 1
                all_articles.append(article)
        

        # Load GitHub trending repos
        if trending_data:
            for repo in trending_data.get("repos", []):
                article = {
                    "title": f"{repo['repo']}: {repo['description']}" if repo.get('description') else repo['repo'],
                    "link": repo.get("url", f"https://github.com/{repo['repo']}"),
                    "snippet": repo.get("description", ""),
                    "date": repo.get("pushed_at", ""),
                    "source": "github-trending",
                    "source_type": "github_trending",
                    "topics": repo.get("topics", []),
                    "stars": repo.get("stars", 0),
                    "daily_stars_est": repo.get("daily_stars_est", 0),
                    "forks": repo.get("forks", 0),
                    "language": repo.get("language", ""),
                    "quality_score": 5 + min(10, repo.get("daily_stars_est", 0) // 10),
                }
                all_articles.append(article)
        total_collected = len(all_articles)
        logger.info(f"Total articles collected: {total_collected}")
        
        # Load previous digest titles for penalty
        previous_titles = set()
        if args.archive_dir:
            previous_titles = load_previous_digests(args.archive_dir)
        
        # Apply previous digest penalty
        all_articles = apply_previous_digest_penalty(all_articles, previous_titles)
        
        # Merge multi-source articles
        all_articles = merge_article_sources(all_articles)
        logger.info(f"After merging multi-source: {len(all_articles)}")
        
        # Deduplicate articles
        all_articles = deduplicate_articles(all_articles)
        
        # Group by topics
        topic_groups = group_by_topics(all_articles)
        
        # Apply per-topic domain limits (max 3 articles per domain per topic)
        for topic in topic_groups:
            before = len(topic_groups[topic])
            topic_groups[topic] = apply_domain_limits(topic_groups[topic])
            after = len(topic_groups[topic])
            if before != after:
                logger.info(f"Domain limits ({topic}): {before} β†’ {after}")
        
        # Recalculate total after domain limits
        total_after_domain_limits = sum(len(articles) for articles in topic_groups.values())


        topic_counts = {topic: len(articles) for topic, articles in topic_groups.items()}
        
        output = {
            "generated": datetime.now(timezone.utc).isoformat(),
            "input_sources": {
                "rss_articles": rss_data.get("total_articles", 0),
                "twitter_articles": twitter_data.get("total_articles", 0),
                "web_articles": web_data.get("total_articles", 0),
                "github_articles": github_data.get("total_articles", 0),
                "github_trending": trending_data.get("total", 0) if trending_data else 0,
                "reddit_posts": reddit_data.get("total_posts", 0),
                "total_input": total_collected
            },
            "processing": {
                "deduplication_applied": True,
                "multi_source_merging": True,
                "previous_digest_penalty": len(previous_titles) > 0,
                "quality_scoring": True
            },
            "output_stats": {
                "total_articles": total_after_domain_limits,
                "topics_count": len(topic_groups),
                "topic_distribution": topic_counts
            },
            "topics": {
                topic: {
                    "count": len(articles),
                    "articles": articles
                } for topic, articles in topic_groups.items()
            }
        }
        
        # Write output
        json_str = json.dumps(output, ensure_ascii=False, indent=2)
        with open(args.output, "w", encoding='utf-8') as f:
            f.write(json_str)
        
        logger.info(f"βœ… Merged and scored articles:")
        logger.info(f"   Input: {total_collected} articles")
        logger.info(f"   Output: {total_after_domain_limits} articles across {len(topic_groups)} topics")
        logger.info(f"   File: {args.output}")
        
        return 0
        
    except Exception as e:
        logger.error(f"πŸ’₯ Merge failed: {e}")
        return 1


if __name__ == "__main__":
    sys.exit(main())
```
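The Phase 1 URL dedup above keeps the first occurrence per normalized URL. A minimal standalone sketch of that pass, using a hypothetical `simple_normalize_url` in place of the pipeline's `normalize_url` (defined earlier in merge-sources.py), which here only lowercases the host, drops a few common tracking parameters, and strips a trailing slash:

```python
from urllib.parse import urlparse, urlencode, parse_qsl, urlunparse

TRACKING_PARAMS = {"utm_source", "utm_medium", "utm_campaign", "ref"}

def simple_normalize_url(url: str) -> str:
    """Hypothetical normalizer: lowercase host, strip tracking params and trailing slash."""
    p = urlparse(url)
    query = urlencode([(k, v) for k, v in parse_qsl(p.query)
                       if k not in TRACKING_PARAMS])
    return urlunparse((p.scheme, p.netloc.lower(), p.path.rstrip("/"), "", query, ""))

def dedupe_by_url(articles):
    # First occurrence wins, matching the pipeline's assumption that
    # articles arrive sorted by quality score.
    seen = set()
    kept = []
    for a in articles:
        key = simple_normalize_url(a.get("link", ""))
        if key in seen:
            continue
        seen.add(key)
        kept.append(a)
    return kept
```

With this normalizer, `https://Example.com/story/?utm_source=x` and `https://example.com/story` collapse to the same key, so only the first survives.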

### scripts/sanitize-html.py

```python
#!/usr/bin/env python3
"""
Sanitize media digest markdown report into safe HTML email.

Reads a markdown report file, escapes all text content to prevent XSS,
and outputs a styled HTML email body safe for injection into email clients.

Usage:
    python3 sanitize-html.py --input /tmp/md-report.md --output /tmp/md-email.html [--verbose]

Security:
    - All text content is HTML-escaped (prevents XSS from malicious RSS/Twitter/web content)
    - Only whitelisted tags/attributes are allowed
    - URLs are validated (must be http/https)
    - No JavaScript, event handlers, or data: URIs allowed
"""

import argparse
import html
import re
import sys
import logging
from urllib.parse import urlparse


def escape(text: str) -> str:
    """HTML-escape text content."""
    return html.escape(text, quote=True)


def is_safe_url(url: str) -> bool:
    """Validate URL is http(s) only β€” no javascript:, data:, etc."""
    try:
        parsed = urlparse(url.strip())
        return parsed.scheme in ('http', 'https')
    except Exception:
        return False


def safe_link(url: str, label: str = None, style: str = "color:#0969da;font-size:13px") -> str:
    """Generate a safe HTML link with escaped content."""
    url = url.strip()
    if not is_safe_url(url):
        return escape(label or url)
    escaped_url = escape(url)
    escaped_label = escape(label or url)
    return f'<a href="{escaped_url}" style="{style}">{escaped_label}</a>'


def markdown_to_safe_html(md_content: str) -> str:
    """Convert markdown digest report to sanitized HTML email."""
    lines = md_content.strip().split('\n')
    html_parts = []
    
    # Email wrapper open
    html_parts.append(
        '<div style="max-width:640px;margin:0 auto;font-family:'
        '-apple-system,BlinkMacSystemFont,\'Segoe UI\',Roboto,sans-serif;'
        'color:#1a1a1a;line-height:1.6">'
    )
    
    in_list = False
    
    for line in lines:
        stripped = line.strip()
        
        # Skip empty lines
        if not stripped:
            if in_list:
                html_parts.append('</ul>')
                in_list = False
            continue
        
        # H1: # Title
        if stripped.startswith('# '):
            title = escape(stripped[2:])
            html_parts.append(
                f'<h1 style="font-size:22px;border-bottom:2px solid #e5e5e5;'
                f'padding-bottom:8px">{title}</h1>'
            )
            continue
        
        # H2: ## Section
        if stripped.startswith('## '):
            if in_list:
                html_parts.append('</ul>')
                in_list = False
            section = escape(stripped[3:])
            html_parts.append(
                f'<h2 style="font-size:17px;margin-top:24px;color:#333">{section}</h2>'
            )
            continue
        
        # Blockquote: > executive summary
        if stripped.startswith('> '):
            text = escape(stripped[2:])
            html_parts.append(
                f'<p style="color:#555;font-size:14px;background:#f8f9fa;'
                f'padding:12px;border-radius:6px">{text}</p>'
            )
            continue
        
        # Horizontal rule
        if stripped == '---':
            html_parts.append('<hr style="border:none;border-top:1px solid #e5e5e5;margin:24px 0">')
            continue
        
        # Bullet items: β€’ or -
        if stripped.startswith('β€’ ') or stripped.startswith('- '):
            if not in_list:
                html_parts.append('<ul style="padding-left:20px">')
                in_list = True
            item_text = stripped[2:]
            safe_item = _process_inline(item_text)
            html_parts.append(f'<li style="margin-bottom:10px">{safe_item}</li>')
            continue
        
        # Continuation of bullet (indented line with link)
        if stripped.startswith('<http') and in_list:
            url = stripped.strip('<> ')
            link = safe_link(url)
            html_parts.append(f'<li style="margin-bottom:2px;list-style:none">{link}</li>')
            continue
        
        # Stats/footer line
        if stripped.startswith('πŸ“Š') or stripped.startswith('πŸ€–'):
            text = _process_inline(stripped)
            html_parts.append(f'<p style="font-size:12px;color:#888">{text}</p>')
            continue
        
        # Regular paragraph
        text = _process_inline(stripped)
        html_parts.append(f'<p>{text}</p>')
    
    if in_list:
        html_parts.append('</ul>')
    
    html_parts.append('</div>')
    return '\n'.join(html_parts)


def _process_inline(text: str) -> str:
    """Process inline markdown (bold, links, code) with HTML escaping."""
    # First escape everything
    result = escape(text)
    
    # Restore bold: **text** β†’ <strong>text</strong>
    result = re.sub(
        r'\*\*(.+?)\*\*',
        r'<strong>\1</strong>',
        result
    )
    
    # Restore inline code: `text` β†’ <code>text</code>
    result = re.sub(
        r'`(.+?)`',
        lambda m: f'<code style="font-size:12px;color:#888;background:#f4f4f4;'
                  f'padding:2px 6px;border-radius:3px">{m.group(1)}</code>',
        result
    )
    
    # Restore angle-bracket links: &lt;https://...&gt; β†’ <a href>
    def restore_link(m):
        url = html.unescape(m.group(1))
        if is_safe_url(url):
            escaped_url = escape(url)
            # Show shortened domain
            try:
                domain = urlparse(url).netloc
                return f'<a href="{escaped_url}" style="color:#0969da;font-size:13px">{escape(domain)}</a>'
            except Exception:
                return f'<a href="{escaped_url}" style="color:#0969da;font-size:13px">{escaped_url}</a>'
        return escape(url)
    
    result = re.sub(r'&lt;(https?://[^&]+?)&gt;', restore_link, result)
    
    # Restore markdown links: [text](url) β€” already escaped, need to unescape for parsing
    def restore_md_link(m):
        label = html.unescape(m.group(1))
        url = html.unescape(m.group(2))
        if is_safe_url(url):
            return f'<a href="{escape(url)}" style="color:#0969da">{escape(label)}</a>'
        return escape(label)
    
    result = re.sub(r'\[([^\]]+?)\]\(([^)]+?)\)', restore_md_link, result)
    
    return result


def main():
    parser = argparse.ArgumentParser(
        description="Convert markdown digest to sanitized HTML email"
    )
    parser.add_argument("--input", "-i", required=True, help="Input markdown file")
    parser.add_argument("--output", "-o", required=True, help="Output HTML file")
    parser.add_argument("--verbose", "-v", action="store_true")
    args = parser.parse_args()
    
    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.INFO,
        format="%(levelname)s: %(message)s"
    )
    
    try:
        with open(args.input, 'r') as f:
            md_content = f.read()
    except FileNotFoundError:
        logging.error(f"Input file not found: {args.input}")
        sys.exit(1)
    
    logging.info(f"Converting {args.input} ({len(md_content)} chars)")
    
    html_output = markdown_to_safe_html(md_content)
    
    with open(args.output, 'w') as f:
        f.write(html_output)
    
    logging.info(f"Wrote sanitized HTML to {args.output} ({len(html_output)} chars)")


if __name__ == "__main__":
    main()

```
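The core idea in `_process_inline` above is escape-then-restore: HTML-escape the whole line first, then selectively re-enable constructs whose URLs pass a scheme whitelist. A minimal standalone sketch of that approach (covering only markdown links, not bold or inline code):

```python
import html
import re
from urllib.parse import urlparse

def render_line(text: str) -> str:
    """Escape everything, then restore [label](url) links with safe schemes only."""
    out = html.escape(text, quote=True)

    def md_link(m):
        label = html.unescape(m.group(1))
        url = html.unescape(m.group(2))
        if urlparse(url.strip()).scheme in ("http", "https"):
            return f'<a href="{html.escape(url)}">{html.escape(label)}</a>'
        return html.escape(label)  # unsafe scheme: keep the label, drop the link

    return re.sub(r'\[([^\]]+?)\]\(([^)]+?)\)', md_link, out)
```

Escaping first means raw HTML in feed content can never reach the output unencoded; only the whitelisted link pattern is reconstructed afterwards.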

### scripts/send-email.py

```python
#!/usr/bin/env python3
"""
Send HTML email with optional PDF attachment via msmtp or sendmail.

Properly constructs MIME multipart message so HTML body renders correctly
even when attachments are included.

Usage:
    python3 send-email.py --to [email protected] --subject "Daily Digest" \
        --html /tmp/md-email.html [--attach /tmp/md-digest.pdf] [--from "Bot <[email protected]>"]
"""

import argparse
import base64
import subprocess
import sys
import logging
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.application import MIMEApplication
from email.utils import formatdate
from pathlib import Path


def build_message(subject: str, from_addr: str, to_addrs: list,
                  html_path: Path, attach_path: Path = None) -> str:
    """Build a proper MIME message with HTML body and optional attachment."""
    
    html_content = html_path.read_text(encoding='utf-8')
    
    if attach_path and attach_path.exists():
        # Multipart mixed: HTML body + attachment
        msg = MIMEMultipart('mixed')
        html_part = MIMEText(html_content, 'html', 'utf-8')
        msg.attach(html_part)
        
        pdf_data = attach_path.read_bytes()
        pdf_part = MIMEApplication(pdf_data, _subtype='pdf')
        pdf_part.add_header('Content-Disposition', 'attachment',
                           filename=attach_path.name)
        msg.attach(pdf_part)
    else:
        # Simple HTML message
        msg = MIMEText(html_content, 'html', 'utf-8')
    
    msg['Subject'] = subject
    msg['From'] = from_addr
    msg['To'] = ', '.join(to_addrs)
    msg['Date'] = formatdate(localtime=True)
    
    return msg.as_string()


def send_via_msmtp(message: str, to_addrs: list) -> bool:
    """Send via msmtp (preferred)."""
    try:
        result = subprocess.run(
            ['msmtp', '--read-envelope-from'] + to_addrs,
            input=message.encode('utf-8'),
            capture_output=True,
            timeout=30
        )
        if result.returncode == 0:
            return True
        logging.error(f"msmtp failed: {result.stderr.decode()}")
        return False
    except FileNotFoundError:
        logging.debug("msmtp not found")
        return False
    except Exception as e:
        logging.error(f"msmtp error: {e}")
        return False


def send_via_sendmail(message: str, to_addrs: list) -> bool:
    """Send via sendmail (fallback). Recipients are read from the message headers."""
    for cmd in ['sendmail', '/usr/sbin/sendmail']:
        try:
            # -t reads recipients from the message headers. Passing them on the
            # command line as well makes classic sendmail *suppress* those
            # addresses, so rely on the To header built in build_message().
            result = subprocess.run(
                [cmd, '-t'],
                input=message.encode('utf-8'),
                capture_output=True,
                timeout=30
            )
            if result.returncode == 0:
                return True
            logging.error(f"{cmd} failed: {result.stderr.decode()}")
        except FileNotFoundError:
            continue
        except Exception as e:
            logging.error(f"{cmd} error: {e}")
    return False


def main():
    parser = argparse.ArgumentParser(
        description="Send HTML email with optional PDF attachment",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""\
Examples:
    python3 send-email.py --to [email protected] --subject "Daily Digest" --html /tmp/md-email.html
    python3 send-email.py --to [email protected] --to [email protected] --subject "Weekly" --html body.html --attach digest.pdf
    python3 send-email.py --to [email protected] --subject "Test" --html body.html --from "Bot <[email protected]>"
"""
    )
    parser.add_argument('--to', action='append', required=True, help='Recipient email (repeatable)')
    parser.add_argument('--subject', '-s', required=True, help='Email subject')
    parser.add_argument('--html', required=True, type=Path, help='HTML body file')
    parser.add_argument('--attach', type=Path, default=None, help='PDF attachment file')
    parser.add_argument('--from', dest='from_addr', default=None, help='From address')
    parser.add_argument('--verbose', '-v', action='store_true')
    args = parser.parse_args()
    
    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.INFO,
        format="%(levelname)s: %(message)s"
    )
    
    if not args.html.exists():
        logging.error(f"HTML file not found: {args.html}")
        sys.exit(1)
    
    # Expand comma-separated addresses
    to_addrs = []
    for addr in args.to:
        to_addrs.extend([a.strip() for a in addr.split(',') if a.strip()])
    
    from_addr = args.from_addr or 'noreply@localhost'
    
    logging.info(f"Building email: {args.subject} β†’ {', '.join(to_addrs)}")
    if args.attach:
        logging.info(f"Attachment: {args.attach} ({'exists' if args.attach.exists() else 'MISSING'})")
    
    message = build_message(args.subject, from_addr, to_addrs, args.html, args.attach)
    
    # Try msmtp first, then sendmail
    if send_via_msmtp(message, to_addrs):
        logging.info("βœ… Sent via msmtp")
        return 0
    
    if send_via_sendmail(message, to_addrs):
        logging.info("βœ… Sent via sendmail")
        return 0
    
    logging.error("❌ All send methods failed")
    return 1


if __name__ == "__main__":
    sys.exit(main())

```
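The MIME layout `build_message()` produces is what lets the HTML body render even when a PDF is attached: a `multipart/mixed` container whose first part is the body and whose second part is the attachment. A small standalone sketch that builds the same tree and inspects it:

```python
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.application import MIMEApplication

def build_demo(html_body: str, pdf_bytes: bytes, filename: str) -> MIMEMultipart:
    """Mirror build_message(): multipart/mixed = HTML part + PDF attachment."""
    msg = MIMEMultipart('mixed')
    msg.attach(MIMEText(html_body, 'html', 'utf-8'))
    pdf = MIMEApplication(pdf_bytes, _subtype='pdf')
    pdf.add_header('Content-Disposition', 'attachment', filename=filename)
    msg.attach(pdf)
    return msg

msg = build_demo("<p>digest</p>", b"%PDF-1.4 fake", "digest.pdf")
parts = msg.get_payload()
```

Mail clients render the first `text/html` part inline and list the `application/pdf` part as a download, which is exactly the behavior the script relies on.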

### scripts/source-health.py

```python
#!/usr/bin/env python3
"""
Source health monitoring for media-news-digest pipeline.

Tracks per-source success/failure history and reports unhealthy sources.

Usage:
    python3 source-health.py --rss rss.json --twitter twitter.json --github github.json
"""

import json
import sys
import argparse
import logging
import time
from pathlib import Path
from typing import Dict, Any, Optional

HEALTH_FILE = "/tmp/media-news-digest-source-health.json"
HISTORY_DAYS = 7
FAILURE_THRESHOLD = 0.5  # >50% failure rate triggers warning


def setup_logging(verbose: bool) -> logging.Logger:
    level = logging.DEBUG if verbose else logging.INFO
    logging.basicConfig(level=level, format='%(asctime)s - %(levelname)s - %(message)s')
    return logging.getLogger(__name__)


def load_health_data() -> Dict[str, Any]:
    try:
        with open(HEALTH_FILE, 'r') as f:
            return json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        return {}


def save_health_data(data: Dict[str, Any]) -> None:
    with open(HEALTH_FILE, 'w') as f:
        json.dump(data, f, indent=2)


def load_source_file(path: Optional[Path]) -> list:
    if not path or not path.exists():
        return []
    try:
        with open(path, 'r') as f:
            data = json.load(f)
        return data.get("sources", [])
    except (json.JSONDecodeError, OSError):
        return []


def load_source_file_flexible(path: Optional[Path]) -> list:
    """Load sources from a JSON file, trying 'sources', 'subreddits', and 'topics' keys."""
    if not path or not path.exists():
        return []
    try:
        with open(path, 'r') as f:
            data = json.load(f)
        # Try standard keys
        if "sources" in data:
            return data["sources"]
        if "subreddits" in data:
            return data["subreddits"]
        if "topics" in data:
            # Create synthetic sources from topic results
            synthetic = []
            for topic in data["topics"]:
                synthetic.append({
                    "source_id": f"web-{topic.get('topic_id', 'unknown')}",
                    "name": f"Web: {topic.get('topic_id', 'unknown')}",
                    "status": topic.get("status", "ok"),
                    "articles": topic.get("articles", []),
                })
            return synthetic
        return []
    except (json.JSONDecodeError, OSError):
        return []


def update_health(health: Dict[str, Any], sources: list, now: float) -> None:
    cutoff = now - HISTORY_DAYS * 86400
    for source in sources:
        sid = source.get("source_id", source.get("id", "unknown"))
        if sid not in health:
            health[sid] = {"name": source.get("name", sid), "checks": []}
        # Prune old entries
        health[sid]["checks"] = [c for c in health[sid]["checks"] if c["ts"] > cutoff]
        health[sid]["checks"].append({
            "ts": now,
            "ok": source.get("status") == "ok",
        })


def report_unhealthy(health: Dict[str, Any], logger: logging.Logger) -> int:
    unhealthy = 0
    for sid, info in health.items():
        checks = info.get("checks", [])
        if len(checks) < 2:
            continue
        failures = sum(1 for c in checks if not c["ok"])
        rate = failures / len(checks)
        if rate > FAILURE_THRESHOLD:
            logger.warning(f"⚠️  Unhealthy source: {info.get('name', sid)} "
                         f"({failures}/{len(checks)} failures, {rate:.0%} failure rate)")
            unhealthy += 1
    return unhealthy


def main():
    parser = argparse.ArgumentParser(description="Track source health for media-news-digest pipeline.")
    parser.add_argument("--rss", type=Path, help="RSS output JSON")
    parser.add_argument("--twitter", type=Path, help="Twitter output JSON")
    parser.add_argument("--github", type=Path, help="GitHub output JSON")
    parser.add_argument("--reddit", type=Path, help="Reddit output JSON")
    parser.add_argument("--web", type=Path, help="Web search output JSON")
    parser.add_argument("--verbose", "-v", action="store_true")
    args = parser.parse_args()

    logger = setup_logging(args.verbose)
    health = load_health_data()
    now = time.time()

    # Standard sources (use 'sources' key)
    for path in [args.rss, args.twitter, args.github]:
        sources = load_source_file(path)
        if sources:
            update_health(health, sources, now)

    # Reddit and Web use flexible loading (subreddits/topics keys)
    for path in [args.reddit, args.web]:
        sources = load_source_file_flexible(path)
        if sources:
            update_health(health, sources, now)

    save_health_data(health)
    unhealthy = report_unhealthy(health, logger)

    total = len(health)
    logger.info(f"📊 Health check: {total} sources tracked, {unhealthy} unhealthy")
    return 0


if __name__ == "__main__":
    sys.exit(main())

```
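Extracted as a standalone sketch, the rolling-window bookkeeping in `update_health` prunes checks older than the history window and appends the latest result. The `HISTORY_DAYS` value below is an assumption for illustration; the real constant is defined earlier in the script:

```python
import time

HISTORY_DAYS = 7  # assumed for illustration; the script defines its own value

def update_health(health, sources, now):
    """Prune stale checks and record the latest status per source."""
    cutoff = now - HISTORY_DAYS * 86400
    for source in sources:
        sid = source.get("source_id", "unknown")
        entry = health.setdefault(sid, {"name": source.get("name", sid), "checks": []})
        entry["checks"] = [c for c in entry["checks"] if c["ts"] > cutoff]
        entry["checks"].append({"ts": now, "ok": source.get("status") == "ok"})

health = {}
now = time.time()
update_health(health, [{"source_id": "thr-rss", "name": "THR", "status": "ok"}], now)
update_health(health, [{"source_id": "thr-rss", "name": "THR", "status": "error"}], now + 60)
checks = health["thr-rss"]["checks"]
failure_rate = sum(1 for c in checks if not c["ok"]) / len(checks)
# failure_rate == 0.5, which report_unhealthy compares against FAILURE_THRESHOLD
```

This is the same sliding-window failure-rate idea the real script persists between runs.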

### scripts/summarize-merged.py

```python
#!/usr/bin/env python3
"""
Print a human-readable summary of merged JSON data for LLM consumption.

Usage:
    python3 summarize-merged.py [--input /tmp/md-merged.json] [--top N] [--topic TOPIC]
"""

import json
import argparse
from pathlib import Path
from typing import Optional


def summarize(data: dict, top_n: int = 10, topic_filter: Optional[str] = None):
    """Print structured summary of merged data."""
    
    # Metadata
    meta = data.get("output_stats", {})
    print("=== Merged Data Summary ===")
    print(f"Total articles: {meta.get('total_articles', '?')}")
    print(f"Topics: {', '.join(data.get('topics', {}).keys())}")
    print()
    
    topics = data.get("topics", {})
    
    for topic_id, topic_data in topics.items():
        if topic_filter and topic_id != topic_filter:
            continue
        
        articles = topic_data.get("articles", [])
        if not isinstance(articles, list):
            continue
        
        print(f"=== {topic_id} ({len(articles)} articles) ===")
        
        # Sort by quality_score descending
        sorted_articles = sorted(
            [a for a in articles if isinstance(a, dict)],
            key=lambda a: a.get("quality_score", 0),
            reverse=True
        )
        
        for i, a in enumerate(sorted_articles[:top_n]):
            title = a.get("title", "?")[:100]
            source = a.get("source_name", "?")
            source_type = a.get("source_type", "?")
            qs = a.get("quality_score", 0)
            link = a.get("link") or a.get("reddit_url") or a.get("external_url", "")
            snippet = (a.get("snippet") or a.get("summary") or "")[:150]
            
            # Metrics for Twitter
            metrics = a.get("metrics", {})
            display_name = a.get("display_name", "")
            
            print(f"\n  [{i+1}] ({qs:.0f}pts) [{source_type}] {title}")
            print(f"      Source: {source}", end="")
            if display_name:
                print(f" ({display_name})", end="")
            print()
            if link:
                print(f"      Link: {link}")
            if snippet:
                print(f"      Snippet: {snippet}")
            if metrics:
                parts = []
                for k, v in metrics.items():
                    if v and v > 0:
                        parts.append(f"{k}={v}")
                if parts:
                    print(f"      Metrics: {', '.join(parts)}")
            
            # Reddit-specific
            reddit_score = a.get("score")
            num_comments = a.get("num_comments")
            if reddit_score is not None:
                print(f"      Reddit: {reddit_score}↑", end="")
                if num_comments:
                    print(f" · {num_comments} comments", end="")
                print()
        
        print()


def main():
    parser = argparse.ArgumentParser(description="Summarize merged JSON for LLM consumption")
    parser.add_argument("--input", "-i", type=Path, default=Path("/tmp/md-merged.json"))
    parser.add_argument("--top", "-n", type=int, default=10, help="Top N articles per topic")
    parser.add_argument("--topic", "-t", type=str, default=None, help="Filter to specific topic")
    args = parser.parse_args()
    
    if not args.input.exists():
        raise SystemExit(f"Error: {args.input} not found. Run the pipeline first.")
    
    with open(args.input) as f:
        data = json.load(f)
    
    summarize(data, top_n=args.top, topic_filter=args.topic)


if __name__ == "__main__":
    main()

```
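The per-topic ranking inside `summarize()` reduces to a descending sort on `quality_score`, with a zero default so articles that lack a score sort last. A minimal sketch (article titles here are invented examples):

```python
articles = [
    {"title": "Box office weekend report", "quality_score": 40},
    {"title": "Untitled festival rumor"},  # no score: treated as 0, sorts last
    {"title": "Studio merger confirmed", "quality_score": 90},
]

# Same sort key the script uses: missing scores default to 0
ranked = sorted(
    (a for a in articles if isinstance(a, dict)),
    key=lambda a: a.get("quality_score", 0),
    reverse=True,
)
top_titles = [a["title"] for a in ranked[:2]]
# top_titles == ["Studio merger confirmed", "Box office weekend report"]
```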

### scripts/test-pipeline.sh

```bash
#!/bin/bash
# Pipeline smoke test: runs fetch steps with filtering, validates outputs
# Usage:
#   ./test-pipeline.sh                          # run all sources
#   ./test-pipeline.sh --only twitter,rss       # only these source types
#   ./test-pipeline.sh --skip web               # skip web search
#   ./test-pipeline.sh --topics crypto           # only sources with these topics
#   ./test-pipeline.sh --ids sama-twitter,openai-rss  # specific source IDs
#   ./test-pipeline.sh --hours 12               # custom time window
#   ./test-pipeline.sh --keep                   # keep output dir after test
#   ./test-pipeline.sh --twitter-backend twitterapiio  # force twitter backend
set -e

SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
DEFAULTS="$SCRIPT_DIR/../config/defaults"
OUTDIR=$(mktemp -d /tmp/media-digest-test-XXXXXX)
PASSED=0
SKIPPED=0
FAILED=0
HOURS=24
KEEP=false
ONLY=""
SKIP=""
TOPICS=""
IDS=""
TWITTER_BACKEND=""
VERBOSE=""
CONFIG=""

# ── Parse args ──
while [[ $# -gt 0 ]]; do
    case "$1" in
        --only)       ONLY="$2"; shift 2 ;;
        --skip)       SKIP="$2"; shift 2 ;;
        --topics)     TOPICS="$2"; shift 2 ;;
        --ids)        IDS="$2"; shift 2 ;;
        --hours)      HOURS="$2"; shift 2 ;;
        --keep)       KEEP=true; shift ;;
        --twitter-backend|--backend) TWITTER_BACKEND="$2"; shift 2 ;;
        --config)     CONFIG="$2"; shift 2 ;;
        --verbose|-v) VERBOSE="--verbose"; shift ;;
        --help|-h)
            cat <<'HELP'
Pipeline smoke test: runs fetch steps with filtering, merges, and validates outputs.

USAGE:
  ./test-pipeline.sh [OPTIONS]

OPTIONS:
  --only TYPES      Only run these source types (comma-separated)
                    Values: rss, twitter, github, reddit, web
                    Example: --only twitter,rss

  --skip TYPES      Skip these source types (comma-separated)
                    Values: rss, twitter, github, reddit, web
                    Example: --skip web,reddit

  --topics TOPICS   Only include sources matching these topics (comma-separated)
                    Values: llm, ai-agent, frontier-tech, crypto
                    Example: --topics crypto,llm

  --ids IDS         Only include specific source IDs (comma-separated)
                    IDs are defined in config/defaults/sources.json
                    Example: --ids sama-twitter,openai-rss,vitalik-twitter

  --hours N         Time window for fetching articles (default: 24)
                    Example: --hours 48

  --twitter-backend NAME
                    Force a specific Twitter API backend
                    Values: official, twitterapiio, auto
                    official     = X API v2 (needs X_BEARER_TOKEN)
                    twitterapiio = twitterapi.io (needs TWITTERAPI_IO_KEY)
                    auto         = try twitterapiio first, fallback to official

  --config DIR      User config overlay directory (optional)
                    Example: --config workspace/config

  --verbose, -v     Enable verbose logging for fetch scripts

  --keep            Keep output directory after test (default: clean up on success)

  --help, -h        Show this help message

EXAMPLES:
  ./test-pipeline.sh                                    # full pipeline, all sources
  ./test-pipeline.sh --only twitter --twitter-backend twitterapiio  # twitter only via twitterapi.io
  ./test-pipeline.sh --topics crypto --hours 48 --keep  # crypto sources, 48h window
  ./test-pipeline.sh --skip web,reddit -v               # skip web+reddit, verbose
  ./test-pipeline.sh --ids sama-twitter,karpathy-twitter --only twitter

ENVIRONMENT:
  X_BEARER_TOKEN      Official X API v2 bearer token (for --backend official)
  TWITTERAPI_IO_KEY   twitterapi.io API key (for --backend twitterapiio)
  TWITTER_API_BACKEND Default twitter backend if --backend not given (official|twitterapiio|auto)
  BRAVE_API_KEY       Brave Search API key (for web fetch)
  GITHUB_TOKEN        GitHub token (optional, increases GitHub API rate limits)
HELP
            exit 0
            ;;
        *) echo "Unknown option: $1"; exit 1 ;;
    esac
done

# ── Helpers ──
should_run() {
    local type="$1"
    # Check --only filter
    if [ -n "$ONLY" ]; then
        echo ",$ONLY," | grep -qi ",$type," || return 1
    fi
    # Check --skip filter
    if [ -n "$SKIP" ]; then
        echo ",$SKIP," | grep -qi ",$type," && return 1
    fi
    return 0
}

run_step() {
    local name="$1"; shift
    local start=$(date +%s)
    if "$@" 2>&1; then
        local elapsed=$(( $(date +%s) - start ))
        echo "✅ $name (${elapsed}s)"
        PASSED=$((PASSED + 1))
    else
        local code=$?
        local elapsed=$(( $(date +%s) - start ))
        echo "❌ $name (exit $code, ${elapsed}s)"
        FAILED=$((FAILED + 1))
    fi
}

validate_json() {
    local file="$1" name="$2"
    if [ -f "$file" ] && python3 -c "
import json, sys
d = json.load(open(sys.argv[1]))
# Print summary stats
if 'sources' in d and isinstance(d['sources'], list):
    ok = sum(1 for s in d['sources'] if s.get('status') == 'ok')
    total = len(d['sources'])
    articles = sum(s.get('count', len(s.get('articles', []))) for s in d['sources'])
    print(f'   📊 {ok}/{total} sources ok, {articles} articles')
elif 'topics' in d:
    topics = d['topics']
    if isinstance(topics, dict):
        total = sum(len(t.get('articles', [])) for t in topics.values())
        print(f'   📊 {len(topics)} topics, {total} articles')
    elif isinstance(topics, list):
        total = sum(len(t.get('articles', [])) for t in topics)
        print(f'   📊 {len(topics)} topics, {total} articles')
" "$file" 2>/dev/null; then
        echo "✅ $name JSON valid"
        PASSED=$((PASSED + 1))
    else
        echo "❌ $name JSON invalid or missing"
        FAILED=$((FAILED + 1))
    fi
}

# ── Generate filtered sources if --topics or --ids specified ──
EXTRA_ARGS=()
if [ -n "$TOPICS" ] || [ -n "$IDS" ]; then
    FILTER_CONFIG="$OUTDIR/filter-config"
    mkdir -p "$FILTER_CONFIG"
    python3 -c "
import json, sys
topics_filter = '${TOPICS}'.split(',') if '${TOPICS}' else []
ids_filter = '${IDS}'.split(',') if '${IDS}' else []

d = json.load(open('${DEFAULTS}/sources.json'))
filtered = []
for s in d['sources']:
    if ids_filter and s['id'] not in ids_filter:
        continue
    if topics_filter and not any(t in s.get('topics', []) for t in topics_filter):
        continue
    filtered.append(s)

d['sources'] = filtered
print(f'Filtered: {len(filtered)} sources', file=sys.stderr)
json.dump(d, open('${FILTER_CONFIG}/sources.json', 'w'), indent=2)
" 2>&1
    DEFAULTS="$FILTER_CONFIG"
fi

if [ -n "$CONFIG" ]; then
    EXTRA_ARGS+=("--config" "$CONFIG")
fi
if [ -n "$VERBOSE" ]; then
    EXTRA_ARGS+=("$VERBOSE")
fi

echo "🧪 Pipeline Test (hours=$HOURS, outdir=$OUTDIR)"
echo "   Sources: $(python3 -c "import json; d=json.load(open('${DEFAULTS}/sources.json')); types={}
for s in d['sources']: t=s['type']; types[t]=types.get(t,0)+1
print(' | '.join(f'{t}:{n}' for t,n in sorted(types.items())))" 2>/dev/null)"
echo ""

# ── Fetch steps ──

# RSS
if should_run "rss"; then
    run_step "fetch-rss" python3 "$SCRIPT_DIR/fetch-rss.py" --defaults "$DEFAULTS" --hours "$HOURS" --output "$OUTDIR/rss.json" --force "${EXTRA_ARGS[@]}"
    validate_json "$OUTDIR/rss.json" "rss"
else
    echo "⏭  fetch-rss (skipped)"
    SKIPPED=$((SKIPPED + 1))
fi

# GitHub
if should_run "github"; then
    run_step "fetch-github" python3 "$SCRIPT_DIR/fetch-github.py" --defaults "$DEFAULTS" --hours "$HOURS" --output "$OUTDIR/github.json" --force "${EXTRA_ARGS[@]}"
    validate_json "$OUTDIR/github.json" "github"
else
    echo "⏭  fetch-github (skipped)"
    SKIPPED=$((SKIPPED + 1))
fi

# Twitter
if should_run "twitter"; then
    TWITTER_ARGS=("--defaults" "$DEFAULTS" "--hours" "$HOURS" "--output" "$OUTDIR/twitter.json" "--force" "${EXTRA_ARGS[@]}")
    [ -n "$TWITTER_BACKEND" ] && TWITTER_ARGS+=("--backend" "$TWITTER_BACKEND")

    if [ -n "$X_BEARER_TOKEN" ] || [ -n "$TWITTERAPI_IO_KEY" ]; then
        run_step "fetch-twitter" python3 "$SCRIPT_DIR/fetch-twitter.py" "${TWITTER_ARGS[@]}"
        validate_json "$OUTDIR/twitter.json" "twitter"
    else
        echo "⏭  fetch-twitter (no X_BEARER_TOKEN or TWITTERAPI_IO_KEY)"
        SKIPPED=$((SKIPPED + 1))
    fi
else
    echo "⏭  fetch-twitter (skipped)"
    SKIPPED=$((SKIPPED + 1))
fi

# Reddit
if should_run "reddit"; then
    if [ -f "$SCRIPT_DIR/fetch-reddit.py" ]; then
        run_step "fetch-reddit" python3 "$SCRIPT_DIR/fetch-reddit.py" --defaults "$DEFAULTS" --hours "$HOURS" --output "$OUTDIR/reddit.json" --force "${EXTRA_ARGS[@]}"
        validate_json "$OUTDIR/reddit.json" "reddit"
    else
        echo "⏭  fetch-reddit (script not found)"
        SKIPPED=$((SKIPPED + 1))
    fi
else
    echo "⏭  fetch-reddit (skipped)"
    SKIPPED=$((SKIPPED + 1))
fi

# Web search
if should_run "web"; then
    if [ -n "$BRAVE_API_KEY" ]; then
        run_step "fetch-web" python3 "$SCRIPT_DIR/fetch-web.py" --defaults "$DEFAULTS" --freshness pd --output "$OUTDIR/web.json" --force "${EXTRA_ARGS[@]}"
        validate_json "$OUTDIR/web.json" "web"
    else
        echo "⏭  fetch-web (no BRAVE_API_KEY)"
        SKIPPED=$((SKIPPED + 1))
    fi
else
    echo "⏭  fetch-web (skipped)"
    SKIPPED=$((SKIPPED + 1))
fi

# ── Merge ──
MERGE_ARGS=("--output" "$OUTDIR/merged.json")
[ -f "$OUTDIR/rss.json" ]     && MERGE_ARGS+=("--rss" "$OUTDIR/rss.json")
[ -f "$OUTDIR/twitter.json" ] && MERGE_ARGS+=("--twitter" "$OUTDIR/twitter.json")
[ -f "$OUTDIR/web.json" ]     && MERGE_ARGS+=("--web" "$OUTDIR/web.json")
[ -f "$OUTDIR/github.json" ]  && MERGE_ARGS+=("--github" "$OUTDIR/github.json")
[ -f "$OUTDIR/reddit.json" ]  && MERGE_ARGS+=("--reddit" "$OUTDIR/reddit.json")

if [ ${#MERGE_ARGS[@]} -gt 2 ]; then
    run_step "merge-sources" python3 "$SCRIPT_DIR/merge-sources.py" "${MERGE_ARGS[@]}"
    validate_json "$OUTDIR/merged.json" "merged"

    # Validate merged structure
    if python3 -c "
import json, sys
d = json.load(open(sys.argv[1]))
assert 'topics' in d and 'output_stats' in d
stats = d['output_stats']
print(f'   📊 Merged: {stats.get(\"total_articles\", \"?\")} articles across {len(d[\"topics\"])} topics')
" "$OUTDIR/merged.json" 2>/dev/null; then
        echo "✅ merged structure valid"
        PASSED=$((PASSED + 1))
    else
        echo "❌ merged structure invalid"
        FAILED=$((FAILED + 1))
    fi
else
    echo "⏭  merge (no source files to merge)"
    SKIPPED=$((SKIPPED + 1))
fi

# ── Summary ──
echo ""
echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"
echo "📊 Results: $PASSED passed, $FAILED failed, $SKIPPED skipped"
echo "   Output:  $OUTDIR"
if [ "$KEEP" = false ] && [ "$FAILED" -eq 0 ]; then
    rm -rf "$OUTDIR"
    echo "   (cleaned up; use --keep to preserve)"
fi
[ "$FAILED" -eq 0 ] && exit 0 || exit 1

```
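The `--only`/`--skip` filtering in `should_run` is just a case-insensitive match against a comma-wrapped list, so partial names never match whole source types. Extracted on its own (the `ONLY`/`SKIP` values here are example inputs):

```shell
#!/bin/bash
ONLY="twitter,rss"
SKIP=""

should_run() {
    local type="$1"
    # Wrap both list and candidate in commas so "rss" can't match "rss-extra"
    if [ -n "$ONLY" ]; then
        echo ",$ONLY," | grep -qi ",$type," || return 1
    fi
    if [ -n "$SKIP" ]; then
        echo ",$SKIP," | grep -qi ",$type," && return 1
    fi
    return 0
}

should_run twitter && echo "twitter: run"
should_run web || echo "web: skip"
```

Running the sketch prints `twitter: run` and `web: skip`.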

### scripts/validate-config.py

```python
#!/usr/bin/env python3
"""
Configuration validation script for media-news-digest.

Validates sources.json and topics.json against JSON Schema and performs
additional consistency checks.

Usage:
    python3 validate-config.py [--defaults DEFAULTS_DIR] [--config CONFIG_DIR] [--verbose]
"""

import json
import argparse
import logging
import sys
from pathlib import Path
from typing import Dict, Any

try:
    from jsonschema import validate, ValidationError
    HAS_JSONSCHEMA = True
except ImportError:
    HAS_JSONSCHEMA = False


def setup_logging(verbose: bool) -> logging.Logger:
    """Setup logging configuration."""
    level = logging.DEBUG if verbose else logging.INFO
    logging.basicConfig(
        level=level,
        format='%(asctime)s - %(levelname)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )
    return logging.getLogger(__name__)


def load_json_file(file_path: Path) -> Dict[str, Any]:
    """Load and parse JSON file."""
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            return json.load(f)
    except FileNotFoundError:
        raise FileNotFoundError(f"Config file not found: {file_path}")
    except json.JSONDecodeError as e:
        raise ValueError(f"Invalid JSON in {file_path}: {e}")


def validate_against_schema(data: Dict[str, Any], schema: Dict[str, Any], 
                          config_type: str) -> bool:
    """Validate data against JSON schema."""
    if not HAS_JSONSCHEMA:
        logging.warning("jsonschema not available, skipping schema validation")
        return True
        
    try:
        # Extract the relevant schema definition
        if config_type == "sources":
            schema_def = {
                "type": "object",
                "required": ["sources"],
                "properties": {
                    "sources": {
                        "type": "array", 
                        "items": schema["definitions"]["source"]
                    }
                }
            }
        elif config_type == "topics":
            schema_def = {
                "type": "object",
                "required": ["topics"],
                "properties": {
                    "topics": {
                        "type": "array",
                        "items": schema["definitions"]["topic"]
                    }
                }
            }
        else:
            raise ValueError(f"Unknown config type: {config_type}")
            
        validate(instance=data, schema=schema_def)
        logging.info(f"✅ {config_type}.json passed schema validation")
        return True
        
    except ValidationError as e:
        logging.error(f"❌ Schema validation failed for {config_type}.json:")
        logging.error(f"   Path: {' -> '.join(str(p) for p in e.absolute_path)}")
        logging.error(f"   Error: {e.message}")
        return False


def validate_sources_consistency(sources_data: Dict[str, Any], 
                               topics_data: Dict[str, Any]) -> bool:
    """Validate consistency between sources and topics."""
    errors = []
    
    # Get valid topic IDs
    valid_topics = {topic["id"] for topic in topics_data["topics"]}
    logging.debug(f"Valid topic IDs: {valid_topics}")
    
    # Check source topic references
    for source in sources_data["sources"]:
        source_id = source.get("id", "unknown")
        source_topics = set(source.get("topics", []))
        
        # Check for invalid topic references
        invalid_topics = source_topics - valid_topics
        if invalid_topics:
            errors.append(f"Source '{source_id}' references invalid topics: {invalid_topics}")
            
        # Check for empty topic lists
        if not source_topics:
            errors.append(f"Source '{source_id}' has no topics assigned")
            
    # Check for duplicate source IDs
    source_ids = [source.get("id") for source in sources_data["sources"]]
    duplicates = {sid for sid in source_ids if source_ids.count(sid) > 1}
    if duplicates:
        errors.append(f"Duplicate source IDs found: {duplicates}")

    # Check for duplicate topic IDs
    topic_ids = [topic.get("id") for topic in topics_data["topics"]]
    duplicates = {tid for tid in topic_ids if topic_ids.count(tid) > 1}
    if duplicates:
        errors.append(f"Duplicate topic IDs found: {duplicates}")
        
    if errors:
        logging.error("❌ Consistency validation failed:")
        for error in errors:
            logging.error(f"   {error}")
        return False
    else:
        logging.info("✅ Consistency validation passed")
        return True


def validate_source_types(sources_data: Dict[str, Any]) -> bool:
    """Validate source-type specific requirements."""
    errors = []
    
    for source in sources_data["sources"]:
        source_id = source.get("id", "unknown")
        source_type = source.get("type")
        
        if source_type == "rss":
            if not source.get("url"):
                errors.append(f"RSS source '{source_id}' missing required 'url' field")
        elif source_type == "twitter":
            if not source.get("handle"):
                errors.append(f"Twitter source '{source_id}' missing required 'handle' field")
        elif source_type == "github":
            if not source.get("repo"):
                errors.append(f"GitHub source '{source_id}' missing required 'repo' field")
        elif source_type == "reddit":
            if not source.get("subreddit"):
                errors.append(f"Reddit source '{source_id}' missing required 'subreddit' field")
        elif source_type == "web":
            # Web sources are handled by topics, no specific validation needed
            pass
        else:
            errors.append(f"Source '{source_id}' has invalid type: {source_type}")
            
    if errors:
        logging.error("❌ Source type validation failed:")
        for error in errors:
            logging.error(f"   {error}")
        return False
    else:
        logging.info("✅ Source type validation passed")
        return True


def main():
    """Main validation function."""
    parser = argparse.ArgumentParser(
        description="Validate media-news-digest configuration files",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
    python3 validate-config.py
    python3 validate-config.py --defaults config/defaults --config workspace/config --verbose
    python3 validate-config.py --config workspace/config --verbose  # backward compatibility
    """
    )
    
    parser.add_argument(
        "--defaults",
        type=Path,
        default=Path("config/defaults"),
        help="Default configuration directory with skill defaults (default: config/defaults)"
    )
    
    parser.add_argument(
        "--config",
        type=Path,
        help="User configuration directory for overlays (optional)"
    )
    
    parser.add_argument(
        "--verbose", "-v",
        action="store_true", 
        help="Enable verbose logging"
    )
    
    args = parser.parse_args()
    logger = setup_logging(args.verbose)
    
    # Load config_loader for merged configurations
    try:
        from config_loader import load_merged_sources, load_merged_topics
    except ImportError:
        # Fallback: make the script directory importable, then retry
        sys.path.append(str(Path(__file__).parent))
        from config_loader import load_merged_sources, load_merged_topics
    
    # File paths
    schema_path = Path("config/schema.json")
    
    if args.config:
        logger.info(f"Validating merged configuration: defaults={args.defaults}, config={args.config}")
    else:
        logger.info(f"Validating default configuration: {args.defaults}")
    
    try:
        # Backward compatibility: if only --config provided, use old behavior
        if args.config and args.defaults == Path("config/defaults") and not args.defaults.exists():
            logger.debug("Backward compatibility mode: using --config as sole source")
            defaults_dir = args.config
            config_dir = None
        else:
            defaults_dir = args.defaults
            config_dir = args.config
        
        # Load schema
        schema = load_json_file(schema_path)
        logger.debug("Loaded schema.json")
        
        # Load merged configuration data
        merged_sources = load_merged_sources(defaults_dir, config_dir)
        merged_topics = load_merged_topics(defaults_dir, config_dir)
        
        # Convert to the format expected by validation functions
        sources_data = {"sources": merged_sources}
        topics_data = {"topics": merged_topics}
        
        logger.debug(f"Loaded {len(merged_sources)} merged sources, {len(merged_topics)} merged topics")
        
        # Perform validations
        all_valid = True
        
        # Schema validation
        all_valid &= validate_against_schema(sources_data, schema, "sources")
        all_valid &= validate_against_schema(topics_data, schema, "topics")
        
        # Consistency validation
        all_valid &= validate_sources_consistency(sources_data, topics_data)
        
        # Source type validation  
        all_valid &= validate_source_types(sources_data)
        
        # Summary
        if all_valid:
            logger.info("🎉 All validations passed!")
            return 0
        else:
            logger.error("💥 Validation failed!")
            return 1

    except Exception as e:
        logger.error(f"💥 Validation error: {e}")
        return 1


if __name__ == "__main__":
    sys.exit(main())
```
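The duplicate-ID scan in `validate_sources_consistency` calls `list.count` inside a set comprehension, which is O(n²) over the source list. A `collections.Counter` produces the same set in one pass; this is an equivalent sketch, not a required change (the IDs below are invented examples):

```python
from collections import Counter

source_ids = ["thr-rss", "deadline-rss", "variety-rss", "thr-rss"]

# One pass over the list instead of calling .count() per element
counts = Counter(source_ids)
duplicates = {sid for sid, n in counts.items() if n > 1}
# duplicates == {"thr-rss"}
```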