DAG Workflow Patterns

This guide covers advanced patterns for building DAG workflows, including parallel branches, conditional logic, error handling, and composition.

Pattern: Linear Pipeline

The simplest DAG is a linear chain where each step depends on the previous one.

[A] → [B] → [C] → [D]

steps:
  - id: a
    prompt: "Step A..."
  - id: b
    prompt: "Using A's output: {{a.output}}..."
    dependsOn: [a]
  - id: c
    prompt: "Using B's output: {{b.output}}..."
    dependsOn: [b]
  - id: d
    prompt: "Using C's output: {{c.output}}..."
    dependsOn: [c]

Use when: Steps must execute sequentially and each step depends on the previous output.
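
When the chain is built in code rather than written by hand, the `dependsOn` links can be derived from the step order. The following is a minimal sketch: the `Step` shape and the `linearChain` helper are hypothetical and only mirror the fields used in this guide.

```typescript
// Hypothetical step shape mirroring the YAML fields used in this guide.
interface Step {
  id: string;
  prompt: string;
  dependsOn?: string[];
}

// Build a linear chain: every step after the first depends on its predecessor.
function linearChain(steps: { id: string; prompt: string }[]): Step[] {
  const chain: Step[] = [];
  let previousId: string | undefined;
  for (const step of steps) {
    chain.push(
      previousId === undefined ? { ...step } : { ...step, dependsOn: [previousId] },
    );
    previousId = step.id;
  }
  return chain;
}

const chain = linearChain([
  { id: 'a', prompt: 'Step A...' },
  { id: 'b', prompt: "Using A's output: {{a.output}}..." },
  { id: 'c', prompt: "Using B's output: {{b.output}}..." },
]);
```

The resulting array has the same shape as the `steps` list above and could be passed to a plan definition.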

Pattern: Fan-Out / Fan-In

Split work across parallel branches, then merge results.

       [A]
      / | \
    [B] [C] [D]
      \ | /
       [E]

steps:
  - id: a
    prompt: "Extract data..."
 
  - id: b
    prompt: "Analyze aspect 1: {{a.output}}"
    dependsOn: [a]
 
  - id: c
    prompt: "Analyze aspect 2: {{a.output}}"
    dependsOn: [a]
 
  - id: d
    prompt: "Analyze aspect 3: {{a.output}}"
    dependsOn: [a]
 
  - id: e
    prompt: |
      Combine analyses into a final report:
      Aspect 1: {{b.output}}
      Aspect 2: {{c.output}}
      Aspect 3: {{d.output}}
    dependsOn: [b, c, d]

Use when: Multiple independent analyses can run concurrently before a synthesis step.
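
To see which steps a scheduler could run at the same time, it can help to group them into "waves" based on `dependsOn`. This is an illustrative sketch only: `executionWaves` is a hypothetical helper, not part of any client API, and a real scheduler may start a step as soon as its own dependencies finish rather than waiting for a whole wave.

```typescript
// Hypothetical minimal step shape: only the fields needed for scheduling.
interface Step {
  id: string;
  dependsOn?: string[];
}

// Group steps into waves. Every step in a wave has all of its dependencies
// satisfied by earlier waves, so the wave could run concurrently.
function executionWaves(steps: Step[]): string[][] {
  const done = new Set<string>();
  let pending = [...steps];
  const waves: string[][] = [];
  while (pending.length > 0) {
    const ready = pending.filter((s) => (s.dependsOn ?? []).every((d) => done.has(d)));
    if (ready.length === 0) {
      // Nothing can make progress: a cycle or a reference to an unknown step.
      throw new Error('Cycle or unknown dependency among: ' + pending.map((s) => s.id).join(', '));
    }
    waves.push(ready.map((s) => s.id));
    for (const s of ready) done.add(s.id);
    pending = pending.filter((s) => !done.has(s.id));
  }
  return waves;
}

const fanOutFanIn: Step[] = [
  { id: 'a' },
  { id: 'b', dependsOn: ['a'] },
  { id: 'c', dependsOn: ['a'] },
  { id: 'd', dependsOn: ['a'] },
  { id: 'e', dependsOn: ['b', 'c', 'd'] },
];

const waves = executionWaves(fanOutFanIn);
// waves is [['a'], ['b', 'c', 'd'], ['e']]
```

The middle wave contains the three analysis steps, which is exactly the fan-out portion of the diagram.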

Pattern: Diamond Dependency

Two parallel paths converge at a single node.

    [Extract]
     /     \
[Risk]   [Compliance]
     \     /
    [Report]

This is the pattern used in the contract review template. Risk and Compliance can run in parallel because each depends only on Extract and neither depends on the other.
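
The rule behind that statement can be checked mechanically: two steps may overlap only if neither is a direct or transitive dependency of the other. The sketch below assumes hypothetical step ids (`extract`, `risk`, `compliance`, `report`), since the guide names the nodes but not their ids, and `canRunInParallel` is an illustrative helper rather than an existing API.

```typescript
interface Step {
  id: string;
  dependsOn?: string[];
}

// Hypothetical ids for the four nodes in the diagram.
const diamond: Step[] = [
  { id: 'extract' },
  { id: 'risk', dependsOn: ['extract'] },
  { id: 'compliance', dependsOn: ['extract'] },
  { id: 'report', dependsOn: ['risk', 'compliance'] },
];

// Collect every step that `id` depends on, directly or transitively.
function ancestors(steps: Step[], id: string): Set<string> {
  const byId = new Map(steps.map((s) => [s.id, s] as const));
  const seen = new Set<string>();
  const stack = [...(byId.get(id)?.dependsOn ?? [])];
  while (stack.length > 0) {
    const current = stack.pop() as string;
    if (seen.has(current)) continue;
    seen.add(current);
    stack.push(...(byId.get(current)?.dependsOn ?? []));
  }
  return seen;
}

// Two steps can overlap in time only if neither is an ancestor of the other.
function canRunInParallel(steps: Step[], a: string, b: string): boolean {
  return a !== b && !ancestors(steps, a).has(b) && !ancestors(steps, b).has(a);
}

const riskAndCompliance = canRunInParallel(diamond, 'risk', 'compliance');
const riskAndReport = canRunInParallel(diamond, 'risk', 'report');
```

Here `riskAndCompliance` is `true` and `riskAndReport` is `false`, because Report depends on Risk.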

Pattern: Conditional Branching

Skip steps based on previous outputs.

[Triage] → [Critical Path]  (if severity == critical) → [Report]
         → [Normal Path]    (if severity != critical) → [Report]

steps:
  - id: triage
    prompt: |
      Assess the severity of this incident: {{input.incident}}
      Return JSON: { "severity": "critical" | "normal", "reason": "..." }
 
  - id: critical-path
    prompt: "Perform deep analysis for critical incident: {{triage.output}}"
    dependsOn: [triage]
    condition: '/"severity"\s*:\s*"critical"/.test({{triage.output}})'
    model: opus
 
  - id: normal-path
    prompt: "Perform standard analysis: {{triage.output}}"
    dependsOn: [triage]
    condition: '/"severity"\s*:\s*"normal"/.test({{triage.output}})'
    model: haiku
 
  - id: report
    prompt: |
      Generate incident report from analysis:
      {{critical-path.output}}
      {{normal-path.output}}
    dependsOn: [critical-path, normal-path]

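When a conditional step is skipped, its output is null, so the report step receives output only from the branch that actually ran. Write the merge prompt so that it still reads correctly when either branch's output is null. Make each condition test the severity field itself: a plain substring check on the whole output can match the word "critical" or "normal" inside the free-text reason and trigger the wrong branch.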
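
If the branching decision or the merge is done in application code, both concerns can be handled explicitly. The sketch below is an assumption-laden illustration: `parseSeverity` and `mergeAnalyses` are hypothetical helpers, the triage output is assumed to be the JSON requested in its prompt, and a skipped branch is modeled as `null`.

```typescript
type Severity = 'critical' | 'normal';

// Parse the triage JSON and read the severity field itself, so a word such as
// "critical" inside the free-text reason cannot select the wrong branch.
function parseSeverity(triageOutput: string): Severity | null {
  try {
    const parsed: unknown = JSON.parse(triageOutput);
    if (typeof parsed === 'object' && parsed !== null && 'severity' in parsed) {
      const severity = (parsed as { severity: unknown }).severity;
      if (severity === 'critical') return 'critical';
      if (severity === 'normal') return 'normal';
    }
  } catch {
    // Not valid JSON: fall through and report an unknown severity.
  }
  return null;
}

// Build the analysis section of the report from whichever branch ran.
// A skipped branch is passed as null (assumption based on the text above).
function mergeAnalyses(critical: string | null, normal: string | null): string {
  const sections: string[] = [];
  if (critical !== null) sections.push(`Critical-path analysis:\n${critical}`);
  if (normal !== null) sections.push(`Standard analysis:\n${normal}`);
  return sections.length > 0
    ? sections.join('\n\n')
    : 'No analysis branch produced output.';
}

const severity = parseSeverity('{"severity": "normal", "reason": "not critical"}');
const merged = mergeAnalyses(null, 'Service degraded briefly.');
```

In this example `severity` is `'normal'` even though the reason text contains the word "critical".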

Pattern: Iterative Refinement

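Run a step, evaluate the output, and refine it if quality is insufficient. Because a DAG has no cycles, this plan performs at most one refinement pass; each additional round needs its own evaluate and refine steps.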

const plan = await client.createPlan({
  name: 'iterative-refinement',
  steps: [
    {
      id: 'draft',
      prompt: 'Write a technical blog post about {{input.topic}}',
      model: 'sonnet',
    },
    {
      id: 'evaluate',
      prompt: `Rate this blog post on a scale of 1-10 for:
               clarity, accuracy, engagement.
               Return JSON: { "scores": {...}, "average": N, "feedback": "..." }
               Post: {{draft.output}}`,
      model: 'haiku',
      dependsOn: ['draft'],
    },
    {
      id: 'refine',
      prompt: `Improve this blog post based on feedback:
               Original: {{draft.output}}
               Feedback: {{evaluate.output}}
               Address all feedback points and improve the overall quality.`,
      model: 'opus',
      dependsOn: ['evaluate'],
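      // Capture the full number so that "10" or "7.5" is not truncated to a single digit.
      condition: '{{evaluate.output}}.includes("average") && parseFloat({{evaluate.output}}.match(/average.*?(\\d+(?:\\.\\d+)?)/)[1]) < 8',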
    },
  ],
});
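
If the decision to refine is made in application code instead of a condition string, parsing the evaluator's JSON avoids regex edge cases such as two-digit or fractional averages. This is a minimal sketch: `shouldRefine` is a hypothetical helper, and treating malformed output as "needs refinement" is a design choice, not documented behavior.

```typescript
// Decide whether the refine step should run, given the evaluate step's raw output.
// Assumes the evaluator returned the JSON shape requested in its prompt.
function shouldRefine(evaluateOutput: string, threshold = 8): boolean {
  try {
    const parsed = JSON.parse(evaluateOutput) as { average?: unknown };
    const average = Number(parsed.average);
    // A missing or non-numeric score is treated as "needs another pass".
    return Number.isNaN(average) || average < threshold;
  } catch {
    // Malformed JSON (or a non-object value) also counts as "needs work".
    return true;
  }
}

const perfectScore = shouldRefine('{"scores": {}, "average": 10, "feedback": "Great"}');
const fractionalScore = shouldRefine('{"scores": {}, "average": 7.5, "feedback": "Tighten intro"}');
```

With the default threshold of 8, `perfectScore` is `false` and `fractionalScore` is `true`.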

Pattern: Multi-Model Consensus

Run the same task across multiple models and synthesize results.

    [Prompt]
   / | | | \
 [M1][M2][M3][M4][M5]
   \ | | | /
  [Consensus]

const models = ['opus', 'sonnet', 'gpt4o', 'gemini-pro', 'llama-70b'];
 
const plan = await client.createPlan({
  name: 'multi-model-consensus',
  steps: [
    ...models.map((model, i) => ({
      id: `model-${i}`,
      prompt: '{{input.question}}',
      model,
    })),
    {
      id: 'consensus',
      prompt: `Given these responses from ${models.length} different AI models,
               synthesize a consensus answer. Note areas of agreement
               and disagreement.
 
               ${models.map((m, i) => `Model ${i + 1} (${m}): {{model-${i}.output}}`).join('\n')}`,
      model: 'opus',
      dependsOn: models.map((_, i) => `model-${i}`),
    },
  ],
});

Pattern: Map-Reduce

Process a large dataset by splitting, processing in parallel, and combining.

// Split input into chunks
const chunks = splitIntoChunks(largeDataset, 10);
 
const plan = await client.createPlan({
  name: 'map-reduce',
  steps: [
    // Map phase: process each chunk
    ...chunks.map((chunk, i) => ({
      id: `map-${i}`,
      prompt: `Process this data chunk and extract key entities: ${JSON.stringify(chunk)}`,
      model: 'haiku',
    })),
    // Reduce phase: combine results
    {
      id: 'reduce',
      prompt: `Combine and deduplicate entities from all chunks:
               ${chunks.map((_, i) => `Chunk ${i}: {{map-${i}.output}}`).join('\n')}`,
      model: 'sonnet',
      dependsOn: chunks.map((_, i) => `map-${i}`),
    },
  ],
});
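
The example calls `splitIntoChunks` without defining it. One possible implementation is sketched below, under the assumption that the second argument is the desired number of chunks (it could equally be read as a chunk size, which would need a different helper).

```typescript
// Split `items` into at most `chunkCount` contiguous chunks of near-equal size.
// Assumption: the second argument is a chunk count, not a chunk size.
function splitIntoChunks<T>(items: T[], chunkCount: number): T[][] {
  if (!Number.isInteger(chunkCount) || chunkCount < 1) {
    throw new RangeError('chunkCount must be a positive integer');
  }
  const size = Math.ceil(items.length / chunkCount);
  const chunks: T[][] = [];
  for (let start = 0; start < items.length; start += size) {
    chunks.push(items.slice(start, start + size));
  }
  return chunks;
}

const exampleChunks = splitIntoChunks([1, 2, 3, 4, 5], 2);
// exampleChunks is [[1, 2, 3], [4, 5]]
```

Because the chunk size is rounded up, the helper may return fewer chunks than requested when the input does not divide evenly; an empty input yields no chunks and therefore no map steps.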

Error Handling in DAGs

Step-Level Retry

- id: critical-step
  prompt: "..."
  retry:
    maxAttempts: 3
    backoffMs: 2000
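
The guide does not say how these two fields are interpreted. The sketch below shows one plausible reading, in which `maxAttempts` counts the first try and `backoffMs` is a fixed wait between tries. Both interpretations are assumptions, and `withRetry` and `backoffSchedule` are hypothetical helpers, not the engine's implementation.

```typescript
interface RetryPolicy {
  maxAttempts: number; // assumed: total tries, including the first
  backoffMs: number;   // assumed: fixed wait between tries
}

// Waits that would occur before attempts 2..maxAttempts under a fixed backoff.
function backoffSchedule({ maxAttempts, backoffMs }: RetryPolicy): number[] {
  return Array.from({ length: Math.max(0, maxAttempts - 1) }, () => backoffMs);
}

// Run `task` until it succeeds or the attempts are exhausted, then rethrow
// the last error.
async function withRetry<T>(task: () => Promise<T>, policy: RetryPolicy): Promise<T> {
  let lastError: unknown = new Error('maxAttempts must be at least 1');
  for (let attempt = 1; attempt <= policy.maxAttempts; attempt++) {
    try {
      return await task();
    } catch (error) {
      lastError = error;
      if (attempt < policy.maxAttempts) {
        await new Promise<void>((resolve) => setTimeout(resolve, policy.backoffMs));
      }
    }
  }
  throw lastError;
}

const schedule = backoffSchedule({ maxAttempts: 3, backoffMs: 2000 });
// schedule is [2000, 2000]
```

Under this reading, the configuration above adds at most four seconds of waiting on top of the three attempts themselves.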

Fallback Steps

- id: primary
  prompt: "Analyze using primary method: {{input.data}}"
  model: opus
  timeoutMs: 30000
 
- id: fallback
  prompt: "Analyze using simplified method: {{input.data}}"
  model: sonnet
  dependsOn: [primary]
  condition: '{{primary.status}} === "failed"'

Plan-Level Error Handling

const result = await client.waitForPlan(executionId);
 
if (result.status === 'failed') {
  const failedSteps = Object.entries(result.steps)
    .filter(([_, step]) => step.status === 'failed');
 
  for (const [id, step] of failedSteps) {
    console.error(`Step ${id} failed: ${step.error}`);
    console.error(`  Retries: ${step.retryCount}`);
  }
}

Performance Optimization

  1. Maximize parallelism: Only add dependsOn when a step truly needs another step's output.
  2. Use fast models for triage: Route initial classification to haiku, then use opus only for complex steps.
  3. Set appropriate timeouts: Short timeouts for fast steps, longer for complex analysis.
  4. Limit output size: Use maxTokens to prevent verbose responses from consuming context in downstream steps.
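
The first tip can be partly automated with a small lint that flags dependencies a step declares but never references. This is a sketch under stated assumptions: `unusedDependencies` is a hypothetical helper, and it only recognizes references written as `{{id.field}}` in a step's prompt or condition. A flagged dependency is a candidate for removal, not proof that it is unnecessary, because a step may depend on another purely for ordering.

```typescript
interface Step {
  id: string;
  prompt: string;
  condition?: string;
  dependsOn?: string[];
}

// Report dependencies that a step declares but never references through a
// {{id.field}} placeholder in its prompt or condition.
function unusedDependencies(steps: Step[]): { step: string; dependency: string }[] {
  const unused: { step: string; dependency: string }[] = [];
  for (const step of steps) {
    const text = `${step.prompt}\n${step.condition ?? ''}`;
    for (const dependency of step.dependsOn ?? []) {
      if (!text.includes(`{{${dependency}.`)) {
        unused.push({ step: step.id, dependency });
      }
    }
  }
  return unused;
}

const findings = unusedDependencies([
  { id: 'a', prompt: 'Extract data...' },
  { id: 'b', prompt: 'Analyze: {{a.output}}', dependsOn: ['a'] },
  // 'c' waits for 'a' but never uses its output, so it could start immediately.
  { id: 'c', prompt: 'Summarize {{input.notes}}', dependsOn: ['a'] },
]);
// findings is [{ step: 'c', dependency: 'a' }]
```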

Next Steps