Skip to content

Python Workflow SDK

The Python Workflow SDK (fabric-workflow-sdk) is the toolkit for building Fabric workflows. It provides the Flow DSL for composing tasks into pipelines, AI generation helpers (text, image, video), model routing, web research, cost tracking, and file utilities.

Terminal window
pip install fabric-workflow-sdk
# With local model support
pip install "fabric-workflow-sdk[local-all]"

Requirements: Python >= 3.11

Core dependencies: sayiir (workflow DSL), httpx, google-genai, python-dotenv, pydantic

from fabric_workflow_sdk import Flow, task, generate, generate_json, get_model

@task(timeout="5m", retries=2)
async def research(input: dict) -> dict:
    from fabric_workflow_sdk import search_web
    results = await search_web(input["topic"], num=10)
    return {**input, "results": results}

@task(timeout="3m")
async def summarize(input: dict) -> dict:
    summary = generate(input, f"Summarize: {input['results']}")
    return {**input, "summary": summary}

pipeline = (
    Flow("my/research-pipeline")
    .then(research)
    .then(summarize)
    .build()
)

Flow builds directed acyclic graphs of tasks. Chain tasks with .then(), fork into parallel branches, loop, delay, or compose child workflows.

from fabric_workflow_sdk import Flow, task
pipeline = (
Flow("my/pipeline")
.then(step_one)
.then(step_two)
.fork()
.branch(parallel_a)
.branch(parallel_b)
.branch(parallel_c)
.join(merge_results)
.then(final_step)
.build()
)
| Method | Description |
| --- | --- |
| `.then(task_fn)` | Chain a task sequentially |
| `.then_flow(child)` | Chain a child workflow |
| `.fork()` | Start parallel branches |
| `.branch(task_fn)` | Add a parallel branch (after `.fork()`) |
| `.join(merge_fn)` | Merge parallel branches |
| `.route(key_fn, keys=[...])` | Conditional routing |
| `.loop(task_fn, max_iterations=10)` | Loop with max iterations |
| `.delay(name, duration)` | Delay execution |
| `.wait_for_signal(signal, timeout=None)` | Wait for external signal |
| `.build()` | Build to executable workflow |
@task(timeout="5m", retries=2, tags=["python", "ai"], name="my_task")
async def my_task(input: dict) -> dict:
    # Process input, return updated dict
    return {**input, "result": "done"}
| Parameter | Type | Description |
| --- | --- | --- |
| `timeout` | `str` | Max execution time (e.g., `"5m"`, `"30s"`, `"1h"`) |
| `retries` | `int` | Retry count on failure |
| `tags` | `list[str]` | Tags for filtering/routing |
| `name` | `str` | Override task name |
from fabric_workflow_sdk import generate, generate_json
# Simple text generation
response = generate(input, "Explain quantum computing simply")
# With model override
response = generate(input, "Write a poem", model="gemini-2.5-flash")
# JSON generation (auto-parses, strips markdown fences)
data = generate_json(input, "Return a JSON list of 5 topics", default=[])
| Function | Returns | Description |
| --- | --- | --- |
| `generate(input, prompt, *, model=None)` | `str` | Text generation via Gemini or local LLM |
| `generate_json(input, prompt, *, model=None, default=None)` | `Any` | Generate + parse JSON, returns `default` on parse failure |
| `parse_json(text, *, default=None)` | `Any` | Strip markdown fences and parse JSON |
| `get_client(input, *, required=True)` | `genai.Client \| None` | Get Gemini client instance |
from fabric_workflow_sdk import generate_image

# Returns path to downloaded .png file
path = await generate_image(
    input,
    "A sunset over mountains, oil painting style",
    aspect_ratio="16:9",
)

async def generate_image(
    input: dict,
    prompt: str,
    *,
    model: str | None = None,    # resolved via get_model(input, "image_fast")
    aspect_ratio: str = "3:4",   # "9:16", "16:9", "1:1", "3:4"
) -> str:  # returns file path

Auto-routes between Gemini Imagen (remote) and local models (SDXL, Flux, SD3.5) based on the resolved model.

from fabric_workflow_sdk import generate_video

path = await generate_video(
    input,
    "A cinematic ocean wave crashing on rocks",
)

async def generate_video(
    input: dict,
    prompt: str,
    *,
    model: str | None = None,
    timeout_secs: int = 300,
    poll_interval_secs: int = 5,
) -> str:  # returns file path
from fabric_workflow_sdk import get_model
model = get_model(input, "text") # "gemini-2.5-flash"
model = get_model(input, "broll") # depends on quality profile
model = get_model(input, "keyframe_grid") # "skip" or image model
# With fallback for custom operations
model = get_model(input, "my_op", fallback="gemini-2.5-flash")

Resolution order (first match wins):

  1. Per-run input: --input text_model="..."
  2. Environment: FABRIC_TEXT_MODEL=...
  3. Project config: ./models.yaml
  4. Global config: ~/.fabric/models.yaml
  5. Quality profile: --input quality=local
  6. Built-in defaults
from fabric_workflow_sdk import has_llm, require_model

# Check if any LLM is available
if has_llm(input):
    text = generate(input, prompt)

# Get model or raise if "skip"
model = require_model(input, "tts", label="Text-to-speech")

See Model Configuration for the full list of operations, defaults, and quality profiles.

from fabric_workflow_sdk import search_web, format_web_results
# Web search (Exa API → DuckDuckGo fallback)
results = await search_web("Rust vs Go performance", num=10, exa_api_key="...")
# [{"title": "...", "url": "...", "snippet": "...", "score": 0.95}, ...]
# Format as markdown
markdown = format_web_results(results)
| Function | Description |
| --- | --- |
| `search_web(query, num=5, *, exa_api_key=None)` | Search web, returns list of results |
| `format_web_results(results, web_content=None)` | Format web results as markdown |
| `format_youtube_results(results)` | Format YouTube results as markdown |
| `format_reddit_results(results)` | Format Reddit posts as markdown |
| `format_rss_results(results)` | Format RSS entries as markdown |
| `format_all_sources(raw_sources, web_content=None)` | Format all sources into markdown |
| `build_research_context(input)` | Extract research context from synthesis data |
from fabric_workflow_sdk import save_temp, make_temp_path
# Write bytes to temp file (NOT auto-deleted)
path = save_temp(video_bytes, suffix=".mp4", prefix="fabric_")
# Create empty temp file, get path
path = make_temp_path(suffix=".png")
from fabric_workflow_sdk import get_video_inputs
# Extract video workflow inputs with defaults
params = get_video_inputs(input)
# {
# "topic": "...",
# "hook": "",
# "mood": "high-energy and conversational",
# "platform": "TikTok",
# "duration_secs": 45,
# "presenter_look": "confident young creator...",
# "visual_style": "",
# }
from fabric_workflow_sdk import pick_best_hook, resolve_mood, build_hook_input

# Pick highest-strength hook
best = pick_best_hook(hook_ideas)
# {"hook_text": "...", "estimated_strength": 0.91, ...}

# Map tone to descriptive mood
mood = resolve_mood("edgy")  # "edgy and provocative"

# Build input dict for hook generation workflow
hook_input = build_hook_input(
    input,
    niche="AI productivity",
    num_hooks=15,
    platform="TikTok",
)
from fabric_workflow_sdk import log_cost, get_cost_summary

# Log an API call cost
log_cost(
    "image_generation",
    "imagen-4.0-fast",
    units=1,
    unit_label="image",
    estimated_usd=0.02,
)

# Log token-based cost
log_cost(
    "text_generation",
    "gemini-2.5-flash",
    input_tokens=500,
    output_tokens=1200,
    estimated_usd=0.001,
)

# Get accumulated summary
summary = get_cost_summary()
# {"entries": [...], "total_estimated_usd": 0.05, "num_calls": 12}
from fabric_workflow_sdk import run_sync, http_client

# Run blocking function in thread pool
result = await run_sync(heavy_cpu_function, arg1, arg2)

# Async HTTP client with defaults
async with http_client(timeout=60) as client:
    resp = await client.get("https://api.example.com/data")

from fabric_workflow_sdk import fabric_api_url, fabric_api_headers

url = fabric_api_url(input)          # "http://localhost:3001"
headers = fabric_api_headers(input)  # {"Authorization": "Bearer ...", ...}
from fabric_workflow_sdk import log
log.step("Generating script", "Using gemini-2.5-flash")
log.result("Script generated", count=7) # "7 segments"
log.progress(3, 10, "Processing segments")
log.warn("Model not available, using fallback")
log.error("Generation failed after 3 retries")
log.section("Phase 2: Video Generation")
from fabric_workflow_sdk import (
    DEFAULT_MOOD,       # "high-energy and conversational"
    DEFAULT_PLATFORM,   # "TikTok"
    DEFAULT_DURATION,   # 45
    DEFAULT_PRESENTER,  # "confident young creator, casual style, natural lighting"
    DEFAULT_LANGUAGE,   # "en"
    MOOD_MAP,           # {"edgy": "edgy and provocative", ...}
)

The SDK includes local video and image generation. See Local Video & Image Models for full details.

from fabric_workflow_sdk._local_video import (
    generate_video,         # Local video generation
    generate_image,         # Local image generation
    generate_talking_head,  # Portrait + audio → talking head
    lipsync_video,          # Lip-sync existing video
    is_available,           # Check if any local backend exists
)

The SDK includes the 2x2 grid keyframe system:

from fabric_workflow_sdk._keyframe_grid import (
    generate_keyframe_grid,  # Full orchestration
    select_grid_segments,    # Pick segments for grid
    build_grid_prompt,       # Construct grid prompt
    crop_grid_to_keyframes,  # Crop grid into PNGs
)