Threads

Learn how to orchestrate conversational threads with the Alfa (Preview) API.

This API is currently in Preview.

Overview

Threads keep every Alfa API exchange stateful so you can execute long-running research, pause work, and resume with full context. Each thread stores metadata, message history, task progress, and structured artifacts generated during analysis.

  • Use POST /v2/threads to create a new thread and GET /v2/threads to discover existing ones.
  • Send instructions with POST /v2/threads/{thread_id}/messages and stream events using gRPC or SSE streaming.
  • Retrieve chat history with GET /v2/threads/{thread_id}/messages and structured data artifacts with GET /v2/threads/{thread_id}/artifacts/{artifact_id}.

Review the API reference after finishing this guide to inspect every request and response schema.

Thread lifecycle

1. Create or reuse a thread

Call POST /v2/threads with a descriptive thread_name.

2. Send a message and stream updates

Submit the user prompt with POST /v2/threads/{thread_id}/messages. Use gRPC or SSE streaming to receive incremental reasoning, artifacts, and completion events in real time.

3. Collect outputs

Outputs are streamed through the streaming endpoints, but you can fetch the latest complete messages using GET /v2/threads/{thread_id}/messages?start_index=0&limit_num=25. Structured data artifacts are accessible via GET /v2/threads/{thread_id}/artifacts/{artifact_id}.
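The three lifecycle steps above can be sketched end to end. This is a minimal, non-streaming sketch: it assumes the `thread_id` and `messages` response fields shown elsewhere in this guide, and it sends the prompt as JSON — the send_message example later in this guide uses form data, so follow whichever encoding the API reference specifies.

```python
import os
import requests

BASE_URL = os.getenv("ALFA_API_BASE_URL", "https://sandbox.api.boosted.ai")
ACCESS_TOKEN = os.environ.get("ALFA_ACCESS_TOKEN", "")

def auth_headers() -> dict:
    return {"Authorization": f"Bearer {ACCESS_TOKEN}", "Content-Type": "application/json"}

def history_params(start_index: int = 0, limit_num: int = 25) -> dict:
    # Query parameters for GET /v2/threads/{thread_id}/messages.
    return {"start_index": start_index, "limit_num": limit_num}

def run_lifecycle(thread_name: str, prompt: str) -> list[dict]:
    # 1. Create the thread.
    resp = requests.post(
        f"{BASE_URL}/v2/threads",
        headers=auth_headers(),
        json={"thread_name": thread_name},
        timeout=30,
    )
    resp.raise_for_status()
    thread_id = resp.json()["thread_id"]
    # 2. Queue the prompt (real-time updates arrive via gRPC/SSE streaming,
    #    which is covered in the streaming guide).
    requests.post(
        f"{BASE_URL}/v2/threads/{thread_id}/messages",
        headers=auth_headers(),
        json={"message": prompt},
        timeout=30,
    ).raise_for_status()
    # 3. Fetch the completed messages once the task has finished.
    resp = requests.get(
        f"{BASE_URL}/v2/threads/{thread_id}/messages",
        headers=auth_headers(),
        params=history_params(),
        timeout=30,
    )
    resp.raise_for_status()
    return resp.json()["messages"]
```

In production you would stream events rather than fetch history blindly; the fetch in step 3 only returns complete results after the task finishes.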

Create and manage threads

Threads are lightweight resources. You can list, rename, or delete them without touching other workloads.

import os
import requests

BASE_URL = os.getenv("ALFA_API_BASE_URL", "https://sandbox.api.boosted.ai")
ACCESS_TOKEN = os.environ["ALFA_ACCESS_TOKEN"]

def auth_headers() -> dict:
    return {"Authorization": f"Bearer {ACCESS_TOKEN}", "Content-Type": "application/json"}

def create_thread(thread_name: str) -> str:
    resp = requests.post(
        f"{BASE_URL}/v2/threads",
        headers=auth_headers(),
        json={"thread_name": thread_name},
        timeout=30,
    )
    resp.raise_for_status()
    return resp.json()["thread_id"]

def list_threads() -> list[dict]:
    resp = requests.get(f"{BASE_URL}/v2/threads", headers=auth_headers(), timeout=30)
    resp.raise_for_status()
    return resp.json()["threads"]

def rename_thread(thread_id: str, new_name: str) -> None:
    payload = {"name": new_name}
    resp = requests.patch(
        f"{BASE_URL}/v2/threads/{thread_id}",
        headers=auth_headers(),
        json=payload,
        timeout=30,
    )
    resp.raise_for_status()

Deleting a thread removes all artifacts and history permanently. Be careful when issuing DELETE /v2/threads/{thread_id}.
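Since deletion is irreversible, it is worth wrapping the call in a small helper so callers can't accidentally hit the endpoint with an unchecked ID. A minimal sketch (`thread_url` and `delete_thread` are illustrative helper names, not part of the API):

```python
import os
import requests

BASE_URL = os.getenv("ALFA_API_BASE_URL", "https://sandbox.api.boosted.ai")
ACCESS_TOKEN = os.environ.get("ALFA_ACCESS_TOKEN", "")

def thread_url(thread_id: str) -> str:
    # Resource URL for a single thread.
    return f"{BASE_URL}/v2/threads/{thread_id}"

def delete_thread(thread_id: str) -> None:
    # Irreversible: permanently removes the thread's history and artifacts.
    if not thread_id:
        raise ValueError("refusing to delete without an explicit thread_id")
    resp = requests.delete(
        thread_url(thread_id),
        headers={"Authorization": f"Bearer {ACCESS_TOKEN}"},
        timeout=30,
    )
    resp.raise_for_status()
```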

Send messages

POST /v2/threads/{thread_id}/messages validates that the caller has access to the thread, queues the task, and returns immediately. To receive real-time updates, use gRPC or SSE streaming.

send_message.py
import os
import requests

BASE_URL = os.getenv("ALFA_API_BASE_URL", "https://sandbox.api.boosted.ai")
ACCESS_TOKEN = os.environ["ALFA_ACCESS_TOKEN"]

def auth_headers():
    return {"Authorization": f"Bearer {ACCESS_TOKEN}"}

def send_message(thread_id: str, message: str) -> dict:
    resp = requests.post(
        f"{BASE_URL}/v2/threads/{thread_id}/messages",
        headers=auth_headers(),
        data={"message": message},
        timeout=30,
    )
    resp.raise_for_status()
    return resp.json()

To stream thread events in real time, see the streaming guide for both gRPC and SSE options. If a stream closes before completion, call GET /v2/threads/{thread_id}/progress to check status.
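If you need to recover from a dropped stream, a simple polling loop against the progress endpoint works. This is a sketch under assumptions: the response's `status` field and the terminal values `COMPLETE`, `ERROR`, and `CANCELLED` are hypothetical — check the progress schema in the API reference before relying on them.

```python
import os
import time
import requests

BASE_URL = os.getenv("ALFA_API_BASE_URL", "https://sandbox.api.boosted.ai")
ACCESS_TOKEN = os.environ.get("ALFA_ACCESS_TOKEN", "")

def auth_headers():
    return {"Authorization": f"Bearer {ACCESS_TOKEN}"}

def is_terminal(progress: dict) -> bool:
    # Hypothetical status values; adjust to the actual progress schema.
    return progress.get("status") in {"COMPLETE", "ERROR", "CANCELLED"}

def wait_for_completion(thread_id: str, poll_seconds: float = 5.0, max_polls: int = 120) -> dict:
    # Poll GET /v2/threads/{thread_id}/progress until the task reaches a
    # terminal state, then return the final progress payload.
    for _ in range(max_polls):
        resp = requests.get(
            f"{BASE_URL}/v2/threads/{thread_id}/progress",
            headers=auth_headers(),
            timeout=30,
        )
        resp.raise_for_status()
        progress = resp.json()
        if is_terminal(progress):
            return progress
        time.sleep(poll_seconds)
    raise TimeoutError("task did not reach a terminal state within max_polls")
```

Prefer reconnecting the stream when possible; polling is a fallback for clients that cannot hold a long-lived connection.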

Retrieve history and artifacts

Use the history endpoint to retrieve the log of chat messages exchanged over the course of a conversation, and the artifacts endpoint to retrieve structured data outputs.

Artifacts are structured data outputs (such as tables, charts, or JSON objects) generated during thread execution. Unlike plain text responses, artifacts contain formatted data that can be retrieved and rendered separately using their unique artifact_id.

history_and_artifacts.py
import os
import requests

BASE_URL = os.getenv("ALFA_API_BASE_URL", "https://sandbox.api.boosted.ai")
ACCESS_TOKEN = os.environ["ALFA_ACCESS_TOKEN"]

def auth_headers():
    return {"Authorization": f"Bearer {ACCESS_TOKEN}", "Content-Type": "application/json"}

def get_history(thread_id: str, *, start_index: int = 0, limit_num: int = 50) -> list[dict]:
    params = {"start_index": start_index, "limit_num": limit_num}
    resp = requests.get(
        f"{BASE_URL}/v2/threads/{thread_id}/messages",
        headers=auth_headers(),
        params=params,
        timeout=30,
    )
    resp.raise_for_status()
    return resp.json()["messages"]

def get_artifact(thread_id: str, artifact_id: str) -> dict:
    resp = requests.get(
        f"{BASE_URL}/v2/threads/{thread_id}/artifacts/{artifact_id}",
        headers=auth_headers(),
        timeout=30,
    )
    resp.raise_for_status()
    return resp.json()["artifact"]

Tie artifact IDs to the message_id returned by GET /v2/threads/{thread_id}/messages so you can render both the prose answer and the structured data.
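One way to do that tie-in is to build a lookup from each message to its artifacts before rendering. This sketch assumes each message dict carries a `message_id` and an optional `artifact_ids` list — adjust the field names to the actual message schema in the API reference.

```python
def index_artifacts_by_message(messages: list[dict]) -> dict[str, list[str]]:
    # Map message_id -> artifact IDs so a renderer can show the prose answer
    # alongside its structured outputs. Messages without artifacts map to [].
    # Field names "message_id" and "artifact_ids" are assumptions.
    mapping: dict[str, list[str]] = {}
    for msg in messages:
        mapping[msg["message_id"]] = list(msg.get("artifact_ids", []))
    return mapping
```

A renderer can then iterate the history once, fetching each artifact by ID only for the messages that reference one.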

Upload user files to a thread

Attach supporting documents directly to a thread so the model can cite them during follow-up prompts. The endpoint expects multipart form data and associates the uploaded blob with the specified thread.

upload_file.py
import os
import requests

BASE_URL = os.getenv("ALFA_API_BASE_URL", "https://sandbox.api.boosted.ai")
ACCESS_TOKEN = os.environ["ALFA_ACCESS_TOKEN"]

def upload_user_file(thread_id: str, file_path: str) -> dict:
    with open(file_path, "rb") as fh:
        files = {"file": fh}
        resp = requests.post(
            f"{BASE_URL}/v2/threads/{thread_id}/files",
            headers={"Authorization": f"Bearer {ACCESS_TOKEN}"},
            files=files,
            timeout=60,
        )
    resp.raise_for_status()
    return resp.json()

# thread_id comes from a prior POST /v2/threads call
response = upload_user_file(thread_id, "documents/q2.pdf")
print("Uploaded file:", response["file_id"])

Store the returned file_id next to the thread metadata so later POST /v2/threads/{thread_id}/messages calls can reference the uploaded evidence in user prompts.
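A lightweight way to do that is to keep the IDs in your own thread metadata and fold them into follow-up prompts. The prompt convention below is purely illustrative — the API reference may define a dedicated mechanism for referencing uploaded files, so treat `file_prompt` as a hypothetical helper.

```python
def file_prompt(question: str, file_ids: list[str]) -> str:
    # Hypothetical convention: name the uploaded file IDs in the prompt text
    # so the model knows which evidence to consult. Check the API reference
    # for the officially supported way to reference uploaded files.
    if not file_ids:
        return question
    return f"{question}\n\nUse the uploaded files: {', '.join(file_ids)}"
```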

Cancel a running task

Use the cancellation endpoint to stop the currently processing message whenever a user changes their mind. The API records a “User cancelled” entry in the thread so downstream reviewers can see why work halted.

cancel.py
import os
import requests

BASE_URL = os.getenv("ALFA_API_BASE_URL", "https://sandbox.api.boosted.ai")
ACCESS_TOKEN = os.environ["ALFA_ACCESS_TOKEN"]

def cancel_thread_task(thread_id: str) -> None:
    resp = requests.delete(
        f"{BASE_URL}/v2/threads/{thread_id}/task",
        headers={"Authorization": f"Bearer {ACCESS_TOKEN}"},
        timeout=15,
    )
    resp.raise_for_status()

# thread_id from an earlier create_thread call
cancel_thread_task(thread_id)