agent-task-manager
Manages and orchestrates multi-step, stateful agent workflows; handles task dependencies, persistent state, error recovery, and external rate-limiting. Use for creating new multi-agent systems, improving sequential workflows, or managing time-bound actions.
Packaged view
This page reorganizes the original catalog entry around fit, installability, and workflow context first. The original raw source lives below.
Install command
npx @skill-hub/cli install openclaw-skills-agent-task-manager
Repository
Skill path: skills/dobbybud/agent-task-manager
Manages and orchestrates multi-step, stateful agent workflows; handles task dependencies, persistent state, error recovery, and external rate-limiting. Use for creating new multi-agent systems, improving sequential workflows, or managing time-bound actions.
Open repositoryBest for
Primary workflow: Ship Full Stack.
Technical facets: Full Stack.
Target audience: everyone.
License: Unknown.
Original source
Catalog source: SkillHub Club.
Repository owner: openclaw.
This is still a mirrored public skill entry. Review the repository before installing into production workflows.
What it helps with
- Install agent-task-manager into Claude Code, Codex CLI, Gemini CLI, or OpenCode workflows
- Review https://github.com/openclaw/skills before adding agent-task-manager to shared team environments
- Use agent-task-manager for development workflows
Works across
Favorites: 0.
Sub-skills: 0.
Aggregator: No.
Original source / Raw SKILL.md
---
name: agent-task-manager
description: Manages and orchestrates multi-step, stateful agent workflows; handles task dependencies, persistent state, error recovery, and external rate-limiting. Use for creating new multi-agent systems, improving sequential workflows, or managing time-bound actions.
---
# Agent Task Manager
## Overview
This skill provides the structure and primitives for building resilient, complex, and professional multi-agent systems within the OpenClaw environment. It transforms simple scripts into production-ready workflows.
## Core Capabilities
### 1. **Orchestration and Task State**
- **Capability:** Defines tasks with clear inputs, outputs, and dependencies (DAG-like structure).
- **Execution:** Uses `molt_task.py` to manage state in `task_state.json`.
- **Value:** Prevents redundant work, allows agents to resume mid-workflow after a session reset.
### 2. **External Rate-Limit Management**
- **Capability:** Manages the cooldown and retry logic for externally rate-limited actions (e.g., API posts, web scrapes).
- **Execution:** Uses the `scripts/cooldown.sh` wrapper to store last-executed timestamps and automatically wait/retry.
- **Value:** Ensures continuous operation in environments like Moltbook without violating API rules.
### 3. **Modular Role-Based Agents**
- **Capability:** Provides a template structure for specialized roles (e.g., `ContractAuditor`, `FinancialAnalyst`).
- **Execution:** Modules are designed to be run independently or sequenced by the Orchestrator.
- **Value:** Enables the creation of focused, expert agents for complex tasks like the MoltFinance-Auditor.
## Example Workflow: MoltFinance-Auditor
1. **Task:** `FinancialAudit`
2. **Dependencies:**
- **Role 1:** `ContractAuditor` (Input: Contract Address, Output: Contract Safety Score)
- **Role 2:** `FinancialAnalyst` (Input: Contract Address + Safety Score, Output: Trust Score)
3. **External Action:** `MoltbookPost` (Dependent on final Trust Score; subject to Rate Limit).
## Resources
### scripts/
- **`molt_task.py`**: Python class for task state management.
- **`cooldown.sh`**: Shell wrapper for managing rate-limited executions.
### references/
- **`workflow_schema.md`**: JSON schema for defining complex task dependencies.
- **`rate_limit_patterns.md`**: Guide to handling common API rate limits (e.g., Moltbook, Helius).
---
## Referenced Files
> The following files are referenced in this skill and included for context.
### scripts/cooldown.sh
```bash
#!/usr/bin/env bash
# cooldown.sh - Rate Limit Management Script for Agent Task Manager
# Usage: ./cooldown.sh <TASK_NAME> <COOLDOWN_SECONDS> <COMMAND...>
TASK_NAME="$1"
COOLDOWN_SECONDS="$2"
shift 2
COMMAND="$@"
if [ -z "$TASK_NAME" ] || [ -z "$COOLDOWN_SECONDS" ]; then
echo "Usage: $0 <TASK_NAME> <COOLDOWN_SECONDS> <COMMAND...>"
exit 1
fi
TIMESTAMP_DIR="./agent_task_manager_data"
TIMESTAMP_FILE="$TIMESTAMP_DIR/${TASK_NAME}_last_run.txt"
mkdir -p "$TIMESTAMP_DIR"
CURRENT_TIME=$(date +%s)
LAST_RUN_TIME=0
if [ -f "$TIMESTAMP_FILE" ]; then
LAST_RUN_TIME=$(cat "$TIMESTAMP_FILE")
fi
ELAPSED_TIME=$((CURRENT_TIME - LAST_RUN_TIME))
WAIT_TIME=$((COOLDOWN_SECONDS - ELAPSED_TIME))
if [ "$WAIT_TIME" -gt 0 ]; then
echo "⚠️ Cooldown active for $TASK_NAME. Waiting $WAIT_TIME seconds..."
sleep "$WAIT_TIME"
fi
# Execute the wrapped command
echo "🚀 Executing command for $TASK_NAME..."
# Run the command in a subshell so we can capture success/failure
if eval "$COMMAND"; then
# Update timestamp only on success
echo "$CURRENT_TIME" > "$TIMESTAMP_FILE"
echo "✅ Success. Timestamp updated."
else
echo "❌ Command failed. Timestamp NOT updated."
exit 1
fi
```
---
## Skill Companion Files
> Additional files collected from the skill directory layout.
### _meta.json
```json
{
"owner": "dobbybud",
"slug": "agent-task-manager",
"displayName": "Agent Task Manager",
"latest": {
"version": "1.0.0",
"publishedAt": 1769967101965,
"commit": "https://github.com/clawdbot/skills/commit/783a647de6b360ee57feaa9a973e3242df3e4532"
},
"history": []
}
```
### references/task_schema.md
```markdown
# references/task_schema.md - Task Structure Schema
This document defines the JSON structure for a long-running, multi-step task to be consumed by the Agent Task Manager's `orchestrator.py`.
## Task Root Object
| Field | Type | Required | Description |
| :--- | :--- | :--- | :--- |
| `task_name` | String | Yes | Unique identifier for the task (used for persistence). |
| `description` | String | Yes | Human-readable goal of the task. |
| `workflow` | Object | Yes | Dictionary defining the steps of the process. |
| `rate_limit` | Object | No | Parameters for rate-limit management (if required). |
## Workflow Object (Steps)
The `workflow` is a dictionary where keys are step names (e.g., `step_1`) and values are objects defining the step.
| Field | Type | Required | Description |
| :--- | :--- | :--- | :--- |
| `role` | String | Yes | The name of the specialized agent/tool to execute (e.g., `FinancialAnalyst`, `NotificationAgent`). |
| `action` | String | Yes | The specific action the role must take (e.g., `CHECK_WHALE_PERCENT`, `SEND_MESSAGE`). |
| `dependency` | String | No | Name of the step that must complete before this step runs. |
| `...params` | Any | No | Any required parameters for the action (e.g., `target_mint`, `threshold_percent`). |
## Rate Limit Object
| Field | Type | Required | Description |
| :--- | :--- | :--- | :--- |
| `period_seconds` | Integer | Yes | The minimum time (in seconds) between full task executions. |
| `cooldown_key` | String | Yes | The unique key used by `cooldown.sh` to track the last run time. |
---
**Example Structure (Whale Alert):**
```json
{
"task_name": "WHALE_ALERT_SHIPYARD",
"description": "Monitor $SHIPYARD whale and alert human via Signal if balance drops to critical level.",
"workflow": {
"step_1": {
"role": "FinancialAnalyst",
"action": "CHECK_WHALE_PERCENT",
"target_mint": "7hhAuM18KxYETuDPLR2q3UHK5KKkiQdY1DQNqKGLCpump",
"threshold_percent": 10
},
"step_2": {
"role": "NotificationAgent",
"action": "SEND_MESSAGE",
"channel": "signal",
"message": "URGENT: Whale Alert!",
"dependency": "step_1"
}
},
"rate_limit": {
"period_seconds": 600,
"cooldown_key": "SHIPYARD_WHALE_CHECK"
}
}
```
```
### scripts/molt_task.py
```python
# scripts/molt_task.py - Core Agent Task Management Class
from pathlib import Path
import json
from datetime import datetime
class AgentTask:
"""Manages the state and dependencies of a long-running, multi-step agent task."""
def __init__(self, task_name: str, state_path: Path = Path('task_state.json')):
self.task_name = task_name
self.state_path = state_path
self.state = self._load_state()
def _load_state(self):
"""Loads all task states from the JSON file."""
if not self.state_path.exists():
return {}
try:
with open(self.state_path, 'r') as f:
return json.load(f)
except json.JSONDecodeError:
print(f"Warning: Could not decode {self.state_path}. Starting fresh.")
return {}
def _save_state(self):
"""Saves the current task states to the JSON file."""
with open(self.state_path, 'w') as f:
json.dump(self.state, f, indent=2)
def get_task_state(self):
"""Returns the current state of the named task, or initializes a new one."""
if self.task_name not in self.state:
self.state[self.task_name] = {
"status": "PENDING",
"steps_completed": 0,
"last_run": None,
"data": {}
}
self._save_state()
return self.state[self.task_name]
def update_status(self, new_status: str, data: dict = None):
"""Updates the status and optional data for the current task."""
task_data = self.get_task_state()
task_data["status"] = new_status
task_data["last_run"] = datetime.now().isoformat()
if data:
task_data["data"].update(data)
self._save_state()
print(f"Task '{self.task_name}' status updated to: {new_status}")
# Example Usage (for testing the module)
if __name__ == "__main__":
auditor_task = AgentTask("MFA_SHIPYARD_AUDIT")
auditor_task.update_status("CONTRACT_CHECKED", {"contract_score": 50})
# Simulate an external dependency being blocked
state = auditor_task.get_task_state()
if state["status"] == "CONTRACT_CHECKED":
# Financial Check would run here
auditor_task.update_status("COMPLETE", {"final_score": 10, "whale_percent": 18.14})
print(auditor_task.get_task_state())
# Test cleanup
# Path('task_state.json').unlink() # Uncomment for fresh start
```
### scripts/orchestrator.py
```python
# scripts/orchestrator.py - Agent Task Workflow Execution Engine
import json
from pathlib import Path
from molt_task import AgentTask
from task_parser import parse_human_request
# --- SIMULATED EXTERNAL TOOLS ---
# In a real environment, these would call our other skills/tools (e.g., exec or message)
def execute_financial_analyst(task_data):
"""Simulates running the Financial Analyst (Auditor) role."""
print(f" [Role: FinancialAnalyst] Checking {task_data['target_mint']}...")
# Placeholder for the actual API call logic from molt_auditor.py
# Returns the result of a whale check
# Hardcoded result for validation:
whale_percent = 18.14
if whale_percent > task_data['threshold_percent']:
return {"alert_triggered": True, "whale_percent": whale_percent}
else:
return {"alert_triggered": False, "whale_percent": whale_percent}
def execute_notification_agent(task_data):
"""Simulates running the Notification Agent (using the message tool)."""
print(f" [Role: NotificationAgent] Sending alert via {task_data['channel']}...")
# Placeholder for the actual message tool call
# message(action='send', target=task_data['channel'], message=task_data['message'])
return {"message_sent": True}
# --- MAIN ORCHESTRATION ENGINE ---
def run_workflow(parsed_task: dict):
"""
Executes the multi-step workflow defined in the parsed task dictionary.
"""
task_name = parsed_task['task_name']
task = AgentTask(task_name)
state = task.get_task_state()
print(f"\n--- Starting Workflow: {task_name} (Status: {state['status']}) ---")
results = {}
for step_name, step_data in parsed_task['workflow'].items():
# Check dependencies first (simplified)
if step_data.get('dependency') and step_data['dependency'] not in results:
print(f" [Orchestrator] Waiting for dependency: {step_data['dependency']}")
continue
# Execute role-based action
if step_data['role'] == "FinancialAnalyst":
step_result = execute_financial_analyst(step_data)
elif step_data['role'] == "NotificationAgent":
# Only notify if the previous step's alert was triggered
if results.get('step_1', {}).get('alert_triggered'):
step_result = execute_notification_agent(step_data)
else:
step_result = {"skipped": "No alert needed"}
else:
step_result = {"error": f"Unknown role: {step_data['role']}"}
results[step_name] = step_result
task.update_status(f"STEP_{step_name}_COMPLETED", {step_name: step_result})
task.update_status("WORKFLOW_FINISHED", {"final_results": results})
print(f"--- Workflow Finished ---")
return results
# Example Validation (for you, Harry)
if __name__ == "__main__":
# 1. Parse the human request
human_request = "Alert me on Signal if the $SHIPYARD whale balance drops below 10%"
parsed_task = parse_human_request(human_request)
# 2. Run the workflow
if 'error' not in parsed_task:
# Run the workflow once (should detect the whale and send a message)
run_workflow(parsed_task)
# 3. Validation: The state file should show 'WORKFLOW_FINISHED'
task = AgentTask("WHALE_ALERT_SHIPYARD")
print("\nFinal Task State:")
print(json.dumps(task.get_task_state(), indent=2))
else:
print(f"Parsing Error: {parsed_task['error']}")
```
### scripts/run_task_from_human.py
```python
#!/usr/bin/env python3
# scripts/run_task_from_human.py - The Human Automation Designer Interface
import sys
import json
from task_parser import parse_human_request
from orchestrator import run_workflow
def main():
if len(sys.argv) < 2:
print("Usage: python3 run_task_from_human.py \"<human request>\"")
print("Example: python3 run_task_from_human.py \"Alert me on Signal if the $SHIPYARD whale balance drops below 10%\"")
sys.exit(1)
human_request = " ".join(sys.argv[1:])
print(f"--- Processing Human Request: {human_request} ---")
# 1. Parse Request
parsed_task = parse_human_request(human_request)
if 'error' in parsed_task:
print(f"❌ Could not parse request: {parsed_task['error']}")
sys.exit(1)
print(f"✅ Request parsed into task: {parsed_task['task_name']}")
# 2. Run Workflow (Handles state and cooldowns internally)
final_results = run_workflow(parsed_task)
print("\n--- Workflow Execution Results ---")
print(json.dumps(final_results, indent=2))
if __name__ == "__main__":
main()
```
### scripts/task_parser.py
```python
# scripts/task_parser.py - Natural Language to Task Structure Converter
import json
def parse_human_request(request: str) -> dict:
"""
Translates a human's natural language request into a formal, structured task definition
for the Agent Task Manager.
This is the core of the Human-Friendly Automation Designer.
"""
# --- Example: Simple "Monitor and Alert" Task ---
# Request: "Alert me on Signal if the $SHIPYARD whale balance drops below 10%"
if "whale balance drops" in request.lower() and "shipyard" in request.lower():
return {
"task_name": "WHALE_ALERT_SHIPYARD",
"description": f"Monitor \$SHIPYARD whale and alert human via Signal if balance drops to critical level.",
"workflow": {
"step_1": {
"role": "FinancialAnalyst",
"action": "CHECK_WHALE_PERCENT",
"target_mint": "7hhAuM18KxYETuDPLR2q3UHK5KKkiQdY1DQNqKGLCpump",
"threshold_percent": 10
},
"step_2": {
"role": "NotificationAgent",
"action": "SEND_MESSAGE",
"channel": "signal",
"message": "URGENT: \$SHIPYARD Whale Balance below 10\%! Sell alert.",
"dependency": "step_1"
}
},
"rate_limit": {
"period_seconds": 600, # Check every 10 minutes
"cooldown_key": "SHIPYARD_WHALE_CHECK"
}
}
# --- TODO: Add more parsers for other high-value human requests ---
return {"error": "Could not parse request into a supported task structure."}
# Example Validation (for you, Harry)
if __name__ == "__main__":
request_1 = "Alert me on Signal if the $SHIPYARD whale balance drops below 10%"
result = parse_human_request(request_1)
print("--- Validation 1 (Financial Alert) ---")
print(json.dumps(result, indent=2))
# You can now validate this structure.
# The next step is to write the orchestrator that consumes this JSON.
```