DAG Workflow Patterns
This guide covers advanced patterns for building DAG workflows, including parallel branches, conditional logic, error handling, and composition.
Pattern: Linear Pipeline
The simplest DAG is a linear chain where each step depends on the previous one.
[A] → [B] → [C] → [D]

steps:
  - id: a
    prompt: "Step A..."
  - id: b
    prompt: "Using A's output: {{a.output}}..."
    dependsOn: [a]
  - id: c
    prompt: "Using B's output: {{b.output}}..."
    dependsOn: [b]
  - id: d
    prompt: "Using C's output: {{c.output}}..."
    dependsOn: [c]

Use when: Steps must execute sequentially and each step depends on the previous output.
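Because every step names its prerequisites in dependsOn, a run order can be derived from the step list alone. The following is a minimal sketch of that idea using Kahn's algorithm, assuming steps are plain objects with id and dependsOn fields. The function name topologicalOrder is hypothetical and not part of the SDK; the real executor may work differently.

```javascript
// Sketch only: derive an execution order from `dependsOn` (Kahn's algorithm).
// `topologicalOrder` is a hypothetical helper, not an SDK function.
function topologicalOrder(steps) {
  const ids = new Set(steps.map((s) => s.id));
  const remaining = new Map(); // id -> number of unmet dependencies
  const dependents = new Map(); // id -> ids of steps that depend on it

  for (const step of steps) {
    const deps = step.dependsOn ?? [];
    remaining.set(step.id, deps.length);
    for (const dep of deps) {
      if (!ids.has(dep)) {
        throw new Error(`Step "${step.id}" depends on unknown step "${dep}"`);
      }
      if (!dependents.has(dep)) dependents.set(dep, []);
      dependents.get(dep).push(step.id);
    }
  }

  // Start with the steps that have no dependencies at all.
  const ready = steps.filter((s) => remaining.get(s.id) === 0).map((s) => s.id);
  const order = [];
  while (ready.length > 0) {
    const id = ready.shift();
    order.push(id);
    for (const next of dependents.get(id) ?? []) {
      remaining.set(next, remaining.get(next) - 1);
      if (remaining.get(next) === 0) ready.push(next);
    }
  }

  // Steps left unordered can only mean a dependency cycle.
  if (order.length !== steps.length) {
    throw new Error('Cycle detected: the steps do not form a DAG');
  }
  return order;
}

const linear = [
  { id: 'a' },
  { id: 'b', dependsOn: ['a'] },
  { id: 'c', dependsOn: ['b'] },
  { id: 'd', dependsOn: ['c'] },
];
console.log(topologicalOrder(linear)); // [ 'a', 'b', 'c', 'd' ]
```

Running a check like this before submitting a plan catches misspelled step ids and accidental cycles early.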
Pattern: Fan-Out / Fan-In
Split work across parallel branches, then merge results.
       [A]
     /  |  \
  [B]  [C]  [D]
     \  |  /
       [E]

steps:
  - id: a
    prompt: "Extract data..."
  - id: b
    prompt: "Analyze aspect 1: {{a.output}}"
    dependsOn: [a]
  - id: c
    prompt: "Analyze aspect 2: {{a.output}}"
    dependsOn: [a]
  - id: d
    prompt: "Analyze aspect 3: {{a.output}}"
    dependsOn: [a]
  - id: e
    prompt: |
      Combine analyses into a final report:
      Aspect 1: {{b.output}}
      Aspect 2: {{c.output}}
      Aspect 3: {{d.output}}
    dependsOn: [b, c, d]

Use when: Multiple independent analyses can run concurrently before a synthesis step.
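To see which steps can run concurrently, it helps to group them into waves, where each wave contains every step whose dependencies have already finished. The sketch below assumes plain step objects with id and dependsOn; executionWaves is a hypothetical helper. A real scheduler may start each step as soon as its own dependencies complete rather than waiting for a whole wave.

```javascript
// Sketch only: group steps into waves of steps that could run concurrently.
// `executionWaves` is a hypothetical helper, not an SDK function.
function executionWaves(steps) {
  const done = new Set();
  const waves = [];
  let pending = [...steps];

  while (pending.length > 0) {
    // A step is ready once all of its dependencies have completed.
    const ready = pending.filter((s) =>
      (s.dependsOn ?? []).every((dep) => done.has(dep))
    );
    if (ready.length === 0) {
      throw new Error('No runnable step left: check for cycles or unknown ids');
    }
    waves.push(ready.map((s) => s.id));
    for (const s of ready) done.add(s.id);
    pending = pending.filter((s) => !done.has(s.id));
  }
  return waves;
}

const fanOut = [
  { id: 'a' },
  { id: 'b', dependsOn: ['a'] },
  { id: 'c', dependsOn: ['a'] },
  { id: 'd', dependsOn: ['a'] },
  { id: 'e', dependsOn: ['b', 'c', 'd'] },
];
console.log(executionWaves(fanOut)); // [ [ 'a' ], [ 'b', 'c', 'd' ], [ 'e' ] ]
```

The middle wave holds the three analysis steps, which is the fan-out; the final wave is the fan-in.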
Pattern: Diamond Dependency
Two parallel paths converge at a single node.
      [Extract]
      /       \
  [Risk]   [Compliance]
      \       /
      [Report]

This is the pattern used in the contract review template. The Risk and Compliance steps run in parallel because each depends only on Extract and neither depends on the other.
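As an illustration, the diamond can be written as a step list and checked for parallelism: two steps can run concurrently when neither is reachable from the other through dependsOn. The step ids, prompts, and the {{input.contract}} placeholder below are hypothetical and are not taken from the contract review template; canRunInParallel is likewise an illustrative helper.

```javascript
// Hypothetical step definitions for the diamond; ids and prompts are illustrative.
const diamondSteps = [
  { id: 'extract', prompt: 'Extract the key clauses from: {{input.contract}}' },
  { id: 'risk', prompt: 'Assess risks in: {{extract.output}}', dependsOn: ['extract'] },
  { id: 'compliance', prompt: 'Check compliance of: {{extract.output}}', dependsOn: ['extract'] },
  {
    id: 'report',
    prompt: 'Summarize.\nRisk: {{risk.output}}\nCompliance: {{compliance.output}}',
    dependsOn: ['risk', 'compliance'],
  },
];

// True when `fromId` depends on `targetId`, directly or through other steps.
function dependsOnTransitively(steps, fromId, targetId) {
  const byId = new Map(steps.map((s) => [s.id, s]));
  const stack = [...(byId.get(fromId)?.dependsOn ?? [])];
  const seen = new Set();
  while (stack.length > 0) {
    const id = stack.pop();
    if (id === targetId) return true;
    if (seen.has(id)) continue;
    seen.add(id);
    stack.push(...(byId.get(id)?.dependsOn ?? []));
  }
  return false;
}

// Two steps are independent when neither depends on the other.
function canRunInParallel(steps, a, b) {
  return !dependsOnTransitively(steps, a, b) && !dependsOnTransitively(steps, b, a);
}

console.log(canRunInParallel(diamondSteps, 'risk', 'compliance')); // true
console.log(canRunInParallel(diamondSteps, 'risk', 'report')); // false
```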
Pattern: Conditional Branching
Skip steps based on previous outputs.
[Triage] → [Critical Path] (if severity == critical)
         → [Normal Path] (if severity != critical)
         → [Report]

steps:
  - id: triage
    prompt: |
      Assess the severity of this incident: {{input.incident}}
      Return JSON: { "severity": "critical" | "normal", "reason": "..." }
  - id: critical-path
    prompt: "Perform deep analysis for critical incident: {{triage.output}}"
    dependsOn: [triage]
    condition: '{{triage.output}}.includes("critical")'
    model: opus
  - id: normal-path
    prompt: "Perform standard analysis: {{triage.output}}"
    dependsOn: [triage]
    condition: '{{triage.output}}.includes("normal")'
    model: haiku
  - id: report
    prompt: |
      Generate incident report from analysis:
      {{critical-path.output}}
      {{normal-path.output}}
    dependsOn: [critical-path, normal-path]

When a conditional step is skipped, its output is null. The merge step receives outputs from whichever branch executed. Design your merge prompt to handle both cases.
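One thing to watch with substring conditions is that they match anywhere in the triage output, including the free-text reason field. The sketch below, in plain JavaScript, shows the difference between a substring test and reading the severity field from the parsed JSON. parseSeverity is a hypothetical helper, and whether the condition expression language can call something like JSON.parse is an assumption that this guide does not confirm.

```javascript
// Sketch only: read `severity` from the triage JSON instead of substring matching.
// `parseSeverity` is a hypothetical helper, not an SDK function.
function parseSeverity(triageOutput) {
  try {
    const parsed = JSON.parse(triageOutput);
    return parsed.severity === 'critical' || parsed.severity === 'normal'
      ? parsed.severity
      : null; // unexpected or missing value
  } catch {
    return null; // output was not valid JSON
  }
}

const triageOutput = '{ "severity": "normal", "reason": "No critical systems affected" }';

// The substring test matches the word inside "reason", so both branches would run.
console.log(triageOutput.includes('critical')); // true
console.log(triageOutput.includes('normal')); // true

// Reading the field gives a single, unambiguous answer.
console.log(parseSeverity(triageOutput)); // normal
```

If the condition language only supports substring checks, asking the triage step to return a bare token in a fixed position is one way to reduce this ambiguity.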
Pattern: Iterative Refinement
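Run a step, evaluate its output, and run a refinement step if the quality is insufficient. Because a DAG cannot loop, this is a single conditional refinement pass rather than an open-ended retry.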
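In this pattern an evaluator step returns a JSON score, and a condition decides whether the refinement step runs. The gate comes down to reading the average field and comparing it to a threshold. As a minimal sketch in plain JavaScript (needsRefinement is a hypothetical helper, and the threshold of 8 is taken from this section's example), parsing the JSON handles multi-digit and fractional averages and tolerates extra text around the JSON object:

```javascript
// Sketch only: decide whether to refine, based on the evaluator's JSON output.
// `needsRefinement` is a hypothetical helper, not an SDK function.
function needsRefinement(evaluateOutput, threshold = 8) {
  // Models sometimes wrap JSON in extra text, so isolate the outermost braces.
  const start = evaluateOutput.indexOf('{');
  const end = evaluateOutput.lastIndexOf('}');
  if (start === -1 || end <= start) return false;
  try {
    const { average } = JSON.parse(evaluateOutput.slice(start, end + 1));
    return typeof average === 'number' && average < threshold;
  } catch {
    return false; // unreadable score: skip refinement
  }
}

console.log(needsRefinement('{ "scores": {}, "average": 6.5, "feedback": "Tighten the intro" }')); // true
console.log(needsRefinement('{ "scores": {}, "average": 10, "feedback": "Great" }')); // false
console.log(needsRefinement('no score here')); // false
```

Skipping refinement when the score cannot be read is a design choice; treating an unreadable score as a failure is equally defensible.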
const plan = await client.createPlan({
  name: 'iterative-refinement',
  steps: [
    {
      id: 'draft',
      prompt: 'Write a technical blog post about {{input.topic}}',
      model: 'sonnet',
    },
    {
      id: 'evaluate',
      prompt: `Rate this blog post on a scale of 1-10 for:
        clarity, accuracy, engagement.
        Return JSON: { "scores": {...}, "average": N, "feedback": "..." }
        Post: {{draft.output}}`,
      model: 'haiku',
      dependsOn: ['draft'],
    },
    {
      id: 'refine',
      prompt: `Improve this blog post based on feedback:
        Original: {{draft.output}}
        Feedback: {{evaluate.output}}
        Address all feedback points and improve the overall quality.`,
      model: 'opus',
      dependsOn: ['evaluate'],
      // Run only when the reported average is below 8; (\d+) captures multi-digit scores such as 10.
      condition: '{{evaluate.output}}.includes("average") && parseInt({{evaluate.output}}.match(/average.*?(\\d+)/)[1], 10) < 8',
    },
  ],
});

Pattern: Multi-Model Consensus
Run the same task across multiple models and synthesize results.
            [Prompt]
      /   |    |    |   \
   [M1] [M2] [M3] [M4] [M5]
      \   |    |    |   /
          [Consensus]

const models = ['opus', 'sonnet', 'gpt4o', 'gemini-pro', 'llama-70b'];
const plan = await client.createPlan({
  name: 'multi-model-consensus',
  steps: [
    // One step per model, all sending the same prompt
    ...models.map((model, i) => ({
      id: `model-${i}`,
      prompt: '{{input.question}}',
      model,
    })),
    {
      id: 'consensus',
      prompt: `Given these responses from ${models.length} different AI models,
        synthesize a consensus answer. Note areas of agreement
        and disagreement.
        ${models.map((m, i) => `Model ${i + 1} (${m}): {{model-${i}.output}}`).join('\n')}`,
      model: 'opus',
      dependsOn: models.map((_, i) => `model-${i}`),
    },
  ],
});

Pattern: Map-Reduce
Process a large dataset by splitting, processing in parallel, and combining.
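The example in this section calls splitIntoChunks, which this guide does not define. A minimal sketch of such a helper follows, assuming the second argument is the maximum number of items per chunk; it could equally be meant as the number of chunks, which the guide does not specify.

```javascript
// Sketch only: split an array into consecutive chunks of at most `chunkSize` items.
// Assumption: the second argument is a chunk size, not a chunk count.
function splitIntoChunks(items, chunkSize) {
  if (!Number.isInteger(chunkSize) || chunkSize < 1) {
    throw new RangeError('chunkSize must be a positive integer');
  }
  const chunks = [];
  for (let i = 0; i < items.length; i += chunkSize) {
    chunks.push(items.slice(i, i + chunkSize));
  }
  return chunks;
}

const sample = Array.from({ length: 25 }, (_, i) => i);
console.log(splitIntoChunks(sample, 10).map((c) => c.length)); // [ 10, 10, 5 ]
```

Since each chunk becomes one map step, the chunk size also determines how many parallel steps the plan contains.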
// Split input into chunks
// (splitIntoChunks and largeDataset are assumed to be defined by the caller)
const chunks = splitIntoChunks(largeDataset, 10);
const plan = await client.createPlan({
  name: 'map-reduce',
  steps: [
    // Map phase: process each chunk
    ...chunks.map((chunk, i) => ({
      id: `map-${i}`,
      prompt: `Process this data chunk and extract key entities: ${JSON.stringify(chunk)}`,
      model: 'haiku',
    })),
    // Reduce phase: combine results
    {
      id: 'reduce',
      prompt: `Combine and deduplicate entities from all chunks:
        ${chunks.map((_, i) => `Chunk ${i}: {{map-${i}.output}}`).join('\n')}`,
      model: 'sonnet',
      dependsOn: chunks.map((_, i) => `map-${i}`),
    },
  ],
});

Error Handling in DAGs
Step-Level Retry
- id: critical-step
  prompt: "..."
  retry:
    maxAttempts: 3
    backoffMs: 2000

Fallback Steps
- id: primary
  prompt: "Analyze using primary method: {{input.data}}"
  model: opus
  timeoutMs: 30000
- id: fallback
  prompt: "Analyze using simplified method: {{input.data}}"
  model: sonnet
  dependsOn: [primary]
  condition: '{{primary.status}} === "failed"'

Plan-Level Error Handling
const result = await client.waitForPlan(executionId);
if (result.status === 'failed') {
  const failedSteps = Object.entries(result.steps)
    .filter(([_, step]) => step.status === 'failed');
  for (const [id, step] of failedSteps) {
    console.error(`Step ${id} failed: ${step.error}`);
    console.error(`  Retries: ${step.retryCount}`);
  }
}

Performance Optimization
- Maximize parallelism: Only add dependsOn when a step truly needs another step's output.
- Use fast models for triage: Route initial classification to haiku, then use opus only for complex steps.
- Set appropriate timeouts: Short timeouts for fast steps, longer for complex analysis.
- Limit output size: Use maxTokens to prevent verbose responses from consuming context in downstream steps.
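The first tip can be checked mechanically. As a rough sketch, the helper below flags any dependsOn entry whose id is never referenced through a {{id.…}} placeholder in the step's prompt or condition. findUnusedDependencies is a hypothetical helper, and a flagged entry is only a candidate for removal: a dependency may be intentional for ordering reasons even when its output is unused.

```javascript
// Sketch only: flag dependencies that a step declares but never references.
// `findUnusedDependencies` is a hypothetical helper, not an SDK function.
function findUnusedDependencies(steps) {
  const findings = [];
  for (const step of steps) {
    // Placeholders may appear in the prompt or in the condition expression.
    const text = `${step.prompt ?? ''}\n${step.condition ?? ''}`;
    for (const dep of step.dependsOn ?? []) {
      if (!text.includes(`{{${dep}.`)) {
        findings.push({ step: step.id, unusedDependency: dep });
      }
    }
  }
  return findings;
}

const reviewedSteps = [
  { id: 'a', prompt: 'Extract data...' },
  { id: 'b', prompt: 'Summarize: {{a.output}}', dependsOn: ['a'] },
  { id: 'c', prompt: 'Translate: {{a.output}}', dependsOn: ['a', 'b'] },
];
console.log(findUnusedDependencies(reviewedSteps));
// [ { step: 'c', unusedDependency: 'b' } ]
```

Removing the flagged dependency here would let steps b and c run in parallel instead of one after the other.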
Next Steps
- Multi-Agent Orchestration -- Distributed agent patterns
- Plans SDK Reference -- API documentation for plan management