bazinga-db
DEPRECATED - Use domain-specific skills instead. Routes to bazinga-db-core, bazinga-db-workflow, bazinga-db-agents, or bazinga-db-context.
Packaged view
This page reorganizes the original catalog entry to lead with fit, installability, and workflow context. The original raw source is included below.
Install command
npx @skill-hub/cli install mehdic-artk-bazinga-db
Repository
Skill path: .claude/skills/bazinga-db
Best for
Primary workflow: Ship Full Stack.
Technical facets: Full Stack.
Target audience: everyone.
License: Unknown.
Original source
Catalog source: SkillHub Club.
Repository owner: mehdic.
This is a mirrored public skill entry. Review the repository before installing it into production workflows.
What it helps with
- Install bazinga-db into Claude Code, Codex CLI, Gemini CLI, or OpenCode workflows
- Review https://github.com/mehdic/ARTK before adding bazinga-db to shared team environments
- Use bazinga-db for development workflows
Works across
Favorites: 0.
Sub-skills: 0.
Aggregator: No.
Original source / Raw SKILL.md
---
name: bazinga-db
description: DEPRECATED - Use domain-specific skills instead. Routes to bazinga-db-core, bazinga-db-workflow, bazinga-db-agents, or bazinga-db-context.
version: 2.0.0
allowed-tools: [Bash, Read]
---
# BAZINGA-DB (Deprecated Router)
**This skill is deprecated.** Use domain-specific skills instead:
| Domain | Skill | Use For |
|--------|-------|---------|
| Sessions & State | `bazinga-db-core` | Sessions, state snapshots, dashboard |
| Tasks & Plans | `bazinga-db-workflow` | Task groups, development plans, success criteria |
| Agent Tracking | `bazinga-db-agents` | Logs, reasoning, tokens, skill output, events |
| Context & Learning | `bazinga-db-context` | Context packages, error patterns, strategies |
## Command Routing Table
| Command | Target Skill |
|---------|--------------|
| `create-session`, `get-session`, `list-sessions` | `bazinga-db-core` |
| `update-session-status`, `save-state`, `get-state` | `bazinga-db-core` |
| `dashboard-snapshot`, `query`, `integrity-check` | `bazinga-db-core` |
| `recover-db`, `detect-paths` | `bazinga-db-core` |
| `create-task-group`, `update-task-group`, `get-task-groups` | `bazinga-db-workflow` |
| `save-development-plan`, `get-development-plan`, `update-plan-progress` | `bazinga-db-workflow` |
| `save-success-criteria`, `get-success-criteria`, `update-success-criterion` | `bazinga-db-workflow` |
| `log-interaction`, `stream-logs` | `bazinga-db-agents` |
| `save-reasoning`, `get-reasoning`, `reasoning-timeline` | `bazinga-db-agents` |
| `check-mandatory-phases` | `bazinga-db-agents` |
| `log-tokens`, `token-summary` | `bazinga-db-agents` |
| `save-skill-output`, `get-skill-output`, `get-skill-output-all` | `bazinga-db-agents` |
| `check-skill-evidence` | `bazinga-db-agents` |
| `save-event`, `get-events` | `bazinga-db-agents` |
| `save-context-package`, `get-context-packages`, `mark-context-consumed` | `bazinga-db-context` |
| `update-context-references` | `bazinga-db-context` |
| `save-consumption`, `get-consumption` | `bazinga-db-context` |
| `save-error-pattern`, `get-error-patterns` | `bazinga-db-context` |
| `update-error-confidence`, `cleanup-error-patterns` | `bazinga-db-context` |
| `save-strategy`, `get-strategies` | `bazinga-db-context` |
| `update-strategy-helpfulness`, `extract-strategies` | `bazinga-db-context` |
## Quick Reference
⚠️ **DO NOT use CLI directly.** Instead, invoke the domain-specific skill:
| Need | Invoke |
|------|--------|
| Session/state ops | `Skill(command: "bazinga-db-core")` |
| Task groups/plans | `Skill(command: "bazinga-db-workflow")` |
| Logging/reasoning | `Skill(command: "bazinga-db-agents")` |
| Context packages | `Skill(command: "bazinga-db-context")` |
**Reference docs (for skill authors only):**
- Schema: `.claude/skills/bazinga-db/references/schema.md`
- Examples: `.claude/skills/bazinga-db/references/command_examples.md`
## If You're Here By Mistake
1. Identify what you're trying to do
2. Invoke the correct domain skill:
- Session ops? → `Skill(command: "bazinga-db-core")`
- Task groups? → `Skill(command: "bazinga-db-workflow")`
- Logging/reasoning? → `Skill(command: "bazinga-db-agents")`
- Context packages? → `Skill(command: "bazinga-db-context")`
## Migration Notes
The original monolithic bazinga-db skill (v1.x, 887 lines) has been split into 4 domain-focused skills for better maintainability and to stay within file size limits.
**All scripts remain in this directory:**
- `scripts/bazinga_db.py` - Main CLI (unchanged)
- `scripts/init_db.py` - Database initialization
- `scripts/init_session.py` - Session creation helper
**All references remain in this directory:**
- `references/schema.md` - Full database schema
- `references/command_examples.md` - Detailed command examples
## Technical Notes
### Event Idempotency (v17+)
Events support idempotency keys via `idx_logs_idempotency` unique index:
- Index: `(session_id, event_subtype, group_id, idempotency_key)` WHERE `idempotency_key IS NOT NULL AND log_type = 'event'`
- Pattern: INSERT-first, catch IntegrityError, return existing row
- Recommended key format: `{session_id}|{group_id}|{event_type}|{iteration}`
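The INSERT-first pattern above can be sketched with a minimal in-memory SQLite table. This is an illustrative sketch, not the real bazinga-db schema: the column names are taken from the index definition above, but the table layout and the `save_event` helper are simplified assumptions.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE logs (
        id INTEGER PRIMARY KEY,
        session_id TEXT NOT NULL,
        event_subtype TEXT NOT NULL,
        group_id TEXT NOT NULL,
        idempotency_key TEXT,
        log_type TEXT NOT NULL
    )
""")
# Partial unique index mirroring idx_logs_idempotency as documented above
conn.execute("""
    CREATE UNIQUE INDEX idx_logs_idempotency
    ON logs (session_id, event_subtype, group_id, idempotency_key)
    WHERE idempotency_key IS NOT NULL AND log_type = 'event'
""")

def save_event(session_id, event_type, group_id, iteration):
    """INSERT-first: attempt the insert; on IntegrityError, return the existing row."""
    key = f"{session_id}|{group_id}|{event_type}|{iteration}"
    try:
        cur = conn.execute(
            "INSERT INTO logs (session_id, event_subtype, group_id, idempotency_key, log_type) "
            "VALUES (?, ?, ?, ?, 'event')",
            (session_id, event_type, group_id, key),
        )
        return cur.lastrowid
    except sqlite3.IntegrityError:
        # Duplicate delivery: the unique index rejected it, so fetch the existing row
        row = conn.execute(
            "SELECT id FROM logs WHERE session_id = ? AND event_subtype = ? "
            "AND group_id = ? AND idempotency_key = ?",
            (session_id, event_type, group_id, key),
        ).fetchone()
        return row[0]

first = save_event("s1", "phase_done", "g1", 1)
second = save_event("s1", "phase_done", "g1", 1)  # replayed event maps to the same row
```

Because the index is partial (`WHERE idempotency_key IS NOT NULL AND log_type = 'event'`), non-event rows and rows without a key are never deduplicated.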
### Group ID Validation (v18+)
Three explicit validators for group_id scope (See: research/domain-skill-migration-phase6-ultrathink.md):
| Validator | Allows 'global' | Use For |
|-----------|-----------------|---------|
| `validate_scope_global_or_group` | ✅ Yes | Events, reasoning, state (session-level) |
| `validate_scope_group_only` | ❌ No | Investigation state, consumption, strategies |
| `validate_task_group_id` | ❌ No (+ reserved) | Task group creation (rejects 'session', 'all', 'default') |
Reserved identifiers (cannot be used as task_group_id): `global`, `session`, `all`, `default`
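The task-group rules above can be sketched as a standalone validator. This mirrors `validate_task_group_id` in `scripts/bazinga_db.py` (shown further down) but is simplified for illustration; the error strings here are not the exact ones the real script emits.

```python
import re

GROUP_ID_PATTERN = re.compile(r'^[a-zA-Z0-9_-]+$')
RESERVED_GROUP_IDS = frozenset({'global', 'session', 'all', 'default'})

def validate_task_group_id(task_group_id):
    """Return an error message if invalid, None if valid."""
    if not isinstance(task_group_id, str) or not task_group_id.strip():
        return "group_id must be a non-empty string"
    if len(task_group_id) > 64:
        return "group_id too long (max 64 chars)"
    if not GROUP_ID_PATTERN.match(task_group_id):
        return "group_id must be alphanumeric with underscores/hyphens only"
    # Reserved names are checked case-insensitively
    if task_group_id.lower() in RESERVED_GROUP_IDS:
        return f"'{task_group_id}' is a reserved identifier"
    return None

print(validate_task_group_id("feature-auth-1"))  # None (valid)
print(validate_task_group_id("GLOBAL"))          # rejected: reserved name
```

Note that `GLOBAL` is rejected even though the reserved list is lowercase, matching the case-insensitive behavior documented above.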
### Legacy Data Diagnostic
```bash
# Scan for invalid group_ids (CI/CD)
python3 .claude/skills/bazinga-db/scripts/bazinga_db.py diagnose-group-ids
# Auto-fix issues
python3 .claude/skills/bazinga-db/scripts/bazinga_db.py diagnose-group-ids --fix
```
---
## Referenced Files
> The following files are referenced in this skill and included for context.
### scripts/bazinga_db.py
```python
#!/usr/bin/env python3
"""
BAZINGA Database Client - Simple command interface for database operations.
Provides high-level commands for agents without requiring SQL knowledge.
Path Resolution:
The script auto-detects the project root and database path. You can override:
- --db PATH Explicit database path
- --project-root DIR Explicit project root (db at DIR/bazinga/bazinga.db)
- BAZINGA_ROOT env Environment variable override
If none provided, auto-detects by walking up from script location or CWD.
"""
import sqlite3
import json
import sys
import time
import re
import random
from datetime import datetime
from pathlib import Path
from typing import Optional, List, Dict, Any, Tuple
import argparse
# Secret patterns for redaction (compiled for performance)
# See: research/agent-reasoning-capture-ultrathink.md
# Context-preserving: patterns with capture groups use \1= to keep variable names
# Word boundaries (\b) prevent false positives from partial matches in URLs/identifiers
SECRET_PATTERNS = [
# Generic patterns (preserve variable name context)
(re.compile(r'(?i)\b(api[_-]?key|apikey)\s*[=:]\s*["\']?[a-zA-Z0-9_-]{20,}["\']?'), r'\1=REDACTED'),
(re.compile(r'(?i)\b(secret|password|passwd|pwd)\s*[=:]\s*["\']?[^\s"\']+["\']?'), r'\1=REDACTED'),
(re.compile(r'(?i)\b(token)\s*[=:]\s*["\']?[a-zA-Z0-9_.-]{20,}["\']?'), r'\1=REDACTED'),
# Anthropic - MUST be before generic OpenAI pattern (more specific prefix)
(re.compile(r'\bsk-ant-[a-zA-Z0-9-]{20,}\b'), 'ANTHROPIC_KEY_REDACTED'),
# OpenAI (including sk-proj-* format with hyphens) - word boundary prevents flask-sk-... matches
(re.compile(r'\bsk-[a-zA-Z0-9-]{20,}\b'), 'OPENAI_KEY_REDACTED'),
# GitHub
(re.compile(r'\bghp_[a-zA-Z0-9]{36}\b'), 'GITHUB_TOKEN_REDACTED'),
(re.compile(r'\bgho_[a-zA-Z0-9]{36}\b'), 'GITHUB_OAUTH_REDACTED'),
(re.compile(r'\bgithub_pat_[a-zA-Z0-9_]{22,}\b'), 'GITHUB_PAT_REDACTED'),
# AWS (preserve variable name context)
(re.compile(r'\bAKIA[0-9A-Z]{16}\b'), 'AWS_ACCESS_KEY_REDACTED'),
(re.compile(r'(?i)\b(aws[_-]?secret[_-]?access[_-]?key)\s*[=:]\s*["\']?[a-zA-Z0-9/+=]{40}["\']?'), r'\1=REDACTED'),
# Private keys (match entire block from BEGIN to END)
(re.compile(r'-----BEGIN (RSA |EC |DSA )?PRIVATE KEY-----[\s\S]*?-----END (RSA |EC |DSA )?PRIVATE KEY-----'), 'PRIVATE_KEY_REDACTED'),
(re.compile(r'-----BEGIN OPENSSH PRIVATE KEY-----[\s\S]*?-----END OPENSSH PRIVATE KEY-----'), 'SSH_KEY_REDACTED'),
# Slack
(re.compile(r'\bxox[baprs]-[a-zA-Z0-9-]{10,}\b'), 'SLACK_TOKEN_REDACTED'),
# Stripe
(re.compile(r'\bpk_(test|live)_[a-zA-Z0-9]{10,}\b'), 'STRIPE_PK_REDACTED'),
(re.compile(r'\bsk_(test|live)_[a-zA-Z0-9]{10,}\b'), 'STRIPE_SK_REDACTED'),
# Authorization headers (preserve header name)
(re.compile(r'(?i)\b(authorization):\s*bearer\s+[a-zA-Z0-9._-]{10,}'), r'\1: Bearer REDACTED'),
]
def scan_and_redact(text: str) -> Tuple[str, bool]:
"""Scan text for secrets and redact them.
Args:
text: The text to scan and potentially redact
Returns:
Tuple of (redacted_text, was_redacted)
"""
redacted = False
result = text
for pattern, replacement in SECRET_PATTERNS:
# Use subn for single-pass operation (returns replacement count)
result, num_subs = pattern.subn(replacement, result)
if num_subs > 0:
redacted = True
return result, redacted
def validate_complexity(complexity: Any) -> Optional[str]:
"""Validate complexity value. Returns error message if invalid, None if valid.
Args:
complexity: Value to validate (should be int 1-10)
Returns:
Error message string if invalid, None if valid
"""
if complexity is None:
return None # None is valid (optional field)
if not isinstance(complexity, int):
return f"complexity must be an integer, got {type(complexity).__name__}"
if not 1 <= complexity <= 10:
return f"complexity must be between 1 and 10, got {complexity}"
return None
# See: research/domain-skill-migration-phase6-ultrathink.md
# Pattern: alphanumeric with underscores and hyphens, max 64 chars
GROUP_ID_PATTERN = re.compile(r'^[a-zA-Z0-9_-]+$')
# Reserved names - checked CASE-INSENSITIVELY per OpenAI review
RESERVED_GROUP_IDS = frozenset({'global', 'session', 'all', 'default'})
# Investigation status whitelist (per CI review suggestion)
VALID_INVESTIGATION_STATUSES = frozenset({
'under_investigation',
'root_cause_found',
'hypothesis_eliminated',
'fix_proposed',
'fix_verified',
'escalated',
'blocked',
'completed'
})
MAX_INVESTIGATION_ITERATION = 20 # Reasonable upper bound
def _validate_group_id_base(group_id: Any) -> Optional[str]:
"""Base validation: type, format, length. Used by all three validators.
See: research/domain-skill-migration-phase6-ultrathink.md
"""
if group_id is None:
return "group_id cannot be None"
if not isinstance(group_id, str):
return f"group_id must be a string, got {type(group_id).__name__}"
if not group_id or not group_id.strip():
return "group_id cannot be empty"
if len(group_id) > 64:
return f"group_id too long (max 64 chars, got {len(group_id)})"
if not GROUP_ID_PATTERN.match(group_id):
return "group_id must be alphanumeric with underscores/hyphens only"
return None
def validate_scope_global_or_group(group_id: Any) -> Optional[str]:
"""Validate scope allowing 'global' (session-level) or any valid group ID.
Use for: save_state (non-investigation), get_latest_state, save_event,
save_context_package, get_context_packages, save_reasoning,
get_reasoning, reasoning_timeline, check_mandatory_phases,
get_consumption
Note: Callers accepting None should coerce to 'global' BEFORE calling.
See: research/domain-skill-migration-phase6-ultrathink.md
"""
return _validate_group_id_base(group_id)
def validate_scope_group_only(group_id: Any) -> Optional[str]:
"""Validate scope that MUST be group-specific (rejects 'global').
Use for: save_investigation_iteration, save_state (investigation type),
save_consumption, extract_strategies
CASE-INSENSITIVE: 'GLOBAL', 'Global', 'global' all rejected.
See: research/domain-skill-migration-phase6-ultrathink.md
"""
error = _validate_group_id_base(group_id)
if error:
return error
if group_id.lower() == 'global':
return "group_id cannot be 'global' for this operation (must be group-specific)"
return None
def validate_task_group_id(task_group_id: Any) -> Optional[str]:
"""Validate task group identifier (rejects reserved names).
Use for: create_task_group, update_task_group, update_context_references
Reserved names (case-insensitive): global, session, all, default
See: research/domain-skill-migration-phase6-ultrathink.md
"""
error = _validate_group_id_base(task_group_id)
if error:
return error
if task_group_id.lower() in RESERVED_GROUP_IDS:
return f"'{task_group_id}' is a reserved identifier and cannot be used as a task group ID"
return None
def validate_investigation_inputs(iteration: Any, status: Any) -> Optional[str]:
"""Validate investigation iteration number and status.
Per CI review: Enforce iteration as positive int (1-20) and whitelist status values.
"""
# Validate iteration
if not isinstance(iteration, int):
return f"iteration must be an integer, got {type(iteration).__name__}"
if iteration < 1:
return f"iteration must be >= 1, got {iteration}"
if iteration > MAX_INVESTIGATION_ITERATION:
return f"iteration too high (max {MAX_INVESTIGATION_ITERATION}, got {iteration})"
# Validate status
if not isinstance(status, str):
return f"status must be a string, got {type(status).__name__}"
if status not in VALID_INVESTIGATION_STATUSES:
valid_list = ', '.join(sorted(VALID_INVESTIGATION_STATUSES))
return f"invalid status '{status}'. Valid: {valid_list}"
return None
# DEPRECATED: Use validate_scope_global_or_group, validate_scope_group_only,
# or validate_task_group_id instead. This function will be removed in Phase 7.
# See: research/domain-skill-migration-phase6-ultrathink.md
_DEPRECATION_WARNED = False # One-time warning flag
def validate_group_id(group_id: Any, allow_global: bool = True) -> Optional[str]:
"""DEPRECATED: Use the new explicit validators instead.
- validate_scope_global_or_group() for session/group scope
- validate_scope_group_only() for group-only operations
- validate_task_group_id() for task group identifiers
"""
global _DEPRECATION_WARNED
if not _DEPRECATION_WARNED:
# Use stderr instead of warnings.warn to avoid noisy output (per CI review)
import sys
print("DEPRECATION: validate_group_id is deprecated. Use validate_scope_global_or_group, "
"validate_scope_group_only, or validate_task_group_id instead.",
file=sys.stderr)
_DEPRECATION_WARNED = True
return _validate_group_id_base(group_id)
# Cross-platform file locking
# fcntl is Unix/Linux/macOS only; msvcrt is Windows only
try:
import fcntl
HAS_FCNTL = True
except ImportError:
HAS_FCNTL = False
# Windows file locking via msvcrt
try:
import msvcrt
HAS_MSVCRT = True
except ImportError:
HAS_MSVCRT = False
# Deferred warning flag - only warn once when lock is actually needed
_LOCK_WARNING_SHOWN = False
# Add _shared directory to path for bazinga_paths import
# Path: .claude/skills/bazinga-db/scripts/bazinga_db.py
# -> .claude/skills/bazinga-db/scripts/ (parent)
# -> .claude/skills/bazinga-db/ (parent.parent)
# -> .claude/skills/ (parent.parent.parent)
# -> .claude/skills/_shared/ (where bazinga_paths.py lives)
_script_dir = Path(__file__).parent.resolve()
_shared_dir = _script_dir.parent.parent / '_shared'
if _shared_dir.exists() and str(_shared_dir) not in sys.path:
sys.path.insert(0, str(_shared_dir))
try:
from bazinga_paths import get_project_root, get_db_path, get_detection_info
_HAS_BAZINGA_PATHS = True
except ImportError:
_HAS_BAZINGA_PATHS = False
def _ensure_cwd_at_project_root():
"""Change to project root so all relative paths work correctly.
This is critical when the script is invoked from a different CWD.
See: research/absolute-path-resolution-ultrathink.md
Must be called at entry point (main), NOT at module import time,
to avoid side effects when this module is imported by tests.
"""
if not _HAS_BAZINGA_PATHS:
return # Cannot detect project root without bazinga_paths
try:
project_root = get_project_root()
import os
os.chdir(project_root)
# Only log if BAZINGA_VERBOSE is set to reduce noise
if os.environ.get("BAZINGA_VERBOSE"):
print(f"[INFO] project_root={project_root}", file=sys.stderr)
except RuntimeError:
# Project root detection failed - no valid markers found
# Don't chdir to avoid changing to wrong directory
pass
except OSError as e:
print(f"[WARNING] Failed to chdir to project root: {e}", file=sys.stderr)
# Import SCHEMA_VERSION from init_db.py to avoid duplication
try:
from init_db import SCHEMA_VERSION as EXPECTED_SCHEMA_VERSION
except ImportError:
# Fallback if init_db.py is not accessible
print("Warning: Could not import SCHEMA_VERSION from init_db.py, using fallback value 7. "
"Check if init_db.py exists in the same directory.", file=sys.stderr)
EXPECTED_SCHEMA_VERSION = 7
class DatabaseInitError(Exception):
"""Exception raised when database initialization fails.
This allows callers to handle initialization failures gracefully
rather than having the process terminated by sys.exit().
"""
pass
class MigrationLockError(Exception):
"""Exception raised when migration lock cannot be acquired after retries."""
pass
class BazingaDB:
"""Database client for BAZINGA orchestration."""
# SQLite errors that indicate ACTUAL database corruption (file is unrecoverable)
# NOTE: Only true corruption errors that indicate the database file itself is damaged.
# Transient/operational errors (locked, full disk, readonly) should NOT trigger recovery!
CORRUPTION_ERRORS = [
"database disk image is malformed",
"malformed database schema", # Orphan indexes from interrupted table recreations, inconsistent schema catalog
"file is not a database",
# "database or disk is full" - operational, not corruption
# "attempt to write a readonly database" - permission issue, not corruption
]
# Tables to salvage during recovery (ordered for FK dependencies)
# Includes all tables from schema.md - code handles missing tables gracefully
SALVAGE_TABLE_ORDER = [
'sessions', 'orchestration_logs', 'state_snapshots', 'task_groups',
'token_usage', 'skill_outputs', 'development_plans', 'success_criteria',
'context_packages', 'context_package_consumers',
'configuration', 'decisions', 'model_config' # May not exist in all DBs
]
# SQLite errors that indicate BAD QUERIES, NOT corruption
# These happen when agents write inline SQL with wrong column/table names
# They should NEVER trigger database recovery/deletion
QUERY_ERRORS = [
"no such column",
"no such table",
"syntax error",
"near \"", # Syntax errors like 'near "SELECT"'
"unrecognized token",
"no such function",
"ambiguous column name",
"constraint failed", # Constraint violations are not corruption
"unique constraint",
"foreign key constraint",
]
def __init__(self, db_path: str, quiet: bool = False):
self.db_path = db_path
self.quiet = quiet
self._ensure_db_exists()
def _print_success(self, message: str):
"""Print success message unless in quiet mode."""
if not self.quiet:
print(message)
def _print_warning(self, message: str):
"""Print warning message to stderr unless in quiet mode.
Per CI review: Suppress non-critical warnings (like secret redaction)
in quiet mode to keep stdout/stderr clean for JSON-only flows.
"""
if not self.quiet:
print(f"⚠ {message}", file=sys.stderr)
def _print_error(self, message: str):
"""Print error message to stderr (always, even in quiet mode)."""
print(f"! {message}", file=sys.stderr)
def _is_query_error(self, error: Exception) -> bool:
"""Check if an exception indicates a bad query (NOT corruption).
These are errors caused by wrong column/table names, syntax errors, etc.
They should NEVER trigger database recovery/deletion.
"""
error_msg = str(error).lower()
return any(query_err in error_msg for query_err in self.QUERY_ERRORS)
def _is_corruption_error(self, error: Exception) -> bool:
"""Check if an exception indicates database corruption.
IMPORTANT: Query errors (wrong column names, etc.) are NOT corruption.
This prevents data loss when agents write bad SQL.
"""
error_msg = str(error).lower()
# First, check if this is a query error - these are NEVER corruption
if self._is_query_error(error):
return False
return any(corruption in error_msg for corruption in self.CORRUPTION_ERRORS)
def _normalize_specialization_path(self, spec_path: str, project_root: Optional[Path] = None, verify_exists: bool = False) -> Tuple[bool, str]:
"""Normalize and validate specialization path.
Accepts either:
- Short path: "01-languages/python.md" -> auto-prefixed
- Medium path: "specializations/01-languages/python.md" -> normalized
- Full path: "bazinga/templates/specializations/01-languages/python.md"
Args:
spec_path: Path to specialization file (short, medium, or full)
project_root: Project root directory (auto-detected if not provided)
verify_exists: If True, verify the file actually exists (optional)
Returns:
Tuple of (is_valid, normalized_path_or_error_message)
"""
try:
import re
# Validate path contains only safe characters first
if not re.match(r'^[a-zA-Z0-9/_.-]+$', spec_path):
return False, f"Path contains unsafe characters: {spec_path}"
# Block path traversal attempts
if '..' in spec_path:
return False, f"Path traversal not allowed: {spec_path}"
# Auto-detect project root if not provided
if project_root is None:
if _HAS_BAZINGA_PATHS:
project_root = get_project_root()
else:
project_root = Path(self.db_path).parent.parent
# Define the specializations base (canonical path for installed mode)
spec_base = "bazinga/templates/specializations/"
# Normalize: auto-prefix if not already a full path
if spec_path.startswith(spec_base):
# Already canonical full path
normalized_path = spec_path
elif spec_path.startswith("templates/specializations/"):
# Dev mode path: templates/specializations/... -> normalize to bazinga/templates/...
normalized_path = "bazinga/" + spec_path
elif spec_path.startswith("specializations/"):
# Handle "specializations/01-languages/..." -> strip "specializations/" and prefix
normalized_path = spec_base + spec_path[len("specializations/"):]
else:
# Short path like "01-languages/python.md" -> auto-prefix
normalized_path = spec_base + spec_path.lstrip('/')
# Verify the normalized path is within allowed directory
allowed_base = (project_root / "bazinga" / "templates" / "specializations").resolve()
full_path = (project_root / normalized_path).resolve()
try:
full_path.relative_to(allowed_base)
except ValueError:
return False, f"Path escapes allowed directory: {spec_path}"
# Optional file existence check
if verify_exists and not full_path.exists():
return False, f"File not found: {normalized_path}"
return True, normalized_path
except Exception as e:
return False, f"Path validation error: {e}"
def _backup_corrupted_db(self) -> Optional[str]:
"""Backup a corrupted database file before recovery.
Also backs up WAL and SHM sidecar files if present for complete recovery.
"""
db_path = Path(self.db_path)
if not db_path.exists():
return None
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_path = db_path.with_suffix(f".corrupted_{timestamp}.db")
try:
import shutil
shutil.copy2(self.db_path, backup_path)
self._print_error(f"Corrupted database backed up to: {backup_path}")
# Also backup WAL and SHM files if they exist (for complete recovery)
for ext in ['-wal', '-shm']:
sidecar = Path(str(db_path) + ext)
if sidecar.exists():
sidecar_backup = Path(str(backup_path) + ext)
try:
shutil.copy2(sidecar, sidecar_backup)
self._print_error(f" Also backed up {sidecar.name}")
except Exception:
pass # Non-fatal - main backup succeeded
return str(backup_path)
except Exception as e:
self._print_error(f"Failed to backup corrupted database: {e}")
return None
def _extract_salvageable_data(self) -> Dict[str, Dict[str, Any]]:
"""Try to extract data from a corrupted database before recovery.
Returns:
Dict mapping table names to {'columns': List[str], 'rows': List[tuple]}.
Empty dict if extraction fails.
"""
salvaged: Dict[str, Dict[str, Any]] = {}
try:
# Use a short timeout - if DB is badly corrupted, don't hang
# Open in read-only mode to prevent accidental writes to corrupted DB
uri = f"file:{self.db_path}?mode=ro"
with sqlite3.connect(uri, uri=True, timeout=5.0) as conn:
cursor = conn.cursor()
for table in self.SALVAGE_TABLE_ORDER:
try:
cursor.execute(f"SELECT * FROM \"{table}\"")
rows = cursor.fetchall()
if rows:
# Get column names for this table
cursor.execute(f"PRAGMA table_info(\"{table}\")")
columns = [col[1] for col in cursor.fetchall()]
salvaged[table] = {'columns': columns, 'rows': rows}
self._print_error(f" Salvaged {len(rows)} rows from {table}")
except sqlite3.Error:
# Table doesn't exist or is unreadable - skip
pass
except Exception as e:
self._print_error(f" Could not extract data: {e}")
return {}
return salvaged
def _restore_salvaged_data(self, salvaged: Dict[str, Dict[str, Any]]) -> int:
"""Restore salvaged data to the new database.
Args:
salvaged: Dict from _extract_salvageable_data()
Returns:
Number of rows restored.
"""
if not salvaged:
return 0
total_restored = 0
try:
with sqlite3.connect(self.db_path, timeout=10.0) as conn:
cursor = conn.cursor()
# Disable FK constraints during restore to avoid ordering issues
# Table ordering handles most cases, but this is a safety layer
cursor.execute("PRAGMA foreign_keys = OFF")
for table in self.SALVAGE_TABLE_ORDER:
if table not in salvaged:
continue
data = salvaged[table]
old_columns = data['columns']
rows = data['rows']
if not rows:
continue
# Get current schema columns to handle schema changes
cursor.execute(f"PRAGMA table_info(\"{table}\")")
new_columns = [col[1] for col in cursor.fetchall()]
# Intersect: only restore columns that exist in both old and new schema
valid_columns = [c for c in old_columns if c in new_columns]
if not valid_columns:
self._print_error(f" Skipping {table}: no matching columns")
continue
# Get indices of valid columns in original row data
col_indices = [old_columns.index(c) for c in valid_columns]
# Build INSERT statement once (with quoted identifiers)
cols_str = ', '.join(f'"{c}"' for c in valid_columns)
placeholders = ', '.join(['?' for _ in valid_columns])
insert_sql = f"INSERT OR IGNORE INTO \"{table}\" ({cols_str}) VALUES ({placeholders})"
# Filter row data to only valid columns and use executemany
filtered_rows = [tuple(row[i] for i in col_indices) for row in rows]
try:
cursor.executemany(insert_sql, filtered_rows)
restored_count = cursor.rowcount if cursor.rowcount > 0 else 0
except sqlite3.Error:
# Fall back to row-by-row on error
restored_count = 0
for row in filtered_rows:
try:
cursor.execute(insert_sql, row)
if cursor.rowcount > 0:
restored_count += 1
except sqlite3.Error:
# Skip rows that fail (e.g., constraint violations, duplicates)
# This is intentional - salvage as much data as possible
pass
if restored_count > 0:
self._print_error(f" Restored {restored_count}/{len(rows)} rows to {table}")
total_restored += restored_count
elif len(rows) > 0:
# Warn when salvaged data couldn't be restored (schema mismatch, constraints)
self._print_error(f" Warning: 0/{len(rows)} rows restored to {table}")
# Re-enable FK constraints after restore
cursor.execute("PRAGMA foreign_keys = ON")
conn.commit()
except Exception as e:
self._print_error(f" Error restoring data: {e}")
return total_restored
def _recover_from_corruption(self) -> bool:
"""Attempt to recover from database corruption by reinitializing.
Tries to salvage data from the old database before replacing it.
Returns:
True if recovery succeeded, False otherwise.
"""
self._print_error("Database corruption detected. Attempting recovery...")
# Step 1: Try to salvage data before doing anything destructive
self._print_error("Attempting to salvage data from corrupted database...")
salvaged_data = self._extract_salvageable_data()
# Step 2: Backup corrupted file
self._backup_corrupted_db()
# Step 3: Delete corrupted file (and WAL/SHM sidecars)
db_path = Path(self.db_path)
try:
if db_path.exists():
db_path.unlink()
# Also remove WAL and SHM sidecar files to ensure clean reinit
for ext in ['-wal', '-shm']:
sidecar = Path(str(db_path) + ext)
if sidecar.exists():
sidecar.unlink()
except Exception as e:
self._print_error(f"Failed to remove corrupted database: {e}")
return False
# Step 4: Reinitialize with fresh schema
try:
script_dir = Path(__file__).parent
init_script = script_dir / "init_db.py"
import subprocess
result = subprocess.run(
[sys.executable, str(init_script), self.db_path],
capture_output=True,
text=True
)
if result.returncode != 0:
self._print_error(f"Failed to reinitialize database: {result.stderr}")
return False
except Exception as e:
self._print_error(f"Recovery failed: {e}")
return False
# Step 5: Restore salvaged data
if salvaged_data:
self._print_error("Restoring salvaged data to new database...")
restored = self._restore_salvaged_data(salvaged_data)
if restored > 0:
self._print_error(f"✓ Database recovered with {restored} rows restored")
else:
self._print_error(f"✓ Database recovered (no data could be restored)")
else:
self._print_error(f"✓ Database recovered and reinitialized (fresh start)")
return True
def check_integrity(self) -> Dict[str, Any]:
"""Run SQLite integrity check on the database.
Returns:
Dict with 'ok' bool and 'details' string.
"""
try:
conn = sqlite3.connect(self.db_path, timeout=10.0)
cursor = conn.execute("PRAGMA integrity_check;")
result = cursor.fetchone()[0]
conn.close()
if result == "ok":
return {"ok": True, "details": "Database integrity check passed"}
else:
return {"ok": False, "details": f"Integrity issues found: {result}"}
except Exception as e:
return {"ok": False, "details": f"Integrity check failed: {e}"}
def _ensure_db_exists(self):
"""Ensure database exists and has schema, create if not."""
db_path = Path(self.db_path)
needs_init = False
is_corrupted = False
if not db_path.exists():
needs_init = True
print(f"Database not found at {self.db_path}. Auto-initializing...", file=sys.stderr)
elif db_path.stat().st_size == 0:
needs_init = True
print(f"Database file is empty at {self.db_path}. Auto-initializing...", file=sys.stderr)
else:
# File exists and has content - check if it has tables and is not corrupted
# Retry loop for transient lock errors during startup
for attempt in range(4): # 0, 1, 2, 3 = max 4 attempts
try:
with sqlite3.connect(self.db_path, timeout=10.0) as conn:
cursor = conn.cursor()
# First check integrity
integrity = cursor.execute("PRAGMA integrity_check;").fetchone()[0]
if integrity != "ok":
is_corrupted = True
needs_init = True
print(f"Database corrupted at {self.db_path}: {integrity}. Auto-recovering...", file=sys.stderr)
else:
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='sessions'")
if not cursor.fetchone():
needs_init = True
print(f"Database missing schema at {self.db_path}. Auto-initializing...", file=sys.stderr)
else:
# Check schema version - run migrations if outdated
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='schema_version'")
if cursor.fetchone():
cursor.execute("SELECT version FROM schema_version ORDER BY version DESC LIMIT 1")
version_row = cursor.fetchone()
current_version = version_row[0] if version_row else 0
# Check against expected version from init_db.py
if current_version < EXPECTED_SCHEMA_VERSION:
needs_init = True
print(f"Database schema outdated (v{current_version} < v{EXPECTED_SCHEMA_VERSION}). Running migrations...", file=sys.stderr)
else:
# schema_version table missing - treat as outdated and run migrations
needs_init = True
print(f"Database missing schema_version table at {self.db_path}. Running migrations...", file=sys.stderr)
break # Success - exit retry loop
except sqlite3.OperationalError as e:
# Handle transient lock errors with retry/backoff
error_msg = str(e).lower()
if any(lock_err in error_msg for lock_err in ["database is locked", "database is busy", "schema is locked"]):
if attempt < 3:
wait_time = 2 ** attempt # Exponential backoff: 1s, 2s, 4s
print(f"Database locked during init, retrying in {wait_time}s (attempt {attempt + 1}/4)...", file=sys.stderr)
time.sleep(wait_time)
continue
else:
print(f"Database locked after 4 attempts at {self.db_path}: {e}", file=sys.stderr)
raise
raise # Non-lock operational error
except sqlite3.DatabaseError as e:
# Check if this is a query error (wrong column/table names) vs real corruption
if self._is_query_error(e):
# Query errors should NOT trigger recovery - just propagate the error
print(f"Query error at {self.db_path}: {e}. This is a SQL syntax/schema error (e.g., incorrect table/column name), NOT database corruption. Fix the query.", file=sys.stderr)
raise # Let caller handle it - don't destroy the database
elif self._is_corruption_error(e):
is_corrupted = True
needs_init = True
print(f"Database corrupted at {self.db_path}: {e}. Auto-recovering...", file=sys.stderr)
else:
# Unknown database error - log but don't assume corruption
print(f"Database error at {self.db_path}: {e}. May need investigation.", file=sys.stderr)
raise # Let caller handle it
break # Exit retry loop (corruption branch falls through to here; other branches raise above)
except Exception as e:
needs_init = True
print(f"Database check failed at {self.db_path}: {e}. Auto-initializing...", file=sys.stderr)
break # Exit retry loop
if not needs_init:
return
# CRITICAL: Use file lock to prevent concurrent schema migrations
# Multiple parallel agents checking schema simultaneously can all trigger
# migration, corrupting the database with orphan indexes
lock_file_path = Path(str(db_path) + '.migrate.lock')
lock_file = None
lock_acquired = False
try:
# Create lock file and acquire exclusive lock with retry/backoff
if HAS_FCNTL:
# Unix/Linux/macOS: Use fcntl.flock with bounded retry
lock_file = open(lock_file_path, 'a')
max_retries = 4
for attempt in range(max_retries):
try:
# Use LOCK_EX | LOCK_NB for non-blocking to enable retry
fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
lock_acquired = True
print("Acquired migration lock", file=sys.stderr)
break
except (OSError, IOError) as lock_err:
if attempt < max_retries - 1:
wait_time = 2 ** attempt # Exponential backoff: 1s, 2s, 4s
print(f"Migration lock busy, retrying in {wait_time}s (attempt {attempt + 1}/{max_retries})...", file=sys.stderr)
time.sleep(wait_time)
else:
# Final attempt failed - abort migration
if lock_file:
lock_file.close()
raise MigrationLockError(
f"Failed to acquire migration lock after {max_retries} attempts: {lock_err}. "
"Another process may be migrating the database."
)
elif HAS_MSVCRT:
# Windows: Use msvcrt.locking with bounded retry
# msvcrt.locking locks from current file position, so we must:
# 1. Ensure file has at least 1 byte (write sentinel if empty)
# 2. Seek to position 0 before locking
# 3. Lock the same position we'll unlock (position 0, 1 byte)
try:
# Try to open existing file
lock_file = open(lock_file_path, 'r+b')
except FileNotFoundError:
# Create new file
lock_file = open(lock_file_path, 'w+b')
# Ensure file has at least 1 byte for valid lock region
lock_file.seek(0, 2) # Seek to end
if lock_file.tell() == 0:
lock_file.write(b'\x00') # Write sentinel byte
lock_file.flush()
max_retries = 4
for attempt in range(max_retries):
try:
# Always seek to 0 before locking to ensure consistent position
lock_file.seek(0)
# LK_NBLCK = non-blocking exclusive lock
msvcrt.locking(lock_file.fileno(), msvcrt.LK_NBLCK, 1)
lock_acquired = True
print("Acquired migration lock (Windows)", file=sys.stderr)
break
except (OSError, IOError) as lock_err:
if attempt < max_retries - 1:
wait_time = 2 ** attempt # Exponential backoff: 1s, 2s, 4s
print(f"Migration lock busy, retrying in {wait_time}s (attempt {attempt + 1}/{max_retries})...", file=sys.stderr)
time.sleep(wait_time)
else:
# Final attempt failed - abort migration
if lock_file:
lock_file.close()
raise MigrationLockError(
f"Failed to acquire migration lock after {max_retries} attempts: {lock_err}. "
"Another process may be migrating the database."
)
else:
# No locking available - warn once and abort migration to prevent corruption
global _LOCK_WARNING_SHOWN
if not _LOCK_WARNING_SHOWN:
print("Warning: No file locking mechanism available (fcntl/msvcrt not found).", file=sys.stderr)
_LOCK_WARNING_SHOWN = True
raise MigrationLockError(
"Cannot safely migrate database: no file locking mechanism available. "
"This prevents concurrent migration corruption. "
"Please ensure fcntl (Unix) or msvcrt (Windows) is available."
)
# Re-check schema version while holding lock - another process may have migrated
if db_path.exists() and db_path.stat().st_size > 0:
try:
with sqlite3.connect(self.db_path, timeout=10.0) as conn:
cursor = conn.cursor()
# If corruption was detected earlier, verify it still exists
if is_corrupted:
integrity = cursor.execute("PRAGMA integrity_check;").fetchone()[0]
if integrity != "ok":
# Corruption confirmed - continue to recovery below
print(f"Corruption confirmed under lock: {integrity}", file=sys.stderr)
else:
# Corruption was fixed by another process
is_corrupted = False
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='schema_version'")
if cursor.fetchone():
cursor.execute("SELECT version FROM schema_version ORDER BY version DESC LIMIT 1")
version_row = cursor.fetchone()
current_version = version_row[0] if version_row else 0
if current_version >= EXPECTED_SCHEMA_VERSION and not is_corrupted:
print("Schema already up-to-date (migrated by another process)", file=sys.stderr)
return # Another process already migrated
except (sqlite3.Error, OSError) as e:
# Log the specific error for debugging, but don't abort migration
print(f"Warning: Schema re-check failed ({type(e).__name__}): {e}", file=sys.stderr)
# Continue with migration if re-check fails
# If corrupted, use _recover_from_corruption() to properly salvage data
# This follows research/sqlite-orphan-index-corruption-ultrathink.md mandate
# to extract salvageable data before re-initialization
if is_corrupted and db_path.exists():
if not self._recover_from_corruption():
raise DatabaseInitError(
f"Failed to recover corrupted database at {self.db_path}. "
"Manual intervention may be required."
)
print(f"✓ Database recovered at {self.db_path}", file=sys.stderr)
return # Recovery includes re-initialization, no need to continue
# Auto-initialize the database (non-corrupted case: new or schema upgrade)
script_dir = Path(__file__).parent
init_script = script_dir / "init_db.py"
import subprocess
result = subprocess.run(
[sys.executable, str(init_script), self.db_path],
capture_output=True,
text=True
)
if result.returncode != 0:
raise DatabaseInitError(
f"Failed to initialize database at {self.db_path}: {result.stderr}"
)
print(f"✓ Database auto-initialized at {self.db_path}", file=sys.stderr)
finally:
# Release lock and clean up (handles both Unix fcntl and Windows msvcrt)
if lock_file:
try:
if lock_acquired:
if HAS_FCNTL:
fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)
elif HAS_MSVCRT:
# msvcrt.locking requires unlocking same byte range
lock_file.seek(0)
msvcrt.locking(lock_file.fileno(), msvcrt.LK_UNLCK, 1)
lock_file.close()
# DO NOT delete lock file - leaves it for future processes
# Deleting creates race condition: waiting process may hold FD to deleted inode
# while new process creates new file and locks it simultaneously
except Exception:
# Ignore errors during lock release - not critical if cleanup fails
pass
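The Unix branch of the migration lock above can be reduced to a small standalone sketch: open the lock file, attempt a non-blocking exclusive `flock` with exponential backoff, and leave release/close to the caller. The function name and paths here are illustrative, not the skill's API, and this sketch covers only the POSIX `fcntl` path (the real code falls back to `msvcrt` on Windows).

```python
import fcntl
import os
import tempfile
import time

lock_path = os.path.join(tempfile.mkdtemp(), "db.migrate.lock")

def acquire_migration_lock(path, max_retries=4):
    """Take a non-blocking exclusive lock on the file, retrying with backoff."""
    f = open(path, "a")  # 'a' creates the file if missing, never truncates
    for attempt in range(max_retries):
        try:
            fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
            return f  # caller releases with flock(LOCK_UN), then close()
        except OSError:
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # 1s, 2s, 4s backoff
            else:
                f.close()
                raise

f = acquire_migration_lock(lock_path)
fcntl.flock(f.fileno(), fcntl.LOCK_UN)
f.close()
```

Note that, as in the original, the lock file is deliberately never deleted: removing it would let a waiting process hold a descriptor to the deleted inode while a new process locks a freshly created file.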
def _get_connection(self, retry_on_corruption: bool = True, _lock_retry: int = 0) -> sqlite3.Connection:
"""Get database connection with proper settings.
Args:
retry_on_corruption: If True, attempt recovery on corruption errors.
_lock_retry: Internal counter for lock retry attempts (max 3 retries with backoff).
Returns:
sqlite3.Connection with WAL mode and foreign keys enabled.
"""
try:
conn = sqlite3.connect(self.db_path, timeout=30.0)
# Enable WAL mode for better concurrency (reduces "database is locked" errors)
conn.execute("PRAGMA journal_mode=WAL")
# Enable foreign key constraints
conn.execute("PRAGMA foreign_keys = ON")
# Increase busy timeout to handle concurrent access
conn.execute("PRAGMA busy_timeout = 30000")
conn.row_factory = sqlite3.Row
return conn
except sqlite3.OperationalError as e:
# Handle transient "database is locked" errors with retry/backoff
if "database is locked" in str(e).lower() and _lock_retry < 3:
wait_time = 2 ** _lock_retry # Exponential backoff: 1s, 2s, 4s
self._print_error(f"Database locked, retrying in {wait_time}s (attempt {_lock_retry + 1}/3)...")
time.sleep(wait_time)
return self._get_connection(retry_on_corruption, _lock_retry + 1)
raise
except sqlite3.DatabaseError as e:
if retry_on_corruption and self._is_corruption_error(e):
if self._recover_from_corruption():
# Retry connection after recovery
return self._get_connection(retry_on_corruption=False)
raise
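The connection setup in `_get_connection` can be shown in isolation: WAL journaling plus a busy timeout for concurrency, with exponential backoff on transient "database is locked" errors. This is a minimal sketch; `open_connection` and the paths are illustrative names, not part of the skill.

```python
import os
import sqlite3
import tempfile
import time

def open_connection(db_path, max_retries=3):
    """Open a SQLite connection with WAL mode, retrying on lock errors."""
    for attempt in range(max_retries + 1):
        try:
            conn = sqlite3.connect(db_path, timeout=30.0)
            conn.execute("PRAGMA journal_mode=WAL")    # readers don't block the writer
            conn.execute("PRAGMA foreign_keys = ON")
            conn.execute("PRAGMA busy_timeout = 30000")  # wait up to 30s on contention
            conn.row_factory = sqlite3.Row
            return conn
        except sqlite3.OperationalError as e:
            if "database is locked" in str(e).lower() and attempt < max_retries:
                time.sleep(2 ** attempt)  # 1s, 2s, 4s
                continue
            raise

db_file = os.path.join(tempfile.mkdtemp(), "example.db")
conn = open_connection(db_file)
mode = conn.execute("PRAGMA journal_mode").fetchone()[0]
```

WAL mode only applies to file-backed databases, which is why the sketch uses a temp file rather than `:memory:`.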
# ==================== SESSION OPERATIONS ====================
def create_session(self, session_id: str, mode: str, requirements: str,
initial_branch: Optional[str] = None,
metadata: Optional[str] = None) -> Dict[str, Any]:
"""Create a new session with validation.
Args:
session_id: Unique session identifier
mode: 'simple' or 'parallel'
requirements: Original user requirements
initial_branch: Git branch name (defaults to 'main')
metadata: JSON string containing original_scope and other extensible data
"""
# Validate inputs
if not session_id or not session_id.strip():
raise ValueError("session_id cannot be empty")
if mode not in ['simple', 'parallel']:
raise ValueError(f"Invalid mode: {mode}. Must be 'simple' or 'parallel'")
if not requirements or not requirements.strip():
raise ValueError("requirements cannot be empty")
# Default initial_branch to 'main' if not provided
if initial_branch is None:
initial_branch = 'main'
conn = self._get_connection()
try:
cursor = conn.execute("""
INSERT INTO sessions (session_id, mode, original_requirements, status, initial_branch, metadata)
VALUES (?, ?, ?, 'active', ?, ?)
""", (session_id, mode, requirements, initial_branch, metadata))
conn.commit()
# Verify the insert by reading it back
verify = conn.execute("""
SELECT session_id, mode, status, start_time, created_at, initial_branch, metadata
FROM sessions WHERE session_id = ?
""", (session_id,)).fetchone()
if not verify:
raise RuntimeError(f"Failed to verify session creation for {session_id}")
result = {
'success': True,
'session_id': verify['session_id'],
'mode': verify['mode'],
'status': verify['status'],
'start_time': verify['start_time'],
'created_at': verify['created_at'],
'initial_branch': verify['initial_branch'],
'metadata': verify['metadata']
}
self._print_success(f"✓ Session created: {session_id}")
return result
except sqlite3.IntegrityError as e:
error_msg = str(e).lower()
if "unique constraint" in error_msg or "primary key" in error_msg:
# Session already exists - return existing session info
existing = conn.execute("""
SELECT session_id, mode, status, start_time, created_at
FROM sessions WHERE session_id = ?
""", (session_id,)).fetchone()
if existing:
self._print_success(f"✓ Session already exists: {session_id}")
return dict(existing)
else:
raise RuntimeError(f"Session reported as existing but not found: {session_id}")
else:
# Other integrity error (e.g., foreign key, check constraint)
raise RuntimeError(f"Database constraint violation: {e}")
finally:
conn.close()
def update_session_status(self, session_id: str, status: str) -> None:
"""Update session status."""
conn = self._get_connection()
end_time = datetime.now().isoformat() if status in ['completed', 'failed'] else None
conn.execute("""
UPDATE sessions
SET status = ?, end_time = ?
WHERE session_id = ?
""", (status, end_time, session_id))
conn.commit()
conn.close()
self._print_success(f"✓ Session {session_id} status updated to: {status}")
def get_session(self, session_id: str) -> Optional[Dict]:
"""Get session details."""
conn = self._get_connection()
row = conn.execute("""
SELECT * FROM sessions WHERE session_id = ?
""", (session_id,)).fetchone()
conn.close()
return dict(row) if row else None
def list_sessions(self, limit: int = 10) -> List[Dict]:
"""List recent sessions ordered by created_at (most recent first)."""
conn = self._get_connection()
rows = conn.execute("""
SELECT * FROM sessions ORDER BY created_at DESC LIMIT ?
""", (limit,)).fetchall()
conn.close()
return [dict(row) for row in rows]
# ==================== LOG OPERATIONS ====================
def log_interaction(self, session_id: str, agent_type: str, content: str,
iteration: Optional[int] = None, agent_id: Optional[str] = None,
_retry_count: int = 0) -> Dict[str, Any]:
"""Log an agent interaction with validation.
Args:
_retry_count: Internal parameter to prevent infinite recursion. Do not set manually.
"""
# Prevent infinite recursion on repeated failures
if _retry_count > 1:
self._print_error("Max retries exceeded for log_interaction")
return {"success": False, "error": "Max retries exceeded after recovery attempt"}
# Validate inputs
if not session_id or not session_id.strip():
raise ValueError("session_id cannot be empty")
if not agent_type or not agent_type.strip():
raise ValueError("agent_type cannot be empty")
if not content or not content.strip():
raise ValueError("content cannot be empty")
# Note: No agent_type validation against a hardcoded list.
# Per schema v2 migration, BAZINGA is designed to be extensible.
# New agent types can be added without code changes.
# Database enforces NOT NULL, which is sufficient.
conn = None
try:
conn = self._get_connection()
cursor = conn.execute("""
INSERT INTO orchestration_logs (session_id, iteration, agent_type, agent_id, content)
VALUES (?, ?, ?, ?, ?)
""", (session_id, iteration, agent_type, agent_id, content))
log_id = cursor.lastrowid
conn.commit()
# Verify the insert by reading it back
verify = conn.execute("""
SELECT id, session_id, agent_type, LENGTH(content) as content_length, timestamp
FROM orchestration_logs WHERE id = ?
""", (log_id,)).fetchone()
if not verify:
raise RuntimeError(f"Failed to verify log insertion for log_id={log_id}")
result = {
'success': True,
'log_id': log_id,
'session_id': verify['session_id'],
'agent_type': verify['agent_type'],
'content_length': verify['content_length'],
'timestamp': verify['timestamp'],
'iteration': iteration,
'agent_id': agent_id
}
self._print_success(f"✓ Logged {agent_type} interaction (log_id={log_id}, {result['content_length']} chars)")
return result
except sqlite3.DatabaseError as e:
if conn:
try:
conn.rollback()
except Exception:
pass # Best-effort cleanup, ignore rollback failures
# Check if it's a corruption error
if self._is_corruption_error(e):
if self._recover_from_corruption():
# Retry once after recovery (with incremented counter to prevent infinite loop)
self._print_error("Retrying log operation after recovery...")
return self.log_interaction(session_id, agent_type, content, iteration, agent_id,
_retry_count=_retry_count + 1)
self._print_error(f"Failed to log {agent_type} interaction: {str(e)}")
return {"success": False, "error": f"Database error: {str(e)}"}
except Exception as e:
if conn:
try:
conn.rollback()
except Exception:
pass # Best-effort cleanup, ignore rollback failures
self._print_error(f"Failed to log {agent_type} interaction: {str(e)}")
return {"success": False, "error": str(e)}
finally:
if conn:
conn.close()
def get_logs(self, session_id: str, limit: int = 50, offset: int = 0,
agent_type: Optional[str] = None, since: Optional[str] = None) -> List[Dict]:
"""Get orchestration logs with optional filtering."""
conn = self._get_connection()
query = "SELECT * FROM orchestration_logs WHERE session_id = ?"
params = [session_id]
if agent_type:
query += " AND agent_type = ?"
params.append(agent_type)
if since:
query += " AND timestamp >= ?"
params.append(since)
query += " ORDER BY timestamp DESC LIMIT ? OFFSET ?"
params.extend([limit, offset])
rows = conn.execute(query, params).fetchall()
conn.close()
return [dict(row) for row in rows]
def stream_logs(self, session_id: str, limit: int = 50, offset: int = 0) -> str:
"""Stream logs in markdown format (for dashboard)."""
logs = self.get_logs(session_id, limit, offset)
if not logs:
return "No logs found."
output = []
for log in reversed(logs): # Show oldest first
timestamp = log['timestamp']
agent_type = log['agent_type'].upper()
iteration = log['iteration'] if log['iteration'] is not None else '?' # 'is not None' so iteration 0 is not shown as '?'
content = log['content']
output.append(f"## [{timestamp}] Iteration {iteration} - {agent_type}")
output.append("")
output.append(content)
output.append("")
output.append("---")
output.append("")
return "\n".join(output)
# ==================== EVENT OPERATIONS (v9) ====================
def save_event(self, session_id: str, event_subtype: str, payload: str,
idempotency_key: Optional[str] = None,
group_id: str = 'global',
_retry_count: int = 0) -> Dict[str, Any]:
"""Save an event to orchestration_logs with log_type='event'.
SECURITY: Payload is scanned for secrets and redacted before storage.
Used for: pm_bazinga, scope_change, validator_verdict, tl_issues, etc.
Args:
session_id: The session ID
event_subtype: Type of event (pm_bazinga, scope_change, validator_verdict, etc.)
payload: JSON string payload
idempotency_key: If provided, prevents duplicate events with same key.
Recommended format: {session_id}|{group_id}|{event_type}|{iteration}
group_id: Group isolation key (default 'global'). Used in idempotency check
to ensure events are unique per group.
_retry_count: Internal retry counter
Returns:
Dict with success status and event_id
"""
if _retry_count > 1:
self._print_error("Max retries exceeded for save_event")
return {"success": False, "error": "Max retries exceeded"}
# Validate inputs
if not session_id or not session_id.strip():
raise ValueError("session_id cannot be empty")
if not event_subtype or not event_subtype.strip():
raise ValueError("event_subtype cannot be empty")
# Phase 6: Use explicit validator for scope (events can be session-level)
error = validate_scope_global_or_group(group_id)
if error:
raise ValueError(error)
# Scan and redact secrets from payload (security parity with save_reasoning)
redacted_payload, was_redacted = scan_and_redact(payload)
if was_redacted:
self._print_warning("Secrets detected and redacted from event payload")
conn = None
try:
conn = self._get_connection()
# Race-safe INSERT-first pattern (See: research/domain-skill-migration-phase6-ultrathink.md)
# Instead of SELECT-then-INSERT (race condition), we INSERT first and catch IntegrityError
# The unique index idx_logs_idempotency enforces uniqueness on (session_id, event_subtype, group_id, idempotency_key)
try:
cursor = conn.execute("""
INSERT INTO orchestration_logs
(session_id, group_id, agent_type, content, log_type, event_subtype, event_payload, redacted, idempotency_key)
VALUES (?, ?, 'system', ?, 'event', ?, ?, ?, ?)
""", (session_id, group_id, f"Event: {event_subtype}", event_subtype, redacted_payload,
1 if was_redacted else 0, idempotency_key))
event_id = cursor.lastrowid
conn.commit()
result = {
'success': True,
'event_id': event_id,
'session_id': session_id,
'event_subtype': event_subtype,
'redacted': was_redacted
}
self._print_success(f"✓ Saved {event_subtype} event (id={event_id})")
return result
except sqlite3.IntegrityError as e:
# Unique constraint violation - event already exists (idempotent case)
# This is the expected path when concurrent saves happen
conn.rollback()
if idempotency_key:
existing = conn.execute("""
SELECT id FROM orchestration_logs
WHERE session_id = ? AND log_type = 'event'
AND event_subtype = ? AND group_id = ? AND idempotency_key = ?
""", (session_id, event_subtype, group_id, idempotency_key)).fetchone()
if existing:
self._print_success(f"✓ Event already exists (idempotent, id={existing[0]})")
return {
'success': True,
'event_id': existing[0],
'session_id': session_id,
'event_subtype': event_subtype,
'idempotent': True,
'message': 'Event already exists with same idempotency key'
}
# Idempotency key provided but existing record not found - unusual
raise
else:
# Per CI review: Provide helpful error when concurrent insert fails
# without idempotency key
return {
'success': False,
'error': 'Concurrent insert conflict. Use --idempotency-key for safe retries.',
'hint': 'Recommended format: {session_id}|{group_id}|{event_type}|{iteration}',
'original_error': str(e)
}
except sqlite3.DatabaseError as e:
if conn:
try:
conn.rollback()
except Exception:
pass
if self._is_corruption_error(e):
if self._recover_from_corruption():
return self.save_event(session_id, event_subtype, payload,
idempotency_key=idempotency_key,
group_id=group_id,
_retry_count=_retry_count + 1)
self._print_error(f"Failed to save {event_subtype} event: {str(e)}")
return {"success": False, "error": f"Database error: {str(e)}"}
except Exception as e:
if conn:
try:
conn.rollback()
except Exception:
pass
self._print_error(f"Failed to save {event_subtype} event: {str(e)}")
return {"success": False, "error": str(e)}
finally:
if conn:
conn.close()
def get_events(self, session_id: str, event_subtype: Optional[str] = None,
limit: int = 50) -> List[Dict]:
"""Get events from orchestration_logs where log_type='event'.
Args:
session_id: The session ID
event_subtype: Optional filter by event type (pm_bazinga, scope_change, etc.)
limit: Maximum number of events to return
Returns:
List of event dictionaries with id, event_subtype, event_payload, timestamp
"""
conn = self._get_connection()
query = """
SELECT id, session_id, timestamp, event_subtype, event_payload
FROM orchestration_logs
WHERE session_id = ? AND log_type = 'event'
"""
params = [session_id]
if event_subtype:
query += " AND event_subtype = ?"
params.append(event_subtype)
query += " ORDER BY timestamp DESC LIMIT ?"
params.append(limit)
rows = conn.execute(query, params).fetchall()
conn.close()
return [dict(row) for row in rows]
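The race-safe INSERT-first idempotency pattern that `save_event` relies on can be sketched standalone: a UNIQUE index enforces uniqueness, and `IntegrityError` is treated as "already saved" rather than a failure. Table and column names here are illustrative, not the skill's schema.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (id INTEGER PRIMARY KEY, key TEXT)")
conn.execute("CREATE UNIQUE INDEX idx_events_key ON events(key)")

def save_event_once(conn, key):
    """Return (event_id, inserted). Safe under concurrent writers:
    INSERT first, then fall back to SELECT if another write won the race."""
    try:
        cur = conn.execute("INSERT INTO events (key) VALUES (?)", (key,))
        conn.commit()
        return cur.lastrowid, True
    except sqlite3.IntegrityError:
        conn.rollback()
        row = conn.execute("SELECT id FROM events WHERE key = ?", (key,)).fetchone()
        return row[0], False  # idempotent: the event already exists

first = save_event_once(conn, "sess|g1|pm_bazinga|1")
second = save_event_once(conn, "sess|g1|pm_bazinga|1")
```

The SELECT-then-INSERT alternative has a window where two processes both see "no row" and both insert; INSERT-first closes that window by letting the unique index arbitrate.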
def save_investigation_iteration(self, session_id: str, group_id: str,
iteration: int, status: str,
state_data: str, event_payload: str) -> Dict[str, Any]:
"""Atomically save investigation state AND event together.
This ensures consistency between state (for resumption) and events (for audit).
Uses single transaction - both succeed or both fail.
Args:
session_id: The session ID
group_id: Task group identifier
iteration: Investigation iteration number
status: Investigation status (under_investigation, root_cause_found, etc.)
state_data: JSON string for state snapshot
event_payload: JSON string for event payload
Returns:
Dict with success status and operation details
Raises:
ValueError: If group_id, iteration, or status is invalid
"""
# Phase 6: Investigation MUST be group-specific (rejects 'global')
error = validate_scope_group_only(group_id)
if error:
raise ValueError(f"Invalid group_id: {error}")
# Validate iteration and status (per CI review suggestion)
error = validate_investigation_inputs(iteration, status)
if error:
raise ValueError(f"Invalid investigation input: {error}")
# Generate idempotency key
idempotency_key = f"{session_id}|{group_id}|investigation_iteration|{iteration}"
# Scan and redact secrets from payloads
redacted_state, state_redacted = scan_and_redact(state_data)
redacted_event, event_redacted = scan_and_redact(event_payload)
if state_redacted or event_redacted:
self._print_warning("Secrets detected and redacted from investigation data")
conn = None
try:
conn = self._get_connection()
# Start an IMMEDIATE transaction to take the write lock up front (avoids lock-upgrade deadlocks)
conn.execute("BEGIN IMMEDIATE")
# 1. Save/update state snapshot using UPSERT (not INSERT OR REPLACE)
# Uses ON CONFLICT for proper update semantics without delete-insert cascade
conn.execute("""
INSERT INTO state_snapshots
(session_id, group_id, state_type, state_data, timestamp)
VALUES (?, ?, 'investigation', ?, datetime('now'))
ON CONFLICT(session_id, state_type, group_id)
DO UPDATE SET
state_data = excluded.state_data,
timestamp = excluded.timestamp
""", (session_id, group_id, redacted_state))
# 2. Save event with race-safe idempotency handling
# Use INSERT with IntegrityError handling to avoid SELECT-then-INSERT race
event_saved = False
event_id = None
try:
cursor = conn.execute("""
INSERT INTO orchestration_logs
(session_id, agent_type, content, log_type, event_subtype,
event_payload, redacted, group_id, idempotency_key)
VALUES (?, 'investigator', ?, 'event', 'investigation_iteration',
?, ?, ?, ?)
""", (session_id, f"Investigation iteration {iteration}: {status}",
redacted_event, 1 if event_redacted else 0, group_id, idempotency_key))
event_id = cursor.lastrowid
event_saved = True
except sqlite3.IntegrityError:
# Race condition: another process inserted first - this is idempotent success
# Query with full index columns for optimal lookup
existing = conn.execute("""
SELECT id FROM orchestration_logs
WHERE session_id = ? AND log_type = 'event'
AND event_subtype = 'investigation_iteration'
AND group_id = ? AND idempotency_key = ?
""", (session_id, group_id, idempotency_key)).fetchone()
if existing:
event_id = existing[0]
else:
# Integrity error for other reason - re-raise
raise
conn.commit()
result = {
'success': True,
'atomic': True,
'state_saved': True,
'event_saved': event_saved,
'event_id': event_id,
'iteration': iteration,
'idempotent': not event_saved,
'redacted': state_redacted or event_redacted
}
self._print_success(f"✓ Saved investigation iteration {iteration} atomically")
return result
except Exception as e:
if conn:
try:
conn.rollback()
except Exception:
pass
self._print_error(f"Failed to save investigation iteration atomically: {str(e)}")
return {'success': False, 'error': str(e), 'atomic': False}
finally:
if conn:
conn.close()
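The atomic state-plus-event write above boils down to one pattern: `BEGIN IMMEDIATE` takes the write lock before the first statement, so both inserts commit or roll back together. A minimal sketch with illustrative table names (not the skill's schema):

```python
import sqlite3

conn = sqlite3.connect(":memory:", isolation_level=None)  # manual transaction control
conn.execute("CREATE TABLE state (k TEXT PRIMARY KEY, v TEXT)")
conn.execute("CREATE TABLE log (id INTEGER PRIMARY KEY, msg TEXT)")

def atomic_write(conn, key, value, msg):
    """Write state and its audit record in one transaction: both or neither."""
    conn.execute("BEGIN IMMEDIATE")  # acquire write lock up front
    try:
        conn.execute("INSERT OR REPLACE INTO state VALUES (?, ?)", (key, value))
        conn.execute("INSERT INTO log (msg) VALUES (?)", (msg,))
        conn.execute("COMMIT")
    except Exception:
        conn.execute("ROLLBACK")
        raise

atomic_write(conn, "investigation", "iteration 1", "iteration 1 saved")
state_count = conn.execute("SELECT COUNT(*) FROM state").fetchone()[0]
log_count = conn.execute("SELECT COUNT(*) FROM log").fetchone()[0]
```

A plain `BEGIN` would take the write lock only at the first write, allowing two readers to deadlock when both try to upgrade; `BEGIN IMMEDIATE` fails fast (busy) instead.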
# ==================== STATE OPERATIONS ====================
def save_state(self, session_id: str, state_type: str, state_data: Dict,
group_id: str = 'global') -> Dict[str, Any]:
"""Save state snapshot with group isolation (UPSERT).
Args:
session_id: The session ID
state_type: Type of state ('pm', 'orchestrator', 'group_status', 'investigation')
state_data: Dict to be JSON-encoded and stored
group_id: Group isolation key (default 'global' for session-level state,
use task group ID for group-specific state like 'investigation')
Returns:
Dict with success status and details (Fix G: consistent API)
Raises:
ValueError: If group_id is invalid
Note: Uses UPSERT (INSERT ... ON CONFLICT ... DO UPDATE) to handle
repeated saves for the same (session_id, state_type, group_id).
"""
# Phase 6: Conditional validation based on state_type
# Investigation state MUST be group-specific; other states can be session-level
if state_type == 'investigation':
error = validate_scope_group_only(group_id)
else:
error = validate_scope_global_or_group(group_id)
if error:
raise ValueError(error)
conn = self._get_connection()
conn.execute("""
INSERT INTO state_snapshots (session_id, group_id, state_type, state_data, timestamp)
VALUES (?, ?, ?, ?, datetime('now'))
ON CONFLICT(session_id, state_type, group_id)
DO UPDATE SET
state_data = excluded.state_data,
timestamp = excluded.timestamp
""", (session_id, group_id, state_type, json.dumps(state_data)))
conn.commit()
conn.close()
self._print_success(f"✓ Saved {state_type} state (group={group_id})")
return {
'success': True,
'session_id': session_id,
'state_type': state_type,
'group_id': group_id
}
def get_latest_state(self, session_id: str, state_type: str,
group_id: str = 'global') -> Optional[Dict]:
"""Get latest state snapshot for specific group.
Args:
session_id: The session ID
state_type: Type of state ('pm', 'orchestrator', 'group_status', 'investigation')
group_id: Group isolation key (default 'global' for session-level state)
Returns:
Dict of state data, or None if not found
Raises:
ValueError: If group_id is invalid
"""
# Phase 6: Reads can be session or group level
error = validate_scope_global_or_group(group_id)
if error:
raise ValueError(error)
conn = self._get_connection()
# Fix E: No ORDER BY needed - UNIQUE(session_id, state_type, group_id) ensures at most one row
row = conn.execute("""
SELECT state_data FROM state_snapshots
WHERE session_id = ? AND state_type = ? AND group_id = ?
""", (session_id, state_type, group_id)).fetchone()
conn.close()
return json.loads(row['state_data']) if row else None
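The UPSERT used by `save_state` can be demonstrated in isolation: `ON CONFLICT ... DO UPDATE` keeps exactly one row per key and updates it in place, unlike `INSERT OR REPLACE`, which deletes and re-inserts (firing delete triggers and cascades). A sketch with a simplified schema:

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE state_snapshots (
        session_id TEXT, group_id TEXT, state_type TEXT, state_data TEXT,
        UNIQUE(session_id, state_type, group_id)
    )
""")

def save_state(conn, session_id, state_type, data, group_id="global"):
    """UPSERT: insert on first save, update state_data in place afterwards."""
    conn.execute("""
        INSERT INTO state_snapshots (session_id, group_id, state_type, state_data)
        VALUES (?, ?, ?, ?)
        ON CONFLICT(session_id, state_type, group_id)
        DO UPDATE SET state_data = excluded.state_data
    """, (session_id, group_id, state_type, json.dumps(data)))
    conn.commit()

save_state(conn, "s1", "pm", {"phase": 1})
save_state(conn, "s1", "pm", {"phase": 2})  # second save updates, not duplicates
count = conn.execute("SELECT COUNT(*) FROM state_snapshots").fetchone()[0]
data = json.loads(conn.execute("SELECT state_data FROM state_snapshots").fetchone()[0])
```

Because the UNIQUE constraint guarantees at most one row per key, reads need no `ORDER BY ... LIMIT 1`, which is exactly the simplification noted in `get_latest_state`.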
# ==================== TASK GROUP OPERATIONS ====================
def create_task_group(self, group_id: str, session_id: str, name: str,
status: str = 'pending', assigned_to: Optional[str] = None,
specializations: Optional[List[str]] = None,
item_count: Optional[int] = None,
component_path: Optional[str] = None,
initial_tier: Optional[str] = None,
complexity: Optional[int] = None) -> Dict[str, Any]:
"""Create or update a task group (upsert - idempotent operation).
Uses INSERT ... ON CONFLICT to handle duplicates gracefully. If the group
already exists, only name/status/assigned_to/specializations/item_count/component_path/initial_tier/complexity
are updated - preserving revision_count, last_review_status, and created_at.
Args:
specializations: List of specialization file paths for this group
item_count: Number of discrete tasks/items in this group (for progress tracking)
component_path: Monorepo component path (e.g., 'frontend/', 'backend/') for version lookup
initial_tier: Starting agent tier ('Developer' or 'Senior Software Engineer')
complexity: Task complexity score (1-10). 1-3=Low, 4-6=Medium, 7-10=High
Returns:
Dict with 'success' bool and 'task_group' data, or 'error' on failure.
"""
# Phase 6: Validate task group ID (rejects reserved names)
error = validate_task_group_id(group_id)
if error:
return {"success": False, "error": error}
conn = None
try:
# Defensive type validation for specializations
if specializations is not None:
if not isinstance(specializations, list):
return {
"success": False,
"error": f"specializations must be a list, got {type(specializations).__name__}"
}
if not all(isinstance(s, str) for s in specializations):
return {
"success": False,
"error": "specializations must contain only strings"
}
# Normalize and validate paths (auto-prefix short paths)
normalized_specs = []
for spec_path in specializations:
is_valid, result = self._normalize_specialization_path(spec_path)
if not is_valid:
return {
"success": False,
"error": f"Invalid specialization path: {result}"
}
normalized_specs.append(result)
specializations = normalized_specs
# Validate item_count if provided
if item_count is not None and (not isinstance(item_count, int) or item_count < 1):
return {
"success": False,
"error": "item_count must be a positive integer"
}
# Validate initial_tier if provided
valid_tiers = ('Developer', 'Senior Software Engineer')
if initial_tier is not None and initial_tier not in valid_tiers:
return {
"success": False,
"error": f"initial_tier must be one of {valid_tiers}, got '{initial_tier}'"
}
# Validate complexity if provided (must be 1-10)
complexity_error = validate_complexity(complexity)
if complexity_error:
return {"success": False, "error": complexity_error}
conn = self._get_connection()
# Serialize specializations to JSON (preserve [] vs None distinction)
specs_json = json.dumps(specializations) if specializations is not None else None
# Use ON CONFLICT for true upsert - preserves existing metadata
# Note: excluded.* reflects the values that would have been inserted, i.e.
# the COALESCE defaults from VALUES ('pending', 1, 'Developer'), never NULL.
# So the UPDATE branch binds status/item_count/initial_tier again as raw
# parameters to genuinely preserve existing values when None is passed.
conn.execute("""
INSERT INTO task_groups (id, session_id, name, status, assigned_to, specializations, item_count, component_path, initial_tier, complexity)
VALUES (?, ?, ?, COALESCE(?, 'pending'), ?, ?, COALESCE(?, 1), ?, COALESCE(?, 'Developer'), ?)
ON CONFLICT(id, session_id) DO UPDATE SET
name = excluded.name,
status = COALESCE(?, task_groups.status),
assigned_to = COALESCE(excluded.assigned_to, task_groups.assigned_to),
specializations = COALESCE(excluded.specializations, task_groups.specializations),
item_count = COALESCE(?, task_groups.item_count),
component_path = COALESCE(excluded.component_path, task_groups.component_path),
initial_tier = COALESCE(?, task_groups.initial_tier),
complexity = COALESCE(excluded.complexity, task_groups.complexity),
updated_at = CURRENT_TIMESTAMP
""", (group_id, session_id, name, status, assigned_to, specs_json, item_count, component_path, initial_tier, complexity, status, item_count, initial_tier))
conn.commit()
# Fetch and return the saved record
row = conn.execute("""
SELECT * FROM task_groups WHERE id = ? AND session_id = ?
""", (group_id, session_id)).fetchone()
result = dict(row) if row else None
self._print_success(f"✓ Task group saved: {group_id} (session: {session_id[:20]}...)")
return {"success": True, "task_group": result}
except Exception as e:
print(f"! Failed to save task group {group_id}: {e}", file=sys.stderr)
return {"success": False, "error": str(e)}
finally:
if conn:
conn.close()
def update_task_group(self, group_id: str, session_id: str, status: Optional[str] = None,
assigned_to: Optional[str] = None, revision_count: Optional[int] = None,
last_review_status: Optional[str] = None,
auto_create: bool = True, name: Optional[str] = None,
specializations: Optional[List[str]] = None,
item_count: Optional[int] = None,
security_sensitive: Optional[int] = None,
qa_attempts: Optional[int] = None,
tl_review_attempts: Optional[int] = None,
component_path: Optional[str] = None,
initial_tier: Optional[str] = None,
complexity: Optional[int] = None,
review_iteration: Optional[int] = None,
no_progress_count: Optional[int] = None,
blocking_issues_count: Optional[int] = None) -> Dict[str, Any]:
"""Update task group fields (requires session_id for composite key).
Args:
group_id: Task group identifier
session_id: Session identifier
status: New status value
assigned_to: Agent assignment
revision_count: Number of revisions
last_review_status: APPROVED or CHANGES_REQUESTED
auto_create: If True and group doesn't exist, create it (default: True)
name: Name for auto-creation (defaults to group_id if not provided)
specializations: List of specialization file paths for this group
item_count: Number of discrete tasks/items in this group
security_sensitive: Whether this group has security-sensitive code (0 or 1)
qa_attempts: Number of QA test attempts
tl_review_attempts: Number of Tech Lead review attempts
component_path: Monorepo component path (e.g., 'frontend/', 'backend/') for version lookup
initial_tier: Starting agent tier ('Developer' or 'Senior Software Engineer')
complexity: Task complexity score (1-10). 1-3=Low, 4-6=Medium, 7-10=High
review_iteration: Current iteration in feedback loop (starts at 1)
no_progress_count: Consecutive iterations with 0 fixes (escalate at 2)
blocking_issues_count: Current count of unresolved CRITICAL/HIGH issues
Returns:
Dict with 'success' bool and 'task_group' data, or 'error' on failure.
"""
# Phase 6: Validate task group ID (rejects reserved names)
error = validate_task_group_id(group_id)
if error:
return {"success": False, "error": error}
conn = None
try:
# Defensive type validation for specializations
if specializations is not None:
if not isinstance(specializations, list):
return {
"success": False,
"error": f"specializations must be a list, got {type(specializations).__name__}"
}
if not all(isinstance(s, str) for s in specializations):
return {
"success": False,
"error": "specializations must contain only strings"
}
# Normalize and validate paths (auto-prefix short paths)
normalized_specs = []
for spec_path in specializations:
is_valid, result = self._normalize_specialization_path(spec_path)
if not is_valid:
return {
"success": False,
"error": f"Invalid specialization path: {result}"
}
normalized_specs.append(result)
specializations = normalized_specs
conn = self._get_connection()
updates = []
params = []
if status:
updates.append("status = ?")
params.append(status)
if assigned_to:
updates.append("assigned_to = ?")
params.append(assigned_to)
if revision_count is not None:
updates.append("revision_count = ?")
params.append(revision_count)
if last_review_status:
updates.append("last_review_status = ?")
params.append(last_review_status)
if name:
updates.append("name = ?")
params.append(name)
if specializations is not None:
updates.append("specializations = ?")
params.append(json.dumps(specializations))
if item_count is not None:
updates.append("item_count = ?")
params.append(item_count)
if security_sensitive is not None:
updates.append("security_sensitive = ?")
params.append(security_sensitive)
if qa_attempts is not None:
updates.append("qa_attempts = ?")
params.append(qa_attempts)
if tl_review_attempts is not None:
updates.append("tl_review_attempts = ?")
params.append(tl_review_attempts)
if component_path is not None:
updates.append("component_path = ?")
params.append(component_path)
if initial_tier is not None:
valid_tiers = ('Developer', 'Senior Software Engineer')
if initial_tier not in valid_tiers:
return {
"success": False,
"error": f"initial_tier must be one of {valid_tiers}, got '{initial_tier}'"
}
updates.append("initial_tier = ?")
params.append(initial_tier)
if complexity is not None:
complexity_error = validate_complexity(complexity)
if complexity_error:
return {"success": False, "error": complexity_error}
updates.append("complexity = ?")
params.append(complexity)
if review_iteration is not None:
updates.append("review_iteration = ?")
params.append(review_iteration)
if no_progress_count is not None:
updates.append("no_progress_count = ?")
params.append(no_progress_count)
if blocking_issues_count is not None:
updates.append("blocking_issues_count = ?")
params.append(blocking_issues_count)
# Server-side validation and clamping for counters (defense in depth)
# Clamp negative values to 0 rather than rejecting - handles race conditions gracefully
# (updates and params are parallel lists, so the index found in updates
# addresses the matching placeholder in params)
if no_progress_count is not None and no_progress_count < 0:
for i, clause in enumerate(updates):
if "no_progress_count" in clause:
params[i] = 0
break
if blocking_issues_count is not None and blocking_issues_count < 0:
for i, clause in enumerate(updates):
if "blocking_issues_count" in clause:
params[i] = 0
break
if review_iteration is not None and review_iteration < 1:
return {"success": False, "error": f"review_iteration must be >= 1: {review_iteration}"}
# Monotonicity enforcement for review_iteration only (use SQL MAX for idempotency)
# Note: no_progress_count is NOT monotonic - it resets to 0 on progress
# Use field-name search instead of string equality for robustness
if review_iteration is not None:
# Use SQL-level MAX() to enforce monotonicity atomically
# This avoids race conditions from read-check-update pattern
for i, clause in enumerate(updates):
if "review_iteration" in clause and "MAX(" not in clause:
updates[i] = "review_iteration = MAX(COALESCE(review_iteration, 0), ?)"
break
if updates:
updates.append("updated_at = CURRENT_TIMESTAMP")
query = f"UPDATE task_groups SET {', '.join(updates)} WHERE id = ? AND session_id = ?"
params.extend([group_id, session_id])
cursor = conn.execute(query, params)
conn.commit()
# Check if UPDATE modified any rows
if cursor.rowcount == 0:
if auto_create:
# Auto-create the task group if it doesn't exist (note: only name,
# status, and assigned_to carry over; other requested fields use defaults)
# Close connection before delegating to create_task_group
conn.close()
conn = None # Prevent double-close in finally block
group_name = name or f"Task Group {group_id}"
self._print_success(f"Task group {group_id} not found, auto-creating...")
return self.create_task_group(
group_id, session_id, group_name,
status=status or 'pending',
assigned_to=assigned_to
)
else:
print(f"! Task group not found: {group_id} in session {session_id}", file=sys.stderr)
return {"success": False, "error": f"Task group not found: {group_id}"}
else:
self._print_success(f"✓ Task group updated: {group_id} (session: {session_id[:20]}...)")
# Fetch and return the updated record
row = conn.execute("""
SELECT * FROM task_groups WHERE id = ? AND session_id = ?
""", (group_id, session_id)).fetchone()
return {"success": True, "task_group": dict(row) if row else None}
except Exception as e:
print(f"! Failed to update task group {group_id}: {e}", file=sys.stderr)
return {"success": False, "error": str(e)}
finally:
if conn:
conn.close()
def get_task_groups(self, session_id: str, status: Optional[str] = None) -> List[Dict]:
"""Get task groups for a session."""
conn = self._get_connection()
if status:
rows = conn.execute("""
SELECT * FROM task_groups WHERE session_id = ? AND status = ?
ORDER BY created_at
""", (session_id, status)).fetchall()
else:
rows = conn.execute("""
SELECT * FROM task_groups WHERE session_id = ?
ORDER BY created_at
""", (session_id,)).fetchall()
conn.close()
return [dict(row) for row in rows]
# ==================== TOKEN USAGE OPERATIONS ====================
def log_tokens(self, session_id: str, agent_type: str, tokens: int,
agent_id: Optional[str] = None) -> None:
"""Log token usage."""
conn = self._get_connection()
conn.execute("""
INSERT INTO token_usage (session_id, agent_type, agent_id, tokens_estimated)
VALUES (?, ?, ?, ?)
""", (session_id, agent_type, agent_id, tokens))
conn.commit()
conn.close()
def get_token_summary(self, session_id: str, by: str = 'agent_type') -> Dict:
"""Get token usage summary grouped by agent_type or agent_id."""
conn = self._get_connection()
if by == 'agent_type':
rows = conn.execute("""
SELECT agent_type, SUM(tokens_estimated) as total
FROM token_usage
WHERE session_id = ?
GROUP BY agent_type
""", (session_id,)).fetchall()
else:
rows = conn.execute("""
SELECT agent_id, SUM(tokens_estimated) as total
FROM token_usage
WHERE session_id = ?
GROUP BY agent_id
""", (session_id,)).fetchall()
conn.close()
result = {row[0]: row[1] for row in rows}
result['total'] = sum(result.values())
return result
# ==================== SKILL OUTPUT OPERATIONS ====================
def save_skill_output(self, session_id: str, skill_name: str, output_data: Dict,
agent_type: Optional[str] = None, group_id: Optional[str] = None) -> int:
"""Save skill output with auto-computed iteration.
Iteration is computed atomically using INSERT...SELECT to prevent race conditions.
Uses UNIQUE constraint with retry on IntegrityError for concurrent safety.
Returns the iteration number assigned.
"""
conn = self._get_connection()
max_retries = 3
for attempt in range(max_retries):
try:
# Use BEGIN IMMEDIATE to acquire exclusive lock upfront
conn.execute("BEGIN IMMEDIATE")
# Atomic INSERT with computed iteration using INSERT...SELECT
# Build WHERE clause properly for NULL handling
if agent_type is None and group_id is None:
where_clause = "agent_type IS NULL AND group_id IS NULL"
params = (session_id, skill_name, json.dumps(output_data), session_id, skill_name)
elif agent_type is None:
where_clause = "agent_type IS NULL AND group_id = ?"
params = (session_id, skill_name, json.dumps(output_data), group_id, session_id, skill_name, group_id)
elif group_id is None:
where_clause = "agent_type = ? AND group_id IS NULL"
params = (session_id, skill_name, json.dumps(output_data), agent_type, session_id, skill_name, agent_type)
else:
where_clause = "agent_type = ? AND group_id = ?"
params = (session_id, skill_name, json.dumps(output_data), agent_type, group_id, session_id, skill_name, agent_type, group_id)
cursor = conn.execute(f"""
INSERT INTO skill_outputs (session_id, skill_name, output_data, agent_type, group_id, iteration)
SELECT ?, ?, ?, {'NULL' if agent_type is None else '?'}, {'NULL' if group_id is None else '?'},
COALESCE((SELECT MAX(iteration) FROM skill_outputs
WHERE session_id = ? AND skill_name = ? AND {where_clause}), 0) + 1
""", params)
# Get the iteration that was assigned
next_iteration = conn.execute("""
SELECT iteration FROM skill_outputs
WHERE rowid = ?
""", (cursor.lastrowid,)).fetchone()['iteration']
conn.commit()
conn.close()
break # Success, exit retry loop
except sqlite3.IntegrityError:
conn.rollback()
if attempt < max_retries - 1:
# Retry on UNIQUE constraint violation (concurrent insert)
import time
time.sleep(0.01 * (attempt + 1)) # Brief backoff
continue
else:
conn.close()
raise
except Exception:
conn.rollback()
conn.close()
raise
if agent_type:
self._print_success(f"✓ Saved {skill_name} output for {agent_type} (iteration {next_iteration})")
else:
self._print_success(f"✓ Saved {skill_name} output (iteration {next_iteration})")
return next_iteration
def get_skill_output(self, session_id: str, skill_name: str,
agent_type: Optional[str] = None) -> Optional[Dict]:
"""Get latest skill output (backward compatible).
If agent_type is provided, returns latest for that agent.
Otherwise returns latest across all agents.
"""
conn = self._get_connection()
if agent_type:
row = conn.execute("""
SELECT output_data FROM skill_outputs
WHERE session_id = ? AND skill_name = ? AND agent_type = ?
ORDER BY timestamp DESC LIMIT 1
""", (session_id, skill_name, agent_type)).fetchone()
else:
row = conn.execute("""
SELECT output_data FROM skill_outputs
WHERE session_id = ? AND skill_name = ?
ORDER BY timestamp DESC LIMIT 1
""", (session_id, skill_name)).fetchone()
conn.close()
return json.loads(row['output_data']) if row else None
def get_skill_output_all(self, session_id: str, skill_name: str,
agent_type: Optional[str] = None) -> List[Dict]:
"""Get all skill outputs for a skill (supports multi-invocation).
Returns array of objects with iteration, agent_type, group_id, timestamp, and output_data.
Ordered by timestamp DESC (most recent first) for consistent access patterns.
"""
conn = self._get_connection()
if agent_type:
# Filter by agent type, order by timestamp DESC for consistent "latest first"
rows = conn.execute("""
SELECT iteration, agent_type, group_id, timestamp, output_data
FROM skill_outputs
WHERE session_id = ? AND skill_name = ? AND agent_type = ?
ORDER BY timestamp DESC
""", (session_id, skill_name, agent_type)).fetchall()
else:
# All outputs for skill, order by timestamp DESC
rows = conn.execute("""
SELECT iteration, agent_type, group_id, timestamp, output_data
FROM skill_outputs
WHERE session_id = ? AND skill_name = ?
ORDER BY timestamp DESC
""", (session_id, skill_name)).fetchall()
conn.close()
return [{
'iteration': row['iteration'],
'agent_type': row['agent_type'],
'group_id': row['group_id'],
'timestamp': row['timestamp'],
'output_data': json.loads(row['output_data'])
} for row in rows]
def check_skill_evidence(self, session_id: str, mandatory_skills: List[str],
agent_type: Optional[str] = None,
since_minutes: int = 30) -> Dict:
"""Check if mandatory skills have recent evidence in skill_outputs.
Uses the existing skill_outputs table as evidence of skill invocation.
This avoids needing a new table or modifying individual skills.
Args:
session_id: Session to check
mandatory_skills: List of skill names that must have evidence
agent_type: Optional filter by agent type
since_minutes: Look for evidence within this time window (default: 30)
Returns:
Dict with: complete (bool), missing (list), found (list),
mandatory (list), recency_window_minutes (int)
Note: Uses SQL datetime() for proper timestamp comparison.
See: research/skills-configuration-enforcement-plan.md
"""
if not mandatory_skills:
return {
"complete": True,
"missing": [],
"found": [],
"mandatory": [],
"recency_window_minutes": since_minutes
}
conn = self._get_connection()
try:
# Build query with proper SQL datetime filtering
# Using datetime('now', '-N minutes') for correct timestamp comparison
time_modifier = f'-{since_minutes} minutes'
if agent_type:
# Filter by session_id AND agent_type
rows = conn.execute("""
SELECT DISTINCT skill_name FROM skill_outputs
WHERE session_id = ?
AND agent_type = ?
AND timestamp >= datetime('now', ?)
""", (session_id, agent_type, time_modifier)).fetchall()
else:
# Filter by session_id only
rows = conn.execute("""
SELECT DISTINCT skill_name FROM skill_outputs
WHERE session_id = ?
AND timestamp >= datetime('now', ?)
""", (session_id, time_modifier)).fetchall()
recent_outputs = {row['skill_name'] for row in rows}
# Calculate found and missing
found = [s for s in mandatory_skills if s in recent_outputs]
missing = [s for s in mandatory_skills if s not in recent_outputs]
return {
"complete": len(missing) == 0,
"missing": missing,
"found": found,
"mandatory": mandatory_skills,
"recency_window_minutes": since_minutes
}
finally:
conn.close()
# ==================== CONFIGURATION OPERATIONS ====================
# REMOVED: Configuration table no longer exists (2025-11-21)
# See research/empty-tables-analysis.md for details
#
# def set_config(self, key: str, value: Any) -> None:
# """Set configuration value."""
# ...
#
# def get_config(self, key: str) -> Optional[Any]:
# """Get configuration value."""
# ...
# ==================== DASHBOARD DATA ====================
def get_dashboard_snapshot(self, session_id: str) -> Dict:
"""Get complete dashboard data snapshot."""
return {
'session': self.get_session(session_id),
'orchestrator_state': self.get_latest_state(session_id, 'orchestrator'),
'pm_state': self.get_latest_state(session_id, 'pm'),
'task_groups': self.get_task_groups(session_id),
'token_summary': self.get_token_summary(session_id),
'recent_logs': self.get_logs(session_id, limit=10)
}
# ==================
... (truncated)
```
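The upsert above leans on a SQLite subtlety worth calling out: `excluded.col` holds the value that *would have been inserted*, so when the `VALUES` clause applies a default like `COALESCE(?, 'pending')`, `excluded.status` is never NULL and cannot distinguish "caller passed None" from an explicit value. A minimal, self-contained sketch of the pattern (hypothetical `groups` table, not the real schema; requires SQLite 3.24+ for `ON CONFLICT ... DO UPDATE`), preserving the existing value by binding the raw parameter a second time:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE groups (id TEXT PRIMARY KEY, name TEXT, status TEXT)")

def upsert(group_id, name, status=None):
    # VALUES applies the insert-time default; the DO UPDATE branch binds the
    # raw `status` parameter again, so None preserves the stored value.
    conn.execute("""
        INSERT INTO groups (id, name, status)
        VALUES (?, ?, COALESCE(?, 'pending'))
        ON CONFLICT(id) DO UPDATE SET
            name = excluded.name,
            status = COALESCE(?, groups.status)
    """, (group_id, name, status, status))

upsert("g1", "Auth work")                   # fresh insert -> default 'pending'
upsert("g1", "Auth work", "in_progress")    # explicit status -> updated
upsert("g1", "Auth work (renamed)")         # status=None -> existing value kept
print(conn.execute("SELECT name, status FROM groups WHERE id = 'g1'").fetchone())
# -> ('Auth work (renamed)', 'in_progress')
```

Had the update branch used `COALESCE(excluded.status, ...)` instead, the third call would silently reset the status to 'pending', since the default from `VALUES` flows into `excluded`.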
### scripts/init_db.py
```python
#!/usr/bin/env python3
"""
Initialize BAZINGA database schema.
Creates all necessary tables for managing orchestration state.
Path Resolution:
If no database path is provided, auto-detects project root and uses:
PROJECT_ROOT/bazinga/bazinga.db
"""
import sqlite3
import sys
import time
from pathlib import Path
import tempfile
import shutil
import subprocess
# Add _shared directory to path for bazinga_paths import
_script_dir = Path(__file__).parent.resolve()
_shared_dir = _script_dir.parent.parent / '_shared'
if _shared_dir.exists() and str(_shared_dir) not in sys.path:
sys.path.insert(0, str(_shared_dir))
try:
from bazinga_paths import get_db_path
_HAS_BAZINGA_PATHS = True
except ImportError:
_HAS_BAZINGA_PATHS = False
# Current schema version
SCHEMA_VERSION = 18
def get_schema_version(cursor) -> int:
"""Get current schema version from database."""
try:
cursor.execute("SELECT version FROM schema_version ORDER BY version DESC LIMIT 1")
result = cursor.fetchone()
return result[0] if result else 0
except sqlite3.OperationalError:
# Table doesn't exist, this is version 0 (pre-versioning)
return 0
def migrate_v1_to_v2(conn, cursor) -> None:
"""Migrate from v1 (CHECK constraint) to v2 (no constraint)."""
print("🔄 Migrating schema from v1 to v2...")
# Export existing data
cursor.execute("SELECT * FROM orchestration_logs")
logs_data = cursor.fetchall()
print(f" - Backing up {len(logs_data)} orchestration log entries")
# Drop old table
cursor.execute("DROP TABLE IF EXISTS orchestration_logs")
# Recreate with new schema (no CHECK constraint)
cursor.execute("""
CREATE TABLE orchestration_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT NOT NULL,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
iteration INTEGER,
agent_type TEXT NOT NULL,
agent_id TEXT,
content TEXT NOT NULL,
FOREIGN KEY (session_id) REFERENCES sessions(session_id) ON DELETE CASCADE
)
""")
# Recreate indexes
cursor.execute("""
CREATE INDEX idx_logs_session
ON orchestration_logs(session_id, timestamp DESC)
""")
cursor.execute("""
CREATE INDEX idx_logs_agent_type
ON orchestration_logs(session_id, agent_type)
""")
# Restore data
if logs_data:
cursor.executemany("""
INSERT INTO orchestration_logs
(id, session_id, timestamp, iteration, agent_type, agent_id, content)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", logs_data)
print(f" - Restored {len(logs_data)} orchestration log entries")
print("✓ Migration to v2 complete")
def init_database(db_path: str) -> None:
"""Initialize the BAZINGA database with complete schema."""
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# Enable foreign keys
cursor.execute("PRAGMA foreign_keys = ON")
# Enable WAL mode for better concurrency
cursor.execute("PRAGMA journal_mode = WAL")
# Create schema_version table first (if doesn't exist)
cursor.execute("""
CREATE TABLE IF NOT EXISTS schema_version (
version INTEGER PRIMARY KEY,
applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
description TEXT
)
""")
# Get current schema version
current_version = get_schema_version(cursor)
print(f"Current schema version: {current_version}")
# Run migrations if needed
if current_version < SCHEMA_VERSION:
print(f"Schema upgrade required: v{current_version} -> v{SCHEMA_VERSION}")
if current_version in (0, 1):
# Check if orchestration_logs exists with old schema
cursor.execute("""
SELECT sql FROM sqlite_master
WHERE type='table' AND name='orchestration_logs'
""")
result = cursor.fetchone()
if result and 'CHECK' in result[0]:
# Has old CHECK constraint, migrate
migrate_v1_to_v2(conn, cursor)
current_version = 2 # Advance version to enable subsequent migrations
# Handle v2→v3 migration (add development_plans table)
if current_version == 2:
print("🔄 Migrating schema from v2 to v3...")
# No data migration needed - just add new table
# Table will be created below with CREATE TABLE IF NOT EXISTS
print("✓ Migration to v3 complete (development_plans table added)")
current_version = 3 # Advance version to enable subsequent migrations
# Handle v3→v4 migration (add success_criteria table)
if current_version == 3:
print("🔄 Migrating schema from v3 to v4...")
# No data migration needed - just add new table
# Table will be created below with CREATE TABLE IF NOT EXISTS
print("✓ Migration to v4 complete (success_criteria table added)")
current_version = 4 # Advance version to enable subsequent migrations
# Handle v4→v5 migration (merge-on-approval architecture)
if current_version == 4:
print("🔄 Migrating schema from v4 to v5...")
# Check if sessions table exists (for fresh databases, skip all ALTER and let CREATE TABLE handle it)
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='sessions'")
sessions_exists = cursor.fetchone() is not None
if not sessions_exists:
print(" ⊘ Base tables don't exist yet - will be created with full schema below")
print("✓ Migration to v5 complete (fresh database, skipped)")
current_version = 5 # Skip to next migration, CREATE TABLE will handle it
else:
# 1. Add initial_branch to sessions
try:
cursor.execute("ALTER TABLE sessions ADD COLUMN initial_branch TEXT DEFAULT 'main'")
print(" ✓ Added sessions.initial_branch")
except sqlite3.OperationalError as e:
if "duplicate column" in str(e).lower():
print(" ⊘ sessions.initial_branch already exists")
else:
raise
# 2. Add feature_branch to task_groups
try:
cursor.execute("ALTER TABLE task_groups ADD COLUMN feature_branch TEXT")
print(" ✓ Added task_groups.feature_branch")
except sqlite3.OperationalError as e:
if "duplicate column" in str(e).lower():
print(" ⊘ task_groups.feature_branch already exists")
else:
raise
# 3. Add merge_status to task_groups (without CHECK - SQLite limitation)
# NOTE: ALTER TABLE cannot add CHECK constraints in SQLite
# The CHECK constraint is applied in step 4 when we recreate the table
try:
cursor.execute("ALTER TABLE task_groups ADD COLUMN merge_status TEXT")
print(" ✓ Added task_groups.merge_status (CHECK constraint applied in step 4)")
except sqlite3.OperationalError as e:
if "duplicate column" in str(e).lower():
print(" ⊘ task_groups.merge_status already exists")
else:
raise
# 4. Recreate task_groups with expanded status enum AND proper CHECK constraints
# This step applies CHECK constraints that couldn't be added via ALTER TABLE
cursor.execute("SELECT sql FROM sqlite_master WHERE name='task_groups'")
schema = cursor.fetchone()[0]
if 'approved_pending_merge' not in schema:
print(" Recreating task_groups with expanded status enum...")
# CRITICAL: Table recreation must be atomic to prevent orphan indexes
# Use explicit transaction with exclusive locking
# See research/sqlite-orphan-index-corruption-ultrathink.md for full root cause analysis
# Close any implicit transaction before starting explicit one
conn.commit()
try:
cursor.execute("BEGIN IMMEDIATE")
# Create new table with expanded status enum
cursor.execute("""
CREATE TABLE task_groups_new (
id TEXT NOT NULL,
session_id TEXT NOT NULL,
name TEXT NOT NULL,
status TEXT CHECK(status IN (
'pending', 'in_progress', 'completed', 'failed',
'approved_pending_merge', 'merging'
)) DEFAULT 'pending',
assigned_to TEXT,
revision_count INTEGER DEFAULT 0,
last_review_status TEXT CHECK(last_review_status IN ('APPROVED', 'CHANGES_REQUESTED') OR last_review_status IS NULL),
feature_branch TEXT,
merge_status TEXT CHECK(merge_status IN ('pending', 'in_progress', 'merged', 'conflict', 'test_failure') OR merge_status IS NULL),
complexity INTEGER CHECK(complexity BETWEEN 1 AND 10),
initial_tier TEXT CHECK(initial_tier IN ('Developer', 'Senior Software Engineer')) DEFAULT 'Developer',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id, session_id),
FOREIGN KEY (session_id) REFERENCES sessions(session_id) ON DELETE CASCADE
)
""")
# Get existing columns in task_groups
cursor.execute("PRAGMA table_info(task_groups)")
existing_cols = [row[1] for row in cursor.fetchall()]
# Build column list for migration (only columns that exist)
all_cols = ['id', 'session_id', 'name', 'status', 'assigned_to', 'revision_count',
'last_review_status', 'feature_branch', 'merge_status', 'complexity',
'initial_tier', 'created_at', 'updated_at']
cols_to_copy = [c for c in all_cols if c in existing_cols]
cols_str = ', '.join(cols_to_copy)
# Copy data
cursor.execute(f"""
INSERT INTO task_groups_new ({cols_str})
SELECT {cols_str} FROM task_groups
""")
# Swap tables atomically
cursor.execute("DROP TABLE task_groups")
cursor.execute("ALTER TABLE task_groups_new RENAME TO task_groups")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_taskgroups_session ON task_groups(session_id, status)")
# Verify integrity before committing
integrity = cursor.execute("PRAGMA integrity_check;").fetchone()[0]
if integrity != "ok":
raise sqlite3.IntegrityError(f"Migration task_groups v4→v5: Database integrity check failed after table recreation: {integrity}")
# Use connection methods for commit/rollback (clearer than SQL strings)
conn.commit()
# Force WAL checkpoint to ensure clean state
# Returns (busy, log_frames, checkpointed_frames)
checkpoint_result = cursor.execute("PRAGMA wal_checkpoint(TRUNCATE);").fetchone()
if checkpoint_result:
busy, log_frames, checkpointed = checkpoint_result
if busy:
# Checkpoint couldn't fully complete - retry with backoff
for retry in range(3):
time.sleep(0.5 * (retry + 1))
checkpoint_result = cursor.execute("PRAGMA wal_checkpoint(TRUNCATE);").fetchone()
if checkpoint_result and not checkpoint_result[0]:
print(f" ✓ WAL checkpoint succeeded after retry {retry + 1}")
break
else:
print(f" ⚠️ WAL checkpoint incomplete: busy={busy}, log={log_frames}, checkpointed={checkpointed}")
elif log_frames != checkpointed:
print(f" ⚠️ WAL checkpoint partial: {checkpointed}/{log_frames} frames checkpointed")
# Post-commit integrity verification (validates final on-disk state)
# This is ADDITIONAL to pre-commit check - both are needed:
# - Pre-commit: Enables atomic rollback if corrupt
# - Post-commit: Validates finalized disk state after WAL flush
post_integrity = cursor.execute("PRAGMA integrity_check;").fetchone()[0]
if post_integrity != "ok":
print(f" ⚠️ Post-commit integrity check failed: {post_integrity}")
print(f" ⚠️ Database may be corrupted. Consider: rm {db_path}*")
# Refresh query planner statistics after major schema change
cursor.execute("ANALYZE task_groups;")
print(" ✓ Recreated task_groups with expanded status enum")
except Exception as e:
try:
conn.rollback()
except Exception as rollback_exc:
print(f" ! ROLLBACK failed: {rollback_exc}")
print(f" ✗ v4→v5 migration failed during task_groups recreation, rolled back: {e}")
raise
else:
print(" ⊘ task_groups status enum already expanded")
print("✓ Migration to v5 complete (merge-on-approval architecture)")
current_version = 5 # Advance version to enable subsequent migrations
# Handle v5→v6 migration (context packages for inter-agent communication)
if current_version == 5:
print("🔄 Migrating schema from v5 to v6...")
# Check if task_groups table exists (for fresh databases, skip ALTER)
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='task_groups'")
task_groups_exists = cursor.fetchone() is not None
if not task_groups_exists:
print(" ⊘ Base tables don't exist yet - will be created with full schema below")
print("✓ Migration to v6 complete (fresh database, skipped)")
current_version = 6
else:
# 1. Add context_references to task_groups
try:
cursor.execute("ALTER TABLE task_groups ADD COLUMN context_references TEXT")
print(" ✓ Added task_groups.context_references")
except sqlite3.OperationalError as e:
if "duplicate column" in str(e).lower():
print(" ⊘ task_groups.context_references already exists")
else:
raise
# 2. Create context_packages table (will be created below with IF NOT EXISTS)
# 3. Create context_package_consumers table (will be created below with IF NOT EXISTS)
print("✓ Migration to v6 complete (context packages for inter-agent communication)")
current_version = 6
# Migration from v6 to v7: Add specializations to task_groups
if current_version == 6:
print("🔄 Migrating schema from v6 to v7...")
# Check if task_groups table exists (for fresh databases, skip ALTER)
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='task_groups'")
task_groups_exists = cursor.fetchone() is not None
if not task_groups_exists:
print(" ⊘ Base tables don't exist yet - will be created with full schema below")
print("✓ Migration to v7 complete (fresh database, skipped)")
current_version = 7
else:
# Add specializations column to task_groups
try:
cursor.execute("ALTER TABLE task_groups ADD COLUMN specializations TEXT")
print(" ✓ Added task_groups.specializations")
except sqlite3.OperationalError as e:
if "duplicate column" in str(e).lower():
print(" ⊘ task_groups.specializations already exists")
else:
raise
# CRITICAL: Force WAL checkpoint after schema change
# Without this, subsequent writes can corrupt the schema catalog
# causing "orphan index" errors on sqlite_autoindex_task_groups_1
conn.commit()
# Returns (busy, log_frames, checkpointed_frames)
checkpoint_result = cursor.execute("PRAGMA wal_checkpoint(TRUNCATE);").fetchone()
if checkpoint_result:
busy, log_frames, checkpointed = checkpoint_result
if busy:
# Checkpoint couldn't fully complete - retry with backoff
for retry in range(3):
time.sleep(0.5 * (retry + 1))
checkpoint_result = cursor.execute("PRAGMA wal_checkpoint(TRUNCATE);").fetchone()
if checkpoint_result and not checkpoint_result[0]:
print(f" ✓ WAL checkpoint succeeded after retry {retry + 1}")
break
else:
print(f" ⚠️ WAL checkpoint incomplete: busy={busy}, log={log_frames}, checkpointed={checkpointed}")
elif log_frames != checkpointed:
print(f" ⚠️ WAL checkpoint partial: {checkpointed}/{log_frames} frames checkpointed")
# Post-commit integrity verification (validates final on-disk state)
post_integrity = cursor.execute("PRAGMA integrity_check;").fetchone()[0]
if post_integrity != "ok":
print(f" ⚠️ Post-commit integrity check failed: {post_integrity}")
print(" ⚠️ Database may be corrupted. Consider deleting and reinitializing.")
current_version = 7 # Advance version
# Refresh query planner statistics after schema change
cursor.execute("ANALYZE task_groups;")
print(" ✓ WAL checkpoint completed")
print("✓ Migration to v7 complete (specializations for tech stack loading)")
# Migration from v7 to v8: Add reasoning capture columns to orchestration_logs
if current_version == 7:
print("🔄 Migrating schema from v7 to v8...")
# Check if orchestration_logs table exists (for fresh databases, skip ALTER)
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='orchestration_logs'")
logs_exists = cursor.fetchone() is not None
if not logs_exists:
print(" ⊘ Base tables don't exist yet - will be created with full schema below")
print("✓ Migration to v8 complete (fresh database, skipped)")
current_version = 8
else:
# Add log_type column (defaults to 'interaction' for existing rows)
try:
cursor.execute("""
ALTER TABLE orchestration_logs
ADD COLUMN log_type TEXT DEFAULT 'interaction'
""")
print(" ✓ Added orchestration_logs.log_type")
except sqlite3.OperationalError as e:
if "duplicate column" in str(e).lower():
print(" ⊘ orchestration_logs.log_type already exists")
else:
raise
# Add reasoning_phase column
try:
cursor.execute("""
ALTER TABLE orchestration_logs
ADD COLUMN reasoning_phase TEXT
""")
print(" ✓ Added orchestration_logs.reasoning_phase")
except sqlite3.OperationalError as e:
if "duplicate column" in str(e).lower():
print(" ⊘ orchestration_logs.reasoning_phase already exists")
else:
raise
# Add confidence_level column
try:
cursor.execute("""
ALTER TABLE orchestration_logs
ADD COLUMN confidence_level TEXT
""")
print(" ✓ Added orchestration_logs.confidence_level")
except sqlite3.OperationalError as e:
if "duplicate column" in str(e).lower():
print(" ⊘ orchestration_logs.confidence_level already exists")
else:
raise
# Add references_json column (JSON array of file paths consulted)
try:
cursor.execute("""
ALTER TABLE orchestration_logs
ADD COLUMN references_json TEXT
""")
print(" ✓ Added orchestration_logs.references_json")
except sqlite3.OperationalError as e:
if "duplicate column" in str(e).lower():
print(" ⊘ orchestration_logs.references_json already exists")
else:
raise
# Add redacted column (1 if secrets were redacted)
try:
cursor.execute("""
ALTER TABLE orchestration_logs
ADD COLUMN redacted INTEGER DEFAULT 0
""")
print(" ✓ Added orchestration_logs.redacted")
except sqlite3.OperationalError as e:
if "duplicate column" in str(e).lower():
print(" ⊘ orchestration_logs.redacted already exists")
else:
raise
# Add group_id column for reasoning context
try:
cursor.execute("""
ALTER TABLE orchestration_logs
ADD COLUMN group_id TEXT
""")
print(" ✓ Added orchestration_logs.group_id")
except sqlite3.OperationalError as e:
if "duplicate column" in str(e).lower():
print(" ⊘ orchestration_logs.group_id already exists")
else:
raise
# Create index for reasoning queries
try:
cursor.execute("""
CREATE INDEX idx_logs_reasoning
ON orchestration_logs(session_id, log_type, reasoning_phase)
WHERE log_type = 'reasoning'
""")
print(" ✓ Created idx_logs_reasoning index")
except sqlite3.OperationalError as e:
if "already exists" in str(e).lower():
print(" ⊘ idx_logs_reasoning index already exists")
else:
raise
# Create index for group-based reasoning queries
try:
cursor.execute("""
CREATE INDEX idx_logs_group_reasoning
ON orchestration_logs(session_id, group_id, log_type)
WHERE log_type = 'reasoning'
""")
print(" ✓ Created idx_logs_group_reasoning index")
except sqlite3.OperationalError as e:
if "already exists" in str(e).lower():
print(" ⊘ idx_logs_group_reasoning index already exists")
else:
raise
# Commit and checkpoint
conn.commit()
checkpoint_result = cursor.execute("PRAGMA wal_checkpoint(TRUNCATE);").fetchone()
if checkpoint_result:
busy, log_frames, checkpointed = checkpoint_result
if busy:
for retry in range(3):
time.sleep(0.5 * (retry + 1))
checkpoint_result = cursor.execute("PRAGMA wal_checkpoint(TRUNCATE);").fetchone()
if checkpoint_result and not checkpoint_result[0]:
print(f" ✓ WAL checkpoint succeeded after retry {retry + 1}")
break
else:
# Log the latest checkpoint result, not the stale one
final_busy = checkpoint_result[0] if checkpoint_result else busy
print(f" ⚠️ WAL checkpoint incomplete: busy={final_busy}")
# Post-commit integrity verification
post_integrity = cursor.execute("PRAGMA integrity_check;").fetchone()[0]
if post_integrity != "ok":
print(f" ⚠️ Post-commit integrity check failed: {post_integrity}")
# Refresh query planner statistics
cursor.execute("ANALYZE orchestration_logs;")
print(" ✓ WAL checkpoint completed")
current_version = 8
print("✓ Migration to v8 complete (agent reasoning capture)")
# Migration from v8 to v9: Add event logging and scope tracking
if current_version == 8:
print("🔄 Migrating schema from v8 to v9...")
# Check if tables exist (for fresh databases, skip ALTER)
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='orchestration_logs'")
logs_exists = cursor.fetchone() is not None
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='sessions'")
sessions_exists = cursor.fetchone() is not None
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='task_groups'")
task_groups_exists = cursor.fetchone() is not None
if not logs_exists and not sessions_exists:
print(" ⊘ Base tables don't exist yet - will be created with full schema below")
print("✓ Migration to v9 complete (fresh database, skipped)")
current_version = 9
else:
# CRITICAL: Recreate orchestration_logs to update CHECK constraint
# Old schema has CHECK(log_type IN ('interaction', 'reasoning'))
# New schema needs CHECK(log_type IN ('interaction', 'reasoning', 'event'))
# SQLite doesn't support ALTER TABLE to modify CHECK constraints
#
# SAFETY: Wrap in transaction to prevent data loss on error
try:
conn.commit()  # Close any implicit transaction before taking an explicit lock
cursor.execute("BEGIN IMMEDIATE")
if logs_exists:
print(" - Recreating orchestration_logs to update CHECK constraint...")
# Get column info to handle variable schemas
# IMPORTANT: Use ordered list (by cid) to ensure SELECT/INSERT column alignment
cursor.execute("PRAGMA table_info(orchestration_logs)")
col_info = cursor.fetchall()
# Sort by cid (column 0) to ensure deterministic order
col_info_sorted = sorted(col_info, key=lambda x: x[0])
col_names = [row[1] for row in col_info_sorted] # Ordered list, not set
# Backup existing data with columns in consistent order
col_list = ', '.join(col_names)
cursor.execute(f"SELECT {col_list} FROM orchestration_logs")
logs_data = cursor.fetchall()
print(f" - Backed up {len(logs_data)} orchestration log entries")
# Drop indexes first (they reference the table)
cursor.execute("DROP INDEX IF EXISTS idx_logs_session")
cursor.execute("DROP INDEX IF EXISTS idx_logs_agent_type")
cursor.execute("DROP INDEX IF EXISTS idx_logs_reasoning")
cursor.execute("DROP INDEX IF EXISTS idx_logs_events")
# Drop old table
cursor.execute("DROP TABLE orchestration_logs")
# Create new table with updated CHECK constraint (includes 'event')
cursor.execute("""
CREATE TABLE orchestration_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT NOT NULL,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
iteration INTEGER,
agent_type TEXT NOT NULL,
agent_id TEXT,
content TEXT NOT NULL,
log_type TEXT DEFAULT 'interaction'
CHECK(log_type IN ('interaction', 'reasoning', 'event')),
reasoning_phase TEXT
CHECK(reasoning_phase IS NULL OR reasoning_phase IN (
'understanding', 'approach', 'decisions', 'risks',
'blockers', 'pivot', 'completion'
)),
confidence_level TEXT
CHECK(confidence_level IS NULL OR confidence_level IN ('high', 'medium', 'low')),
references_json TEXT,
redacted INTEGER DEFAULT 0 CHECK(redacted IN (0, 1)),
group_id TEXT,
event_subtype TEXT,
event_payload TEXT,
FOREIGN KEY (session_id) REFERENCES sessions(session_id) ON DELETE CASCADE
)
""")
print(" ✓ Created orchestration_logs with updated CHECK constraint")
# Restore data - map old columns to new schema
if logs_data:
# Build insert for columns that exist in both old and new
new_cols = {'id', 'session_id', 'timestamp', 'iteration', 'agent_type',
'agent_id', 'content', 'log_type', 'reasoning_phase',
'confidence_level', 'references_json', 'redacted', 'group_id',
'event_subtype', 'event_payload'}
common_cols = [c for c in col_names if c in new_cols]
col_indices = {name: idx for idx, name in enumerate(col_names)}
placeholders = ', '.join(['?' for _ in common_cols])
insert_cols = ', '.join(common_cols)
for row in logs_data:
values = []
for c in common_cols:
val = row[col_indices[c]]
# Defensive: ensure log_type is never NULL (e.g. rows written with an explicit NULL)
if c == 'log_type' and val is None:
val = 'interaction'
values.append(val)
cursor.execute(f"""
INSERT INTO orchestration_logs ({insert_cols})
VALUES ({placeholders})
""", values)
print(f" ✓ Restored {len(logs_data)} orchestration log entries")
# Recreate indexes
cursor.execute("""
CREATE INDEX idx_logs_session
ON orchestration_logs(session_id, timestamp DESC)
""")
cursor.execute("""
CREATE INDEX idx_logs_agent_type
ON orchestration_logs(session_id, agent_type)
""")
cursor.execute("""
CREATE INDEX idx_logs_reasoning
ON orchestration_logs(session_id, log_type, reasoning_phase)
WHERE log_type = 'reasoning'
""")
print(" ✓ Recreated indexes")
# Commit the transaction for table recreation
cursor.execute("COMMIT")
except Exception as e:
try:
cursor.execute("ROLLBACK")
except Exception:
pass
print(f" ❌ Migration failed, rolled back: {e}")
raise
# Add metadata column to sessions (for original_scope)
if sessions_exists:
try:
cursor.execute("""
ALTER TABLE sessions
ADD COLUMN metadata TEXT
""")
print(" ✓ Added sessions.metadata")
except sqlite3.OperationalError as e:
if "duplicate column" in str(e).lower():
print(" ⊘ sessions.metadata already exists")
else:
raise
# Add item_count column to task_groups
if task_groups_exists:
try:
cursor.execute("""
ALTER TABLE task_groups
ADD COLUMN item_count INTEGER DEFAULT 1
""")
print(" ✓ Added task_groups.item_count")
except sqlite3.OperationalError as e:
if "duplicate column" in str(e).lower():
print(" ⊘ task_groups.item_count already exists")
else:
raise
# Create index for event queries
try:
cursor.execute("""
CREATE INDEX idx_logs_events
ON orchestration_logs(session_id, log_type, event_subtype)
WHERE log_type = 'event'
""")
print(" ✓ Created idx_logs_events index")
except sqlite3.OperationalError as e:
if "already exists" in str(e).lower():
print(" ⊘ idx_logs_events index already exists")
else:
raise
# Commit and checkpoint
conn.commit()
checkpoint_result = cursor.execute("PRAGMA wal_checkpoint(TRUNCATE);").fetchone()
if checkpoint_result:
busy, log_frames, checkpointed = checkpoint_result
if busy:
for retry in range(3):
time.sleep(0.5 * (retry + 1))
checkpoint_result = cursor.execute("PRAGMA wal_checkpoint(TRUNCATE);").fetchone()
if checkpoint_result and not checkpoint_result[0]:
print(f" ✓ WAL checkpoint succeeded after retry {retry + 1}")
break
else:
# Log the latest checkpoint result, not the stale one
final_busy = checkpoint_result[0] if checkpoint_result else busy
print(f" ⚠️ WAL checkpoint incomplete: busy={final_busy}")
# Post-commit integrity verification
post_integrity = cursor.execute("PRAGMA integrity_check;").fetchone()[0]
if post_integrity != "ok":
print(f" ⚠️ Post-commit integrity check failed: {post_integrity}")
# Refresh query planner statistics
cursor.execute("ANALYZE orchestration_logs;")
cursor.execute("ANALYZE sessions;")
cursor.execute("ANALYZE task_groups;")
print(" ✓ WAL checkpoint completed")
current_version = 9
print("✓ Migration to v9 complete (event logging and scope tracking)")
# Migration from v9 to v10: Context Engineering System tables
if current_version == 9:
print("🔄 Migrating schema from v9 to v10...")
# Wrap entire migration in transaction for atomicity
conn.commit() # Close any implicit transaction
try:
cursor.execute("BEGIN IMMEDIATE")
# T004: Extend context_packages with priority and summary columns
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='context_packages'")
if cursor.fetchone():
# Add priority column if it doesn't exist
cursor.execute("PRAGMA table_info(context_packages)")
existing_cols = {row[1] for row in cursor.fetchall()}
if 'priority' not in existing_cols:
cursor.execute("""
ALTER TABLE context_packages
ADD COLUMN priority TEXT NOT NULL DEFAULT 'medium'
CHECK(priority IN ('low', 'medium', 'high', 'critical'))
""")
# Backfill any NULLs (safety for older SQLite versions)
cursor.execute("UPDATE context_packages SET priority = 'medium' WHERE priority IS NULL")
print(" ✓ Added context_packages.priority")
else:
print(" ⊘ context_packages.priority already exists")
if 'summary' not in existing_cols:
cursor.execute("""
ALTER TABLE context_packages
ADD COLUMN summary TEXT
""")
print(" ✓ Added context_packages.summary")
else:
print(" ⊘ context_packages.summary already exists")
# Create composite index for relevance ranking (per data-model.md)
# IF NOT EXISTS handles the case where index already exists
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_packages_priority_ranking
ON context_packages(session_id, priority, created_at DESC)
""")
print(" ✓ Created idx_packages_priority_ranking composite index")
# Create error_patterns table for learning from failed-then-succeeded agents
# Uses composite primary key (pattern_hash, project_id) to allow same pattern across projects
cursor.execute("""
CREATE TABLE IF NOT EXISTS error_patterns (
pattern_hash TEXT NOT NULL,
project_id TEXT NOT NULL,
signature_json TEXT NOT NULL,
solution TEXT NOT NULL,
confidence REAL DEFAULT 0.5 CHECK(confidence >= 0.0 AND confidence <= 1.0),
occurrences INTEGER DEFAULT 1 CHECK(occurrences >= 1),
lang TEXT,
last_seen TEXT DEFAULT (datetime('now')),
created_at TEXT DEFAULT (datetime('now')),
ttl_days INTEGER DEFAULT 90 CHECK(ttl_days > 0),
PRIMARY KEY (pattern_hash, project_id)
)
""")
print(" ✓ Created error_patterns table")
# Create indexes for error_patterns
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_patterns_project
ON error_patterns(project_id, lang)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_patterns_ttl
ON error_patterns(last_seen, ttl_days)
""")
print(" ✓ Created error_patterns indexes")
# Create strategies table for successful approaches
cursor.execute("""
CREATE TABLE IF NOT EXISTS strategies (
strategy_id TEXT PRIMARY KEY,
project_id TEXT NOT NULL,
topic TEXT NOT NULL,
insight TEXT NOT NULL,
helpfulness INTEGER DEFAULT 0 CHECK(helpfulness >= 0),
lang TEXT,
framework TEXT,
last_seen TEXT DEFAULT (datetime('now')),
created_at TEXT DEFAULT (datetime('now'))
)
""")
print(" ✓ Created strategies table")
# Create indexes for strategies
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_strategies_project
ON strategies(project_id, framework)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_strategies_topic
ON strategies(topic)
""")
print(" ✓ Created strategies indexes")
# Create consumption_scope table for iteration-aware package tracking
cursor.execute("""
CREATE TABLE IF NOT EXISTS consumption_scope (
scope_id TEXT PRIMARY KEY,
session_id TEXT NOT NULL,
group_id TEXT NOT NULL,
agent_type TEXT NOT NULL CHECK(agent_type IN ('developer', 'qa_expert', 'tech_lead', 'senior_software_engineer', 'investigator')),
iteration INTEGER NOT NULL CHECK(iteration >= 0),
package_id INTEGER NOT NULL,
consumed_at TEXT DEFAULT (datetime('now')),
FOREIGN KEY (session_id) REFERENCES sessions(session_id) ON DELETE CASCADE,
FOREIGN KEY (package_id) REFERENCES context_packages(id) ON DELETE CASCADE
)
""")
print(" ✓ Created consumption_scope table")
# Create indexes for consumption_scope
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_consumption_session
ON consumption_scope(session_id, group_id, agent_type)
""")
cursor.execute("""
CREATE UNIQUE INDEX IF NOT EXISTS idx_consumption_unique
ON consumption_scope(session_id, group_id, agent_type, iteration, package_id)
""")
print(" ✓ Created consumption_scope indexes")
# Verify integrity before committing
integrity = cursor.execute("PRAGMA integrity_check;").fetchone()[0]
if integrity != "ok":
raise sqlite3.IntegrityError(f"Migration v9→v10: Database integrity check failed: {integrity}")
conn.commit()
print(" ✓ Migration transaction committed")
except Exception as e:
try:
conn.rollback()
except Exception as rollback_exc:
print(f" ! ROLLBACK failed: {rollback_exc}")
print(f" ✗ v9→v10 migration failed, rolled back: {e}")
raise
# WAL checkpoint after successful commit
checkpoint_result = cursor.execute("PRAGMA wal_checkpoint(TRUNCATE);").fetchone()
if checkpoint_result:
busy, log_frames, checkpointed = checkpoint_result
if busy:
for retry in range(3):
time.sleep(0.5 * (retry + 1))
checkpoint_result = cursor.execute("PRAGMA wal_checkpoint(TRUNCATE);").fetchone()
if checkpoint_result and not checkpoint_result[0]:
print(f" ✓ WAL checkpoint succeeded after retry {retry + 1}")
break
else:
# Log the latest checkpoint result, not the stale one
final_busy = checkpoint_result[0] if checkpoint_result else busy
print(f" ⚠️ WAL checkpoint incomplete: busy={final_busy}")
# Post-commit integrity verification
post_integrity = cursor.execute("PRAGMA integrity_check;").fetchone()[0]
if post_integrity != "ok":
print(f" ⚠️ Post-commit integrity check failed: {post_integrity}")
# Refresh query planner statistics for newly created tables
cursor.execute("ANALYZE error_patterns;")
cursor.execute("ANALYZE strategies;")
cursor.execute("ANALYZE consumption_scope;")
# context_packages is analyzed later after CREATE TABLE IF NOT EXISTS
print(" ✓ WAL checkpoint completed")
current_version = 10
print("✓ Migration to v10 complete (context engineering system tables)")
# v10 → v11: Skill outputs multi-invocation support
if current_version == 10:
print("\n--- Migrating v10 → v11 (skill outputs multi-invocation) ---")
# Check if skill_outputs table exists (may not exist in fresh DBs during sequential migration)
table_exists = cursor.execute("""
SELECT name FROM sqlite_master WHERE type='table' AND name='skill_outputs'
""").fetchone()
if not table_exists:
# Table will be created later with new columns - skip migration
print(" ⊘ skill_outputs table will be created with new columns")
else:
try:
# Use BEGIN IMMEDIATE to acquire exclusive lock for DDL safety
cursor.execute("BEGIN IMMEDIATE")
# Check existing columns in skill_outputs
columns = {row[1] for row in cursor.execute("PRAGMA table_info(skill_outputs)").fetchall()}
# Add agent_type column
if 'agent_type' not in columns:
cursor.execute("""
ALTER TABLE skill_outputs
ADD COLUMN agent_type TEXT
""")
print(" ✓ Added skill_outputs.agent_type")
else:
print(" ⊘ skill_outputs.agent_type already exists")
# Add group_id column
if 'group_id' not in columns:
cursor.execute("""
ALTER TABLE skill_outputs
ADD COLUMN group_id TEXT
""")
print(" ✓ Added skill_outputs.group_id")
else:
print(" ⊘ skill_outputs.group_id already exists")
# Add iteration column (default 1 for existing rows)
if 'iteration' not in columns:
cursor.execute("""
ALTER TABLE skill_outputs
ADD COLUMN iteration INTEGER DEFAULT 1
""")
print(" ✓ Added skill_outputs.iteration")
else:
print(" ⊘ skill_outputs.iteration already exists")
# Create composite index for efficient lookups
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_skill_agent_group
ON skill_outputs(session_id, skill_name, agent_type, group_id, iteration)
""")
print(" ✓ Created idx_skill_agent_group composite index")
# Verify integrity
integrity = cursor.execute("PRAGMA integrity_check;").fetchone()[0]
if integrity != "ok":
raise sqlite3.IntegrityError(f"Migration v10→v11: Integrity check failed: {integrity}")
conn.commit()
print(" ✓ Migration transaction committed")
except Exception as e:
try:
conn.rollback()
except Exception:
pass
print(f" ✗ v10→v11 migration failed, rolled back: {e}")
raise
current_version = 11
print("✓ Migration to v11 complete (skill outputs multi-invocation)")
# v11 → v12: Add UNIQUE constraint to skill_outputs for race condition prevention
if current_version == 11:
print("\n--- Migrating v11 → v12 (skill_outputs UNIQUE constraint) ---")
# Check if skill_outputs table exists
table_exists = cursor.execute("""
SELECT name FROM sqlite_master WHERE type='table' AND name='skill_outputs'
""").fetchone()
if not table_exists:
# Table will be created later with UNIQUE constraint - skip migration
print(" ⊘ skill_outputs table will be created with UNIQUE constraint")
else:
try:
# Use BEGIN IMMEDIATE for exclusive lock during DDL
cursor.execute("BEGIN IMMEDIATE")
# Check if UNIQUE index already exists
existing_index = cursor.execute("""
SELECT name FROM sqlite_master
WHERE type='index' AND name='idx_skill_unique_iteration'
""").fetchone()
if not existing_index:
# SQLite doesn't support ADD CONSTRAINT for UNIQUE on existing table
# Create a UNIQUE INDEX instead (functionally equivalent)
cursor.execute("""
CREATE UNIQUE INDEX idx_skill_unique_iteration
ON skill_outputs(session_id, skill_name, agent_type, group_id, iteration)
""")
print(" ✓ Created UNIQUE index idx_skill_unique_iteration")
else:
print(" ⊘ UNIQUE index idx_skill_unique_iteration already exists")
# Create DESC index for "latest" queries optimization
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_skill_latest
ON skill_outputs(session_id, skill_name, agent_type, group_id, iteration DESC)
""")
print(" ✓ Created idx_skill_latest (DESC) for latest queries")
# Verify integrity
integrity = cursor.execute("PRAGMA integrity_check;").fetchone()[0]
if integrity != "ok":
raise sqlite3.IntegrityError(f"Migration v11→v12: Integrity check failed: {integrity}")
conn.commit()
print(" ✓ Migration transaction committed")
except sqlite3.IntegrityError as e:
conn.rollback()
if "UNIQUE constraint failed" in str(e):
print(f" ⚠️ UNIQUE constraint violation found - handling duplicates...")
# Handle duplicate iterations by renumbering
cursor.execute("BEGIN IMMEDIATE")
cursor.execute("""
UPDATE skill_outputs SET iteration = (
SELECT COUNT(*)
FROM skill_outputs s2
WHERE s2.session_id = skill_outputs.session_id
AND s2.skill_name = skill_outputs.skill_name
AND COALESCE(s2.agent_type, '') = COALESCE(skill_outputs.agent_type, '')
AND COALESCE(s2.group_id, '') = COALESCE(skill_outputs.group_id, '')
-- rowid tiebreaker: rows sharing a timestamp must still get distinct
-- iterations, otherwise the UNIQUE index below would fail again
AND (s2.timestamp < skill_outputs.timestamp
OR (s2.timestamp = skill_outputs.timestamp AND s2.rowid <= skill_outputs.rowid))
)
""")
cursor.execute("""
CREATE UNIQUE INDEX idx_skill_unique_iteration
ON skill_outputs(session_id, skill_name, agent_type, group_id, iteration)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_skill_latest
ON skill_outputs(session_id, skill_name, agent_type, group_id, iteration DESC)
""")
conn.commit()
print(" ✓ Fixed duplicate iterations and created UNIQUE index")
else:
print(f" ✗ v11→v12 migration failed: {e}")
raise
except Exception as e:
try:
conn.rollback()
except Exception:
pass
print(f" ✗ v11→v12 migration failed, rolled back: {e}")
raise
current_version = 12
print("✓ Migration to v12 complete (skill_outputs UNIQUE constraint)")
# v12 → v13: Deterministic orchestration tables
if current_version == 12:
print("\n--- Migrating v12 → v13 (deterministic orchestration) ---")
try:
cursor.execute("BEGIN IMMEDIATE")
# Create workflow_transitions table (seeded from workflow/transitions.json)
cursor.execute("""
CREATE TABLE IF NOT EXISTS workflow_transitions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
current_agent TEXT NOT NULL,
response_status TEXT NOT NULL,
next_agent TEXT,
action TEXT NOT NULL,
include_context TEXT,
escalation_check INTEGER DEFAULT 0,
model_override TEXT,
fallback_agent TEXT,
bypass_qa INTEGER DEFAULT 0,
max_parallel INTEGER,
then_action TEXT,
UNIQUE(current_agent, response_status)
)
""")
# Add index for performance (matches fresh DB path)
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_wt_agent
ON workflow_transitions(current_agent)
""")
print(" ✓ Created workflow_transitions table with index")
# Create agent_markers table (seeded from workflow/agent-markers.json)
cursor.execute("""
CREATE TABLE IF NOT EXISTS agent_markers (
id INTEGER PRIMARY KEY AUTOINCREMENT,
agent_type TEXT NOT NULL UNIQUE,
required_markers TEXT NOT NULL,
workflow_markers TEXT
)
""")
print(" ✓ Created agent_markers table")
# Create workflow_special_rules table (seeded from workflow/transitions.json _special_rules)
cursor.execute("""
CREATE TABLE IF NOT EXISTS workflow_special_rules (
id INTEGER PRIMARY KEY AUTOINCREMENT,
rule_name TEXT NOT NULL UNIQUE,
description TEXT,
config TEXT NOT NULL
)
""")
print(" ✓ Created workflow_special_rules table")
# Verify integrity
integrity = cursor.execute("PRAGMA integrity_check;").fetchone()[0]
if integrity != "ok":
raise sqlite3.IntegrityError(f"Migration v12→v13: Integrity check failed: {integrity}")
conn.commit()
print(" ✓ Migration transaction committed")
except Exception as e:
try:
conn.rollback()
except Exception:
pass
print(f" ✗ v12→v13 migration failed, rolled back: {e}")
raise
current_version = 13
print("✓ Migration to v13 complete (deterministic orchestration tables)")
# v13 → v14: Add escalation tracking columns to task_groups
if current_version == 13:
print("\n--- Migrating v13 → v14 (escalation tracking columns) ---")
# Check if task_groups table exists (may not exist in fresh DBs during sequential migration)
table_exists = cursor.execute("""
SELECT name FROM sqlite_master WHERE type='table' AND name='task_groups'
""").fetchone()
if not table_exists:
# Table will be created later with new columns - skip migration
print(" ⊘ task_groups table will be created with new columns")
else:
try:
# Acquire an explicit lock so the except-branch rollback can actually undo the ALTERs
cursor.execute("BEGIN IMMEDIATE")
# Add security_sensitive column
try:
cursor.execute("ALTER TABLE task_groups ADD COLUMN security_sensitive INTEGER DEFAULT 0")
print(" ✓ Added task_groups.security_sensitive")
except sqlite3.OperationalError as e:
if "duplicate column" in str(e).lower():
print(" ⊘ task_groups.security_sensitive already exists")
else:
raise
# Add qa_attempts column for QA failure escalation
try:
cursor.execute("ALTER TABLE task_groups ADD COLUMN qa_attempts INTEGER DEFAULT 0")
print(" ✓ Added task_groups.qa_attempts")
except sqlite3.OperationalError as e:
if "duplicate column" in str(e).lower():
print(" ⊘ task_groups.qa_attempts already exists")
else:
raise
# Add tl_review_attempts column for TL review loop tracking
try:
cursor.execute("ALTER TABLE task_groups ADD COLUMN tl_review_attempts INTEGER DEFAULT 0")
print(" ✓ Added task_groups.tl_review_attempts")
except sqlite3.OperationalError as e:
if "duplicate column" in str(e).lower():
print(" ⊘ task_groups.tl_review_attempts already exists")
else:
raise
conn.commit()
except Exception as e:
try:
conn.rollback()
except Exception:
pass
print(f" ✗ v13→v14 migration failed, rolled back: {e}")
raise
current_version = 14
print("✓ Migration to v14 complete (escalation tracking columns)")
# v14 → v15: Add component_path for version-specific prompt building
if current_version == 14:
print("\n--- Migrating v14 → v15 (component_path for version context) ---")
# Check if task_groups table exists
table_exists = cursor.execute("""
SELECT name FROM sqlite_master WHERE type='table' AND name='task_groups'
""").fetchone()
if not table_exists:
# Table will be created later with new columns - skip migration
print(" ⊘ task_groups table will be created with component_path column")
else:
try:
cursor.execute("BEGIN IMMEDIATE")
# Add component_path column for monorepo component binding
try:
cursor.execute("ALTER TABLE task_groups ADD COLUMN component_path TEXT")
print(" ✓ Added task_groups.component_path")
except sqlite3.OperationalError as e:
if "duplicate column" in str(e).lower():
print(" ⊘ task_groups.component_path already exists")
else:
raise
# Verify integrity before commit
integrity = cursor.execute("PRAGMA integrity_check;").fetchone()[0]
if integrity != "ok":
raise sqlite3.IntegrityError(f"Migration v14→v15: Integrity check failed: {integrity}")
conn.commit()
print(" ✓ Migration transaction committed")
# WAL checkpoint for clean state
checkpoint_result = cursor.execute("PRAGMA wal_checkpoint(TRUNCATE);").fetchone()
if checkpoint_result:
busy, log_frames, checkpointed = checkpoint_result
if busy:
for retry in range(3):
time.sleep(0.5 * (retry + 1))
checkpoint_result = cursor.execute("PRAGMA wal_checkpoint(TRUNCATE);").fetchone()
if checkpoint_result and not checkpoint_result[0]:
print(f" ✓ WAL checkpoint succeeded after retry {retry + 1}")
break
else:
# Log the latest checkpoint result, not the stale one
final_busy = checkpoint_result[0] if checkpoint_result else busy
print(f" ⚠️ WAL checkpoint incomplete: busy={final_busy}")
# Post-commit integrity verification
post_integrity = cursor.execute("PRAGMA integrity_check;").fetchone()[0]
if post_integrity != "ok":
print(f" ⚠️ Post-commit integrity check failed: {post_integrity}")
# Refresh query planner statistics
cursor.execute("ANALYZE task_groups;")
print(" ✓ ANALYZE completed")
except Exception as e:
try:
conn.rollback()
except Exception:
pass
print(f" ✗ v14→v15 migration failed, rolled back: {e}")
raise
current_version = 15
print("✓ Migration to v15 complete (component_path for version context)")
# v15 → v16: Add review iteration tracking columns to task_groups
if current_version == 15:
print("\n--- Migrating v15 → v16 (review iteration tracking) ---")
# Check if task_groups table exists
table_exists = cursor.execute("""
SELECT name FROM sqlite_master WHERE type='table' AND name='task_groups'
""").fetchone()
if not table_exists:
# Table will be created later with new columns - skip migration
print(" ⊘ task_groups table will be created with review iteration columns")
else:
try:
cursor.execute("BEGIN IMMEDIATE")
# Add review_iteration column (current iteration in feedback loop, starts at 1)
try:
cursor.execute("ALTER TABLE task_groups ADD COLUMN review_iteration INTEGER DEFAULT 1")
print(" ✓ Added task_groups.review_iteration")
except sqlite3.OperationalError as e:
if "duplicate column" in str(e).lower():
print(" ⊘ task_groups.review_iteration already exists")
else:
raise
# Add no_progress_count column (consecutive iterations with 0 fixes)
try:
cursor.execute("ALTER TABLE task_groups ADD COLUMN no_progress_count INTEGER DEFAULT 0")
print(" ✓ Added task_groups.no_progress_count")
except sqlite3.OperationalError as e:
if "duplicate column" in str(e).lower():
print(" ⊘ task_groups.no_progress_count already exists")
else:
raise
# Add blocking_issues_count column (count of unresolved CRITICAL/HIGH issues)
try:
cursor.execute("ALTER TABLE task_groups ADD COLUMN blocking_issues_count INTEGER DEFAULT 0")
print(" ✓ Added task_groups.blocking_issues_count")
except sqlite3.OperationalError as e:
if "duplicate column" in str(e).lower():
print(" ⊘ task_groups.blocking_issues_count already exists")
else:
raise
# Defensive backfill: guard against NULLs left by interrupted or out-of-order prior migrations
backfill_ri = cursor.execute(
"UPDATE task_groups SET review_iteration = 1 WHERE review_iteration IS NULL"
).rowcount
if backfill_ri > 0:
print(f" ✓ Backfilled review_iteration=1 for {backfill_ri} rows")
backfill_npc = cursor.execute(
"UPDATE task_groups SET no_progress_count = 0 WHERE no_progress_count IS NULL"
).rowcount
if backfill_npc > 0:
print(f" ✓ Backfilled no_progress_count=0 for {backfill_npc} rows")
backfill_bic = cursor.execute(
"UPDATE task_groups SET blocking_issues_count = 0 WHERE blocking_issues_count IS NULL"
).rowcount
if backfill_bic > 0:
print(f" ✓ Backfilled blocking_issues_count=0 for {backfill_bic} rows")
# Verify integrity before commit
integrity = cursor.execute("PRAGMA integrity_check;").fetchone()[0]
if integrity != "ok":
raise sqlite3.IntegrityError(f"Migration v15→v16: Integrity check failed: {integrity}")
conn.commit()
print(" ✓ Migration transaction committed")
# WAL checkpoint for clean state
checkpoint_result = cursor.execute("PRAGMA wal_checkpoint(TRUNCATE);").fetchone()
if checkpoint_result:
busy, log_frames, checkpointed = checkpoint_result
if busy:
for retry in range(3):
time.sleep(0.5 * (retry + 1))
checkpoint_result = cursor.execute("PRAGMA wal_checkpoint(TRUNCATE);").fetchone()
if checkpoint_result and not checkpoint_result[0]:
print(f" ✓ WAL checkpoint succeeded after retry {retry + 1}")
break
else:
# Log the latest checkpoint result, not the stale one
final_busy = checkpoint_result[0] if checkpoint_result else busy
print(f" ⚠️ WAL checkpoint incomplete: busy={final_busy}")
# Post-commit integrity verification
post_integrity = cursor.execute("PRAGMA integrity_check;").fetchone()[0]
if post_integrity != "ok":
print(f" ⚠️ Post-commit integrity check failed: {post_integrity}")
# Refresh query planner statistics
cursor.execute("ANALYZE task_groups;")
print(" ✓ ANALYZE completed")
except Exception as e:
try:
conn.rollback()
except Exception:
pass
print(f" ✗ v15→v16 migration failed, rolled back: {e}")
raise
current_version = 16
print("✓ Migration to v16 complete (review iteration tracking)")
# v16 → v17: Add idempotency_key column to orchestration_logs for event deduplication
if current_version == 16:
print("\n--- Migrating v16 → v17 (event idempotency support) ---")
# Check if orchestration_logs table exists
table_exists = cursor.execute("""
SELECT name FROM sqlite_master WHERE type='table' AND name='orchestration_logs'
""").fetchone()
if not table_exists:
# Table will be created later with new column - skip migration
print(" ⊘ orchestration_logs table will be created with idempotency_key column")
else:
# Check if column already exists
cols = cursor.execute("PRAGMA table_info(orchestration_logs)").fetchall()
col_names = {col[1] for col in cols}
try:
# Begin transaction for migration
conn.execute("BEGIN IMMEDIATE")
# Add idempotency_key column if missing
if 'idempotency_key' not in col_names:
cursor.execute("""
ALTER TABLE orchestration_logs
ADD COLUMN idempotency_key TEXT
""")
print(" ✓ Added orchestration_logs.idempotency_key")
else:
print(" ⊘ orchestration_logs.idempotency_key already exists")
# Create partial unique index for idempotency enforcement
try:
cursor.execute("""
CREATE UNIQUE INDEX IF NOT EXISTS idx_logs_idempotency
ON orchestration_logs(session_id, event_subtype, idempotency_key)
WHERE idempotency_key IS NOT NULL AND log_type = 'event'
""")
print(" ✓ Created idx_logs_idempotency unique index")
except sqlite3.OperationalError as e:
if "already exists" in str(e):
print(" ⊘ idx_logs_idempotency index already exists")
else:
raise
# Verify integrity before commit
integrity = cursor.execute("PRAGMA integrity_check;").fetchone()[0]
if integrity != "ok":
raise sqlite3.IntegrityError(f"Migration v16→v17: Integrity check failed: {integrity}")
conn.commit()
print(" ✓ Migration transaction committed")
# WAL checkpoint for clean state
try:
cursor.execute("PRAGMA wal_checkpoint(TRUNCATE);")
except Exception:
pass # WAL checkpoint is optional
# Refresh query planner statistics
cursor.execute("ANALYZE orchestration_logs;")
print(" ✓ ANALYZE completed")
except Exception as e:
try:
conn.rollback()
except Exception:
pass
print(f" ✗ v16→v17 migration failed, rolled back: {e}")
raise
current_version = 17
print("✓ Migration to v17 complete (event idempotency support)")
# v17 → v18: Investigation state isolation + group-aware dedup
if current_version == 17:
print("\n--- Migrating v17 → v18 (investigation state isolation) ---")
# Check if state_snapshots table exists
table_exists = cursor.execute("""
SELECT name FROM sqlite_master WHERE type='table' AND name='state_snapshots'
""").fetchone()
if not table_exists:
# Table will be created later with new schema - skip migration
print(" ⊘ state_snapshots table will be created with new schema")
else:
try:
# Begin transaction for table rebuild
conn.execute("BEGIN IMMEDIATE")
# Backup existing data with SQL window deduplication
# Keep only the latest row per (session_id, state_type) to avoid UNIQUE violations
cursor.execute("""
SELECT id, session_id, timestamp, state_type, state_data
FROM (
SELECT *, ROW_NUMBER() OVER (
PARTITION BY session_id, state_type
ORDER BY timestamp DESC
) as rn
FROM state_snapshots
) WHERE rn = 1
""")
state_data = cursor.fetchall()
# Also get total count to show dedup info
cursor.execute("SELECT COUNT(*) FROM state_snapshots")
total_count = cursor.fetchone()[0]
if total_count != len(state_data):
print(f" - Backed up {len(state_data)} unique state entries (deduplicated from {total_count})")
else:
print(f" - Backed up {len(state_data)} state snapshot entries")
# Drop old table and indexes
cursor.execute("DROP INDEX IF EXISTS idx_state_session_type")
cursor.execute("DROP TABLE state_snapshots")
# Create new table with group_id and expanded CHECK constraint
cursor.execute("""
CREATE TABLE state_snapshots (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT NOT NULL,
group_id TEXT NOT NULL DEFAULT 'global',
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
state_type TEXT CHECK(state_type IN ('pm', 'orchestrator', 'group_status', 'investigation')),
state_data TEXT NOT NULL,
FOREIGN KEY (session_id) REFERENCES sessions(session_id) ON DELETE CASCADE
)
""")
print(" ✓ Created state_snapshots with group_id and 'investigation' state_type")
# Create UNIQUE index for upsert support
cursor.execute("""
CREATE UNIQUE INDEX idx_state_unique
ON state_snapshots(session_id, state_type, group_id)
""")
print(" ✓ Created idx_state_unique for upsert support")
# Create performance index
cursor.execute("""
CREATE INDEX idx_state_session_type_group
ON state_snapshots(session_id, state_type, group_id, timestamp DESC)
""")
print(" ✓ Created idx_state_session_type_group performance index")
# Restore data with group_id='global' backfill
if state_data:
for row in state_data:
cursor.execute("""
INSERT INTO state_snapshots (id, session_id, group_id, timestamp, state_type, state_data)
VALUES (?, ?, 'global', ?, ?, ?)
""", (row[0], row[1], row[2], row[3], row[4]))
print(f" ✓ Restored {len(state_data)} entries with group_id='global'")
# Update event idempotency index to include group_id
cursor.execute("DROP INDEX IF EXISTS idx_logs_idempotency")
cursor.execute("""
CREATE UNIQUE INDEX idx_logs_idempotency
ON orchestration_logs(session_id, event_subtype, group_id, idempotency_key)
WHERE idempotency_key IS NOT NULL AND log_type = 'event'
""")
print(" ✓ Recreated idx_logs_idempotency with group_id")
# Fix F: Backfill NULL group_id to 'global' for legacy events
# See: research/domain-skill-migration-phase4-ultrathink.md
# This ensures queries with group_id='global' predicate find old events
result = cursor.execute("""
UPDATE orchestration_logs
SET group_id = 'global'
WHERE log_type = 'event' AND (group_id IS NULL OR group_id = '')
""")
backfill_count = result.rowcount
if backfill_count > 0:
print(f" ✓ Backfilled {backfill_count} events with group_id='global'")
# Verify integrity before commit
integrity = cursor.execute("PRAGMA integrity_check;").fetchone()[0]
if integrity != "ok":
raise sqlite3.IntegrityError(f"Migration v17→v18: Integrity check failed: {integrity}")
conn.commit()
print(" ✓ Migration transaction committed")
# WAL checkpoint for clean state
try:
cursor.execute("PRAGMA wal_checkpoint(TRUNCATE);")
except Exception:
pass # WAL checkpoint is optional
# Refresh query planner statistics
cursor.execute("ANALYZE state_snapshots;")
cursor.execute("ANALYZE orchestration_logs;")
print(" ✓ ANALYZE completed")
except Exception as e:
try:
conn.rollback()
except Exception:
pass
print(f" ✗ v17→v18 migration failed, rolled back: {e}")
raise
current_version = 18
print("✓ Migration to v18 complete (investigation state isolation)")
# Record version upgrade
cursor.execute("""
INSERT OR REPLACE INTO schema_version (version, description)
VALUES (?, ?)
""", (SCHEMA_VERSION, f"Schema v{SCHEMA_VERSION}: Investigation state isolation + group-aware dedup"))
conn.commit()
print(f"✓ Schema upgraded to v{SCHEMA_VERSION}")
elif current_version == SCHEMA_VERSION:
print(f"✓ Schema is up-to-date (v{SCHEMA_VERSION})")
print("\nCreating/verifying BAZINGA database schema...")
# Sessions table
# Extended in v9 to support metadata (JSON) for original_scope tracking
cursor.execute("""
CREATE TABLE IF NOT EXISTS sessions (
session_id TEXT PRIMARY KEY,
start_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
end_time TIMESTAMP,
mode TEXT CHECK(mode IN ('simple', 'parallel')),
original_requirements TEXT,
status TEXT CHECK(status IN ('active', 'completed', 'failed')) DEFAULT 'active',
initial_branch TEXT DEFAULT 'main',
metadata TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
print("✓ Created sessions table")
# Orchestration logs table (replaces orchestration-log.md)
# Extended in v8 to support agent reasoning capture
# Extended in v9 to support event logging (pm_bazinga, scope_change, validator_verdict)
# Extended in v17 to support idempotency_key for event deduplication
# CHECK constraints enforce valid enumeration values at database layer
cursor.execute("""
CREATE TABLE IF NOT EXISTS orchestration_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT NOT NULL,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
iteration INTEGER,
agent_type TEXT NOT NULL,
agent_id TEXT,
content TEXT NOT NULL,
log_type TEXT DEFAULT 'interaction'
CHECK(log_type IN ('interaction', 'reasoning', 'event')),
reasoning_phase TEXT
CHECK(reasoning_phase IS NULL OR reasoning_phase IN (
'understanding', 'approach', 'decisions', 'risks',
'blockers', 'pivot', 'completion'
)),
confidence_level TEXT
CHECK(confidence_level IS NULL OR confidence_level IN ('high', 'medium', 'low')),
references_json TEXT,
redacted INTEGER DEFAULT 0 CHECK(redacted IN (0, 1)),
group_id TEXT,
event_subtype TEXT,
event_payload TEXT,
idempotency_key TEXT,
FOREIGN KEY (session_id) REFERENCES sessions(session_id) ON DELETE CASCADE
)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_logs_session
ON orchestration_logs(session_id, timestamp DESC)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_logs_agent_type
ON orchestration_logs(session_id, agent_type)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_logs_reasoning
ON orchestration_logs(session_id, log_type, reasoning_phase)
WHERE log_type = 'reasoning'
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_logs_group_reasoning
ON orchestration_logs(session_id, group_id, log_type)
WHERE log_type = 'reasoning'
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_logs_events
ON orchestration_logs(session_id, log_type, event_subtype)
WHERE log_type = 'event'
""")
# v18: Updated to include group_id for cross-group isolation
cursor.execute("""
CREATE UNIQUE INDEX IF NOT EXISTS idx_logs_idempotency
ON orchestration_logs(session_id, event_subtype, group_id, idempotency_key)
WHERE idempotency_key IS NOT NULL AND log_type = 'event'
""")
print("✓ Created orchestration_logs table with indexes")
# State snapshots table (replaces JSON state files)
# Extended in v18 to support group_id for investigation state isolation
# Extended in v18 to support 'investigation' state_type
cursor.execute("""
CREATE TABLE IF NOT EXISTS state_snapshots (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT NOT NULL,
group_id TEXT NOT NULL DEFAULT 'global',
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
state_type TEXT CHECK(state_type IN ('pm', 'orchestrator', 'group_status', 'investigation')),
state_data TEXT NOT NULL,
FOREIGN KEY (session_id) REFERENCES sessions(session_id) ON DELETE CASCADE
)
""")
# UNIQUE index for upsert support (allows ON CONFLICT)
cursor.execute("""
CREATE UNIQUE INDEX IF NOT EXISTS idx_state_unique
ON state_snapshots(session_id, state_type, group_id)
""")
# Performance index for queries
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_state_session_type_group
ON state_snapshots(session_id, state_type, group_id, timestamp DESC)
""")
print("✓ Created state_snapshots table with indexes")
# Task groups table (normalized from pm_state.json)
# PRIMARY KEY: Composite (id, session_id) allows same group ID across sessions
# Extended in v9 to support item_count for progress tracking
# Extended in v14 to support security_sensitive, qa_attempts, tl_review_attempts
# Extended in v15 to support component_path for version-specific prompt building
# Extended in v16 to support review_iteration, no_progress_count, blocking_issues_count
cursor.execute("""
CREATE TABLE IF NOT EXISTS task_groups (
id TEXT NOT NULL,
session_id TEXT NOT NULL,
name TEXT NOT NULL,
status TEXT CHECK(status IN (
'pending', 'in_progress', 'completed', 'failed',
'approved_pending_merge', 'merging'
)) DEFAULT 'pending',
assigned_to TEXT,
revision_count INTEGER DEFAULT 0,
last_review_status TEXT CHECK(last_review_status IN ('APPROVED', 'CHANGES_REQUESTED', NULL)),
feature_branch TEXT,
merge_status TEXT CHECK(merge_status IN ('pending', 'in_progress', 'merged', 'conflict', 'test_failure', NULL)),
complexity INTEGER CHECK(complexity BETWEEN 1 AND 10),
initial_tier TEXT CHECK(initial_tier IN ('Developer', 'Senior Software Engineer')) DEFAULT 'Developer',
context_references TEXT,
specializations TEXT,
item_count INTEGER DEFAULT 1,
security_sensitive INTEGER DEFAULT 0,
qa_attempts INTEGER DEFAULT 0,
tl_review_attempts INTEGER DEFAULT 0,
component_path TEXT,
review_iteration INTEGER DEFAULT 1 CHECK(review_iteration >= 1),
no_progress_count INTEGER DEFAULT 0 CHECK(no_progress_count >= 0),
blocking_issues_count INTEGER DEFAULT 0 CHECK(blocking_issues_count >= 0),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id, session_id),
FOREIGN KEY (session_id) REFERENCES sessions(session_id) ON DELETE CASCADE
)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_taskgroups_session
ON task_groups(session_id, status)
""")
print("✓ Created task_groups table with indexes")
# Token usage tracking
cursor.execute("""
CREATE TABLE IF NOT EXISTS token_usage (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT NOT NULL,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
agent_type TEXT NOT NULL,
agent_id TEXT,
tokens_estimated INTEGER NOT NULL,
FOREIGN KEY (session_id) REFERENCES sessions(session_id) ON DELETE CASCADE
)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_tokens_session
ON token_usage(session_id, agent_type)
""")
print("✓ Created token_usage table with indexes")
# Skill outputs table (replaces individual JSON files)
# v11: Added agent_type, group_id, iteration for multi-invocation support
# v12: Added UNIQUE constraint on iteration for race condition prevention
cursor.execute("""
CREATE TABLE IF NOT EXISTS skill_outputs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT NOT NULL,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
skill_name TEXT NOT NULL,
output_data TEXT NOT NULL,
agent_type TEXT,
group_id TEXT,
iteration INTEGER DEFAULT 1,
FOREIGN KEY (session_id) REFERENCES sessions(session_id) ON DELETE CASCADE
)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_skill_session
ON skill_outputs(session_id, skill_name, timestamp DESC)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_skill_agent_group
ON skill_outputs(session_id, skill_name, agent_type, group_id, iteration)
""")
# v12: UNIQUE index for race condition prevention
cursor.execute("""
CREATE UNIQUE INDEX IF NOT EXISTS idx_skill_unique_iteration
ON skill_outputs(session_id, skill_name, agent_type, group_id, iteration)
""")
# v12: DESC index for "latest" query optimization
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_skill_latest
ON skill_outputs(session_id, skill_name, agent_type, group_id, iteration DESC)
""")
print("✓ Created skill_outputs table with indexes")
# REMOVED: Configuration table - No use case defined
# See research/empty-tables-analysis.md for details
# Table creation commented out as of 2025-11-21
# REMOVED: Decisions table - Redundant with orchestration_logs
# See research/empty-tables-analysis.md for details
# Table creation commented out as of 2025-11-21
# Development plans table (for multi-phase orchestrations)
cursor.execute("""
CREATE TABLE IF NOT EXISTS development_plans (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT UNIQUE NOT NULL,
original_prompt TEXT NOT NULL,
plan_text TEXT NOT NULL,
phases TEXT NOT NULL,
current_phase INTEGER,
total_phases INTEGER NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
metadata TEXT,
FOREIGN KEY (session_id) REFERENCES sessions(session_id) ON DELETE CASCADE
)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_devplans_session
ON development_plans(session_id)
""")
print("✓ Created development_plans table with indexes")
# Success criteria table (for BAZINGA validation)
cursor.execute("""
CREATE TABLE IF NOT EXISTS success_criteria (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT NOT NULL,
criterion TEXT NOT NULL,
status TEXT CHECK(status IN ('pending', 'met', 'blocked', 'failed')) DEFAULT 'pending',
actual TEXT,
evidence TEXT,
required_for_completion BOOLEAN DEFAULT 1,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (session_id) REFERENCES sessions(session_id) ON DELETE CASCADE
)
""")
cursor.execute("""
CREATE UNIQUE INDEX IF NOT EXISTS idx_unique_criterion
ON success_criteria(session_id, criterion)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_criteria_session_status
ON success_criteria(session_id, status)
""")
print("✓ Created success_criteria table with indexes")
# Context packages table (for inter-agent communication)
cursor.execute("""
CREATE TABLE IF NOT EXISTS context_packages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT NOT NULL,
group_id TEXT,
package_type TEXT NOT NULL CHECK(package_type IN ('research', 'failures', 'decisions', 'handoff', 'investigation')),
file_path TEXT NOT NULL,
producer_agent TEXT NOT NULL,
priority TEXT NOT NULL DEFAULT 'medium' CHECK(priority IN ('low', 'medium', 'high', 'critical')),
summary TEXT NOT NULL,
size_bytes INTEGER,
version INTEGER DEFAULT 1,
supersedes_id INTEGER,
scope TEXT DEFAULT 'group' CHECK(scope IN ('group', 'global')),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (session_id) REFERENCES sessions(session_id) ON DELETE CASCADE,
FOREIGN KEY (supersedes_id) REFERENCES context_packages(id)
)
""")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_cp_session ON context_packages(session_id)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_cp_group ON context_packages(group_id)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_cp_type ON context_packages(package_type)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_cp_priority ON context_packages(priority)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_cp_scope ON context_packages(scope)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_cp_created ON context_packages(created_at)")
# Composite index for relevance ranking queries (per data-model.md)
cursor.execute("CREATE INDEX IF NOT EXISTS idx_packages_priority_ranking ON context_packages(session_id, priority, created_at DESC)")
print("✓ Created context_packages table with indexes")
# Context package consumers join table (for per-agent consumption tracking)
cursor.execute("""
CREATE TABLE IF NOT EXISTS context_package_consumers (
id INTEGER PRIMARY KEY AUTOINCREMENT,
package_id INTEGER NOT NULL,
agent_type TEXT NOT NULL,
consumed_at TIMESTAMP,
iteration INTEGER DEFAULT 1,
FOREIGN KEY (package_id) REFERENCES context_packages(id) ON DELETE CASCADE,
UNIQUE(package_id, agent_type, iteration)
)
""")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_cpc_package ON context_package_consumers(package_id)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_cpc_agent ON context_package_consumers(agent_type)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_cpc_pending ON context_package_consumers(consumed_at) WHERE consumed_at IS NULL")
print("✓ Created context_package_consumers table with indexes")
# Error patterns table (for context engineering - learning from failed-then-succeeded agents)
# Uses composite primary key (pattern_hash, project_id) to allow same pattern across projects
cursor.execute("""
CREATE TABLE IF NOT EXISTS error_patterns (
pattern_hash TEXT NOT NULL,
project_id TEXT NOT NULL,
signature_json TEXT NOT NULL,
solution TEXT NOT NULL,
confidence REAL DEFAULT 0.5 CHECK(confidence >= 0.0 AND confidence <= 1.0),
occurrences INTEGER DEFAULT 1 CHECK(occurrences >= 1),
lang TEXT,
last_seen TEXT DEFAULT (datetime('now')),
created_at TEXT DEFAULT (datetime('now')),
ttl_days INTEGER DEFAULT 90 CHECK(ttl_days > 0),
PRIMARY KEY (pattern_hash, project_id)
)
""")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_patterns_project ON error_patterns(project_id, lang)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_patterns_ttl ON error_patterns(last_seen, ttl_days)")
print("✓ Created error_patterns table with indexes")
# Strategies table (for context engineering - successful approaches from completions)
cursor.execute("""
CREATE TABLE IF NOT EXISTS strategies (
strategy_id TEXT PRIMARY KEY,
project_id TEXT NOT NULL,
topic TEXT NOT NULL,
insight TEXT NOT NULL,
helpfulness INTEGER DEFAULT 0 CHECK(helpfulness >= 0),
lang TEXT,
framework TEXT,
last_seen TEXT DEFAULT (datetime('now')),
created_at TEXT DEFAULT (datetime('now'))
)
""")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_strategies_project ON strategies(project_id, framework)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_strategies_topic ON strategies(topic)")
print("✓ Created strategies table with indexes")
# Consumption scope table (for context engineering - iteration-aware package tracking)
cursor.execute("""
CREATE TABLE IF NOT EXISTS consumption_scope (
scope_id TEXT PRIMARY KEY,
session_id TEXT NOT NULL,
group_id TEXT NOT NULL,
agent_type TEXT NOT NULL CHECK(agent_type IN ('developer', 'qa_expert', 'tech_lead', 'senior_software_engineer', 'investigator')),
iteration INTEGER NOT NULL CHECK(iteration >= 0),
package_id INTEGER NOT NULL,
consumed_at TEXT DEFAULT (datetime('now')),
FOREIGN KEY (session_id) REFERENCES sessions(session_id) ON DELETE CASCADE,
FOREIGN KEY (package_id) REFERENCES context_packages(id) ON DELETE CASCADE
)
""")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_consumption_session ON consumption_scope(session_id, group_id, agent_type)")
cursor.execute("CREATE UNIQUE INDEX IF NOT EXISTS idx_consumption_unique ON consumption_scope(session_id, group_id, agent_type, iteration, package_id)")
print("✓ Created consumption_scope table with indexes")
# Workflow transitions table (seeded from workflow/transitions.json via bazinga/config symlink)
cursor.execute("""
CREATE TABLE IF NOT EXISTS workflow_transitions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
current_agent TEXT NOT NULL,
response_status TEXT NOT NULL,
next_agent TEXT,
action TEXT NOT NULL,
include_context TEXT,
escalation_check INTEGER DEFAULT 0,
model_override TEXT,
fallback_agent TEXT,
bypass_qa INTEGER DEFAULT 0,
max_parallel INTEGER,
then_action TEXT,
UNIQUE(current_agent, response_status)
)
""")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_wt_agent ON workflow_transitions(current_agent)")
print("✓ Created workflow_transitions table with indexes")
# Agent markers table (seeded from workflow/agent-markers.json via bazinga/config symlink)
cursor.execute("""
CREATE TABLE IF NOT EXISTS agent_markers (
id INTEGER PRIMARY KEY AUTOINCREMENT,
agent_type TEXT NOT NULL UNIQUE,
required_markers TEXT NOT NULL,
workflow_markers TEXT
)
""")
print("✓ Created agent_markers table")
# Workflow special rules table (seeded from workflow/transitions.json _special_rules)
cursor.execute("""
CREATE TABLE IF NOT EXISTS workflow_special_rules (
id INTE
... (truncated)
```
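The partial unique index created in the v16→v17 and v17→v18 migrations is what makes event logging idempotent: a duplicate event insert simply collapses into the existing row, while ordinary interaction logs are never blocked. A minimal self-contained sketch of the mechanism (in-memory database, simplified columns; illustrative, not part of the script above):

```python
import sqlite3

# Sketch of the event-dedup mechanism (simplified columns, in-memory DB).
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("""
    CREATE TABLE orchestration_logs (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        log_type TEXT DEFAULT 'interaction',
        event_subtype TEXT,
        idempotency_key TEXT
    )
""")
# Partial unique index: only rows that are events AND carry a key participate.
cur.execute("""
    CREATE UNIQUE INDEX idx_logs_idempotency
    ON orchestration_logs(session_id, event_subtype, idempotency_key)
    WHERE idempotency_key IS NOT NULL AND log_type = 'event'
""")
insert = """
    INSERT OR IGNORE INTO orchestration_logs
        (session_id, log_type, event_subtype, idempotency_key)
    VALUES (?, ?, ?, ?)
"""
event = ("s1", "event", "pm_bazinga", "s1|global|pm_bazinga|1")
cur.execute(insert, event)
cur.execute(insert, event)  # duplicate: silently ignored via OR IGNORE
# Plain interactions have no key, so the partial index never blocks them.
cur.execute(insert, ("s1", "interaction", None, None))
cur.execute(insert, ("s1", "interaction", None, None))
total = cur.execute("SELECT COUNT(*) FROM orchestration_logs").fetchone()[0]  # 3
events = cur.execute(
    "SELECT COUNT(*) FROM orchestration_logs WHERE log_type = 'event'"
).fetchone()[0]  # 1
```

Because uniqueness is scoped by the `WHERE` clause, the same table can hold unbounded interaction and reasoning rows alongside strictly deduplicated events.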
### scripts/init_session.py
```python
#!/usr/bin/env python3
"""
Initialize a BAZINGA session with all required setup.
This script ensures the deterministic orchestration infrastructure is ready:
1. Database exists and has correct schema
2. Workflow configs are seeded (transitions, markers, special rules)
3. Session artifacts directory exists
Usage:
python3 .claude/skills/bazinga-db/scripts/init_session.py [--session-id ID]
This should be called at the START of every BAZINGA orchestration session,
BEFORE spawning PM. It's idempotent - safe to run multiple times.
"""
import argparse
import json
import sqlite3
import subprocess
import sys
from datetime import datetime
from pathlib import Path
def get_project_root():
"""Detect project root by looking for .claude directory or bazinga directory."""
script_dir = Path(__file__).resolve().parent
current = script_dir
for _ in range(10):
if (current / ".claude").is_dir() or (current / "bazinga").is_dir():
return current
parent = current.parent
if parent == current:
break
current = parent
cwd = Path.cwd()
if (cwd / ".claude").is_dir() or (cwd / "bazinga").is_dir():
return cwd
return cwd
PROJECT_ROOT = get_project_root()
def ensure_database(db_path: Path) -> bool:
"""Ensure database exists and has correct schema."""
init_script = PROJECT_ROOT / ".claude" / "skills" / "bazinga-db" / "scripts" / "init_db.py"
if not init_script.exists():
print(f"ERROR: init_db.py not found at {init_script}", file=sys.stderr)
return False
# Run init_db.py - it's idempotent (handles migrations, won't re-create tables)
result = subprocess.run(
[sys.executable, str(init_script), str(db_path)],
capture_output=True,
text=True
)
if result.returncode != 0:
        print("ERROR: Database initialization failed:", file=sys.stderr)
print(result.stderr, file=sys.stderr)
return False
    # Distinguish a pre-existing database from a fresh initialization
    if "already initialized" in result.stdout.lower():
        print("✓ Database ready", file=sys.stderr)
    else:
        print("✓ Database initialized", file=sys.stderr)
return True
def ensure_config_seeded(db_path: Path) -> bool:
"""Ensure workflow configs are seeded into database."""
seed_script = PROJECT_ROOT / ".claude" / "skills" / "config-seeder" / "scripts" / "seed_configs.py"
if not seed_script.exists():
print(f"ERROR: seed_configs.py not found at {seed_script}", file=sys.stderr)
return False
# Check if config is already seeded by checking BOTH workflow_transitions AND agent_markers
try:
conn = sqlite3.connect(str(db_path), timeout=5.0) # 5s busy timeout for concurrency
cursor = conn.cursor()
# Check transitions count
cursor.execute("SELECT COUNT(*) FROM workflow_transitions")
transitions_count = cursor.fetchone()[0]
# Check markers count - both are required for prompt-builder/marker validation
cursor.execute("SELECT COUNT(*) FROM agent_markers")
markers_count = cursor.fetchone()[0]
conn.close()
if transitions_count > 0 and markers_count > 0:
print(f"✓ Config already seeded ({transitions_count} transitions, {markers_count} markers)", file=sys.stderr)
return True
elif transitions_count > 0 or markers_count > 0:
print(f"⚠ Partial config detected ({transitions_count} transitions, {markers_count} markers) - reseeding", file=sys.stderr)
# Fall through to reseed
except sqlite3.OperationalError:
# Table doesn't exist - need to seed
pass
# Run seed_configs.py
result = subprocess.run(
[sys.executable, str(seed_script), "--all", "--db", str(db_path)],
capture_output=True,
text=True
)
if result.returncode != 0:
        print("ERROR: Config seeding failed:", file=sys.stderr)
print(result.stderr, file=sys.stderr)
return False
# Print seed output
if result.stdout:
for line in result.stdout.strip().split("\n"):
print(f" {line}", file=sys.stderr)
print("✓ Config seeded", file=sys.stderr)
return True
def ensure_artifacts_dir(session_id: str) -> Path:
"""Ensure session artifacts directory exists."""
artifacts_dir = PROJECT_ROOT / "bazinga" / "artifacts" / session_id
artifacts_dir.mkdir(parents=True, exist_ok=True)
skills_dir = artifacts_dir / "skills"
skills_dir.mkdir(exist_ok=True)
print(f"✓ Artifacts directory ready: {artifacts_dir}", file=sys.stderr)
return artifacts_dir
def verify_ready(db_path: Path) -> dict:
"""Verify all components are ready and return status."""
status = {
"db_exists": db_path.exists(),
"transitions_count": 0,
"markers_count": 0,
"rules_count": 0,
"ready": False
}
if not status["db_exists"]:
return status
try:
conn = sqlite3.connect(str(db_path), timeout=5.0) # 5s busy timeout for concurrency
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM workflow_transitions")
status["transitions_count"] = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM agent_markers")
status["markers_count"] = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM workflow_special_rules")
status["rules_count"] = cursor.fetchone()[0]
conn.close()
# Ready if we have all required config
# Note: rules_count > 0 is required because special rules control
# testing_mode behavior, escalation triggers, and security enforcement
status["ready"] = (
status["transitions_count"] > 0 and
status["markers_count"] > 0 and
status["rules_count"] > 0
)
except sqlite3.OperationalError as e:
status["error"] = str(e)
return status
def main():
parser = argparse.ArgumentParser(
description="Initialize BAZINGA session infrastructure",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
This script ensures deterministic orchestration is ready:
1. Database exists with correct schema
2. Workflow configs are seeded
3. Session artifacts directory exists
Run this BEFORE spawning PM in any orchestration session.
"""
)
parser.add_argument("--session-id", type=str, default=None,
help="Session ID (auto-generated if not provided)")
parser.add_argument("--db", type=str, default=None,
help="Database path (default: bazinga/bazinga.db)")
parser.add_argument("--project-root", type=str, default=None,
help="Override detected project root")
parser.add_argument("--check-only", action="store_true",
help="Only check readiness, don't initialize")
parser.add_argument("--json", action="store_true",
help="Output status as JSON")
args = parser.parse_args()
global PROJECT_ROOT
if args.project_root:
PROJECT_ROOT = Path(args.project_root)
db_path = Path(args.db) if args.db else PROJECT_ROOT / "bazinga" / "bazinga.db"
session_id = args.session_id or f"bazinga_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
if args.check_only:
status = verify_ready(db_path)
if args.json:
print(json.dumps(status))
else:
if status["ready"]:
print(f"✓ Session infrastructure ready", file=sys.stderr)
print(f" Transitions: {status['transitions_count']}", file=sys.stderr)
print(f" Markers: {status['markers_count']}", file=sys.stderr)
print(f" Rules: {status['rules_count']}", file=sys.stderr)
else:
print(f"✗ Session infrastructure NOT ready", file=sys.stderr)
if not status["db_exists"]:
print(f" Database missing: {db_path}", file=sys.stderr)
elif "error" in status:
print(f" Error: {status['error']}", file=sys.stderr)
else:
print(f" Missing config - run without --check-only to initialize", file=sys.stderr)
sys.exit(0 if status["ready"] else 1)
print(f"🚀 Initializing BAZINGA session: {session_id}", file=sys.stderr)
print(f" Project root: {PROJECT_ROOT}", file=sys.stderr)
print(f" Database: {db_path}", file=sys.stderr)
# Step 1: Ensure database
if not ensure_database(db_path):
print("❌ Session initialization FAILED: database setup failed", file=sys.stderr)
sys.exit(1)
# Step 2: Ensure config seeded
if not ensure_config_seeded(db_path):
print("❌ Session initialization FAILED: config seeding failed", file=sys.stderr)
sys.exit(1)
# Step 3: Ensure artifacts directory
ensure_artifacts_dir(session_id)
# Step 4: Verify everything is ready
status = verify_ready(db_path)
if not status["ready"]:
print("❌ Session initialization FAILED: verification failed", file=sys.stderr)
print(f" Status: {status}", file=sys.stderr)
sys.exit(1)
print(f"✅ Session initialization COMPLETE", file=sys.stderr)
print(f" Session ID: {session_id}", file=sys.stderr)
print(f" Transitions: {status['transitions_count']}", file=sys.stderr)
print(f" Markers: {status['markers_count']}", file=sys.stderr)
print(f" Rules: {status['rules_count']}", file=sys.stderr)
# Output session ID to stdout for capture
print(session_id)
if __name__ == "__main__":
main()
```
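One pattern from the v17→v18 migration in `init_db.py` is worth isolating: the state-snapshot backup keeps only the newest row per `(session_id, state_type)` with `ROW_NUMBER()`, so the rebuilt table's UNIQUE index cannot be violated on restore. A self-contained sketch of that dedup query (requires SQLite ≥ 3.25 for window functions; the sample rows are illustrative):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("""
    CREATE TABLE state_snapshots (
        id INTEGER PRIMARY KEY,
        session_id TEXT, state_type TEXT,
        timestamp TEXT, state_data TEXT
    )
""")
rows = [
    (1, "s1", "pm", "2025-01-01T10:00:00", "old"),
    (2, "s1", "pm", "2025-01-01T11:00:00", "new"),
    (3, "s1", "orchestrator", "2025-01-01T10:30:00", "only"),
]
cur.executemany("INSERT INTO state_snapshots VALUES (?, ?, ?, ?, ?)", rows)
# Keep only the latest row per (session_id, state_type): rank rows within
# each partition by timestamp descending, then select rank 1.
cur.execute("""
    SELECT id, session_id, state_type, state_data
    FROM (
        SELECT *, ROW_NUMBER() OVER (
            PARTITION BY session_id, state_type
            ORDER BY timestamp DESC
        ) AS rn
        FROM state_snapshots
    ) WHERE rn = 1
""")
kept = cur.fetchall()  # the stale "old" pm snapshot is dropped
```

Doing the dedup in SQL keeps the backup a single query instead of a Python-side group-and-sort pass over potentially large snapshot history.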
### references/schema.md
```markdown
# BAZINGA Database Schema Reference
This document provides complete reference documentation for the BAZINGA database schema.
## Database Configuration
- **Engine**: SQLite 3
- **Journal Mode**: WAL (Write-Ahead Logging) for better concurrency
- **Foreign Keys**: Enabled for referential integrity
- **Location**: `bazinga/bazinga.db` (relative to the project root)
## Tables Overview
| Table | Purpose | Key Features |
|-------|---------|-------------|
| `sessions` | Track orchestration sessions | Primary session metadata |
| `orchestration_logs` | Agent interaction logs | Replaces orchestration-log.md |
| `state_snapshots` | State history | Replaces JSON state files |
| `task_groups` | PM task management | Normalized from pm_state.json |
| `token_usage` | Token tracking | Per-agent token consumption |
| `skill_outputs` | Skill results | Replaces skill JSON files |
| `configuration` | Removed (2025-11-21) | No defined use case; see research/empty-tables-analysis.md |
| `decisions` | Removed (2025-11-21) | Redundant with `orchestration_logs`; see research/empty-tables-analysis.md |
| `model_config` | Agent model assignments | Dynamic model selection |
| `context_packages` | Inter-agent context | Research, failures, decisions, handoffs |
| `context_package_consumers` | Consumer tracking | Join table for per-agent consumption |
| `workflow_transitions` | Deterministic routing | State machine for agent workflow |
| `agent_markers` | Prompt validation | Required markers per agent type |
| `workflow_special_rules` | Routing rules | Testing mode, escalation, security rules |
---
## Table Schemas
### sessions
Tracks orchestration sessions from creation to completion.
```sql
CREATE TABLE sessions (
session_id TEXT PRIMARY KEY,
start_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
end_time TIMESTAMP,
mode TEXT CHECK(mode IN ('simple', 'parallel')),
original_requirements TEXT,
status TEXT CHECK(status IN ('active', 'completed', 'failed')) DEFAULT 'active',
initial_branch TEXT DEFAULT 'main',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
```
**Columns:**
- `session_id`: Unique session identifier (e.g., `bazinga_20250112_143022`)
- `start_time`: Session start timestamp
- `end_time`: Session completion timestamp (NULL if active)
- `mode`: Execution mode (`simple` or `parallel`)
- `original_requirements`: Original user request text
- `status`: Current session status
- `initial_branch`: Base branch all work merges back to (captured at session start)
- `created_at`: Record creation timestamp
**Usage Example:**
```python
# Create new session
db.create_session('bazinga_20250112_143022', 'parallel', 'Add authentication feature')
# Update session status
db.update_session_status('bazinga_20250112_143022', 'completed')
```
---
### orchestration_logs
Stores all agent interactions and reasoning (replaces `orchestration-log.md`). Extended in schema v8 to support agent reasoning capture.
```sql
CREATE TABLE orchestration_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT NOT NULL,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
iteration INTEGER,
agent_type TEXT NOT NULL,
agent_id TEXT,
content TEXT NOT NULL,
-- Reasoning capture columns (v8)
log_type TEXT DEFAULT 'interaction', -- 'interaction', 'reasoning', or 'event'
reasoning_phase TEXT, -- understanding, approach, decisions, risks, blockers, pivot, completion
confidence_level TEXT, -- high, medium, low
references_json TEXT, -- JSON array of files consulted
redacted INTEGER DEFAULT 0, -- 1 if secrets were redacted
group_id TEXT, -- Task group for reasoning context
-- Event columns (v17)
event_subtype TEXT, -- pm_bazinga, scope_change, validator_verdict, tl_issues, etc.
event_payload TEXT, -- JSON string event payload
idempotency_key TEXT, -- Prevents duplicate events: {session}|{group}|{type}|{iteration}
FOREIGN KEY (session_id) REFERENCES sessions(session_id) ON DELETE CASCADE
)
-- Indexes
CREATE INDEX idx_logs_session ON orchestration_logs(session_id, timestamp DESC);
CREATE INDEX idx_logs_agent_type ON orchestration_logs(session_id, agent_type);
-- Reasoning-specific indexes (partial indexes for efficiency)
CREATE INDEX idx_logs_reasoning ON orchestration_logs(session_id, log_type, reasoning_phase)
WHERE log_type = 'reasoning';
CREATE INDEX idx_logs_group_reasoning ON orchestration_logs(session_id, group_id, log_type)
WHERE log_type = 'reasoning';
-- Event idempotency index (v17) - prevents duplicate events with same key
CREATE UNIQUE INDEX idx_logs_idempotency
ON orchestration_logs(session_id, event_subtype, group_id, idempotency_key)
WHERE idempotency_key IS NOT NULL AND log_type = 'event';
```
**Columns:**
- `id`: Auto-increment primary key
- `session_id`: Foreign key to sessions table
- `timestamp`: When the interaction occurred
- `iteration`: Orchestration iteration number
- `agent_type`: Type of agent. Accepts any value for extensibility: `pm`, `developer`, `qa_expert`, `tech_lead`, `orchestrator`, `investigator`, `requirements_engineer`, `senior_software_engineer`, or any future agent type
- `agent_id`: Specific agent instance (e.g., `developer_1`)
- `content`: Full agent response text or reasoning content
- `log_type`: Entry type - `interaction` (default) for normal logs, `reasoning` for reasoning capture
- `reasoning_phase`: Phase of reasoning (only for log_type='reasoning'):
- `understanding`: Initial problem comprehension
- `approach`: Strategy selection
- `decisions`: Key choices made
- `risks`: Identified risks/concerns
- `blockers`: Issues preventing progress
- `pivot`: Strategy changes mid-execution
- `completion`: Final summary/outcome
- `confidence_level`: Agent's confidence in reasoning (`high`, `medium`, `low`)
- `references_json`: JSON array of file paths consulted during reasoning
- `redacted`: 1 if secrets were detected and redacted from content
- `group_id`: Task group ID for associating reasoning with specific work
- `event_subtype`: (v17) Event type for log_type='event': `pm_bazinga`, `scope_change`, `validator_verdict`, `tl_issues`, `tl_issue_responses`, `investigation_iteration`, etc.
- `event_payload`: (v17) JSON string event payload (secrets are scanned and redacted)
- `idempotency_key`: (v17) Prevents duplicate events. Recommended format: `{session_id}|{group_id}|{event_type}|{iteration}`
**Indexes:**
- `idx_logs_session`: Fast session-based queries sorted by time
- `idx_logs_agent_type`: Filter by agent type efficiently
- `idx_logs_reasoning`: Efficient reasoning queries by phase (partial index)
- `idx_logs_group_reasoning`: Efficient reasoning queries by group (partial index)
- `idx_logs_idempotency`: (v17) Unique constraint for event idempotency - prevents duplicate events with same key. Uses INSERT-first pattern with IntegrityError catch for race-safe concurrent writes.
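The INSERT-first pattern mentioned above can be sketched as follows. `log_event_idempotent` is a hypothetical helper, but the table and index names match the schema: the UNIQUE partial index is the arbiter, not a prior SELECT, which makes concurrent writes race-safe.

```python
import sqlite3

def log_event_idempotent(conn, session_id, subtype, group_id, key, payload_json):
    """Try the write; treat a unique-index collision as 'already recorded'."""
    try:
        conn.execute(
            """INSERT INTO orchestration_logs
                   (session_id, agent_type, content, log_type,
                    event_subtype, group_id, idempotency_key, event_payload)
               VALUES (?, 'orchestrator', ?, 'event', ?, ?, ?, ?)""",
            (session_id, payload_json, subtype, group_id, key, payload_json),
        )
        conn.commit()
        return True          # event recorded
    except sqlite3.IntegrityError:
        return False         # same idempotency key already logged: silent no-op
```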
**Usage Example - Interactions:**
```python
# Log agent interaction
db.log_interaction(
session_id='bazinga_123',
agent_type='developer',
content='Implemented authentication...',
iteration=5,
agent_id='developer_1'
)
# Query recent logs
logs = db.get_logs('bazinga_123', limit=10, agent_type='developer')
```
**Usage Example - Reasoning Capture:**
```python
# Save agent reasoning (auto-redacts secrets)
result = db.save_reasoning(
session_id='bazinga_123',
group_id='group_a',
agent_type='developer',
reasoning_phase='understanding',
content='Analyzing HIN OAuth2 requirements...',
confidence='high',
references=['src/auth/oauth.py', 'docs/hin-spec.md']
)
# Get reasoning entries for a group
reasoning = db.get_reasoning(
session_id='bazinga_123',
group_id='group_a',
agent_type='developer',
phase='understanding'
)
# Get full reasoning timeline
timeline = db.reasoning_timeline(
session_id='bazinga_123',
group_id='group_a',
format='markdown'
)
```
---
### state_snapshots
Stores state snapshots over time (replaces `pm_state.json`, `orchestrator_state.json`, etc.).
```sql
CREATE TABLE state_snapshots (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT NOT NULL,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
state_type TEXT CHECK(state_type IN ('pm', 'orchestrator', 'group_status', 'investigation')),
group_id TEXT DEFAULT 'global', -- Isolation key for parallel mode (v18)
state_data TEXT NOT NULL, -- JSON format
FOREIGN KEY (session_id) REFERENCES sessions(session_id) ON DELETE CASCADE,
UNIQUE(session_id, state_type, group_id) -- UPSERT support (v18)
)
-- Indexes
CREATE INDEX idx_state_session_type ON state_snapshots(session_id, state_type, timestamp DESC);
CREATE INDEX idx_state_group ON state_snapshots(session_id, state_type, group_id);
```
**Columns:**
- `id`: Auto-increment primary key
- `session_id`: Foreign key to sessions table
- `timestamp`: When the state was saved
- `state_type`: Type of state (`pm`, `orchestrator`, `group_status`, `investigation`)
- `group_id`: Group isolation key for parallel mode (default: `'global'`). Session-level states (pm, orchestrator) use 'global'. Investigation states use the task group ID.
- `state_data`: Complete state as JSON string
**UPSERT Behavior (v18):**
The `UNIQUE(session_id, state_type, group_id)` constraint enables UPSERT. Saving state with the same session/type/group replaces the previous state rather than creating duplicates.
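Assuming SQLite 3.24+ (which added `ON CONFLICT ... DO UPDATE`), the UPSERT can be expressed directly against that constraint. A minimal self-contained sketch over a trimmed copy of the table:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE state_snapshots (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    session_id TEXT NOT NULL,
    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    state_type TEXT,
    group_id TEXT DEFAULT 'global',
    state_data TEXT NOT NULL,
    UNIQUE(session_id, state_type, group_id))""")

upsert = """INSERT INTO state_snapshots (session_id, state_type, group_id, state_data)
            VALUES (?, ?, ?, ?)
            ON CONFLICT(session_id, state_type, group_id)
            DO UPDATE SET state_data = excluded.state_data,
                          timestamp = CURRENT_TIMESTAMP"""

conn.execute(upsert, ("bazinga_123", "pm", "global", '{"iteration": 5}'))
conn.execute(upsert, ("bazinga_123", "pm", "global", '{"iteration": 6}'))

# Only one row survives; the second write replaced the first
rows = conn.execute("SELECT state_data FROM state_snapshots").fetchall()
```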
**Usage Example:**
```python
# Save PM state (session-level, uses default group_id='global')
pm_state = {
'mode': 'parallel',
'iteration': 5,
'task_groups': [...]
}
db.save_state('bazinga_123', 'pm', pm_state)
# Retrieve latest PM state
current_state = db.get_latest_state('bazinga_123', 'pm')
# Save investigation state (group-specific)
inv_state = {
'iteration': 3,
'status': 'root_cause_found',
'hypotheses': [...]
}
db.save_state('bazinga_123', 'investigation', inv_state, group_id='AUTH')
# Retrieve investigation state for specific group
inv_state = db.get_latest_state('bazinga_123', 'investigation', group_id='AUTH')
```
**CLI Usage:**
```bash
# Save state with group_id
python3 .../bazinga_db.py --quiet save-state "sess_123" "investigation" \
--state-file /tmp/state.json --group-id "AUTH"
# Get state with group_id
python3 .../bazinga_db.py --quiet get-state "sess_123" "investigation" --group-id "AUTH"
```
---
### task_groups
Normalized task group tracking (extracted from `pm_state.json`).
```sql
CREATE TABLE task_groups (
id TEXT NOT NULL,
session_id TEXT NOT NULL,
name TEXT NOT NULL,
status TEXT CHECK(status IN (
'pending', 'in_progress', 'completed', 'failed',
'approved_pending_merge', 'merging'
)) DEFAULT 'pending',
assigned_to TEXT,
revision_count INTEGER DEFAULT 0,
last_review_status TEXT CHECK(last_review_status IN ('APPROVED', 'CHANGES_REQUESTED', NULL)),
feature_branch TEXT,
merge_status TEXT CHECK(merge_status IN ('pending', 'in_progress', 'merged', 'conflict', 'test_failure', NULL)),
complexity INTEGER CHECK(complexity BETWEEN 1 AND 10),
initial_tier TEXT CHECK(initial_tier IN ('Developer', 'Senior Software Engineer')) DEFAULT 'Developer',
context_references TEXT, -- JSON array of context package IDs relevant to this group
specializations TEXT, -- JSON array of specialization file paths (e.g., ["bazinga/templates/specializations/01-languages/typescript.md"])
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id, session_id),
FOREIGN KEY (session_id) REFERENCES sessions(session_id) ON DELETE CASCADE
)
-- Indexes
CREATE INDEX idx_taskgroups_session ON task_groups(session_id, status);
```
**Columns:**
- `id`: Unique task group identifier (e.g., `group_a`)
- `session_id`: Foreign key to sessions table
- `name`: Human-readable task group name
- `status`: Current status (`pending`, `in_progress`, `completed`, `failed`, `approved_pending_merge`, `merging`)
- `assigned_to`: Agent ID assigned to this group
- `revision_count`: Number of revision cycles (for escalation)
- `last_review_status`: Tech Lead review result (APPROVED or CHANGES_REQUESTED)
- `feature_branch`: Developer's feature branch for this group (e.g., `feature/group-A-jwt-auth`)
- `merge_status`: Tracks merge state (`pending`, `in_progress`, `merged`, `conflict`, `test_failure`, NULL)
- `complexity`: Task complexity score (1-10), set by PM
- `initial_tier`: Initial implementation tier (`Developer` or `Senior Software Engineer`), set by PM
- `context_references`: JSON array of context package IDs relevant to this group (e.g., `[1, 3, 5]`)
- `specializations`: JSON array of specialization file paths assigned by PM (e.g., `["bazinga/templates/specializations/01-languages/typescript.md", "bazinga/templates/specializations/02-frameworks-frontend/nextjs.md"]`)
- `created_at`: When task group was created
- `updated_at`: Last modification timestamp
**Status Flow (Merge-on-Approval):**
```
pending → in_progress → approved_pending_merge → merging → completed
                                                    ↘ in_progress (conflict, back to dev)
```
**Merge Status Flow:**
```
NULL (not yet approved)
↓
pending (TL approved, waiting for merge)
↓
in_progress (Developer performing merge)
↓
merged (success)
OR conflict (git merge conflicts → dev fixes conflicts)
OR test_failure (tests failed after merge → dev fixes tests)
```
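One way to enforce this flow before writing to the database is a transition map. This sketch assumes `conflict` and `test_failure` route back to `in_progress` once the developer fixes them, mirroring the task-group flow above:

```python
# merge_status flow as a transition map (None = not yet approved)
MERGE_TRANSITIONS = {
    None: {"pending"},
    "pending": {"in_progress"},
    "in_progress": {"merged", "conflict", "test_failure"},
    "conflict": {"in_progress"},      # dev fixes conflicts, merge retried
    "test_failure": {"in_progress"},  # dev fixes tests, merge retried
    "merged": set(),                  # terminal state
}

def valid_merge_transition(current, new):
    """Return True if moving from `current` to `new` follows the flow."""
    return new in MERGE_TRANSITIONS.get(current, set())
```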
**Usage Example:**
```python
# Create task group with full PM fields
db.create_task_group(
'group_a', 'bazinga_123', 'Authentication',
status='pending',
complexity=7, # 1-3=Low (Dev), 4-6=Medium (SSE), 7-10=High (SSE)
initial_tier='Senior Software Engineer',
item_count=5,
component_path='backend/auth/',
specializations=['bazinga/templates/specializations/01-languages/typescript.md']
)
# Update task group with complexity (requires session_id for composite key)
db.update_task_group(
'group_a', 'bazinga_123',
status='completed',
complexity=5,
last_review_status='APPROVED',
specializations=['bazinga/templates/specializations/03-frameworks-backend/express.md']
)
# Get task groups (includes specializations, complexity, initial_tier)
groups = db.get_task_groups('bazinga_123', status='in_progress')
# Returns: [{'id': 'group_a', 'complexity': 7, 'initial_tier': 'Senior Software Engineer', ...}]
```
---
### token_usage
Tracks token consumption per agent.
```sql
CREATE TABLE token_usage (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT NOT NULL,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
agent_type TEXT NOT NULL,
agent_id TEXT,
tokens_estimated INTEGER NOT NULL,
FOREIGN KEY (session_id) REFERENCES sessions(session_id) ON DELETE CASCADE
)
-- Indexes
CREATE INDEX idx_tokens_session ON token_usage(session_id, agent_type);
```
**Columns:**
- `id`: Auto-increment primary key
- `session_id`: Foreign key to sessions table
- `timestamp`: When tokens were consumed
- `agent_type`: Type of agent
- `agent_id`: Specific agent instance
- `tokens_estimated`: Estimated token count
**Usage Example:**
```python
# Log token usage
db.log_tokens('bazinga_123', 'developer', 15000, agent_id='developer_1')
# Get token summary
summary = db.get_token_summary('bazinga_123', by='agent_type')
# Returns: {'pm': 5000, 'developer': 25000, 'qa': 8000, 'total': 38000}
```
---
### skill_outputs
Stores skill execution outputs (replaces individual JSON files).
```sql
CREATE TABLE skill_outputs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT NOT NULL,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
skill_name TEXT NOT NULL,
output_data TEXT NOT NULL, -- JSON format
FOREIGN KEY (session_id) REFERENCES sessions(session_id) ON DELETE CASCADE
)
-- Indexes
CREATE INDEX idx_skill_session ON skill_outputs(session_id, skill_name, timestamp DESC);
```
**Columns:**
- `id`: Auto-increment primary key
- `session_id`: Foreign key to sessions table
- `timestamp`: When skill output was saved
- `skill_name`: Name of skill (e.g., `security_scan`, `test_coverage`)
- `output_data`: Complete output as JSON string
**Usage Example:**
```python
# Save skill output
security_results = {'vulnerabilities': [...], 'severity': 'high'}
db.save_skill_output('bazinga_123', 'security_scan', security_results)
# Retrieve latest skill output
results = db.get_skill_output('bazinga_123', 'security_scan')
```
---
### configuration
System-wide configuration storage (replaces `skills_config.json`, `testing_config.json`, etc.).
```sql
CREATE TABLE configuration (
key TEXT PRIMARY KEY,
value TEXT NOT NULL, -- JSON format
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
```
**Columns:**
- `key`: Configuration key (e.g., `skills_config`, `testing_mode`)
- `value`: Configuration value as JSON string
- `updated_at`: Last update timestamp
**Usage Example:**
```python
# Set configuration
db.set_config('testing_mode', {'framework': 'full', 'coverage_threshold': 80})
# Get configuration
config = db.get_config('testing_mode')
```
---
### decisions
Audit trail of orchestrator decisions.
```sql
CREATE TABLE decisions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT NOT NULL,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
iteration INTEGER,
decision_type TEXT NOT NULL,
decision_data TEXT NOT NULL, -- JSON format
FOREIGN KEY (session_id) REFERENCES sessions(session_id) ON DELETE CASCADE
)
-- Indexes
CREATE INDEX idx_decisions_session ON decisions(session_id, timestamp DESC);
```
**Columns:**
- `id`: Auto-increment primary key
- `session_id`: Foreign key to sessions table
- `timestamp`: When decision was made
- `iteration`: Orchestration iteration number
- `decision_type`: Type of decision (e.g., `spawn_agent`, `escalate_model`)
- `decision_data`: Decision details as JSON string
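Unlike the other tables, no wrapper method is shown for `decisions` in this reference, so here is a hedged direct-SQL sketch of recording and reading back a decision (the payload fields are illustrative):

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE decisions (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    session_id TEXT NOT NULL,
    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    iteration INTEGER,
    decision_type TEXT NOT NULL,
    decision_data TEXT NOT NULL)""")

# Record an escalation decision as a JSON payload
conn.execute(
    "INSERT INTO decisions (session_id, iteration, decision_type, decision_data) "
    "VALUES (?, ?, ?, ?)",
    ("bazinga_123", 3, "escalate_model",
     json.dumps({"agent": "developer_1", "from": "sonnet", "to": "opus"})),
)
row = conn.execute("SELECT decision_type, decision_data FROM decisions").fetchone()
```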
---
### model_config
Stores agent model assignments for dynamic model selection.
```sql
CREATE TABLE model_config (
agent_role TEXT PRIMARY KEY,
model TEXT NOT NULL,
rationale TEXT,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
-- Default data (keep in sync with bazinga/model_selection.json)
INSERT INTO model_config (agent_role, model, rationale) VALUES
('developer', 'sonnet', 'Balanced capability for implementation tasks'),
('senior_software_engineer', 'opus', 'Complex failures requiring deep reasoning'),
('qa_expert', 'sonnet', 'Test generation and validation'),
('tech_lead', 'opus', 'Architectural decisions - non-negotiable'),
('project_manager', 'opus', 'Strategic planning - non-negotiable'),
('investigator', 'opus', 'Root cause analysis'),
('validator', 'sonnet', 'BAZINGA verification');
```
**Columns:**
- `agent_role`: Agent role identifier (primary key)
- `model`: Model name (e.g., `haiku`, `sonnet`, `opus`)
- `rationale`: Explanation for model choice
- `updated_at`: Last update timestamp
**Usage Example:**
```python
# Get all model assignments
models = db.get_model_config()
# Returns: {'developer': 'sonnet', 'senior_software_engineer': 'opus', ...}
# Get model for specific agent (before update)
model = db.get_agent_model('developer')
# Returns: 'sonnet'
# Update model for an agent
db.set_model_config('developer', 'opus', 'Upgrading for complex project')
# Get model after update
model = db.get_agent_model('developer')
# Returns: 'opus'
```
**Why This Table:**
- Allows runtime model updates without code changes
- Future-proof for new model releases (Claude 4, etc.)
- Single source of truth for model assignments
- Orchestrator queries this at initialization
---
### context_packages
Stores context packages for inter-agent communication (research, failures, decisions, handoffs).
```sql
CREATE TABLE context_packages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT NOT NULL,
group_id TEXT, -- NULL for global/session-wide packages
package_type TEXT NOT NULL CHECK(package_type IN ('research', 'failures', 'decisions', 'handoff', 'investigation')),
file_path TEXT NOT NULL,
producer_agent TEXT NOT NULL,
priority TEXT DEFAULT 'medium' CHECK(priority IN ('low', 'medium', 'high', 'critical')),
summary TEXT NOT NULL, -- Brief description for routing (max 200 chars)
size_bytes INTEGER, -- File size for budget decisions
version INTEGER DEFAULT 1,
supersedes_id INTEGER, -- Previous version if updated
scope TEXT DEFAULT 'group' CHECK(scope IN ('group', 'global')),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (session_id) REFERENCES sessions(session_id) ON DELETE CASCADE,
FOREIGN KEY (supersedes_id) REFERENCES context_packages(id)
)
-- Indexes
CREATE INDEX idx_cp_session ON context_packages(session_id);
CREATE INDEX idx_cp_group ON context_packages(group_id);
CREATE INDEX idx_cp_type ON context_packages(package_type);
CREATE INDEX idx_cp_priority ON context_packages(priority);
CREATE INDEX idx_cp_scope ON context_packages(scope);
```
**Columns:**
- `id`: Auto-increment primary key
- `session_id`: Foreign key to sessions table
- `group_id`: Task group ID (NULL for global packages)
- `package_type`: Type of context (`research`, `failures`, `decisions`, `handoff`, `investigation`)
- `file_path`: Path to the context package markdown file
- `producer_agent`: Agent that created the package
- `priority`: Routing priority (`low`, `medium`, `high`, `critical`)
- `summary`: Brief description for spawn prompts (max 200 chars)
- `size_bytes`: File size for token budget decisions
- `version`: Package version (incremented on updates)
- `supersedes_id`: Reference to previous version if updated
- `scope`: Whether package is group-specific or session-wide
- `created_at`: When package was created
**Usage Example:**
```python
# Create context package
db.save_context_package(
session_id='bazinga_123',
group_id='group_a',
package_type='research',
file_path='bazinga/artifacts/bazinga_123/context/research-group_a-hin.md',
producer_agent='requirements_engineer',
consumers=['developer', 'senior_software_engineer'],
priority='high',
summary='HIN OAuth2 endpoints, scopes, security requirements'
)
# Get packages for agent spawn
packages = db.get_context_packages(
session_id='bazinga_123',
group_id='group_a',
agent_type='developer',
limit=3
)
```
---
### context_package_consumers
Join table for tracking per-agent consumption of context packages.
```sql
CREATE TABLE context_package_consumers (
id INTEGER PRIMARY KEY AUTOINCREMENT,
package_id INTEGER NOT NULL,
agent_type TEXT NOT NULL,
consumed_at TIMESTAMP, -- NULL = not yet consumed
iteration INTEGER DEFAULT 1, -- Which iteration of the agent consumed it
FOREIGN KEY (package_id) REFERENCES context_packages(id) ON DELETE CASCADE,
UNIQUE(package_id, agent_type, iteration)
)
-- Indexes
CREATE INDEX idx_cpc_package ON context_package_consumers(package_id);
CREATE INDEX idx_cpc_agent ON context_package_consumers(agent_type);
CREATE INDEX idx_cpc_pending ON context_package_consumers(consumed_at) WHERE consumed_at IS NULL;
```
**Columns:**
- `id`: Auto-increment primary key
- `package_id`: Foreign key to context_packages table
- `agent_type`: Type of agent that can consume (`developer`, `qa_expert`, etc.)
- `consumed_at`: When the package was consumed (NULL if pending)
- `iteration`: Which iteration of the agent consumed it (allows re-consumption)
**Usage Example:**
```python
# Mark package as consumed
db.mark_context_consumed(
package_id=1,
agent_type='developer',
iteration=2
)
# Get pending packages for agent
pending = db.get_pending_context(
session_id='bazinga_123',
agent_type='developer',
group_id='group_a'
)
```
**Why Join Table (Not JSON Array):**
- Proper indexing for efficient lookups
- Per-consumer tracking (multiple agents can consume same package)
- Supports iteration-based re-consumption
- Clean queries without string pattern matching
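For example, "pending packages for an agent" reduces to an indexed join instead of JSON string matching. A minimal sketch against trimmed-down copies of the two tables:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE context_packages (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    session_id TEXT, group_id TEXT, summary TEXT);
CREATE TABLE context_package_consumers (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    package_id INTEGER, agent_type TEXT, consumed_at TIMESTAMP,
    iteration INTEGER DEFAULT 1);
INSERT INTO context_packages (session_id, group_id, summary)
    VALUES ('bazinga_123', 'group_a', 'HIN OAuth2 research');
INSERT INTO context_package_consumers (package_id, agent_type)
    VALUES (1, 'developer');
""")

# consumed_at IS NULL means "assigned but not yet consumed"
pending = conn.execute("""
    SELECT cp.id, cp.summary
    FROM context_packages cp
    JOIN context_package_consumers cpc ON cpc.package_id = cp.id
    WHERE cp.session_id = ? AND cpc.agent_type = ? AND cpc.consumed_at IS NULL
""", ("bazinga_123", "developer")).fetchall()
```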
---
## Query Examples
### Get Dashboard Overview
```python
snapshot = db.get_dashboard_snapshot('bazinga_123')
# Returns complete dashboard state in one query
```
### Filter Logs by Time Range
```python
logs = db.get_logs(
session_id='bazinga_123',
since='2025-01-12 14:00:00',
limit=100
)
```
### Get Incomplete Tasks
```python
tasks = db.get_task_groups('bazinga_123', status='in_progress')
```
### Token Usage Analysis
```python
by_type = db.get_token_summary('bazinga_123', by='agent_type')
by_agent = db.get_token_summary('bazinga_123', by='agent_id')
```
### Custom Analytics Query
```python
results = db.query("""
SELECT agent_type, COUNT(*) as interaction_count,
AVG(LENGTH(content)) as avg_response_length
FROM orchestration_logs
WHERE session_id = ?
GROUP BY agent_type
""", ('bazinga_123',))
```
---
## Migration from Files
| Old File | New Table | Migration Path |
|----------|-----------|----------------|
| `orchestration-log.md` | `orchestration_logs` | Parse markdown, insert rows |
| `pm_state.json` | `state_snapshots` + `task_groups` | JSON to normalized tables |
| `orchestrator_state.json` | `state_snapshots` | JSON to single row |
| `group_status.json` | `task_groups` | JSON to table rows |
| `security_scan.json` | `skill_outputs` | JSON to single row |
| `sessions_history.json` | `sessions` | JSON array to table rows |
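As a sketch, the `sessions_history.json` path reduces to a bulk insert; the legacy field names below are assumed for illustration, not the exact legacy format:

```python
import json
import sqlite3

# Hypothetical legacy array (real sessions_history.json may differ)
legacy = json.loads(
    '[{"session_id": "bazinga_old_1", "mode": "simple", "status": "completed"}]'
)

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE sessions (session_id TEXT PRIMARY KEY, mode TEXT, status TEXT)"
)
# INSERT OR IGNORE makes the migration idempotent on re-runs
conn.executemany(
    "INSERT OR IGNORE INTO sessions (session_id, mode, status) VALUES (?, ?, ?)",
    [(s["session_id"], s["mode"], s["status"]) for s in legacy],
)
count = conn.execute("SELECT COUNT(*) FROM sessions").fetchone()[0]
```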
---
## Performance Considerations
### WAL Mode Benefits
- **Concurrent Reads**: Multiple readers don't block each other
- **Non-blocking Reads**: Reads don't block writes (and vice versa)
- **Better Performance**: ~2-5x faster than default journal mode
### Index Usage
All high-frequency queries have supporting indexes:
- Session-based queries: `idx_logs_session`, `idx_state_session_type`, `idx_taskgroups_session`
- Time-ordered queries: Timestamps in descending order for recent data
- Filtering queries: Agent type, skill name indexes
### Connection Management
- Connection timeout: 30 seconds (handles lock contention)
- Foreign keys enabled: Ensures referential integrity
- Row factory: `sqlite3.Row` for dict-like access
---
## Backup and Maintenance
### Backup Database
```bash
sqlite3 bazinga.db ".backup bazinga_backup.db"
```
### Vacuum (Reclaim Space)
```bash
sqlite3 bazinga.db "VACUUM"
```
### Check Integrity
```bash
sqlite3 bazinga.db "PRAGMA integrity_check"
```
---
## Deterministic Orchestration Tables (v13)
These tables support the deterministic prompt building and workflow routing system.
### workflow_transitions
Stores state machine transitions for deterministic workflow routing. Seeded from `workflow/transitions.json` at session start.
```sql
CREATE TABLE workflow_transitions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
current_agent TEXT NOT NULL,
response_status TEXT NOT NULL,
next_agent TEXT,
action TEXT NOT NULL,
include_context TEXT, -- JSON array of context types to include
escalation_check INTEGER DEFAULT 0, -- 1 to check revision count for escalation
model_override TEXT, -- Override model for next agent
fallback_agent TEXT, -- Fallback if primary agent unavailable
bypass_qa INTEGER DEFAULT 0, -- 1 to skip QA (e.g., RE tasks)
max_parallel INTEGER, -- Max parallel spawns for batch actions
then_action TEXT, -- Action after primary (e.g., 'check_phase')
UNIQUE(current_agent, response_status)
)
-- Indexes
CREATE INDEX idx_wt_agent ON workflow_transitions(current_agent);
```
**Columns:**
- `current_agent`: Agent that produced the response (developer, qa_expert, tech_lead, etc.)
- `response_status`: Status code from agent (READY_FOR_QA, PASS, APPROVED, etc.)
- `next_agent`: Agent to spawn next (NULL for end states)
- `action`: Action type (`spawn`, `respawn`, `spawn_batch`, `validate_then_end`, `pause_for_user`, `end_session`)
- `include_context`: JSON array of context types to pass (e.g., `["dev_output", "test_results"]`)
- `escalation_check`: If 1, check revision_count against escalation threshold
- `model_override`: Override model (e.g., `opus` for escalation)
- `fallback_agent`: Alternative agent if primary unavailable
- `bypass_qa`: If 1, skip QA Expert (for RE tasks)
- `max_parallel`: Maximum parallel spawns for `spawn_batch`
- `then_action`: Secondary action (e.g., `check_phase` after merge)
**Usage Example:**
```python
# Get next transition (called by workflow_router.py)
transition = db.get_transition('developer', 'READY_FOR_QA')
# Returns: {'next_agent': 'qa_expert', 'action': 'spawn', ...}
```
---
### agent_markers
Stores required markers that MUST be present in agent prompts. Seeded from `workflow/agent-markers.json` at session start.
```sql
CREATE TABLE agent_markers (
id INTEGER PRIMARY KEY AUTOINCREMENT,
agent_type TEXT NOT NULL UNIQUE,
required_markers TEXT NOT NULL, -- JSON array of required strings
workflow_markers TEXT -- JSON array of workflow-related strings
)
```
**Columns:**
- `agent_type`: Type of agent (developer, qa_expert, tech_lead, etc.)
- `required_markers`: JSON array of strings that MUST appear in prompt
- `workflow_markers`: JSON array of workflow-related strings (informational)
**Usage Example:**
```python
# Get markers for validation (called by prompt_builder.py)
markers = db.get_markers('developer')
# Returns: {'required': ['NO DELEGATION', 'READY_FOR_QA', ...], 'workflow': [...]}
# Validate prompt contains all markers
missing = [m for m in markers['required'] if m not in prompt]
if missing:
raise ValueError(f"Prompt missing markers: {missing}")
```
---
### workflow_special_rules
Stores special routing rules (testing mode, escalation, security). Seeded from `transitions.json` `_special_rules` at session start.
```sql
CREATE TABLE workflow_special_rules (
id INTEGER PRIMARY KEY AUTOINCREMENT,
rule_name TEXT NOT NULL UNIQUE,
description TEXT,
config TEXT NOT NULL -- JSON object with rule configuration
)
```
**Columns:**
- `rule_name`: Unique rule identifier (e.g., `testing_mode_disabled`, `escalation_after_failures`)
- `description`: Human-readable description
- `config`: JSON object with rule-specific configuration
**Usage Example:**
```python
# Get escalation rule (called by workflow_router.py)
rule = db.get_special_rule('escalation_after_failures')
# Returns: {'threshold': 2, 'escalation_agent': 'senior_software_engineer'}
# Check if escalation needed
if revision_count >= rule['threshold']:
next_agent = rule['escalation_agent']
```
**Available Rules:**
- `testing_mode_disabled`: Skip QA entirely when testing is disabled
- `testing_mode_minimal`: Skip QA Expert when testing is minimal
- `escalation_after_failures`: Escalate to SSE after N failures
- `security_sensitive`: Force SSE + mandatory TL review for security tasks
- `research_tasks`: Route to RE with limited parallelism
```
### references/command_examples.md
```markdown
# BAZINGA-DB Command Examples
This document provides practical examples of using the bazinga-db skill commands.
## Command Format
All commands follow this pattern:
```bash
python3 /path/to/bazinga_db.py --db /path/to/bazinga.db <command> [arguments...]
```
For brevity, the examples below use shell variables (relative paths for portability):
```bash
DB_SCRIPT=.claude/skills/bazinga-db/scripts/bazinga_db.py
DB_PATH=bazinga/bazinga.db
```
---
## Session Management
### Create New Session
```bash
python3 $DB_SCRIPT --db $DB_PATH create-session \
"bazinga_20250112_143022" \
"parallel" \
"Add user authentication feature with OAuth2"
```
### Update Session Status
```bash
# Mark session as completed
python3 $DB_SCRIPT --db $DB_PATH update-session-status \
"bazinga_20250112_143022" \
"completed"
# Mark as failed
python3 $DB_SCRIPT --db $DB_PATH update-session-status \
"bazinga_20250112_143022" \
"failed"
```
### Get Session Details
```bash
python3 $DB_SCRIPT --db $DB_PATH get-session \
"bazinga_20250112_143022"
```
---
## Logging Agent Interactions
### Log PM Interaction
```bash
python3 $DB_SCRIPT --db $DB_PATH log-interaction \
"bazinga_123" \
"pm" \
"Analyzed requirements and created 3 task groups..." \
1 \
"pm_main"
```
### Log Developer Interaction
```bash
python3 $DB_SCRIPT --db $DB_PATH log-interaction \
"bazinga_123" \
"developer" \
"Implemented authentication controller..." \
5 \
"developer_1"
```
### Log Orchestrator Decision
```bash
python3 $DB_SCRIPT --db $DB_PATH log-interaction \
"bazinga_123" \
"orchestrator" \
"Spawning 2 developers in parallel for Groups A and B" \
3
```
---
## State Management
### Save PM State
```bash
python3 $DB_SCRIPT --db $DB_PATH save-state \
"bazinga_123" \
"pm" \
'{"mode":"parallel","iteration":3,"task_groups":[{"id":"group_a","status":"completed"}]}'
```
### Save Orchestrator State
```bash
python3 $DB_SCRIPT --db $DB_PATH save-state \
"bazinga_123" \
"orchestrator" \
'{"phase":"development","active_agents":["developer_1","developer_2"],"iteration":10}'
```
### Retrieve Latest State
```bash
# Get PM state
python3 $DB_SCRIPT --db $DB_PATH get-state \
"bazinga_123" \
"pm"
# Get orchestrator state
python3 $DB_SCRIPT --db $DB_PATH get-state \
"bazinga_123" \
"orchestrator"
```
---
## Task Group Management
### Create Task Group
```bash
# Basic creation
python3 $DB_SCRIPT --db $DB_PATH create-task-group \
"group_a" \
"bazinga_123" \
"Authentication Implementation" \
"pending"
# Full creation with all PM fields
python3 $DB_SCRIPT --db $DB_PATH create-task-group \
"group_a" \
"bazinga_123" \
"Authentication Implementation" \
"pending" \
--complexity 7 \
--initial_tier "Senior Software Engineer" \
--item_count 5 \
--component-path "backend/auth/" \
--specializations '["bazinga/templates/specializations/01-languages/python.md"]'
```
### Update Task Group Status
```bash
python3 $DB_SCRIPT --db $DB_PATH update-task-group \
"group_a" \
"bazinga_123" \
--status "completed" \
--last_review_status "APPROVED"
# Update complexity score
python3 $DB_SCRIPT --db $DB_PATH update-task-group \
"group_a" \
"bazinga_123" \
--complexity 5
```
### Increment Revision Count
```bash
python3 $DB_SCRIPT --db $DB_PATH update-task-group \
"group_a" \
"bazinga_123" \
--revision_count 2
```
### Assign Task Group
```bash
python3 $DB_SCRIPT --db $DB_PATH update-task-group \
"group_a" \
"bazinga_123" \
--assigned_to "developer_1"
```
---
## Reading Logs
### Stream Recent Logs (Markdown Format)
```bash
# Get last 50 logs
python3 $DB_SCRIPT --db $DB_PATH stream-logs \
"bazinga_123"
# With pagination
python3 $DB_SCRIPT --db $DB_PATH stream-logs \
"bazinga_123" \
50 \
100 # offset
```
### Get Logs (JSON Format)
```bash
# Recent logs
python3 $DB_SCRIPT --db $DB_PATH get-logs \
"bazinga_123" \
--limit 10
# Filter by agent type
python3 $DB_SCRIPT --db $DB_PATH get-logs \
"bazinga_123" \
--agent-type developer \
--limit 20
# Time-range query
python3 $DB_SCRIPT --db $DB_PATH get-logs \
"bazinga_123" \
--since "2025-01-12 14:00:00" \
--limit 100
```
---
## Token Usage Tracking
### Log Token Usage
```bash
python3 $DB_SCRIPT --db $DB_PATH log-tokens \
"bazinga_123" \
"developer" \
15000 \
"developer_1"
```
### Get Token Summary
```bash
# By agent type
python3 $DB_SCRIPT --db $DB_PATH token-summary \
"bazinga_123" \
agent_type
# By agent ID
python3 $DB_SCRIPT --db $DB_PATH token-summary \
"bazinga_123" \
agent_id
```
Example output:
```json
{
"pm": 5000,
"developer": 25000,
"qa": 8000,
"tech_lead": 7000,
"total": 45000
}
```
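The summary is plain JSON, so it can be post-processed with tools already used in this document. As a sketch, here is how to turn the example summary above into per-agent percentages with `python3` (integer division, so percentages are floored):

```bash
# Sketch: per-agent share of total tokens, using the summary shape above
echo '{"pm":5000,"developer":25000,"qa":8000,"tech_lead":7000,"total":45000}' \
  | python3 -c '
import json, sys
summary = json.load(sys.stdin)
total = summary.pop("total")       # drop the aggregate before iterating
for agent, tokens in summary.items():
    print(f"{agent}: {tokens * 100 // total}%")
'
```

In practice you would pipe the output of `token-summary` into the same filter instead of the hard-coded `echo`.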
---
## Skill Outputs
### Save Skill Output
```bash
python3 $DB_SCRIPT --db $DB_PATH save-skill-output \
"bazinga_123" \
"security_scan" \
'{"vulnerabilities":5,"severity":"medium","details":[...]}'
```
### Retrieve Skill Output
```bash
python3 $DB_SCRIPT --db $DB_PATH get-skill-output \
"bazinga_123" \
"security_scan"
```
---
## Configuration
**REMOVED:** Configuration table no longer exists (2025-11-21).
See `research/empty-tables-analysis.md` for details.
Use file-based configuration instead:
- Skills config: `bazinga/skills_config.json`
- Testing config: `bazinga/testing_config.json`
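A config value can then be read with a short `python3` one-liner. The sketch below writes a stub config to a temp path so it is self-contained; the key names (`security_scan`, `enabled`) are illustrative assumptions, not a documented schema:

```bash
# Illustrative only: the keys in this stub config are assumed, not a documented schema
cat > /tmp/skills_config.json <<'EOF'
{"security_scan": {"enabled": true}}
EOF
# Read one value; Python prints booleans as True/False
enabled=$(python3 -c 'import json; print(json.load(open("/tmp/skills_config.json"))["security_scan"]["enabled"])')
echo "security_scan enabled: $enabled"
```

Point the same one-liner at `bazinga/skills_config.json` or `bazinga/testing_config.json` in a real session.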
---
## Dashboard Data
### Get Complete Dashboard Snapshot
```bash
python3 $DB_SCRIPT --db $DB_PATH dashboard-snapshot \
"bazinga_123"
```
Returns:
```json
{
"session": {...},
"orchestrator_state": {...},
"pm_state": {...},
"task_groups": [...],
"token_summary": {...},
"recent_logs": [...]
}
```
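Individual sections can be pulled out of the snapshot with a small JSON filter. The sketch below runs against a stubbed snapshot (the `session_id` field name inside `session` is an assumption); in practice, pipe the `dashboard-snapshot` output directly:

```bash
# Stubbed snapshot for illustration; pipe real dashboard-snapshot output instead
echo '{"session":{"session_id":"bazinga_123"},"task_groups":[]}' \
  | python3 -c 'import json, sys; print(json.load(sys.stdin)["session"]["session_id"])'
```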
---
## Advanced Queries
### Custom SQL Query
```bash
# Agent interaction counts
python3 $DB_SCRIPT --db $DB_PATH query \
"SELECT agent_type, COUNT(*) as count FROM orchestration_logs WHERE session_id = 'bazinga_123' GROUP BY agent_type"
# Average response length
python3 $DB_SCRIPT --db $DB_PATH query \
"SELECT agent_type, AVG(LENGTH(content)) as avg_length FROM orchestration_logs WHERE session_id = 'bazinga_123' GROUP BY agent_type"
```
---
## Common Workflows
### Orchestrator Spawn Workflow
```bash
# 1. Log orchestrator decision
python3 $DB_SCRIPT --db $DB_PATH log-interaction \
"$SESSION_ID" "orchestrator" "Spawning developer for group_a" $ITERATION
# 2. Update orchestrator state
python3 $DB_SCRIPT --db $DB_PATH save-state \
"$SESSION_ID" "orchestrator" \
"{\"active_agents\":[\"developer_1\"],\"iteration\":$ITERATION}"
# 3. Update task group
python3 $DB_SCRIPT --db $DB_PATH update-task-group \
"group_a" "$SESSION_ID" --status "in_progress" --assigned_to "developer_1"
```
### Developer Completion Workflow
```bash
# 1. Log developer completion
python3 $DB_SCRIPT --db $DB_PATH log-interaction \
"$SESSION_ID" "developer" "Implementation completed" $ITERATION "developer_1"
# 2. Log token usage
python3 $DB_SCRIPT --db $DB_PATH log-tokens \
"$SESSION_ID" "developer" 15000 "developer_1"
# 3. Update task group
python3 $DB_SCRIPT --db $DB_PATH update-task-group \
"group_a" "$SESSION_ID" --status "completed"
```
### Tech Lead Review Workflow
```bash
# 1. Log tech lead review
python3 $DB_SCRIPT --db $DB_PATH log-interaction \
"$SESSION_ID" "tech_lead" "Code review: Changes requested" $ITERATION
# 2. Update task group with review result
python3 $DB_SCRIPT --db $DB_PATH update-task-group \
"group_a" \
"$SESSION_ID" \
--last_review_status "CHANGES_REQUESTED" \
--revision_count 1
# 3. Save skill outputs (if skills ran)
python3 $DB_SCRIPT --db $DB_PATH save-skill-output \
"$SESSION_ID" "test_coverage" '{"coverage":75,"threshold":80}'
```
---
## Error Handling
### Check Database Exists
```bash
if [ ! -f "$DB_PATH" ]; then
  echo "Database not initialized. Initializing now..."
  python3 /path/to/init_db.py "$DB_PATH"
fi
```
### Retry on Lock
```bash
# The client has 30-second timeout built-in
# No manual retry needed - SQLite handles lock contention automatically
```
### Validate JSON Before Saving
```bash
# Use jq to validate JSON
echo '{"test":"data"}' | jq . && \
python3 $DB_SCRIPT --db $DB_PATH save-state "session" "pm" '{"test":"data"}'
```
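If `jq` is not available, Python's standard `json` module (already required by the client) can serve as the validator; a non-zero exit stops the `&&` chain before the save runs:

```bash
# Exits non-zero (with a traceback on stderr) if the argument is not valid JSON
python3 -c 'import json, sys; json.loads(sys.argv[1])' '{"test":"data"}' \
  && echo "valid JSON"
```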
---
## Integration with Bash Tool
When using from Claude Code's Bash tool:
```bash
# Set up variables (relative paths for portability)
DB_SCRIPT=".claude/skills/bazinga-db/scripts/bazinga_db.py"
DB_PATH="bazinga/bazinga.db"
SESSION_ID="bazinga_20250112_143022"
# Example: Log interaction and update state in one command chain
python3 "$DB_SCRIPT" --db "$DB_PATH" log-interaction \
"$SESSION_ID" "pm" "Created task breakdown" 1 && \
python3 "$DB_SCRIPT" --db "$DB_PATH" save-state \
"$SESSION_ID" "pm" '{"iteration":1,"mode":"parallel"}'
```
---
## Python API Usage (For Dashboard)
The `BazingaDB` class can be imported directly:
```python
from bazinga_db import BazingaDB
db = BazingaDB('/home/user/bazinga/bazinga/bazinga.db')
# Query logs
logs = db.get_logs('bazinga_123', limit=10, agent_type='developer')
# Get dashboard data
snapshot = db.get_dashboard_snapshot('bazinga_123')
# Stream logs in markdown
markdown = db.stream_logs('bazinga_123', limit=50)
```