arch-view
This skill should be used when the user asks to "generate architecture view", "show service dependency graph", "map request flows", "show event topology", "group services by domain", "visualize architecture", or mentions cross-repository architecture analysis, service mapping, or architectural visualization.
Packaged view
This page reorganizes the original catalog entry around fit, installability, and workflow context first. The original raw source lives below.
Install command
npx @skill-hub/cli install astrabit-cpt-astrabit-docs-arch-view
Repository
Skill path: .claude/skills/arch-view
Best for
Primary workflow: Ship Full Stack.
Technical facets: Full Stack.
Target audience: everyone.
License: Unknown.
Original source
Catalog source: SkillHub Club.
Repository owner: AstraBit-CPT.
This is still a mirrored public skill entry. Review the repository before installing into production workflows.
What it helps with
- Install arch-view into Claude Code, Codex CLI, Gemini CLI, or OpenCode workflows
- Review https://github.com/AstraBit-CPT/astrabit-docs before adding arch-view to shared team environments
- Use arch-view for development workflows
Original source / Raw SKILL.md
---
name: arch-view
description: This skill should be used when the user asks to "generate architecture view", "show service dependency graph", "map request flows", "show event topology", "group services by domain", "visualize architecture", or mentions cross-repository architecture analysis, service mapping, or architectural visualization.
version: 1.0.0
---
# Architecture View
Generate cross-repository architectural views by aggregating `catalog-info.yaml` metadata from all repositories using parallel subagents.
## Purpose
Create visual architectural representations (Mermaid diagrams, markdown tables) that show how services fit together, how requests flow through gateways, how events propagate, and how services group by domain/team.
## When to Use
Trigger this skill when:
- User asks to "show service dependency graph" or "map the architecture"
- User wants to understand "how services connect"
- User asks about "request flows" or "event topology"
- User wants to "group services by domain/team"
- User mentions "cross-repo architecture" or "system architecture"
## Workflow
### Phase 1: Discover Repositories
Find all repositories to analyze:
1. **Check `repos/` directory** - Local cloned repositories
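2. **Optionally fetch from GitHub** - Use `gh repo list Astrabit-CPT --limit 200` for the full list (`gh repo list` returns only 30 repositories by default; the limit value here is illustrative)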
3. **Filter for active repos** - Skip archived or template repos
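The local part of this discovery step could look roughly like the sketch below. It assumes each immediate subdirectory of `repos/` is one cloned repository; the helper name `discover_local_repos` is hypothetical and not part of the skill's scripts. Filtering archived or template repositories needs GitHub metadata and is not covered here.

```python
from pathlib import Path
from typing import List


def discover_local_repos(repos_dir: str = "repos") -> List[Path]:
    """Return immediate subdirectories of repos_dir, skipping hidden ones."""
    root = Path(repos_dir)
    if not root.is_dir():
        # A missing repos/ directory simply means nothing was cloned locally
        return []
    return sorted(
        p for p in root.iterdir()
        if p.is_dir() and not p.name.startswith(".")
    )
```

Sorting keeps the order stable between runs, which makes later diagrams easier to diff.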
### Phase 2: Parallel Metadata Collection
Use subagents IN PARALLEL to read each repository's `catalog-info.yaml`:
```
For each repo:
Launch subagent with: "Read catalog-info.yaml from [repo_path] and return the parsed content"
```
**Parallel processing strategy:**
- Launch 5-10 subagents simultaneously
- Collect all results
- Handle missing metadata gracefully (skip or note as missing)
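The same fan-out and collect pattern can be illustrated outside the agent runtime. The sketch below uses a thread pool as a stand-in for subagents, which is an assumption made purely for illustration; the function names `read_catalog` and `collect_catalogs` are hypothetical. Missing files are recorded as `None` rather than raising, matching the "note as missing" strategy.

```python
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Dict, Iterable, Optional


def read_catalog(repo_path: Path) -> Optional[str]:
    """Return the raw catalog-info.yaml text, or None if the file is missing."""
    catalog = repo_path / "catalog-info.yaml"
    return catalog.read_text(encoding="utf-8") if catalog.is_file() else None


def collect_catalogs(repo_paths: Iterable[Path], max_workers: int = 8) -> Dict[str, Optional[str]]:
    """Read every repository's catalog concurrently, keyed by repo name."""
    paths = list(repo_paths)
    # max_workers caps concurrency, mirroring the 5-10 subagent limit
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        texts = list(pool.map(read_catalog, paths))
    return {p.name: text for p, text in zip(paths, texts)}
```

Repositories whose value is `None` can be listed directly in the "Missing metadata" part of the output.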
### Phase 3: Aggregate and Build Model
Combine all metadata into a unified model:
```python
aggregated = {
    "components": {},       # name -> catalog info
    "dependencies": set(),  # (from, to) tuples
    "gateways": [],
    "services": [],
    "workers": [],
    "domains": {},          # domain -> [components]
    "events": {
        "producers": {},    # topic -> [producers]
        "consumers": {},    # topic -> [consumers]
    },
    "routes": [],           # gateway routes
}
```
### Phase 4: Generate Requested View
Based on user request, generate the appropriate view:
| View | Command/Trigger | Output |
|------|-----------------|--------|
| Service Dependency Graph | "dependency graph", "show dependencies" | Mermaid graph |
| Request Flow Maps | "request flows", "how requests flow" | Mermaid flowchart |
| Event Topology | "event topology", "event map" | Mermaid graph |
| Service Groupings | "group services", "services by domain" | Markdown tables |
| Full Architecture | "architecture view", "full architecture" | All views combined |
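One way to turn the trigger phrases in this table into a view choice is simple substring matching. This is a minimal sketch, not how the skill itself dispatches: `VIEW_TRIGGERS` and `select_view` are hypothetical names, and the view keys are borrowed from the `--view` choices of `generate-mermaid.py`.

```python
from typing import Dict, Tuple

# Trigger phrases copied from the table above
VIEW_TRIGGERS: Dict[str, Tuple[str, ...]] = {
    "dependency": ("dependency graph", "show dependencies"),
    "request-flow": ("request flows", "how requests flow"),
    "events": ("event topology", "event map"),
    "groupings": ("group services", "services by domain"),
    "full": ("architecture view", "full architecture"),
}


def select_view(request: str, default: str = "full") -> str:
    """Return the first view whose trigger phrase occurs in the request."""
    text = request.lower()
    for view, phrases in VIEW_TRIGGERS.items():
        if any(phrase in text for phrase in phrases):
            return view
    # Fall back to the combined view when nothing matches
    return default
```

Falling back to the full view is a design choice for the sketch; asking the user to clarify would be an equally reasonable default.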
## View Templates
### Service Dependency Graph
```mermaid
graph TD
Gateway[api-gateway<br/>type: gateway] --> Auth[auth-service<br/>type: service]
Gateway --> Users[user-service<br/>type: service]
Gateway --> Orders[order-service<br/>type: service]
Users --> DB[(user-db<br/>type: database)]
Auth --> Redis[(redis<br/>type: cache)]
Orders --> OrdersDB[(order-db<br/>type: database)]
Orders --> Worker[order-processor<br/>type: worker]
```
**Generation logic:**
- Nodes: All components from `catalog-info.yaml`
- Edges: From `dependsOn` relationships
- Shape: Database nodes use `[(name)]`, others use `[name]`
- Labels: Include `name` and `type`
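These rules can be sketched in a few lines of Python. The function below is an illustrative simplification, assuming the inputs have already been reduced to a name-to-type mapping and a name-to-dependencies mapping; `dependency_graph` is a hypothetical helper name.

```python
from typing import Dict, List


def dependency_graph(components: Dict[str, str], depends_on: Dict[str, List[str]]) -> str:
    """components maps name -> type; depends_on maps name -> dependency names."""
    lines = ["graph TD"]
    for name, comp_type in sorted(components.items()):
        label = f"{name}<br/>type: {comp_type}"
        # Databases get the cylinder shape, everything else a rectangle
        shape = f"[({label})]" if comp_type == "database" else f"[{label}]"
        lines.append(f"    {name}{shape}")
    for source in sorted(depends_on):
        # set() drops duplicate dependency entries before emitting edges
        for target in sorted(set(depends_on[source])):
            lines.append(f"    {source} --> {target}")
    return "\n".join(lines)
```

Edges point from the dependent component to its dependency, the same direction as the example diagram.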
### Request Flow Map
```mermaid
flowchart LR
Client[Client] --> Gateway[api-gateway]
Gateway -->|/api/users/*| Users[user-service]
Gateway -->|/api/auth/*| Auth[auth-service]
Gateway -->|/api/orders/*| Orders[order-service]
Users --> DB[(user-db)]
Auth --> Redis[(redis)]
```
**Generation logic:**
- Start from gateway components (type: gateway)
- Follow `routes` to find downstream services
- Include route paths as edge labels
- Follow service dependencies to databases
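The route-following part of this logic might be sketched as follows. It assumes each route is a dict with `gateway`, `path`, and an optional `forwardsTo` key, as collected by `aggregate-metadata.py`; `request_flow` is a hypothetical helper name, and the final step of following dependencies to databases is left out to keep the sketch short.

```python
from typing import Dict, List


def request_flow(routes: List[Dict[str, str]]) -> str:
    """Build a Mermaid flowchart from gateway routes."""
    lines = ["flowchart LR"]
    seen_gateways = set()
    for route in routes:
        gateway = route["gateway"]
        if gateway not in seen_gateways:
            # Every gateway is an entry point reachable from the client
            lines.append(f"    Client[Client] --> {gateway}[{gateway}]")
            seen_gateways.add(gateway)
        target = route.get("forwardsTo")
        if target:
            # Routes handled by the gateway itself have no downstream edge
            lines.append(f"    {gateway} -->|{route['path']}| {target}[{target}]")
    return "\n".join(lines)
```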
### Event Topology
```mermaid
graph LR
Orders[order-service] -->|order.placed| Kafka1[Kafka: order-placed]
Orders -->|order.cancelled| Kafka2[Kafka: order-cancelled]
Kafka1 --> User[user-service]
Kafka1 --> Notif[notification-service]
Kafka2 --> User
Worker[order-processor] -->|order.processed| Kafka3[Kafka: order-processed]
Kafka1 --> Worker
```
**Generation logic:**
- Nodes: Services (from `eventProducers` and `eventConsumers`) and Kafka topics (from `topic` field)
- Edges: Producer → Topic, Topic → Consumer
- Labels: Topic names on edges
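A compact sketch of this producer → topic → consumer wiring is shown below. It assumes two mappings from topic name to service names; `topic_id` and `event_topology` are hypothetical names. Topic names often contain dots or hyphens, so the sketch derives a separate identifier-safe node ID and keeps the original name in the label.

```python
import re
from typing import Dict, List


def topic_id(topic: str) -> str:
    """Turn a topic name into an identifier-safe Mermaid node ID."""
    return "topic_" + re.sub(r"[^0-9A-Za-z_]", "_", topic)


def event_topology(producers: Dict[str, List[str]], consumers: Dict[str, List[str]]) -> str:
    """Both arguments map topic name -> list of service names."""
    lines = ["graph LR"]
    for topic in sorted(set(producers) | set(consumers)):
        node = topic_id(topic)
        # One cylinder node per unique topic
        lines.append(f"    {node}[(Kafka: {topic})]")
        for service in sorted(producers.get(topic, [])):
            lines.append(f"    {service} -->|{topic}| {node}")
        for service in sorted(consumers.get(topic, [])):
            lines.append(f"    {node} --> {service}")
    return "\n".join(lines)
```

The `topic_` prefix also keeps a topic node from colliding with a service that happens to share its name.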
### Service Groupings
**By Domain:**
| Domain | Services | Owner |
|--------|----------|-------|
| trading | order-service, trade-service, order-processor | trading-team |
| platform | api-gateway, user-service, auth-service | platform-team |
| shared | shared-utils, shared-types | platform-team |
**By Type:**
| Type | Services |
|------|----------|
| gateway | api-gateway |
| service | user-service, auth-service, order-service |
| worker | order-processor, notification-worker |
| library | shared-utils, shared-types |
**Generation logic:**
- Group by `spec.domain` field
- Show owner for each domain
- Count services per grouping
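The grouping rules can be illustrated with a small helper. This sketch assumes each component has already been flattened to a dict with `name`, `domain`, and `owner` keys; `domain_table` is a hypothetical name, and the `Count` column is an addition made here to show the per-group count, which the example tables above do not include.

```python
from collections import defaultdict
from typing import Dict, List


def domain_table(components: List[Dict[str, str]]) -> str:
    """Render a Markdown table of services grouped by domain."""
    services = defaultdict(list)
    owners = defaultdict(set)
    for comp in components:
        services[comp["domain"]].append(comp["name"])
        owners[comp["domain"]].add(comp["owner"])
    rows = [
        "| Domain | Services | Count | Owner |",
        "|--------|----------|-------|-------|",
    ]
    for domain in sorted(services):
        names = ", ".join(sorted(services[domain]))
        # A domain can have several owners; list them all
        owner = ", ".join(sorted(owners[domain]))
        rows.append(f"| {domain} | {names} | {len(services[domain])} | {owner} |")
    return "\n".join(rows)
```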
## Scripts
### aggregate-metadata.py
Collect all `catalog-info.yaml` files from repositories:
```bash
# Print a Markdown summary (the default output format)
python skills/arch-view/scripts/aggregate-metadata.py repos/
# Output as JSON, written to a file for generate-mermaid.py
python skills/arch-view/scripts/aggregate-metadata.py repos/ --format json --output aggregated.json
# Request the summary explicitly
python skills/arch-view/scripts/aggregate-metadata.py repos/ --format summary
```
### generate-mermaid.py
Convert aggregated metadata to Mermaid diagrams:
```bash
# Generate all views
python skills/arch-view/scripts/generate-mermaid.py aggregated.json
# Generate specific view
python skills/arch-view/scripts/generate-mermaid.py aggregated.json --view dependency
python skills/arch-view/scripts/generate-mermaid.py aggregated.json --view request-flow
python skills/arch-view/scripts/generate-mermaid.py aggregated.json --view events
```
## Using Subagents
Launch subagents to read metadata in parallel:
```
For efficiency, launch multiple subagents simultaneously:
Subagent 1: "Read repos/api-gateway/catalog-info.yaml and return parsed YAML"
Subagent 2: "Read repos/user-service/catalog-info.yaml and return parsed YAML"
Subagent 3: "Read repos/order-service/catalog-info.yaml and return parsed YAML"
... (continue for all repos)
Collect results and aggregate.
```
**Tip:** Limit to 5-10 concurrent subagents to avoid overwhelming the system.
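One simple way to respect that limit is to split the repository list into fixed-size batches and launch one batch at a time. The sketch below is only an illustration of that idea; `batches` is a hypothetical helper, and how each batch is handed to subagents depends on the agent runtime.

```python
from typing import Iterator, List, Sequence


def batches(repos: Sequence[str], size: int = 10) -> Iterator[List[str]]:
    """Yield consecutive groups of at most `size` repositories."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(repos), size):
        yield list(repos[start:start + size])
```

For example, `list(batches(["api-gateway", "user-service", "order-service"], size=2))` yields two batches, the second containing only `order-service`.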
## Output Format
Present results as:
1. **Summary** - Quick overview of what was found
2. **Visual diagrams** - Mermaid graphs that render in supported viewers
3. **Tables** - Groupings and listings
4. **Missing metadata** - List of repos without catalog-info.yaml
### Example Output Structure
```markdown
# Architecture View
## Summary
- Total repositories: 15
- Components with metadata: 12
- Missing metadata: 3
- Gateways: 1
- Services: 8
- Workers: 2
- Libraries: 1
## Service Dependency Graph
[Mermaid diagram]
## Request Flow Map
[Mermaid diagram]
## Event Topology
[Mermaid diagram]
## Service Groupings
[Markdown tables]
## Missing Metadata
The following repositories lack catalog-info.yaml:
- repo-a
- repo-b
- repo-c
```
## Additional Resources
### Reference Files
- **`references/view-templates.md`** - Mermaid templates for each view type
- **`references/mermaid-guide.md`** - Mermaid syntax reference
### Scripts
- **`scripts/aggregate-metadata.py`** - Collect catalog-info.yaml from all repos
- **`scripts/generate-mermaid.py`** - Convert metadata to Mermaid diagrams
---
## Referenced Files
> The following files are referenced in this skill and included for context.
### references/view-templates.md
```markdown
# Mermaid View Templates
Templates for generating different architectural views as Mermaid diagrams.
## Service Dependency Graph Template
```mermaid
graph TD
%% Gateways
%% For each component with type=gateway
Gateway[{{name}}<br/>type: {{type}}]
%% Services
%% For each component with type=service
Service[{{name}}<br/>type: {{type}}]
%% Workers
%% For each component with type=worker
Worker[{{name}}<br/>type: {{type}}]
%% Databases - use different shape
%% For each component with type=database
DB[({{name}}<br/>type: {{type}})]
%% Dependencies - edges from dependsOn
%% For each dependency in component.dependsOn
Gateway --> Service
Service --> DB
Worker --> DB
%% Styling
classDef gateway fill:#e1f5fe
classDef service fill:#f3e5f5
classDef worker fill:#fff3e0
classDef database fill:#e8f5e9
class Gateway gateway
class Service service
class Worker worker
class DB database
```
### Generation Rules
1. **Node shapes:**
- Gateway/Service/Worker: `[Name]`
- Database: `[(Name)]`
2. **Node labels:**
- Format: `{name}<br/>type: {type}`
- Optionally add domain if available
3. **Edges:**
- From `dependsOn` relationships
- Arrow direction: dependent → dependency
4. **Layout:**
- Use `graph TD` (top-down) for better readability
- Consider `graph LR` (left-right) for wide architectures
## Request Flow Map Template
```mermaid
flowchart LR
Client[External Client] --> Gateway[{{gateway-name}}]
%% Routes from gateway
%% For each route in gateway.routes
Gateway -->|{{route.path}}| Downstream[{{route.forwardsTo}}]
%% Service dependencies
%% For each service.dependsOn
Downstream --> DB[(database)]
%% Styling
classDef gateway fill:#e1f5fe,stroke:#01579b
classDef service fill:#f3e5f5,stroke:#4a148c
classDef database fill:#e8f5e9,stroke:#1b5e20
classDef external fill:#ffebee,stroke:#b71c1c
```
### Generation Rules
1. **Start from gateways:**
- Find all components with `type: gateway`
- These are the entry points
2. **Follow routes:**
- Parse `routes` array from gateway metadata
- Create edge with route path as label
- Handle both `handler: this` and `forwardsTo: service`
3. **Add dependencies:**
- Include databases and caches that services depend on
- Don't show service-to-service dependencies (avoid clutter)
4. **Edge labels:**
- Show route patterns: `/api/users/*`
- Use `|label|` syntax in Mermaid
## Event Topology Template
```mermaid
graph LR
%% Producers
%% For each component with eventProducers
Producer[{{service-name}}] -->|{{topic}}| Topic[(Kafka:<br/>{{topic}})]
%% Topics to Consumers
%% For each topic's consumers
Topic --> Consumer[{{service-name}}]
%% Styling
classDef producer fill:#e8f5e9
classDef consumer fill:#fff3e0
classDef topic fill:#e1f5fe,stroke:#0277bd
class Producer producer
class Consumer consumer
class Topic topic
```
### Generation Rules
1. **Topic nodes:**
- Create a topic node for each unique topic name
- Format: `[(Kafka: {topic})]`
- Group by topic, not by producer
2. **Producer edges:**
- From service to topic it produces
- Label: topic name
3. **Consumer edges:**
- From topic to consuming service
- If multiple consumers, show all
4. **Message flow:**
- Optional: show event schema on edges
- Format: `|{event}| {topic}`
## Domain Grouping Template
Not a Mermaid diagram - use Markdown tables:
```markdown
## Services by Domain
| Domain | Services | Owner | Gateway | Service | Worker |
|--------|----------|-------|---------|---------|--------|
| trading | order-service, trade-service | trading-team | trading-gateway | order-service, trade-service | order-processor |
| platform | user-service, auth-service | platform-team | api-gateway | user-service, auth-service | notification-worker |
## Service Ownership
| Service | Domain | Type | Owner |
|---------|--------|------|-------|
| api-gateway | platform | gateway | platform-team |
| user-service | platform | service | platform-team |
| order-service | trading | service | trading-team |
```
## Complete Architecture Template
Combine all views:
```markdown
# System Architecture
## Overview
[Summary text about the system]
## Service Dependency Graph
[Mermaid dependency graph]
## Request Flows
[Mermaid request flow diagram]
## Event Topology
[Mermaid event graph]
## Service Groupings
[Markdown tables by domain]
## Data Flows
[Additional detail on how data flows through the system]
```
## Mermaid Syntax Quick Reference
### Graph Types
- `graph TD` - Top-down
- `graph LR` - Left-right
- `flowchart LR` - Flowchart (more control)
### Shapes
- `[text]` - Rectangle (process)
- `[(text)]` - Cylinder (database)
- `((text))` - Circle (connector)
- `{text}` - Rhombus (decision)
- `[[text]]` - Subroutine (rectangle with double vertical edges)
### Edges
- `A --> B` - Arrow
- `A -->|label| B` - Labeled arrow
- `A -.-> B` - Dotted arrow
- `A === B` - Thick line
### Subgraphs
```mermaid
graph TD
subgraph Domain [Trading Domain]
A[Service A]
B[Service B]
end
subgraph Platform [Platform Domain]
C[Service C]
end
A --> C
```
### Styling
```mermaid
graph TD
A[Node]
B[Node]
classDef default fill:#f9f9f9,stroke:#333,stroke-width:2px
classDef highlight fill:#ffeb3b,stroke:#f57f17
class A default
class B highlight
```
```
### scripts/aggregate-metadata.py
```python
#!/usr/bin/env python3
"""
Aggregate catalog-info.yaml files from multiple repositories.
Usage: python aggregate-metadata.py [repos_directory]
Output: JSON containing aggregated metadata from all repos
"""
import argparse
import json
import sys
from pathlib import Path
from typing import Dict, List, Any


def load_yaml(file_path: Path) -> Dict[str, Any]:
    """Load and parse a YAML file."""
    try:
        import yaml
        with open(file_path, 'r') as f:
            return yaml.safe_load(f) or {}
    except ImportError:
        # Fallback: simple parsing for our schema
        return simple_parse_yaml(file_path)


def simple_parse_yaml(file_path: Path) -> Dict[str, Any]:
    """Simple YAML parser for catalog-info.yaml structure."""
    result = {}
    current_section = None
    current_list = None
    list_indent = 0
    with open(file_path, 'r') as f:
        for line in f:
            # Strip both ends so indented comments and "- " list items are recognized
            stripped = line.strip()
            if not stripped or stripped.startswith('#'):
                continue
            indent = len(line) - len(line.lstrip())
            # Top level sections
            if indent == 0 and ':' in stripped:
                key = stripped.split(':', 1)[0]
                if key in ['apiVersion', 'kind']:
                    result[key] = stripped.split(':', 1)[1].strip().strip('"').strip("'")
                elif key == 'metadata':
                    result['metadata'] = {}
                    current_section = result['metadata']
                elif key == 'spec':
                    result['spec'] = {}
                    current_section = result['spec']
                continue
            # Within a section
            if current_section is not None:
                # Check for list item
                if stripped.startswith('- ') and ':' in stripped[2:]:
                    if current_list is None:
                        current_list = []
                        # Remember the indent so the end of the list can be detected
                        list_indent = indent
                    item_content = stripped[2:]
                    if ':' in item_content:
                        item_key = item_content.split(':', 1)[0].strip()
                        item_value = item_content.split(':', 1)[1].strip().strip('"').strip("'")
                        current_list.append({item_key: item_value})
                    continue
                # End of list
                if current_list is not None and indent <= list_indent:
                    # Assign list to section (simplified - assumes single list per section)
                    if isinstance(current_section, dict):
                        # The list belongs to the most recently added key
                        for key in list(current_section.keys())[-1:]:
                            current_section[key] = current_list
                    current_list = None
                # Key-value pair
                if ':' in stripped:
                    key = stripped.split(':', 1)[0].strip()
                    value = stripped.split(':', 1)[1].strip().strip('"').strip("'")
                    if value and value != 'null' and value != '~':
                        current_section[key] = value
                    else:
                        current_section[key] = None
    # Flush a list that runs to the end of the file
    if current_list is not None and isinstance(current_section, dict):
        for key in list(current_section.keys())[-1:]:
            current_section[key] = current_list
    return result


def find_catalog_files(repos_dir: Path) -> List[tuple[str, Path]]:
    """Find all catalog-info.yaml files in repository directories."""
    catalog_files = []
    if not repos_dir.exists():
        return catalog_files
    for item in repos_dir.iterdir():
        if item.is_dir() and not item.name.startswith('.'):
            catalog_path = item / "catalog-info.yaml"
            if catalog_path.exists():
                catalog_files.append((item.name, catalog_path))
    return catalog_files


def aggregate_metadata(repos_dir: Path) -> Dict[str, Any]:
    """Aggregate metadata from all repositories."""
    catalog_files = find_catalog_files(repos_dir)
    aggregated = {
        "repositories": [],
        "components": {},
        "domains": {},
        "owners": {},
        "types": {
            "gateway": [],
            "service": [],
            "worker": [],
            "library": [],
            "frontend": [],
            "database": [],
        },
        "dependencies": {},
        "events": {
            "topics": set(),
            "producers": {},
            "consumers": {},
        },
        "routes": [],
        "missing_metadata": [],
    }
    for repo_name, catalog_path in catalog_files:
        try:
            catalog = load_yaml(catalog_path)
            # Extract basic info ("or {}" guards against keys that are present but empty)
            metadata = catalog.get("metadata") or {}
            spec = catalog.get("spec") or {}
            name = metadata.get("name", repo_name)
            component_type = spec.get("type", "unknown")
            domain = spec.get("domain", "unknown")
            owner = spec.get("owner", "unknown")
            # Store component
            aggregated["components"][name] = {
                "name": name,
                "repo": repo_name,
                "description": metadata.get("description", ""),
                "type": component_type,
                "domain": domain,
                "owner": owner,
                "lifecycle": spec.get("lifecycle", "unknown"),
                "runtime": spec.get("runtime", "unknown"),
                "framework": spec.get("framework", "unknown"),
                "catalog_path": str(catalog_path),
            }
            # Track by type
            if component_type in aggregated["types"]:
                aggregated["types"][component_type].append(name)
            # Track by domain
            if domain != "unknown":
                if domain not in aggregated["domains"]:
                    aggregated["domains"][domain] = []
                aggregated["domains"][domain].append(name)
            # Track by owner
            if owner != "unknown":
                if owner not in aggregated["owners"]:
                    aggregated["owners"][owner] = []
                aggregated["owners"][owner].append(name)
            # Dependencies (entries may be plain names or {"component": name} dicts)
            depends_on = spec.get("dependsOn") or []
            if depends_on:
                aggregated["dependencies"][name] = [
                    dep.get("component", dep) if isinstance(dep, dict) else dep
                    for dep in depends_on
                ]
            # Events
            for producer in spec.get("eventProducers") or []:
                topic = producer.get("topic", "")
                if topic:
                    aggregated["events"]["topics"].add(topic)
                    if topic not in aggregated["events"]["producers"]:
                        aggregated["events"]["producers"][topic] = []
                    aggregated["events"]["producers"][topic].append(name)
            for consumer in spec.get("eventConsumers") or []:
                topic = consumer.get("topic", "")
                if topic:
                    aggregated["events"]["topics"].add(topic)
                    if topic not in aggregated["events"]["consumers"]:
                        aggregated["events"]["consumers"][topic] = []
                    aggregated["events"]["consumers"][topic].append(name)
            # Routes (for gateways)
            if component_type == "gateway":
                for route in spec.get("routes") or []:
                    aggregated["routes"].append({
                        "gateway": name,
                        "path": route.get("path", ""),
                        "methods": route.get("methods", []),
                        "forwardsTo": route.get("forwardsTo"),
                        "handler": route.get("handler"),
                    })
        except Exception as e:
            print(f"Warning: Failed to parse {catalog_path}: {e}", file=sys.stderr)
            aggregated["missing_metadata"].append(repo_name)
    # Check for repos without catalog-info.yaml
    for item in repos_dir.iterdir():
        if item.is_dir() and not item.name.startswith('.'):
            catalog_path = item / "catalog-info.yaml"
            if not catalog_path.exists() and item.name not in aggregated["missing_metadata"]:
                aggregated["missing_metadata"].append(item.name)
    # Convert sets to lists for JSON serialization
    aggregated["events"]["topics"] = sorted(aggregated["events"]["topics"])
    return aggregated


def print_summary(aggregated: Dict[str, Any]):
    """Print a summary of aggregated metadata."""
    print("# Architecture Summary")
    print()
    print(f"- **Total repositories scanned:** {len(aggregated['components']) + len(aggregated['missing_metadata'])}")
    print(f"- **Components with metadata:** {len(aggregated['components'])}")
    print(f"- **Missing metadata:** {len(aggregated['missing_metadata'])}")
    print()
    print("## Components by Type")
    for comp_type, components in aggregated["types"].items():
        if components:
            print(f"- **{comp_type}:** {', '.join(components)}")
    print()
    print("## Domains")
    for domain, components in aggregated["domains"].items():
        print(f"- **{domain}:** {', '.join(components)}")
    print()
    if aggregated["missing_metadata"]:
        print("## Missing Metadata")
        for repo in aggregated["missing_metadata"]:
            print(f"- {repo}")
        print()


def main():
    parser = argparse.ArgumentParser(
        description="Aggregate catalog-info.yaml files from multiple repositories"
    )
    parser.add_argument("repos_dir", nargs="?", default="repos",
                        help="Directory containing repositories")
    parser.add_argument("--format", choices=["json", "summary"], default="summary",
                        help="Output format")
    parser.add_argument("--output", "-o", help="Write to file instead of stdout")
    args = parser.parse_args()
    repos_dir = Path(args.repos_dir)
    if not repos_dir.exists():
        print(f"Error: Directory '{repos_dir}' does not exist", file=sys.stderr)
        return 1
    aggregated = aggregate_metadata(repos_dir)
    if args.format == "json":
        output = json.dumps(aggregated, indent=2, default=str)
    else:
        # Capture summary output
        import contextlib
        import io
        buffer = io.StringIO()
        with contextlib.redirect_stdout(buffer):
            print_summary(aggregated)
        output = buffer.getvalue()
    if args.output:
        Path(args.output).write_text(output)
        print(f"Wrote {args.format} output to {args.output}")
    else:
        print(output)
    return 0


if __name__ == "__main__":
    sys.exit(main())
```
### scripts/generate-mermaid.py
```python
#!/usr/bin/env python3
"""
Generate Mermaid diagrams from aggregated metadata.
Usage: python generate-mermaid.py aggregated.json
Output: Mermaid diagrams for dependency graph, request flow, event topology
"""
import argparse
import json
import sys
from pathlib import Path
from typing import Dict, Any, List, Set


def load_aggregated(file_path: Path) -> Dict[str, Any]:
    """Load aggregated metadata JSON."""
    with open(file_path, 'r') as f:
        return json.load(f)


def generate_dependency_graph(aggregated: Dict[str, Any]) -> str:
    """Generate service dependency graph as Mermaid."""
    lines = ["```mermaid", "graph TD"]
    # A set keeps each edge only once
    edges = set()
    # Generate nodes
    for name, comp in aggregated["components"].items():
        comp_type = comp["type"]
        # Node label
        label = f"{name}<br/>type: {comp_type}"
        if comp.get("domain") != "unknown":
            label += f"<br/>domain: {comp['domain']}"
        # Node shape based on type
        if comp_type == "database":
            lines.append(f'    {name}[({label})]')
        else:
            # Gateways, services and workers all use the rectangle shape
            lines.append(f'    {name}[{label}]')
        # Add dependency edges
        if name in aggregated.get("dependencies", {}):
            for dep in aggregated["dependencies"][name]:
                edges.add(f"{name} --> {dep}")
    # Add all edges
    for edge in sorted(edges):
        lines.append(f"    {edge}")
    # Add styling
    lines.append("")
    lines.append("    classDef gateway fill:#e1f5fe,stroke:#01579b")
    lines.append("    classDef service fill:#f3e5f5,stroke:#4a148c")
    lines.append("    classDef worker fill:#fff3e0,stroke:#e65100")
    lines.append("    classDef library fill:#f3e5f5,stroke:#7b1fa2,stroke-dasharray: 5 5")
    lines.append("    classDef database fill:#e8f5e9,stroke:#1b5e20")
    lines.append("    classDef frontend fill:#fce4ec,stroke:#880e4f")
    lines.append("")
    # Assign classes
    for name, comp in aggregated["components"].items():
        comp_type = comp["type"]
        if comp_type in ["gateway", "service", "worker", "library", "database", "frontend"]:
            lines.append(f"    class {name} {comp_type}")
    lines.append("```")
    return "\n".join(lines)


def generate_request_flow(aggregated: Dict[str, Any]) -> str:
    """Generate request flow map as Mermaid."""
    lines = ["```mermaid", "flowchart LR"]
    lines.append('    Client[External Client]')
    # Find gateways
    gateways = [name for name, comp in aggregated["components"].items()
                if comp["type"] == "gateway"]
    # Track added nodes
    added = set()
    # Connect clients to gateways
    for gateway in gateways:
        lines.append(f'    Client -->|API| {gateway}')
        added.add(gateway)
        # Add routes from this gateway
        for route in aggregated.get("routes", []):
            if route["gateway"] == gateway:
                target = route.get("forwardsTo") or route.get("handler")
                path = route.get("path", "")
                # Routes with "handler: this" stay inside the gateway and get no edge
                if target and target != "this":
                    if target not in added:
                        lines.append(f'    {gateway}[{gateway}] -->|{path}| {target}')
                        added.add(target)
                    else:
                        lines.append(f'    {gateway} -->|{path}| {target}')
    # Add service dependencies to databases
    for name, comp in aggregated["components"].items():
        if comp["type"] in ["service", "gateway"]:
            for dep in aggregated.get("dependencies", {}).get(name, []):
                dep_comp = aggregated["components"].get(dep, {})
                if dep_comp.get("type") == "database":
                    lines.append(f'    {name} --> {dep}[({dep})]')
    # Styling
    lines.append("")
    lines.append("    classDef gateway fill:#e1f5fe,stroke:#01579b,stroke-width:2px")
    lines.append("    classDef service fill:#f3e5f5,stroke:#4a148c")
    lines.append("    classDef database fill:#e8f5e9,stroke:#1b5e20")
    lines.append("    classDef client fill:#ffebee,stroke:#b71c1c")
    lines.append("")
    lines.append("    class Client client")
    if gateways:
        # Mermaid expects a comma-separated node list in class statements
        lines.append("    class " + ",".join(gateways) + " gateway")
    lines.append("```")
    return "\n".join(lines)


def generate_event_topology(aggregated: Dict[str, Any]) -> str:
    """Generate event topology as Mermaid."""
    lines = ["```mermaid", "graph LR"]
    events = aggregated.get("events", {})
    topics = sorted(events.get("topics", []))
    producers = events.get("producers", {})
    consumers = events.get("consumers", {})
    # Track added nodes
    added = set()
    # Create topic nodes (cylinder shape, as in the event topology template)
    for topic in topics:
        safe_topic = topic.replace(".", "_").replace("-", "_")
        lines.append(f'    {safe_topic}[(Kafka: {topic})]')
        added.add(safe_topic)
    # Add producer edges
    for topic, services in producers.items():
        safe_topic = topic.replace(".", "_").replace("-", "_")
        for service in services:
            if service not in added:
                lines.append(f'    {service}[{service}]')
                added.add(service)
            lines.append(f'    {service} -->|{topic}| {safe_topic}')
    # Add consumer edges
    for topic, services in consumers.items():
        safe_topic = topic.replace(".", "_").replace("-", "_")
        for service in services:
            if service not in added:
                lines.append(f'    {service}[{service}]')
                added.add(service)
            lines.append(f'    {safe_topic} -->|{topic}| {service}')
    # Styling
    lines.append("")
    lines.append("    classDef producer fill:#e8f5e9,stroke:#2e7d32")
    lines.append("    classDef consumer fill:#fff3e0,stroke:#e65100")
    lines.append("    classDef topic fill:#e1f5fe,stroke:#0277bd,stroke-width:2px")
    lines.append("")
    # Classify
    all_producers = set()
    for services in producers.values():
        all_producers.update(services)
    all_consumers = set()
    for services in consumers.values():
        all_consumers.update(services)
    # Emit class statements only for non-empty groups; node lists are comma-separated
    if all_producers:
        lines.append("    class " + ",".join(sorted(all_producers)) + " producer")
    if all_consumers:
        lines.append("    class " + ",".join(sorted(all_consumers)) + " consumer")
    if topics:
        lines.append("    class " + ",".join(t.replace(".", "_").replace("-", "_") for t in topics) + " topic")
    lines.append("```")
    return "\n".join(lines)


def generate_service_groupings(aggregated: Dict[str, Any]) -> str:
    """Generate service groupings as Markdown tables."""
    lines = ["## Services by Domain", ""]
    lines.append("| Domain | Services | Owner |")
    lines.append("|--------|----------|-------|")
    for domain, services in sorted(aggregated.get("domains", {}).items()):
        # Get owner for this domain (first service with a known owner)
        owner = "unknown"
        for service in services:
            comp = aggregated["components"].get(service, {})
            if comp.get("owner") != "unknown":
                owner = comp["owner"]
                break
        lines.append(f"| {domain} | {', '.join(services)} | {owner} |")
    lines.append("")
    lines.append("## Services by Type")
    lines.append("")
    lines.append("| Type | Services |")
    lines.append("|------|----------|")
    for comp_type, services in aggregated["types"].items():
        if services:
            lines.append(f"| {comp_type} | {', '.join(services)} |")
    return "\n".join(lines)


def generate_full_view(aggregated: Dict[str, Any]) -> str:
    """Generate complete architecture view with all diagrams."""
    lines = ["# Architecture View", ""]
    lines.append("## Summary")
    lines.append(f"- **Total components:** {len(aggregated['components'])}")
    lines.append("- **Components by type:**")
    for comp_type, services in aggregated["types"].items():
        if services:
            lines.append(f"  - {comp_type}: {len(services)}")
    lines.append("")
    if aggregated.get("domains"):
        lines.append("## Domains")
        for domain, services in sorted(aggregated["domains"].items()):
            lines.append(f"- **{domain}**: {', '.join(services)}")
        lines.append("")
    lines.append("## Service Dependency Graph")
    lines.append(generate_dependency_graph(aggregated))
    lines.append("")
    lines.append("## Request Flow Map")
    lines.append(generate_request_flow(aggregated))
    lines.append("")
    if aggregated["events"]["topics"]:
        lines.append("## Event Topology")
        lines.append(generate_event_topology(aggregated))
        lines.append("")
    lines.append("## Service Groupings")
    lines.append(generate_service_groupings(aggregated))
    lines.append("")
    if aggregated.get("missing_metadata"):
        lines.append("## Missing Metadata")
        lines.append("The following repositories lack catalog-info.yaml:")
        for repo in aggregated["missing_metadata"]:
            lines.append(f"- {repo}")
        lines.append("")
    return "\n".join(lines)


def main():
    parser = argparse.ArgumentParser(
        description="Generate Mermaid diagrams from aggregated metadata"
    )
    parser.add_argument("input_file", help="Aggregated metadata JSON file")
    parser.add_argument("--view", choices=["dependency", "request-flow", "events", "groupings", "full"],
                        default="full", help="Which view to generate")
    parser.add_argument("--output", "-o", help="Write to file instead of stdout")
    args = parser.parse_args()
    input_path = Path(args.input_file)
    if not input_path.exists():
        print(f"Error: File '{input_path}' does not exist", file=sys.stderr)
        return 1
    aggregated = load_aggregated(input_path)
    view_generators = {
        "dependency": generate_dependency_graph,
        "request-flow": generate_request_flow,
        "events": generate_event_topology,
        "groupings": generate_service_groupings,
        "full": generate_full_view,
    }
    output = view_generators[args.view](aggregated)
    if args.output:
        Path(args.output).write_text(output)
        print(f"Wrote {args.view} view to {args.output}")
    else:
        print(output)
    return 0


if __name__ == "__main__":
    sys.exit(main())
```