Back to skills
SkillHub ClubAnalyze Data & AIFull StackData / AI

flux

Publish events and query shared world state via Flux state engine. Use when agents need to share observations, coordinate on shared data, or track entity state across systems.

Packaged view

This page reorganizes the original catalog entry around fit, installability, and workflow context first. The original raw source lives below.

Stars
3,086
Hot score
99
Updated
March 20, 2026
Overall rating
C4.0
Composite score
4.0
Best-practice grade
B77.6

Install command

npx @skill-hub/cli install openclaw-skills-flux

Repository

openclaw/skills

Skill path: skills/eckmantechllc/flux

Publish events and query shared world state via Flux state engine. Use when agents need to share observations, coordinate on shared data, or track entity state across systems.

Open repository

Best for

Primary workflow: Analyze Data & AI.

Technical facets: Full Stack, Data / AI.

Target audience: everyone.

License: Unknown.

Original source

Catalog source: SkillHub Club.

Repository owner: openclaw.

This is still a mirrored public skill entry. Review the repository before installing into production workflows.

What it helps with

  • Install flux into Claude Code, Codex CLI, Gemini CLI, or OpenCode workflows
  • Review https://github.com/openclaw/skills before adding flux to shared team environments
  • Use flux for development workflows

Works across

Claude CodeCodex CLIGemini CLIOpenCode

Favorites: 0.

Sub-skills: 0.

Aggregator: No.

Original source / Raw SKILL.md

---
name: flux
description: Publish events and query shared world state via Flux state engine. Use when agents need to share observations, coordinate on shared data, or track entity state across systems.
metadata:
  {
    "openclaw":
      {
        "emoji": "⚡",
        "requires": { "env": ["FLUX_TOKEN"] },
        "primaryEnv": "FLUX_TOKEN",
        "optionalEnv": ["FLUX_URL", "FLUX_ADMIN_TOKEN"],
      },
  }
---

# Flux Skill

Flux is a persistent, shared, event-sourced world state engine. Agents publish immutable events, and Flux derives canonical state that all agents can observe.

## Key Concepts

- **Events**: Immutable observations (temperature readings, status changes, etc.)
- **Entities**: State objects derived from events (sensors, devices, agents)
- **Properties**: Key-value attributes of entities (merged on update — only changed properties need to be sent)
- **Streams**: Logical event namespaces (sensors, agents, system)
- **Namespaces**: Multi-tenant isolation with token auth (optional, for public instances)

## Prerequisites

**Public instance:** `https://api.flux-universe.com` (namespace purchased at flux-universe.com — name auto-assigned at purchase, e.g. `dawn-coral`)
**Local instance:** `http://localhost:3000` (default, override with `FLUX_URL` env var)

Authentication: Set `FLUX_TOKEN` to your bearer token. Required for the public instance. Optional for local instances with auth disabled.

## Namespace Prefix

All entity IDs must be prefixed with your namespace:
`yournamespace/entity-name`

Example with namespace `dawn-coral`:
```bash
./scripts/flux.sh publish sensors agent-01 dawn-coral/sensor-01 \
  '{"temperature":22.5}'
./scripts/flux.sh get dawn-coral/sensor-01
```

Entity IDs without a namespace prefix will be rejected on auth-enabled instances.

## Getting Started

First, verify your connection:
```bash
./scripts/flux.sh health
```

Then check the directory to see what's available on the Flux Universe:
```bash
./scripts/flux.sh get flux-core/directory
```

The directory lists all active namespaces, entity counts, and total entities — a good way to discover what data is flowing through the system.

## Scripts

Use the provided bash script in the `scripts/` directory:
- `flux.sh` - Main CLI tool

## Common Operations

### Publish Event
```bash
./scripts/flux.sh publish <stream> <source> <entity_id> <properties_json>

# Replace dawn-coral with your namespace
# Example: Publish sensor reading
./scripts/flux.sh publish sensors agent-01 dawn-coral/temp-sensor-01 '{"temperature":22.5,"unit":"celsius"}'
```

### Query Entity State
```bash
./scripts/flux.sh get <entity_id>

# Replace dawn-coral with your namespace
# Example: Get current sensor state
./scripts/flux.sh get dawn-coral/temp-sensor-01
```

### List All Entities
```bash
./scripts/flux.sh list

# Filter by prefix
./scripts/flux.sh list --prefix scada/
```

### Delete Entity
```bash
./scripts/flux.sh delete <entity_id>

# Example: Remove old test entity
./scripts/flux.sh delete test/old-entity
```

### Batch Publish Events
```bash
# Replace dawn-coral with your namespace
./scripts/flux.sh batch '[
  {"stream":"sensors","source":"agent-01","payload":{"entity_id":"dawn-coral/sensor-01","properties":{"temp":22}}},
  {"stream":"sensors","source":"agent-01","payload":{"entity_id":"dawn-coral/sensor-02","properties":{"temp":23}}}
]'
```

### Check Connector Status
```bash
./scripts/flux.sh connectors
```

### Admin Config
```bash
# Read runtime config
./scripts/flux.sh admin-config

# Update (requires FLUX_ADMIN_TOKEN)
./scripts/flux.sh admin-config '{"rate_limit_per_namespace_per_minute": 5000}'
```

## Use Cases

### Multi-Agent Coordination
Agents publish observations to shared entities:
```bash
# Replace dawn-coral with your namespace
# Agent A observes temperature
flux.sh publish sensors agent-a dawn-coral/room-101 '{"temperature":22.5}'

# Agent B queries current state
flux.sh get dawn-coral/room-101
# Returns: {"temperature":22.5,...}
```

### Status Tracking
Track service/system state:
```bash
# Replace dawn-coral with your namespace
# Publish status change
flux.sh publish system monitor dawn-coral/api-gateway '{"status":"healthy","uptime":3600}'

# Query current status
flux.sh get dawn-coral/api-gateway
```

## API Endpoints

**Event Ingestion:**
- `POST /api/events` — Publish single event (1 MB limit)
- `POST /api/events/batch` — Publish multiple events (10 MB limit)

**State Query:**
- `GET /api/state/entities` — List all entities (supports `?prefix=` and `?namespace=` filters)
- `GET /api/state/entities/:id` — Get specific entity

**Entity Management:**
- `DELETE /api/state/entities/:id` — Delete single entity
- `POST /api/state/entities/delete` — Batch delete (by namespace/prefix/IDs)

**Real-time Updates:**
- `GET /api/ws` — WebSocket subscription

**Connectors:**
- `GET /api/connectors` — List connectors and status
- `POST /api/connectors/:name/token` — Store PAT credential
- `DELETE /api/connectors/:name/token` — Remove credential

**Admin:**
- `GET /api/admin/config` — Read runtime config
- `PUT /api/admin/config` — Update runtime config (requires FLUX_ADMIN_TOKEN)

**Namespaces (auth mode only):**
- `POST /api/namespaces` — Register namespace (returns auth token)

## Notes

- Events auto-generate UUIDs (no need to provide eventId)
- Properties **merge** on updates — only send changed properties, existing ones are preserved
- Timestamp field must be epoch milliseconds (i64) — required by the API, auto-generated by flux.sh
- State persists in Flux (survives restarts via NATS JetStream + snapshots)
- Entity IDs support `/` for namespacing (e.g., `scada/pump-01`)


---

## Referenced Files

> The following files are referenced in this skill and included for context.

### scripts/flux.sh

```bash
#!/usr/bin/env bash
# Flux CLI helper

FLUX_URL="${FLUX_URL:-https://api.flux-universe.com}"
FLUX_TOKEN="${FLUX_TOKEN:-}"
FLUX_ADMIN_TOKEN="${FLUX_ADMIN_TOKEN:-}"

# Helper function for API calls
api_call() {
    local method=$1
    local endpoint=$2
    local data=$3
    local auth_header=""

    if [[ -n "$FLUX_TOKEN" ]]; then
        auth_header="-H \"Authorization: Bearer ${FLUX_TOKEN}\""
    fi

    local -a cmd=(curl -s -X "$method" "${FLUX_URL}${endpoint}" -H "Content-Type: application/json")

    if [[ -n "$FLUX_TOKEN" ]]; then
        cmd+=(-H "Authorization: Bearer ${FLUX_TOKEN}")
    fi

    if [[ -n "$data" ]]; then
        cmd+=(-d "$data")
    fi

    "${cmd[@]}"
}

# Format JSON output (pretty print if jq available)
format_output() {
    if command -v jq &> /dev/null; then
        jq '.'
    else
        python3 -m json.tool 2>/dev/null || cat
    fi
}

# Commands
case "${1:-}" in
    publish)
        stream="$2"
        source="$3"
        entity_id="$4"
        properties="$5"

        if [[ -z "$stream" || -z "$source" || -z "$entity_id" || -z "$properties" ]]; then
            echo "Usage: flux.sh publish STREAM SOURCE ENTITY_ID PROPERTIES_JSON"
            echo ""
            echo "Example:"
            echo "  flux.sh publish sensors agent-01 temp-sensor-01 '{\"temperature\":22.5}'"
            exit 1
        fi

        timestamp=$(date +%s)000  # Unix epoch milliseconds

        payload=$(cat <<EOF
{
  "entity_id": "${entity_id}",
  "properties": ${properties}
}
EOF
)

        event=$(cat <<EOF
{
  "stream": "${stream}",
  "source": "${source}",
  "timestamp": ${timestamp},
  "payload": ${payload}
}
EOF
)

        echo "Publishing event to Flux..."
        api_call POST "/api/events" "$event" | format_output
        ;;

    batch)
        events="$2"

        if [[ -z "$events" ]]; then
            echo "Usage: flux.sh batch EVENTS_JSON_ARRAY"
            exit 1
        fi

        batch_payload="{\"events\": ${events}}"
        echo "Publishing batch to Flux..."
        api_call POST "/api/events/batch" "$batch_payload" | format_output
        ;;

    get)
        entity_id="$2"

        if [[ -z "$entity_id" ]]; then
            echo "Usage: flux.sh get ENTITY_ID"
            exit 1
        fi

        # URL-encode slashes in entity_id
        encoded_id=$(echo "$entity_id" | sed 's|/|%2F|g')
        api_call GET "/api/state/entities/${encoded_id}" | format_output
        ;;

    list)
        query=""
        if [[ "$2" == "--prefix" && -n "$3" ]]; then
            query="?prefix=$3"
        elif [[ "$2" == "--namespace" && -n "$3" ]]; then
            query="?namespace=$3"
        fi
        api_call GET "/api/state/entities${query}" | format_output
        ;;

    delete)
        entity_id="$2"

        if [[ -z "$entity_id" ]]; then
            echo "Usage: flux.sh delete ENTITY_ID"
            exit 1
        fi

        encoded_id=$(echo "$entity_id" | sed 's|/|%2F|g')
        echo "Deleting entity ${entity_id}..."
        api_call DELETE "/api/state/entities/${encoded_id}" | format_output
        ;;

    batch-delete)
        filter="$2"

        if [[ -z "$filter" ]]; then
            echo "Usage: flux.sh batch-delete FILTER_JSON"
            echo ""
            echo "Examples:"
            echo '  flux.sh batch-delete '"'"'{"prefix":"loadtest-"}'"'"
            echo '  flux.sh batch-delete '"'"'{"namespace":"matt"}'"'"
            echo '  flux.sh batch-delete '"'"'{"entity_ids":["id1","id2"]}'"'"
            exit 1
        fi

        echo "Batch deleting entities..."
        api_call POST "/api/state/entities/delete" "$filter" | format_output
        ;;

    connectors)
        api_call GET "/api/connectors" | format_output
        ;;

    admin-config)
        if [[ -n "$2" ]]; then
            # Update config
            if [[ -z "$FLUX_ADMIN_TOKEN" ]]; then
                echo "Error: FLUX_ADMIN_TOKEN not set"
                exit 1
            fi
            curl -s -X PUT "${FLUX_URL}/api/admin/config" \
                -H "Content-Type: application/json" \
                -H "Authorization: Bearer ${FLUX_ADMIN_TOKEN}" \
                -d "$2" | format_output
        else
            # Read config
            api_call GET "/api/admin/config" | format_output
        fi
        ;;

    health)
        echo "Testing Flux connection at ${FLUX_URL}..."
        response=$(api_call GET "/api/state/entities")

        if [[ $? -eq 0 && -n "$response" ]]; then
            echo "✓ Flux is reachable"
            entity_count=$(echo "$response" | python3 -c "import sys,json;print(len(json.load(sys.stdin)))" 2>/dev/null || echo "?")
            echo "  Entities in state: ${entity_count}"
        else
            echo "✗ Failed to reach Flux at ${FLUX_URL}"
            exit 1
        fi
        ;;

    *)
        echo "Flux CLI - Interact with Flux state engine"
        echo ""
        echo "Usage: flux.sh COMMAND [ARGS]"
        echo ""
        echo "Commands:"
        echo "  publish STREAM SOURCE ENTITY_ID PROPERTIES_JSON"
        echo "      Publish event to create/update entity"
        echo ""
        echo "  get ENTITY_ID"
        echo "      Query current state of entity"
        echo ""
        echo "  list [--prefix PREFIX] [--namespace NS]"
        echo "      List all entities (optional filter)"
        echo ""
        echo "  delete ENTITY_ID"
        echo "      Delete a single entity"
        echo ""
        echo "  batch-delete FILTER_JSON"
        echo "      Batch delete by prefix/namespace/IDs"
        echo ""
        echo "  batch EVENTS_JSON_ARRAY"
        echo "      Publish multiple events at once"
        echo ""
        echo "  connectors"
        echo "      List connector status"
        echo ""
        echo "  admin-config [UPDATE_JSON]"
        echo "      Read or update runtime config"
        echo ""
        echo "  health"
        echo "      Test connection to Flux"
        echo ""
        echo "Environment:"
        echo "  FLUX_URL=${FLUX_URL}"
        echo "  FLUX_TOKEN=(auth token for write operations)"
        echo "  FLUX_ADMIN_TOKEN=(admin token for config updates)"
        exit 1
        ;;
esac

```



---

## Skill Companion Files

> Additional files collected from the skill directory layout.

### README.md

```markdown
# Flux Skill for OpenClaw

An OpenClaw skill that enables agents to publish events and query shared world state via [Flux](https://github.com/EckmanTechLLC/flux) — a persistent, event-sourced state engine.

## What is Flux?

Flux is a state engine that ingests immutable events and derives canonical world state. Multiple agents can:
- Publish observations as events
- Query current state of entities
- Coordinate through shared state
- Audit full event history

**Key difference from pub/sub:** Flux owns state derivation. Agents don't process raw events—they observe Flux's canonical state.

## Installation

### Local Testing (Option 1)

Copy skill to OpenClaw workspace:
```bash
# On OpenClaw VM
mkdir -p ~/workspace/skills
scp -r <your-host>:/path/to/flux-interact ~/workspace/skills/
```

OpenClaw will auto-discover on next startup.

### ClawHub Install (Option 2)

Install from: https://clawhub.ai/EckmanTechLLC/flux

## Prerequisites

1. **Flux:** public instance at `https://api.flux-universe.com` (default), or local at `http://localhost:3000` (set `FLUX_URL` to override)
2. **curl** installed (required)
3. **jq** recommended (optional, has fallback)

Verify connection:
```bash
cd ~/workspace/skills/flux-interact
./scripts/flux.sh health
```

## Usage Examples

> **Note:** entity IDs must be prefixed with your namespace (e.g. `dawn-coral/sensor-01`, not `sensor-01`)

### For OpenClaw Agents

Agents can naturally use Flux via the skill:

**User:** "What's the temperature of sensor-01?"

**Agent:** Let me check the current state in Flux.
```bash
./scripts/flux.sh get temp-sensor-01
```

**User:** "Record that room-101 temperature is 22.5 celsius"

**Agent:** I'll publish that observation to Flux.
```bash
./scripts/flux.sh publish sensors assistant room-101 '{"temperature":22.5,"unit":"celsius"}'
```

**User:** "Show me all entities we're tracking"

**Agent:**
```bash
./scripts/flux.sh list
```

### Direct CLI Usage

```bash
# Publish sensor reading
./scripts/flux.sh publish sensors agent-01 temp-sensor-01 '{"temperature":22.5,"unit":"celsius"}'

# Query entity state
./scripts/flux.sh get temp-sensor-01

# List all entities
./scripts/flux.sh list

# Test connection
./scripts/flux.sh health
```

## Use Cases

### Multi-Agent Coordination

Multiple OpenClaw instances share state via Flux:

```bash
# Agent A on VM1 observes
flux.sh publish sensors agent-a room-101 '{"temperature":22.5}'

# Agent B on VM2 queries
flux.sh get room-101
# Returns: {"temperature":22.5,...}
```

### Status Tracking

Monitor services across infrastructure:

```bash
# Publish service status
flux.sh publish system monitor api-gateway '{"status":"healthy","latency_ms":45}'

# Query current status
flux.sh get api-gateway
```

### Event Sourcing

All state changes are auditable:
- Events immutable (persisted in NATS)
- State derived from event history
- Can replay/debug past states

## Configuration

### Custom Flux URL

Default is `https://api.flux-universe.com`. Override for local instances:
```bash
export FLUX_URL=http://localhost:3000
./scripts/flux.sh health
```

## File Structure

```
flux-interact/
├── SKILL.md              # Skill definition for OpenClaw
├── scripts/
│   └── flux.sh          # CLI tool (bash + curl)
├── references/
│   └── api.md           # Full API documentation
└── README.md            # This file
```

## How It Works

1. **OpenClaw loads SKILL.md** when Flux is mentioned
2. **Agent reads instructions** on how to use scripts
3. **Agent calls flux.sh** with appropriate command
4. **Script makes HTTP requests** to Flux API
5. **Returns JSON** for agent to process

## Architecture

```
OpenClaw Agent
    ↓
SKILL.md instructions
    ↓
flux.sh script
    ↓
curl → Flux API (https://api.flux-universe.com or http://localhost:3000)
    ↓
Event Ingestion → NATS → State Engine → Query Response
```

## Limitations

- No WebSocket subscription support in flux.sh (use wscat or a WebSocket client directly against `GET /api/ws`)
- State is domain-agnostic (no schema validation)

## Contributing

This skill is part of [Flux](https://github.com/EckmanTechLLC/flux).

## License

*To be determined*

```

### _meta.json

```json
{
  "owner": "eckmantechllc",
  "slug": "flux",
  "displayName": "Flux",
  "latest": {
    "version": "2.3.0",
    "publishedAt": 1772144949174,
    "commit": "https://github.com/openclaw/skills/commit/270e2c814a8fe823a2ad5c828f55b613ef02fcdb"
  },
  "history": [
    {
      "version": "1.0.1",
      "publishedAt": 1771610463912,
      "commit": "https://github.com/openclaw/skills/commit/9ef15e3de5e4c4915d0badadae254798ee610483"
    },
    {
      "version": "1.1.0",
      "publishedAt": 1771092153456,
      "commit": "https://github.com/openclaw/skills/commit/78aa6ad38b00d9504366c96c44232c70d7e1e17b"
    },
    {
      "version": "1.0.0",
      "publishedAt": 1770836688466,
      "commit": "https://github.com/openclaw/skills/commit/41eddaeb80aca0ca184c221bfdcbba65455ad996"
    }
  ]
}

```

### references/api.md

```markdown
# Flux API Reference

**Base URL:** `https://api.flux-universe.com` (or `http://localhost:3000` for local instances)

---

## Event Ingestion

### POST /api/events

Publish single event to Flux.

**Request:**
```json
{
  "stream": "sensors",
  "source": "agent-01",
  "timestamp": 1739290200000,
  "payload": {
    "entity_id": "temp-sensor-01",
    "properties": {
      "temperature": 22.5,
      "unit": "celsius"
    }
  }
}
```

**Optional Fields:**
- `eventId` - Auto-generated UUIDv7 if omitted
- `timestamp` - Required. Unix epoch milliseconds
  (e.g. `date +%s000` in bash, `int(time.time()*1000)` in Python).
  flux.sh generates this automatically.
- `key` - Optional ordering/grouping hint
- `schema` - Optional schema metadata

**Response:**
```json
{
  "eventId": "019c4da0-3e28-75c3-991b-ae7b2576485a",
  "stream": "sensors"
}
```

---

### POST /api/events/batch

Publish multiple events at once.

**Request:**
```json
{
  "events": [
    {
      "stream": "sensors",
      "source": "agent-01",
      "payload": {
        "entity_id": "sensor-01",
        "properties": {"temp": 22}
      }
    },
    {
      "stream": "sensors",
      "source": "agent-01",
      "payload": {
        "entity_id": "sensor-02",
        "properties": {"temp": 23}
      }
    }
  ]
}
```

**Response:**
```json
{
  "successful": 2,
  "failed": 0,
  "results": [
    {
      "eventId": "019c4da0-...",
      "stream": "sensors"
    },
    {
      "eventId": "019c4da1-...",
      "stream": "sensors"
    }
  ]
}
```

---

## State Query

### GET /api/state/entities

List all entities in current world state.

**Response:**
```json
[
  {
    "id": "temp-sensor-01",
    "properties": {
      "temperature": 22.5,
      "unit": "celsius"
    },
    "lastUpdated": "2026-02-11T16:54:33.260296395+00:00"
  },
  {
    "id": "temp-sensor-02",
    "properties": {
      "temperature": 23.8,
      "unit": "celsius"
    },
    "lastUpdated": "2026-02-11T16:55:12.123456789+00:00"
  }
]
```

---

### GET /api/state/entities/:id

Get specific entity by ID.

**Response (200 OK):**
```json
{
  "id": "temp-sensor-01",
  "properties": {
    "temperature": 22.5,
    "unit": "celsius",
    "location": "lab-A"
  },
  "lastUpdated": "2026-02-11T16:54:33.260296395+00:00"
}
```

**Response (404 Not Found):**
```json
{
  "error": "Entity not found"
}
```

---

## State Derivation Model

### How Events Become State

1. **Event Published:**
   ```json
   {
     "stream": "sensors",
     "payload": {
       "entity_id": "sensor-01",
       "properties": {"temperature": 22.5}
     }
   }
   ```

2. **Flux Processes Event:**
   - Validates envelope
   - Persists to NATS JetStream
   - State engine consumes event

3. **State Derived:**
   - Entity `sensor-01` created/updated
   - Property `temperature` set to `22.5`
   - `lastUpdated` timestamp recorded

4. **Query Returns Current State:**
   ```json
   {
     "id": "sensor-01",
     "properties": {"temperature": 22.5},
     "lastUpdated": "2026-02-11T..."
   }
   ```

### Property Updates

Properties merge on updates (last write wins per property):

```bash
# Event 1: Set temperature and unit
{"entity_id": "sensor-01", "properties": {"temperature": 22.5, "unit": "celsius"}}

# Event 2: Update temperature only
{"entity_id": "sensor-01", "properties": {"temperature": 23.0}}

# Result: Both properties preserved
{"temperature": 23.0, "unit": "celsius"}
```

---

## WebSocket Subscription

Real-time state updates via WebSocket (use wscat or a WebSocket client — not included in flux.sh):

**Endpoint:** `wss://api.flux-universe.com/api/ws` (or `ws://localhost:3000/api/ws` for local instances)

**Subscribe Message:**
```json
{
  "type": "subscribe",
  "entity_id": "temp-sensor-01"
}
```

**Update Notification:**
```json
{
  "type": "state_update",
  "entity_id": "temp-sensor-01",
  "property": "temperature",
  "value": 24.0,
  "timestamp": "2026-02-11T16:54:33.260296395+00:00"
}
```

---

## Error Responses

**400 Bad Request:**
```json
{
  "error": "stream is required"
}
```

**500 Internal Server Error:**
```json
{
  "error": "Failed to publish event to NATS"
}
```

---

## Architecture Notes

**Event Flow:**
```
Agent → POST /api/events → Validation → NATS JetStream → State Engine → In-Memory State (DashMap)
                                                                               ↓
Agent ← GET /api/state/entities ← Query API ← In-Memory State
```

**Key Characteristics:**
- Events are immutable (never modified)
- State is derived (not directly written)
- NATS provides durability (events persist)
- State engine rebuilds from events on restart
- Multiple agents observe same canonical state

```