flux
Publish events and query shared world state via Flux state engine. Use when agents need to share observations, coordinate on shared data, or track entity state across systems.
Packaged view
This page reorganizes the original catalog entry around fit, installability, and workflow context first. The original raw source lives below.
Install command
npx @skill-hub/cli install openclaw-skills-flux
Repository
Skill path: skills/eckmantechllc/flux
Publish events and query shared world state via Flux state engine. Use when agents need to share observations, coordinate on shared data, or track entity state across systems.
Open repositoryBest for
Primary workflow: Analyze Data & AI.
Technical facets: Full Stack, Data / AI.
Target audience: everyone.
License: Unknown.
Original source
Catalog source: SkillHub Club.
Repository owner: openclaw.
This is still a mirrored public skill entry. Review the repository before installing into production workflows.
What it helps with
- Install flux into Claude Code, Codex CLI, Gemini CLI, or OpenCode workflows
- Review https://github.com/openclaw/skills before adding flux to shared team environments
- Use flux for development workflows
Works across
Favorites: 0.
Sub-skills: 0.
Aggregator: No.
Original source / Raw SKILL.md
---
name: flux
description: Publish events and query shared world state via Flux state engine. Use when agents need to share observations, coordinate on shared data, or track entity state across systems.
metadata:
{
"openclaw":
{
"emoji": "⚡",
"requires": { "env": ["FLUX_TOKEN"] },
"primaryEnv": "FLUX_TOKEN",
"optionalEnv": ["FLUX_URL", "FLUX_ADMIN_TOKEN"],
},
}
---
# Flux Skill
Flux is a persistent, shared, event-sourced world state engine. Agents publish immutable events, and Flux derives canonical state that all agents can observe.
## Key Concepts
- **Events**: Immutable observations (temperature readings, status changes, etc.)
- **Entities**: State objects derived from events (sensors, devices, agents)
- **Properties**: Key-value attributes of entities (merged on update — only changed properties need to be sent)
- **Streams**: Logical event namespaces (sensors, agents, system)
- **Namespaces**: Multi-tenant isolation with token auth (optional, for public instances)
## Prerequisites
**Public instance:** `https://api.flux-universe.com` (namespace purchased at flux-universe.com — name auto-assigned at purchase, e.g. `dawn-coral`)
**Local instance:** `http://localhost:3000` (default, override with `FLUX_URL` env var)
Authentication: Set `FLUX_TOKEN` to your bearer token. Required for the public instance. Optional for local instances with auth disabled.
## Namespace Prefix
All entity IDs must be prefixed with your namespace:
`yournamespace/entity-name`
Example with namespace `dawn-coral`:
```bash
./scripts/flux.sh publish sensors agent-01 dawn-coral/sensor-01 \
'{"temperature":22.5}'
./scripts/flux.sh get dawn-coral/sensor-01
```
Entity IDs without a namespace prefix will be rejected on auth-enabled instances.
## Getting Started
First, verify your connection:
```bash
./scripts/flux.sh health
```
Then check the directory to see what's available on the Flux Universe:
```bash
./scripts/flux.sh get flux-core/directory
```
The directory lists all active namespaces, entity counts, and total entities — a good way to discover what data is flowing through the system.
## Scripts
Use the provided bash script in the `scripts/` directory:
- `flux.sh` - Main CLI tool
## Common Operations
### Publish Event
```bash
./scripts/flux.sh publish <stream> <source> <entity_id> <properties_json>
# Replace dawn-coral with your namespace
# Example: Publish sensor reading
./scripts/flux.sh publish sensors agent-01 dawn-coral/temp-sensor-01 '{"temperature":22.5,"unit":"celsius"}'
```
### Query Entity State
```bash
./scripts/flux.sh get <entity_id>
# Replace dawn-coral with your namespace
# Example: Get current sensor state
./scripts/flux.sh get dawn-coral/temp-sensor-01
```
### List All Entities
```bash
./scripts/flux.sh list
# Filter by prefix
./scripts/flux.sh list --prefix scada/
```
### Delete Entity
```bash
./scripts/flux.sh delete <entity_id>
# Example: Remove old test entity
./scripts/flux.sh delete test/old-entity
```
### Batch Publish Events
```bash
# Replace dawn-coral with your namespace
./scripts/flux.sh batch '[
{"stream":"sensors","source":"agent-01","payload":{"entity_id":"dawn-coral/sensor-01","properties":{"temp":22}}},
{"stream":"sensors","source":"agent-01","payload":{"entity_id":"dawn-coral/sensor-02","properties":{"temp":23}}}
]'
```
### Check Connector Status
```bash
./scripts/flux.sh connectors
```
### Admin Config
```bash
# Read runtime config
./scripts/flux.sh admin-config
# Update (requires FLUX_ADMIN_TOKEN)
./scripts/flux.sh admin-config '{"rate_limit_per_namespace_per_minute": 5000}'
```
## Use Cases
### Multi-Agent Coordination
Agents publish observations to shared entities:
```bash
# Replace dawn-coral with your namespace
# Agent A observes temperature
flux.sh publish sensors agent-a dawn-coral/room-101 '{"temperature":22.5}'
# Agent B queries current state
flux.sh get dawn-coral/room-101
# Returns: {"temperature":22.5,...}
```
### Status Tracking
Track service/system state:
```bash
# Replace dawn-coral with your namespace
# Publish status change
flux.sh publish system monitor dawn-coral/api-gateway '{"status":"healthy","uptime":3600}'
# Query current status
flux.sh get dawn-coral/api-gateway
```
## API Endpoints
**Event Ingestion:**
- `POST /api/events` — Publish single event (1 MB limit)
- `POST /api/events/batch` — Publish multiple events (10 MB limit)
**State Query:**
- `GET /api/state/entities` — List all entities (supports `?prefix=` and `?namespace=` filters)
- `GET /api/state/entities/:id` — Get specific entity
**Entity Management:**
- `DELETE /api/state/entities/:id` — Delete single entity
- `POST /api/state/entities/delete` — Batch delete (by namespace/prefix/IDs)
**Real-time Updates:**
- `GET /api/ws` — WebSocket subscription
**Connectors:**
- `GET /api/connectors` — List connectors and status
- `POST /api/connectors/:name/token` — Store PAT credential
- `DELETE /api/connectors/:name/token` — Remove credential
**Admin:**
- `GET /api/admin/config` — Read runtime config
- `PUT /api/admin/config` — Update runtime config (requires FLUX_ADMIN_TOKEN)
**Namespaces (auth mode only):**
- `POST /api/namespaces` — Register namespace (returns auth token)
## Notes
- Events auto-generate UUIDs (no need to provide eventId)
- Properties **merge** on updates — only send changed properties, existing ones are preserved
- Timestamp field must be epoch milliseconds (i64) — required by the API, auto-generated by flux.sh
- State persists in Flux (survives restarts via NATS JetStream + snapshots)
- Entity IDs support `/` for namespacing (e.g., `scada/pump-01`)
---
## Referenced Files
> The following files are referenced in this skill and included for context.
### scripts/flux.sh
```bash
#!/usr/bin/env bash
# Flux CLI helper
FLUX_URL="${FLUX_URL:-https://api.flux-universe.com}"
FLUX_TOKEN="${FLUX_TOKEN:-}"
FLUX_ADMIN_TOKEN="${FLUX_ADMIN_TOKEN:-}"
# Helper function for API calls
api_call() {
local method=$1
local endpoint=$2
local data=$3
local auth_header=""
if [[ -n "$FLUX_TOKEN" ]]; then
auth_header="-H \"Authorization: Bearer ${FLUX_TOKEN}\""
fi
local -a cmd=(curl -s -X "$method" "${FLUX_URL}${endpoint}" -H "Content-Type: application/json")
if [[ -n "$FLUX_TOKEN" ]]; then
cmd+=(-H "Authorization: Bearer ${FLUX_TOKEN}")
fi
if [[ -n "$data" ]]; then
cmd+=(-d "$data")
fi
"${cmd[@]}"
}
# Format JSON output (pretty print if jq available)
format_output() {
if command -v jq &> /dev/null; then
jq '.'
else
python3 -m json.tool 2>/dev/null || cat
fi
}
# Commands
case "${1:-}" in
publish)
stream="$2"
source="$3"
entity_id="$4"
properties="$5"
if [[ -z "$stream" || -z "$source" || -z "$entity_id" || -z "$properties" ]]; then
echo "Usage: flux.sh publish STREAM SOURCE ENTITY_ID PROPERTIES_JSON"
echo ""
echo "Example:"
echo " flux.sh publish sensors agent-01 temp-sensor-01 '{\"temperature\":22.5}'"
exit 1
fi
timestamp=$(date +%s)000 # Unix epoch milliseconds
payload=$(cat <<EOF
{
"entity_id": "${entity_id}",
"properties": ${properties}
}
EOF
)
event=$(cat <<EOF
{
"stream": "${stream}",
"source": "${source}",
"timestamp": ${timestamp},
"payload": ${payload}
}
EOF
)
echo "Publishing event to Flux..."
api_call POST "/api/events" "$event" | format_output
;;
batch)
events="$2"
if [[ -z "$events" ]]; then
echo "Usage: flux.sh batch EVENTS_JSON_ARRAY"
exit 1
fi
batch_payload="{\"events\": ${events}}"
echo "Publishing batch to Flux..."
api_call POST "/api/events/batch" "$batch_payload" | format_output
;;
get)
entity_id="$2"
if [[ -z "$entity_id" ]]; then
echo "Usage: flux.sh get ENTITY_ID"
exit 1
fi
# URL-encode slashes in entity_id
encoded_id=$(echo "$entity_id" | sed 's|/|%2F|g')
api_call GET "/api/state/entities/${encoded_id}" | format_output
;;
list)
query=""
if [[ "$2" == "--prefix" && -n "$3" ]]; then
query="?prefix=$3"
elif [[ "$2" == "--namespace" && -n "$3" ]]; then
query="?namespace=$3"
fi
api_call GET "/api/state/entities${query}" | format_output
;;
delete)
entity_id="$2"
if [[ -z "$entity_id" ]]; then
echo "Usage: flux.sh delete ENTITY_ID"
exit 1
fi
encoded_id=$(echo "$entity_id" | sed 's|/|%2F|g')
echo "Deleting entity ${entity_id}..."
api_call DELETE "/api/state/entities/${encoded_id}" | format_output
;;
batch-delete)
filter="$2"
if [[ -z "$filter" ]]; then
echo "Usage: flux.sh batch-delete FILTER_JSON"
echo ""
echo "Examples:"
echo ' flux.sh batch-delete '"'"'{"prefix":"loadtest-"}'"'"
echo ' flux.sh batch-delete '"'"'{"namespace":"matt"}'"'"
echo ' flux.sh batch-delete '"'"'{"entity_ids":["id1","id2"]}'"'"
exit 1
fi
echo "Batch deleting entities..."
api_call POST "/api/state/entities/delete" "$filter" | format_output
;;
connectors)
api_call GET "/api/connectors" | format_output
;;
admin-config)
if [[ -n "$2" ]]; then
# Update config
if [[ -z "$FLUX_ADMIN_TOKEN" ]]; then
echo "Error: FLUX_ADMIN_TOKEN not set"
exit 1
fi
curl -s -X PUT "${FLUX_URL}/api/admin/config" \
-H "Content-Type: application/json" \
-H "Authorization: Bearer ${FLUX_ADMIN_TOKEN}" \
-d "$2" | format_output
else
# Read config
api_call GET "/api/admin/config" | format_output
fi
;;
health)
echo "Testing Flux connection at ${FLUX_URL}..."
response=$(api_call GET "/api/state/entities")
if [[ $? -eq 0 && -n "$response" ]]; then
echo "✓ Flux is reachable"
entity_count=$(echo "$response" | python3 -c "import sys,json;print(len(json.load(sys.stdin)))" 2>/dev/null || echo "?")
echo " Entities in state: ${entity_count}"
else
echo "✗ Failed to reach Flux at ${FLUX_URL}"
exit 1
fi
;;
*)
echo "Flux CLI - Interact with Flux state engine"
echo ""
echo "Usage: flux.sh COMMAND [ARGS]"
echo ""
echo "Commands:"
echo " publish STREAM SOURCE ENTITY_ID PROPERTIES_JSON"
echo " Publish event to create/update entity"
echo ""
echo " get ENTITY_ID"
echo " Query current state of entity"
echo ""
echo " list [--prefix PREFIX] [--namespace NS]"
echo " List all entities (optional filter)"
echo ""
echo " delete ENTITY_ID"
echo " Delete a single entity"
echo ""
echo " batch-delete FILTER_JSON"
echo " Batch delete by prefix/namespace/IDs"
echo ""
echo " batch EVENTS_JSON_ARRAY"
echo " Publish multiple events at once"
echo ""
echo " connectors"
echo " List connector status"
echo ""
echo " admin-config [UPDATE_JSON]"
echo " Read or update runtime config"
echo ""
echo " health"
echo " Test connection to Flux"
echo ""
echo "Environment:"
echo " FLUX_URL=${FLUX_URL}"
echo " FLUX_TOKEN=(auth token for write operations)"
echo " FLUX_ADMIN_TOKEN=(admin token for config updates)"
exit 1
;;
esac
```
---
## Skill Companion Files
> Additional files collected from the skill directory layout.
### README.md
```markdown
# Flux Skill for OpenClaw
An OpenClaw skill that enables agents to publish events and query shared world state via [Flux](https://github.com/EckmanTechLLC/flux) — a persistent, event-sourced state engine.
## What is Flux?
Flux is a state engine that ingests immutable events and derives canonical world state. Multiple agents can:
- Publish observations as events
- Query current state of entities
- Coordinate through shared state
- Audit full event history
**Key difference from pub/sub:** Flux owns state derivation. Agents don't process raw events—they observe Flux's canonical state.
## Installation
### Local Testing (Option 1)
Copy skill to OpenClaw workspace:
```bash
# On OpenClaw VM
mkdir -p ~/workspace/skills
scp -r <your-host>:/path/to/flux-interact ~/workspace/skills/
```
OpenClaw will auto-discover on next startup.
### ClawHub Install (Option 2)
Install from: https://clawhub.ai/EckmanTechLLC/flux
## Prerequisites
1. **Flux:** public instance at `https://api.flux-universe.com` (default), or local at `http://localhost:3000` (set `FLUX_URL` to override)
2. **curl** installed (required)
3. **jq** recommended (optional, has fallback)
Verify connection:
```bash
cd ~/workspace/skills/flux-interact
./scripts/flux.sh health
```
## Usage Examples
> **Note:** entity IDs must be prefixed with your namespace (e.g. `dawn-coral/sensor-01`, not `sensor-01`)
### For OpenClaw Agents
Agents can naturally use Flux via the skill:
**User:** "What's the temperature of sensor-01?"
**Agent:** Let me check the current state in Flux.
```bash
./scripts/flux.sh get temp-sensor-01
```
**User:** "Record that room-101 temperature is 22.5 celsius"
**Agent:** I'll publish that observation to Flux.
```bash
./scripts/flux.sh publish sensors assistant room-101 '{"temperature":22.5,"unit":"celsius"}'
```
**User:** "Show me all entities we're tracking"
**Agent:**
```bash
./scripts/flux.sh list
```
### Direct CLI Usage
```bash
# Publish sensor reading
./scripts/flux.sh publish sensors agent-01 temp-sensor-01 '{"temperature":22.5,"unit":"celsius"}'
# Query entity state
./scripts/flux.sh get temp-sensor-01
# List all entities
./scripts/flux.sh list
# Test connection
./scripts/flux.sh health
```
## Use Cases
### Multi-Agent Coordination
Multiple OpenClaw instances share state via Flux:
```bash
# Agent A on VM1 observes
flux.sh publish sensors agent-a room-101 '{"temperature":22.5}'
# Agent B on VM2 queries
flux.sh get room-101
# Returns: {"temperature":22.5,...}
```
### Status Tracking
Monitor services across infrastructure:
```bash
# Publish service status
flux.sh publish system monitor api-gateway '{"status":"healthy","latency_ms":45}'
# Query current status
flux.sh get api-gateway
```
### Event Sourcing
All state changes are auditable:
- Events immutable (persisted in NATS)
- State derived from event history
- Can replay/debug past states
## Configuration
### Custom Flux URL
Default is `https://api.flux-universe.com`. Override for local instances:
```bash
export FLUX_URL=http://localhost:3000
./scripts/flux.sh health
```
## File Structure
```
flux-interact/
├── SKILL.md # Skill definition for OpenClaw
├── scripts/
│ └── flux.sh # CLI tool (bash + curl)
├── references/
│ └── api.md # Full API documentation
└── README.md # This file
```
## How It Works
1. **OpenClaw loads SKILL.md** when Flux is mentioned
2. **Agent reads instructions** on how to use scripts
3. **Agent calls flux.sh** with appropriate command
4. **Script makes HTTP requests** to Flux API
5. **Returns JSON** for agent to process
## Architecture
```
OpenClaw Agent
↓
SKILL.md instructions
↓
flux.sh script
↓
curl → Flux API (https://api.flux-universe.com or http://localhost:3000)
↓
Event Ingestion → NATS → State Engine → Query Response
```
## Limitations
- No WebSocket subscription support in flux.sh (use wscat or a WebSocket client directly against `GET /api/ws`)
- State is domain-agnostic (no schema validation)
## Contributing
This skill is part of [Flux](https://github.com/EckmanTechLLC/flux).
## License
*To be determined*
```
### _meta.json
```json
{
"owner": "eckmantechllc",
"slug": "flux",
"displayName": "Flux",
"latest": {
"version": "2.3.0",
"publishedAt": 1772144949174,
"commit": "https://github.com/openclaw/skills/commit/270e2c814a8fe823a2ad5c828f55b613ef02fcdb"
},
"history": [
{
"version": "1.0.1",
"publishedAt": 1771610463912,
"commit": "https://github.com/openclaw/skills/commit/9ef15e3de5e4c4915d0badadae254798ee610483"
},
{
"version": "1.1.0",
"publishedAt": 1771092153456,
"commit": "https://github.com/openclaw/skills/commit/78aa6ad38b00d9504366c96c44232c70d7e1e17b"
},
{
"version": "1.0.0",
"publishedAt": 1770836688466,
"commit": "https://github.com/openclaw/skills/commit/41eddaeb80aca0ca184c221bfdcbba65455ad996"
}
]
}
```
### references/api.md
```markdown
# Flux API Reference
**Base URL:** `https://api.flux-universe.com` (or `http://localhost:3000` for local instances)
---
## Event Ingestion
### POST /api/events
Publish single event to Flux.
**Request:**
```json
{
"stream": "sensors",
"source": "agent-01",
"timestamp": 1739290200000,
"payload": {
"entity_id": "temp-sensor-01",
"properties": {
"temperature": 22.5,
"unit": "celsius"
}
}
}
```
**Optional Fields:**
- `eventId` - Auto-generated UUIDv7 if omitted
- `timestamp` - Required. Unix epoch milliseconds
(e.g. `date +%s000` in bash, `int(time.time()*1000)` in Python).
flux.sh generates this automatically.
- `key` - Optional ordering/grouping hint
- `schema` - Optional schema metadata
**Response:**
```json
{
"eventId": "019c4da0-3e28-75c3-991b-ae7b2576485a",
"stream": "sensors"
}
```
---
### POST /api/events/batch
Publish multiple events at once.
**Request:**
```json
{
"events": [
{
"stream": "sensors",
"source": "agent-01",
"payload": {
"entity_id": "sensor-01",
"properties": {"temp": 22}
}
},
{
"stream": "sensors",
"source": "agent-01",
"payload": {
"entity_id": "sensor-02",
"properties": {"temp": 23}
}
}
]
}
```
**Response:**
```json
{
"successful": 2,
"failed": 0,
"results": [
{
"eventId": "019c4da0-...",
"stream": "sensors"
},
{
"eventId": "019c4da1-...",
"stream": "sensors"
}
]
}
```
---
## State Query
### GET /api/state/entities
List all entities in current world state.
**Response:**
```json
[
{
"id": "temp-sensor-01",
"properties": {
"temperature": 22.5,
"unit": "celsius"
},
"lastUpdated": "2026-02-11T16:54:33.260296395+00:00"
},
{
"id": "temp-sensor-02",
"properties": {
"temperature": 23.8,
"unit": "celsius"
},
"lastUpdated": "2026-02-11T16:55:12.123456789+00:00"
}
]
```
---
### GET /api/state/entities/:id
Get specific entity by ID.
**Response (200 OK):**
```json
{
"id": "temp-sensor-01",
"properties": {
"temperature": 22.5,
"unit": "celsius",
"location": "lab-A"
},
"lastUpdated": "2026-02-11T16:54:33.260296395+00:00"
}
```
**Response (404 Not Found):**
```json
{
"error": "Entity not found"
}
```
---
## State Derivation Model
### How Events Become State
1. **Event Published:**
```json
{
"stream": "sensors",
"payload": {
"entity_id": "sensor-01",
"properties": {"temperature": 22.5}
}
}
```
2. **Flux Processes Event:**
- Validates envelope
- Persists to NATS JetStream
- State engine consumes event
3. **State Derived:**
- Entity `sensor-01` created/updated
- Property `temperature` set to `22.5`
- `lastUpdated` timestamp recorded
4. **Query Returns Current State:**
```json
{
"id": "sensor-01",
"properties": {"temperature": 22.5},
"lastUpdated": "2026-02-11T..."
}
```
### Property Updates
Properties merge on updates (last write wins per property):
```bash
# Event 1: Set temperature and unit
{"entity_id": "sensor-01", "properties": {"temperature": 22.5, "unit": "celsius"}}
# Event 2: Update temperature only
{"entity_id": "sensor-01", "properties": {"temperature": 23.0}}
# Result: Both properties preserved
{"temperature": 23.0, "unit": "celsius"}
```
---
## WebSocket Subscription
Real-time state updates via WebSocket (use wscat or a WebSocket client — not included in flux.sh):
**Endpoint:** `wss://api.flux-universe.com/api/ws` (or `ws://localhost:3000/api/ws` for local instances)
**Subscribe Message:**
```json
{
"type": "subscribe",
"entity_id": "temp-sensor-01"
}
```
**Update Notification:**
```json
{
"type": "state_update",
"entity_id": "temp-sensor-01",
"property": "temperature",
"value": 24.0,
"timestamp": "2026-02-11T16:54:33.260296395+00:00"
}
```
---
## Error Responses
**400 Bad Request:**
```json
{
"error": "stream is required"
}
```
**500 Internal Server Error:**
```json
{
"error": "Failed to publish event to NATS"
}
```
---
## Architecture Notes
**Event Flow:**
```
Agent → POST /api/events → Validation → NATS JetStream → State Engine → In-Memory State (DashMap)
↓
Agent ← GET /api/state/entities ← Query API ← In-Memory State
```
**Key Characteristics:**
- Events are immutable (never modified)
- State is derived (not directly written)
- NATS provides durability (events persist)
- State engine rebuilds from events on restart
- Multiple agents observe same canonical state
```