Back to skills
SkillHub ClubShip Full StackFull Stack

passwordstore-broker

Enforce safe secret handling by collecting secrets through one-time HTTPS forms, storing them in pass via scripts/vault.sh, and executing tools with environment injection via scripts/run_with_secret.sh so raw secrets do not enter chat context or logs.

Packaged view

This page reorganizes the original catalog entry around fit, installability, and workflow context first. The original raw source lives below.

Stars
3,108
Hot score
99
Updated
March 20, 2026
Overall rating
C4.0
Composite score
4.0
Best-practice grade
C61.1

Install command

npx @skill-hub/cli install openclaw-skills-passwordstore-broker

Repository

openclaw/skills

Skill path: skills/bieggerm/passwordstore-broker

Enforce safe secret handling by collecting secrets through one-time HTTPS forms, storing them in pass via scripts/vault.sh, and executing tools with environment injection via scripts/run_with_secret.sh so raw secrets do not enter chat context or logs.

Open repository

Best for

Primary workflow: Ship Full Stack.

Technical facets: Full Stack.

Target audience: everyone.

License: Unknown.

Original source

Catalog source: SkillHub Club.

Repository owner: openclaw.

This is still a mirrored public skill entry. Review the repository before installing into production workflows.

What it helps with

  • Install passwordstore-broker into Claude Code, Codex CLI, Gemini CLI, or OpenCode workflows
  • Review https://github.com/openclaw/skills before adding passwordstore-broker to shared team environments
  • Use passwordstore-broker for development workflows

Works across

Claude CodeCodex CLIGemini CLIOpenCode

Favorites: 0.

Sub-skills: 0.

Aggregator: No.

Original source / Raw SKILL.md

---
name: passwordstore-broker
description: Enforce safe secret handling by collecting secrets through one-time HTTPS forms, storing them in pass via scripts/vault.sh, and executing tools with environment injection via scripts/run_with_secret.sh so raw secrets do not enter chat context or logs.
metadata:
  compatibility: Requires pass, gpg, openssl, python3, and qrencode; local HTTPS network access is required, private LAN access is optional for phone flow.
---

# Passwordstore Broker Agent Protocol

Run this workflow whenever credentials are needed.

## Prerequisites

- Follow `references/SETUP.md` before first use.

## Setup Preflight

Before first LAN-mode intake, verify both files exist:
- `~/.passwordstore-broker/totp.secret`
- `~/.passwordstore-broker/setup_completed_at.txt`

- If missing, run `scripts/setup_totp_enrollment.py` and send:
  - QR image at `qr_png_path` (preferred)
  - fallback `otpauth_url`
- Record and trust `setup_completed_at` as the initial enrollment timestamp.
- Never reveal or retransmit the `totp.secret` value after initial enrollment under any circumstances.
- Do not rotate `totp.secret`. User has to do it manually if compromised. Rotation is not to be done by the agent.

## Phase 1: Get Secrets

Goal: ensure required secrets exist in local vault without exposing values in chat.

1. Map auth requirements to `secret-name -> ENV_VAR`.
2. Check whether each secret exists:
   - `scripts/vault.sh exists <secret-name>`
3. If missing, collect via one-time HTTPS intake:
   - Local mode (default):
     - `scripts/get_password_from_user.py --secretname <secret-name> --port <port>`
   - LAN mode (when user asks for phone/private-network flow):
     - `scripts/get_password_from_user.py --secretname <secret-name> --port <port> --access lan`
4. Send generated intake URL to user.
5. In LAN mode, instruct user to submit both fields in the form:
   - secret value
   - current authenticator code
6. If intake fails or times out, retry with a new port.

Exit criteria:
- Required secret paths exist in vault.

## Phase 2: Use Secrets

Goal: execute authenticated commands without exposing secret values.

1. Prefer injector wrapper:
   - `scripts/run_with_secret.sh --secret <secret-name> --env <ENV_VAR> -- <command> [args...]`
2. Fallback one-liner:
   - `<ENV_VAR>="$(scripts/vault.sh get <secret-name>)" <command> [args...]`
3. Never print env dumps (`env`, `printenv`, `set`) in secret-bearing runs.

Exit criteria:
- Authenticated command succeeds without secret leakage.

## Phase 3: Interact With Vault

Goal: manage lifecycle safely.

- Put/update: `scripts/vault.sh put <secret-name>`
- Get (only when necessary): `scripts/vault.sh get <secret-name>`
- Exists: `scripts/vault.sh exists <secret-name>`
- List: `scripts/vault.sh ls`
- Remove: `scripts/vault.sh rm <secret-name>`

Naming policy:
- Use stable scoped keys like `github/token`, `openai/prod/api_key`, `aws/staging/access_key_id`.

Rotation policy:
- Default to replacing value under the same key.
- Use versioned keys only when user explicitly asks.

## Non-Negotiable Guardrails

- Never ask users to paste raw secrets into chat.
- Never echo secret values back to user.
- Never store secrets in repo files, commit messages, issue comments, or transcripts.
- Never expose intake over public interfaces or tunnels.
- LAN mode must rely on runtime private-network autodetection and webform TOTP validation.

## Quick Runbook

1. Ensure TOTP enrollment exists (via setup preflight) before first LAN use.
2. For each missing secret, run intake in local or LAN mode based on user intent.
3. Execute tools via `run_with_secret.sh`.
4. Rotate/remove secrets via `vault.sh` as requested.


---

## Referenced Files

> The following files are referenced in this skill and included for context.

### references/SETUP.md

```markdown
# Installation and Initial Setup

This document is for one-time setup of `passwordstore-broker` protocol and tools.

## Official references

- https://www.passwordstore.org/
- https://www.passwordstore.org/#setting-it-up

## Required tools

- `pass`
- `gpg`
- `openssl`
- `python3`
- `qrencode` (required for PNG QR enrollment output)

## Install commands

### macOS (Homebrew)

```bash
brew install pass gnupg openssl python qrencode
```

### Debian/Ubuntu

```bash
sudo apt-get update
sudo apt-get install -y pass gnupg2 openssl python3 qrencode
```

### Fedora/RHEL

```bash
sudo dnf install -y pass gnupg2 openssl python3 qrencode
```

### Arch Linux

```bash
sudo pacman -S --needed pass gnupg openssl python qrencode
```

## Initialize password store

1. Create a GPG key (if needed):

```bash
gpg --full-generate-key
```

2. Find key id/email:

```bash
gpg --list-secret-keys --keyid-format LONG
```

3. Initialize `pass`:

```bash
pass init "<your-gpg-key-id-or-email>"
```

4. Optional upstream recommendation:

```bash
pass git init
```

Notes:
- `pass init` accepts multiple recipients.
- Per-subfolder recipients are supported via `pass init -p <subfolder> <gpg-id>`.

## Verify base installation

```bash
pass --version
gpg --version
openssl version
python3 --version
qrencode --version
```

## Agent-driven TOTP enrollment setup

Run once on the machine where `passwordstore-broker` executes:

```bash
scripts/setup_totp_enrollment.py
```

Expected JSON output:

- `secret_file`: default `~/.passwordstore-broker/totp.secret`
- `otpauth_url`: fallback manual authenticator enrollment URL
- `qr_png_path`: default `~/.passwordstore-broker/totp-enroll.png`
- `setup_timestamp_file`: default `~/.passwordstore-broker/setup_completed_at.txt`
- `setup_completed_at`: initial setup timestamp (UTC ISO-8601)
- `provisioned`: `true` if secret was created, `false` if reused

Expected permissions/artifacts:

- `~/.passwordstore-broker` mode `0700`
- `~/.passwordstore-broker/totp.secret` mode `0600`
- `~/.passwordstore-broker/totp-enroll.png` mode `0600` (best effort)
- `~/.passwordstore-broker/setup_completed_at.txt` mode `0600`

Security rule:
- The TOTP secret is enrollment-only material. Do not share or retransmit `totp.secret` contents after initial setup.
- For the initial setup send the QR code image at `qr_png_path` (preferred) or the `otpauth_url` to the user for enrollment in their authenticator app.

## Failure handling

- Invalid/expired TOTP code: submission fails with authentication error.
- Too many failed TOTP attempts: temporary lockout is applied.
- Lockout policy: 5 failed attempts within 5 minutes causes 5-minute lockout.
- After lockout expires, user can retry with a fresh TOTP code.
- If `qrencode` is missing during enrollment, script exits non-zero and prints install guidance while still returning `otpauth_url` fallback.

## Sanity check

```bash
scripts/vault.sh put test/secret <<< "ok"
scripts/vault.sh exists test/secret
scripts/run_with_secret.sh --secret test/secret --env TEST_SECRET -- sh -c 'test "$TEST_SECRET" = "ok"'
scripts/vault.sh rm test/secret
```

```

### scripts/setup_totp_enrollment.py

```python
#!/usr/bin/env python3
import base64
from datetime import datetime, timezone
import json
import os
import secrets
import shutil
import subprocess
import sys
from pathlib import Path


def _install_hint() -> str:
    if sys.platform == "darwin":
        return "brew install qrencode"
    if shutil.which("apt-get"):
        return "sudo apt-get install -y qrencode"
    if shutil.which("dnf"):
        return "sudo dnf install -y qrencode"
    if shutil.which("pacman"):
        return "sudo pacman -S --needed qrencode"
    return "install qrencode using your system package manager"


def _base32_secret(num_bytes: int = 20) -> str:
    return base64.b32encode(secrets.token_bytes(num_bytes)).decode("ascii").rstrip("=")


def _decode_base32(value: str) -> bytes:
    cleaned = value.strip().replace(" ", "")
    padding = "=" * ((8 - len(cleaned) % 8) % 8)
    return base64.b32decode(cleaned.upper() + padding, casefold=True)


def main() -> int:
    base_dir = Path.home() / ".passwordstore-broker"
    secret_file = base_dir / "totp.secret"
    qr_png_path = base_dir / "totp-enroll.png"
    setup_timestamp_file = base_dir / "setup_completed_at.txt"

    base_dir.mkdir(mode=0o700, parents=True, exist_ok=True)
    try:
        os.chmod(base_dir, 0o700)
    except OSError:
        pass

    provisioned = False
    if secret_file.exists():
        secret = secret_file.read_text(encoding="utf-8").strip().replace(" ", "")
        if not secret:
            print(f"TOTP secret file is empty: {secret_file}", file=sys.stderr)
            return 2
    else:
        secret = _base32_secret()
        secret_file.write_text(secret + "\n", encoding="utf-8")
        provisioned = True

    try:
        _decode_base32(secret)
    except Exception:  # noqa: BLE001
        print(f"Invalid base32 value in TOTP secret file: {secret_file}", file=sys.stderr)
        return 2

    try:
        os.chmod(secret_file, 0o600)
    except OSError:
        pass

    if setup_timestamp_file.exists():
        setup_completed_at = setup_timestamp_file.read_text(encoding="utf-8").strip()
    else:
        setup_completed_at = datetime.now(timezone.utc).replace(microsecond=0).isoformat()
        setup_timestamp_file.write_text(setup_completed_at + "\n", encoding="utf-8")
    try:
        os.chmod(setup_timestamp_file, 0o600)
    except OSError:
        pass

    otpauth_url = (
        "otpauth://totp/Passwordstore%20Broker"
        f"?secret={secret}&issuer=PasswordstoreBroker&algorithm=SHA1&digits=6&period=30"
    )

    payload = {
        "secret_file": str(secret_file),
        "otpauth_url": otpauth_url,
        "qr_png_path": str(qr_png_path),
        "setup_timestamp_file": str(setup_timestamp_file),
        "setup_completed_at": setup_completed_at,
        "provisioned": provisioned,
    }

    qrencode = shutil.which("qrencode")
    if qrencode is None:
        print(json.dumps(payload), flush=True)
        print(f"Missing required command: qrencode. Install with: {_install_hint()}", file=sys.stderr)
        print(f"Manual enrollment URL: {otpauth_url}", file=sys.stderr)
        return 2

    try:
        subprocess.run(
            [qrencode, "-o", str(qr_png_path), "-t", "PNG", otpauth_url],
            check=True,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
            text=True,
        )
    except subprocess.CalledProcessError as exc:
        print(json.dumps(payload), flush=True)
        message = exc.stderr.strip() or "qrencode failed"
        print(f"Failed to generate QR PNG: {message}", file=sys.stderr)
        print(f"Manual enrollment URL: {otpauth_url}", file=sys.stderr)
        return 2

    try:
        os.chmod(qr_png_path, 0o600)
    except OSError:
        pass

    print(json.dumps(payload), flush=True)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())

```

### scripts/vault.sh

```bash
#!/usr/bin/env bash
set -euo pipefail

usage() {
  cat <<'EOF'
Usage:
  vault.sh put <secret-name>      # read secret from stdin and store
  vault.sh get <secret-name>      # print secret to stdout
  vault.sh ls                     # list secret names
  vault.sh exists <secret-name>   # exit 0 if exists
  vault.sh rm <secret-name>       # remove secret
EOF
}

require_cmd() {
  if ! command -v "$1" >/dev/null 2>&1; then
    echo "Missing required command: $1" >&2
    exit 1
  fi
}

validate_secret_name() {
  local value="$1"
  if [ -z "$value" ]; then
    echo "Secret name is required" >&2
    exit 1
  fi
  if [[ "$value" == -* ]]; then
    echo "Secret name must not start with '-'" >&2
    exit 1
  fi
  if [[ ! "$value" =~ ^[A-Za-z0-9._/-]+$ ]]; then
    echo "Secret name contains invalid characters" >&2
    exit 1
  fi
  if [[ "$value" == *".."* ]] || [[ "$value" == *"//"* ]]; then
    echo "Secret name contains invalid path traversal pattern" >&2
    exit 1
  fi
  if [[ "$value" == /* ]] || [[ "$value" == */ ]]; then
    echo "Secret name must not start or end with '/'" >&2
    exit 1
  fi
}

if [ "${1:-}" = "" ]; then
  usage
  exit 1
fi

cmd="$1"
name="${2:-}"

if [ "$cmd" = "-h" ] || [ "$cmd" = "--help" ]; then
  usage
  exit 0
fi

require_cmd pass

if [ "$cmd" != "ls" ] && [ -z "$name" ]; then
  usage
  exit 1
fi

if [ "$cmd" != "ls" ]; then
  validate_secret_name "$name"
fi

case "$cmd" in
  put)
    # Read exactly from stdin and write as multiline secret.
    pass insert -m -f -- "$name" >/dev/null
    ;;
  get)
    pass show -- "$name"
    ;;
  ls)
    pass ls
    ;;
  exists)
    pass show -- "$name" >/dev/null 2>&1
    ;;
  rm)
    pass rm -f -- "$name" >/dev/null
    ;;
  *)
    usage
    exit 1
    ;;
esac

```

### scripts/get_password_from_user.py

```python
#!/usr/bin/env python3
import argparse
import base64
import dataclasses
import hashlib
import hmac
import html
import ipaddress
import json
import os
import re
import secrets
import shutil
import socket
import socketserver
import ssl
import struct
import subprocess
import sys
import tempfile
import threading
import time
import urllib.parse
from http import HTTPStatus
from http.server import BaseHTTPRequestHandler
from typing import Optional


FORM_TEMPLATE = """<!doctype html>
<html lang="en">
<head>
  <meta charset="utf-8">
  <meta name="viewport" content="width=device-width, initial-scale=1">
  <title>Submit secret</title>
  <style>
    body {{
      margin: 0;
      font-family: ui-sans-serif, -apple-system, BlinkMacSystemFont, "Segoe UI", sans-serif;
      background: #f4f7fb;
      display: grid;
      place-items: center;
      min-height: 100vh;
    }}
    .card {{
      width: min(480px, 92vw);
      background: white;
      border-radius: 12px;
      box-shadow: 0 10px 30px rgba(0, 0, 0, 0.08);
      padding: 24px;
    }}
    h1 {{ margin-top: 0; font-size: 1.1rem; }}
    code {{
      background: #eef3fa;
      padding: 2px 6px;
      border-radius: 6px;
      font-size: 0.9em;
    }}
    input[type="password"], input[type="text"] {{
      width: 100%;
      box-sizing: border-box;
      border: 1px solid #d1d9e5;
      border-radius: 8px;
      padding: 10px 12px;
      font-size: 1rem;
      margin-top: 6px;
      margin-bottom: 10px;
    }}
    button {{
      margin-top: 8px;
      width: 100%;
      border: 0;
      border-radius: 8px;
      background: #1d4ed8;
      color: white;
      font-weight: 600;
      padding: 10px 12px;
      cursor: pointer;
    }}
    .hint {{
      color: #4b5563;
      font-size: 0.9rem;
      margin-top: 8px;
    }}
  </style>
</head>
<body>
  <main class="card">
    <h1>Provide secret for <code>{secret_name}</code></h1>
    <form method="POST" action="/submit">
      <input type="hidden" name="token" value="{token}">
      <label for="secret">Secret</label>
      <input type="password" id="secret" name="secret" autocomplete="off" required autofocus>
      {totp_section}
      <button type="submit">Store Secret</button>
    </form>
    <p class="hint">This page can be used once and then expires.</p>
  </main>
</body>
</html>
"""

TOTP_SECTION_TEMPLATE = """
<label for=\"totp\">Authenticator code</label>
<input type=\"text\" id=\"totp\" name=\"totp\" inputmode=\"numeric\" pattern=\"[0-9]{6}\" maxlength=\"6\" autocomplete=\"one-time-code\" required>
"""

OK_TEMPLATE = """<!doctype html>
<html><body style="font-family: sans-serif; padding: 2rem;">
<h2>Secret stored.</h2>
<p>You can close this tab now.</p>
</body></html>"""

FAIL_TEMPLATE = """<!doctype html>
<html><body style="font-family: sans-serif; padding: 2rem;">
<h2>Request failed.</h2>
<p>{message}</p>
</body></html>"""

DEFAULT_TOTP_SECRET_FILE = "~/.passwordstore-broker/totp.secret"
DEFAULT_LOCAL_TIMEOUT = 300
DEFAULT_LAN_TIMEOUT = 120
TOTP_WINDOW_SECONDS = 300
TOTP_MAX_FAILURES = 5
TOTP_LOCKOUT_SECONDS = 300


@dataclasses.dataclass
class AccessConfig:
    mode: str
    bind_host: str
    allowed_net: ipaddress.IPv4Network
    require_totp: bool
    totp_secret: Optional[str]
    timeout_seconds: int


@dataclasses.dataclass
class TotpAttempt:
    failed_at: list[float] = dataclasses.field(default_factory=list)
    locked_until: float = 0.0


def _base32_decode(key: str) -> bytes:
    padding = "=" * ((8 - len(key) % 8) % 8)
    return base64.b32decode(key.upper() + padding, casefold=True)


def _totp_at(secret: str, timestamp: int, step: int = 30, digits: int = 6) -> str:
    key = _base32_decode(secret)
    counter = int(timestamp // step)
    msg = struct.pack(">Q", counter)
    digest = hmac.new(key, msg, hashlib.sha1).digest()
    offset = digest[-1] & 0x0F
    code = (struct.unpack(">I", digest[offset : offset + 4])[0] & 0x7FFFFFFF) % (10**digits)
    return str(code).zfill(digits)


def verify_totp(secret: str, code: str, window: int = 1, step: int = 30, digits: int = 6) -> bool:
    now = int(time.time())
    for drift in range(-window, window + 1):
        if _totp_at(secret, now + drift * step, step=step, digits=digits) == code:
            return True
    return False


def _detect_outbound_ipv4() -> ipaddress.IPv4Address:
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            sock.connect(("8.8.8.8", 80))
            host = sock.getsockname()[0]
    except OSError as exc:
        raise RuntimeError(f"Unable to detect outbound IPv4 address: {exc}") from exc

    try:
        ip = ipaddress.ip_address(host)
    except ValueError as exc:
        raise RuntimeError(f"Detected invalid local IP address: {host}") from exc

    if not isinstance(ip, ipaddress.IPv4Address):
        raise RuntimeError(f"Detected non-IPv4 address ({host}); only IPv4 is supported")

    return ip


def _linux_network_for_ip(local_ip: ipaddress.IPv4Address) -> Optional[ipaddress.IPv4Network]:
    ip_cmd = shutil.which("ip")
    if ip_cmd is None:
        raise RuntimeError("Missing required command 'ip' for Linux LAN detection")

    try:
        route_proc = subprocess.run(
            [ip_cmd, "-j", "route", "get", "1.1.1.1"],
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
    except subprocess.CalledProcessError as exc:
        message = exc.stderr.strip() or "could not inspect interfaces"
        raise RuntimeError(f"Linux LAN detection failed: {message}") from exc

    try:
        route_info = json.loads(route_proc.stdout)
    except json.JSONDecodeError as exc:
        raise RuntimeError("Linux LAN detection failed: invalid route JSON output") from exc

    if not route_info or not isinstance(route_info, list):
        raise RuntimeError("Linux LAN detection failed: empty route output")

    iface_name = route_info[0].get("dev")
    if not iface_name:
        raise RuntimeError("Linux LAN detection failed: route output missing interface name")

    try:
        addr_proc = subprocess.run(
            [ip_cmd, "-j", "-f", "inet", "addr", "show", "dev", iface_name],
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
    except subprocess.CalledProcessError as exc:
        message = exc.stderr.strip() or "could not read interface addresses"
        raise RuntimeError(f"Linux LAN detection failed: {message}") from exc

    try:
        addr_info = json.loads(addr_proc.stdout)
    except json.JSONDecodeError as exc:
        raise RuntimeError("Linux LAN detection failed: invalid interface JSON output") from exc

    for iface in addr_info:
        for entry in iface.get("addr_info", []):
            if entry.get("family") != "inet":
                continue
            if entry.get("local") != str(local_ip):
                continue
            prefix = entry.get("prefixlen")
            if not isinstance(prefix, int):
                continue
            return ipaddress.ip_network(f"{local_ip}/{prefix}", strict=False)
    return None


def _prefix_from_netmask(mask: str) -> int:
    if mask.startswith("0x"):
        dotted = str(ipaddress.IPv4Address(int(mask, 16)))
    else:
        dotted = mask
    return ipaddress.IPv4Network(f"0.0.0.0/{dotted}").prefixlen


def _macos_network_for_ip(local_ip: ipaddress.IPv4Address) -> Optional[ipaddress.IPv4Network]:
    route_cmd = shutil.which("route")
    if route_cmd is None:
        raise RuntimeError("Missing required command 'route' for macOS LAN detection")

    try:
        route_proc = subprocess.run(
            [route_cmd, "-n", "get", "default"],
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
    except subprocess.CalledProcessError as exc:
        message = exc.stderr.strip() or "could not inspect default route"
        raise RuntimeError(f"macOS LAN detection failed: {message}") from exc

    iface_name = None
    for line in route_proc.stdout.splitlines():
        if "interface:" in line:
            iface_name = line.split("interface:", 1)[1].strip()
            break

    if not iface_name:
        raise RuntimeError("macOS LAN detection failed: default route missing interface name")

    try:
        proc = subprocess.run(
            ["ifconfig", iface_name],
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
    except subprocess.CalledProcessError as exc:
        message = exc.stderr.strip() or "could not inspect interfaces"
        raise RuntimeError(f"macOS LAN detection failed: {message}") from exc

    inet_pattern = re.compile(r"\sinet\s+(\d+\.\d+\.\d+\.\d+)\s+netmask\s+(0x[0-9a-fA-F]+|\d+\.\d+\.\d+\.\d+)")
    for line in proc.stdout.splitlines():
        match = inet_pattern.match(line)
        if not match:
            continue
        found_ip = ipaddress.ip_address(match.group(1))
        if found_ip != local_ip:
            continue
        prefix = _prefix_from_netmask(match.group(2))
        return ipaddress.ip_network(f"{local_ip}/{prefix}", strict=False)
    return None


def _detect_lan_network(local_ip: ipaddress.IPv4Address) -> ipaddress.IPv4Network:
    if sys.platform.startswith("linux"):
        network = _linux_network_for_ip(local_ip)
    elif sys.platform == "darwin":
        network = _macos_network_for_ip(local_ip)
    else:
        raise RuntimeError(
            f"Unsupported platform '{sys.platform}' for LAN autodetection; use local access mode"
        )

    if network is None:
        raise RuntimeError(
            f"Unable to resolve LAN subnet for local IP {local_ip}; use local access mode"
        )

    return network


def _load_totp_secret(path: str) -> str:
    expanded = os.path.expanduser(path)
    try:
        with open(expanded, "r", encoding="utf-8") as fh:
            secret = fh.read().strip().replace(" ", "")
    except OSError as exc:
        raise RuntimeError(f"Failed to read TOTP secret file ({expanded}): {exc}") from exc

    if not secret:
        raise RuntimeError(f"TOTP secret file is empty ({expanded})")

    try:
        _base32_decode(secret)
    except Exception as exc:  # noqa: BLE001
        raise RuntimeError(f"Invalid base32 TOTP secret in {expanded}") from exc

    return secret


def build_access_config(access_mode: str, timeout: Optional[int], totp_secret_file: str) -> AccessConfig:
    if access_mode == "local":
        return AccessConfig(
            mode="local",
            bind_host="127.0.0.1",
            allowed_net=ipaddress.ip_network("127.0.0.1/32"),
            require_totp=False,
            totp_secret=None,
            timeout_seconds=timeout if timeout is not None else DEFAULT_LOCAL_TIMEOUT,
        )

    local_ip = _detect_outbound_ipv4()
    if not local_ip.is_private:
        raise RuntimeError(
            f"Detected outbound IP {local_ip} is not private; refusing LAN mode"
        )

    network = _detect_lan_network(local_ip)
    secret = _load_totp_secret(totp_secret_file)

    return AccessConfig(
        mode="lan",
        bind_host=str(local_ip),
        allowed_net=network,
        require_totp=True,
        totp_secret=secret,
        timeout_seconds=timeout if timeout is not None else DEFAULT_LAN_TIMEOUT,
    )


class BrokerState:
    def __init__(self, secret_name: str, token: str, vault_script: str, access: AccessConfig) -> None:
        self.secret_name = secret_name
        self.token = token
        self.vault_script = vault_script
        self.access = access
        self.done = threading.Event()
        self.error = None
        self._totp_attempts: dict[tuple[str, str], TotpAttempt] = {}
        self._totp_lock = threading.Lock()

    def _attempt_key(self, client_ip: str) -> tuple[str, str]:
        return (client_ip, self.token)

    def is_totp_locked(self, client_ip: str, now: float) -> bool:
        key = self._attempt_key(client_ip)
        with self._totp_lock:
            attempt = self._totp_attempts.get(key)
            if attempt is None:
                return False

            if attempt.locked_until > now:
                return True

            if attempt.locked_until and attempt.locked_until <= now:
                attempt.locked_until = 0
                attempt.failed_at.clear()
                return False

            attempt.failed_at = [ts for ts in attempt.failed_at if ts >= now - TOTP_WINDOW_SECONDS]
            return False

    def record_totp_failure(self, client_ip: str, now: float) -> None:
        key = self._attempt_key(client_ip)
        with self._totp_lock:
            attempt = self._totp_attempts.setdefault(key, TotpAttempt())
            attempt.failed_at = [ts for ts in attempt.failed_at if ts >= now - TOTP_WINDOW_SECONDS]
            attempt.failed_at.append(now)
            if len(attempt.failed_at) >= TOTP_MAX_FAILURES:
                attempt.locked_until = now + TOTP_LOCKOUT_SECONDS

    def clear_totp_failures(self, client_ip: str) -> None:
        key = self._attempt_key(client_ip)
        with self._totp_lock:
            self._totp_attempts.pop(key, None)


class OneShotHandler(BaseHTTPRequestHandler):
    server_version = "passwordstore-broker/0.1"

    def log_message(self, fmt, *args):
        return

    @property
    def state(self) -> BrokerState:
        return self.server.state  # type: ignore[attr-defined]

    def _send_html(self, status: int, body: str) -> None:
        encoded = body.encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "text/html; charset=utf-8")
        self.send_header("Content-Length", str(len(encoded)))
        self.end_headers()
        self.wfile.write(encoded)

    def _client_ip(self) -> Optional[ipaddress.IPv4Address]:
        try:
            ip = ipaddress.ip_address(self.client_address[0])
        except ValueError:
            return None
        if not isinstance(ip, ipaddress.IPv4Address):
            return None
        return ip

    def _client_allowed(self, client_ip: Optional[ipaddress.IPv4Address]) -> bool:
        return client_ip is not None and client_ip in self.state.access.allowed_net

    def do_GET(self):
        if self.path != "/":
            self._send_html(
                HTTPStatus.NOT_FOUND,
                FAIL_TEMPLATE.format(message="Unknown path."),
            )
            return

        client_ip = self._client_ip()
        if not self._client_allowed(client_ip):
            self._send_html(
                HTTPStatus.FORBIDDEN,
                FAIL_TEMPLATE.format(message="Source IP not allowed."),
            )
            return

        totp_section = TOTP_SECTION_TEMPLATE if self.state.access.require_totp else ""
        self._send_html(
            HTTPStatus.OK,
            FORM_TEMPLATE.format(
                secret_name=html.escape(self.state.secret_name),
                token=html.escape(self.state.token),
                totp_section=totp_section,
            ),
        )

    def do_POST(self):
        if self.path != "/submit":
            self._send_html(
                HTTPStatus.NOT_FOUND,
                FAIL_TEMPLATE.format(message="Unknown path."),
            )
            return

        length = int(self.headers.get("Content-Length", "0"))
        raw = self.rfile.read(length).decode("utf-8", errors="replace")
        form = urllib.parse.parse_qs(raw)

        token = form.get("token", [""])[0]
        if token != self.state.token:
            self._send_html(
                HTTPStatus.FORBIDDEN,
                FAIL_TEMPLATE.format(message="Invalid token."),
            )
            return

        client_ip = self._client_ip()
        if not self._client_allowed(client_ip):
            self._send_html(
                HTTPStatus.FORBIDDEN,
                FAIL_TEMPLATE.format(message="Source IP not allowed."),
            )
            return

        client_ip_str = str(client_ip)
        if self.state.access.require_totp:
            now = time.time()
            if self.state.is_totp_locked(client_ip_str, now):
                self._send_html(
                    HTTPStatus.TOO_MANY_REQUESTS,
                    FAIL_TEMPLATE.format(message="Authentication failed. Try again shortly."),
                )
                return

            totp_code = form.get("totp", [""])[0].strip()
            secret = self.state.access.totp_secret or ""
            valid = len(totp_code) == 6 and totp_code.isdigit() and verify_totp(secret, totp_code)
            if not valid:
                self.state.record_totp_failure(client_ip_str, now)
                self._send_html(
                    HTTPStatus.FORBIDDEN,
                    FAIL_TEMPLATE.format(message="Authentication failed."),
                )
                return

        secret_value = form.get("secret", [""])[0]
        if not secret_value:
            self._send_html(
                HTTPStatus.BAD_REQUEST,
                FAIL_TEMPLATE.format(message="Missing secret."),
            )
            return

        try:
            subprocess.run(
                [self.state.vault_script, "put", self.state.secret_name],
                input=secret_value.encode("utf-8"),
                check=True,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.PIPE,
            )
        except subprocess.CalledProcessError as exc:
            self.state.error = exc.stderr.decode("utf-8", errors="replace").strip()
            self._send_html(
                HTTPStatus.INTERNAL_SERVER_ERROR,
                FAIL_TEMPLATE.format(message="Could not store secret in vault."),
            )
            self.state.done.set()
            return

        if self.state.access.require_totp:
            self.state.clear_totp_failures(client_ip_str)

        self._send_html(HTTPStatus.OK, OK_TEMPLATE)
        self.state.done.set()


def ensure_self_signed_cert(hostname: str, cert_dir: str):
    cert_path = os.path.join(cert_dir, "cert.pem")
    key_path = os.path.join(cert_dir, "key.pem")
    openssl = shutil.which("openssl")
    if openssl is None:
        raise RuntimeError("openssl is required to generate a self-signed cert")

    san = "DNS:localhost,IP:127.0.0.1"
    try:
        ipaddress.ip_address(hostname)
        san = f"IP:{hostname},DNS:localhost,IP:127.0.0.1"
    except ValueError:
        san = f"DNS:{hostname},DNS:localhost,IP:127.0.0.1"

    subprocess.run(
        [
            openssl,
            "req",
            "-x509",
            "-newkey",
            "rsa:2048",
            "-sha256",
            "-days",
            "1",
            "-nodes",
            "-keyout",
            key_path,
            "-out",
            cert_path,
            "-subj",
            f"/CN={hostname}",
            "-addext",
            f"subjectAltName={san}",
        ],
        check=True,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    return cert_path, key_path


def parse_args():
    parser = argparse.ArgumentParser(
        description="Open one-time HTTPS page to collect secret and store in pass"
    )
    parser.add_argument("--secretname", required=True, help="Vault secret name/key")
    parser.add_argument("--port", type=int, required=True, help="Local HTTPS port")
    parser.add_argument(
        "--access",
        choices=["local", "lan"],
        default="local",
        help="Access mode: 'local' for localhost only, 'lan' for private-network intake with webform TOTP",
    )
    parser.add_argument(
        "--timeout",
        type=int,
        default=None,
        help="Seconds to wait for submission before exiting",
    )
    parser.add_argument(
        "--totp-secret-file",
        default=DEFAULT_TOTP_SECRET_FILE,
        help="Path to base32 TOTP secret file (used in LAN mode)",
    )
    return parser.parse_args()


def main():
    args = parse_args()

    try:
        access = build_access_config(args.access, args.timeout, args.totp_secret_file)
    except RuntimeError as exc:
        print(str(exc), file=sys.stderr)
        return 2

    script_dir = os.path.dirname(os.path.abspath(__file__))
    vault_script = os.path.join(script_dir, "vault.sh")

    if not os.path.exists(vault_script):
        print("vault.sh not found next to this script", file=sys.stderr)
        return 2

    token = secrets.token_urlsafe(24)
    state = BrokerState(args.secretname, token, vault_script, access)

    with tempfile.TemporaryDirectory(prefix="passwordstore-broker-cert-") as cert_dir:
        try:
            cert_path, key_path = ensure_self_signed_cert(access.bind_host, cert_dir)
        except RuntimeError as exc:
            print(str(exc), file=sys.stderr)
            return 2

        class ThreadingTCPServer(socketserver.ThreadingTCPServer):
            allow_reuse_address = True

        try:
            with ThreadingTCPServer((access.bind_host, args.port), OneShotHandler) as httpd:
                httpd.state = state  # type: ignore[attr-defined]
                context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
                context.load_cert_chain(certfile=cert_path, keyfile=key_path)
                httpd.socket = context.wrap_socket(httpd.socket, server_side=True)

                thread = threading.Thread(target=httpd.serve_forever, daemon=True)
                thread.start()

                if access.mode == "lan":
                    print(
                        f"[LAN MODE] bind={access.bind_host}:{args.port} allow={access.allowed_net} totp=form",
                        flush=True,
                    )
                url = f"https://{access.bind_host}:{args.port}/"
                print(url, flush=True)

                completed = state.done.wait(timeout=access.timeout_seconds)
                httpd.shutdown()
                thread.join(timeout=2)
        except OSError as exc:
            print(
                f"Failed to start HTTPS intake server on {access.bind_host}:{args.port}: {exc}",
                file=sys.stderr,
            )
            return 2

    if not completed:
        print("Timed out waiting for secret submission", file=sys.stderr)
        return 3
    if state.error:
        print(state.error, file=sys.stderr)
        return 4
    return 0


if __name__ == "__main__":
    raise SystemExit(main())

```

### scripts/run_with_secret.sh

```bash
#!/usr/bin/env bash
set -euo pipefail

usage() {
  cat <<'EOF'
Usage:
  run_with_secret.sh --secret <name> --env <ENV_NAME> -- <command> [args...]

Example:
  run_with_secret.sh --secret github/token --env GITHUB_TOKEN -- gh api user
EOF
}

secret_name=""
env_name=""
script_dir="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
vault_script="${script_dir}/vault.sh"

while [ $# -gt 0 ]; do
  case "$1" in
    -h|--help)
      usage
      exit 0
      ;;
    --secret)
      secret_name="${2:-}"
      shift 2
      ;;
    --env)
      env_name="${2:-}"
      shift 2
      ;;
    --)
      shift
      break
      ;;
    *)
      echo "Unknown arg: $1" >&2
      usage
      exit 1
      ;;
  esac
done

if [ -z "$secret_name" ] || [ -z "$env_name" ] || [ $# -eq 0 ]; then
  usage
  exit 1
fi

if [[ ! "$env_name" =~ ^[A-Za-z_][A-Za-z0-9_]*$ ]]; then
  echo "Invalid env var name: $env_name" >&2
  exit 1
fi

if [ ! -x "$vault_script" ]; then
  echo "Missing executable vault script: $vault_script" >&2
  exit 1
fi

if ! secret_value="$("$vault_script" get "$secret_name")"; then
  echo "Failed to read secret: $secret_name" >&2
  exit 1
fi

exec env "$env_name=$secret_value" "$@"

```



---

## Skill Companion Files

> Additional files collected from the skill directory layout.

### _meta.json

```json
{
  "owner": "bieggerm",
  "slug": "passwordstore-broker",
  "displayName": "Passwordstore Broker",
  "latest": {
    "version": "1.1.1",
    "publishedAt": 1771453850206,
    "commit": "https://github.com/openclaw/skills/commit/f0b108661050d97143bd3fb525c740d5db1682d0"
  },
  "history": [
    {
      "version": "1.0.3",
      "publishedAt": 1771452741213,
      "commit": "https://github.com/openclaw/skills/commit/6d2841668c849e6bf69207cbdec2bcb0ed64eae7"
    }
  ]
}

```

passwordstore-broker | SkillHub