Back to skills
SkillHub ClubShip Full StackFull Stack

message-bus

File-based message queue for inter-agent coordination. Used by workers AND board directors to communicate. Provides: progress updates, task completion signals, file locking, board deliberation. Core infrastructure for parallel execution.

Packaged view

This page reorganizes the original catalog entry around fit, installability, and workflow context first. The original raw source lives below.

Stars
281
Hot score
99
Updated
March 20, 2026
Overall rating
C3.3
Composite score
3.3
Best-practice grade
C60.3

Install command

npx @skill-hub/cli install ibrahim-3d-conductor-orchestrator-superpowers-message-bus

Repository

Ibrahim-3d/conductor-orchestrator-superpowers

Skill path: skills/message-bus

File-based message queue for inter-agent coordination. Used by workers AND board directors to communicate. Provides: progress updates, task completion signals, file locking, board deliberation. Core infrastructure for parallel execution.

Open repository

Best for

Primary workflow: Ship Full Stack.

Technical facets: Full Stack.

Target audience: everyone.

License: Unknown.

Original source

Catalog source: SkillHub Club.

Repository owner: Ibrahim-3d.

This is still a mirrored public skill entry. Review the repository before installing into production workflows.

What it helps with

  • Install message-bus into Claude Code, Codex CLI, Gemini CLI, or OpenCode workflows
  • Review https://github.com/Ibrahim-3d/conductor-orchestrator-superpowers before adding message-bus to shared team environments
  • Use message-bus for development workflows

Works across

Claude CodeCodex CLIGemini CLIOpenCode

Favorites: 0.

Sub-skills: 0.

Aggregator: No.

Original source / Raw SKILL.md

---
name: message-bus
description: "File-based message queue for inter-agent coordination. Used by workers AND board directors to communicate. Provides: progress updates, task completion signals, file locking, board deliberation. Core infrastructure for parallel execution."
---

# Message Bus -- Inter-Agent Communication Protocol

File-based message queue enabling workers and board directors to coordinate via shared state.

## Directory Structure

```
conductor/tracks/{track}/.message-bus/
├── queue.jsonl           # Append-only message log (all messages)
├── locks.json            # Current file locks
├── worker-status.json    # Worker heartbeats and states
├── events/               # Signal files for polling
│   ├── TASK_COMPLETE_1.1.event
│   └── FILE_UNLOCK_*.event
└── board/                # Board deliberation sessions
    ├── session-{ts}.json # Session metadata
    ├── assessments.json  # Director assessments (Phase 1)
    ├── discussion.jsonl  # Discussion messages (Phase 2)
    └── votes.json        # Final votes (Phase 3)
```

## Message Types

### Worker Messages

| Type | Purpose | Payload |
|------|---------|---------|
| `PROGRESS` | Task progress update | `{ task_id, progress_pct, current_subtask }` |
| `TASK_COMPLETE` | Task finished | `{ task_id, commit_sha, files_modified, unblocks[] }` |
| `TASK_FAILED` | Task failed | `{ task_id, error, stack_trace }` |
| `FILE_LOCK` | Acquire file lock | `{ filepath, lock_type, expires_at }` |
| `FILE_UNLOCK` | Release file lock | `{ filepath }` |
| `BLOCKED` | Waiting on dependency | `{ task_id, waiting_for, resource }` |

### Board Messages

| Type | Purpose | Payload |
|------|---------|---------|
| `BOARD_ASSESS` | Director assessment | `{ director, verdict, score, concerns[], recommendations[] }` |
| `BOARD_DISCUSS` | Discussion message | `{ from, to, type, message, changes_my_verdict }` |
| `BOARD_VOTE` | Final vote | `{ director, final_verdict, confidence, conditions[] }` |
| `BOARD_RESOLVE` | Aggregated decision | `{ verdict, vote_summary, conditions[], dissent[] }` |

## Message Format

All messages follow this structure:

```json
{
  "id": "msg-{uuid}",
  "type": "PROGRESS | TASK_COMPLETE | BOARD_ASSESS | ...",
  "source": "worker-1.1-xxx | CA | orchestrator",
  "timestamp": "2026-02-01T12:00:00Z",
  "payload": { ... }
}
```

## Worker Protocol

### Posting Messages

```python
def post_message(bus_path: str, msg_type: str, source: str, payload: dict):
    message = {
        "id": f"msg-{uuid4()}",
        "type": msg_type,
        "source": source,
        "timestamp": datetime.utcnow().isoformat() + "Z",
        "payload": payload
    }

    # Append to queue (atomic via file locking)
    with open(f"{bus_path}/queue.jsonl", "a") as f:
        f.write_file(json.dumps(message) + "\n")

    # Create event file for polling
    if msg_type in ["TASK_COMPLETE", "FILE_UNLOCK", "BOARD_RESOLVE"]:
        event_file = f"{bus_path}/events/{msg_type}_{payload.get('task_id', 'all')}.event"
        Path(event_file).touch()
```

### Reading Messages

```python
def read_messages(bus_path: str, since: str = None, msg_type: str = None) -> list:
    messages = []
    with open(f"{bus_path}/queue.jsonl", "r") as f:
        for line in f:
            msg = json.loads(line)
            if since and msg["timestamp"] < since:
                continue
            if msg_type and msg["type"] != msg_type:
                continue
            messages.append(msg)
    return messages
```

### Polling for Events

```python
def wait_for_event(bus_path: str, event_pattern: str, timeout: int = 300) -> bool:
    """Wait for event file to appear. Returns True if found, False if timeout."""
    import glob
    import time

    start = time.time()
    while time.time() - start < timeout:
        matches = glob.glob(f"{bus_path}/events/{event_pattern}")
        if matches:
            return True
        time.sleep(1)
    return False
```

## File Lock Protocol

### Acquiring Locks

```python
def acquire_lock(bus_path: str, filepath: str, worker_id: str) -> bool:
    locks_file = f"{bus_path}/locks.json"

    # read_file current locks
    locks = json.load(open(locks_file)) if os.path.exists(locks_file) else {}

    # Check if already locked
    if filepath in locks:
        existing = locks[filepath]
        # Check if expired (30 min timeout)
        if datetime.fromisoformat(existing["expires_at"]) > datetime.utcnow():
            if existing["worker_id"] != worker_id:
                return False  # Locked by another worker

    # Acquire lock
    locks[filepath] = {
        "worker_id": worker_id,
        "acquired_at": datetime.utcnow().isoformat() + "Z",
        "expires_at": (datetime.utcnow() + timedelta(minutes=30)).isoformat() + "Z"
    }

    with open(locks_file, "w") as f:
        json.dump(locks, f, indent=2)

    # Post lock message
    post_message(bus_path, "FILE_LOCK", worker_id, {"filepath": filepath})
    return True
```

### Releasing Locks

```python
def release_lock(bus_path: str, filepath: str, worker_id: str):
    locks_file = f"{bus_path}/locks.json"
    locks = json.load(open(locks_file)) if os.path.exists(locks_file) else {}

    if filepath in locks and locks[filepath]["worker_id"] == worker_id:
        del locks[filepath]
        with open(locks_file, "w") as f:
            json.dump(locks, f, indent=2)

        # Post unlock message and event
        post_message(bus_path, "FILE_UNLOCK", worker_id, {"filepath": filepath})
```

## Worker Status Heartbeat

Workers post heartbeats every 5 minutes:

```python
def update_worker_status(bus_path: str, worker_id: str, task_id: str, status: str, progress: int):
    status_file = f"{bus_path}/worker-status.json"
    statuses = json.load(open(status_file)) if os.path.exists(status_file) else {}

    statuses[worker_id] = {
        "task_id": task_id,
        "status": status,  # "RUNNING" | "COMPLETE" | "FAILED" | "BLOCKED"
        "progress_pct": progress,
        "last_heartbeat": datetime.utcnow().isoformat() + "Z"
    }

    with open(status_file, "w") as f:
        json.dump(statuses, f, indent=2)
```

## Board Deliberation Protocol

### Phase 1: Assessment

Each director posts their assessment:

```python
def post_board_assessment(bus_path: str, director: str, assessment: dict):
    board_path = f"{bus_path}/board"

    # read_file existing assessments
    assess_file = f"{board_path}/assessments.json"
    assessments = json.load(open(assess_file)) if os.path.exists(assess_file) else {}

    # Add this director's assessment
    assessments[director] = assessment

    with open(assess_file, "w") as f:
        json.dump(assessments, f, indent=2)

    # Post to main queue too
    post_message(bus_path, "BOARD_ASSESS", director, assessment)
```

### Phase 2: Discussion

Directors respond to each other:

```python
def post_board_discussion(bus_path: str, from_dir: str, to_dir: str,
                          msg_type: str, message: str, changes_verdict: bool):
    board_path = f"{bus_path}/board"

    discussion_msg = {
        "from": from_dir,
        "to": to_dir,
        "type": msg_type,  # "CHALLENGE" | "AGREE" | "QUESTION" | "CLARIFY"
        "message": message,
        "changes_my_verdict": changes_verdict,
        "timestamp": datetime.utcnow().isoformat() + "Z"
    }

    # Append to discussion log
    with open(f"{board_path}/discussion.jsonl", "a") as f:
        f.write_file(json.dumps(discussion_msg) + "\n")

    # Post to main queue
    post_message(bus_path, "BOARD_DISCUSS", from_dir, discussion_msg)
```

### Phase 3: Voting

Directors cast final votes:

```python
def post_board_vote(bus_path: str, director: str, verdict: str,
                    confidence: float, conditions: list):
    board_path = f"{bus_path}/board"

    votes_file = f"{board_path}/votes.json"
    votes = json.load(open(votes_file)) if os.path.exists(votes_file) else {}

    votes[director] = {
        "final_verdict": verdict,  # "APPROVE" | "REJECT"
        "confidence": confidence,  # 0.0 - 1.0
        "conditions": conditions,
        "timestamp": datetime.utcnow().isoformat() + "Z"
    }

    with open(votes_file, "w") as f:
        json.dump(votes, f, indent=2)

    post_message(bus_path, "BOARD_VOTE", director, votes[director])
```

### Phase 4: Resolution

Orchestrator aggregates votes:

```python
def resolve_board_vote(bus_path: str) -> dict:
    board_path = f"{bus_path}/board"
    votes = json.load(open(f"{board_path}/votes.json"))

    approve_count = sum(1 for v in votes.values() if v["final_verdict"] == "APPROVE")
    reject_count = len(votes) - approve_count

    # Determine verdict
    if approve_count >= 4:
        verdict = "APPROVED"
    elif approve_count == 3:
        verdict = "APPROVED_WITH_REVIEW"
    elif reject_count >= 4:
        verdict = "REJECTED"
    elif reject_count == 3:
        verdict = "REJECTED"
    else:
        verdict = "ESCALATE"

    # Collect conditions
    all_conditions = []
    for director, vote in votes.items():
        for cond in vote.get("conditions", []):
            all_conditions.append(f"{cond} ({director})")

    resolution = {
        "verdict": verdict,
        "vote_summary": {d: v["final_verdict"] for d, v in votes.items()},
        "conditions": all_conditions,
        "timestamp": datetime.utcnow().isoformat() + "Z"
    }

    # Post resolution
    post_message(bus_path, "BOARD_RESOLVE", "orchestrator", resolution)

    # Create event file
    Path(f"{bus_path}/events/BOARD_RESOLVE.event").touch()

    return resolution
```

## Deadlock Detection

Monitor for circular waits:

```python
def detect_deadlock(bus_path: str) -> list:
    """Returns list of workers in deadlock cycle, or empty if none."""
    status_file = f"{bus_path}/worker-status.json"
    locks_file = f"{bus_path}/locks.json"

    statuses = json.load(open(status_file)) if os.path.exists(status_file) else {}
    locks = json.load(open(locks_file)) if os.path.exists(locks_file) else {}

    # Build wait-for graph
    # worker -> worker it's waiting for
    wait_for = {}

    # Find blocked workers
    blocked_msgs = read_messages(bus_path, msg_type="BLOCKED")
    for msg in blocked_msgs:
        blocker = msg["payload"].get("waiting_for")
        if blocker:
            wait_for[msg["source"]] = blocker

    # Detect cycles using DFS
    def find_cycle(start, visited, path):
        if start in path:
            return path[path.index(start):]
        if start in visited:
            return []
        visited.add(start)
        path.append(start)
        if start in wait_for:
            cycle = find_cycle(wait_for[start], visited, path)
            if cycle:
                return cycle
        path.pop()
        return []

    visited = set()
    for worker in wait_for:
        cycle = find_cycle(worker, visited, [])
        if cycle:
            return cycle

    return []
```

## Initialization

Initialize message bus for a track:

```python
def init_message_bus(track_path: str):
    bus_path = f"{track_path}/.message-bus"

    # Create directories
    os.makedirs(bus_path, exist_ok=True)
    os.makedirs(f"{bus_path}/events", exist_ok=True)
    os.makedirs(f"{bus_path}/board", exist_ok=True)

    # Initialize files
    Path(f"{bus_path}/queue.jsonl").touch()

    with open(f"{bus_path}/locks.json", "w") as f:
        json.dump({}, f)

    with open(f"{bus_path}/worker-status.json", "w") as f:
        json.dump({}, f)

    with open(f"{bus_path}/board/assessments.json", "w") as f:
        json.dump({}, f)

    with open(f"{bus_path}/board/votes.json", "w") as f:
        json.dump({}, f)

    Path(f"{bus_path}/board/discussion.jsonl").touch()
```

## Usage in Worker Agents

```markdown
## Worker Protocol

1. **On Start**:
   - read_file message bus for TASK_COMPLETE events of dependencies
   - Verify all dependencies are met
   - Update worker-status.json with RUNNING

2. **Before Modifying Files**:
   - Call acquire_lock() for each file
   - If lock fails, post BLOCKED message and wait

3. **During Execution**:
   - Post PROGRESS every 5 minutes
   - Update worker-status.json heartbeat

4. **On Completion**:
   - Release all file locks
   - Post TASK_COMPLETE with commit SHA and files modified
   - Update worker-status.json with COMPLETE

5. **On Failure**:
   - Release all file locks
   - Post TASK_FAILED with error details
   - Update worker-status.json with FAILED
```

## Usage in Board Deliberation

```markdown
## Board Protocol

1. **Phase 1 (ASSESS)**:
   - All 5 directors read_file proposal
   - Each posts BOARD_ASSESS to assessments.json
   - Wait for all 5 assessments

2. **Phase 2 (DISCUSS)** -- 3 rounds:
   - Directors read_file others' assessments
   - Post BOARD_DISCUSS messages
   - Respond to challenges and questions

3. **Phase 3 (VOTE)**:
   - Each director posts BOARD_VOTE
   - Include confidence level and conditions

4. **Phase 4 (RESOLVE)**:
   - Orchestrator calls resolve_board_vote()
   - Posts BOARD_RESOLVE
   - Creates event file for completion
```

## Board Session Management

### Creating a Board Session

```python
def create_board_session(bus_path: str, checkpoint: str, proposal: dict) -> str:
    """Initialize a new board session for deliberation."""
    board_path = f"{bus_path}/board"
    session_id = f"board-{datetime.utcnow().strftime('%Y%m%d%H%M%S')}"

    session = {
        "session_id": session_id,
        "checkpoint": checkpoint,  # "EVALUATE_PLAN" | "EVALUATE_EXECUTION" | "PRE_LAUNCH"
        "status": "ASSESSING",
        "proposal": proposal,
        "directors": ["CA", "CPO", "CSO", "COO", "CXO"],
        "started_at": datetime.utcnow().isoformat() + "Z",
        "phases": {
            "assess": {"status": "IN_PROGRESS", "complete": 0, "of": 5},
            "discuss": {"status": "NOT_STARTED", "rounds": 0, "max_rounds": 3},
            "vote": {"status": "NOT_STARTED", "complete": 0, "of": 5},
            "resolve": {"status": "NOT_STARTED"}
        }
    }

    # Clear previous session data
    with open(f"{board_path}/assessments.json", "w") as f:
        json.dump({}, f, indent=2)
    with open(f"{board_path}/votes.json", "w") as f:
        json.dump({}, f, indent=2)
    Path(f"{board_path}/discussion.jsonl").write_text("")

    # Save session metadata
    with open(f"{board_path}/session-{session_id}.json", "w") as f:
        json.dump(session, f, indent=2)

    return session_id
```

### Checking Phase Completion

```python
def check_board_phase_complete(bus_path: str, session_id: str) -> dict:
    """Check if current board phase is complete and advance if ready."""
    board_path = f"{bus_path}/board"
    session_file = f"{board_path}/session-{session_id}.json"
    session = json.load(open(session_file))

    assessments = json.load(open(f"{board_path}/assessments.json"))
    votes = json.load(open(f"{board_path}/votes.json"))
    discussions = []
    with open(f"{board_path}/discussion.jsonl") as f:
        discussions = [json.loads(l) for l in f if l.strip()]

    result = {"phase": session["status"], "complete": False, "can_advance": False}

    if session["status"] == "ASSESSING":
        session["phases"]["assess"]["complete"] = len(assessments)
        if len(assessments) >= 5:
            result["complete"] = True
            result["can_advance"] = True
            result["next_phase"] = "DISCUSSING"

    elif session["status"] == "DISCUSSING":
        current_round = session["phases"]["discuss"]["rounds"]
        if current_round >= 3:
            result["complete"] = True
            result["can_advance"] = True
            result["next_phase"] = "VOTING"

    elif session["status"] == "VOTING":
        session["phases"]["vote"]["complete"] = len(votes)
        if len(votes) >= 5:
            result["complete"] = True
            result["can_advance"] = True
            result["next_phase"] = "RESOLVING"

    # Save updated session
    with open(session_file, "w") as f:
        json.dump(session, f, indent=2)

    return result
```

### Advancing Board Phase

```python
def advance_board_phase(bus_path: str, session_id: str) -> str:
    """Advance to next deliberation phase."""
    board_path = f"{bus_path}/board"
    session_file = f"{board_path}/session-{session_id}.json"
    session = json.load(open(session_file))

    transitions = {
        "ASSESSING": "DISCUSSING",
        "DISCUSSING": "VOTING",
        "VOTING": "RESOLVING",
        "RESOLVING": "COMPLETE"
    }

    current = session["status"]
    next_phase = transitions.get(current, current)

    session["status"] = next_phase
    session["phases"][next_phase.lower().replace("ing", "")]["status"] = "IN_PROGRESS"

    with open(session_file, "w") as f:
        json.dump(session, f, indent=2)

    return next_phase
```

## Orchestrator Board Integration

### Invoking Board from Orchestrator

```python
async def invoke_board_meeting(
    bus_path: str,
    checkpoint: str,
    proposal: str,
    context: dict
) -> dict:
    """
    Full 4-phase board deliberation.
    Called by orchestrator at EVALUATE_PLAN or EVALUATE_EXECUTION checkpoints.
    """

    # 1. Create session
    session_id = create_board_session(bus_path, checkpoint, {
        "proposal": proposal,
        "context": context
    })

    # 2. Phase 1: ASSESS -- Dispatch all directors in parallel
    director_prompts = {
        "CA": f"Evaluate technical aspects: {proposal}",
        "CPO": f"Evaluate product value: {proposal}",
        "CSO": f"Evaluate security posture: {proposal}",
        "COO": f"Evaluate operational feasibility: {proposal}",
        "CXO": f"Evaluate user experience: {proposal}"
    }

    # Dispatch via parallel Task calls (see agent-factory)
    assessments = await dispatch_board_directors(director_prompts, bus_path)

    # Wait for all assessments
    while check_board_phase_complete(bus_path, session_id)["complete"] == False:
        await asyncio.sleep(5)
    advance_board_phase(bus_path, session_id)

    # 3. Phase 2: DISCUSS -- 3 rounds
    for round_num in range(3):
        await run_discussion_round(bus_path, session_id, round_num)

    advance_board_phase(bus_path, session_id)

    # 4. Phase 3: VOTE -- All directors vote
    await dispatch_final_votes(bus_path, session_id)

    while check_board_phase_complete(bus_path, session_id)["complete"] == False:
        await asyncio.sleep(5)
    advance_board_phase(bus_path, session_id)

    # 5. Phase 4: RESOLVE
    resolution = resolve_board_vote(bus_path)

    return {
        "session_id": session_id,
        "verdict": resolution["verdict"],
        "votes": resolution["vote_summary"],
        "conditions": resolution["conditions"]
    }
```

### Quick Board Review (No Discussion)

```python
async def invoke_board_review(bus_path: str, proposal: str) -> dict:
    """
    Quick board review -- Phase 1 only, no discussion.
    Used for execution quality checks or low-stakes decisions.
    """

    session_id = create_board_session(bus_path, "QUICK_REVIEW", {
        "proposal": proposal,
        "quick_mode": True
    })

    # Dispatch all directors
    await dispatch_board_directors(proposal, bus_path)

    # Wait for assessments
    while check_board_phase_complete(bus_path, session_id)["complete"] == False:
        await asyncio.sleep(5)

    # Aggregate assessments directly (skip discussion and vote)
    board_path = f"{bus_path}/board"
    assessments = json.load(open(f"{board_path}/assessments.json"))

    approve_count = sum(1 for a in assessments.values()
                       if a["verdict"] in ["APPROVE", "CONCERNS"])
    reject_count = len(assessments) - approve_count

    return {
        "session_id": session_id,
        "verdict": "APPROVED" if approve_count >= 3 else "REJECTED",
        "assessments": assessments,
        "consensus": approve_count >= 4
    }
```

## Event-Driven Director Polling

Directors can poll for messages addressed to them:

```python
def get_messages_for_director(bus_path: str, director: str) -> list:
    """Get all discussion messages addressed to this director."""
    board_path = f"{bus_path}/board"

    messages = []
    with open(f"{board_path}/discussion.jsonl") as f:
        for line in f:
            if line.strip():
                msg = json.loads(line)
                if msg["to"] == director or msg["to"] == "ALL":
                    messages.append(msg)

    return messages
```

## Board Session Files

```
.message-bus/board/
├── session-board-20260201120000.json  # Active session metadata
├── assessments.json                    # Phase 1: Director assessments
│   {
│     "CA": { "verdict": "APPROVE", "score": 8, "concerns": [...] },
│     "CPO": { "verdict": "CONCERNS", "score": 7, "concerns": [...] },
│     ...
│   }
├── discussion.jsonl                    # Phase 2: Discussion log
│   {"from": "CA", "to": "CPO", "type": "CHALLENGE", "message": "..."}
│   {"from": "CPO", "to": "CA", "type": "CLARIFY", "message": "..."}
├── votes.json                          # Phase 3: Final votes
│   {
│     "CA": { "final_verdict": "APPROVE", "confidence": 0.9 },
│     ...
│   }
└── resolution.md                       # Phase 4: Board decision
```



---

## Skill Companion Files

> Additional files collected from the skill directory layout.

### scripts/init-bus.py

```python
#!/usr/bin/env python3
"""
Initialize message bus for a track.

Usage:
    python init-bus.py <track_path>
    python init-bus.py conductor/tracks/feature-xyz_20260201

Creates the message bus directory structure for inter-agent coordination.
"""

import json
import os
import sys
from datetime import datetime
from pathlib import Path


def init_message_bus(track_path: str) -> str:
    """Initialize message bus directory structure for a track."""

    bus_path = os.path.join(track_path, ".message-bus")

    # Create directories
    os.makedirs(bus_path, exist_ok=True)
    os.makedirs(os.path.join(bus_path, "events"), exist_ok=True)
    os.makedirs(os.path.join(bus_path, "board"), exist_ok=True)

    # Initialize queue.jsonl (append-only message log)
    queue_file = os.path.join(bus_path, "queue.jsonl")
    if not os.path.exists(queue_file):
        Path(queue_file).touch()
        # Write initialization message
        init_msg = {
            "id": "msg-init",
            "type": "BUS_INIT",
            "source": "init-bus.py",
            "timestamp": datetime.utcnow().isoformat() + "Z",
            "payload": {
                "track_path": track_path,
                "version": "1.0"
            }
        }
        with open(queue_file, "w") as f:
            f.write(json.dumps(init_msg) + "\n")

    # Initialize locks.json
    locks_file = os.path.join(bus_path, "locks.json")
    if not os.path.exists(locks_file):
        with open(locks_file, "w") as f:
            json.dump({}, f, indent=2)

    # Initialize worker-status.json
    status_file = os.path.join(bus_path, "worker-status.json")
    if not os.path.exists(status_file):
        with open(status_file, "w") as f:
            json.dump({}, f, indent=2)

    # Initialize board files
    board_path = os.path.join(bus_path, "board")

    assessments_file = os.path.join(board_path, "assessments.json")
    if not os.path.exists(assessments_file):
        with open(assessments_file, "w") as f:
            json.dump({}, f, indent=2)

    votes_file = os.path.join(board_path, "votes.json")
    if not os.path.exists(votes_file):
        with open(votes_file, "w") as f:
            json.dump({}, f, indent=2)

    discussion_file = os.path.join(board_path, "discussion.jsonl")
    if not os.path.exists(discussion_file):
        Path(discussion_file).touch()

    return bus_path


def main():
    if len(sys.argv) < 2:
        print("Usage: python init-bus.py <track_path>")
        print("Example: python init-bus.py conductor/tracks/feature-xyz_20260201")
        sys.exit(1)

    track_path = sys.argv[1]

    if not os.path.exists(track_path):
        print(f"Error: Track path does not exist: {track_path}")
        sys.exit(1)

    bus_path = init_message_bus(track_path)

    print(f"Message bus initialized at: {bus_path}")
    print("\nCreated structure:")
    print(f"  {bus_path}/")
    print("  ├── queue.jsonl")
    print("  ├── locks.json")
    print("  ├── worker-status.json")
    print("  ├── events/")
    print("  └── board/")
    print("      ├── assessments.json")
    print("      ├── votes.json")
    print("      └── discussion.jsonl")


if __name__ == "__main__":
    main()

```

### scripts/monitor-bus.py

```python
#!/usr/bin/env python3
"""
Monitor message bus for a track.

Usage:
    python monitor-bus.py <track_path> [--watch]
    python monitor-bus.py conductor/tracks/feature-xyz_20260201 --watch

Shows current state and optionally watches for new messages.
"""

import json
import os
import sys
import time
from datetime import datetime, timedelta
from pathlib import Path


def read_jsonl(filepath: str) -> list:
    """Read JSONL file and return list of objects."""
    if not os.path.exists(filepath):
        return []

    messages = []
    with open(filepath, "r") as f:
        for line in f:
            line = line.strip()
            if line:
                messages.append(json.loads(line))
    return messages


def read_json(filepath: str) -> dict:
    """Read JSON file and return dict."""
    if not os.path.exists(filepath):
        return {}

    with open(filepath, "r") as f:
        return json.load(f)


def check_stale_workers(statuses: dict, threshold_minutes: int = 10) -> list:
    """Find workers with no heartbeat for threshold_minutes."""
    stale = []
    now = datetime.utcnow()

    for worker_id, status in statuses.items():
        if status.get("status") == "RUNNING":
            last_heartbeat = status.get("last_heartbeat")
            if last_heartbeat:
                hb_time = datetime.fromisoformat(last_heartbeat.replace("Z", ""))
                if now - hb_time > timedelta(minutes=threshold_minutes):
                    stale.append({
                        "worker_id": worker_id,
                        "task_id": status.get("task_id"),
                        "last_heartbeat": last_heartbeat,
                        "minutes_stale": int((now - hb_time).total_seconds() / 60)
                    })

    return stale


def check_expired_locks(locks: dict) -> list:
    """Find expired file locks."""
    expired = []
    now = datetime.utcnow()

    for filepath, lock in locks.items():
        expires_at = lock.get("expires_at")
        if expires_at:
            exp_time = datetime.fromisoformat(expires_at.replace("Z", ""))
            if exp_time < now:
                expired.append({
                    "filepath": filepath,
                    "worker_id": lock.get("worker_id"),
                    "expired_at": expires_at,
                    "minutes_expired": int((now - exp_time).total_seconds() / 60)
                })

    return expired


def detect_deadlocks(statuses: dict, messages: list) -> list:
    """Detect circular wait patterns."""
    # Build wait-for graph from BLOCKED messages
    wait_for = {}

    for msg in messages:
        if msg.get("type") == "BLOCKED":
            source = msg.get("source")
            waiting_for = msg.get("payload", {}).get("waiting_for")
            if source and waiting_for:
                wait_for[source] = waiting_for

    # Detect cycles
    def find_cycle(start, visited, path):
        if start in path:
            return path[path.index(start):]
        if start in visited:
            return []
        visited.add(start)
        path.append(start)
        if start in wait_for:
            cycle = find_cycle(wait_for[start], visited, path)
            if cycle:
                return cycle
        path.pop()
        return []

    visited = set()
    for worker in wait_for:
        cycle = find_cycle(worker, visited, [])
        if cycle:
            return cycle

    return []


def print_status(bus_path: str):
    """Print current message bus status."""
    queue_file = os.path.join(bus_path, "queue.jsonl")
    locks_file = os.path.join(bus_path, "locks.json")
    status_file = os.path.join(bus_path, "worker-status.json")

    messages = read_jsonl(queue_file)
    locks = read_json(locks_file)
    statuses = read_json(status_file)

    print("\n" + "=" * 60)
    print(f"MESSAGE BUS STATUS - {datetime.utcnow().isoformat()}Z")
    print("=" * 60)

    # Message summary
    print(f"\nMESSAGES: {len(messages)} total")
    msg_types = {}
    for msg in messages:
        t = msg.get("type", "UNKNOWN")
        msg_types[t] = msg_types.get(t, 0) + 1

    for msg_type, count in sorted(msg_types.items()):
        print(f"   {msg_type}: {count}")

    # Worker status
    print(f"\nWORKERS: {len(statuses)} registered")
    for worker_id, status in statuses.items():
        state = status.get("status", "?")
        print(f"   [{state}] {worker_id}")
        print(f"      Task: {status.get('task_id')}")
        print(f"      Status: {status.get('status')} ({status.get('progress_pct', 0)}%)")

    # Stale workers
    stale = check_stale_workers(statuses)
    if stale:
        print(f"\nSTALE WORKERS: {len(stale)}")
        for sw in stale:
            print(f"   {sw['worker_id']} - {sw['minutes_stale']} min since heartbeat")

    # File locks
    active_locks = {k: v for k, v in locks.items()
                   if datetime.fromisoformat(v.get("expires_at", "2000-01-01").replace("Z", "")) > datetime.utcnow()}

    print(f"\nACTIVE LOCKS: {len(active_locks)}")
    for filepath, lock in active_locks.items():
        print(f"   {filepath}")
        print(f"      Held by: {lock.get('worker_id')}")
        print(f"      Expires: {lock.get('expires_at')}")

    # Expired locks
    expired = check_expired_locks(locks)
    if expired:
        print(f"\nEXPIRED LOCKS: {len(expired)}")
        for el in expired:
            print(f"   {el['filepath']} - expired {el['minutes_expired']} min ago")

    # Deadlock detection
    deadlock = detect_deadlocks(statuses, messages)
    if deadlock:
        print(f"\nDEADLOCK DETECTED!")
        print(f"   Cycle: {' -> '.join(deadlock)} -> {deadlock[0]}")

    # Recent messages
    print(f"\nRECENT MESSAGES (last 5):")
    for msg in messages[-5:]:
        print(f"   [{msg.get('timestamp', '?')}] {msg.get('type')} from {msg.get('source')}")

    # Board status
    board_path = os.path.join(bus_path, "board")
    if os.path.exists(board_path):
        assessments = read_json(os.path.join(board_path, "assessments.json"))
        votes = read_json(os.path.join(board_path, "votes.json"))

        if assessments or votes:
            print(f"\nBOARD STATUS:")
            print(f"   Assessments: {len(assessments)}/5 directors")
            print(f"   Votes: {len(votes)}/5 directors")

            if votes:
                approve = sum(1 for v in votes.values() if v.get("final_verdict") == "APPROVE")
                reject = len(votes) - approve
                print(f"   Current tally: {approve} APPROVE / {reject} REJECT")


def watch_bus(bus_path: str, interval: int = 5):
    """Watch message bus for changes."""
    print(f"Watching message bus at: {bus_path}")
    print(f"Refresh interval: {interval} seconds")
    print("Press Ctrl+C to stop\n")

    last_msg_count = 0

    try:
        while True:
            # Clear screen
            os.system('cls' if os.name == 'nt' else 'clear')

            print_status(bus_path)

            # Check for new messages
            queue_file = os.path.join(bus_path, "queue.jsonl")
            messages = read_jsonl(queue_file)

            if len(messages) > last_msg_count:
                new_count = len(messages) - last_msg_count
                print(f"\n{new_count} new message(s) since last check")
                last_msg_count = len(messages)

            time.sleep(interval)
    except KeyboardInterrupt:
        print("\n\nStopped watching.")


def main():
    if len(sys.argv) < 2:
        print("Usage: python monitor-bus.py <track_path> [--watch]")
        print("Example: python monitor-bus.py conductor/tracks/feature-xyz_20260201 --watch")
        sys.exit(1)

    track_path = sys.argv[1]
    watch_mode = "--watch" in sys.argv

    bus_path = os.path.join(track_path, ".message-bus")

    if not os.path.exists(bus_path):
        print(f"Error: Message bus not found at: {bus_path}")
        print("Run init-bus.py first to initialize the message bus.")
        sys.exit(1)

    if watch_mode:
        watch_bus(bus_path)
    else:
        print_status(bus_path)


if __name__ == "__main__":
    main()

```

message-bus | SkillHub