
Typed Workflows

When you write a Fabric workflow with Pydantic models for task input and output, the Fabric server walks your task graph at boot, derives JSON Schemas from your models, and stores them in the workflow registry. External consumers — SocialSite, your own apps, anyone calling the SDKs — can then discover the contract for any workflow without having to read the source code.

This is the foundation for:

  • Form generation — render a UI that submits to a workflow using the input schema as the source of truth.
  • Client-side validation — fail fast on bad input before paying for a workflow run.
  • Server-side validation — opt in to ?validate=true and let Fabric reject bad payloads with a structured 400 instead of letting the workflow crash mid-run.
  • Type generation — point a code generator at the schemas to produce TypeScript types, Pydantic models in another project, or OpenAPI fragments.

The whole feature is opt-in per task: workflows that don’t use Pydantic still register and run normally, they just have null schemas in the registry. There’s no migration required, no breaking change, no maintenance burden.

Write your task functions with Pydantic input and output models. That’s it.

```python
from pydantic import BaseModel, Field
from fabric_workflow_sdk import Flow, task


class TrendsInput(BaseModel):
    """Input for the trend research workflow."""

    topic: str = Field(description="The niche to research")
    max_results: int = Field(default=10, ge=1, le=100)
    region: str = Field(default="US")


class TrendItem(BaseModel):
    title: str
    score: float
    url: str | None = None


class TrendsOutput(BaseModel):
    items: list[TrendItem]
    summary: str


@task
async def gather_trends(input: TrendsInput) -> TrendsOutput:
    """Search the web for trending topics in the given niche."""
    # ... your implementation ...
    return TrendsOutput(items=[], summary="")


trend_research = Flow("research/trends").then(gather_trends).build()
```

That’s the entire change. The next time the Fabric server boots, it spawns a Python subprocess that imports your workflow file, walks the task graph, and writes the schemas into the registry. From then on, the workflow is queryable via the SDKs:

```ts
const schemas = await fabric.workflows.registry.getSchemas("research/trends");

console.log(schemas.input_schema);
// → JSON Schema describing TrendsInput

console.log(schemas.task_schemas);
// → [
//     {
//       task_id: "gather_trends",
//       input_schema: { ...TrendsInput... },
//       output_schema: { ...TrendsOutput... },
//       description: "Search the web for trending topics in the given niche.",
//       source: "pydantic"
//     }
//   ]
```

For the workflow above, the registry stores:

  • input_schema — JSON Schema for TrendsInput. Derived from the first user task’s input model.
  • output_schema — JSON Schema for TrendsOutput. Derived from the last user task’s output model.
  • output_aliases — public alias→dot-path map for chaining this workflow’s output into downstream stages of a workflow set. See the Output aliases section below.
  • task_schemas — an ordered array with one entry per user task, each containing the task’s input/output schemas plus the first line of its docstring.

Both the workflow-level pair and the per-task array are populated. The workflow-level pair is the canonical “submit/result contract” that consumers use to render forms; the per-task array is the full breakdown for tooling that wants to display each step’s contract (e.g. a workflow visualizer).

The internal :capture and :finalize shim tasks the SDK auto-injects into every flow are excluded from task_schemas — they’re plumbing for the auto-output-capture feature, not user contracts.

Use fab-workflow --lint to introspect your workflow without running it. The lint command imports the file, walks the task graph, and prints exactly what the registry will store:

```sh
$ fab-workflow --lint research/trends

research/trends
  description: Trend research workflow — discover trending topics in a niche
  file:        workflows/research/trends.py
  input:       TrendsInput
  output:      TrendsOutput
  tasks:
    1. gather_trends   in: TrendsInput   out: TrendsOutput
  warnings: (none)
```

If any warnings come up, the lint command exits with status 1 — you can use this in CI to gate PRs on type coverage:

```yaml
# .github/workflows/test.yml
- name: Lint workflow schemas
  run: |
    for wf in workflows/research/*.py workflows/video/*.py; do
      .venv/bin/fab-workflow --lint "$wf" || exit 1
    done
```

Workflows that don’t declare Pydantic types print a warning per untyped task and exit non-zero, so an unannotated task gets caught at PR review time instead of surfacing as a null schema in production.

Once a workflow has an input_schema, you can opt in to server-side validation of the request body. The server compiles the schema with jsonschema and rejects mismatched payloads with a structured 400:

```ts
try {
  const run = await fabric.workflows.runs.submit("research/trends", {
    input: { topic: 42 }, // wrong type
    validate: true,
  });
} catch (err) {
  if (err.status === 400 && err.code === "input_validation_failed") {
    console.log(err.details.errors);
    // → [{ path: "/topic", message: "42 is not of type \"string\"" }]
  }
}
```

Validation is opt-in in v1: the server only validates when you pass validate=true. Workflows whose input_schema is null (no Pydantic types declared) skip validation gracefully — submission proceeds as normal so opting in doesn’t break callers running against partially-typed workflows.

When fabric serve starts:

  1. The boot seed walks the layered workflow directories for .py files containing Flow("...") calls.
  2. For each file, the seed pre-filters via a fast Rust string scan (no Python execution), then spawns a Python subprocess for the files that match: python3 -m fabric_workflow_sdk.introspect <path>.
  3. The subprocess imports the file, finds the top-level Flow instance, calls Workflow.iter_nodes() to walk the task graph in topological order, and reads _input_type / _output_type off each task function.
  4. For each Pydantic type, it calls model.model_json_schema() and emits the result as JSON.
  5. The Rust parent collects all subprocess results in parallel (capped at available_parallelism().min(8)) and writes the schemas to the registry alongside the existing seed metadata.
  6. Total cost: typically 1–2 seconds for ~67 workflows on a modern laptop. Configurable via FABRIC_INTROSPECT_CONCURRENCY and FABRIC_INTROSPECT_TIMEOUT_SECS.

A workflow whose introspection fails — broken import, missing dependency, crash during Flow.build(), infinite loop — is still seeded with empty schemas so it remains discoverable and submittable. The failure is logged as a WARN with the file path and error variant. The seed loop never aborts on a single bad file.

If you want to skip introspection entirely (e.g. for very fast dev boots), set FABRIC_SKIP_SCHEMA_INTROSPECTION=1 and the seed falls back to the legacy fast path that only extracts workflow names.

Plan 038 also introduces a four-layer workflow discovery system so you can develop dynamic project-local workflows without losing the platform’s bundled defaults:

| Layer | Where | Intent |
| --- | --- | --- |
| Platform | Discovered via the binary’s install location | The 67-ish workflows that ship with Fabric — always available |
| Project | ./workflows/ relative to wherever you launched | The workflows you’re developing right now in this project |
| User | ~/.fabric/workflows/ | Your personal scratchpad workflows that follow you across projects |
| Config | Whatever’s in runtime.workflow_dirs from your config | Extra search paths from environment / file config |

When the same workflow name appears in multiple layers, the higher-priority layer wins: Project > User > Config > Platform. The boot seed logs DEBUG lines for shadows so you can see what’s overridden. Each entry’s scope ends up as a scope:platform / scope:project etc. tag in the registry’s worker_tags.
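The precedence rule amounts to a last-writer-wins merge from lowest to highest priority. A minimal Python sketch (the actual seed is implemented in Rust; the dict contents here are made up):

```python
# Illustrative layer-precedence resolution: later (higher-priority) layers
# shadow earlier ones, mirroring Project > User > Config > Platform.
LAYER_ORDER = ["platform", "config", "user", "project"]  # low → high priority

def resolve_layers(layers: dict[str, dict[str, str]]) -> dict[str, tuple[str, str]]:
    """Map workflow name → (winning scope, path)."""
    resolved: dict[str, tuple[str, str]] = {}
    for scope in LAYER_ORDER:
        for name, path in layers.get(scope, {}).items():
            if name in resolved:
                # The real seed logs a DEBUG line for each shadow like this.
                print(f"DEBUG shadow: {name} ({resolved[name][0]} -> {scope})")
            resolved[name] = (scope, path)
    return resolved

layers = {
    "platform": {"research/trends": "/usr/share/fabric/workflows/research/trends.py"},
    "project": {"research/trends": "workflows/research/trends.py"},
}
winners = resolve_layers(layers)
```

Here the project copy of `research/trends` shadows the platform copy, and the winning scope is what ends up as the `scope:project` tag.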

Platform discovery resolves via:

  1. FABRIC_PLATFORM_WORKFLOWS_DIR env var (explicit override)
  2. <bin>/../share/fabric/workflows (FHS install path)
  3. <bin>/workflows (adjacent-to-binary, with a research/ sentinel)
  4. Source-tree fallback (./workflows/ with the same sentinel)

So you can cargo run from the Fabric source tree, fabric serve from a downstream project, or run a packaged fabric binary on a production host — the platform workflows are visible in all three cases.

model_json_schema() produces standard JSON Schema, so most of Pydantic’s expressiveness round-trips through to consumers:

  • Field constraints — Field(ge=1, le=100, min_length=2, ...) become minimum, maximum, minLength keywords.
  • Descriptions — Field(description="...") becomes description on the property.
  • Defaults — Field(default=10) becomes default.
  • Enums — Literal["a", "b", "c"] becomes enum.
  • Nested models — emit $defs and $ref. Preserved as-is by the schema extractor (no inlining), so your consumer’s JSON Schema validator handles them correctly.
  • Discriminated unions — Field(discriminator="kind") becomes the JSON Schema discriminator keyword.
  • Custom validators — these run at validation time but don’t appear in the schema. Use Annotated types or Field with constraints if you want the rule to be machine-readable.
  • TypedDict / dataclass / plain type hints — v1 is Pydantic-only. Tasks with TypedDict inputs get source: "none" and no schema. We may add support in a follow-up; the JSON Schema quality will be thinner since TypedDict has no descriptions or defaults.
  • Default-on validation — v1 is opt-in via ?validate=true. Once schema coverage is broad enough across the in-tree workflows (mostly a question of authors adding Pydantic types to the ~64 workflows that don’t have them yet), we’ll flip the default to on in a future major release.
  • Forked-flow output unions — workflows that end in unjoined parallel branches get a JSON Schema oneOf over the branch outputs and a warning. If you want a clean output schema, add a merge task that joins the branches into a single output model.
  • Schema versioning / drift detection — if a workflow’s Pydantic types change incompatibly between two boots, the registry silently picks up the new schema. We may add a “warn on incompatible change” check in a follow-up.
  • Per-task schemas in the workflow output — the registry stores schemas; the workflow run output endpoint (GET /v1/workflows/runs/{id}/output) doesn’t yet surface per-task outputs. That’s tracked as plan 035 §5.
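The constraint mapping in the list above can be checked directly: calling model_json_schema() on a trimmed version of the TrendsInput model from earlier shows the keywords consumers receive.

```python
from pydantic import BaseModel, Field

class TrendsInput(BaseModel):
    """Input for the trend research workflow."""

    topic: str = Field(description="The niche to research")
    max_results: int = Field(default=10, ge=1, le=100)

schema = TrendsInput.model_json_schema()
# Constraints, defaults, and descriptions land as JSON Schema keywords:
#   schema["properties"]["max_results"] contains minimum: 1, maximum: 100,
#   default: 10, and schema["required"] lists only the fields without defaults.
```

This is the same call the introspection subprocess makes at boot, so what you see locally is what the registry stores.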

For an existing workflow without Pydantic types:

  1. Identify the workflow’s actual input contract. Look at how it’s currently submitted — what keys does it read from the input dict? What types? Which are required?
  2. Define a BaseModel subclass for the input. Use Field(description=...) liberally; the descriptions show up in the consumer’s form UI.
  3. Annotate the first task’s signature with the new model: async def my_task(input: MyInput) -> .... The Sayiir runtime already wraps @task-decorated functions with Pydantic validation — your model is now the runtime contract.
  4. Repeat for the output. The last task’s return type becomes the workflow’s output contract.
  5. Run fab-workflow --lint to verify the schemas look right before committing.
  6. Restart the server — the next boot picks up the new schemas automatically.

The rollout is incremental: workflows you haven’t typed yet keep working exactly as before. There’s no big-bang migration.
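As a concrete (hypothetical) illustration of steps 1-2: suppose an untyped task currently reads input["niche"] as a required string and input["count"] as an optional int. The model that captures that contract — with the names here invented for the example — looks like this:

```python
from pydantic import BaseModel, Field, ValidationError

class HookInput(BaseModel):
    """Hypothetical input model recovered from a task's dict accesses."""

    niche: str = Field(description="Niche to generate hooks for")
    count: int = Field(default=5, ge=1)

parsed = HookInput.model_validate({"niche": "AI productivity"})

# Missing required keys now fail loudly at submission time instead of
# surfacing as a KeyError mid-run:
try:
    HookInput.model_validate({})
except ValidationError:
    pass
```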

Many workflows need files as input — an actor portrait for avatar generation, a voiceover sample for voice cloning, a product photo for an ad composite. The AssetRef type solves the inbound side of this (just as save_artifact() solves the outbound side).

An AssetRef accepts exactly one of three forms:

| Form | When to use |
| --- | --- |
| asset_id | File was previously uploaded via POST /v1/assets. The worker downloads it via the asset API with auth headers. |
| url | File is at a publicly fetchable HTTP(S) URL. |
| path | File is already on the worker’s local filesystem (CLI runs, intra-workflow hand-off). |

Use AssetRef as the Pydantic type for any input field that expects a file. The schema extractor emits it as a $ref to a shared $defs/AssetRef definition, so UI consumers can render the correct asset picker / file uploader instead of a text box.

```python
from pydantic import Field
from fabric_workflow_sdk import AssetRef, Flow, fetch_asset, task
from fabric_workflow_sdk.schema import WorkflowModel, workflow_meta


class AvatarInput(WorkflowModel):
    """Generate a lip-synced talking-head clip from a portrait + audio."""

    actor: AssetRef = Field(..., description="Actor portrait image (PNG/JPG)")
    audio: AssetRef = Field(..., description="Audio clip to lip-sync (MP3/WAV)")
    prompt: str = Field("Person speaking to camera, warm lighting", description="Stylistic prompt")
    avatar_model: str = Field("fal-ai/kling-video/ai-avatar/v2/standard", description="Provider model id")


@task(timeout="10m", tags=["python", "ai", "video"])
async def render_avatar(input: AvatarInput) -> AvatarOutput:
    validated = AvatarInput.model_validate(input if isinstance(input, dict) else input.model_dump())
    context = input if isinstance(input, dict) else {}
    actor_path = await fetch_asset(context, validated.actor)
    audio_path = await fetch_asset(context, validated.audio)
    # ... generate the talking head, save artifact, return output ...


avatar_pipeline = (
    Flow("video/avatar", metadata=workflow_meta(AvatarInput, AvatarOutput))
    .then(render_avatar)
    .build()
)
```
```ts
import type { AssetRef } from "@fabric-platform/sdk";

// With a previously uploaded asset:
const run = await fabric.workflows.runs.submit("video/avatar", {
  input: {
    actor: { asset_id: "018f1234-abcd-..." } satisfies AssetRef,
    audio: { url: "https://cdn.example.com/voiceover.mp3" } satisfies AssetRef,
    prompt: "Person explaining with natural gestures",
  },
});
```

Inside the task, call fetch_asset(input, ref) to resolve any AssetRef form into a local file path:

  • path → returned directly (existence check, no copy).
  • url → HTTP GET, written to a tempfile with the correct extension.
  • asset_id → GET /v1/assets/{id}/download with Fabric auth headers, respects Content-Disposition filenames.

The result is always a pathlib.Path you can hand to ffmpeg, PIL, fal_client.upload_file(), etc. No manual URL construction or auth header plumbing needed.

fetch_asset() also accepts bare strings and dicts as shorthand — a string starting with http(s):// becomes a URL ref, an existing path becomes a path ref, anything else is treated as an asset id:

```python
# All equivalent inside a task body:
path = await fetch_asset(input, AssetRef(asset_id="018f..."))
path = await fetch_asset(input, {"asset_id": "018f..."})
path = await fetch_asset(input, "018f...")  # auto-detected as asset id
```
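The detection heuristic is roughly the following — an illustrative sketch, not the SDK's actual implementation:

```python
from pathlib import Path

def coerce_ref(value):
    """Illustrative version of fetch_asset's string/dict shorthand."""
    if isinstance(value, dict):
        return value  # already an explicit AssetRef-shaped dict
    if isinstance(value, str):
        if value.startswith(("http://", "https://")):
            return {"url": value}
        if Path(value).exists():
            return {"path": value}
        return {"asset_id": value}  # fallback: treat as an asset id
    return value
```

Note the order matters: an existing local file named like an id resolves as a path, so asset ids only win when nothing matches on disk.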

As of plan 038 landing, the in-tree workflows that already use Pydantic types are:

  • workflows/video/avatar.py — uses AssetRef for reference-file inputs
  • workflows/video/long_form.py
  • workflows/content/long_form_script.py
  • workflows/content/landing_page_factory.py

The rest are slated for incremental migration as authors touch them. There’s no deadline — typed and untyped workflows coexist indefinitely.

When a workflow is going to feed its output into another workflow as part of a multi-stage pipeline, the chain author needs a stable “public name” for each addressable output value. Fabric calls these output aliases, and they’re declared as a class-level dict on the WorkflowOutput subclass:

```python
class HookGenerationOutput(WorkflowOutput):
    hook_ideas: list[dict] = Field(default_factory=list)

    # Public alias → dot-path into the dumped output dict.
    # Plan 078. These names become part of the workflow's contract:
    # renaming an alias key is a breaking change for downstream sets.
    output_aliases = {
        "hook": "hook_ideas.0.hook",  # primary single-hook handle
        "all_hooks": "hook_ideas",
    }
```

A downstream workflow set then references the upstream by alias rather than walking the raw output schema:

```yaml
runs:
  - name: hooks
    workflow: hooks/generate
    input: { niche: "AI productivity" }
  - name: short
    workflow: video/quick-shorts
    inputs_from:
      # `hook` is the alias declared above; resolves internally to
      # `hook_ideas.0.hook` but the chain stays stable across schema
      # refactors.
      - { source: hooks.0.hook, map: topic }
```

If a workflow doesn’t declare any output_aliases, every top-level field is implicitly auto-aliased to itself (foo → "foo"), so simple cases just work.
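Resolving an alias against a run's output amounts to a dot-path walk over the dumped output dict, where numeric segments index into lists. A minimal sketch:

```python
def resolve_dot_path(output, dot_path: str):
    """Walk a dumped output dict along a dot-path like 'hook_ideas.0.hook'."""
    node = output
    for seg in dot_path.split("."):
        # Numeric segments index lists; everything else is a dict key.
        node = node[int(seg)] if isinstance(node, list) else node[seg]
    return node

dumped = {"hook_ideas": [{"hook": "Stop scripting every short"}]}
value = resolve_dot_path(dumped, "hook_ideas.0.hook")
```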

Discoverability:

```sh
fab-workflow --lint hooks/generate                   # full schemas + aliases
fab-workflow --scaffold hooks/generate --show-output # output aliases + schema
```

Aliases are serialized into workflows/registry.lock.json next to each workflow’s output_schema, so downstream tooling can resolve them without re-importing the Python module.

Scaffolding a run-spec from a typed workflow


fab-workflow --scaffold <workflow> reads the workflow’s model_json_schema() and emits a populated YAML / TOML / JSON template with field descriptions and constraints inline:

```sh
# YAML by default (with comments)
fab-workflow --scaffold research/trends > my-trends.yaml

# Pick a different format
fab-workflow --scaffold research/trends -o my-trends.toml --format toml

# Required-fields-only template
fab-workflow --scaffold research/trends --no-optional
```

The template is a standard run-spec that fab-workflow --from-file FILE accepts directly (see the Submitting Jobs guide).
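Per field, the scaffolder's job reduces to walking the schema's properties and emitting a commented line per field. A rough sketch — the real command handles nesting, multiple formats, and constraints far more thoroughly, and the helper name here is invented:

```python
def scaffold_lines(input_schema: dict, include_optional: bool = True) -> list[str]:
    """Emit one 'key: default  # note' line per top-level schema property."""
    required = set(input_schema.get("required", []))
    lines = []
    for name, prop in input_schema.get("properties", {}).items():
        if not include_optional and name not in required:
            continue  # --no-optional behaviour: required fields only
        default = prop.get("default", "")
        note = prop.get("description", "required" if name in required else "optional")
        lines.append(f"{name}: {default}  # {note}")
    return lines

schema = {
    "properties": {
        "topic": {"type": "string", "description": "The niche to research"},
        "max_results": {"type": "integer", "default": 10},
    },
    "required": ["topic"],
}
```

Because the input is plain JSON Schema, the same walk works whether the schema came from a live server or from registry.lock.json.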

  • Workflow Authoring — the foundation of writing Fabric workflows
  • Submitting Jobs — how the SDK methods fit into the broader submission flow, including the YAML/TOML/JSON request body formats and --scaffold / --from-file
  • Workflow Sets — multi-stage pipelines that consume the output aliases described above
  • Plan 038 — the schema-discovery design doc
  • Plan 078 — the scaffolding + workflow-sets design doc