SkillHub Club · Ship Full Stack · Full Stack

arch-view

This skill should be used when the user asks to "generate architecture view", "show service dependency graph", "map request flows", "show event topology", "group services by domain", "visualize architecture", or mentions cross-repository architecture analysis, service mapping, or architectural visualization.

Packaged view

This page reorganizes the original catalog entry to put fit, installability, and workflow context first. The original raw source appears below.

Stars: 0
Hot score: 74
Updated: March 20, 2026
Overall rating: C (2.6)
Composite score: 2.6
Best-practice grade: C (61.1)

Install command

npx @skill-hub/cli install astrabit-cpt-astrabit-docs-arch-view

Repository

AstraBit-CPT/astrabit-docs

Skill path: .claude/skills/arch-view

Open repository

Best for

Primary workflow: Ship Full Stack.

Technical facets: Full Stack.

Target audience: everyone.

License: Unknown.

Original source

Catalog source: SkillHub Club.

Repository owner: AstraBit-CPT.

This is a mirrored public skill entry. Review the repository before installing it into production workflows.

What it helps with

  • Install arch-view into Claude Code, Codex CLI, Gemini CLI, or OpenCode workflows
  • Review https://github.com/AstraBit-CPT/astrabit-docs before adding arch-view to shared team environments
  • Use arch-view for development workflows

Works across

Claude Code · Codex CLI · Gemini CLI · OpenCode

Favorites: 0.

Sub-skills: 0.

Aggregator: No.

Original source / Raw SKILL.md

---
name: arch-view
description: This skill should be used when the user asks to "generate architecture view", "show service dependency graph", "map request flows", "show event topology", "group services by domain", "visualize architecture", or mentions cross-repository architecture analysis, service mapping, or architectural visualization.
version: 1.0.0
---

# Architecture View

Generate cross-repository architectural views by aggregating `catalog-info.yaml` metadata from all repositories using parallel subagents.

## Purpose

Create visual architectural representations (Mermaid diagrams, markdown tables) that show how services fit together, how requests flow through gateways, how events propagate, and how services group by domain/team.

## When to Use

Trigger this skill when:
- User asks to "show service dependency graph" or "map the architecture"
- User wants to understand "how services connect"
- User asks about "request flows" or "event topology"
- User wants to "group services by domain/team"
- User mentions "cross-repo architecture" or "system architecture"

## Workflow

### Phase 1: Discover Repositories

Find all repositories to analyze:

1. **Check `repos/` directory** - Local cloned repositories
2. **Optionally fetch from GitHub** - Use `gh repo list AstraBit-CPT` for the full list
3. **Filter for active repos** - Skip archived or template repos
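A minimal sketch of the local-discovery step (assumes cloned repos live under `repos/`; filtering archived or template repos requires the GitHub API and is left out here):

```python
from pathlib import Path

def discover_repos(repos_dir: str = "repos") -> list[str]:
    """Return names of candidate repository directories under repos_dir."""
    root = Path(repos_dir)
    if not root.exists():
        return []
    return sorted(
        item.name
        for item in root.iterdir()
        # Skip hidden directories and plain files; archived/template repos
        # need a GitHub API check (e.g. `gh repo list`) and are not filtered here.
        if item.is_dir() and not item.name.startswith(".")
    )
```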

### Phase 2: Parallel Metadata Collection

Use subagents IN PARALLEL to read each repository's `catalog-info.yaml`:

```
For each repo:
  Launch subagent with: "Read catalog-info.yaml from [repo_path] and return the parsed content"
```

**Parallel processing strategy:**
- Launch 5-10 subagents simultaneously
- Collect all results
- Handle missing metadata gracefully (skip or note as missing)
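Outside a subagent context, the same fan-out can be approximated with a thread pool, bounded to mirror the 5-10 concurrency guidance above (`load_yaml` stands in for whichever YAML parser you use):

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path

def collect_metadata(repo_paths, load_yaml, max_workers=8):
    """Read catalog-info.yaml from each repo in parallel; note missing files."""
    results, missing = {}, []

    def read_one(repo: Path):
        catalog = repo / "catalog-info.yaml"
        if not catalog.exists():
            return repo.name, None
        return repo.name, load_yaml(catalog)

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(read_one, Path(p)) for p in repo_paths]
        for future in as_completed(futures):
            name, parsed = future.result()
            if parsed is None:
                missing.append(name)  # handle gracefully: note, don't fail the run
            else:
                results[name] = parsed
    return results, missing
```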

### Phase 3: Aggregate and Build Model

Combine all metadata into a unified model:

```python
aggregated = {
    "components": {},  # name -> catalog info
    "dependencies": set(),  # (from, to) tuples
    "gateways": [],
    "services": [],
    "workers": [],
    "domains": {},  # domain -> [components]
    "events": {
        "producers": {},  # topic -> [producers]
        "consumers": {},  # topic -> [consumers]
    },
    "routes": [],  # gateway routes
}
```

### Phase 4: Generate Requested View

Based on user request, generate the appropriate view:

| View | Command/Trigger | Output |
|------|-----------------|--------|
| Service Dependency Graph | "dependency graph", "show dependencies" | Mermaid graph |
| Request Flow Maps | "request flows", "how requests flow" | Mermaid flowchart |
| Event Topology | "event topology", "event map" | Mermaid graph |
| Service Groupings | "group services", "services by domain" | Markdown tables |
| Full Architecture | "architecture view", "full architecture" | All views combined |
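The trigger table above reduces to a small keyword dispatcher; the phrases and view names below follow the table, with anything unmatched falling back to the full view:

```python
def pick_view(request: str) -> str:
    """Map a user request to a view name using the trigger phrases above."""
    triggers = [
        ("dependency", ("dependency graph", "show dependencies")),
        ("request-flow", ("request flows", "how requests flow")),
        ("events", ("event topology", "event map")),
        ("groupings", ("group services", "services by domain")),
    ]
    text = request.lower()
    for view, phrases in triggers:
        if any(phrase in text for phrase in phrases):
            return view
    return "full"  # "architecture view", "full architecture", or anything else
```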

## View Templates

### Service Dependency Graph

```mermaid
graph TD
    Gateway[api-gateway<br/>type: gateway] --> Auth[auth-service<br/>type: service]
    Gateway --> Users[user-service<br/>type: service]
    Gateway --> Orders[order-service<br/>type: service]
    Users --> DB[(user-db<br/>type: database)]
    Auth --> Redis[(redis<br/>type: cache)]
    Orders --> OrdersDB[(order-db<br/>type: database)]
    Orders --> Worker[order-processor<br/>type: worker]
```

**Generation logic:**
- Nodes: All components from `catalog-info.yaml`
- Edges: From `dependsOn` relationships
- Shape: Database nodes use `[(name)]`, others use `[name]`
- Labels: Include `name` and `type`
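Applied to the aggregated model, the generation rules above come down to a few lines (components as `{name: {"type": ...}}`, dependencies as `(from, to)` pairs):

```python
def dependency_mermaid(components, dependencies):
    """Render a Mermaid dependency graph body from the aggregated model."""
    lines = ["graph TD"]
    for name, comp in sorted(components.items()):
        label = f"{name}<br/>type: {comp['type']}"
        # Databases get the cylinder shape [(...)]; everything else a rectangle.
        shape = f"{name}[({label})]" if comp["type"] == "database" else f"{name}[{label}]"
        lines.append(f"    {shape}")
    for src, dst in sorted(set(dependencies)):  # dedupe edges
        lines.append(f"    {src} --> {dst}")
    return "\n".join(lines)
```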

### Request Flow Map

```mermaid
flowchart LR
    Client[Client] --> Gateway[api-gateway]
    Gateway -->|/api/users/*| Users[user-service]
    Gateway -->|/api/auth/*| Auth[auth-service]
    Gateway -->|/api/orders/*| Orders[order-service]
    Users --> DB[(user-db)]
    Auth --> Redis[(redis)]
```

**Generation logic:**
- Start from gateway components (type: gateway)
- Follow `routes` to find downstream services
- Include route paths as edge labels
- Follow service dependencies to databases

### Event Topology

```mermaid
graph LR
    Orders[order-service] -->|order.placed| Kafka1[Kafka: order-placed]
    Orders -->|order.cancelled| Kafka2[Kafka: order-cancelled]
    Kafka1 --> User[user-service]
    Kafka1 --> Notif[notification-service]
    Kafka2 --> User
    Worker[order-processor] -->|order.processed| Kafka3[Kafka: order-processed]
    Kafka1 --> Worker
```

**Generation logic:**
- Nodes: Services (from `eventProducers`) and Kafka topics (from `topic` field)
- Edges: Producer → Topic, Topic → Consumer
- Labels: Topic names on edges
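Under the same model (topic mapped to producer and consumer lists), the edge rules can be sketched as:

```python
def event_edges(producers, consumers):
    """Build (source, label, target) Mermaid edges; topics become nodes."""
    edges = []
    for topic, services in sorted(producers.items()):
        node = f"Kafka: {topic}"
        for service in sorted(services):
            edges.append((service, topic, node))   # producer -> topic
    for topic, services in sorted(consumers.items()):
        node = f"Kafka: {topic}"
        for service in sorted(services):
            edges.append((node, topic, service))   # topic -> consumer
    return edges
```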

### Service Groupings

**By Domain:**
| Domain | Services | Owner |
|--------|----------|-------|
| trading | order-service, trade-service, order-processor | trading-team |
| platform | api-gateway, user-service, auth-service | platform-team |
| shared | shared-utils, shared-types | platform-team |

**By Type:**
| Type | Services |
|------|----------|
| gateway | api-gateway |
| service | user-service, auth-service, order-service |
| worker | order-processor, notification-worker |
| library | shared-utils, shared-types |

**Generation logic:**
- Group by `spec.domain` field
- Show owner for each domain
- Count services per grouping
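A sketch of the domain grouping step, with field names following the `catalog-info.yaml` schema used above:

```python
from collections import defaultdict

def group_by_domain(components):
    """Group component names by their spec.domain field."""
    domains = defaultdict(list)
    for name, comp in components.items():
        domains[comp.get("domain", "unknown")].append(name)
    # Sorted output keeps generated tables stable across runs
    return {d: sorted(names) for d, names in sorted(domains.items())}
```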

## Scripts

### aggregate-metadata.py

Collect all `catalog-info.yaml` files from repositories:

```bash
# Aggregate from repos directory
python skills/arch-view/scripts/aggregate-metadata.py repos/

# Output as JSON
python skills/arch-view/scripts/aggregate-metadata.py repos/ --format json

# Output as summary
python skills/arch-view/scripts/aggregate-metadata.py repos/ --summary
```

### generate-mermaid.py

Convert aggregated metadata to Mermaid diagrams:

```bash
# Generate all views
python skills/arch-view/scripts/generate-mermaid.py aggregated.json

# Generate specific view
python skills/arch-view/scripts/generate-mermaid.py aggregated.json --view dependency
python skills/arch-view/scripts/generate-mermaid.py aggregated.json --view request-flow
python skills/arch-view/scripts/generate-mermaid.py aggregated.json --view events
```

## Using Subagents

Launch subagents to read metadata in parallel:

```
For efficiency, launch multiple subagents simultaneously:

Subagent 1: "Read repos/api-gateway/catalog-info.yaml and return parsed YAML"
Subagent 2: "Read repos/user-service/catalog-info.yaml and return parsed YAML"
Subagent 3: "Read repos/order-service/catalog-info.yaml and return parsed YAML"
... (continue for all repos)

Collect results and aggregate.
```

**Tip:** Limit to 5-10 concurrent subagents to avoid overwhelming the system.

## Output Format

Present results as:

1. **Summary** - Quick overview of what was found
2. **Visual diagrams** - Mermaid graphs that render in supported viewers
3. **Tables** - Groupings and listings
4. **Missing metadata** - List of repos without catalog-info.yaml

### Example Output Structure

```markdown
# Architecture View

## Summary
- Total repositories: 15
- Components with metadata: 12
- Missing metadata: 3
- Gateways: 1
- Services: 8
- Workers: 2
- Libraries: 1

## Service Dependency Graph
[Mermaid diagram]

## Request Flow Map
[Mermaid diagram]

## Event Topology
[Mermaid diagram]

## Service Groupings
[Markdown tables]

## Missing Metadata
The following repositories lack catalog-info.yaml:
- repo-a
- repo-b
- repo-c
```

## Additional Resources

### Reference Files

- **`references/view-templates.md`** - Mermaid templates for each view type
- **`references/mermaid-guide.md`** - Mermaid syntax reference

### Scripts

- **`scripts/aggregate-metadata.py`** - Collect catalog-info.yaml from all repos
- **`scripts/generate-mermaid.py`** - Convert metadata to Mermaid diagrams


---

## Referenced Files

> The following files are referenced in this skill and included for context.

### references/view-templates.md

```markdown
# Mermaid View Templates

Templates for generating different architectural views as Mermaid diagrams.

## Service Dependency Graph Template

```mermaid
graph TD
    %% Gateways
    %% For each component with type=gateway
    Gateway[{{name}}<br/>type: {{type}}]

    %% Services
    %% For each component with type=service
    Service[{{name}}<br/>type: {{type}}]

    %% Workers
    %% For each component with type=worker
    Worker[{{name}}<br/>type: {{type}}]

    %% Databases - use different shape
    %% For each component with type=database
    DB[({{name}}<br/>type: {{type}})]

    %% Dependencies - edges from dependsOn
    %% For each dependency in component.dependsOn
    Gateway --> Service
    Service --> DB
    Worker --> DB

    %% Styling
    classDef gateway fill:#e1f5fe
    classDef service fill:#f3e5f5
    classDef worker fill:#fff3e0
    classDef database fill:#e8f5e9

    class Gateway gateway
    class Service service
    class Worker worker
    class DB database
```

### Generation Rules

1. **Node shapes:**
   - Gateway/Service/Worker: `[Name]`
   - Database: `[(Name)]`

2. **Node labels:**
   - Format: `{name}<br/>type: {type}`
   - Optionally add domain if available

3. **Edges:**
   - From `dependsOn` relationships
   - Arrow direction: dependent → dependency

4. **Layout:**
   - Use `graph TD` (top-down) for better readability
   - Consider `graph LR` (left-right) for wide architectures

## Request Flow Map Template

```mermaid
flowchart LR
    Client[External Client] --> Gateway[{{gateway-name}}]

    %% Routes from gateway
    %% For each route in gateway.routes
    Gateway -->|{{route.path}}| Downstream[{{route.forwardsTo}}]

    %% Service dependencies
    %% For each service.dependsOn
    Downstream --> DB[(database)]

    %% Styling
    classDef gateway fill:#e1f5fe,stroke:#01579b
    classDef service fill:#f3e5f5,stroke:#4a148c
    classDef database fill:#e8f5e9,stroke:#1b5e20
    classDef external fill:#ffebee,stroke:#b71c1c
```

### Generation Rules

1. **Start from gateways:**
   - Find all components with `type: gateway`
   - These are the entry points

2. **Follow routes:**
   - Parse `routes` array from gateway metadata
   - Create edge with route path as label
   - Handle both `handler: this` and `forwardsTo: service`

3. **Add dependencies:**
   - Include databases and caches that services depend on
   - Don't show service-to-service dependencies (avoid clutter)

4. **Edge labels:**
   - Show route patterns: `/api/users/*`
   - Use `|label|` syntax in Mermaid

## Event Topology Template

```mermaid
graph LR
    %% Producers
    %% For each component with eventProducers
    Producer[{{service-name}}] -->|{{topic}}| Topic[(Kafka:<br/>{{topic}})]

    %% Topics to Consumers
    %% For each topic's consumers
    Topic --> Consumer[{{service-name}}]

    %% Styling
    classDef producer fill:#e8f5e9
    classDef consumer fill:#fff3e0
    classDef topic fill:#e1f5fe,stroke:#0277bd

    class Producer producer
    class Consumer consumer
    class Topic topic
```

### Generation Rules

1. **Topic nodes:**
   - Create a topic node for each unique topic name
   - Format: `[(Kafka: {topic})]`
   - Group by topic, not by producer

2. **Producer edges:**
   - From service to topic it produces
   - Label: topic name

3. **Consumer edges:**
   - From topic to consuming service
   - If multiple consumers, show all

4. **Message flow:**
   - Optional: show event schema on edges
   - Format: `|{event}| {topic}`

## Domain Grouping Template

Not a Mermaid diagram - use Markdown tables:

```markdown
## Services by Domain

| Domain | Services | Owner | Gateway | Service | Worker |
|--------|----------|-------|---------|---------|--------|
| trading | order-service, trade-service | trading-team | trading-gateway | order-service, trade-service | order-processor |
| platform | user-service, auth-service | platform-team | api-gateway | user-service, auth-service | notification-worker |

## Service Ownership

| Service | Domain | Type | Owner |
|---------|--------|------|-------|
| api-gateway | platform | gateway | platform-team |
| user-service | platform | service | platform-team |
| order-service | trading | service | trading-team |
```

## Complete Architecture Template

Combine all views:

```markdown
# System Architecture

## Overview
[Summary text about the system]

## Service Dependency Graph
[Mermaid dependency graph]

## Request Flows
[Mermaid request flow diagram]

## Event Topology
[Mermaid event graph]

## Service Groupings
[Markdown tables by domain]

## Data Flows
[Additional detail on how data flows through the system]
```

## Mermaid Syntax Quick Reference

### Graph Types
- `graph TD` - Top-down
- `graph LR` - Left-right
- `flowchart LR` - Flowchart (more control)

### Shapes
- `[text]` - Rectangle (process)
- `[(text)]` - Cylinder (database)
- `((text))` - Circle (connector)
- `{text}` - Rhombus (decision)
- `[[text]]` - Square with rounded corners

### Edges
- `A --> B` - Arrow
- `A -->|label| B` - Labeled arrow
- `A -.-> B` - Dotted arrow
- `A === B` - Thick line

### Subgraphs
```mermaid
graph TD
    subgraph Domain [Trading Domain]
        A[Service A]
        B[Service B]
    end
    subgraph Platform [Platform Domain]
        C[Service C]
    end
    A --> C
```

### Styling
```mermaid
graph TD
    A[Node]
    B[Node]

    classDef default fill:#f9f9f9,stroke:#333,stroke-width:2px
    classDef highlight fill:#ffeb3b,stroke:#f57f17

    class A default
    class B highlight
```

```

### scripts/aggregate-metadata.py

```python
#!/usr/bin/env python3
"""
Aggregate catalog-info.yaml files from multiple repositories.

Usage: python aggregate-metadata.py [repos_directory]
Output: JSON containing aggregated metadata from all repos
"""

import argparse
import json
import sys
from pathlib import Path
from typing import Dict, List, Any


def load_yaml(file_path: Path) -> Dict[str, Any]:
    """Load and parse a YAML file."""
    try:
        import yaml
        with open(file_path, 'r') as f:
            return yaml.safe_load(f) or {}
    except ImportError:
        # Fallback: simple parsing for our schema
        return simple_parse_yaml(file_path)


def simple_parse_yaml(file_path: Path) -> Dict[str, Any]:
    """Simple YAML parser for catalog-info.yaml structure."""
    result = {}
    current_section = None
    current_list = None
    list_indent = 0

    with open(file_path, 'r') as f:
        for line in f:
            stripped = line.rstrip()
            if not stripped or stripped.startswith('#'):
                continue

            indent = len(line) - len(line.lstrip())

            # Top level sections
            if indent == 0 and ':' in stripped:
                key = stripped.split(':', 1)[0]
                if key in ['apiVersion', 'kind']:
                    result[key] = stripped.split(':', 1)[1].strip().strip('"').strip("'")
                elif key == 'metadata':
                    result['metadata'] = {}
                    current_section = result['metadata']
                elif key == 'spec':
                    result['spec'] = {}
                    current_section = result['spec']
                continue

            # Within a section
            if current_section is not None:
                # Check for list item
                if stripped.startswith('- ') and ':' in stripped[2:]:
                    if current_list is None:
                        current_list = []
                        # Find the key in current_section that should hold this list
                        list_indent = indent

                    item_content = stripped[2:]
                    if ':' in item_content:
                        item_key = item_content.split(':', 1)[0].strip()
                        item_value = item_content.split(':', 1)[1].strip().strip('"').strip("'")
                        current_list.append({item_key: item_value})
                    continue

                # End of list
                if current_list is not None and indent <= list_indent:
                    # Assign list to section (simplified - assumes single list per section)
                    if isinstance(current_section, dict):
                        # Determine which key this list belongs to
                        for key in list(current_section.keys())[-1:]:
                            current_section[key] = current_list
                    current_list = None

                # Key-value pair
                if ':' in stripped:
                    key = stripped.split(':', 1)[0].strip()
                    value = stripped.split(':', 1)[1].strip().strip('"').strip("'")
                    if value and value != 'null' and value != '~':
                        current_section[key] = value
                    else:
                        current_section[key] = None

    # Attach a list that was still open when the file ended
    if current_list is not None and isinstance(current_section, dict) and current_section:
        last_key = list(current_section.keys())[-1]
        current_section[last_key] = current_list

    return result


def find_catalog_files(repos_dir: Path) -> List[tuple[str, Path]]:
    """Find all catalog-info.yaml files in repository directories."""
    catalog_files = []

    if not repos_dir.exists():
        return catalog_files

    for item in repos_dir.iterdir():
        if item.is_dir() and not item.name.startswith('.'):
            catalog_path = item / "catalog-info.yaml"
            if catalog_path.exists():
                catalog_files.append((item.name, catalog_path))

    return catalog_files


def aggregate_metadata(repos_dir: Path) -> Dict[str, Any]:
    """Aggregate metadata from all repositories."""
    catalog_files = find_catalog_files(repos_dir)

    aggregated = {
        "repositories": [],
        "components": {},
        "domains": {},
        "owners": {},
        "types": {
            "gateway": [],
            "service": [],
            "worker": [],
            "library": [],
            "frontend": [],
            "database": [],
        },
        "dependencies": {},
        "events": {
            "topics": set(),
            "producers": {},
            "consumers": {},
        },
        "routes": [],
        "missing_metadata": [],
    }

    for repo_name, catalog_path in catalog_files:
        try:
            catalog = load_yaml(catalog_path)

            # Extract basic info
            metadata = catalog.get("metadata", {})
            spec = catalog.get("spec", {})

            name = metadata.get("name", repo_name)
            component_type = spec.get("type", "unknown")
            domain = spec.get("domain", "unknown")
            owner = spec.get("owner", "unknown")

            # Store component
            aggregated["components"][name] = {
                "name": name,
                "repo": repo_name,
                "description": metadata.get("description", ""),
                "type": component_type,
                "domain": domain,
                "owner": owner,
                "lifecycle": spec.get("lifecycle", "unknown"),
                "runtime": spec.get("runtime", "unknown"),
                "framework": spec.get("framework", "unknown"),
                "catalog_path": str(catalog_path),
            }

            # Track by type
            if component_type in aggregated["types"]:
                aggregated["types"][component_type].append(name)

            # Track by domain
            if domain != "unknown":
                if domain not in aggregated["domains"]:
                    aggregated["domains"][domain] = []
                aggregated["domains"][domain].append(name)

            # Track by owner
            if owner != "unknown":
                if owner not in aggregated["owners"]:
                    aggregated["owners"][owner] = []
                aggregated["owners"][owner].append(name)

            # Dependencies
            depends_on = spec.get("dependsOn", [])
            if depends_on:
                aggregated["dependencies"][name] = [
                    dep.get("component", dep) if isinstance(dep, dict) else dep
                    for dep in depends_on
                ]

            # Events
            for producer in spec.get("eventProducers", []):
                topic = producer.get("topic", "")
                if topic:
                    aggregated["events"]["topics"].add(topic)
                    if topic not in aggregated["events"]["producers"]:
                        aggregated["events"]["producers"][topic] = []
                    aggregated["events"]["producers"][topic].append(name)

            for consumer in spec.get("eventConsumers", []):
                topic = consumer.get("topic", "")
                if topic:
                    aggregated["events"]["topics"].add(topic)
                    if topic not in aggregated["events"]["consumers"]:
                        aggregated["events"]["consumers"][topic] = []
                    aggregated["events"]["consumers"][topic].append(name)

            # Routes (for gateways)
            if component_type == "gateway":
                for route in spec.get("routes", []):
                    aggregated["routes"].append({
                        "gateway": name,
                        "path": route.get("path", ""),
                        "methods": route.get("methods", []),
                        "forwardsTo": route.get("forwardsTo"),
                        "handler": route.get("handler"),
                    })

        except Exception as e:
            print(f"Warning: Failed to parse {catalog_path}: {e}", file=sys.stderr)
            aggregated["missing_metadata"].append(repo_name)

    # Check for repos without catalog-info.yaml
    for item in repos_dir.iterdir():
        if item.is_dir() and not item.name.startswith('.'):
            catalog_path = item / "catalog-info.yaml"
            if not catalog_path.exists() and item.name not in aggregated["missing_metadata"]:
                aggregated["missing_metadata"].append(item.name)

    # Convert sets to lists for JSON serialization
    aggregated["events"]["topics"] = sorted(list(aggregated["events"]["topics"]))

    return aggregated


def print_summary(aggregated: Dict[str, Any]):
    """Print a summary of aggregated metadata."""
    print(f"# Architecture Summary")
    print()
    print(f"- **Total repositories scanned:** {len(aggregated['components']) + len(aggregated['missing_metadata'])}")
    print(f"- **Components with metadata:** {len(aggregated['components'])}")
    print(f"- **Missing metadata:** {len(aggregated['missing_metadata'])}")
    print()
    print(f"## Components by Type")
    for comp_type, components in aggregated["types"].items():
        if components:
            print(f"- **{comp_type}:** {', '.join(components)}")
    print()
    print(f"## Domains")
    for domain, components in aggregated["domains"].items():
        print(f"- **{domain}:** {', '.join(components)}")
    print()

    if aggregated["missing_metadata"]:
        print(f"## Missing Metadata")
        for repo in aggregated["missing_metadata"]:
            print(f"- {repo}")
        print()


def main():
    parser = argparse.ArgumentParser(
        description="Aggregate catalog-info.yaml files from multiple repositories"
    )
    parser.add_argument("repos_dir", nargs="?", default="repos",
                       help="Directory containing repositories")
    parser.add_argument("--format", choices=["json", "summary"], default="summary",
                       help="Output format")
    parser.add_argument("--output", "-o", help="Write to file instead of stdout")
    args = parser.parse_args()

    repos_dir = Path(args.repos_dir)

    if not repos_dir.exists():
        print(f"Error: Directory '{repos_dir}' does not exist", file=sys.stderr)
        return 1

    aggregated = aggregate_metadata(repos_dir)

    if args.format == "json":
        output = json.dumps(aggregated, indent=2, default=str)
    else:
        # Capture the summary text without permanently swapping sys.stdout
        import contextlib
        import io
        buf = io.StringIO()
        with contextlib.redirect_stdout(buf):
            print_summary(aggregated)
        output = buf.getvalue()

    if args.output:
        Path(args.output).write_text(output)
        print(f"Wrote {args.format} output to {args.output}")
    else:
        print(output)

    return 0


if __name__ == "__main__":
    sys.exit(main())

```

### scripts/generate-mermaid.py

```python
#!/usr/bin/env python3
"""
Generate Mermaid diagrams from aggregated metadata.

Usage: python generate-mermaid.py aggregated.json
Output: Mermaid diagrams for dependency graph, request flow, event topology
"""

import argparse
import json
import sys
from pathlib import Path
from typing import Dict, Any, List, Set


def load_aggregated(file_path: Path) -> Dict[str, Any]:
    """Load aggregated metadata JSON."""
    with open(file_path, 'r') as f:
        return json.load(f)


def generate_dependency_graph(aggregated: Dict[str, Any]) -> str:
    """Generate service dependency graph as Mermaid."""
    lines = ["```mermaid", "graph TD"]

    # Track edges to avoid duplicates
    edges = set()

    # Generate nodes
    for name, comp in aggregated["components"].items():
        comp_type = comp["type"]

        # Node label
        label = f"{name}<br/>type: {comp_type}"
        if comp.get("domain") != "unknown":
            label += f"<br/>domain: {comp['domain']}"

        # Node shape based on type
        if comp_type == "database":
            lines.append(f'    {name}[({label})]')
        else:
            # Gateways, services, and workers share the rectangle shape
            lines.append(f'    {name}[{label}]')

        # Add dependency edges
        if name in aggregated.get("dependencies", {}):
            for dep in aggregated["dependencies"][name]:
                edge = f"{name} --> {dep}"
                if edge not in edges:
                    edges.add(edge)

    # Add all edges
    for edge in sorted(edges):
        lines.append(f"    {edge}")

    # Add styling
    lines.append("")
    lines.append("    classDef gateway fill:#e1f5fe,stroke:#01579b")
    lines.append("    classDef service fill:#f3e5f5,stroke:#4a148c")
    lines.append("    classDef worker fill:#fff3e0,stroke:#e65100")
    lines.append("    classDef library fill:#f3e5f5,stroke:#7b1fa2,stroke-dasharray: 5 5")
    lines.append("    classDef database fill:#e8f5e9,stroke:#1b5e20")
    lines.append("    classDef frontend fill:#fce4ec,stroke:#880e4f")
    lines.append("")

    # Assign classes
    for name, comp in aggregated["components"].items():
        comp_type = comp["type"]
        if comp_type in ["gateway", "service", "worker", "library", "database", "frontend"]:
            lines.append(f"    class {name} {comp_type}")

    lines.append("```")
    return "\n".join(lines)


def generate_request_flow(aggregated: Dict[str, Any]) -> str:
    """Generate request flow map as Mermaid."""
    lines = ["```mermaid", "flowchart LR"]
    lines.append('    Client[External Client]')

    # Find gateways
    gateways = [name for name, comp in aggregated["components"].items()
                if comp["type"] == "gateway"]

    # Track added nodes
    added = set()

    # Connect clients to gateways
    for gateway in gateways:
        lines.append(f'    Client -->|API| {gateway}')
        added.add(gateway)

        # Add routes from this gateway
        for route in aggregated.get("routes", []):
            if route["gateway"] == gateway:
                target = route.get("forwardsTo") or route.get("handler")
                path = route.get("path", "")

                if target and target != "this":
                    if target not in added:
                        # First mention: declare the target node alongside the edge
                        lines.append(f'    {gateway} -->|{path}| {target}[{target}]')
                        added.add(target)
                    else:
                        lines.append(f'    {gateway} -->|{path}| {target}')

    # Add service dependencies to databases
    for name, comp in aggregated["components"].items():
        if comp["type"] in ["service", "gateway"]:
            for dep in aggregated.get("dependencies", {}).get(name, []):
                dep_comp = aggregated["components"].get(dep, {})
                if dep_comp.get("type") == "database":
                    lines.append(f'    {name} --> {dep}[({dep})]')

    # Styling
    lines.append("")
    lines.append("    classDef gateway fill:#e1f5fe,stroke:#01579b,stroke-width:2px")
    lines.append("    classDef service fill:#f3e5f5,stroke:#4a148c")
    lines.append("    classDef database fill:#e8f5e9,stroke:#1b5e20")
    lines.append("    classDef client fill:#ffebee,stroke:#b71c1c")
    lines.append("")
    lines.append("    class Client client")
    if gateways:
        lines.append("    class " + " ".join(gateways) + " gateway")
    lines.append("```")

    return "\n".join(lines)


def generate_event_topology(aggregated: Dict[str, Any]) -> str:
    """Generate event topology as Mermaid."""
    lines = ["```mermaid", "graph LR"]

    events = aggregated.get("events", {})
    topics = sorted(events.get("topics", []))
    producers = events.get("producers", {})
    consumers = events.get("consumers", {})

    # Track added nodes
    added = set()

    # Create topic nodes
    for topic in topics:
        safe_topic = topic.replace(".", "_").replace("-", "_")
        lines.append(f'    {safe_topic}[(Kafka: {topic})]')
        added.add(safe_topic)

    # Add producer edges
    for topic, services in producers.items():
        safe_topic = topic.replace(".", "_").replace("-", "_")
        for service in services:
            if service not in added:
                lines.append(f'    {service}[{service}]')
                added.add(service)
            lines.append(f'    {service} -->|{topic}| {safe_topic}')

    # Add consumer edges
    for topic, services in consumers.items():
        safe_topic = topic.replace(".", "_").replace("-", "_")
        for service in services:
            if service not in added:
                lines.append(f'    {service}[{service}]')
                added.add(service)
            lines.append(f'    {safe_topic} -->|{topic}| {service}')

    # Styling
    lines.append("")
    lines.append("    classDef producer fill:#e8f5e9,stroke:#2e7d32")
    lines.append("    classDef consumer fill:#fff3e0,stroke:#e65100")
    lines.append("    classDef topic fill:#e1f5fe,stroke:#0277bd,stroke-width:2px")
    lines.append("")

    # Classify
    all_producers = set()
    for services in producers.values():
        all_producers.update(services)

    all_consumers = set()
    for services in consumers.values():
        all_consumers.update(services)

    # Skip empty groups so we never emit a malformed "class  <name>" line
    if all_producers:
        lines.append("    class " + " ".join(sorted(all_producers)) + " producer")
    if all_consumers:
        lines.append("    class " + " ".join(sorted(all_consumers)) + " consumer")
    if topics:
        lines.append("    class " + " ".join(t.replace(".", "_").replace("-", "_") for t in topics) + " topic")

    lines.append("```")
    return "\n".join(lines)


def generate_service_groupings(aggregated: Dict[str, Any]) -> str:
    """Generate service groupings as Markdown tables."""
    lines = ["## Services by Domain", ""]
    lines.append("| Domain | Services | Owner |")
    lines.append("|--------|----------|-------|")

    for domain, services in sorted(aggregated.get("domains", {}).items()):
        # Get owner for this domain (first service's owner)
        owner = "unknown"
        for service in services:
            comp = aggregated["components"].get(service, {})
            if comp.get("owner") != "unknown":
                owner = comp["owner"]
                break

        lines.append(f"| {domain} | {', '.join(services)} | {owner} |")

    lines.append("")
    lines.append("## Services by Type")
    lines.append("")
    lines.append("| Type | Services |")
    lines.append("|------|----------|")

    for comp_type, services in aggregated["types"].items():
        if services:
            lines.append(f"| {comp_type} | {', '.join(services)} |")

    return "\n".join(lines)


def generate_full_view(aggregated: Dict[str, Any]) -> str:
    """Generate complete architecture view with all diagrams."""
    lines = ["# Architecture View", ""]
    lines.append("## Summary")
    lines.append(f"- **Total components:** {len(aggregated['components'])}")
    lines.append("- **Components by type:**")
    for comp_type, services in aggregated["types"].items():
        if services:
            lines.append(f"  - {comp_type}: {len(services)}")
    lines.append("")

    if aggregated.get("domains"):
        lines.append("## Domains")
        for domain, services in sorted(aggregated["domains"].items()):
            lines.append(f"- **{domain}**: {', '.join(services)}")
        lines.append("")

    lines.append("## Service Dependency Graph")
    lines.append(generate_dependency_graph(aggregated))
    lines.append("")

    lines.append("## Request Flow Map")
    lines.append(generate_request_flow(aggregated))
    lines.append("")

    if aggregated.get("events", {}).get("topics"):
        lines.append("## Event Topology")
        lines.append(generate_event_topology(aggregated))
        lines.append("")

    lines.append("## Service Groupings")
    lines.append(generate_service_groupings(aggregated))
    lines.append("")

    if aggregated.get("missing_metadata"):
        lines.append("## Missing Metadata")
        lines.append("The following repositories lack catalog-info.yaml:")
        for repo in aggregated["missing_metadata"]:
            lines.append(f"- {repo}")
        lines.append("")

    return "\n".join(lines)


def main():
    parser = argparse.ArgumentParser(
        description="Generate Mermaid diagrams from aggregated metadata"
    )
    parser.add_argument("input_file", help="Aggregated metadata JSON file")
    parser.add_argument("--view", choices=["dependency", "request-flow", "events", "groupings", "full"],
                       default="full", help="Which view to generate")
    parser.add_argument("--output", "-o", help="Write to file instead of stdout")
    args = parser.parse_args()

    input_path = Path(args.input_file)

    if not input_path.exists():
        print(f"Error: File '{input_path}' does not exist", file=sys.stderr)
        return 1

    aggregated = load_aggregated(input_path)

    view_generators = {
        "dependency": generate_dependency_graph,
        "request-flow": generate_request_flow,
        "events": generate_event_topology,
        "groupings": generate_service_groupings,
        "full": generate_full_view,
    }

    output = view_generators[args.view](aggregated)

    if args.output:
        Path(args.output).write_text(output)
        print(f"Wrote {args.view} view to {args.output}")
    else:
        print(output)

    return 0


if __name__ == "__main__":
    sys.exit(main())
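# Example invocations (the script filename below is hypothetical -- use whatever
# path this file is saved under in the skill bundle):
#   python generate_views.py aggregated.json --view events
#   python generate_views.py aggregated.json --view full -o ARCHITECTURE.md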

```
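As a quick sanity check of the emission pattern used by `generate_event_topology`, here is a minimal, self-contained sketch. The `events` dict below is a hypothetical example of the aggregated-metadata shape the script consumes, and the `safe_id` helper name is ours, not the script's:

```python
# Minimal sketch of the Mermaid event-topology emission pattern.
# The sample `events` dict is hypothetical; real input comes from the
# aggregated-metadata JSON file produced earlier in the pipeline.

def safe_id(name: str) -> str:
    """Mermaid node IDs should avoid dots and dashes; map them to underscores."""
    return name.replace(".", "_").replace("-", "_")

events = {
    "topics": ["orders.created"],
    "producers": {"orders.created": ["order-service"]},
    "consumers": {"orders.created": ["billing-service"]},
}

lines = ["graph LR"]
for topic in events["topics"]:
    # Circle node with a quoted label so the colon does not break Mermaid parsing.
    lines.append(f'    {safe_id(topic)}(("Kafka: {topic}"))')
for topic, services in events["producers"].items():
    for svc in services:
        # Declare the service node inline with its label on the edge line.
        lines.append(f'    {safe_id(svc)}[{svc}] -->|{topic}| {safe_id(topic)}')
for topic, services in events["consumers"].items():
    for svc in services:
        lines.append(f'    {safe_id(topic)} -->|{topic}| {safe_id(svc)}[{svc}]')

print("\n".join(lines))
```

Note that this sketch also sanitizes service names, which the script above leaves untouched; a service whose name contains dots would otherwise yield an invalid Mermaid node ID.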
