Python Platform SDK

The Python Platform SDK (fabric-platform) provides a typed HTTP client for the Fabric control plane API. It wraps every REST endpoint with Python methods and handles authentication, token refresh, and SSE streaming.

pip install fabric-platform

Requirements: Python >= 3.9, httpx >= 0.25

from fabric_platform import FabricClient

# Connect (defaults to gofabric.dev)
fabric = FabricClient()

# Or with API key + remote server
fabric = FabricClient(
    base_url="https://fabric.example.com",
    api_key="fab_live_abc123",
    organization_id="org-uuid",
)

# Use as context manager
with FabricClient() as fabric:
    status = fabric.health_check()
    print(status)

FabricClient(
    base_url: str = "https://gofabric.dev",
    api_key: str | None = None,
    principal_id: str | None = None,
    organization_id: str = "00000000-0000-0000-0000-000000000010",
    timeout: float = 30.0,
)

| Parameter | Description |
|---|---|
| base_url | Fabric API base URL |
| api_key | fab_* API key for authentication |
| principal_id | User/service account ID (alternative to api_key) |
| organization_id | Default org for scoped operations |
| timeout | HTTP request timeout in seconds |

# Email/password
fabric.signup("user@example.com", "password123")
fabric.login("user@example.com", "password123") # auto-stores tokens
# Passwordless
fabric.magic_link("user@example.com")
# OAuth
url = fabric.social_login_url("google") # redirect user here
# Token management
fabric.refresh_token(refresh_token)
fabric.logout()
# Password recovery
fabric.forgot_password("user@example.com")
fabric.reset_password(recovery_token, "new_password")
# Create a job
job = fabric.create_job(
    modality="text",
    input={"prompt": "Summarize this article"},
    idempotency_key="unique-key-123",
)
# Retrieve
job = fabric.get_job(job["id"])
jobs = fabric.list_jobs(limit=20, cursor=None)
# Usage
usage = fabric.get_job_usage(job["id"])
# Register a workflow definition
wf_id = fabric.upsert_workflow(
    name="my-pipeline",
    nodes=[{"type": "ai_generate", "config": {...}}],
    defaults={"timeout": 300},
)
# List and retrieve
workflows = fabric.list_workflows()
wf = fabric.get_workflow(wf_id)
# Start a run
run_id = fabric.run_workflow(
    wf_id,
    context={"topic": "AI agents"},
    priority=5,
)
# Poll until complete (blocks)
result = fabric.wait_for_run(run_id, poll_interval=1.0, timeout=300.0)
# Or check manually
run = fabric.get_run(run_id)
print(run["status"]) # "pending", "running", "completed", "failed"
# Manage runs
fabric.cancel_run(run_id)
fabric.bulk_cancel_runs([run_id_1, run_id_2])
log = fabric.run_log(run_id)
children = fabric.list_children(run_id)
# Resume a paused node
fabric.resume_node(run_id, "approval_step", data={"approved": True})

Set variants (1–10) to fan out N parallel runs of the same workflow with the same input. The SDK lifts the kwarg into input.variants (the wire-format home for variants). The returned RunOutput has one entry per variant in outputs:

result = fabric.workflows.runs.submit_and_get_output(
    "video/ai_shorts",
    input={"topic": "AI trends"},
    variants=3,
)
assert len(result.outputs) == 3
for variant in result.outputs:
    print(variant.variant_index, variant.kind, variant.output)
    for a in variant.artifacts:
        print("", a.filename, a.download_url)

A bundle runs N independent sub-workflows in parallel from a single submission. Each output entry carries the workflow_name of the sub-workflow that produced it and, when that workflow declared one, its kind:

result = fabric.workflows.runs.submit_bundle(
    bundle=[
        {"workflow": "video/ai_shorts", "input": {"topic": "AI trends"}},
        {"workflow": "image/generate-post", "input": {"topic": "AI trends", "slides": 6}},
        {"workflow": "content/threads-post", "input": {"topic": "AI trends"}},
    ],
)
for variant in result.outputs:
    if variant.kind == "video":
        ...
    elif variant.kind == "carousel":
        ...

bundle and variants are mutually exclusive — bundle is “N different workflows”, variants is “N copies of one”. The SDK raises ValueError if both are passed; the server returns 400.
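
The two rules above (variants is lifted into input.variants, and bundle cannot be combined with variants) can be sketched as a small payload builder. This is only an illustration under stated assumptions, not the SDK's actual code: the function name build_submit_payload, the top-level "bundle" key, and the range check on 1 to 10 are hypothetical.

```python
from typing import Any, Optional


def build_submit_payload(
    input: Optional[dict[str, Any]] = None,
    variants: Optional[int] = None,
    bundle: Optional[list[dict[str, Any]]] = None,
) -> dict[str, Any]:
    """Hypothetical helper mirroring the documented client-side rules."""
    if variants is not None and bundle is not None:
        # Documented behavior: ValueError when both are passed.
        raise ValueError("bundle and variants are mutually exclusive")

    # Copy the caller's dict so the lift below never mutates it.
    payload: dict[str, Any] = {"input": dict(input or {})}

    if variants is not None:
        # Assumption: out-of-range values are rejected client-side.
        if not 1 <= variants <= 10:
            raise ValueError("variants must be between 1 and 10")
        # Documented behavior: the kwarg is lifted into input.variants.
        payload["input"]["variants"] = variants

    if bundle is not None:
        payload["bundle"] = bundle  # assumed wire-format key

    return payload
```

Calling build_submit_payload({"topic": "AI trends"}, variants=3) yields an input dict that carries variants alongside topic, while passing both variants and bundle fails before any request would be sent.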

Regenerate — Spawn a New Run as a Variation of an Existing One

Pass regenerate on submit to mark the run as a regeneration of an earlier run/variant. The server persists parent_run_id + parent_variant_index as run lineage:

result = fabric.workflows.runs.submit_and_get_output(
    "video/ai_shorts",
    input={"topic": "AI trends"},
    regenerate={
        "direction": "punchier",  # or deeper / contrarian / visual / data-first / surprise
        "keep": ["platform", "tone_of_voice"],
        "extra_instructions": "shorter, tighter hooks",
        "parent_run_id": prior_run["id"],
        "parent_variant_index": 0,
    },
)

direction is a prompt-level hint — workflows opt in to honor it. The lineage fields (parent_run_id, parent_variant_index) always persist regardless of which workflow you target; the prompt-modulating bits (direction, keep, extra_instructions) are read by stages via regenerate_directive(input.get("regenerate")) and prepended to the relevant LLM/image prompt.

Workflows that currently wire the directive into their prompts:

| Workflow | What direction shifts |
|---|---|
| image/generate | Both the visual prompt and the overlay-text distillation: “punchier” yields a tighter hook + sharper composition, “contrarian” flips the angle, etc. |

For workflows not on this list, direction is silently ignored — the run still happens, lineage is still recorded, but the output is whatever the unmodulated prompt produces. If a workflow you care about should honor it, the change is small (one regenerate_directive call in the relevant stage); see sdks/fabric-workflow-sdk/fabric_workflow_sdk/stages/thumbnail.py for the canonical pattern.

When kicking off a regen by replaying a previous run’s variant data, strip the output-shaped fields before submitting. Workflows declare their input contract narrowly (e.g. image/generate reads topic, hook, platform, thumbnail_text, thumbnail_style, num_thumbnails); fields like thumbnail_paths, thumbnail_asset_ids, selected_thumbnail, overlay_text, thumbnail_size, platform_exports, evaluation, video_asset_id are workflow outputs and should be filtered out of any replay. They survive validation as pass-through extras (so the run won’t fail), but they’re noise in storage and may surprise a future stage if it starts reading them.
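
The filtering step described above can be sketched as a small helper. The field names come from the paragraph; the helper name strip_output_fields and the constant OUTPUT_FIELDS are hypothetical and not part of the SDK.

```python
from typing import Any

# Output-shaped fields named in the paragraph above. Other workflows may
# emit different output fields, so treat this set as a starting point.
OUTPUT_FIELDS = frozenset({
    "thumbnail_paths",
    "thumbnail_asset_ids",
    "selected_thumbnail",
    "overlay_text",
    "thumbnail_size",
    "platform_exports",
    "evaluation",
    "video_asset_id",
})


def strip_output_fields(variant_data: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of variant_data without known workflow-output fields."""
    return {k: v for k, v in variant_data.items() if k not in OUTPUT_FIELDS}
```

The cleaned dict can then be passed as input on the regenerate submission. A denylist like this only covers the fields listed here; filtering against the target workflow's declared input contract would be stricter.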

RunOutput — Typed, Uniform Across Variants/Bundle/Single

get_output and submit_and_get_output return a typed RunOutput:

class RunOutput(BaseModel):
    run_id: str
    status: str
    error: str | None
    variants: int  # equals len(outputs)
    outputs: list[VariantOutput]  # one entry per variant; default 1

    @property
    def first_output(self) -> Any | None: ...  # outputs[0].output

    @property
    def all_artifacts(self) -> list[RunArtifact]: ...  # flat across variants


class VariantOutput(BaseModel):
    variant_index: int
    workflow_name: str | None  # sub-workflow that produced this entry
    kind: Literal["video", "carousel", "image", "text"] | None
    output: Any | None
    artifacts: list[RunArtifact]

Single-workflow runs have one entry; iterate result.outputs uniformly — no shape branching.
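
To illustrate the "no shape branching" point, here is a minimal sketch that groups artifact filenames by kind with one loop, whether the run produced one entry or many. The dataclasses Artifact and Variant are simplified stand-ins for RunArtifact and VariantOutput (only the fields used here), and filenames_by_kind is a hypothetical helper, not an SDK function.

```python
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class Artifact:
    """Stand-in for RunArtifact (only the fields used below)."""
    filename: str
    download_url: str


@dataclass
class Variant:
    """Stand-in for VariantOutput (only the fields used below)."""
    variant_index: int
    kind: Optional[str] = None
    output: Any = None
    artifacts: list[Artifact] = field(default_factory=list)


def filenames_by_kind(outputs: list[Variant]) -> dict[str, list[str]]:
    """Group artifact filenames by kind; entries without a kind go under 'unknown'."""
    grouped: dict[str, list[str]] = defaultdict(list)
    for variant in outputs:
        key = variant.kind or "unknown"
        grouped[key].extend(a.filename for a in variant.artifacts)
    return dict(grouped)
```

The same function handles a single-workflow run (a one-element list), a variants run, and a bundle, because each of them exposes the same list of entries.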

Fabric auto-derives JSON Schemas from each task’s Pydantic types at server boot (see the Typed Workflows guide for the full design). The Python SDK exposes the schemas via a dedicated method:

# Get the contract for a specific workflow — input schema, output
# schema, and per-task breakdown.
schemas = fabric.get_workflow_schemas("research/trends")
print(schemas["input_schema"]) # JSON Schema for workflow input
print(schemas["output_schema"]) # JSON Schema for workflow output
print(schemas["task_schemas"]) # Per-task array (excluding shim tasks)
print(schemas["warnings"]) # Any type-coverage warnings
# Workflows without Pydantic types come back with all-null schemas.
untyped = fabric.get_workflow_schemas("some/untyped-workflow")
assert untyped["input_schema"] is None

Opt in to JSON Schema validation of submission payloads with validate=True. The server compiles the workflow’s input_schema and rejects mismatched payloads with a structured 400 before enqueuing the run:

from fabric_platform.errors import FabricValidationError

try:
    run = fabric.submit_run(
        "research/trends",
        input={"topic": 42},  # wrong type: topic should be a string
        validate=True,
    )
except FabricValidationError as e:
    # The error carries the validation details on the response body.
    body = e.response.json()
    for err in body["error"]["details"]["errors"]:
        print(f" {err['path']}: {err['message']}")

Validation is opt-in — workflows whose input_schema is null (no Pydantic types declared) skip validation gracefully, so opting in is safe against partially-typed workflows.
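
If the error body needs to be logged or shown to a user, the nested lookup from the example above can be wrapped in a tolerant helper. This sketch assumes the body shape shown there (error.details.errors, each entry carrying path and message); the helper name format_validation_errors and the fallback values are hypothetical.

```python
from typing import Any


def format_validation_errors(body: dict[str, Any]) -> list[str]:
    """Flatten a validation error body into 'path: message' strings.

    Missing or null levels yield an empty list instead of raising,
    since the exact body shape is an assumption.
    """
    error = body.get("error") or {}
    details = error.get("details") or {}
    errors = details.get("errors") or []
    return [f"{err.get('path', '?')}: {err.get('message', '')}" for err in errors]
```

Inside the except block this could be called as format_validation_errors(e.response.json()), which keeps the handler from failing if the server returns a 400 with a different body.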

# Face swap: swap a persona face onto a source image
result = fabric.face_swap(
    source_url="https://example.com/dance-still.jpg",
    target_url="https://example.com/persona-face.png",
)
print(result["output"]["face_swap_url"])

# Or pull the persona from an org gallery
result = fabric.face_swap(
    source_url="https://example.com/dance-still.jpg",
    persona_gallery_id="gallery-uuid",
    persona_index=0,  # first persona in gallery
)

# Motion transfer: animate a persona with a reference video's motion
result = fabric.motion_transfer(
    driving_video_url="https://example.com/dance-reference.mp4",
    source_image_url="https://example.com/persona.png",
)
print(result["output"]["motion_transfer_url"])

# With Seedance for full-body motion (premium)
result = fabric.motion_transfer(
    driving_video_url="https://example.com/choreography.mp4",
    persona_gallery_id="gallery-uuid",
    motion_model="seedance/2.0",
)

# Non-blocking
run = fabric.motion_transfer(
    driving_video_url="https://example.com/dance.mp4",
    source_image_url="https://example.com/persona.png",
    wait=False,
)
# Poll later
result = fabric.wait_for_run(run["id"], timeout=600)

# Create cron schedule for a workflow
schedule = fabric.create_schedule(
    workflow_definition_id=wf_id,
    cron_expression="0 9 * * 1-5",  # weekdays at 9am
)
# Manage
schedules = fabric.list_schedules(wf_id)
fabric.update_schedule(schedule["id"], cron_expression="0 8 * * *")
fabric.trigger_schedule(schedule["id"]) # manual trigger
history = fabric.schedule_history(schedule["id"])
fabric.delete_schedule(schedule["id"])
# Organizations
org = fabric.create_organization(slug="acme", name="Acme Corp")
orgs = fabric.list_organizations()
org = fabric.get_organization(org["id"])
members = fabric.list_org_members(org["id"])
# Teams
team = fabric.create_team(org["id"], slug="engineering", name="Engineering")
teams = fabric.list_org_teams(org["id"])
# Invitations (see Invitations & Notifications reference)
inv = fabric.create_invitation(org["id"], "new@example.com", role="member")
# Accept happens via email link (GET /v1/invitations/{id}/accept?token=xxx)
fabric.revoke_invitation(inv["id"])
invitations = fabric.list_invitations(org["id"])
# Current user
me = fabric.get_me()
my_orgs = fabric.get_my_organizations()
my_teams = fabric.get_my_teams()
my_perms = fabric.get_my_permissions()
# Authorization checks
can_edit = fabric.check_permission("workflow.edit", resource=f"workflow:{wf_id}")
# Batch check
results = fabric.check_permissions([
    {"resource": "workflow:all", "action": "workflow.run"},
    {"resource": "job:all", "action": "job.create"},
])
key = fabric.create_api_key("ci-pipeline", org["id"], scopes=["workflow.run"])
keys = fabric.list_api_keys()
fabric.rotate_api_key(key["id"])
fabric.disable_api_key(key["id"])
fabric.delete_api_key(key["id"])
fabric.set_secret("OPENAI_API_KEY", "sk-...")
names = fabric.list_secrets() # ["OPENAI_API_KEY"]
fabric.delete_secret("OPENAI_API_KEY")
providers = fabric.providers()
# [{"id": "openai", "modalities": ["text", "embedding"]}, ...]
# Direct provider execution
result = fabric.execute_provider(
    modality="text",
    input={"prompt": "Hello"},
    model="gpt-4o",
)
# Cost estimation
estimate = fabric.estimate_cost(modality="text", input={"prompt": "Hello"})
# Stream all events
for event in fabric.stream_events():
    print(event["kind"], event.get("data"))
# Stream events for a specific run
for event in fabric.stream_run_events(run_id):
    if event["kind"] == "workflow.run.completed":
        break
# Stream events for a specific job
for event in fabric.stream_job_events(job_id):
    print(event)
webhook = fabric.create_webhook(
    org["id"],
    url="https://example.com/hooks",
    events=["workflow.run.completed", "workflow.run.failed"],
    secret="whsec_...",
)
webhooks = fabric.list_webhooks(org["id"])
fabric.update_webhook(webhook["id"], url="https://new.example.com/hooks")
fabric.delete_webhook(webhook["id"])
# Organization usage
usage = fabric.get_org_usage(org["id"])
records = fabric.get_org_usage_records(org["id"])
daily = fabric.get_org_usage_daily(org["id"])
# Audit logs
org_logs = fabric.get_org_audit_logs(org["id"])
my_logs = fabric.get_audit_logs()
galleries = fabric.list_galleries(kind="portrait")
items = fabric.list_gallery_items(gallery_id, tags=["professional"])
# Close the HTTP client when done
fabric.close()
# Or use context manager (auto-closes)
with FabricClient() as fabric:
    ...