Back to skills
SkillHub ClubAnalyze Data & AIFull StackData / AI

trigger-agents

AI agent patterns with Trigger.dev - orchestration, parallelization, routing, evaluator-optimizer, and human-in-the-loop. Use when building LLM-powered tasks that need parallel workers, approval gates, tool calling, or multi-step agent workflows.

Packaged view

This page reorganizes the original catalog entry around fit, installability, and workflow context first. The original raw source lives below.

Stars
17
Hot score
87
Updated
March 20, 2026
Overall rating
C1.6
Composite score
1.6
Best-practice grade
B73.6

Install command

npx @skill-hub/cli install triggerdotdev-skills-trigger-agents

Repository

triggerdotdev/skills

Skill path: trigger-agents

AI agent patterns with Trigger.dev - orchestration, parallelization, routing, evaluator-optimizer, and human-in-the-loop. Use when building LLM-powered tasks that need parallel workers, approval gates, tool calling, or multi-step agent workflows.

Open repository

Best for

Primary workflow: Analyze Data & AI.

Technical facets: Full Stack, Data / AI.

Target audience: everyone.

License: Unknown.

Original source

Catalog source: SkillHub Club.

Repository owner: triggerdotdev.

This is still a mirrored public skill entry. Review the repository before installing into production workflows.

What it helps with

  • Install trigger-agents into Claude Code, Codex CLI, Gemini CLI, or OpenCode workflows
  • Review https://github.com/triggerdotdev/skills before adding trigger-agents to shared team environments
  • Use trigger-agents for development workflows

Works across

Claude CodeCodex CLIGemini CLIOpenCode

Favorites: 0.

Sub-skills: 0.

Aggregator: No.

Original source / Raw SKILL.md

---
name: trigger-agents
description: AI agent patterns with Trigger.dev - orchestration, parallelization, routing, evaluator-optimizer, and human-in-the-loop. Use when building LLM-powered tasks that need parallel workers, approval gates, tool calling, or multi-step agent workflows.
---

# AI Agent Patterns with Trigger.dev

Build production-ready AI agents using Trigger.dev's durable execution.

## Pattern Selection

```
Need to...                              → Use
─────────────────────────────────────────────────────
Process items in parallel               → Parallelization
Route to different models/handlers      → Routing
Chain steps with validation gates       → Prompt Chaining
Coordinate multiple specialized tasks   → Orchestrator-Workers
Self-improve until quality threshold    → Evaluator-Optimizer
Pause for human approval                → Human-in-the-Loop (waitpoints.md)
Stream progress to frontend             → Realtime Streams (streaming.md)
Let LLM call your tasks as tools        → ai.tool (ai-tool.md)
```

---

## Core Patterns

### 1. Prompt Chaining (Sequential with Gates)

Chain LLM calls with validation between steps. Fail early if intermediate output is bad.

```typescript
import { task } from "@trigger.dev/sdk";
import { generateText } from "ai";
import { openai } from "@ai-sdk/openai";

export const translateCopy = task({
  id: "translate-copy",
  run: async ({ text, targetLanguage, maxWords }) => {
    // Step 1: Generate
    const draft = await generateText({
      model: openai("gpt-4o"),
      prompt: `Write marketing copy about: ${text}`,
    });

    // Gate: Validate before continuing
    const wordCount = draft.text.split(/\s+/).length;
    if (wordCount > maxWords) {
      throw new Error(`Draft too long: ${wordCount} > ${maxWords}`);
    }

    // Step 2: Translate (only if gate passed)
    const translated = await generateText({
      model: openai("gpt-4o"),
      prompt: `Translate to ${targetLanguage}: ${draft.text}`,
    });

    return { draft: draft.text, translated: translated.text };
  },
});
```

---

### 2. Routing (Classify → Dispatch)

Use a cheap model to classify, then route to appropriate handler.

```typescript
import { task } from "@trigger.dev/sdk";
import { generateText } from "ai";
import { openai } from "@ai-sdk/openai";
import { z } from "zod";

const routingSchema = z.object({
  model: z.enum(["gpt-4o", "o1-mini"]),
  reason: z.string(),
});

export const routeQuestion = task({
  id: "route-question",
  run: async ({ question }) => {
    // Cheap classification call
    const routing = await generateText({
      model: openai("gpt-4o-mini"),
      messages: [
        {
          role: "system",
          content: `Classify question complexity. Return JSON: {"model": "gpt-4o" | "o1-mini", "reason": "..."}
          - gpt-4o: simple factual questions
          - o1-mini: complex reasoning, math, code`,
        },
        { role: "user", content: question },
      ],
    });

    const { model } = routingSchema.parse(JSON.parse(routing.text));

    // Route to selected model
    const answer = await generateText({
      model: openai(model),
      prompt: question,
    });

    return { answer: answer.text, routedTo: model };
  },
});
```

---

### 3. Parallelization

Run independent LLM calls simultaneously with `batch.triggerByTaskAndWait`.

```typescript
import { batch, task } from "@trigger.dev/sdk";

export const analyzeContent = task({
  id: "analyze-content",
  run: async ({ text }) => {
    // All three run in parallel
    const { runs: [sentiment, summary, moderation] } = await batch.triggerByTaskAndWait([
      { task: analyzeSentiment, payload: { text } },
      { task: summarizeText, payload: { text } },
      { task: moderateContent, payload: { text } },
    ]);

    // Check moderation first
    if (moderation.ok && moderation.output.flagged) {
      return { error: "Content flagged", reason: moderation.output.reason };
    }

    return {
      sentiment: sentiment.ok ? sentiment.output : null,
      summary: summary.ok ? summary.output : null,
    };
  },
});
```

**See:** `references/orchestration.md` for advanced patterns

---

### 4. Orchestrator-Workers (Fan-out/Fan-in)

Orchestrator extracts work items, fans out to workers, aggregates results.

```typescript
import { batch, task } from "@trigger.dev/sdk";

export const factChecker = task({
  id: "fact-checker",
  run: async ({ article }) => {
    // Step 1: Extract claims (sequential - need output first)
    const { runs: [extractResult] } = await batch.triggerByTaskAndWait([
      { task: extractClaims, payload: { article } },
    ]);

    if (!extractResult.ok) throw new Error("Failed to extract claims");
    const claims = extractResult.output;

    // Step 2: Fan-out - verify all claims in parallel
    const { runs } = await batch.triggerByTaskAndWait(
      claims.map(claim => ({ task: verifyClaim, payload: claim }))
    );

    // Step 3: Fan-in - aggregate results
    const verified = runs
      .filter((r): r is typeof r & { ok: true } => r.ok)
      .map(r => r.output);

    return { claims, verifications: verified };
  },
});
```

---

### 5. Evaluator-Optimizer (Self-Refining Loop)

Generate → Evaluate → Retry with feedback until approved.

```typescript
import { task } from "@trigger.dev/sdk";

export const refineTranslation = task({
  id: "refine-translation",
  run: async ({ text, targetLanguage, feedback, attempt = 0 }) => {
    // Bail condition
    if (attempt >= 5) {
      return { text, status: "MAX_ATTEMPTS", attempts: attempt };
    }

    // Generate (with feedback if retrying)
    const prompt = feedback
      ? `Improve this translation based on feedback:\n${feedback}\n\nOriginal: ${text}`
      : `Translate to ${targetLanguage}: ${text}`;

    const translation = await generateText({
      model: openai("gpt-4o"),
      prompt,
    });

    // Evaluate
    const evaluation = await generateText({
      model: openai("gpt-4o"),
      prompt: `Evaluate translation quality. Reply APPROVED or provide specific feedback:\n${translation.text}`,
    });

    if (evaluation.text.includes("APPROVED")) {
      return { text: translation.text, status: "APPROVED", attempts: attempt + 1 };
    }

    // Recursive self-call with feedback
    return refineTranslation.triggerAndWait({
      text,
      targetLanguage,
      feedback: evaluation.text,
      attempt: attempt + 1,
    }).unwrap();
  },
});
```

---

## Trigger-Specific Features

| Feature | What it enables | Reference |
|---------|-----------------|-----------|
| **Waitpoints** | Human approval gates, external callbacks | `references/waitpoints.md` |
| **Streams** | Real-time progress to frontend | `references/streaming.md` |
| **ai.tool** | Let LLMs call your tasks as tools | `references/ai-tool.md` |
| **batch.triggerByTaskAndWait** | Typed parallel execution | `references/orchestration.md` |

---

## Error Handling

```typescript
const { runs } = await batch.triggerByTaskAndWait([...]);

// Check individual results
for (const run of runs) {
  if (run.ok) {
    console.log(run.output);  // Typed output
  } else {
    console.error(run.error);  // Error details
    console.log(run.taskIdentifier);  // Which task failed
  }
}

// Or filter by task type
const verifications = runs
  .filter((r): r is typeof r & { ok: true } =>
    r.ok && r.taskIdentifier === "verify-claim"
  )
  .map(r => r.output);
```

---

## Quick Reference

```typescript
// Trigger and wait for result
const result = await myTask.triggerAndWait(payload);
if (result.ok) console.log(result.output);

// Batch trigger same task
const results = await myTask.batchTriggerAndWait([
  { payload: item1 },
  { payload: item2 },
]);

// Batch trigger different tasks (typed)
const { runs } = await batch.triggerByTaskAndWait([
  { task: taskA, payload: { foo: 1 } },
  { task: taskB, payload: { bar: "x" } },
]);

// Self-recursion with unwrap
return myTask.triggerAndWait(newPayload).unwrap();
```


---

## Referenced Files

> The following files are referenced in this skill and included for context.

### references/orchestration.md

```markdown
# Orchestration Patterns

Advanced patterns for `batch.triggerByTaskAndWait` and task coordination.

## Basic Usage

```typescript
import { batch, task } from "@trigger.dev/sdk";

// Trigger different tasks, get typed results
const { runs } = await batch.triggerByTaskAndWait([
  { task: taskA, payload: { foo: "bar" } },  // payload typed to taskA
  { task: taskB, payload: { num: 42 } },     // payload typed to taskB
]);

// Results are typed based on position
if (runs[0].ok) {
  console.log(runs[0].output);  // typed as taskA output
}
```

## Destructured Results

```typescript
const {
  runs: [userRun, postsRun, settingsRun],
} = await batch.triggerByTaskAndWait([
  { task: fetchUser, payload: { id } },
  { task: fetchPosts, payload: { userId: id } },
  { task: fetchSettings, payload: { userId: id } },
]);

// Each run is individually typed
const user = userRun.ok ? userRun.output : null;
const posts = postsRun.ok ? postsRun.output : [];
```

---

## Error Handling Per-Task

```typescript
const { runs } = await batch.triggerByTaskAndWait([
  { task: riskyTask, payload: item1 },
  { task: riskyTask, payload: item2 },
  { task: riskyTask, payload: item3 },
]);

// Individual error handling
const results = runs.map(run => {
  if (run.ok) {
    return { success: true, data: run.output };
  }
  return {
    success: false,
    error: run.error,
    taskId: run.taskIdentifier,
    runId: run.id,
  };
});

// Or throw if any failed
const failed = runs.filter(r => !r.ok);
if (failed.length > 0) {
  throw new Error(`${failed.length} tasks failed`);
}
```

---

## Filtering by Task Identifier

When running mixed task types, filter results by `taskIdentifier`:

```typescript
const { runs } = await batch.triggerByTaskAndWait([
  ...claims.map(c => ({ task: verifySource, payload: c })),
  ...claims.map(c => ({ task: analyzeHistory, payload: c })),
]);

// Filter to specific task results
const verifications = runs
  .filter((r): r is typeof r & { ok: true } =>
    r.ok && r.taskIdentifier === "verify-source"
  )
  .map(r => r.output as SourceVerification);

const analyses = runs
  .filter((r): r is typeof r & { ok: true } =>
    r.ok && r.taskIdentifier === "analyze-history"
  )
  .map(r => r.output as HistoricalAnalysis);
```

---

## Fan-out/Fan-in Pattern

```typescript
export const processItems = task({
  id: "process-items",
  run: async ({ items }) => {
    // Fan-out: process all items in parallel
    const { runs } = await batch.triggerByTaskAndWait(
      items.map(item => ({ task: processItem, payload: item }))
    );

    // Fan-in: aggregate results
    const successful = runs.filter(r => r.ok).map(r => r.output);
    const failed = runs.filter(r => !r.ok);

    return {
      processed: successful.length,
      failed: failed.length,
      results: successful,
      errors: failed.map(f => ({ id: f.id, error: f.error })),
    };
  },
});
```

---

## Sequential Then Parallel

```typescript
export const orchestrator = task({
  id: "orchestrator",
  run: async ({ input }) => {
    // Step 1: Sequential preprocessing
    const { runs: [prepResult] } = await batch.triggerByTaskAndWait([
      { task: preprocess, payload: { input } },
    ]);

    if (!prepResult.ok) {
      throw new Error(`Preprocessing failed: ${prepResult.error}`);
    }

    const items = prepResult.output;

    // Step 2: Parallel processing
    const { runs } = await batch.triggerByTaskAndWait(
      items.map(item => ({ task: processItem, payload: item }))
    );

    // Step 3: Sequential aggregation
    const { runs: [aggResult] } = await batch.triggerByTaskAndWait([
      { task: aggregate, payload: { results: runs.filter(r => r.ok).map(r => r.output) } },
    ]);

    return aggResult.ok ? aggResult.output : null;
  },
});
```

---

## Same Task, Multiple Items

For batch processing the same task:

```typescript
// Using batchTriggerAndWait (single task type)
const results = await processItem.batchTriggerAndWait([
  { payload: item1 },
  { payload: item2 },
  { payload: item3 },
]);

// Equivalent using batch.triggerByTaskAndWait
const { runs } = await batch.triggerByTaskAndWait([
  { task: processItem, payload: item1 },
  { task: processItem, payload: item2 },
  { task: processItem, payload: item3 },
]);
```

---

## Concurrency Control

Control parallelism via queue settings on child tasks:

```typescript
import { queue, task } from "@trigger.dev/sdk";

const rateLimitedQueue = queue({
  name: "api-calls",
  concurrencyLimit: 5,  // Max 5 concurrent
});

export const callExternalApi = task({
  id: "call-external-api",
  queue: rateLimitedQueue,
  run: async (payload) => {
    // Rate limited to 5 concurrent executions
    return fetch(payload.url);
  },
});

// Parent can batch trigger many - queue handles concurrency
export const batchProcess = task({
  id: "batch-process",
  run: async ({ urls }) => {
    // Will queue up, respecting concurrencyLimit: 5
    return callExternalApi.batchTriggerAndWait(
      urls.map(url => ({ payload: { url } }))
    );
  },
});
```

---

## Streaming Batch Items

For large batches, stream items instead of loading all at once:

```typescript
import { batch } from "@trigger.dev/sdk";

// Generator function for items
async function* generateItems() {
  for await (const record of database.cursor()) {
    yield { task: processRecord, payload: record };
  }
}

// Stream to batch trigger
const { runs } = await batch.triggerByTaskAndWait(generateItems());
```

---

## Tips

1. **Use destructuring** for known task counts - cleaner code
2. **Filter by taskIdentifier** when mixing task types
3. **Check `.ok`** before accessing `.output`
4. **Control concurrency** on child task queues, not in orchestrator
5. **Avoid parallel waits** - use batch methods, not Promise.all with triggerAndWait

```

### references/waitpoints.md

```markdown
# Human-in-the-Loop with Waitpoints

Pause task execution for human approval, external callbacks, or async events.

## Core API

```typescript
import { wait } from "@trigger.dev/sdk";

// Create a token (pauses execution point)
const token = await wait.createToken({
  timeout: "10m",  // "1h", "1d", etc.
});

// Wait for completion (blocks until resolved)
const result = await wait.forToken<ApprovalPayload>(token.id);

if (result.ok) {
  console.log(result.output);  // Typed as ApprovalPayload
} else {
  console.log("Timed out:", result.error);
}
```

---

## Complete Pattern: Slack Approval

```typescript
import { task, wait } from "@trigger.dev/sdk";

type ApprovalToken = {
  approved: boolean;
  selectedOption: "optionA" | "optionB";
  approvedBy: string;
};

export const generateWithApproval = task({
  id: "generate-with-approval",
  maxDuration: 600,  // 10 min to account for human delay
  run: async ({ prompt }) => {
    // 1. Generate options
    const options = await generateOptions(prompt);

    // 2. Create approval token
    const token = await wait.createToken({
      timeout: "1h",
    });

    // 3. Send to Slack/email/webhook
    await sendSlackMessage({
      text: "Please approve one option:",
      options,
      approvalUrl: `${process.env.APP_URL}/approve?token=${token.id}`,
      // Or use: token.url for direct callback
    });

    // 4. Wait for human (task suspends here)
    const result = await wait.forToken<ApprovalToken>(token.id);

    if (!result.ok) {
      throw new Error("Approval timed out");
    }

    // 5. Continue with approved option
    return {
      selected: result.output.selectedOption,
      approvedBy: result.output.approvedBy,
      options,
    };
  },
});
```

---

## Completing Tokens

### From your backend

```typescript
import { wait } from "@trigger.dev/sdk";

// In your approval endpoint
export async function POST(request: Request) {
  const { tokenId, approved, option, userId } = await request.json();

  await wait.completeToken<ApprovalToken>(tokenId, {
    approved,
    selectedOption: option,
    approvedBy: userId,
  });

  return Response.json({ success: true });
}
```

### Via HTTP callback (webhooks)

```typescript
const token = await wait.createToken({ timeout: "10m" });

// token.url is a webhook URL that completes the token
// POST to token.url with JSON body → becomes the output
await externalService.startJob({
  callbackUrl: token.url,  // Service POSTs result here
});

const result = await wait.forToken<ExternalResult>(token.id);
```

### From React (useWaitToken)

```typescript
import { useWaitToken } from "@trigger.dev/react-hooks";

function ApprovalButton({ tokenId, publicToken }) {
  const { complete, isCompleting } = useWaitToken(tokenId, {
    accessToken: publicToken,
  });

  return (
    <button
      onClick={() => complete({ approved: true })}
      disabled={isCompleting}
    >
      Approve
    </button>
  );
}
```

---

## Timeout Handling

```typescript
const result = await wait.forToken<ApprovalToken>(token.id);

if (result.ok) {
  // Human responded in time
  return processApproval(result.output);
} else {
  // Timed out - handle gracefully
  await notifyTimeout();
  return { status: "timeout", defaultAction: "rejected" };
}
```

### Using .unwrap() for cleaner code

```typescript
try {
  const approval = await wait.forToken<ApprovalToken>(token.id).unwrap();
  // approval is directly typed, throws on timeout
  return processApproval(approval);
} catch (error) {
  // Timeout throws here
  return handleTimeout();
}
```

---

## Idempotency

Prevent duplicate tokens for the same workflow:

```typescript
const token = await wait.createToken({
  timeout: "1h",
  idempotencyKey: `review-${workflowId}`,
});
```

---

## Tags for Tracking

```typescript
const token = await wait.createToken({
  timeout: "1h",
  tags: [`workflow:${workflowId}`, `user:${userId}`],
});
```

---

## Public Access Token

For frontend completion without server round-trip:

```typescript
const token = await wait.createToken({ timeout: "10m" });

// Pass to frontend
return {
  tokenId: token.id,
  publicToken: token.publicAccessToken,  // Auto-generated, expires in 1h
};
```

---

## Example: Multi-step Review

```typescript
export const contentPipeline = task({
  id: "content-pipeline",
  run: async ({ content }) => {
    // Step 1: AI generation
    const draft = await generateDraft(content);

    // Step 2: Human review
    const reviewToken = await wait.createToken({ timeout: "24h" });
    await sendForReview(draft, reviewToken.id);
    const review = await wait.forToken<ReviewResult>(reviewToken.id);

    if (!review.ok || !review.output.approved) {
      return { status: "rejected", feedback: review.output?.feedback };
    }

    // Step 3: Final approval
    const publishToken = await wait.createToken({ timeout: "1h" });
    await sendForPublishApproval(draft, publishToken.id);
    const publish = await wait.forToken<PublishResult>(publishToken.id);

    if (!publish.ok || !publish.output.approved) {
      return { status: "not_published" };
    }

    // Step 4: Publish
    await publishContent(draft);
    return { status: "published" };
  },
});
```

---

## Tips

1. **Set realistic timeouts** - account for human response time
2. **Handle timeouts gracefully** - don't throw, provide default behavior
3. **Use idempotencyKey** - prevent duplicate tokens on retries
4. **Increase maxDuration** - task needs enough time for human + processing
5. **Use publicAccessToken** - for direct frontend completion

```

### references/streaming.md

```markdown
# Realtime Streams

Stream data from tasks to your frontend in real-time. Perfect for AI completions, progress updates, and live status.

## Define Streams

Create typed stream definitions in a shared file:

```typescript
// trigger/streams.ts
import { streams } from "@trigger.dev/sdk";

// Define with type and unique ID
export const progressStream = streams.define<string>({
  id: "progress",
});

export const aiOutputStream = streams.define<string>({
  id: "ai-output",
});

// Export type for frontend
export type STREAMS = typeof progressStream | typeof aiOutputStream;
```

---

## Emit from Tasks

### Basic emit

```typescript
import { task } from "@trigger.dev/sdk";
import { progressStream } from "./streams";

export const processItems = task({
  id: "process-items",
  run: async ({ items }) => {
    for (const [i, item] of items.entries()) {
      await processItem(item);

      // Emit progress
      progressStream.append(
        JSON.stringify({
          current: i + 1,
          total: items.length,
          status: `Processing ${item.name}`,
        })
      );
    }

    return { processed: items.length };
  },
});
```

### Stream AI completion

```typescript
import { task } from "@trigger.dev/sdk";
import { streamText } from "ai";
import { aiOutputStream } from "./streams";

export const generateText = task({
  id: "generate-text",
  run: async ({ prompt }) => {
    const result = streamText({
      model: openai("gpt-4o"),
      prompt,
    });

    // Pipe AI stream to Trigger stream
    for await (const chunk of result.textStream) {
      aiOutputStream.append(chunk);
    }

    return { text: await result.text };
  },
});
```

---

## Child → Parent Streaming

When child tasks need to emit to the parent's stream:

```typescript
// Child task
export const workerTask = task({
  id: "worker",
  run: async ({ item }) => {
    const result = await processItem(item);

    // Emit to PARENT's stream, not this task's
    progressStream.append(
      JSON.stringify({ item: item.id, status: "done" }),
      { target: "parent" }
    );

    return result;
  },
});

// Parent task - frontend subscribes to this run
export const orchestrator = task({
  id: "orchestrator",
  run: async ({ items }) => {
    // Child emits bubble up to this task's stream
    return workerTask.batchTriggerAndWait(
      items.map(item => ({ payload: { item } }))
    );
  },
});
```

---

## Frontend Subscription

### Using useRealtimeStream (Recommended)

```tsx
import { useRealtimeStream } from "@trigger.dev/react-hooks";
import type { progressStream } from "@/trigger/streams";

function Progress({ runId, accessToken }: { runId: string; accessToken: string }) {
  const { data } = useRealtimeStream<typeof progressStream>(runId, {
    accessToken,
    stream: "progress",
  });

  if (!data) return <div>Waiting...</div>;

  // data is array of emitted values
  const latest = data[data.length - 1];
  const progress = JSON.parse(latest);

  return (
    <div>
      {progress.current} / {progress.total}: {progress.status}
    </div>
  );
}
```

### Using useRealtimeRunWithStreams

```tsx
import { useRealtimeRunWithStreams } from "@trigger.dev/react-hooks";
import type { processItems, STREAMS } from "@/trigger/tasks";

function TaskProgress({ runId, accessToken }: Props) {
  const { run, streams } = useRealtimeRunWithStreams<typeof processItems, STREAMS>(
    runId,
    { accessToken }
  );

  const progressUpdates = streams.progress ?? [];
  const latest = progressUpdates[progressUpdates.length - 1];

  return (
    <div>
      <p>Status: {run?.status}</p>
      {latest && <p>Progress: {latest}</p>}
    </div>
  );
}
```

---

## Backend Consumption

Read streams from your backend:

```typescript
import { aiOutputStream } from "./trigger/streams";

async function consumeStream(runId: string) {
  const stream = await aiOutputStream.read(runId, {
    timeoutInSeconds: 120,
  });

  let fullText = "";
  for await (const chunk of stream) {
    fullText += chunk;
    console.log("Received:", chunk);
  }

  return fullText;
}
```

---

## JSON Serialization Pattern

Streams serialize as strings. For objects, use JSON:

```typescript
// Define helper functions
export function emitProgress(update: ProgressUpdate, options?: { target: "parent" }) {
  progressStream.append(JSON.stringify(update), options);
}

// Parse on frontend
const updates = streams.progress?.map(s => JSON.parse(s) as ProgressUpdate) ?? [];
```

---

## Throttling Frontend Updates

Prevent excessive re-renders:

```tsx
const { data } = useRealtimeStream<typeof progressStream>(runId, {
  accessToken,
  stream: "progress",
  throttleInMs: 100,  // Max 10 updates/second
});
```

---

## AI SDK Tool Calls

Stream tool calls and results:

```tsx
const { streams } = useRealtimeRunWithStreams<typeof aiTask, STREAMS>(runId, {
  accessToken,
});

// streams.openai is TextStreamPart[]
const toolCalls = streams.openai?.filter(s => s.type === "tool-call") ?? [];
const toolResults = streams.openai?.filter(s => s.type === "tool-result") ?? [];
const textDeltas = streams.openai?.filter(s => s.type === "text-delta") ?? [];

const fullText = textDeltas.map(d => d.textDelta).join("");
```

---

## Tips

1. **Use streams.define()** - always define in shared file for type safety
2. **JSON stringify objects** - streams are strings internally
3. **Use `{ target: "parent" }`** - for child-to-parent bubbling
4. **Throttle on frontend** - prevent excessive re-renders
5. **Set appropriate timeouts** - AI completions may need longer waits

```

### references/ai-tool.md

```markdown
# ai.tool Integration

Convert Trigger.dev tasks to Vercel AI SDK tools. Let LLMs call your tasks autonomously.

## Basic Usage

```typescript
import { schemaTask, ai } from "@trigger.dev/sdk";
import { generateText } from "ai";
import { openai } from "@ai-sdk/openai";
import { z } from "zod";

// 1. Define task with schema
const lookupWeather = schemaTask({
  id: "lookup-weather",
  schema: z.object({
    location: z.string().describe("City name"),
    units: z.enum(["celsius", "fahrenheit"]).default("celsius"),
  }),
  run: async ({ location, units }) => {
    const weather = await fetchWeather(location, units);
    return { temperature: weather.temp, conditions: weather.conditions };
  },
});

// 2. Convert to AI tool
const weatherTool = ai.tool(lookupWeather);

// 3. Use with AI SDK
export const weatherAgent = schemaTask({
  id: "weather-agent",
  schema: z.object({ question: z.string() }),
  run: async ({ question }) => {
    const result = await generateText({
      model: openai("gpt-4o"),
      prompt: question,
      tools: {
        lookupWeather: weatherTool,
      },
    });

    return { answer: result.text };
  },
});
```

---

## Schema Requirements

The task **must** use `schemaTask` with a Zod schema:

```typescript
// ✅ Works - has schema
const myTask = schemaTask({
  id: "my-task",
  schema: z.object({
    query: z.string(),
  }),
  run: async (payload) => { ... },
});

// ❌ Won't work - no schema
const myTask = task({
  id: "my-task",
  run: async (payload: { query: string }) => { ... },
});
```

**Supported schema libraries:**
- Zod
- ArkType
- Any schema with `.toJsonSchema()` method

---

## Tool Result Customization

Customize how results are sent back to the LLM:

```typescript
const searchTool = ai.tool(searchDatabase, {
  experimental_toToolResultContent: (result) => {
    // Return structured content for the LLM
    return [
      {
        type: "text",
        text: `Found ${result.count} results:\n${result.items.map(i => i.title).join("\n")}`,
      },
    ];
  },
});
```

---

## Accessing Tool Options

Get execution context inside the task:

```typescript
const myToolTask = schemaTask({
  id: "my-tool-task",
  schema: z.object({ input: z.string() }),
  run: async (payload) => {
    // Access AI SDK tool execution options
    const toolOptions = ai.currentToolOptions();

    console.log(toolOptions);
    // { toolCallId: "...", messages: [...], ... }

    return processInput(payload.input);
  },
});
```

---

## Multiple Tools

```typescript
const searchTool = ai.tool(searchDatabase);
const calculateTool = ai.tool(calculate);
const summarizeTool = ai.tool(summarize);

export const agentTask = schemaTask({
  id: "agent",
  schema: z.object({ task: z.string() }),
  run: async ({ task }) => {
    const result = await generateText({
      model: openai("gpt-4o"),
      prompt: task,
      tools: {
        search: searchTool,
        calculate: calculateTool,
        summarize: summarizeTool,
      },
      maxSteps: 10,  // Allow multiple tool calls
    });

    return { result: result.text };
  },
});
```

---

## With Tool Choice

```typescript
const result = await generateText({
  model: openai("gpt-4o"),
  prompt: "What's the weather in Tokyo?",
  tools: {
    weather: weatherTool,
    news: newsTool,
  },
  toolChoice: "required",  // Force tool use
  // or: toolChoice: { type: "tool", toolName: "weather" }
});
```

---

## Description from Schema

Add descriptions for better LLM understanding:

```typescript
const searchTask = schemaTask({
  id: "search-database",
  description: "Search the product database for items matching a query",
  schema: z.object({
    query: z.string().describe("Search terms"),
    limit: z.number().min(1).max(100).describe("Max results to return"),
    category: z.enum(["electronics", "clothing", "books"]).optional()
      .describe("Filter by product category"),
  }),
  run: async (payload) => { ... },
});
```

---

## Common Pattern: Research Agent

```typescript
const webSearch = schemaTask({
  id: "web-search",
  schema: z.object({
    query: z.string(),
    maxResults: z.number().default(5),
  }),
  run: async ({ query, maxResults }) => {
    return await searchWeb(query, maxResults);
  },
});

const readUrl = schemaTask({
  id: "read-url",
  schema: z.object({
    url: z.string().url(),
  }),
  run: async ({ url }) => {
    return await fetchAndParse(url);
  },
});

export const researchAgent = schemaTask({
  id: "research-agent",
  schema: z.object({ topic: z.string() }),
  run: async ({ topic }) => {
    const result = await generateText({
      model: openai("gpt-4o"),
      system: "Research the topic thoroughly using available tools.",
      prompt: topic,
      tools: {
        search: ai.tool(webSearch),
        read: ai.tool(readUrl),
      },
      maxSteps: 20,
    });

    return { research: result.text };
  },
});
```

---

## Tips

1. **Always use schemaTask** - regular `task` won't work
2. **Add descriptions** - helps LLM understand when to use the tool
3. **Use `.describe()`** - on schema fields for parameter hints
4. **Set maxSteps** - allow multiple tool calls for complex tasks
5. **Customize results** - use `experimental_toToolResultContent` for better LLM context

```