
Python Workflow SDK

The Python Workflow SDK (fabric-workflow-sdk) is the toolkit for building Fabric workflows. It provides the Flow DSL for composing tasks into pipelines, AI generation helpers (text, image, video), model routing, web research, cost tracking, and file utilities.

pip install fabric-workflow-sdk
# With local model support
pip install "fabric-workflow-sdk[local-all]"

Requirements: Python >= 3.11

Core dependencies: sayiir (workflow DSL), httpx, google-genai, python-dotenv, pydantic

from fabric_workflow_sdk import Flow, task, generate, search_web

@task(timeout="5m", retries=2)
async def research(input: dict) -> dict:
    # Fetch web results for the topic and carry them forward
    results = await search_web(input["topic"], num=10)
    return {**input, "results": results}

@task(timeout="3m")
async def summarize(input: dict) -> dict:
    # Ask the model to summarize the collected results
    summary = generate(input, f"Summarize: {input['results']}")
    return {**input, "summary": summary}

pipeline = (
    Flow("my/research-pipeline")
    .then(research)
    .then(summarize)
    .build()
)

Flow builds directed acyclic graphs of tasks. Chain tasks with .then(), fork into parallel branches, loop, delay, or compose child workflows.

from fabric_workflow_sdk import Flow, task

# step_one, step_two, parallel_a/b/c, merge_results and final_step
# are @task functions defined elsewhere
pipeline = (
    Flow("my/pipeline")
    .then(step_one)
    .then(step_two)
    .fork()
    .branch(parallel_a)
    .branch(parallel_b)
    .branch(parallel_c)
    .join(merge_results)
    .then(final_step)
    .build()
)
| Method | Description |
| --- | --- |
| .then(task_fn) | Chain a task sequentially |
| .then_flow(child) | Chain a child workflow |
| .fork() | Start parallel branches |
| .branch(task_fn) | Add a parallel branch (after .fork()) |
| .join(merge_fn) | Merge parallel branches |
| .route(key_fn, keys=[...]) | Conditional routing |
| .loop(task_fn, max_iterations=10) | Repeat a task, up to max_iterations times |
| .delay(name, duration) | Delay execution |
| .wait_for_signal(signal, timeout=None) | Wait for an external signal |
| .build() | Build into an executable workflow |
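To make the chaining semantics concrete, here is a toy, self-contained model of .then(), .fork()/.branch() and .join(). It is not the SDK's implementation. The ToyFlow class, the merge function receiving a list of branch outputs, and running branches concurrently with asyncio.gather are all assumptions made for illustration.

```python
import asyncio
from collections.abc import Awaitable, Callable

# Hypothetical toy model of the Flow chaining API; not the SDK's implementation.
Step = Callable[[dict], Awaitable[dict]]
Merge = Callable[[list[dict]], Awaitable[dict]]


class ToyFlow:
    def __init__(self, name: str) -> None:
        self.name = name
        self._steps: list[Step] = []
        self._branches: list[Step] | None = None

    def then(self, fn: Step) -> "ToyFlow":
        # Sequential step: receives the previous step's output dict.
        self._steps.append(fn)
        return self

    def fork(self) -> "ToyFlow":
        self._branches = []
        return self

    def branch(self, fn: Step) -> "ToyFlow":
        if self._branches is None:
            raise RuntimeError("branch() must follow fork()")
        self._branches.append(fn)
        return self

    def join(self, merge: Merge) -> "ToyFlow":
        if not self._branches:
            raise RuntimeError("join() needs at least one branch")
        branches, self._branches = self._branches, None

        async def fan_out(data: dict) -> dict:
            # Every branch gets the same input; they run concurrently.
            results = await asyncio.gather(*(b(data) for b in branches))
            return await merge(list(results))

        self._steps.append(fan_out)
        return self

    def build(self) -> Step:
        steps = list(self._steps)

        async def run(data: dict) -> dict:
            for step in steps:
                data = await step(data)
            return data

        return run


async def double(d: dict) -> dict:
    return {**d, "x": d["x"] * 2}


async def add_one(d: dict) -> dict:
    return {**d, "a": d["x"] + 1}


async def add_two(d: dict) -> dict:
    return {**d, "b": d["x"] + 2}


async def merge_dicts(results: list[dict]) -> dict:
    merged: dict = {}
    for r in results:
        merged.update(r)
    return merged


pipeline = (
    ToyFlow("toy/pipeline")
    .then(double)
    .fork()
    .branch(add_one)
    .branch(add_two)
    .join(merge_dicts)
    .build()
)
result = asyncio.run(pipeline({"x": 3}))
print(result)  # {'x': 6, 'a': 7, 'b': 8}
```

Both branches see the doubled value (x=6), and the merge step folds their outputs back into one dict before any later .then() step runs.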
@task(timeout="5m", retries=2, tags=["python", "ai"], name="my_task")
async def my_task(input: dict) -> dict:
    # Process input, return updated dict
    return {**input, "result": "done"}
| Parameter | Type | Description |
| --- | --- | --- |
| timeout | str | Max execution time (e.g., "5m", "30s", "1h") |
| retries | int | Retry count on failure |
| tags | list[str] | Tags for filtering/routing |
| name | str | Override task name |
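The timeout strings in the table are a number followed by a unit suffix. As a rough illustration of how such a value maps to seconds, here is a hypothetical parse_duration helper. It is not part of the SDK, and it only handles the single-unit forms shown above (s, m, h), not compound values such as "1h30m".

```python
import re

# Hypothetical helper; the SDK's own duration parser is not documented here.
_UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600}


def parse_duration(spec: str) -> int:
    """Convert a timeout string such as "30s", "5m" or "1h" to seconds."""
    match = re.fullmatch(r"(\d+)([smh])", spec.strip())
    if match is None:
        raise ValueError(f"unsupported duration: {spec!r}")
    amount, unit = match.groups()
    return int(amount) * _UNIT_SECONDS[unit]


for spec in ("30s", "5m", "1h"):
    print(spec, "->", parse_duration(spec), "seconds")
```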

Tasks can declare Pydantic input and output models instead of bare dict. When you do, Fabric automatically derives JSON Schemas for the task’s contract at server boot and stores them in the workflow registry — external consumers can then discover what to pass in without reading your source code. See the Typed Workflows guide for the full pattern and trade-offs.

from pydantic import BaseModel, Field
from fabric_workflow_sdk import Flow, task

class TrendsInput(BaseModel):
    topic: str = Field(description="The niche to research")
    max_results: int = Field(default=10, ge=1, le=100)

class TrendsOutput(BaseModel):
    items: list[str]
    summary: str

@task
async def gather_trends(input: TrendsInput) -> TrendsOutput:
    """Search the web for trending topics."""
    return TrendsOutput(items=[], summary="")

trend_research = Flow("research/trends").then(gather_trends).build()

Verify locally with fab-workflow --lint:

$ fab-workflow --lint research/trends
research/trends
description: Trend research workflow
input: TrendsInput
output: TrendsOutput
tasks:
1. gather_trends in: TrendsInput out: TrendsOutput
warnings: (none)

The lint command exits non-zero on warnings, so you can add it to CI as a type-coverage gate.

from fabric_workflow_sdk import generate, generate_json
# Simple text generation
response = generate(input, "Explain quantum computing simply")
# With model override
response = generate(input, "Write a poem", model="gemini-2.5-flash")
# JSON generation (auto-parses, strips markdown fences)
data = generate_json(input, "Return a JSON list of 5 topics", default=[])
| Function | Returns | Description |
| --- | --- | --- |
| generate(input, prompt, *, model=None) | str | Text generation via Gemini or a local LLM |
| generate_json(input, prompt, *, model=None, default=None) | Any | Generate and parse JSON; returns default on parse failure |
| parse_json(text, *, default=None) | Any | Strip markdown fences and parse JSON |
| get_client(input, *, required=True) | genai.Client or None | Get a Gemini client instance |
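The fence-stripping behaviour of parse_json is easy to get wrong, so here is a standalone sketch of the idea, assuming the model wraps its JSON in a fenced block whose fences sit on their own lines. parse_json_sketch is a hypothetical name, and the SDK's exact rules may differ. The backtick fence is built as "`" * 3 purely so the example does not clash with the page's own code formatting.

```python
import json
import re
from typing import Any

TICKS = "`" * 3  # a Markdown code fence marker

# Optional language tag after the opening fence, body captured lazily.
_FENCED = re.compile(rf"^\s*{TICKS}[\w-]*\s*\n(.*?)\n?\s*{TICKS}\s*$", re.DOTALL)


def parse_json_sketch(text: str, *, default: Any = None) -> Any:
    """Hypothetical stand-in for parse_json: unwrap a fenced block, then parse."""
    match = _FENCED.match(text)
    body = match.group(1) if match else text
    try:
        return json.loads(body)
    except json.JSONDecodeError:
        return default  # mirrors the documented "default on parse failure"


fenced = TICKS + "json\n[\"topic a\", \"topic b\"]\n" + TICKS
print(parse_json_sketch(fenced))
print(parse_json_sketch("not json", default=[]))
```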
from fabric_workflow_sdk import generate_image

# Returns the path to a downloaded .png file
path = await generate_image(
    input,
    "A sunset over mountains, oil painting style",
    aspect_ratio="16:9",
)
async def generate_image(
    input: dict,
    prompt: str,
    *,
    model: str | None = None,  # resolved via get_model(input, "image_fast")
    aspect_ratio: str = "3:4",  # "9:16", "16:9", "1:1", "3:4"
) -> str:  # returns file path
    ...

Auto-routes between Gemini Imagen (remote) and local models (SDXL, Flux, SD3.5) based on the resolved model.

from fabric_workflow_sdk import generate_video

path = await generate_video(
    input,
    "A cinematic ocean wave crashing on rocks",
)
async def generate_video(
    input: dict,
    prompt: str,
    *,
    model: str | None = None,
    timeout_secs: int = 300,
    poll_interval_secs: int = 5,
) -> str:  # returns file path
    ...
from fabric_workflow_sdk import get_model
model = get_model(input, "text") # "gemini-2.5-flash"
model = get_model(input, "broll") # depends on quality profile
model = get_model(input, "keyframe_grid") # "skip" or image model
# With fallback for custom operations
model = get_model(input, "my_op", fallback="gemini-2.5-flash")

Resolution order (first match wins):

  1. Per-run input: --input text_model="..."
  2. Environment: FABRIC_TEXT_MODEL=...
  3. Project config: ./models.yaml
  4. Global config: ~/.fabric/models.yaml
  5. Quality profile: --input quality=local
  6. Built-in defaults
from fabric_workflow_sdk import generate, has_llm, require_model

# Check if any LLM is available
if has_llm(input):
    text = generate(input, prompt)

# Get the model, or raise if it is set to "skip"
model = require_model(input, "tts", label="Text-to-speech")

See Model Configuration for the full list of operations, defaults, and quality profiles.

from fabric_workflow_sdk import search_web, format_web_results
# Web search (Exa API → DuckDuckGo fallback)
results = await search_web("Rust vs Go performance", num=10, exa_api_key="...")
# [{"title": "...", "url": "...", "snippet": "...", "score": 0.95}, ...]
# Format as markdown
markdown = format_web_results(results)
| Function | Description |
| --- | --- |
| search_web(query, num=5, *, exa_api_key=None) | Search the web; returns a list of results |
| format_web_results(results, web_content=None) | Format web results as markdown |
| format_youtube_results(results) | Format YouTube results as markdown |
| format_reddit_results(results) | Format Reddit posts as markdown |
| format_rss_results(results) | Format RSS entries as markdown |
| format_all_sources(raw_sources, web_content=None) | Format all sources into markdown |
| build_research_context(input) | Extract research context from synthesis data |
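For a sense of what turning the result dicts into markdown involves, here is a hypothetical formatter over the documented result shape (title, url, snippet, score). It is only a sketch. The real format_web_results output layout is not shown on this page and may differ.

```python
# Hypothetical formatter; the real format_web_results output may differ.
def format_results_sketch(results: list[dict]) -> str:
    lines: list[str] = []
    for i, r in enumerate(results, start=1):
        url = r.get("url", "")
        title = r.get("title") or url or "untitled"
        lines.append(f"{i}. [{title}]({url})")
        if r.get("snippet"):
            lines.append(f"   {r['snippet']}")  # indented so it stays in the list item
    return "\n".join(lines)


results = [
    {"title": "Rust vs Go", "url": "https://example.com/a", "snippet": "Benchmarks...", "score": 0.95},
    {"title": "", "url": "https://example.com/b", "snippet": "", "score": 0.80},
]
print(format_results_sketch(results))
```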
from fabric_workflow_sdk import save_temp, make_temp_path
# Write bytes to temp file (NOT auto-deleted)
path = save_temp(video_bytes, suffix=".mp4", prefix="fabric_")
# Create empty temp file, get path
path = make_temp_path(suffix=".png")
from fabric_workflow_sdk import get_video_inputs
# Extract video workflow inputs with defaults
params = get_video_inputs(input)
# {
# "topic": "...",
# "hook": "",
# "mood": "high-energy and conversational",
# "platform": "TikTok",
# "duration_secs": 45,
# "presenter_look": "confident young creator...",
# "visual_style": "",
# }
from fabric_workflow_sdk import pick_best_hook, resolve_mood, build_hook_input

# Pick the highest-strength hook
best = pick_best_hook(hook_ideas)
# {"hook_text": "...", "estimated_strength": 0.91, ...}

# Map a tone to a descriptive mood
mood = resolve_mood("edgy")  # "edgy and provocative"

# Build the input dict for the hook generation workflow
hook_input = build_hook_input(
    input,
    niche="AI productivity",
    num_hooks=15,
    platform="TikTok",
)
from fabric_workflow_sdk import log_cost, get_cost_summary

# Log an API call cost
log_cost(
    "image_generation",
    "imagen-4.0-fast",
    units=1,
    unit_label="image",
    estimated_usd=0.02,
)

# Log token-based cost
log_cost(
    "text_generation",
    "gemini-2.5-flash",
    input_tokens=500,
    output_tokens=1200,
    estimated_usd=0.001,
)

# Get accumulated summary
summary = get_cost_summary()
# {"entries": [...], "total_estimated_usd": 0.05, "num_calls": 12}
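The summary shape above (entries, total_estimated_usd, num_calls) is a simple fold over logged entries. The sketch below reproduces that accumulation with a hypothetical in-memory CostLedger class. The SDK's actual storage, field names for the first positional argument, and rounding are not documented here.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class CostLedger:
    # Hypothetical in-memory ledger mirroring the documented summary shape.
    entries: list[dict[str, Any]] = field(default_factory=list)

    def log(self, operation: str, model: str, *, estimated_usd: float, **details: Any) -> None:
        self.entries.append(
            {"operation": operation, "model": model, "estimated_usd": estimated_usd, **details}
        )

    def summary(self) -> dict[str, Any]:
        total = sum(e["estimated_usd"] for e in self.entries)
        return {
            "entries": list(self.entries),
            "total_estimated_usd": round(total, 6),  # round away floating-point noise
            "num_calls": len(self.entries),
        }


ledger = CostLedger()
ledger.log("image_generation", "imagen-4.0-fast", units=1, unit_label="image", estimated_usd=0.02)
ledger.log("text_generation", "gemini-2.5-flash", input_tokens=500, output_tokens=1200, estimated_usd=0.001)
summary = ledger.summary()
print(summary["total_estimated_usd"], summary["num_calls"])  # 0.021 2
```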

The SDK provides three layers of output validation: declarative gates, custom function gates, and a video-specific output gate. When a gate fails, it raises GateError, which Sayiir treats as a task failure. This means upstream @task(retries=N) policies still apply, giving you automatic retry-on-bad-output for free.

Assert conditions on task output using JSON-path expressions:

from fabric_workflow_sdk import Flow, gate

pipeline = (
    Flow("my/pipeline")
    .then(generate_content)
    .then(gate(
        "content-quality",
        assertions=[
            ("generated_content.content", "is_not_empty"),
            ("generated_content.word_count", "gte", 100),
        ],
    ))
    .then(next_step)
    .build()
)

Supported operators: is_not_empty, eq, neq, gt, gte, lt, lte, contains, matches_regex, is_type

For validation logic that doesn’t fit a simple assertion:

from fabric_workflow_sdk import Flow, gate_fn

pipeline = (
    Flow("my/pipeline")
    .then(generate_script)
    .then(gate_fn("script-segments", lambda out: (
        len(out.get("script", {}).get("segments", [])) >= 3,
        "Script must have at least 3 segments",
    )))
    .build()
)

video_output_gate() probes a rendered video file with ffprobe and asserts it meets minimum quality criteria. This catches broken renders — truncated files, missing audio, wrong resolution — before delivery.

from fabric_workflow_sdk import Flow, video_output_gate

pipeline = (
    Flow("my/video-pipeline")
    .then(render_video)
    .then(video_output_gate(
        "my-output-check",
        require_audio=True,
        min_resolution=(720, 1280),
        min_duration=5.0,
        max_duration=120.0,
    ))
    .then(deliver)
    .build()
)
| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| name | str | "video-output" | Gate name (appears in error messages) |
| path_key | str or None | None | Input key holding the video path. If None, tries common keys: output_path, composed_path, hooked_path, effects_path, outro_path, subtitled_path, media_path |
| min_duration | float or None | None | Minimum duration in seconds |
| max_duration | float or None | None | Maximum duration in seconds |
| expected_duration | float or None | None | Expected duration; fails if the actual duration deviates beyond duration_tolerance |
| duration_tolerance | float | 0.05 | Tolerance as a fraction (5% default) |
| min_resolution | tuple[int, int] or None | (720, 1280) | Minimum (width, height) |
| require_audio | bool | True | Require at least one audio track |
| min_filesize_bytes | int | 10000 | Reject files smaller than this (likely corrupt) |

On success, the gate passes through the input unchanged and adds a _video_validation dict with the probed metadata (duration, width, height, fps, has_audio, codec, filesize).

The underlying probe function is also available directly:

from fabric_workflow_sdk.media.ffmpeg import probe_video
info = probe_video("/path/to/video.mp4")
# {
# "duration": 45.2,
# "width": 1080,
# "height": 1920,
# "fps": 30.0,
# "has_audio": True,
# "codec": "h264",
# "filesize": 12345678,
# }

For LLM-evaluated quality (e.g., content scoring), use quality_contract:

from fabric_workflow_sdk import quality_contract, generate_json, task

@task(retries=2)
async def evaluate_quality(input: dict) -> dict:
    evaluation = generate_json(input, "Rate this content 1-10...", default={})
    # Raises QualityBelowThreshold when the score is under min_score
    return quality_contract(
        input,
        score=evaluation.get("overall_score", 0),
        min_score=6.0,
        details=evaluation,
    )

When the score falls below min_score, QualityBelowThreshold is raised — triggering retries on the upstream generation step.
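The retry-on-bad-output idea boils down to this: a low score becomes an exception, and a retry policy turns that exception into another attempt. The sketch below simulates it without the SDK. BelowThreshold and run_with_retries are hypothetical, the scores are made-up simulation values, and reading retries=N as "one attempt plus N retries" is an assumption about Sayiir's policy.

```python
from collections.abc import Callable


class BelowThreshold(Exception):
    """Hypothetical stand-in for the SDK's QualityBelowThreshold."""


def enforce_min_score(score: float, min_score: float) -> float:
    # Mirrors the documented contract: a score under min_score is an error.
    if score < min_score:
        raise BelowThreshold(f"score {score} is below min_score {min_score}")
    return score


def run_with_retries(attempt: Callable[[int], float], retries: int) -> float:
    # Assumption: retries=N means one initial attempt plus N retries.
    last_error: BelowThreshold | None = None
    for n in range(retries + 1):
        try:
            return attempt(n)
        except BelowThreshold as err:
            last_error = err
    if last_error is None:
        raise ValueError("retries must be >= 0")
    raise last_error


# Simulated evaluator scores for three successive generations.
scores = [4.0, 5.5, 7.2]
best = run_with_retries(lambda n: enforce_min_score(scores[n], 6.0), retries=2)
print(best)  # 7.2
```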

from fabric_workflow_sdk import run_sync, http_client

# Run a blocking function in the thread pool
result = await run_sync(heavy_cpu_function, arg1, arg2)

# Async HTTP client with defaults
async with http_client(timeout=60) as client:
    resp = await client.get("https://api.example.com/data")
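run_sync is documented only as running a blocking function in a thread pool. Assuming it behaves much like the standard library's asyncio.to_thread (an assumption, not confirmed on this page), the pattern looks like this. Note that threads keep the event loop responsive, but pure-Python CPU work still contends for the GIL.

```python
import asyncio
import time


def heavy_cpu_function(n: int) -> int:
    # Stand-in for blocking work; time.sleep blocks the calling thread.
    time.sleep(0.1)
    return sum(i * i for i in range(n))


async def main() -> list[int]:
    # asyncio.to_thread runs each call in the default thread pool,
    # so the event loop is free while they block.
    return await asyncio.gather(
        asyncio.to_thread(heavy_cpu_function, 1_000),
        asyncio.to_thread(heavy_cpu_function, 2_000),
    )


results = asyncio.run(main())
print(results)  # [332833500, 2664667000]
```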
from fabric_workflow_sdk import fabric_api_url, fabric_api_headers
url = fabric_api_url(input) # "https://gofabric.dev"
headers = fabric_api_headers(input) # {"Authorization": "Bearer ...", ...}
from fabric_workflow_sdk import log
log.step("Generating script", "Using gemini-2.5-flash")
log.result("Script generated", count=7) # "7 segments"
log.progress(3, 10, "Processing segments")
log.warn("Model not available, using fallback")
log.error("Generation failed after 3 retries")
log.section("Phase 2: Video Generation")
from fabric_workflow_sdk import (
    DEFAULT_MOOD,       # "high-energy and conversational"
    DEFAULT_PLATFORM,   # "TikTok"
    DEFAULT_DURATION,   # 45
    DEFAULT_PRESENTER,  # "confident young creator, casual style, natural lighting"
    DEFAULT_LANGUAGE,   # "en"
    MOOD_MAP,           # {"edgy": "edgy and provocative", ...}
)

The SDK includes local video and image generation. See Local Video & Image Models for full details.

from fabric_workflow_sdk._local_video import (
    generate_video,         # Local video generation
    generate_image,         # Local image generation
    generate_talking_head,  # Portrait + audio → talking head
    lipsync_video,          # Lip-sync existing video
    is_available,           # Check if any local backend exists
)

The SDK includes the 2x2 grid keyframe system:

from fabric_workflow_sdk._keyframe_grid import (
    generate_keyframe_grid,  # Full orchestration
    select_grid_segments,    # Pick segments for grid
    build_grid_prompt,       # Construct grid prompt
    crop_grid_to_keyframes,  # Crop grid into PNGs
)

Reusable building blocks for video, audio, and content pipelines. Each stage is a plain async function — independently callable from any workflow, CLI script, or test.

Stages are optional — workflows can also call APIs directly without using stages.

| Module | Functions | Description |
| --- | --- | --- |
| stages.script | generate_script, build_script_prompt, validate_script | Viral script generation with research grounding |
| stages.voiceover | generate_voiceover, resolve_voice, infer_gender | Multi-provider TTS (ElevenLabs, Kokoro, local) |
| stages.broll | generate_all_broll, ken_burns_from_still | B-roll generation (AI, stock, Ken Burns fallback) |
| stages.music | generate_music, detect_speech_regions, build_duck_filter | BGM generation with speech-aware ducking |
| stages.captions | transcribe_audio, generate_ass_subtitles, generate_srt_subtitles, burn_subtitles | Whisper transcription + karaoke subtitles |
| stages.audio | mix_audio, remix_ducked | Audio mixing with dynamic ducking |
| stages.avatar | generate_ai_actor, generate_talking_heads, lipsync_talking_heads | AI avatars with parallel generation |
| stages.compose | compose_timeline, burn_hook_overlay, collect_final_output | Video assembly + post-processing |
| stages.hooks | generate_hooks, gather_competitor_insights, gather_trend_signals | Data-driven hook generation (70/20/10 strategy) |
| gates | gate, gate_fn, video_output_gate | Declarative, functional, and video-specific validation gates |
| quality | quality_contract | LLM-evaluated quality enforcement with retry |
| media.ffmpeg | probe_video, probe_duration, probe_fps | Video file probing via ffprobe/GStreamer |
from fabric_workflow_sdk import Flow, task
from fabric_workflow_sdk.stages.voiceover import generate_voiceover
from fabric_workflow_sdk.stages.captions import transcribe_audio

@task(timeout="5m", tags=["python", "ai"])
async def voiceover_stage(input: dict) -> dict:
    return await generate_voiceover(input)

@task(timeout="10m", tags=["python", "media"])
async def transcribe_stage(input: dict) -> dict:
    return await transcribe_audio(input)

pipeline = (
    Flow("custom/voice-pipeline")
    .then(voiceover_stage)
    .then(transcribe_stage)
    .build()
)

Pass provider API keys per-request:

result = await generate_voiceover({
    "script": {"full_narration": "Hello world"},
    "keys": {
        "elevenlabs_api_key": "your-key-here",
    },
})

Every AI stage supports PromptExtension for customizing prompts without forking code:

result = await generate_script({
    "topic": "AI trends",
    "prompts": {
        "system_prompt_append": "Brand voice: professional but approachable.",
        "user_prompt_append": "Include mention of our product DataFlow.",
    },
})

Add new models at runtime for frontier/open-source model support:

from fabric_workflow_sdk import register_local_models

register_local_models({
    "tts": {"my-voice-v2": {"provider": "kokoro", "voice": "custom"}},
    "text": {"llama4:8b": {"repo": "meta-llama/Llama-4-GGUF"}},
})

Stages with ProviderChain automatically try alternatives on failure:

ElevenLabs → FAL Kokoro → Edge TTS (voiceover)
FAL Stable Audio → MusicGen local → skip (music)
AI generation → stock footage → Ken Burns placeholder (b-roll)

Each stage returns StageExecutionMeta reporting what model was actually used and whether a fallback engaged.
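The fallback behaviour can be pictured as trying providers in order and recording which one succeeded. The sketch below is a hypothetical run_chain with a ChainMeta record loosely analogous to StageExecutionMeta. The provider functions are simulated stand-ins, not real ElevenLabs, FAL or Edge TTS clients, and the real ProviderChain's field names may differ.

```python
import asyncio
from collections.abc import Awaitable, Callable
from dataclasses import dataclass, field
from typing import Any


@dataclass
class ChainMeta:
    # Hypothetical analogue of StageExecutionMeta.
    provider: str
    fallback_used: bool
    errors: list[str] = field(default_factory=list)


async def run_chain(
    providers: list[tuple[str, Callable[[], Awaitable[Any]]]],
) -> tuple[Any, ChainMeta]:
    errors: list[str] = []
    for index, (name, call) in enumerate(providers):
        try:
            result = await call()
        except Exception as err:  # any provider failure moves on to the next one
            errors.append(f"{name}: {err}")
            continue
        return result, ChainMeta(provider=name, fallback_used=index > 0, errors=errors)
    raise RuntimeError("all providers failed: " + "; ".join(errors))


# Simulated providers for the voiceover chain.
async def elevenlabs() -> bytes:
    raise ConnectionError("quota exceeded")


async def fal_kokoro() -> bytes:
    return b"kokoro-audio"


async def edge_tts() -> bytes:
    return b"edge-audio"


audio, meta = asyncio.run(
    run_chain([("elevenlabs", elevenlabs), ("fal_kokoro", fal_kokoro), ("edge_tts", edge_tts)])
)
print(meta)  # ChainMeta(provider='fal_kokoro', fallback_used=True, errors=['elevenlabs: quota exceeded'])
```

A "skip" step, as in the music chain, could be modelled as a final provider that returns None.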