# Typed Workflows
When you write a Fabric workflow with Pydantic models for task input and output, the Fabric server walks your task graph at boot, derives JSON Schemas from your models, and stores them in the workflow registry. External consumers — SocialSite, your own apps, anyone calling the SDKs — can then discover the contract for any workflow without having to read the source code.
This is the foundation for:
- Form generation — render a UI that submits to a workflow using the input schema as the source of truth.
- Client-side validation — fail fast on bad input before paying for a workflow run (see the sketch below).
- Server-side validation — opt in to `?validate=true` and let Fabric reject bad payloads with a structured 400 instead of letting the workflow crash mid-run.
- Type generation — point a code generator at the schemas to produce TypeScript types, Pydantic models in another project, or OpenAPI fragments.
The whole feature is opt-in per task: workflows that don’t use Pydantic still register and run normally, they just have null schemas in the registry. There’s no migration required, no breaking change, no maintenance burden.
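For example, a consumer can run the client-side check with nothing more than the fetched schema and a standard JSON Schema validator such as the `jsonschema` package. This is a minimal sketch, assuming the `get_workflow_schemas` SDK call shown later in this guide; `validate_before_submit` is an illustrative helper name, not an SDK method:

```python
import jsonschema
from fabric_platform import FabricClient

client = FabricClient(api_key="fab_xxx")


def validate_before_submit(workflow: str, payload: dict) -> list[str]:
    """Return validation error messages for the payload (empty list means OK to submit)."""
    schemas = client.get_workflow_schemas(workflow)
    input_schema = schemas["input_schema"]
    if input_schema is None:
        return []  # untyped workflow: null schema, nothing to check client-side
    validator = jsonschema.Draft202012Validator(input_schema)
    return [
        f"{'/'.join(str(p) for p in err.path) or '<root>'}: {err.message}"
        for err in validator.iter_errors(payload)
    ]


errors = validate_before_submit("research/trends", {"topic": 42})
if errors:
    print(errors)  # e.g. ["topic: 42 is not of type 'string'"]
```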
## The pattern

Write your task functions with Pydantic input and output models. That's it.
```python
from pydantic import BaseModel, Field
from fabric_workflow_sdk import Flow, task


class TrendsInput(BaseModel):
    """Input for the trend research workflow."""
    topic: str = Field(description="The niche to research")
    max_results: int = Field(default=10, ge=1, le=100)
    region: str = Field(default="US")


class TrendItem(BaseModel):
    title: str
    score: float
    url: str | None = None


class TrendsOutput(BaseModel):
    items: list[TrendItem]
    summary: str


@task
async def gather_trends(input: TrendsInput) -> TrendsOutput:
    """Search the web for trending topics in the given niche."""
    # ... your implementation ...
    return TrendsOutput(items=[], summary="")


trend_research = Flow("research/trends").then(gather_trends).build()
```

That's the entire change. The next time the Fabric server boots, it spawns a Python subprocess that imports your workflow file, walks the task graph, and writes the schemas into the registry. From then on, the workflow is queryable via the SDKs:
```ts
const schemas = await fabric.workflows.registry.getSchemas("research/trends");

console.log(schemas.input_schema);
// → JSON Schema describing TrendsInput

console.log(schemas.task_schemas);
// → [
//     { task_id: "gather_trends",
//       input_schema: { ...TrendsInput... },
//       output_schema: { ...TrendsOutput... },
//       description: "Search the web for trending topics in the given niche.",
//       source: "pydantic" }
//   ]
```

```python
from fabric_platform import FabricClient

client = FabricClient(api_key="fab_xxx")
schemas = client.get_workflow_schemas("research/trends")

print(schemas["input_schema"])  # JSON Schema describing TrendsInput
print(schemas["task_schemas"])  # Per-task breakdown
```

```bash
curl -H "Authorization: Bearer fab_xxx" \
  https://api.fabric.dev/v1/workflow-schemas/research/trends
```

## What ends up in the registry

For the workflow above, the registry stores:
- `input_schema` — JSON Schema for `TrendsInput`. Derived from the first user task's input model.
- `output_schema` — JSON Schema for `TrendsOutput`. Derived from the last user task's output model.
- `output_aliases` — public alias → dot-path map for chaining this workflow's output into downstream stages of a workflow set. See the Output aliases section below.
- `task_schemas` — an ordered array with one entry per user task, each containing the task's input/output schemas plus the first line of its docstring.
Both the workflow-level pair *and* the per-task array are populated. The workflow-level pair is the canonical "submit/result contract" that consumers use to render forms; the per-task array is the full breakdown for tooling that wants to display each step's contract (e.g. a workflow visualizer).
The internal `:capture` and `:finalize` shim tasks the SDK auto-injects into every flow are excluded from `task_schemas` — they're plumbing for the auto-output-capture feature, not user contracts.
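Put together, a registry entry for the workflow above has roughly the following shape. This is an illustrative sketch of the fields described in this section, not the literal storage format:

```python
# Illustrative sketch of a registry entry; the exact layout may differ.
registry_entry = {
    "input_schema": {...},    # JSON Schema for TrendsInput (first user task's input)
    "output_schema": {...},   # JSON Schema for TrendsOutput (last user task's output)
    "output_aliases": {...},  # public alias -> dot-path map (see the Output aliases section below)
    "task_schemas": [
        {
            "task_id": "gather_trends",
            "input_schema": {...},   # TrendsInput
            "output_schema": {...},  # TrendsOutput
            "description": "Search the web for trending topics in the given niche.",
            "source": "pydantic",
        },
    ],
}
```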
## Verifying schemas locally

Use `fab-workflow --lint` to introspect your workflow without running it. The lint command imports the file, walks the task graph, and prints exactly what the registry will store:
```console
$ fab-workflow --lint research/trends

research/trends
  description: Trend research workflow — discover trending topics in a niche
  file: workflows/research/trends.py
  input:  ✓ TrendsInput
  output: ✓ TrendsOutput
  tasks:
    1. ✓ gather_trends
         in:  TrendsInput
         out: TrendsOutput
  warnings: (none)
```

If any warnings come up, the lint command exits with status 1 — you can use this in CI to gate PRs on type coverage:

```yaml
- name: Lint workflow schemas
  run: |
    for wf in workflows/research/*.py workflows/video/*.py; do
      .venv/bin/fab-workflow --lint "$wf" || exit 1
    done
```

Workflows that don't declare Pydantic types print a warning per untyped task and exit non-zero, so an unannotated task gets caught at PR review time instead of surfacing as a null schema in production.
## Server-side validation (opt-in)

Once a workflow has an `input_schema`, you can opt in to server-side validation of the request body. The server compiles the schema with `jsonschema` and rejects mismatched payloads with a structured 400:
```ts
try {
  const run = await fabric.workflows.runs.submit("research/trends", {
    input: { topic: 42 }, // wrong type
    validate: true,
  });
} catch (err) {
  if (err.status === 400 && err.code === "input_validation_failed") {
    console.log(err.details.errors);
    // → [{ path: "/topic", message: "42 is not of type \"string\"" }]
  }
}
```

```python
from fabric_platform.errors import FabricValidationError

try:
    run = client.submit_run(
        "research/trends",
        input={"topic": 42},  # wrong type
        validate=True,
    )
except FabricValidationError as e:
    print(e.response.json())
    # → {"meta": {...},
    #    "error": {"code": "input_validation_failed",
    #              "message": "...",
    #              "details": {"errors": [{"path": "/topic", "message": "..."}]}}}
```

Validation is opt-in in v1: the server only validates when you pass `validate=true`. Workflows whose `input_schema` is null (no Pydantic types declared) skip validation gracefully — submission proceeds as normal, so opting in doesn't break callers running against partially-typed workflows.
## How it works under the hood

When `fabric serve` starts:

- The boot seed walks the layered workflow directories for `.py` files containing `Flow("...")` calls.
- For each file, the seed pre-filters via a fast Rust string scan (no Python execution), then spawns a Python subprocess for the files that match: `python3 -m fabric_workflow_sdk.introspect <path>`.
- The subprocess imports the file, finds the top-level `Flow` instance, calls `Workflow.iter_nodes()` to walk the task graph in topological order, and reads `_input_type` / `_output_type` off each task function.
- For each Pydantic type, it calls `model.model_json_schema()` and emits the result as JSON.
- The Rust parent collects all subprocess results in parallel (capped at `available_parallelism().min(8)`) and writes the schemas to the registry alongside the existing seed metadata.
- Total cost: typically 1–2 seconds for ~67 workflows on a modern laptop. Configurable via `FABRIC_INTROSPECT_CONCURRENCY` and `FABRIC_INTROSPECT_TIMEOUT_SECS`.
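Conceptually, the introspection step reduces to something like the sketch below. This is not the actual `fabric_workflow_sdk.introspect` source: the way the workflow object and its node attributes are located is an assumption for illustration, and shim-task filtering, timeouts, and error handling are elided.

```python
import importlib.util
import json
import sys

from pydantic import BaseModel


def introspect(path: str) -> dict:
    """Import a workflow file and emit JSON Schemas for its typed tasks (sketch)."""
    spec = importlib.util.spec_from_file_location("workflow_module", path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)

    # Find the built workflow object exposed at module top level.
    workflow = next(v for v in vars(module).values() if hasattr(v, "iter_nodes"))

    def to_schema(t):
        # Only Pydantic models produce schemas; everything else stays null.
        return t.model_json_schema() if isinstance(t, type) and issubclass(t, BaseModel) else None

    tasks = []
    for fn in workflow.iter_nodes():  # topological order
        doc = (fn.__doc__ or "").strip()
        tasks.append({
            "task_id": fn.__name__,
            "input_schema": to_schema(getattr(fn, "_input_type", None)),
            "output_schema": to_schema(getattr(fn, "_output_type", None)),
            "description": doc.splitlines()[0] if doc else None,
        })
    return {"task_schemas": tasks}


if __name__ == "__main__":
    print(json.dumps(introspect(sys.argv[1])))
```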
A workflow whose introspection *fails* — broken import, missing dependency, crash during `Flow.build()`, infinite loop — is still seeded with empty schemas so it remains discoverable and submittable. The failure is logged as a WARN with the file path and error variant. The seed loop never aborts on a single bad file.
If you want to skip introspection entirely (e.g. for very fast dev boots), set `FABRIC_SKIP_SCHEMA_INTROSPECTION=1` and the seed falls back to the legacy fast path that only extracts workflow names.
## Layered workflow discovery

Plan 038 also introduces a four-layer workflow discovery system, so you can develop dynamic project-local workflows without losing the platform's bundled defaults:
| Layer | Where | Intent |
|---|---|---|
| Platform | Discovered via the binary’s install location | The 67-ish workflows that ship with Fabric — always available |
| Project | `./workflows/` relative to wherever you launched | The workflows you're developing right now in this project |
| User | `~/.fabric/workflows/` | Your personal scratchpad workflows that follow you across projects |
| Config | Whatever's in `runtime.workflow_dirs` from your config | Extra search paths from environment / file config |
When the same workflow name appears in multiple layers, the
higher-priority layer wins: Project > User > Config > Platform.
The boot seed logs DEBUG lines for shadows so you can see what's overridden. Each entry's scope ends up as a `scope:platform` / `scope:project` etc. tag in the registry's `worker_tags`.
Platform discovery resolves via:

- `FABRIC_PLATFORM_WORKFLOWS_DIR` env var (explicit override)
- `<bin>/../share/fabric/workflows` (FHS install path)
- `<bin>/workflows` (adjacent-to-binary, with a `research/` sentinel)
- Source-tree fallback (`./workflows/` with the same sentinel)
So you can `cargo run` from the Fabric source tree, `fabric serve` from a downstream project, or run a packaged `fabric` binary on a production host — the platform workflows are visible in all three cases.
## Pydantic features that survive

`model_json_schema()` produces standard JSON Schema, so most of Pydantic's expressiveness round-trips through to consumers:
- Field constraints — `Field(ge=1, le=100, min_length=2, ...)` become `minimum`, `maximum`, `minLength` keywords.
- Descriptions — `Field(description="...")` becomes `description` on the property.
- Defaults — `Field(default=10)` becomes `default`.
- Enums — `Literal["a", "b", "c"]` becomes `enum`.
- Nested models — emit `$defs` and `$ref`. Preserved as-is by the schema extractor (no inlining), so your consumer's JSON Schema validator handles them correctly.
- Discriminated unions — `Field(discriminator="kind")` becomes the JSON Schema `discriminator` keyword.
- Custom validators — these run at validation time but don't appear in the schema. Use `Annotated` types or `Field` with constraints if you want the rule to be machine-readable.
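For instance, most of the bullets above are visible in the schema emitted for a pair of illustrative models (plain Pydantic, nothing Fabric-specific; the model and field names here are made up):

```python
from typing import Literal

from pydantic import BaseModel, Field


class Channel(BaseModel):
    platform: Literal["youtube", "tiktok", "instagram"]               # -> "enum"
    handle: str = Field(min_length=2, description="Account handle")   # -> minLength + description


class PublishInput(BaseModel):
    channels: list[Channel]                          # nested model -> $defs + $ref
    max_posts: int = Field(default=3, ge=1, le=10)   # -> default, minimum, maximum


print(PublishInput.model_json_schema())
# {'$defs': {'Channel': {...}},
#  'properties': {'channels': {'items': {'$ref': '#/$defs/Channel'}, ...},
#                 'max_posts': {'default': 3, 'minimum': 1, 'maximum': 10, ...}},
#  ...}
```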
## What doesn't work yet (v1 scope)

- TypedDict / dataclass / plain type hints — v1 is Pydantic-only. Tasks with TypedDict inputs get `source: "none"` and no schema. We may add support in a follow-up; the JSON Schema quality will be thinner since TypedDict has no descriptions or defaults.
- Default-on validation — v1 is opt-in via `?validate=true`. Once schema coverage is broad enough across the in-tree workflows (mostly a question of authors adding Pydantic types to the ~64 workflows that don't have them yet), we'll flip the default to on in a future major release.
- Forked-flow output unions — workflows that end in unjoined parallel branches get a JSON Schema `oneOf` over the branch outputs and a warning. If you want a clean output schema, add a merge task that joins the branches into a single output model.
- Schema versioning / drift detection — if a workflow's Pydantic types change incompatibly between two boots, the registry silently picks up the new schema. We may add a "warn on incompatible change" check in a follow-up.
- Per-task schemas in the workflow output — the registry stores schemas; the workflow run output endpoint (`GET /v1/workflows/runs/{id}/output`) doesn't yet surface per-task outputs. That's tracked as plan 035 §5.
## Migration tips

For an existing workflow without Pydantic types:

- Identify the workflow's actual input contract. Look at how it's currently submitted — what keys does it read from the input dict? What types? Which are required?
- Define a `BaseModel` subclass for the input. Use `Field(description=...)` liberally; the descriptions show up in the consumer's form UI.
- Annotate the first task's signature with the new model: `async def my_task(input: MyInput) -> ...`. The Sayiir runtime already wraps `@task`-decorated functions with Pydantic validation — your model is now the runtime contract. (See the before/after sketch after this list.)
- Repeat for the output. The last task's return type becomes the workflow's output contract.
- Run `fab-workflow --lint` to verify the schemas look right before committing.
- Restart the server — the next boot picks up the new schemas automatically.
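For the annotation step, the before/after looks roughly like this. It is a sketch that reuses simplified versions of the `TrendsInput` / `TrendsOutput` models from the pattern section; your own field names will differ.

```python
from pydantic import BaseModel, Field
from fabric_workflow_sdk import task


# Before: untyped dict in / dict out, so the registry stores null schemas.
@task
async def gather_trends(input: dict) -> dict:
    topic = input["topic"]
    ...


# After: the same task annotated with Pydantic models; the next boot
# picks these up as the workflow's input/output contract.
class TrendsInput(BaseModel):
    topic: str = Field(description="The niche to research")
    max_results: int = Field(default=10, ge=1, le=100)


class TrendsOutput(BaseModel):
    items: list[dict]
    summary: str


@task
async def gather_trends(input: TrendsInput) -> TrendsOutput:
    ...
```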
The rollout is incremental: workflows you haven’t typed yet keep working exactly as before. There’s no big-bang migration.
## Reference-file inputs with AssetRef

Many workflows need files as input — an actor portrait for avatar generation, a voiceover sample for voice cloning, a product photo for an ad composite. The `AssetRef` type solves the inbound side of this (just as `save_artifact()` solves the outbound side).
An `AssetRef` accepts exactly one of three forms:

| Form | When to use |
|---|---|
| `asset_id` | File was previously uploaded via `POST /v1/assets`. The worker downloads it via the asset API with auth headers. |
| `url` | File is at a publicly fetchable HTTP(S) URL. |
| `path` | File is already on the worker's local filesystem (CLI runs, intra-workflow hand-off). |
### Declaring reference-file inputs

Use `AssetRef` as the Pydantic type for any input field that expects a file. The schema extractor emits it as a `$ref` to a shared `$defs/AssetRef` definition, so UI consumers can render the correct asset picker / file uploader instead of a text box.
```python
from pydantic import Field
from fabric_workflow_sdk import AssetRef, Flow, fetch_asset, task
from fabric_workflow_sdk.schema import WorkflowModel, workflow_meta


class AvatarInput(WorkflowModel):
    """Generate a lip-synced talking-head clip from a portrait + audio."""
    actor: AssetRef = Field(..., description="Actor portrait image (PNG/JPG)")
    audio: AssetRef = Field(..., description="Audio clip to lip-sync (MP3/WAV)")
    prompt: str = Field("Person speaking to camera, warm lighting", description="Stylistic prompt")
    avatar_model: str = Field("fal-ai/kling-video/ai-avatar/v2/standard", description="Provider model id")


@task(timeout="10m", tags=["python", "ai", "video"])
async def render_avatar(input: AvatarInput) -> AvatarOutput:
    validated = AvatarInput.model_validate(input if isinstance(input, dict) else input.model_dump())
    context = input if isinstance(input, dict) else {}

    actor_path = await fetch_asset(context, validated.actor)
    audio_path = await fetch_asset(context, validated.audio)
    # ... generate the talking head, save artifact, return output ...


avatar_pipeline = (
    Flow("video/avatar", metadata=workflow_meta(AvatarInput, AvatarOutput))
    .then(render_avatar)
    .build()
)
```

### Submitting with reference files
```ts
import type { AssetRef } from "@fabric-platform/sdk";

// With a previously uploaded asset:
const run = await fabric.workflows.runs.submit("video/avatar", {
  input: {
    actor: { asset_id: "018f1234-abcd-..." } satisfies AssetRef,
    audio: { url: "https://cdn.example.com/voiceover.mp3" } satisfies AssetRef,
    prompt: "Person explaining with natural gestures",
  },
});
```

```python
from fabric_platform import FabricClient, AssetRef

client = FabricClient(api_key="fab_xxx")
run = client.submit_workflow_run(
    "video/avatar",
    input={
        "actor": AssetRef(asset_id="018f1234-abcd-...").model_dump(),
        "audio": AssetRef(url="https://cdn.example.com/voiceover.mp3").model_dump(),
        "prompt": "Person explaining with natural gestures",
    },
)
```

```bash
fab-workflow video/avatar \
  --input actor.path=./portrait.png \
  --input audio.path=./voiceover.mp3 \
  --input prompt="Person speaking to camera"
```

### How fetch_asset() resolves references
Inside the task, call `fetch_asset(input, ref)` to resolve any `AssetRef` form into a local file path:

- `path` → returned directly (existence check, no copy).
- `url` → HTTP GET, written to a tempfile with the correct extension.
- `asset_id` → `GET /v1/assets/{id}/download` with Fabric auth headers; respects `Content-Disposition` filenames.
The result is always a `pathlib.Path` you can hand to ffmpeg, PIL, `fal_client.upload_file()`, etc. No manual URL construction or auth header plumbing needed.
`fetch_asset()` also accepts bare strings and dicts as shorthand — a string starting with `http(s)://` becomes a URL ref, an existing path becomes a path ref, anything else is treated as an asset id:
```python
# All equivalent inside a task body:
path = await fetch_asset(input, AssetRef(asset_id="018f..."))
path = await fetch_asset(input, {"asset_id": "018f..."})
path = await fetch_asset(input, "018f...")  # auto-detected as asset id
```

## Today's typed workflows
As of plan 038 landing, the in-tree workflows that already use Pydantic types are:

- `workflows/video/avatar.py` — uses `AssetRef` for reference-file inputs
- `workflows/video/long_form.py`
- `workflows/content/long_form_script.py`
- `workflows/content/landing_page_factory.py`
The rest are slated for incremental migration as authors touch them. There’s no deadline — typed and untyped workflows coexist indefinitely.
## Output aliases (for workflow sets)

When a workflow is going to feed its output into another workflow as part of a multi-stage pipeline, the chain author needs a stable "public name" for each addressable output value. Fabric calls these output aliases, and they're declared as a class-level dict on the `WorkflowOutput` subclass:
```python
class HookGenerationOutput(WorkflowOutput):
    hook_ideas: list[dict] = Field(default_factory=list)

    # Public alias → dot-path into the dumped output dict.
    # Plan 078. These names become part of the workflow's contract:
    # renaming an alias key is a breaking change for downstream sets.
    output_aliases = {
        "hook": "hook_ideas.0.hook",  # primary single-hook handle
        "all_hooks": "hook_ideas",
    }
```

A downstream workflow set then references the upstream by alias rather than walking the raw output schema:
```yaml
runs:
  - name: hooks
    workflow: hooks/generate
    input: { niche: "AI productivity" }

  - name: short
    workflow: video/quick-shorts
    inputs_from:
      # `hook` is the alias declared above; resolves internally to
      # `hook_ideas.0.hook` but the chain stays stable across schema
      # refactors.
      - { source: hooks.0.hook, map: topic }
```

If a workflow doesn't declare any `output_aliases`, every top-level field is implicitly auto-aliased to itself (`foo` → `"foo"`), so simple cases just work.
Discoverability:

```bash
fab-workflow --lint hooks/generate                      # full schemas + aliases
fab-workflow --scaffold hooks/generate --show-output    # output aliases + schema
```

Aliases are serialized into `workflows/registry.lock.json` next to each workflow's `output_schema`, so downstream tooling can resolve them without re-importing the Python module.
## Scaffolding a run-spec from a typed workflow

`fab-workflow --scaffold <workflow>` reads the workflow's `model_json_schema()` and emits a populated YAML / TOML / JSON template with field descriptions and constraints inline:
```bash
# YAML by default (with comments)
fab-workflow --scaffold research/trends > my-trends.yaml

# Pick a different format
fab-workflow --scaffold research/trends -o my-trends.toml --format toml

# Required-fields-only template
fab-workflow --scaffold research/trends --no-optional
```

The template is a standard run-spec that `fab-workflow --from-file FILE` accepts directly (see the Submitting Jobs guide).
## See also

- Workflow Authoring — the foundation of writing Fabric workflows
- Submitting Jobs — how the SDK methods fit into the broader submission flow, including the YAML/TOML/JSON request body formats and `--scaffold` / `--from-file`
- Workflow Sets — multi-stage pipelines that consume the output aliases described above
- Plan 038 — the schema-discovery design doc
- Plan 078 — the scaffolding + workflow-sets design doc