# Workflow Authoring
A workflow is a DAG (directed acyclic graph) of nodes that execute in dependency order with a shared context. Workflows enable multi-step pipelines like “download video, transcribe audio, generate summary.”
## Core Concepts

### Workflow Definition

A reusable template that defines the graph of nodes, their dependencies, and configurations.
### Workflow Run

A specific execution instance of a workflow definition, with its own context, state, and event history.
### Shared Context

A JSON object passed through the workflow. Nodes read inputs from context and write outputs back to it, enabling data flow between steps.
## Creating a Workflow Definition

```ts
import { WorkflowBuilder, FabricClient } from "@fabric-platform/sdk";

const workflow = new WorkflowBuilder("transcribe-and-summarize")
  .node("transcribe", "ai_invoke", (n) =>
    n
      .config({ operation: "audio.transcribe", modality: "audio" })
      .input("audio_url", "$context.audio.url")
      .output("transcript", "$context.transcript.text")
  )
  .node("summarize", "ai_invoke", (n) =>
    n
      .dependsOn("transcribe")
      .config({ operation: "ai.generate", modality: "text" })
      .input("prompt", "$context.transcript.text")
      .output("summary", "$context.summary")
  )
  .build();

const fabric = new FabricClient({ apiKey: "fab_xxx" });
const defId = await fabric.registerDefinition(workflow);
```

```python
from fabric_platform import FabricClient

fabric = FabricClient(api_key="fab_xxx")

wf_id = fabric.upsert_workflow(
    "transcribe-and-summarize",
    nodes=[
        {
            "key": "transcribe",
            "operation": "audio.transcribe",
            "runtime": "provider",
            "config": {"modality": "audio"},
            "inputs": {"audio_url": "$context.audio.url"},
            "outputs": {"transcript": "$context.transcript.text"},
        },
        {
            "key": "summarize",
            "operation": "ai.generate",
            "runtime": "provider",
            "depends_on": ["transcribe"],
            "config": {"modality": "text"},
            "inputs": {"prompt": "$context.transcript.text"},
            "outputs": {"summary": "$context.summary"},
        },
    ],
)
```

```rust
use fabric_sdk::FabricClient;

let client = FabricClient::new("http://localhost:3001", api_key)?;

let wf_id = client.upsert_workflow("transcribe-and-summarize", serde_json::json!({
    "nodes": [
        {
            "key": "transcribe",
            "operation": "audio.transcribe",
            "runtime": "provider",
            "config": {"modality": "audio"},
            "inputs": {"audio_url": "$context.audio.url"},
            "outputs": {"transcript": "$context.transcript.text"}
        },
        {
            "key": "summarize",
            "operation": "ai.generate",
            "runtime": "provider",
            "depends_on": ["transcribe"],
            "config": {"modality": "text"},
            "inputs": {"prompt": "$context.transcript.text"},
            "outputs": {"summary": "$context.summary"}
        }
    ]
})).await?;
```

```sh
curl -X POST http://localhost:3001/v1/workflow-definitions \
  -H 'Authorization: Bearer fab_xxx' \
  -H 'content-type: application/json' \
  -d '{
    "name": "transcribe-and-summarize",
    "organization_id": "<org-id>",
    "nodes": [
      {
        "key": "transcribe",
        "operation": "audio.transcribe",
        "runtime": "provider",
        "config": {"modality": "audio"},
        "inputs": {"audio_url": "$context.audio.url"},
        "outputs": {"transcript": "$context.transcript.text"}
      },
      {
        "key": "summarize",
        "operation": "ai.generate",
        "runtime": "provider",
        "depends_on": ["transcribe"],
        "config": {"modality": "text"},
        "inputs": {"prompt": "$context.transcript.text"},
        "outputs": {"summary": "$context.summary"}
      }
    ]
  }'
```

## Running a Workflow
```ts
// Create and start a run, stream events until completion
const result = await fabric.runDefinition(workflow, {
  context: {
    audio: { url: "https://example.com/podcast.wav" },
  },
  onEvent: (event) => {
    console.log(`${event.node_key}: ${event.kind}`);
  },
});

console.log("Summary:", result.context.summary);
```

```python
# Create a run
run_id = fabric.run_workflow(wf_id, context={
    "audio": {"url": "https://example.com/podcast.wav"},
})

# Wait for completion
result = fabric.wait_for_run(run_id)
print("Summary:", result["context"]["summary"])
```

```rust
// Create a run
let run_id = client.run_workflow(wf_id, serde_json::json!({
    "audio": {"url": "https://example.com/podcast.wav"}
})).await?;

// Wait for completion
let result = client.wait_for_run(run_id).await?;
println!("Summary: {}", result.context["summary"]);
```

```sh
# 1. Create a run
curl -X POST http://localhost:3001/v1/workflow-runs \
  -H 'Authorization: Bearer fab_xxx' \
  -H 'content-type: application/json' \
  -d '{
    "workflow_definition_id": "<definition-id>",
    "context": {
      "audio": {"url": "https://example.com/podcast.wav"}
    }
  }'

# 2. Start execution
curl -X POST http://localhost:3001/v1/workflow-runs/<run-id>/start

# 3. Stream events (replays history, then streams live)
curl -N http://localhost:3001/v1/workflow-runs/<run-id>/events

# 4. Poll status
curl http://localhost:3001/v1/workflow-runs/<run-id>

# 5. Cancel (if needed)
curl -X POST http://localhost:3001/v1/workflow-runs/<run-id>/cancel
```

## Idempotency
Workflow runs support idempotency keys to prevent duplicate execution:

```ts
const runId = await fabric.runWorkflow(wfId, {
  context: { audio: { url: "..." } },
  idempotencyKey: "client-request-xyz",
});
```

```python
run_id = fabric.run_workflow(wf_id,
    context={"audio": {"url": "..."}},
    idempotency_key="client-request-xyz",
)
```

```rust
let run_id = client.run_workflow_with_key(wf_id, context, "client-request-xyz").await?;
```

```sh
curl -X POST http://localhost:3001/v1/workflow-runs \
  -H 'Authorization: Bearer fab_xxx' \
  -H 'content-type: application/json' \
  -d '{
    "workflow_definition_id": "<definition-id>",
    "context": {"audio": {"url": "..."}},
    "idempotency_key": "client-request-xyz"
  }'
```

Duplicate submissions with the same idempotency key return the original run.
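The deduplication semantics can be sketched in a few lines. This is illustrative only: `RunStore` and its methods are hypothetical names, and the real check happens server-side in the control plane.

```python
import uuid

class RunStore:
    """In-memory sketch of idempotent run creation."""

    def __init__(self):
        self.runs = {}     # run_id -> run record
        self.by_key = {}   # idempotency_key -> run_id

    def create_run(self, definition_id, context, idempotency_key=None):
        # A duplicate submission with the same key returns the original run.
        if idempotency_key is not None and idempotency_key in self.by_key:
            return self.by_key[idempotency_key]
        run_id = str(uuid.uuid4())
        self.runs[run_id] = {"definition_id": definition_id, "context": context}
        if idempotency_key is not None:
            self.by_key[idempotency_key] = run_id
        return run_id

store = RunStore()
first = store.create_run("def-1", {"audio": {"url": "..."}}, "client-request-xyz")
second = store.create_run("def-1", {"audio": {"url": "..."}}, "client-request-xyz")
assert first == second  # same run returned, not a second execution
```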
## Context Propagation

Nodes communicate through the shared context using input/output bindings:
- Input bindings resolve values from the shared context before execution (e.g., `$context.audio.url`)
- Output bindings write results back to the shared context after execution
- Merge policies control how outputs are written: `Replace`, `Merge`, or `Append`
The context version is tracked, enabling optimistic concurrency.
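A minimal sketch of how bindings and merge policies behave, assuming `$context.`-prefixed dot-separated paths into the JSON object. `resolve` and `write` are hypothetical helper names, not SDK APIs.

```python
def resolve(context, path):
    """Resolve a "$context.a.b" input binding to a value."""
    parts = path.split(".")
    assert parts[0] == "$context"
    value = context
    for part in parts[1:]:
        value = value[part]
    return value

def write(context, path, value, policy="Replace"):
    """Write an output back under a "$context.a.b" binding."""
    parts = path.split(".")[1:]
    target = context
    for part in parts[:-1]:
        target = target.setdefault(part, {})
    leaf = parts[-1]
    if policy == "Merge" and isinstance(target.get(leaf), dict):
        target[leaf].update(value)      # shallow-merge into existing object
    elif policy == "Append":
        target.setdefault(leaf, []).append(value)  # accumulate a list
    else:                               # Replace (default)
        target[leaf] = value

ctx = {"audio": {"url": "https://example.com/podcast.wav"}}
assert resolve(ctx, "$context.audio.url") == "https://example.com/podcast.wav"
write(ctx, "$context.transcript.text", "hello world")
assert ctx["transcript"]["text"] == "hello world"
```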
## Execution Model

### DAG Scheduling

Nodes execute in dependency order using Kahn’s algorithm for topological sorting. Nodes with no unresolved dependencies execute concurrently.
### Retries and Backoff

Nodes support configurable retry behavior:
- Maximum retry attempts
- Backoff strategy (exponential)
- Per-node timeout enforcement
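The retry loop amounts to a few lines; the parameter names here (`max_attempts`, `base_delay`) are illustrative, not the platform’s actual config keys.

```python
import time

def run_with_retries(fn, max_attempts=3, base_delay=0.1):
    """Retry fn with exponential backoff; re-raise after the last attempt."""
    for attempt in range(max_attempts):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # attempts exhausted
            time.sleep(base_delay * (2 ** attempt))  # 0.1s, 0.2s, 0.4s, ...

# A node that fails twice, then succeeds:
calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient failure")
    return "ok"

assert run_with_retries(flaky, base_delay=0.01) == "ok"
```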
### Failure Modes

Workflows support two failure strategies:
| Mode | Behavior |
|---|---|
| Fail-fast | Cancel remaining nodes when any node fails |
| Partial completion | Continue executing independent branches even if one fails |
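The difference between the two strategies can be sketched as a function over the dependency graph. This simplifies by treating every other node as still pending at the moment of failure; `affected` is a hypothetical helper.

```python
def affected(nodes, failed, mode):
    """nodes: {key: [deps]}; returns the set of nodes that will not run."""
    if mode == "fail-fast":
        # Cancel everything still pending when any node fails.
        return set(nodes) - {failed}
    # Partial completion: skip only the failed node's transitive dependents,
    # letting independent branches finish.
    skipped = set()
    changed = True
    while changed:
        changed = False
        for k, deps in nodes.items():
            if k not in skipped and any(d == failed or d in skipped for d in deps):
                skipped.add(k)
                changed = True
    return skipped

dag = {"a": [], "b": ["a"], "c": []}
assert affected(dag, "a", "partial") == {"b"}         # "c" still runs
assert affected(dag, "a", "fail-fast") == {"b", "c"}  # everything cancelled
```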
### Step Claiming

The execution plane uses Postgres `SELECT ... FOR UPDATE SKIP LOCKED` for step claiming, ensuring:
- No double-execution across workers
- Automatic lease expiry if a worker crashes
- Horizontal scaling with multiple executor workers
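An in-memory sketch of the claim-with-lease semantics that `FOR UPDATE SKIP LOCKED` provides over a steps table: a step is claimed by at most one worker at a time, and an expired lease makes it claimable again. The real implementation relies on Postgres row locks; the field names here are assumptions.

```python
import time

def claim_step(steps, worker, lease_seconds=30, now=None):
    """steps: list of dicts with 'id', 'status', 'lease_expires_at'."""
    now = time.time() if now is None else now
    for step in steps:
        claimable = step["status"] == "ready" or (
            step["status"] == "claimed" and step["lease_expires_at"] <= now
        )
        if claimable:
            step["status"] = "claimed"
            step["claimed_by"] = worker
            step["lease_expires_at"] = now + lease_seconds
            return step["id"]
    return None  # nothing claimable right now

steps = [{"id": "s1", "status": "ready", "lease_expires_at": 0}]
assert claim_step(steps, "w1", now=100) == "s1"
assert claim_step(steps, "w2", now=110) is None   # still leased by w1
assert claim_step(steps, "w2", now=200) == "s1"   # lease expired: re-claimable
```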
## Run States

| State | Description |
|---|---|
| `created` | Run exists but hasn’t started |
| `running` | Executor is processing nodes |
| `completed` | All nodes completed successfully |
| `failed` | One or more nodes failed |
| `cancelled` | Run was cancelled |
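The run states imply a small state machine; the exact transition map below is an assumption (for instance, that a `created` run can be cancelled before it starts, and that terminal states admit no further transitions).

```python
# Assumed transition map: state -> set of states it may move to.
TRANSITIONS = {
    "created": {"running", "cancelled"},
    "running": {"completed", "failed", "cancelled"},
    "completed": set(),   # terminal
    "failed": set(),      # terminal
    "cancelled": set(),   # terminal
}

def advance(state, target):
    """Validate and apply a state transition."""
    if target not in TRANSITIONS[state]:
        raise ValueError(f"illegal transition {state} -> {target}")
    return target

assert advance("created", "running") == "running"
assert advance("running", "completed") == "completed"
```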
## Event Types

Subscribe to workflow run events via SSE:

| Event | Description |
|---|---|
| `workflow.run.created` | Run was created |
| `workflow.run.started` | Executor began processing |
| `workflow.run.completed` | All nodes completed |
| `workflow.run.failed` | Run failed |
| `workflow.run.cancelled` | Run was cancelled |
| `workflow.node.ready` | Node dependencies satisfied |
| `workflow.node.started` | Node execution began |
| `workflow.node.completed` | Node finished with output |
| `workflow.node.failed` | Node execution failed |
| `workflow.node.skipped` | Node skipped (dependency failed) |
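A minimal consumer for the event stream can be sketched as follows. The `event:`/`data:` framing is standard SSE; the JSON payload shape shown (`node_key`) is an assumption based on the SDK examples above.

```python
import json

def parse_sse(stream_text):
    """Yield (event, data) pairs from raw text/event-stream content."""
    event, data_lines = None, []
    for line in stream_text.splitlines():
        if line.startswith("event:"):
            event = line[len("event:"):].strip()
        elif line.startswith("data:"):
            data_lines.append(line[len("data:"):].strip())
        elif line == "" and (event or data_lines):
            # Blank line terminates one SSE message.
            yield event, json.loads("\n".join(data_lines))
            event, data_lines = None, []

raw = 'event: workflow.node.completed\ndata: {"node_key": "transcribe"}\n\n'
events = list(parse_sse(raw))
assert events[0] == ("workflow.node.completed", {"node_key": "transcribe"})
```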