Python Workflow SDK
The Python Workflow SDK (fabric-workflow-sdk) is the toolkit for building Fabric workflows. It provides the Flow DSL for composing tasks into pipelines, AI generation helpers (text, image, video), model routing, web research, cost tracking, and file utilities.
Installation
```shell
pip install fabric-workflow-sdk

# With local model support
pip install "fabric-workflow-sdk[local-all]"
```

Requirements: Python >= 3.11
Core dependencies: sayiir (workflow DSL), httpx, google-genai, python-dotenv, pydantic
Quick Start
```python
from fabric_workflow_sdk import Flow, task, generate, search_web

@task(timeout="5m", retries=2)
async def research(input: dict) -> dict:
    results = await search_web(input["topic"], num=10)
    return {**input, "results": results}

@task(timeout="3m")
async def summarize(input: dict) -> dict:
    summary = generate(input, f"Summarize: {input['results']}")
    return {**input, "summary": summary}

pipeline = (
    Flow("my/research-pipeline")
    .then(research)
    .then(summarize)
    .build()
)
```

Workflow DSL
Flow builds directed acyclic graphs of tasks. Chain tasks with `.then()`, fork into parallel branches, loop, delay, or compose child workflows.
```python
from fabric_workflow_sdk import Flow, task

pipeline = (
    Flow("my/pipeline")
    .then(step_one)
    .then(step_two)
    .fork()
    .branch(parallel_a)
    .branch(parallel_b)
    .branch(parallel_c)
    .join(merge_results)
    .then(final_step)
    .build()
)
```

Methods
| Method | Description |
|---|---|
.then(task_fn) | Chain a task sequentially |
.then_flow(child) | Chain a child workflow |
.fork() | Start parallel branches |
.branch(task_fn) | Add a parallel branch (after .fork()) |
.join(merge_fn) | Merge parallel branches |
.route(key_fn, keys=[...]) | Conditional routing |
.loop(task_fn, max_iterations=10) | Loop with max iterations |
.delay(name, duration) | Delay execution |
.wait_for_signal(signal, timeout=None) | Wait for external signal |
.build() | Build to executable workflow |
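For intuition about `.fork()`, `.branch()` and `.join()`, here is a plain-`asyncio` sketch that runs every branch concurrently on the same input and passes the outputs, in declaration order, to a merge function. It is not how Sayiir schedules tasks; `run_fork_join`, the toy branches and the list-of-outputs merge signature are hypothetical, so check what the real `.join()` callback receives before relying on that shape.

```python
import asyncio
from typing import Awaitable, Callable

# Hypothetical alias: a task takes the workflow dict and returns a new one.
TaskFn = Callable[[dict], Awaitable[dict]]


async def run_fork_join(
    input: dict,
    branches: list[TaskFn],
    merge: Callable[[list[dict]], dict],
) -> dict:
    # Every branch receives the same input and all branches run concurrently.
    outputs = await asyncio.gather(*(branch(input) for branch in branches))
    # asyncio.gather returns results in argument order, so the merge step
    # sees outputs in the order the branches were declared.
    return merge(list(outputs))


async def parallel_a(input: dict) -> dict:
    return {"a": input["x"] + 1}


async def parallel_b(input: dict) -> dict:
    return {"b": input["x"] * 2}


def merge_results(outputs: list[dict]) -> dict:
    merged: dict = {}
    for out in outputs:
        merged.update(out)
    return merged


if __name__ == "__main__":
    print(asyncio.run(run_fork_join({"x": 3}, [parallel_a, parallel_b], merge_results)))
    # {'a': 4, 'b': 6}
```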
task Decorator
```python
from fabric_workflow_sdk import task

@task(timeout="5m", retries=2, tags=["python", "ai"], name="my_task")
async def my_task(input: dict) -> dict:
    # Process input, return updated dict
    return {**input, "result": "done"}
```

| Parameter | Type | Description |
|---|---|---|
timeout | str | Max execution time (e.g., "5m", "30s", "1h") |
retries | int | Retry count on failure |
tags | list[str] | Tags for filtering/routing |
name | str | Override task name |
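One way to read `timeout` and `retries` together is "run the task under a deadline and retry it on any failure". The sketch below implements that reading with `asyncio.wait_for`; `parse_duration` and `run_with_policy` are hypothetical helpers, and treating `retries=2` as up to three attempts in total is an assumption the table does not confirm.

```python
import asyncio
import re
from typing import Awaitable, Callable

_UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600}


def parse_duration(spec: str) -> int:
    """Convert strings such as "30s", "5m" or "1h" into seconds."""
    match = re.fullmatch(r"(\d+)([smh])", spec.strip())
    if match is None:
        raise ValueError(f"unsupported duration: {spec!r}")
    amount, unit = match.groups()
    return int(amount) * _UNIT_SECONDS[unit]


async def run_with_policy(
    fn: Callable[[dict], Awaitable[dict]],
    input: dict,
    *,
    timeout: str,
    retries: int,
) -> dict:
    if retries < 0:
        raise ValueError("retries must be >= 0")
    # Assumption: one initial attempt plus `retries` further attempts.
    last_error: Exception | None = None
    for _ in range(retries + 1):
        try:
            # wait_for cancels the attempt and raises TimeoutError on overrun.
            return await asyncio.wait_for(fn(input), timeout=parse_duration(timeout))
        except Exception as exc:
            last_error = exc
    raise last_error
```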
Typed tasks with Pydantic (plan 038)
Tasks can declare Pydantic input and output models instead of bare `dict`. When you do, Fabric automatically derives JSON Schemas for the task's contract at server boot and stores them in the workflow registry, so external consumers can discover what to pass in without reading your source code. See the Typed Workflows guide for the full pattern and trade-offs.
```python
from pydantic import BaseModel, Field
from fabric_workflow_sdk import Flow, task

class TrendsInput(BaseModel):
    topic: str = Field(description="The niche to research")
    max_results: int = Field(default=10, ge=1, le=100)

class TrendsOutput(BaseModel):
    items: list[str]
    summary: str

@task
async def gather_trends(input: TrendsInput) -> TrendsOutput:
    """Search the web for trending topics."""
    return TrendsOutput(items=[], summary="")

trend_research = Flow("research/trends").then(gather_trends).build()
```

Verify locally with `fab-workflow --lint`:

```text
$ fab-workflow --lint research/trends
research/trends
  description: Trend research workflow
  input: ✓ TrendsInput
  output: ✓ TrendsOutput
  tasks:
    1. ✓ gather_trends
       in: TrendsInput
       out: TrendsOutput
  warnings: (none)
```

The lint command exits non-zero on warnings, so you can add it to CI as a type-coverage gate.
AI Generation
Text Generation
```python
from fabric_workflow_sdk import generate, generate_json

# Simple text generation
response = generate(input, "Explain quantum computing simply")

# With model override
response = generate(input, "Write a poem", model="gemini-2.5-flash")

# JSON generation (auto-parses, strips markdown fences)
data = generate_json(input, "Return a JSON list of 5 topics", default=[])
```

| Function | Returns | Description |
|---|---|---|
generate(input, prompt, *, model=None) | str | Text generation via Gemini or local LLM |
generate_json(input, prompt, *, model=None, default=None) | Any | Generate + parse JSON, returns default on parse failure |
parse_json(text, *, default=None) | Any | Strip markdown fences and parse JSON |
get_client(input, *, required=True) | genai.Client \| None | Get Gemini client instance |
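`parse_json` is described as stripping markdown fences and falling back to `default`. A minimal stand-alone version of that behavior follows; `parse_json_sketch` is hypothetical, and the SDK's own parser may handle more cases (for example, prose around the JSON).

```python
import json
from typing import Any

FENCE = "`" * 3  # three backticks: the markdown code-fence marker


def parse_json_sketch(text: str, *, default: Any = None) -> Any:
    body = text.strip()
    if body.startswith(FENCE):
        # Drop the opening fence line, which may carry a language tag like "json".
        newline = body.find("\n")
        body = body[newline + 1:] if newline != -1 else ""
        # Drop the closing fence if the model emitted one.
        body = body.rstrip()
        if body.endswith(FENCE):
            body = body[: -len(FENCE)]
    try:
        return json.loads(body)
    except json.JSONDecodeError:
        # Documented behaviour: return the default instead of raising.
        return default
```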
Image Generation
```python
from fabric_workflow_sdk import generate_image

# Returns path to downloaded .png file
path = await generate_image(
    input,
    "A sunset over mountains, oil painting style",
    aspect_ratio="16:9",
)
```

```python
async def generate_image(
    input: dict,
    prompt: str,
    *,
    model: str | None = None,   # resolved via get_model(input, "image_fast")
    aspect_ratio: str = "3:4",  # "9:16", "16:9", "1:1", "3:4"
) -> str:  # returns file path
    ...
```

`generate_image` auto-routes between Gemini Imagen (remote) and local models (SDXL, Flux, SD3.5) based on the resolved model.
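A hedged sketch of that routing decision. The identifiers in `LOCAL_IMAGE_MODELS` are placeholders (the SDK's actual local model IDs are not listed here), and `choose_image_backend` is a hypothetical helper; it also rejects aspect ratios outside the four listed in the signature comment, which may be stricter than the real function.

```python
# Placeholder identifiers: the SDK's real local model IDs may be spelled differently.
LOCAL_IMAGE_MODELS = {"sdxl", "flux", "sd3.5"}
# The aspect ratios listed in the generate_image signature comment.
SUPPORTED_ASPECT_RATIOS = {"9:16", "16:9", "1:1", "3:4"}


def choose_image_backend(model: str, aspect_ratio: str = "3:4") -> str:
    """Return "local" or "remote" for an already-resolved model name."""
    if aspect_ratio not in SUPPORTED_ASPECT_RATIOS:
        raise ValueError(f"unsupported aspect ratio: {aspect_ratio}")
    # Anything not recognised as a local model goes to the remote API (Imagen).
    return "local" if model.lower() in LOCAL_IMAGE_MODELS else "remote"
```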
Video Generation
```python
from fabric_workflow_sdk import generate_video

path = await generate_video(
    input,
    "A cinematic ocean wave crashing on rocks",
)
```

```python
async def generate_video(
    input: dict,
    prompt: str,
    *,
    model: str | None = None,
    timeout_secs: int = 300,
    poll_interval_secs: int = 5,
) -> str:  # returns file path
    ...
```

Model Resolution
```python
from fabric_workflow_sdk import get_model

model = get_model(input, "text")           # "gemini-2.5-flash"
model = get_model(input, "broll")          # depends on quality profile
model = get_model(input, "keyframe_grid")  # "skip" or image model

# With fallback for custom operations
model = get_model(input, "my_op", fallback="gemini-2.5-flash")
```

Resolution order (first match wins):
- Per-run input: `--input text_model="..."`
- Environment: `FABRIC_TEXT_MODEL=...`
- Project config: `./models.yaml`
- Global config: `~/.fabric/models.yaml`
- Quality profile: `--input quality=local`
- Built-in defaults
Model Helpers
```python
from fabric_workflow_sdk import generate, has_llm, require_model

# Check if any LLM is available
if has_llm(input):
    text = generate(input, prompt)

# Get model or raise if "skip"
model = require_model(input, "tts", label="Text-to-speech")
```

See Model Configuration for the full list of operations, defaults, and quality profiles.
Research Helpers
```python
from fabric_workflow_sdk import search_web, format_web_results

# Web search (Exa API → DuckDuckGo fallback)
results = await search_web("Rust vs Go performance", num=10, exa_api_key="...")
# [{"title": "...", "url": "...", "snippet": "...", "score": 0.95}, ...]

# Format as markdown
markdown = format_web_results(results)
```

| Function | Description |
|---|---|
search_web(query, num=5, *, exa_api_key=None) | Search web, returns list of results |
format_web_results(results, web_content=None) | Format web results as markdown |
format_youtube_results(results) | Format YouTube results as markdown |
format_reddit_results(results) | Format Reddit posts as markdown |
format_rss_results(results) | Format RSS entries as markdown |
format_all_sources(raw_sources, web_content=None) | Format all sources into markdown |
build_research_context(input) | Extract research context from synthesis data |
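As an illustration of what a results formatter does, the sketch below turns hits shaped like the `search_web` example (`title`, `url`, `snippet`) into a numbered markdown list. `format_results_sketch` is hypothetical and its layout is an assumption, not the actual output of `format_web_results`.

```python
def format_results_sketch(results: list[dict]) -> str:
    """Render search hits as a numbered markdown list of links with snippets."""
    lines: list[str] = []
    for rank, hit in enumerate(results, start=1):
        url = hit.get("url", "")
        title = hit.get("title") or url or "untitled"
        lines.append(f"{rank}. [{title}]({url})")
        if hit.get("snippet"):
            # Indent the snippet so it stays inside the list item.
            lines.append(f"   {hit['snippet']}")
    return "\n".join(lines)
```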
File Helpers
```python
from fabric_workflow_sdk import save_temp, make_temp_path

# Write bytes to temp file (NOT auto-deleted)
path = save_temp(video_bytes, suffix=".mp4", prefix="fabric_")

# Create empty temp file, get path
path = make_temp_path(suffix=".png")
```

Input Extraction
```python
from fabric_workflow_sdk import get_video_inputs

# Extract video workflow inputs with defaults
params = get_video_inputs(input)
# {
#     "topic": "...",
#     "hook": "",
#     "mood": "high-energy and conversational",
#     "platform": "TikTok",
#     "duration_secs": 45,
#     "presenter_look": "confident young creator...",
#     "visual_style": "",
# }
```

Hook Helpers
```python
from fabric_workflow_sdk import pick_best_hook, resolve_mood, build_hook_input

# Pick highest-strength hook
best = pick_best_hook(hook_ideas)
# {"hook_text": "...", "estimated_strength": 0.91, ...}

# Map tone to descriptive mood
mood = resolve_mood("edgy")  # "edgy and provocative"

# Build input dict for hook generation workflow
hook_input = build_hook_input(
    input,
    niche="AI productivity",
    num_hooks=15,
    platform="TikTok",
)
```

Cost Tracking
```python
from fabric_workflow_sdk import log_cost, get_cost_summary

# Log an API call cost
log_cost(
    "image_generation",
    "imagen-4.0-fast",
    units=1,
    unit_label="image",
    estimated_usd=0.02,
)

# Log token-based cost
log_cost(
    "text_generation",
    "gemini-2.5-flash",
    input_tokens=500,
    output_tokens=1200,
    estimated_usd=0.001,
)

# Get accumulated summary
summary = get_cost_summary()
# {"entries": [...], "total_estimated_usd": 0.05, "num_calls": 12}
```

Quality & Validation
The SDK provides three layers of output validation: declarative gates, custom function gates, and a video-specific output gate. When a gate fails, it raises `GateError`, which Sayiir treats as a task failure. Upstream `@task(retries=N)` policies therefore still apply, giving you automatic retry-on-bad-output for free.
Declarative Gates
Assert conditions on task output using JSON-path expressions:

```python
from fabric_workflow_sdk import Flow, gate

pipeline = (
    Flow("my/pipeline")
    .then(generate_content)
    .then(gate(
        "content-quality",
        assertions=[
            ("generated_content.content", "is_not_empty"),
            ("generated_content.word_count", "gte", 100),
        ],
    ))
    .then(next_step)
    .build()
)
```

Supported operators: `is_not_empty`, `eq`, `neq`, `gt`, `gte`, `lt`, `lte`, `contains`, `matches_regex`, `is_type`
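To make the assertion tuples concrete, here is a small evaluator for `(path, operator[, value])` entries over dotted paths. `GateError` is a local stand-in for the SDK's exception, and the operator semantics (what counts as empty, which type names `is_type` accepts) are assumptions; the real `gate` may differ in the details.

```python
import re
from typing import Any


class GateError(Exception):
    """Stand-in for the SDK's GateError."""


_MISSING = object()  # sentinel for paths that do not exist in the output

# Hypothetical names accepted by "is_type".
_TYPES = {"str": str, "int": int, "float": float, "bool": bool, "list": list, "dict": dict}

_OPS = {
    "is_not_empty": lambda v, _: v is not _MISSING and v not in (None, "", [], {}),
    "eq": lambda v, x: v == x,
    "neq": lambda v, x: v != x,
    "gt": lambda v, x: v > x,
    "gte": lambda v, x: v >= x,
    "lt": lambda v, x: v < x,
    "lte": lambda v, x: v <= x,
    "contains": lambda v, x: x in v,
    "matches_regex": lambda v, x: isinstance(v, str) and re.search(x, v) is not None,
    "is_type": lambda v, x: isinstance(v, _TYPES[x]),
}


def resolve_path(data: Any, path: str) -> Any:
    """Walk a dotted path such as "generated_content.word_count"."""
    current = data
    for key in path.split("."):
        if not isinstance(current, dict) or key not in current:
            return _MISSING
        current = current[key]
    return current


def check_assertions(name: str, data: dict, assertions: list[tuple]) -> dict:
    for path, op, *args in assertions:
        value = resolve_path(data, path)
        expected = args[0] if args else None
        if value is _MISSING and op != "is_not_empty":
            raise GateError(f"{name}: {path} is missing")
        if not _OPS[op](value, expected):
            raise GateError(f"{name}: {path} {op} {expected!r} failed (got {value!r})")
    return data  # pass the output through unchanged when every assertion holds
```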
Custom Function Gates
For validation logic that doesn't fit a simple assertion, pass a function of the task output. In this example it returns a `(passed, message)` tuple:

```python
from fabric_workflow_sdk import Flow, gate_fn

pipeline = (
    Flow("my/pipeline")
    .then(generate_script)
    .then(gate_fn("script-segments", lambda out: (
        len(out.get("script", {}).get("segments", [])) >= 3,
        "Script must have at least 3 segments",
    )))
    .build()
)
```

Video Output Gate
`video_output_gate()` probes a rendered video file with ffprobe and asserts that it meets minimum quality criteria. This catches broken renders (truncated files, missing audio, wrong resolution) before delivery.

```python
from fabric_workflow_sdk import Flow, video_output_gate

pipeline = (
    Flow("my/video-pipeline")
    .then(render_video)
    .then(video_output_gate(
        "my-output-check",
        require_audio=True,
        min_resolution=(720, 1280),
        min_duration=5.0,
        max_duration=120.0,
    ))
    .then(deliver)
    .build()
)
```

| Parameter | Type | Default | Description |
|---|---|---|---|
name | str | "video-output" | Gate name (appears in error messages) |
path_key | str \| None | None | Input key holding the video path. If None, tries common keys: output_path, composed_path, hooked_path, effects_path, outro_path, subtitled_path, media_path |
min_duration | float \| None | None | Minimum duration in seconds |
max_duration | float \| None | None | Maximum duration in seconds |
expected_duration | float \| None | None | Expected duration; fails if the actual duration deviates by more than duration_tolerance |
duration_tolerance | float | 0.05 | Tolerance as a fraction (5% default) |
min_resolution | tuple[int, int] \| None | (720, 1280) | Minimum (width, height) |
require_audio | bool | True | Require at least one audio track |
min_filesize_bytes | int | 10000 | Reject files smaller than this (likely corrupt) |
On success, the gate passes its input through unchanged apart from an added `_video_validation` dict containing the probed metadata (duration, width, height, fps, has_audio, codec, filesize).
Video Probing
The underlying probe function is also available directly:

```python
from fabric_workflow_sdk.media.ffmpeg import probe_video

info = probe_video("/path/to/video.mp4")
# {
#     "duration": 45.2,
#     "width": 1080,
#     "height": 1920,
#     "fps": 30.0,
#     "has_audio": True,
#     "codec": "h264",
#     "filesize": 12345678,
# }
```

Quality Contracts
For LLM-evaluated quality (e.g., content scoring), use `quality_contract`:

```python
from fabric_workflow_sdk import task, quality_contract, generate_json

@task(retries=2)
async def evaluate_quality(input: dict) -> dict:
    evaluation = generate_json(input, "Rate this content 1-10...", default={})
    return quality_contract(
        input,
        score=evaluation.get("overall_score", 0),
        min_score=6.0,
        details=evaluation,
    )
```

When the score falls below `min_score`, `QualityBelowThreshold` is raised, triggering retries on the upstream generation step.
Async Helpers
```python
from fabric_workflow_sdk import run_sync, http_client

# Run blocking function in thread pool
result = await run_sync(heavy_cpu_function, arg1, arg2)

# Async HTTP client with defaults
async with http_client(timeout=60) as client:
    resp = await client.get("https://api.example.com/data")
```

Fabric API Helpers
```python
from fabric_workflow_sdk import fabric_api_url, fabric_api_headers

url = fabric_api_url(input)          # "https://gofabric.dev"
headers = fabric_api_headers(input)  # {"Authorization": "Bearer ...", ...}
```

Logging
```python
from fabric_workflow_sdk import log

log.step("Generating script", "Using gemini-2.5-flash")
log.result("Script generated", count=7)  # "7 segments"
log.progress(3, 10, "Processing segments")
log.warn("Model not available, using fallback")
log.error("Generation failed after 3 retries")
log.section("Phase 2: Video Generation")
```

Constants
```python
from fabric_workflow_sdk import (
    DEFAULT_MOOD,       # "high-energy and conversational"
    DEFAULT_PLATFORM,   # "TikTok"
    DEFAULT_DURATION,   # 45
    DEFAULT_PRESENTER,  # "confident young creator, casual style, natural lighting"
    DEFAULT_LANGUAGE,   # "en"
    MOOD_MAP,           # {"edgy": "edgy and provocative", ...}
)
```

Local Generation
The SDK includes local video and image generation. See Local Video & Image Models for full details.

```python
from fabric_workflow_sdk._local_video import (
    generate_video,         # Local video generation
    generate_image,         # Local image generation
    generate_talking_head,  # Portrait + audio → talking head
    lipsync_video,          # Lip-sync existing video
    is_available,           # Check if any local backend exists
)
```

Keyframe Grid
The SDK includes the 2x2 grid keyframe system:

```python
from fabric_workflow_sdk._keyframe_grid import (
    generate_keyframe_grid,  # Full orchestration
    select_grid_segments,    # Pick segments for grid
    build_grid_prompt,       # Construct grid prompt
    crop_grid_to_keyframes,  # Crop grid into PNGs
)
```

Pipeline Stages (v0.2.0)
Reusable building blocks for video, audio, and content pipelines. Each stage is a plain async function, independently callable from any workflow, CLI script, or test.
Stages are optional — workflows can also call APIs directly without using stages.
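Because a stage is just an `async def` over the workflow dict, it can be exercised with `asyncio.run` and no `Flow` at all. A minimal sketch with a hypothetical `count_words_stage`, reading the `script.full_narration` field used in the BYOK example:

```python
import asyncio


async def count_words_stage(input: dict) -> dict:
    """Hypothetical stage: reads the narration and returns an updated copy of the input."""
    narration = input.get("script", {}).get("full_narration", "")
    # A real stage would await I/O here (TTS, HTTP, ffmpeg); this toy needs none.
    return {**input, "word_count": len(narration.split())}


if __name__ == "__main__":
    # Called directly, with no Flow or @task involved.
    result = asyncio.run(count_words_stage({"script": {"full_narration": "Hello world"}}))
    print(result["word_count"])  # 2
```

Wiring such a function into a workflow is then a thin `@task` wrapper, as in Using Stages in Workflows.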
Available Stages
| Module | Functions | Description |
|---|---|---|
stages.script | generate_script, build_script_prompt, validate_script | Viral script generation with research grounding |
stages.voiceover | generate_voiceover, resolve_voice, infer_gender | Multi-provider TTS (ElevenLabs, Kokoro, local) |
stages.broll | generate_all_broll, ken_burns_from_still | B-roll generation (AI, stock, Ken Burns fallback) |
stages.music | generate_music, detect_speech_regions, build_duck_filter | BGM generation with speech-aware ducking |
stages.captions | transcribe_audio, generate_ass_subtitles, generate_srt_subtitles, burn_subtitles | Whisper transcription + karaoke subtitles |
stages.audio | mix_audio, remix_ducked | Audio mixing with dynamic ducking |
stages.avatar | generate_ai_actor, generate_talking_heads, lipsync_talking_heads | AI avatars with parallel generation |
stages.compose | compose_timeline, burn_hook_overlay, collect_final_output | Video assembly + post-processing |
stages.hooks | generate_hooks, gather_competitor_insights, gather_trend_signals | Data-driven hook generation (70/20/10 strategy) |
gates | gate, gate_fn, video_output_gate | Declarative, functional, and video-specific validation gates |
quality | quality_contract | LLM-evaluated quality enforcement with retry |
media.ffmpeg | probe_video, probe_duration, probe_fps | Video file probing via ffprobe/GStreamer |
Using Stages in Workflows
```python
from fabric_workflow_sdk import Flow, task
from fabric_workflow_sdk.stages.voiceover import generate_voiceover
from fabric_workflow_sdk.stages.captions import transcribe_audio

@task(timeout="5m", tags=["python", "ai"])
async def voiceover_stage(input: dict) -> dict:
    return await generate_voiceover(input)

@task(timeout="10m", tags=["python", "media"])
async def transcribe_stage(input: dict) -> dict:
    return await transcribe_audio(input)

pipeline = (
    Flow("custom/voice-pipeline")
    .then(voiceover_stage)
    .then(transcribe_stage)
    .build()
)
```

BYOK (Bring Your Own Keys)
Pass provider API keys per request in the `keys` dict:

```python
result = await generate_voiceover({
    "script": {"full_narration": "Hello world"},
    "keys": {
        "elevenlabs_api_key": "your-key-here",
    },
})
```

Prompt Customization
Every AI stage supports `PromptExtension` for customizing prompts without forking code:

```python
result = await generate_script({
    "topic": "AI trends",
    "prompts": {
        "system_prompt_append": "Brand voice: professional but approachable.",
        "user_prompt_append": "Include mention of our product DataFlow.",
    },
})
```

Runtime Model Registration
Add new models at runtime to support frontier or open-source models:

```python
from fabric_workflow_sdk import register_local_models

register_local_models({
    "tts": {"my-voice-v2": {"provider": "kokoro", "voice": "custom"}},
    "text": {"llama4:8b": {"repo": "meta-llama/Llama-4-GGUF"}},
})
```

Provider Fallback Chains
Stages with `ProviderChain` automatically try alternatives on failure:

```text
ElevenLabs → FAL Kokoro → Edge TTS (voiceover)
FAL Stable Audio → MusicGen local → skip (music)
AI generation → stock footage → Ken Burns placeholder (b-roll)
```

Each stage returns `StageExecutionMeta`, reporting which model was actually used and whether a fallback engaged.
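The fallback behavior can be sketched as an ordered list of providers tried until one succeeds. `run_chain` and `ChainResult` are hypothetical (the latter only loosely mirrors what `StageExecutionMeta` is said to report), and the provider callables are toy stand-ins for real TTS calls.

```python
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class ChainResult:
    """Hypothetical record of which provider produced the output."""
    output: Any
    provider: str
    fallback_used: bool
    errors: dict[str, str] = field(default_factory=dict)


def run_chain(providers: list[tuple[str, Callable[[], Any]]]) -> ChainResult:
    errors: dict[str, str] = {}
    for index, (name, call) in enumerate(providers):
        try:
            output = call()
        except Exception as exc:
            # Remember why this provider failed, then try the next one.
            errors[name] = repr(exc)
            continue
        return ChainResult(output, name, fallback_used=index > 0, errors=errors)
    raise RuntimeError(f"every provider failed: {errors}")
```

A terminal `skip` step, as in the music chain, could be modelled as a last provider that returns `None`.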