# Python Workflow SDK
The Python Workflow SDK (fabric-workflow-sdk) is the toolkit for building Fabric workflows. It provides the Flow DSL for composing tasks into pipelines, AI generation helpers (text, image, video), model routing, web research, cost tracking, and file utilities.
## Installation

```shell
pip install fabric-workflow-sdk

# With local model support
pip install "fabric-workflow-sdk[local-all]"
```

Requirements: Python >= 3.11

Core dependencies: sayiir (workflow DSL), httpx, google-genai, python-dotenv, pydantic
## Quick Start

```python
from fabric_workflow_sdk import Flow, task, generate, generate_json, get_model

@task(timeout="5m", retries=2)
async def research(input: dict) -> dict:
    from fabric_workflow_sdk import search_web
    results = await search_web(input["topic"], num=10)
    return {**input, "results": results}

@task(timeout="3m")
async def summarize(input: dict) -> dict:
    summary = generate(input, f"Summarize: {input['results']}")
    return {**input, "summary": summary}

pipeline = (
    Flow("my/research-pipeline")
    .then(research)
    .then(summarize)
    .build()
)
```

## Workflow DSL
`Flow` builds directed acyclic graphs of tasks. Chain tasks with `.then()`, fork into parallel branches, loop, delay, or compose child workflows.
```python
from fabric_workflow_sdk import Flow, task

pipeline = (
    Flow("my/pipeline")
    .then(step_one)
    .then(step_two)
    .fork()
    .branch(parallel_a)
    .branch(parallel_b)
    .branch(parallel_c)
    .join(merge_results)
    .then(final_step)
    .build()
)
```

### Methods
| Method | Description |
|---|---|
| `.then(task_fn)` | Chain a task sequentially |
| `.then_flow(child)` | Chain a child workflow |
| `.fork()` | Start parallel branches |
| `.branch(task_fn)` | Add a parallel branch (after `.fork()`) |
| `.join(merge_fn)` | Merge parallel branches |
| `.route(key_fn, keys=[...])` | Conditional routing |
| `.loop(task_fn, max_iterations=10)` | Loop with a maximum iteration count |
| `.delay(name, duration)` | Delay execution |
| `.wait_for_signal(signal, timeout=None)` | Wait for an external signal |
| `.build()` | Build into an executable workflow |
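The chained calls describe a graph; nothing runs until the built workflow is executed. To make the chaining semantics concrete, here is a toy sketch of the fluent-builder pattern (this `ToyFlow` class is illustrative only, not the SDK's implementation): `.then()` appends a sequential node, while `.fork()`/`.branch()`/`.join()` collect parallel branches into a single fork node.

```python
# Toy builder (NOT the real Flow class) showing how fluent calls
# accumulate a graph description instead of executing anything.
class ToyFlow:
    def __init__(self, name: str):
        self.name = name
        self.nodes = []         # finished nodes, in execution order
        self._branches = None   # branches collected between fork() and join()

    def then(self, fn):
        self.nodes.append(("task", fn.__name__))
        return self

    def fork(self):
        self._branches = []
        return self

    def branch(self, fn):
        self._branches.append(fn.__name__)
        return self

    def join(self, merge_fn):
        self.nodes.append(("fork", self._branches, merge_fn.__name__))
        self._branches = None
        return self

    def build(self):
        return {"name": self.name, "nodes": self.nodes}


def a(x): return x
def b(x): return x
def c(x): return x
def merge(x): return x

wf = ToyFlow("my/pipeline").then(a).fork().branch(b).branch(c).join(merge).build()
# wf["nodes"] → [("task", "a"), ("fork", ["b", "c"], "merge")]
```

Because every method returns `self`, arbitrarily long chains stay a single expression, which is what makes the parenthesized pipeline style in the examples above read cleanly.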
## task Decorator

```python
@task(timeout="5m", retries=2, tags=["python", "ai"], name="my_task")
async def my_task(input: dict) -> dict:
    # Process input, return updated dict
    return {**input, "result": "done"}
```

| Parameter | Type | Description |
|---|---|---|
| `timeout` | str | Max execution time (e.g., "5m", "30s", "1h") |
| `retries` | int | Retry count on failure |
| `tags` | list[str] | Tags for filtering/routing |
| `name` | str | Override the task name |
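The timeout strings pair an integer with a unit suffix. A small hypothetical parser (`parse_timeout` is not an SDK function, just an illustration of the format) showing how `"5m"`-style values map to seconds:

```python
import re

# Hypothetical duration parser (not part of the SDK) for the
# "30s" / "5m" / "1h" timeout strings shown in the table above.
_UNITS = {"s": 1, "m": 60, "h": 3600}

def parse_timeout(spec: str) -> int:
    match = re.fullmatch(r"(\d+)([smh])", spec)
    if match is None:
        raise ValueError(f"bad timeout: {spec!r}")
    value, unit = match.groups()
    return int(value) * _UNITS[unit]

parse_timeout("5m")   # 300 seconds
parse_timeout("1h")   # 3600 seconds
```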
## AI Generation

### Text Generation

```python
from fabric_workflow_sdk import generate, generate_json

# Simple text generation
response = generate(input, "Explain quantum computing simply")

# With model override
response = generate(input, "Write a poem", model="gemini-2.5-flash")

# JSON generation (auto-parses, strips markdown fences)
data = generate_json(input, "Return a JSON list of 5 topics", default=[])
```

| Function | Returns | Description |
|---|---|---|
| `generate(input, prompt, *, model=None)` | str | Text generation via Gemini or a local LLM |
| `generate_json(input, prompt, *, model=None, default=None)` | Any | Generate and parse JSON; returns `default` on parse failure |
| `parse_json(text, *, default=None)` | Any | Strip markdown fences and parse JSON |
| `get_client(input, *, required=True)` | genai.Client \| None | Get a Gemini client instance |
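Models often wrap JSON answers in markdown fences, and `parse_json` is documented to strip them before parsing. A minimal sketch of that behavior (an assumed implementation for illustration, not the SDK source):

```python
import json

# Sketch of fence-stripping JSON parsing in the spirit of parse_json
# (assumed implementation, not the SDK's actual code).
def parse_json_sketch(text: str, *, default=None):
    cleaned = text.strip()
    if cleaned.startswith("```"):
        # Drop the opening fence line (with its optional "json" tag)
        # and everything from the closing fence onward.
        cleaned = cleaned.split("\n", 1)[1] if "\n" in cleaned else ""
        cleaned = cleaned.rsplit("```", 1)[0]
    try:
        return json.loads(cleaned)
    except ValueError:  # json.JSONDecodeError subclasses ValueError
        return default

parse_json_sketch('```json\n{"topics": ["a", "b"]}\n```')  # {"topics": ["a", "b"]}
parse_json_sketch("not json at all", default=[])           # []
```

The `default` fallback is what lets `generate_json` degrade gracefully when a model returns malformed output instead of raising mid-pipeline.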
### Image Generation

```python
from fabric_workflow_sdk import generate_image

# Returns the path to the downloaded .png file
path = await generate_image(
    input,
    "A sunset over mountains, oil painting style",
    aspect_ratio="16:9",
)
```

Signature:

```python
async def generate_image(
    input: dict,
    prompt: str,
    *,
    model: str | None = None,   # resolved via get_model(input, "image_fast")
    aspect_ratio: str = "3:4",  # "9:16", "16:9", "1:1", "3:4"
) -> str:  # returns file path
```

Auto-routes between Gemini Imagen (remote) and local models (SDXL, Flux, SD3.5) based on the resolved model.
### Video Generation

```python
from fabric_workflow_sdk import generate_video

path = await generate_video(
    input,
    "A cinematic ocean wave crashing on rocks",
)
```

Signature:

```python
async def generate_video(
    input: dict,
    prompt: str,
    *,
    model: str | None = None,
    timeout_secs: int = 300,
    poll_interval_secs: int = 5,
) -> str:  # returns file path
```

## Model Resolution
```python
from fabric_workflow_sdk import get_model

model = get_model(input, "text")           # "gemini-2.5-flash"
model = get_model(input, "broll")          # depends on quality profile
model = get_model(input, "keyframe_grid")  # "skip" or image model

# With fallback for custom operations
model = get_model(input, "my_op", fallback="gemini-2.5-flash")
```

Resolution order (first match wins):
1. Per-run input: `--input text_model="..."`
2. Environment: `FABRIC_TEXT_MODEL=...`
3. Project config: `./models.yaml`
4. Global config: `~/.fabric/models.yaml`
5. Quality profile: `--input quality=local`
6. Built-in defaults
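The precedence above is a straightforward first-match scan. A toy resolver mirroring it (illustration only, not SDK code; the config-file and profile lookups are stubbed out as plain dicts):

```python
# Toy resolver mirroring the documented precedence. Each source is
# consulted in order; the first non-empty answer wins.
def resolve_model(op, run_input, env, project_cfg, global_cfg, profile, builtins):
    candidates = (
        run_input.get(f"{op}_model"),           # 1. per-run --input
        env.get(f"FABRIC_{op.upper()}_MODEL"),  # 2. environment variable
        project_cfg.get(op),                    # 3. ./models.yaml
        global_cfg.get(op),                     # 4. ~/.fabric/models.yaml
        profile.get(op),                        # 5. quality profile
        builtins.get(op),                       # 6. built-in defaults
    )
    return next((m for m in candidates if m), None)

# An environment variable beats everything except an explicit per-run input:
resolve_model("text", {}, {"FABRIC_TEXT_MODEL": "custom"},
              {}, {}, {}, {"text": "gemini-2.5-flash"})
# → "custom"
```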
## Model Helpers

```python
from fabric_workflow_sdk import has_llm, require_model

# Check if any LLM is available
if has_llm(input):
    text = generate(input, prompt)

# Get the model, or raise if it resolved to "skip"
model = require_model(input, "tts", label="Text-to-speech")
```

See Model Configuration for the full list of operations, defaults, and quality profiles.
## Research Helpers

```python
from fabric_workflow_sdk import search_web, format_web_results

# Web search (Exa API → DuckDuckGo fallback)
results = await search_web("Rust vs Go performance", num=10, exa_api_key="...")
# [{"title": "...", "url": "...", "snippet": "...", "score": 0.95}, ...]

# Format as markdown
markdown = format_web_results(results)
```

| Function | Description |
|---|---|
| `search_web(query, num=5, *, exa_api_key=None)` | Search the web; returns a list of results |
| `format_web_results(results, web_content=None)` | Format web results as markdown |
| `format_youtube_results(results)` | Format YouTube results as markdown |
| `format_reddit_results(results)` | Format Reddit posts as markdown |
| `format_rss_results(results)` | Format RSS entries as markdown |
| `format_all_sources(raw_sources, web_content=None)` | Format all sources into markdown |
| `build_research_context(input)` | Extract research context from synthesis data |
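The formatters all reduce to the same idea: turn a list of result dicts (the `search_web` shape shown above) into markdown a model can read. A sketch in that spirit (the exact output format of `format_web_results` is not specified here, so this layout is an assumption):

```python
# Illustration of formatting search_web-style results as markdown
# (layout is an assumption, not the SDK's actual output).
def format_results_sketch(results: list[dict]) -> str:
    lines = []
    for i, r in enumerate(results, 1):
        lines.append(f"{i}. [{r['title']}]({r['url']})")
        if r.get("snippet"):
            lines.append(f"   {r['snippet']}")
    return "\n".join(lines)

md = format_results_sketch([
    {"title": "Rust vs Go", "url": "https://example.com", "snippet": "A benchmark."},
])
# "1. [Rust vs Go](https://example.com)\n   A benchmark."
```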
## File Helpers

```python
from fabric_workflow_sdk import save_temp, make_temp_path

# Write bytes to a temp file (NOT auto-deleted)
path = save_temp(video_bytes, suffix=".mp4", prefix="fabric_")

# Create an empty temp file and get its path
path = make_temp_path(suffix=".png")
```

## Input Extraction
```python
from fabric_workflow_sdk import get_video_inputs

# Extract video workflow inputs with defaults
params = get_video_inputs(input)
# {
#     "topic": "...",
#     "hook": "",
#     "mood": "high-energy and conversational",
#     "platform": "TikTok",
#     "duration_secs": 45,
#     "presenter_look": "confident young creator...",
#     "visual_style": "",
# }
```

## Hook Helpers
```python
from fabric_workflow_sdk import pick_best_hook, resolve_mood, build_hook_input

# Pick the highest-strength hook
best = pick_best_hook(hook_ideas)
# {"hook_text": "...", "estimated_strength": 0.91, ...}

# Map a tone to a descriptive mood
mood = resolve_mood("edgy")  # "edgy and provocative"

# Build the input dict for the hook generation workflow
hook_input = build_hook_input(
    input,
    niche="AI productivity",
    num_hooks=15,
    platform="TikTok",
)
```

## Cost Tracking
```python
from fabric_workflow_sdk import log_cost, get_cost_summary

# Log an API call cost
log_cost(
    "image_generation",
    "imagen-4.0-fast",
    units=1,
    unit_label="image",
    estimated_usd=0.02,
)

# Log a token-based cost
log_cost(
    "text_generation",
    "gemini-2.5-flash",
    input_tokens=500,
    output_tokens=1200,
    estimated_usd=0.001,
)

# Get the accumulated summary
summary = get_cost_summary()
# {"entries": [...], "total_estimated_usd": 0.05, "num_calls": 12}
```

## Async Helpers
```python
from fabric_workflow_sdk import run_sync, http_client

# Run a blocking function in the thread pool
result = await run_sync(heavy_cpu_function, arg1, arg2)

# Async HTTP client with sensible defaults
async with http_client(timeout=60) as client:
    resp = await client.get("https://api.example.com/data")
```

## Fabric API Helpers
```python
from fabric_workflow_sdk import fabric_api_url, fabric_api_headers

url = fabric_api_url(input)          # "http://localhost:3001"
headers = fabric_api_headers(input)  # {"Authorization": "Bearer ...", ...}
```

## Logging
```python
from fabric_workflow_sdk import log

log.step("Generating script", "Using gemini-2.5-flash")
log.result("Script generated", count=7)  # "7 segments"
log.progress(3, 10, "Processing segments")
log.warn("Model not available, using fallback")
log.error("Generation failed after 3 retries")
log.section("Phase 2: Video Generation")
```

## Constants
```python
from fabric_workflow_sdk import (
    DEFAULT_MOOD,       # "high-energy and conversational"
    DEFAULT_PLATFORM,   # "TikTok"
    DEFAULT_DURATION,   # 45
    DEFAULT_PRESENTER,  # "confident young creator, casual style, natural lighting"
    DEFAULT_LANGUAGE,   # "en"
    MOOD_MAP,           # {"edgy": "edgy and provocative", ...}
)
```

## Local Generation
Section titled “Local Generation”The SDK includes local video and image generation. See Local Video & Image Models for full details.
```python
from fabric_workflow_sdk._local_video import (
    generate_video,         # Local video generation
    generate_image,         # Local image generation
    generate_talking_head,  # Portrait + audio → talking head
    lipsync_video,          # Lip-sync an existing video
    is_available,           # Check if any local backend exists
)
```

## Keyframe Grid
The SDK includes the 2x2 grid keyframe system:
```python
from fabric_workflow_sdk._keyframe_grid import (
    generate_keyframe_grid,   # Full orchestration
    select_grid_segments,     # Pick segments for the grid
    build_grid_prompt,        # Construct the grid prompt
    crop_grid_to_keyframes,   # Crop the grid into PNGs
)
```
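The coordinate math behind splitting a 2x2 grid image into four keyframes is simple quadrant arithmetic. A sketch of that step (illustration of the approach only; the SDK's `crop_grid_to_keyframes` handles the actual image I/O):

```python
# Compute (left, top, right, bottom) crop boxes for the four quadrants
# of a 2x2 grid image, in reading order. Pure coordinate math; feeding
# these boxes to an image library's crop() yields the four keyframes.
def grid_crop_boxes(width: int, height: int) -> list[tuple[int, int, int, int]]:
    half_w, half_h = width // 2, height // 2
    return [
        (0, 0, half_w, half_h),           # top-left
        (half_w, 0, width, half_h),       # top-right
        (0, half_h, half_w, height),      # bottom-left
        (half_w, half_h, width, height),  # bottom-right
    ]

grid_crop_boxes(1024, 1024)
# [(0, 0, 512, 512), (512, 0, 1024, 512), (0, 512, 512, 1024), (512, 512, 1024, 1024)]
```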