Skip to content

Python Platform SDK

The Python Platform SDK (fabric-platform) provides a typed HTTP client for the Fabric control plane API. It wraps every REST endpoint with Python methods and handles authentication, token refresh, and SSE streaming.

Terminal window
pip install fabric-platform

Requirements: Python >= 3.9, httpx >= 0.25

from fabric_platform import FabricClient
# Connect (defaults to localhost:3001)
fabric = FabricClient()
# Or with API key + remote server
fabric = FabricClient(
base_url="https://fabric.example.com",
api_key="fab_live_abc123",
organization_id="org-uuid",
)
# Use as context manager
with FabricClient() as fabric:
status = fabric.health_check()
print(status)
FabricClient(
base_url: str = "http://localhost:3001",
api_key: str | None = None,
principal_id: str | None = None,
organization_id: str = "00000000-0000-0000-0000-000000000010",
timeout: float = 30.0,
)
ParameterDescription
base_urlFabric API base URL
api_keyfab_* API key for authentication
principal_idUser/service account ID (alternative to api_key)
organization_idDefault org for scoped operations
timeoutHTTP request timeout in seconds
# Email/password
fabric.signup("user@example.com", "password123")
fabric.login("user@example.com", "password123") # auto-stores tokens
# Passwordless
fabric.magic_link("user@example.com")
# OAuth
url = fabric.social_login_url("google") # redirect user here
# Token management
fabric.refresh_token(refresh_token)
fabric.logout()
# Password recovery
fabric.forgot_password("user@example.com")
fabric.reset_password(recovery_token, "new_password")
# Create a job
job = fabric.create_job(
modality="text",
input={"prompt": "Summarize this article"},
idempotency_key="unique-key-123",
)
# Retrieve
job = fabric.get_job(job["id"])
jobs = fabric.list_jobs(limit=20, cursor=None)
# Usage
usage = fabric.get_job_usage(job["id"])
# Register a workflow definition
wf_id = fabric.upsert_workflow(
name="my-pipeline",
nodes=[{"type": "ai_generate", "config": {...}}],
defaults={"timeout": 300},
)
# List and retrieve
workflows = fabric.list_workflows()
wf = fabric.get_workflow(wf_id)
# Start a run
run_id = fabric.run_workflow(
wf_id,
context={"topic": "AI agents"},
priority=5,
)
# Poll until complete (blocks)
result = fabric.wait_for_run(run_id, poll_interval=1.0, timeout=300.0)
# Or check manually
run = fabric.get_run(run_id)
print(run["status"]) # "pending", "running", "completed", "failed"
# Manage runs
fabric.cancel_run(run_id)
fabric.bulk_cancel_runs([run_id_1, run_id_2])
log = fabric.run_log(run_id)
children = fabric.list_children(run_id)
# Resume a paused node
fabric.resume_node(run_id, "approval_step", data={"approved": True})
# Create cron schedule for a workflow
schedule = fabric.create_schedule(
workflow_definition_id=wf_id,
cron_expression="0 9 * * 1-5", # weekdays at 9am
)
# Manage
schedules = fabric.list_schedules(wf_id)
fabric.update_schedule(schedule["id"], cron_expression="0 8 * * *")
fabric.trigger_schedule(schedule["id"]) # manual trigger
history = fabric.schedule_history(schedule["id"])
fabric.delete_schedule(schedule["id"])
# Organizations
org = fabric.create_organization(slug="acme", name="Acme Corp")
orgs = fabric.list_organizations()
org = fabric.get_organization(org["id"])
members = fabric.list_org_members(org["id"])
# Teams
team = fabric.create_team(org["id"], slug="engineering", name="Engineering")
teams = fabric.list_org_teams(org["id"])
# Invitations
inv = fabric.create_invitation(org["id"], "new@example.com", role="member")
fabric.accept_invitation(inv["id"])
fabric.revoke_invitation(inv["id"])
# Current user
me = fabric.get_me()
my_orgs = fabric.get_my_organizations()
my_teams = fabric.get_my_teams()
my_perms = fabric.get_my_permissions()
# Authorization checks
can_edit = fabric.check_permission("workflow.edit", resource_type="workflow", resource_id=wf_id)
# Batch check
results = fabric.check_permissions([
{"action": "workflow.run", "resource_type": "workflow"},
{"action": "job.create"},
])
key = fabric.create_api_key("ci-pipeline", org["id"], scopes=["workflow.run"])
keys = fabric.list_api_keys()
fabric.rotate_api_key(key["id"])
fabric.disable_api_key(key["id"])
fabric.delete_api_key(key["id"])
fabric.set_secret("OPENAI_API_KEY", "sk-...")
names = fabric.list_secrets() # ["OPENAI_API_KEY"]
fabric.delete_secret("OPENAI_API_KEY")
providers = fabric.providers()
# [{"id": "openai", "modalities": ["text", "embedding"]}, ...]
# Direct provider execution
result = fabric.execute_provider(
modality="text",
input={"prompt": "Hello"},
model="gpt-4o",
)
# Cost estimation
estimate = fabric.estimate_cost(modality="text", input={"prompt": "Hello"})
# Stream all events
for event in fabric.stream_events():
print(event["kind"], event.get("data"))
# Stream events for a specific run
for event in fabric.stream_run_events(run_id):
if event["kind"] == "workflow.run.completed":
break
# Stream events for a specific job
for event in fabric.stream_job_events(job_id):
print(event)
webhook = fabric.create_webhook(
org["id"],
url="https://example.com/hooks",
events=["workflow.run.completed", "workflow.run.failed"],
secret="whsec_...",
)
webhooks = fabric.list_webhooks(org["id"])
fabric.update_webhook(webhook["id"], url="https://new.example.com/hooks")
fabric.delete_webhook(webhook["id"])
# Organization usage
usage = fabric.get_org_usage(org["id"])
records = fabric.get_org_usage_records(org["id"])
daily = fabric.get_org_usage_daily(org["id"])
# Audit logs
org_logs = fabric.get_org_audit_logs(org["id"])
my_logs = fabric.get_audit_logs()
galleries = fabric.list_galleries(kind="portrait")
items = fabric.list_gallery_items(gallery_id, tags=["professional"])
# Close the HTTP client when done
fabric.close()
# Or use context manager (auto-closes)
with FabricClient() as fabric:
...