Plans & DAGs
A Plan defines a directed acyclic graph (DAG) of steps. Each step is a task with optional dependencies on other steps. The orchestrator resolves the dependency graph, runs independent steps in parallel, and passes outputs from completed steps to dependent ones.
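The dependency resolution described above can be illustrated with a small scheduling sketch. This is not the orchestrator's actual implementation: `StepNode` and `planWaves` are hypothetical names introduced here, and grouping steps into "waves" is only one possible strategy. A real orchestrator may start each step as soon as its own dependencies finish rather than waiting for a whole wave.

```typescript
// Hypothetical minimal step shape: only the fields needed for scheduling.
interface StepNode {
  id: string;
  dependsOn?: string[];
}

// Group steps into waves. Every step in a wave depends only on steps from
// earlier waves, so the steps inside one wave could run in parallel.
function planWaves(steps: StepNode[]): string[][] {
  const ids = new Set(steps.map((s) => s.id));
  if (ids.size !== steps.length) throw new Error('Duplicate step id');
  for (const s of steps) {
    for (const dep of s.dependsOn ?? []) {
      if (!ids.has(dep)) {
        throw new Error(`Step "${s.id}" depends on unknown step "${dep}"`);
      }
    }
  }

  const done = new Set<string>();
  const waves: string[][] = [];
  let remaining = steps;
  while (remaining.length > 0) {
    // A step is ready once all of its dependencies have completed.
    const ready = remaining.filter((s) =>
      (s.dependsOn ?? []).every((d) => done.has(d)),
    );
    // No ready step while work remains means the graph contains a cycle.
    if (ready.length === 0) throw new Error('Cycle detected: the plan is not a DAG');
    waves.push(ready.map((s) => s.id));
    for (const s of ready) done.add(s.id);
    remaining = remaining.filter((s) => !done.has(s.id));
  }
  return waves;
}

const exampleWaves = planWaves([
  { id: 'fetch' },
  { id: 'schema' },
  { id: 'validate', dependsOn: ['fetch', 'schema'] },
  { id: 'report', dependsOn: ['validate'] },
]);
console.log(exampleWaves); // [['fetch', 'schema'], ['validate'], ['report']]
```

In this sketch, fetch and schema have no dependencies and land in the first wave together, while validate waits for both of them.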
createPlan
Define a new workflow plan.
const plan = await client.createPlan(options: CreatePlanOptions): Promise<Plan>;
Parameters
interface CreatePlanOptions {
  /** Human-readable name for the plan. */
  name: string;
  /** Optional description. */
  description?: string;
  /** Ordered list of steps in the DAG. */
  steps: PlanStep[];
  /** Tags for filtering. */
  tags?: string[];
  /** Save as a reusable plan template. Default: false */
  saveAsTemplate?: boolean;
}
interface PlanStep {
  /** Unique identifier for this step within the plan. */
  id: string;
  /** The prompt template. Use {{stepId.output}} for dependency injection. */
  prompt: string;
  /** Model to use for this step. */
  model?: string;
  /** Queue to route this step to. */
  queue?: string;
  /** IDs of steps that must complete before this one starts. */
  dependsOn?: string[];
  /** Step-level timeout in milliseconds. */
  timeoutMs?: number;
  /** Retry configuration for this step. */
  retry?: {
    maxAttempts?: number;
    backoffMs?: number;
  };
  /** Mark this step as requiring human-in-the-loop (HITL) approval. */
  requiresApproval?: boolean;
  /** Condition that must hold for this step to run; the step is skipped otherwise. */
  condition?: string;
  /** Maximum tokens for the response. */
  maxTokens?: number;
  /** Temperature for model sampling. */
  temperature?: number;
}
Example
const plan = await client.createPlan({
  name: 'data-pipeline',
  description: 'ETL pipeline with validation',
  steps: [
    {
      id: 'extract',
      prompt: 'Extract structured data from this CSV: {{input.data}}',
      model: 'sonnet',
    },
    {
      id: 'validate',
      prompt: 'Validate the extracted data against schema: {{input.schema}}. Data: {{extract.output}}',
      model: 'haiku',
      dependsOn: ['extract'],
    },
    {
      id: 'transform',
      prompt: 'Transform validated records into target format. Data: {{validate.output}}',
      model: 'sonnet',
      dependsOn: ['validate'],
      condition: '{{validate.output.isValid}} === true',
    },
    {
      id: 'load',
      prompt: 'Generate SQL INSERT statements for: {{transform.output}}',
      model: 'haiku',
      dependsOn: ['transform'],
    },
  ],
});
executePlan
Execute a plan with input data.
const execution = await client.executePlan(
  planId: string,
  options: ExecutePlanOptions
): Promise<PlanExecution>;
Parameters
interface ExecutePlanOptions {
  /** Input data available to all steps via {{input.*}}. */
  input?: Record<string, unknown>;
  /** HITL configuration for this execution. */
  hitl?: {
    requiredFor?: string[];
    confidenceThreshold?: number;
    timeoutMs?: number;
    notifyChannels?: ('email' | 'dashboard' | 'slack' | 'webhook')[];
    escalation?: {
      afterMs?: number;
      to?: string;
    };
  };
  /** Execution-level options. */
  options?: {
    timeoutMs?: number;
    onStepComplete?: (step: StepResult) => void;
    onStepFailed?: (step: StepResult) => void;
    dryRun?: boolean;
  };
}
Example
const execution = await client.executePlan(plan.id, {
  input: {
    data: csvContent,
    schema: schemaDefinition,
  },
  options: {
    timeoutMs: 300_000,
    onStepComplete: (step) => {
      console.log(`[${step.id}] completed in ${step.durationMs}ms`);
    },
  },
});
getPlanStatus
Check the status of a running plan execution.
const status = await client.getPlanStatus(executionId: string): Promise<PlanExecution>;
Return Value
interface PlanExecution {
  id: string;
  planId: string;
  status: 'pending' | 'running' | 'completed' | 'failed' | 'cancelled' | 'paused';
  progress: {
    total: number;
    completed: number;
    running: number;
    pending: number;
    failed: number;
  };
  steps: Record<string, StepResult>;
  input: Record<string, unknown>;
  startedAt: string;
  completedAt?: string;
  usage: TokenUsage;
}
interface StepResult {
  id: string;
  status: TaskStatus;
  output?: string;
  error?: string;
  model: string;
  startedAt?: string;
  completedAt?: string;
  durationMs?: number;
  usage?: TokenUsage;
  retryCount: number;
}
waitForPlan
Block until the plan execution completes.
const result = await client.waitForPlan(
  executionId: string,
  options?: WaitOptions
): Promise<PlanExecution>;
Example
const result = await client.waitForPlan(execution.id, {
  timeoutMs: 300_000,
  pollIntervalMs: 3_000,
  onProgress: (exec) => {
    const p = exec.progress;
    console.log(`Progress: ${p.completed}/${p.total} steps complete`);
  },
});
console.log('Pipeline completed:', result.status);
console.log('Total cost:', result.usage.totalCost);
Dependency Injection
Step outputs are automatically injected into dependent steps using template syntax:
steps: [
  {
    id: 'step1',
    prompt: 'Generate a list of topics about {{input.subject}}',
  },
  {
    id: 'step2',
    prompt: 'For each topic in: {{step1.output}}, write a summary paragraph.',
    dependsOn: ['step1'],
  },
  {
    id: 'step3',
    prompt: `Combine these summaries into a report:
Topics: {{step1.output}}
Summaries: {{step2.output}}`,
    dependsOn: ['step1', 'step2'],
  },
]
Each {{stepId.output}} placeholder is replaced with the full text output of the referenced step. For structured data, handle JSON through the prompts themselves: instruct the producing step to emit JSON and the consuming step to parse it.
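To make the substitution concrete, here is a minimal sketch of how such placeholders could be rendered. `renderPrompt` is a hypothetical helper, not part of the SDK, and two behaviours are assumptions of the sketch rather than documented facts: non-string input values are serialized with JSON.stringify, and unknown placeholders are left untouched. It only handles two-part placeholders such as {{step1.output}} and {{input.subject}}.

```typescript
// True only for keys defined directly on the object (ignores inherited ones).
const hasOwn = (obj: object, key: string): boolean =>
  Object.prototype.hasOwnProperty.call(obj, key);

// Hypothetical renderer: fills {{input.key}} from the execution input and
// {{stepId.output}} from the text outputs of completed steps.
function renderPrompt(
  template: string,
  input: Record<string, unknown>,
  outputs: Record<string, string>,
): string {
  const placeholder = /\{\{\s*([\w-]+)\.([\w-]+)\s*\}\}/g;
  return template.replace(placeholder, (match: string, scope: string, key: string) => {
    if (scope === 'input' && hasOwn(input, key)) {
      const value = input[key];
      if (typeof value === 'string') return value;
      // Assumption: non-string inputs are embedded as JSON text.
      return value === undefined ? match : JSON.stringify(value);
    }
    if (key === 'output' && hasOwn(outputs, scope)) {
      return outputs[scope]; // full text output of the referenced step
    }
    return match; // unknown placeholder: leave it as written
  });
}

const rendered = renderPrompt(
  'For each topic in: {{step1.output}}, write a summary about {{input.subject}}.',
  { subject: 'DAGs' },
  { step1: 'scheduling, retries' },
);
console.log(rendered);
// For each topic in: scheduling, retries, write a summary about DAGs.
```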
Conditional Steps
Run a step only when its condition evaluates to true; otherwise the step is skipped:
{
  id: 'escalate',
  prompt: 'Escalate this issue to senior management...',
  dependsOn: ['analyze'],
  condition: '{{analyze.output.severity}} === "critical"',
}
Conditions support JavaScript-like expressions evaluated against step outputs. Here, escalate runs only when the analyze step reports a severity of "critical".
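The exact expression grammar is not specified here, so the following is only a sketch of how a condition like the one above might be evaluated. `shouldRun` is a hypothetical function, not an SDK API. It assumes the referenced step emitted a JSON object as its text output, and it supports only the form {{stepId.output.path}} followed by === or !== and a JSON literal.

```typescript
// Hypothetical evaluator for conditions of the form:
//   {{stepId.output.some.path}} === <JSON literal>
// Assumption: the referenced step's text output is a JSON document.
function shouldRun(condition: string, outputs: Record<string, string>): boolean {
  const pattern =
    /^\s*\{\{\s*([\w-]+)\.output\.([\w.]+)\s*\}\}\s*(===|!==)\s*(.+?)\s*$/;
  const m = pattern.exec(condition);
  if (!m) throw new Error(`Unsupported condition: ${condition}`);
  const [, stepId, path, op, literal] = m;

  let actual: unknown;
  try {
    actual = JSON.parse(outputs[stepId] ?? 'null');
  } catch {
    actual = undefined; // output was not valid JSON
  }
  // Walk the dotted path; any missing level yields undefined.
  for (const key of path.split('.')) {
    actual =
      actual !== null && typeof actual === 'object'
        ? (actual as Record<string, unknown>)[key]
        : undefined;
  }

  // The right-hand side must be a JSON literal: "critical", true, 42, null.
  const expected: unknown = JSON.parse(literal);
  return op === '===' ? actual === expected : actual !== expected;
}

const run = shouldRun('{{analyze.output.severity}} === "critical"', {
  analyze: '{"severity": "critical", "score": 9}',
});
console.log(run); // true
```

Restricting the sketch to strict comparisons against JSON literals avoids evaluating arbitrary code; an actual implementation may accept a richer expression language.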
Plan YAML Definition
Plans can also be defined as YAML files:
name: contract-review
description: Multi-step contract analysis
steps:
  - id: extract
    prompt: "Extract clauses from: {{input.contract}}"
    model: sonnet
  - id: analyze
    prompt: "Analyze risks in: {{extract.output}}"
    model: opus
    dependsOn: [extract]
    requiresApproval: true
  - id: report
    prompt: "Generate summary from: {{analyze.output}}"
    model: sonnet
    dependsOn: [analyze]
Load and execute:
import { loadPlanFromFile } from 'devteam-sdk';
const plan = await client.createPlan(loadPlanFromFile('./pipeline.yaml'));
Next Steps
- Fan-Out -- Parallel execution patterns
- HITL -- Adding human approval gates
- DAG Workflow Patterns -- Advanced patterns