Back to skills
SkillHub ClubShip Full StackFull Stack

rpa-patterns

Imported from https://github.com/omerlefaruk/CasareRPA.

Packaged view

This page reorganizes the original catalog entry around fit, installability, and workflow context first. The original raw source lives below.

Stars
0
Hot score
74
Updated
March 20, 2026
Overall rating
C2.9
Composite score
2.9
Best-practice grade
F32.4

Install command

npx @skill-hub/cli install omerlefaruk-casarerpa-rpa-patterns

Repository

omerlefaruk/CasareRPA

Skill path: .claude/skills/rpa-patterns

Imported from https://github.com/omerlefaruk/CasareRPA.

Open repository

Best for

Primary workflow: Ship Full Stack.

Technical facets: Full Stack.

Target audience: everyone.

License: Unknown.

Original source

Catalog source: SkillHub Club.

Repository owner: omerlefaruk.

This is still a mirrored public skill entry. Review the repository before installing into production workflows.

What it helps with

  • Install rpa-patterns into Claude Code, Codex CLI, Gemini CLI, or OpenCode workflows
  • Review https://github.com/omerlefaruk/CasareRPA before adding rpa-patterns to shared team environments
  • Use rpa-patterns for development workflows

Works across

Claude CodeCodex CLIGemini CLIOpenCode

Favorites: 0.

Sub-skills: 0.

Aggregator: No.

Original source / Raw SKILL.md

---
name: rpa-patterns
description: Common RPA automation patterns (retry, polling, circuit breaker) for resilient automation. Use when: implementing RPA workflows, error handling, resilient automation, retry with backoff, polling for conditions, circuit breaker patterns.
---

# RPA Patterns Skill

Resilience and retry patterns for reliable RPA automation.

## Overview

RPA automation must handle unreliable external systems: network timeouts, dynamic UI loading, rate limits, and transient failures. This skill provides proven patterns for building robust automations.

## Quick Reference

| Pattern | Use Case | Example |
|---------|----------|---------|
| **Retry with Backoff** | Transient failures, network issues | `examples/retry-pattern.py` |
| **Polling** | Wait for UI element or condition | `examples/polling-pattern.py` |
| **Circuit Breaker** | Prevent cascade failures, rate limits | `examples/circuit-breaker-pattern.py` |
| **Continue on Error** | Non-critical operations, data collection | Inline below |
| **Recovery Strategy** | Graceful degradation, fallback actions | Inline below |

## Built-in BrowserBaseNode Support

Browser nodes already have retry support via `execute_with_retry()`:

```python
from casare_rpa.nodes.browser.browser_base import BrowserBaseNode

class MyClickNode(BrowserBaseNode):
    async def execute(self, context):
        page = await self.get_page(context)

        # Built-in retry (uses retry_count, retry_interval from config)
        result, attempts = await self.execute_with_retry(
            lambda: page.click("#submit-btn"),
            operation_name="click submit"
        )
```

Config properties (auto-generated in visual nodes):
- `retry_count`: int = 0 (number of retries after initial failure)
- `retry_interval`: int = 1000 (ms between retries)

## Pattern 1: Exponential Backoff Retry

For operations without built-in retry support.

```python
import asyncio
from loguru import logger

async def retry_with_backoff(
    operation: Callable[[], Awaitable[T]],
    max_attempts: int = 3,
    base_delay_ms: int = 1000,
    max_delay_ms: int = 10000,
) -> T:
    """Retry with exponential backoff: 1s, 2s, 4s, 8s..."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await operation()
        except Exception as e:
            if attempt == max_attempts:
                raise

            delay = min(base_delay_ms * (2 ** (attempt - 1)), max_delay_ms)
            logger.warning(f"Attempt {attempt}/{max_attempts} failed: {e}. Retrying in {delay}ms")
            await asyncio.sleep(delay / 1000)
```

## Pattern 2: Polling for Condition

Wait for dynamic UI elements or async operations.

```python
async def poll_for_condition(
    condition: Callable[[], Awaitable[bool]],
    timeout_ms: int = 30000,
    interval_ms: int = 500,
) -> bool:
    """Poll until condition is true or timeout."""
    deadline = asyncio.get_event_loop().time() + (timeout_ms / 1000)

    while asyncio.get_event_loop().time() < deadline:
        if await condition():
            return True
        await asyncio.sleep(interval_ms / 1000)

    raise TimeoutError(f"Condition not met within {timeout_ms}ms")
```

**Usage with Playwright:**

```python
# Instead of hardcoded sleep
await asyncio.sleep(5)  # BAD

# Use polling
async def is_loaded(page):
    return await page.locator(".data-loaded").count() > 0

await poll_for_condition(lambda: is_loaded(page))  # GOOD
```

## Pattern 3: Circuit Breaker

Prevent hammering failing services and enable auto-recovery.

```python
from dataclasses import dataclass
from enum import Enum

class CircuitState(Enum):
    CLOSED = "closed"      # Normal operation
    OPEN = "open"          # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing recovery

@dataclass
class CircuitBreaker:
    failure_threshold: int = 5
    timeout_ms: int = 60000
    half_open_attempts: int = 1

    def __post_init__(self):
        self._state = CircuitState.CLOSED
        self._failures = 0
        self._last_failure_time = 0

    async def call(self, operation: Callable[[], Awaitable[T]]) -> T:
        """Execute operation with circuit breaker protection."""
        if self._state == CircuitState.OPEN:
            if self._should_attempt_reset():
                self._state = CircuitState.HALF_OPEN
            else:
                raise CircuitBreakerOpenError("Circuit is OPEN")

        try:
            result = await operation()
            self._on_success()
            return result
        except Exception as e:
            self._on_failure()
            raise
```

## Pattern 4: Continue on Error

For non-critical operations where failure should not stop workflow.

```python
@properties(
    PropertyDef("on_error", PropertyType.CHOICE, default="fail",
                options=["fail", "continue", "retry"]),
)
@node(category="data")
class DataCollectNode(BaseNode):
    async def execute(self, context):
        on_error = self.get_parameter("on_error")
        results = []

        for item in items:
            try:
                result = await self.process_item(item)
                results.append(result)
            except Exception as e:
                logger.error(f"Failed to process {item}: {e}")

                if on_error == "fail":
                    raise
                elif on_error == "continue":
                    results.append({"error": str(e)})
                elif on_error == "retry":
                    # Retry logic here
                    pass

        self.set_output_value("results", results)
```

## Pattern 5: Recovery Strategy

Fallback actions when primary operation fails.

```python
async def execute_with_fallback(
    primary: Callable[[], Awaitable[T]],
    fallback: Callable[[], Awaitable[T]],
    fallback_reason: str = "primary failed",
) -> T:
    """Try primary, use fallback on failure."""
    try:
        return await primary()
    except Exception as e:
        logger.warning(f"Primary operation failed: {e}. Using fallback: {fallback_reason}")
        return await fallback()
```

## RPA-Specific Error Handling

### Screenshot on Failure

Already built into BrowserBaseNode:

```python
# In node config, enable:
# screenshot_on_fail: True
# screenshot_on_fail: "./screenshots"

# Or manually:
await self.screenshot_on_failure(page, prefix="click_failed")
```

### Element Not Found Recovery

```python
async def click_with_recovery(page, selector):
    """Click with multiple selector strategies."""
    try:
        await page.click(selector)
    except Error:
        # Try alternative selectors
        for alt in get_alternative_selectors(selector):
            try:
                await page.click(alt)
                return
            except Error:
                continue
        raise
```

### Dynamic Wait Strategies

```python
async def smart_wait(page, selector, timeout_ms=30000):
    """Wait with progressive timeouts."""
    # Fast check first
    try:
        if await page.locator(selector).count() > 0:
            return
    except Error:
        pass

    # Then full wait
    await page.wait_for_selector(selector, timeout=timeout_ms)
```

## Examples

See `examples/` folder for complete implementations:

| File | Description |
|------|-------------|
| `retry-pattern.py` | Exponential backoff with jitter |
| `polling-pattern.py` | Condition polling with timeout |
| `circuit-breaker-pattern.py` | Full circuit breaker with metrics |

## Cross-References

| Topic | Location |
|-------|----------|
| BrowserBaseNode retry | `src/casare_rpa/nodes/browser/browser_base.py` |
| Node error handling | `.brain/rules/error-handling.md` |
| Control flow nodes | `src/casare_rpa/nodes/control_flow/` |

---

*Parent: [../_index.md](../_index.md)*
*Last updated: 2025-12-26*
rpa-patterns | SkillHub