Streaming Events
Fabric provides real-time event streaming for all domain events — workflow execution, tenancy changes, and provider operations.
Subscribing to Events
Section titled “Subscribing to Events”Stream All Events
Section titled “Stream All Events”import { FabricClient } from "@fabric-platform/sdk";
const fabric = new FabricClient({ apiKey: "fab_xxx" });
await fabric.streamEvents((event) => { console.log(event.kind, event.node_key, event.payload);});from fabric_platform import FabricClient
fabric = FabricClient(api_key="fab_xxx")
for event in fabric.stream_events(): print(event["kind"], event.get("node_key"), event.get("payload"))use fabric_sdk::FabricClient;
let client = FabricClient::new("https://gofabric.dev", api_key)?;
let mut stream = client.stream_events().await?;while let Some(event) = stream.next().await { println!("{} {:?}", event.kind, event.payload);}curl -N https://gofabric.dev/v1/events/streamEvents arrive as SSE with this format:
event: workflow.run.starteddata: {"id":"...","kind":"workflow_run_started","run_id":"...","payload":{}}
event: workflow.node.completeddata: {"id":"...","kind":"workflow_node_completed","node_key":"generate","payload":{"output":{}}}Stream Events for a Specific Workflow Run
Section titled “Stream Events for a Specific Workflow Run”const result = await fabric.waitForRun("<run-id>", { onEvent: (event) => { console.log(`Node ${event.node_key}: ${event.kind}`); },});for event in fabric.stream_run_events("<run-id>"): print(f"Node {event.get('node_key')}: {event['kind']}")let mut stream = client.stream_run_events("<run-id>").await?;while let Some(event) = stream.next().await { println!("Node {}: {}", event.node_key.unwrap_or_default(), event.kind);}curl -N https://gofabric.dev/v1/workflow-runs/{run_id}/eventsThis endpoint:
- Replays all existing events for the run (catch-up)
- Streams live events as they occur (filtered to this run)
- Auto-closes after a terminal event (
workflow.run.completed,workflow.run.failed,workflow.run.cancelled)
A client connecting mid-execution receives the full event history before seeing live updates.
Reconnection with Last-Event-ID
Section titled “Reconnection with Last-Event-ID”Each SSE message includes an id: field (the event UUID). If the connection drops, pass the last received ID via the Last-Event-ID header to skip already-seen events on reconnect:
curl -N -H "Last-Event-ID: evt-uuid-of-last-seen" \ https://gofabric.dev/v1/workflow-runs/{run_id}/eventsThe TypeScript SDK handles this automatically when reconnect: true is set.
Internal Node Filtering
Section titled “Internal Node Filtering”Fabric’s workflow SDK wraps every workflow in internal shim tasks (_fabric_capture_input, _fabric_finalize_output). These are hidden from event streams by default. To include them (useful for debugging), add ?include_internal=true:
curl -N "https://gofabric.dev/v1/workflow-runs/{run_id}/events?include_internal=true"Streaming from the Browser
Section titled “Streaming from the Browser”The TypeScript SDK’s SSE layer uses fetch + ReadableStream, not the browser’s
native EventSource. This means fabric.workflows.runs.watch() and
streamEvents() work directly from the browser with custom Authorization
headers — no server-side proxy required.
Auth pattern: never ship a long-lived fab_... API key to the browser.
Configure the client with a dynamic token provider that fetches a short-lived
token from your own backend:
import { FabricClient } from "@fabric-platform/sdk";
export const fabric = new FabricClient({ baseUrl: "https://api.fabric.example.com", auth: async () => { const res = await fetch("/api/fabric-token", { credentials: "include" }); const { token } = await res.json(); return token; },});The provider is re-invoked on every fetch call, so tokens refresh naturally on reconnect. Your backend only needs one route: mint a short-lived Fabric token for the logged-in user.
React usage — the SDK ships a useWorkflowRun hook at
@fabric-platform/sdk/react that wraps watch() and gives you typed state:
import { useState } from "react";import { useWorkflowRun } from "@fabric-platform/sdk/react";import { fabric } from "./fabric-client";
export function RunTrigger({ workflowName }: { workflowName: string }) { const [runId, setRunId] = useState<string | null>(null); const run = useWorkflowRun(fabric, runId);
return ( <div> <button disabled={runId !== null && !run.isComplete} onClick={async () => { const r = await fabric.workflows.runs.submit(workflowName, { input: { topic: "quarterly trends" }, }); setRunId(r.id); }} > Run </button>
<progress value={run.progressPercent} max={100} /> <p>{run.currentStep ?? (runId ? "waiting…" : "idle")}</p> {run.error && <p>Error: {run.error.message}</p>} {run.isComplete && <p>Done: {run.runStatus}</p>} </div> );}CORS requirement: your Fabric API must include the caller’s origin in
CORS_ALLOWED_ORIGINS, and the allow_headers list must cover Authorization,
Cache-Control, and Last-Event-ID. The default tower_http::cors config that
ships with fabric serve already includes all three.
When to use what:
| Approach | Transport | Auth | Use when |
|---|---|---|---|
useWorkflowRun | fetch + ReadableStream | Bearer token (dynamic provider) | You want direct browser → Fabric streaming, no proxy hop |
usePipelineProgress | Native EventSource | Cookies / same-origin only | You already proxy SSE through your own server via createSSEResponse() |
Both are legitimate. useWorkflowRun is strictly newer; reach for it when
starting fresh.
Stream Events for a Specific Job
Section titled “Stream Events for a Specific Job”await fabric.streamEvents("<job-id>", (event) => { console.log(event.kind, event.payload);});for event in fabric.stream_job_events("<job-id>"): print(event["kind"], event.get("payload"))let mut stream = client.stream_job_events("<job-id>").await?;while let Some(event) = stream.next().await { println!("{} {:?}", event.kind, event.payload);}curl -N https://gofabric.dev/v1/jobs/{job_id}/eventsSame replay + live streaming behavior, filtered to the specific job.
NDJSON Streaming
Section titled “NDJSON Streaming”For programmatic consumers that prefer line-by-line parsing without SSE framing, use the NDJSON endpoint:
curl -N https://gofabric.dev/v1/workflow-runs/{run_id}/events/ndjsonEach line is a standalone JSON object:
{"id":"...","kind":"workflow_run_started","run_id":"...","payload":{}}{"id":"...","kind":"workflow_node_completed","node_key":"generate","payload":{"output":{}}}{"id":"...","kind":"workflow_run_completed","payload":{"output":{...}}}Same replay + live + auto-close behavior as SSE, just without event type headers or id: lines.
WebSocket
Section titled “WebSocket”wscat -c wss://gofabric.dev/v1/events/wsWebSocket delivers the same event structure as SSE, using JSON messages.
Per-Run WebSocket
Section titled “Per-Run WebSocket”Connect directly to a run-scoped WebSocket:
wscat -c wss://gofabric.dev/v1/workflow-runs/{run_id}/wsClient Commands
Section titled “Client Commands”WebSocket connections support bidirectional communication. Send JSON commands to control the stream:
{"subscribe_run": "run-uuid"}{"unsubscribe": true}{"include_internal": true}| Command | Description |
|---|---|
subscribe_run | Filter events to a specific run ID |
unsubscribe | Clear the run filter, receive all events again |
include_internal | Toggle visibility of internal SDK shim tasks |
Unlike SSE, WebSocket streams do not replay historical events — new clients start from the current moment. The connection persists until the client disconnects (no auto-close on terminal events).
GraphQL Subscriptions
Section titled “GraphQL Subscriptions”The GraphQL API exposes event subscriptions over WebSocket:
subscription { workflowRunEvents(runId: "run-uuid") { id kind kindRaw organizationId runId nodeKey payload timestamp }}Use events(orgId: "org-uuid") to subscribe to all events for an organization. Maximum 10 concurrent subscriptions per principal.
Event Types
Section titled “Event Types”Workflow Events
Section titled “Workflow Events”| Event | Description |
|---|---|
workflow.run.created | Run was created |
workflow.run.started | Executor began processing |
workflow.run.completed | All nodes completed successfully |
workflow.run.failed | One or more nodes failed |
workflow.run.cancelled | Run was cancelled |
workflow.node.ready | Node dependencies satisfied |
workflow.node.started | Node execution began |
workflow.node.completed | Node finished with output |
workflow.node.failed | Node execution failed |
workflow.node.skipped | Node skipped (dependency failed or cancelled) |
Job Events
Section titled “Job Events”| Event | Description |
|---|---|
job.created | Job was created |
job.started | Executor began processing |
job.completed | Job finished with output |
job.failed | Job execution failed |
Tenancy Events
Section titled “Tenancy Events”| Event | Description |
|---|---|
organization.created | Organization was created |
team.created | Team was created |
membership.created | Membership was created |
invitation.created | Invitation was sent |
invitation.accepted | Invitation was accepted |
invitation.revoked | Invitation was revoked |
Event Payload Structure
Section titled “Event Payload Structure”Every event follows this structure:
{ "id": "event-uuid", "kind": "workflow.node.completed", "organization_id": "org-uuid or null", "workflow_id": "wf-uuid or null", "run_id": "run-uuid or null", "node_key": "generate or null", "payload": { "output": {}, "context_version": 3 }, "timestamp": "2026-03-19T12:00:00Z"}Run State in Terminal Events
Section titled “Run State in Terminal Events”If a workflow run was submitted with an opaque state object, that state is echoed back inside payload.state on terminal events (workflow.run.completed, workflow.run.failed, workflow.run.cancelled). This lets consumers correlate completions with caller-supplied context without a follow-up API call. See the Webhooks Reference for details.
Artifact Download URLs in Terminal Events
Section titled “Artifact Download URLs in Terminal Events”Terminal events include an output_url field in the payload — a stable API URL that returns the workflow output and all artifacts with signed download URLs. See the Webhooks Reference for the full response shape and SDK examples.
{ "kind": "workflow.run.completed", "payload": { "output_url": "https://gofabric.dev/v1/workflows/runs/run-uuid/output" }}Workflow-generated assets are automatically cleaned up after 72 hours. Download or mirror assets promptly. See Asset Lifecycle for details.
Webhooks
Section titled “Webhooks”Fabric supports webhook delivery for push-based event notifications. Register an HTTP endpoint, and Fabric will POST signed payloads for every matching event with automatic retries.
- HMAC signing — Every delivery is signed with HMAC-SHA256 (
X-Fabric-Signatureheader) - Exponential backoff — Automatic retries with configurable
max_retries(30s × 2attempt) - Event & resource filtering — Subscribe to specific event types and scope to individual workflows or runs
- Delivery tracking — Full delivery history with status, response codes, and error messages
All transports (SSE, WebSocket, webhooks) deliver the same DomainEvent structure.
See the Webhooks Reference for the complete guide — event catalog, SDK examples, signature verification, and API endpoints.
Transport Comparison
Section titled “Transport Comparison”| SSE | NDJSON | WebSocket | GraphQL Sub | Webhooks | |
|---|---|---|---|---|---|
| Protocol | HTTP long-lived | HTTP long-lived | WS | WS (GraphQL) | HTTP POST |
| Replays history | Yes | Yes | No | No | No |
| Auto-closes on terminal | Yes | Yes | No | No | N/A |
| Reconnection | Last-Event-ID | Manual | Manual | Protocol-level | Automatic retries |
| Direction | Server → Client | Server → Client | Bidirectional | Server → Client | Server → Server |
| Best for | Browser UIs, progress bars | CLI tools, scripts | Interactive apps | GraphQL clients | Backend pipelines, automation |
Choosing a Transport
Section titled “Choosing a Transport”- Browser UI showing run progress: SSE via
useWorkflowRunhook — replay ensures late-connecting clients see full history, auto-close cleans up on completion. - Backend processing on completion: Webhooks — reliable delivery with retries, HMAC signing, and no long-lived connections.
- CLI or scripting: NDJSON — simple line-by-line parsing, no SSE framing overhead.
- Interactive dashboards: WebSocket — bidirectional commands let you subscribe/unsubscribe dynamically.
- GraphQL clients: GraphQL subscriptions — native integration with your existing GraphQL transport.