cloudflare-queues
Cloudflare Queues message queue playbook: producers, consumers, Workers integration, message batching, retries, dead letter queues, delivery delay, concurrency, pull consumers, HTTP API. Keywords: Cloudflare Queues, message queue, producer, consumer, Workers binding, batch, retry, DLQ, dead letter queue, pull consumer, at-least-once delivery.
Packaged view
This page reorganizes the original catalog entry around fit, installability, and workflow context first. The original raw source lives below.
Install command
npx @skill-hub/cli install itechmeat-llm-code-cloudflare-queues
Repository
Skill path: skills/cloudflare-queues
Best for
Primary workflow: Ship Full Stack.
Technical facets: Full Stack, Backend, Integration.
Target audience: everyone.
License: Unknown.
Original source
Catalog source: SkillHub Club.
Repository owner: itechmeat.
This is a mirrored public skill entry. Review the repository before installing it into production workflows.
What it helps with
- Install cloudflare-queues into Claude Code, Codex CLI, Gemini CLI, or OpenCode workflows
- Review https://github.com/itechmeat/llm-code before adding cloudflare-queues to shared team environments
- Use cloudflare-queues for development workflows
Works across
Favorites: 0.
Sub-skills: 0.
Aggregator: No.
Original source / Raw SKILL.md
---
name: cloudflare-queues
description: "Cloudflare Queues message queue playbook: producers, consumers, Workers integration, message batching, retries, dead letter queues, delivery delay, concurrency, pull consumers, HTTP API. Keywords: Cloudflare Queues, message queue, producer, consumer, Workers binding, batch, retry, DLQ, dead letter queue, pull consumer, at-least-once delivery."
---
# Cloudflare Queues
Queues is Cloudflare's message queue for Workers. It supports push delivery (a Worker consumer) and pull delivery (HTTP API), with at-least-once semantics.
---
## Quick Start
### Create queue
```bash
npx wrangler queues create my-queue
```
### Producer binding
```jsonc
// wrangler.jsonc
{
"queues": {
"producers": [
{
"queue": "my-queue",
"binding": "MY_QUEUE"
}
]
}
}
```
### Consumer binding
```jsonc
// wrangler.jsonc
{
"queues": {
"consumers": [
{
"queue": "my-queue",
"max_batch_size": 10,
"max_batch_timeout": 5
}
]
}
}
```
### Producer Worker
```typescript
export interface Env {
MY_QUEUE: Queue;
}
export default {
async fetch(request: Request, env: Env): Promise<Response> {
await env.MY_QUEUE.send({ url: request.url, method: request.method });
return new Response("Message sent");
},
};
```
### Consumer Worker
```typescript
export interface Env {}
export default {
async queue(batch: MessageBatch, env: Env): Promise<void> {
for (const msg of batch.messages) {
console.log(msg.body);
msg.ack();
}
},
};
```
---
## Producer API
### send(body, options?)
```typescript
await env.MY_QUEUE.send({ action: "process", id: 123 });
// With delay
await env.MY_QUEUE.send(message, { delaySeconds: 600 }); // 10 min delay
// With content type
await env.MY_QUEUE.send(message, { contentType: "json" });
```
### sendBatch(messages, options?)
```typescript
await env.MY_QUEUE.sendBatch([{ body: { id: 1 } }, { body: { id: 2 }, options: { delaySeconds: 300 } }, { body: { id: 3 } }]);
// Global delay for batch
await env.MY_QUEUE.sendBatch(messages, { delaySeconds: 600 });
```
**Limits**:
- Max 100 messages per batch
- Max 128 KB per message
- Total batch ≤ 256 KB
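`sendBatch` rejects batches over these limits, so larger workloads need client-side chunking. A minimal sketch (the `chunkMessages` helper is hypothetical, not part of the Queues API):

```typescript
// Hypothetical helper: split an arbitrary message list into chunks
// that respect the 100-messages-per-batch limit before calling sendBatch.
type MessageSendRequest<Body = unknown> = { body: Body };

function chunkMessages<Body>(
  messages: MessageSendRequest<Body>[],
  maxPerBatch = 100,
): MessageSendRequest<Body>[][] {
  const batches: MessageSendRequest<Body>[][] = [];
  for (let i = 0; i < messages.length; i += maxPerBatch) {
    batches.push(messages.slice(i, i + maxPerBatch));
  }
  return batches;
}

// Usage (inside a Worker):
// for (const batch of chunkMessages(allMessages)) {
//   await env.MY_QUEUE.sendBatch(batch);
// }
```

Note this only handles the message-count limit; the 128 KB per-message and 256 KB per-batch size limits would need similar checks.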
### Content Types
| Type | Description |
| ------- | ------------------------------- |
| `json` | JSON serialized (default) |
| `text` | Plain text |
| `bytes` | Raw binary |
| `v8` | V8 serialization (Workers only) |
**Note**: Pull consumers cannot decode `v8` content type.
See [api.md](references/api.md) for type definitions.
---
## Consumer API
### MessageBatch
```typescript
interface MessageBatch<Body = unknown> {
readonly queue: string;
readonly messages: Message<Body>[];
ackAll(): void;
retryAll(options?: { delaySeconds?: number }): void;
}
```
### Message
```typescript
interface Message<Body = unknown> {
readonly id: string;
readonly timestamp: Date;
readonly body: Body;
readonly attempts: number;
ack(): void;
retry(options?: { delaySeconds?: number }): void;
}
```
### Acknowledgment Patterns
```typescript
export default {
async queue(batch: MessageBatch, env: Env): Promise<void> {
for (const msg of batch.messages) {
try {
await processMessage(msg.body);
msg.ack(); // Explicit success
} catch (error) {
msg.retry({ delaySeconds: 60 }); // Retry with delay
}
}
},
};
```
### Batch-level operations
```typescript
export default {
async queue(batch: MessageBatch, env: Env): Promise<void> {
try {
await processAll(batch.messages);
batch.ackAll(); // All succeeded
} catch (error) {
batch.retryAll({ delaySeconds: 300 }); // Retry all
}
},
};
```
**Precedence**: per-message `ack()`/`retry()` calls override batch-level `ackAll()`/`retryAll()`.
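The precedence rule can be sketched as a small self-contained simulation (mock types and a hypothetical `resolveOutcomes` function, not the runtime's actual implementation); it also reflects that a message with no call at all is implicitly retried:

```typescript
// Mock of the documented precedence: a per-message ack()/retry() wins;
// a batch-level ackAll()/retryAll() only applies to untouched messages;
// no call at all means an implicit retry.
type Outcome = "ack" | "retry";

function resolveOutcomes(
  perMessage: Map<string, Outcome>, // explicit ack()/retry() calls
  batchLevel: Outcome | null,       // ackAll()/retryAll(), if called
  messageIds: string[],
): Map<string, Outcome> {
  const result = new Map<string, Outcome>();
  for (const id of messageIds) {
    result.set(id, perMessage.get(id) ?? batchLevel ?? "retry");
  }
  return result;
}
```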
---
## Consumer Configuration
```jsonc
{
"queues": {
"consumers": [
{
"queue": "my-queue",
"max_batch_size": 10, // 1-100, default 10
"max_batch_timeout": 5, // 0-60 seconds, default 5
"max_retries": 3, // default 3
"max_concurrency": 10, // default: auto-scale
"dead_letter_queue": "dlq", // optional DLQ
"retry_delay": 60 // default retry delay (seconds)
}
]
}
}
```
| Setting | Default | Max | Description |
| ------------------- | ------- | ----- | ------------------------- |
| `max_batch_size` | 10 | 100 | Messages per batch |
| `max_batch_timeout` | 5 | 60 | Seconds to wait for batch |
| `max_retries` | 3 | 100 | Retries before DLQ/delete |
| `max_concurrency` | auto | 250 | Concurrent invocations |
| `retry_delay` | 0 | 43200 | Default retry delay (12h) |
See [consumer.md](references/consumer.md) for details.
---
## Dead Letter Queues
Messages that still fail after `max_retries` are moved to the DLQ.
```jsonc
{
"queues": {
"consumers": [
{
"queue": "my-queue",
"max_retries": 5,
"dead_letter_queue": "my-dlq"
}
]
}
}
```
**Create DLQ**:
```bash
npx wrangler queues create my-dlq
```
**DLQ retention**: 4 days without consumer.
**Process DLQ**:
```jsonc
{
"queues": {
"consumers": [
{
"queue": "my-dlq",
"max_batch_size": 1
}
]
}
}
```
---
## Delivery Delay
### On send
```typescript
await env.MY_QUEUE.send(message, { delaySeconds: 600 }); // 10 min
```
### On retry
```typescript
msg.retry({ delaySeconds: 3600 }); // 1 hour
```
### Queue-level default
```bash
npx wrangler queues create my-queue --delivery-delay-secs=300
```
### Exponential backoff
```typescript
const backoff = (attempts: number, base = 10) => base ** attempts;
msg.retry({ delaySeconds: Math.min(backoff(msg.attempts), 43200) });
```
**Maximum delay**: 12 hours (43200 seconds).
---
## Concurrency
Consumers auto-scale with the backlog. Cap concurrency with `max_concurrency`:
```jsonc
{
"queues": {
"consumers": [
{
"queue": "my-queue",
"max_concurrency": 5
}
]
}
}
```
**max_concurrency: 1** = sequential processing.
**Scaling factors**:
- Backlog size and growth
- Success/failure ratio
- max_concurrency limit
**Note**: `retry()` calls don't count as failures for scaling.
---
## Pull Consumers (HTTP API)
Pull consumers let you consume messages from outside Workers via the HTTP API.
### Enable pull consumer
```jsonc
{
"queues": {
"consumers": [
{
"queue": "my-queue",
"type": "http_pull",
"visibility_timeout_ms": 5000,
"max_retries": 5
}
]
}
}
```
### Pull messages
```bash
curl -X POST "https://api.cloudflare.com/client/v4/accounts/$ACCOUNT_ID/queues/$QUEUE_ID/messages/pull" \
-H "Authorization: Bearer $API_TOKEN" \
-H "Content-Type: application/json" \
-d '{"batch_size": 10, "visibility_timeout_ms": 30000}'
```
### Acknowledge messages
```bash
curl -X POST "https://api.cloudflare.com/client/v4/accounts/$ACCOUNT_ID/queues/$QUEUE_ID/messages/ack" \
-H "Authorization: Bearer $API_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"acks": [{"lease_id": "..."}],
"retries": [{"lease_id": "...", "delay_seconds": 60}]
}'
```
See [pull-consumer.md](references/pull-consumer.md) for details.
---
## Wrangler Commands
```bash
# Queue management
wrangler queues create <name> [--delivery-delay-secs=N]
wrangler queues delete <name>
wrangler queues list
wrangler queues info <name>
# Pause/resume
wrangler queues pause-delivery <name>
wrangler queues resume-delivery <name>
# Purge all messages
wrangler queues purge <name>
# Consumer management
wrangler queues consumer add <queue> <script> [options]
wrangler queues consumer remove <queue> <script>
wrangler queues consumer http add <queue> [options]
wrangler queues consumer http remove <queue>
```
---
## Limits
| Parameter | Limit |
| ---------------------- | ------------------ |
| Queues per account | 10,000 |
| Message size | 128 KB |
| Messages per sendBatch | 100 |
| Batch size (consumer) | 100 |
| Per-queue throughput | 5,000 msg/sec |
| Per-queue backlog | 25 GB |
| Message retention | 4 days (max 14) |
| Concurrent consumers | 250 |
| Consumer duration | 15 min wall clock |
| Consumer CPU | 30 sec (max 5 min) |
| Delay (send/retry) | 12 hours |
| Max retries | 100 |
### Increase CPU limit
```jsonc
{
"limits": {
"cpu_ms": 300000 // 5 minutes
}
}
```
---
## Pricing
**Workers Paid**: 1M operations/month included, then $0.40/million.
**Operation** = 64 KB chunk written, read, or deleted.
| Action | Operations |
| ----------------- | ----------------- |
| Send 1 message | 1 write |
| Consume 1 message | 1 read |
| Delete 1 message | 1 delete (on ack) |
| Retry | 1 additional read |
| DLQ write | 1 write |
**Formula** (messages ≤ 64 KB, no retries): `max(Messages × 3 − 1M, 0) / 1M × $0.40`
**No egress fees**.
See [pricing.md](references/pricing.md) for examples.
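The formula can be sketched as a small estimator that also accounts for the 64 KB billing unit (a hypothetical `monthlyCost` helper, not an official calculator):

```typescript
// Each message costs one write, one read, and one delete per 64 KB chunk;
// retries add one extra read each; the first 1M operations/month are included.
const OPS_INCLUDED = 1_000_000;
const PRICE_PER_MILLION = 0.4; // USD

function monthlyCost(messages: number, sizeKB = 1, retries = 0): number {
  const chunks = Math.ceil(sizeKB / 64);        // 64 KB billing unit
  const ops = messages * chunks * 3 + retries;  // write + read + delete (+ re-reads)
  const billable = Math.max(ops - OPS_INCLUDED, 0);
  return (billable / 1_000_000) * PRICE_PER_MILLION;
}
```

For example, 5M small messages/month is 15M operations, 14M of them billable, or $5.60.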
---
## Delivery Guarantees
**At-least-once delivery**: Messages delivered at least once, possibly duplicated.
**Handle duplicates**:
```typescript
export default {
async queue(batch: MessageBatch, env: Env): Promise<void> {
for (const msg of batch.messages) {
const key = `processed:${msg.id}`;
if (await env.KV.get(key)) {
msg.ack(); // Already processed
continue;
}
await processMessage(msg.body);
await env.KV.put(key, "1", { expirationTtl: 86400 });
msg.ack();
}
},
};
```
---
## Event Notifications
R2 and other services can send events to Queues.
```bash
# R2 → Queue
wrangler r2 bucket notification create my-bucket \
--event-type object-create \
--queue my-queue
```
See `cloudflare-r2` skill for event notification setup.
---
## Prohibitions
- ❌ Do not use `v8` content type with pull consumers
- ❌ Do not exceed 128 KB per message
- ❌ Do not rely on exactly-once delivery (use idempotency)
- ❌ Do not ignore DLQ — process failed messages
- ❌ Do not set excessive concurrency without testing
---
## References
- [api.md](references/api.md) — Producer/Consumer API reference
- [consumer.md](references/consumer.md) — Consumer configuration
- [pull-consumer.md](references/pull-consumer.md) — HTTP pull API
- [pricing.md](references/pricing.md) — Billing details
## Related Skills
- `cloudflare-workers` — Worker development
- `cloudflare-r2` — R2 event notifications
- `cloudflare-durable-objects` — Queue producer from DO
- `cloudflare-kv` — Idempotency tracking
---
## Referenced Files
> The following files are referenced in this skill and included for context.
### references/api.md
```markdown
# Queues API Reference
## Queue (Producer)
```typescript
interface Queue<Body = unknown> {
send(body: Body, options?: QueueSendOptions): Promise<void>;
sendBatch(messages: Iterable<MessageSendRequest<Body>>, options?: QueueSendBatchOptions): Promise<void>;
}
interface QueueSendOptions {
contentType?: QueueContentType;
delaySeconds?: number; // 0-43200 (12 hours)
}
interface QueueSendBatchOptions {
delaySeconds?: number; // Default for all messages
}
interface MessageSendRequest<Body = unknown> {
body: Body;
options?: QueueSendOptions;
}
type QueueContentType = "text" | "bytes" | "json" | "v8";
```
## MessageBatch (Consumer)
```typescript
interface MessageBatch<Body = unknown> {
readonly queue: string;
readonly messages: Message<Body>[];
ackAll(): void;
retryAll(options?: QueueRetryOptions): void;
}
interface Message<Body = unknown> {
readonly id: string;
readonly timestamp: Date;
readonly body: Body;
readonly attempts: number;
ack(): void;
retry(options?: QueueRetryOptions): void;
}
interface QueueRetryOptions {
delaySeconds?: number; // 0-43200
}
```
## Content Types
| Type | Description | Pull Consumer Support |
| ------- | ------------------------- | --------------------- |
| `json` | JSON serialized (default) | ✓ |
| `text` | Plain UTF-8 text | ✓ |
| `bytes` | Raw binary (ArrayBuffer) | ✓ |
| `v8` | V8 internal serialization | ✗ |
## Send Examples
```typescript
// Basic send
await env.MY_QUEUE.send({ task: "process", id: 123 });
// With delay
await env.MY_QUEUE.send(data, { delaySeconds: 600 });
// With explicit content type
await env.MY_QUEUE.send(text, { contentType: "text" });
await env.MY_QUEUE.send(buffer, { contentType: "bytes" });
```
## Batch Examples
```typescript
// Send batch
await env.MY_QUEUE.sendBatch([{ body: { id: 1 } }, { body: { id: 2 }, options: { delaySeconds: 300 } }]);
// With global options
await env.MY_QUEUE.sendBatch(messages, { delaySeconds: 60 });
```
## Consumer Examples
```typescript
export default {
async queue(batch: MessageBatch<MyType>, env: Env): Promise<void> {
for (const msg of batch.messages) {
console.log(msg.id, msg.attempts, msg.body);
msg.ack();
}
},
};
```
## Acknowledgment Priority
1. Per-message `ack()` / `retry()` calls take precedence
2. Batch-level `ackAll()` / `retryAll()` for remaining
3. No action = implicit retry (uses default delay)
```
### references/consumer.md
```markdown
# Consumer Configuration
## wrangler.jsonc Configuration
```jsonc
{
"queues": {
"consumers": [
{
"queue": "my-queue",
"max_batch_size": 10,
"max_batch_timeout": 5,
"max_retries": 3,
"max_concurrency": 10,
"dead_letter_queue": "my-dlq",
"retry_delay": 60
}
]
}
}
```
## Consumer Settings
| Setting | Type | Default | Range | Description |
| ------------------- | ------ | -------- | ------- | ----------------------------- |
| `queue` | string | required | — | Queue name |
| `max_batch_size` | number | 10 | 1-100 | Messages per batch |
| `max_batch_timeout` | number | 5 | 0-60 | Seconds to fill batch |
| `max_retries` | number | 3 | 0-100 | Retries before DLQ/delete |
| `max_concurrency` | number | auto | 1-250 | Max concurrent invocations |
| `dead_letter_queue` | string | none | — | DLQ name |
| `retry_delay` | number | 0 | 0-43200 | Default retry delay (seconds) |
## Batching
Messages batch until:
- `max_batch_size` reached, OR
- `max_batch_timeout` elapsed
```jsonc
// Small batches, fast delivery
{ "max_batch_size": 1, "max_batch_timeout": 0 }
// Large batches, higher latency
{ "max_batch_size": 100, "max_batch_timeout": 30 }
```
## Concurrency
Auto-scales based on:
- Queue backlog size and growth rate
- Consumer success/failure ratio
- `max_concurrency` limit
```jsonc
// Sequential processing
{ "max_concurrency": 1 }
// High parallelism
{ "max_concurrency": 100 }
```
## Retry Behavior
1. Message fails (exception or no ack/retry call)
2. Message retried after `retry_delay`
3. After `max_retries`:
- Sent to `dead_letter_queue` (if configured)
- Otherwise deleted
## Dead Letter Queue
```jsonc
{
"queues": {
"consumers": [
{
"queue": "my-queue",
"max_retries": 5,
"dead_letter_queue": "my-dlq"
}
]
}
}
```
DLQ requires separate consumer to process failures.
## Multiple Consumers
A queue can have only ONE consumer. To fan out, have that single consumer forward to multiple queues:
```typescript
// Single consumer fans out to multiple queues
await Promise.all([env.QUEUE_A.send(message), env.QUEUE_B.send(message), env.QUEUE_C.send(message)]);
```
## Pause/Resume
```bash
wrangler queues pause-delivery my-queue
wrangler queues resume-delivery my-queue
```
Messages queue but don't deliver when paused.
```
### references/pull-consumer.md
```markdown
# Pull Consumers (HTTP API)
Pull consumers allow consuming queue messages from outside Cloudflare Workers via HTTP API.
## Enable Pull Consumer
```jsonc
// wrangler.jsonc
{
"queues": {
"consumers": [
{
"queue": "my-queue",
"type": "http_pull",
"visibility_timeout_ms": 30000,
"max_retries": 5,
"dead_letter_queue": "my-dlq"
}
]
}
}
```
## Configuration
| Setting | Default | Range | Description |
| ----------------------- | -------- | ---------------------- | ----------------------- |
| `type` | `worker` | `worker` / `http_pull` | Consumer type |
| `visibility_timeout_ms` | 30000 | 1000-43200000 | Time before re-delivery |
| `max_retries` | 3 | 0-100 | Retries before DLQ |
| `dead_letter_queue` | none | — | DLQ name |
## API Endpoints
Base URL: `https://api.cloudflare.com/client/v4/accounts/{account_id}/queues/{queue_id}/messages`
### Pull Messages
```bash
curl -X POST ".../messages/pull" \
-H "Authorization: Bearer $API_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"batch_size": 10,
"visibility_timeout_ms": 30000
}'
```
**Response**:
```json
{
"result": {
"messages": [
{
"id": "msg-abc123",
"body": "{\"task\":\"process\"}",
"timestamp_ms": 1699000000000,
"attempts": 1,
"lease_id": "lease-xyz789",
"metadata": {
"content-type": "application/json"
}
}
]
}
}
```
### Acknowledge Messages
```bash
curl -X POST ".../messages/ack" \
-H "Authorization: Bearer $API_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"acks": [
{ "lease_id": "lease-xyz789" }
],
"retries": [
{ "lease_id": "lease-abc456", "delay_seconds": 60 }
]
}'
```
## Visibility Timeout
When pulled, message becomes invisible for `visibility_timeout_ms`.
- If not acknowledged: re-delivered after timeout
- Ack before timeout: permanently deleted
- Retry: re-queued with optional delay
## Content Types
**Warning**: `v8` content type cannot be decoded by external consumers.
| Content-Type Header | Body Format |
| -------------------------- | -------------- |
| `application/json` | JSON string |
| `text/plain` | UTF-8 text |
| `application/octet-stream` | Base64 encoded |
## Python Example
```python
import requests
ACCOUNT_ID = "..."
QUEUE_ID = "..."
API_TOKEN = "..."
BASE_URL = f"https://api.cloudflare.com/client/v4/accounts/{ACCOUNT_ID}/queues/{QUEUE_ID}/messages"
HEADERS = {
"Authorization": f"Bearer {API_TOKEN}",
"Content-Type": "application/json"
}
# Pull messages
resp = requests.post(f"{BASE_URL}/pull", headers=HEADERS, json={"batch_size": 10})
messages = resp.json()["result"]["messages"]
# Process and acknowledge
acks = []
for msg in messages:
process(msg["body"])
acks.append({"lease_id": msg["lease_id"]})
requests.post(f"{BASE_URL}/ack", headers=HEADERS, json={"acks": acks})
```
## Required Permissions
API Token needs:
- `com.cloudflare.api.account.queues.pull` — Read messages
- `com.cloudflare.api.account.queues.ack` — Acknowledge messages
```
### references/pricing.md
```markdown
# Queues Pricing
## Plan Requirement
Queues requires **Workers Paid** plan ($5/month).
## Included
- 1 million operations/month
## Additional Usage
- **$0.40 per million operations** (after included)
## What Counts as an Operation
| Action | Operations |
| ----------------------- | ----------------- |
| Write message to queue | 1 per 64 KB |
| Read message (consumer) | 1 per 64 KB |
| Delete message (on ack) | 1 per 64 KB |
| Retry (re-read) | 1 additional read |
| DLQ write | 1 write |
**64 KB unit**: Messages larger than 64 KB count as multiple operations.
| Message Size | Write Ops | Read Ops | Delete Ops |
| ------------ | --------- | -------- | ---------- |
| ≤ 64 KB | 1 | 1 | 1 |
| 65 KB | 2 | 2 | 2 |
| 128 KB | 2 | 2 | 2 |
## Cost Examples
### Light usage
- 500K messages/month × 3 ops = 1.5M ops
- Billable: 1.5M − 1M = 0.5M ops
- Cost: 0.5 × $0.40 = **$0.20**
### Medium usage
- 5M messages/month × 3 ops = 15M ops
- Billable: 15M - 1M = 14M ops
- Cost: 14 × $0.40 = **$5.60**
### With retries
- 1M messages, 10% retry once
- Ops: 1M writes + 1.1M reads + 1M deletes = 3.1M
- Billable: 2.1M ops
- Cost: 2.1 × $0.40 = **$0.84**
### Large messages
- 1M messages at 100KB each
- Each = 2 ops per action
- Ops: 1M × 2 × 3 = 6M ops
- Billable: 5M ops
- Cost: 5 × $0.40 = **$2.00**
## No Additional Charges
- ✓ No egress fees
- ✓ No storage fees
- ✓ No per-queue fees
- ✓ No API request fees
## DLQ Consideration
DLQ messages incur:
1. Original queue retries (reads)
2. DLQ write (1 op)
3. DLQ read when processed (1 op)
4. DLQ delete when acked (1 op)
## Cost Optimization
1. Keep messages small (≤ 64 KB)
2. Minimize retries with proper error handling
3. Process DLQ regularly
4. Use batch operations efficiently
```