Threads

Learn how to orchestrate conversational threads with the Investing Assistant API.

This API is currently in Beta.

Overview

Threads keep every Investing Assistant API exchange stateful so you can execute long-running research, pause work, and resume with full context. Each thread stores metadata, message history, task progress, and structured artifacts generated during analysis.

  • Use POST /client/v1/threads to create a new thread and GET /client/v1/threads to discover existing ones.
  • Send instructions with POST /client/v1/threads/{thread_id}/messages, stream events from GET /client/v1/threads/stream/{thread_id}, and fall back to GET /client/v1/threads/{thread_id}/progress when the stream is unavailable.
  • See chat history and structured data table outputs with GET /client/v1/threads/{thread_id}/messages and GET /client/v1/threads/{thread_id}/artifacts/{artifact_id} respectively.

Review the API reference after finishing this guide to inspect every request and response schema.

Thread lifecycle

1. Create or reuse a thread

Call POST /client/v1/threads with a descriptive thread_name.

2. Send a message and stream updates

Submit the user prompt with POST /client/v1/threads/{thread_id}/messages. Immediately open GET /client/v1/threads/stream/{thread_id} to receive incremental reasoning, artifacts, and completion events through Server-Sent Events (SSE).

3. Collect outputs

Outputs are streamed through GET /client/v1/threads/stream/{thread_id}, but you can fetch the latest messages using GET /client/v1/threads/{thread_id}/messages?start_index=0&limit_num=25. Structured data artifacts are accessible via GET /client/v1/threads/{thread_id}/artifacts/{artifact_id}.
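For longer conversations, the messages endpoint can be paged with start_index and limit_num. A minimal sketch, assuming the response's messages array comes back shorter than limit_num (or empty) once the history is exhausted:

```python
import os
import requests

BASE_URL = os.getenv("INVESTING_ASSISTANT_BASE_URL", "https://alfa.boosted.ai")
API_KEY = os.getenv("INVESTING_ASSISTANT_API_KEY", "demo-key")

def iter_all_messages(thread_id: str, page_size: int = 25):
    """Yield every message in a thread, paging with start_index/limit_num."""
    start = 0
    while True:
        resp = requests.get(
            f"{BASE_URL}/client/v1/threads/{thread_id}/messages",
            headers={"x-api-key": API_KEY},
            params={"start_index": start, "limit_num": page_size},
            timeout=30,
        )
        resp.raise_for_status()
        batch = resp.json()["messages"]
        if not batch:
            return
        yield from batch
        # A short page signals the end of the history under our assumption.
        if len(batch) < page_size:
            return
        start += page_size
```

If the API signals exhaustion differently (for example with an explicit total count), adapt the stopping condition accordingly.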

Create and manage threads

Threads are lightweight resources. You can list, rename, or delete them without touching other workloads.

import os
import requests

BASE_URL = os.getenv("INVESTING_ASSISTANT_BASE_URL", "https://alfa.boosted.ai")
API_KEY = os.environ["INVESTING_ASSISTANT_API_KEY"]

def auth_headers() -> dict:
    return {"x-api-key": API_KEY, "Content-Type": "application/json"}

def create_thread(thread_name: str) -> str:
    resp = requests.post(
        f"{BASE_URL}/client/v1/threads",
        headers=auth_headers(),
        json={"thread_name": thread_name},
        timeout=30,
    )
    resp.raise_for_status()
    return resp.json()["thread_id"]

def list_threads() -> list[dict]:
    resp = requests.get(f"{BASE_URL}/client/v1/threads", headers=auth_headers(), timeout=30)
    resp.raise_for_status()
    return resp.json()["threads"]

def rename_thread(thread_id: str, new_name: str) -> None:
    payload = {"name": new_name}
    resp = requests.patch(
        f"{BASE_URL}/client/v1/threads/{thread_id}",
        headers=auth_headers(),
        json=payload,
        timeout=30,
    )
    resp.raise_for_status()

Deleting a thread removes all artifacts and history permanently. Be careful when issuing DELETE /client/v1/threads/{thread_id}.
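A small guard can make accidental deletion harder. A minimal sketch; the confirmed flag is our own convention, not part of the API:

```python
import os
import requests

BASE_URL = os.getenv("INVESTING_ASSISTANT_BASE_URL", "https://alfa.boosted.ai")
API_KEY = os.getenv("INVESTING_ASSISTANT_API_KEY", "demo-key")

def delete_thread(thread_id: str, *, confirmed: bool = False) -> None:
    """Permanently delete a thread and all of its history and artifacts."""
    if not confirmed:
        # Hypothetical guard: force callers to opt in to a destructive action.
        raise ValueError("Pass confirmed=True to permanently delete this thread")
    resp = requests.delete(
        f"{BASE_URL}/client/v1/threads/{thread_id}",
        headers={"x-api-key": API_KEY},
        timeout=30,
    )
    resp.raise_for_status()
```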

Send and stream messages

POST /client/v1/threads/{thread_id}/messages validates the caller has access to the thread, queues the task, and returns immediately. Use the SSE stream to watch execution without polling.

import os
import requests

BASE_URL = os.getenv("INVESTING_ASSISTANT_BASE_URL", "https://alfa.boosted.ai")
API_KEY = os.environ["INVESTING_ASSISTANT_API_KEY"]

def auth_headers():
    return {"x-api-key": API_KEY}

def send_message(thread_id: str, message: str) -> dict:
    resp = requests.post(
        f"{BASE_URL}/client/v1/threads/{thread_id}/messages",
        headers=auth_headers(),
        data={"message": message},
        timeout=30,
    )
    resp.raise_for_status()
    return resp.json()

If the SSE channel closes before completion, call GET /client/v1/threads/{thread_id}/progress. Issue DELETE /client/v1/threads/{thread_id}/task when a user manually stops work.
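A fallback poller might look like this sketch, which assumes the progress response carries a status field that reads "Complete" when the task finishes (mirroring the streaming events later in this guide):

```python
import os
import time
import requests

BASE_URL = os.getenv("INVESTING_ASSISTANT_BASE_URL", "https://alfa.boosted.ai")
API_KEY = os.getenv("INVESTING_ASSISTANT_API_KEY", "demo-key")

def get_progress(thread_id: str) -> dict:
    resp = requests.get(
        f"{BASE_URL}/client/v1/threads/{thread_id}/progress",
        headers={"x-api-key": API_KEY},
        timeout=30,
    )
    resp.raise_for_status()
    return resp.json()

def wait_until_complete(thread_id: str, poll_seconds: float = 5.0, max_polls: int = 120) -> dict:
    """Poll the progress endpoint until the task reports completion."""
    for _ in range(max_polls):
        progress = get_progress(thread_id)
        if progress.get("status") == "Complete":
            return progress
        time.sleep(poll_seconds)
    raise TimeoutError(f"Thread {thread_id} did not complete after {max_polls} polls")
```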

Retrieve history and artifacts

Use the history endpoint to page through the chat messages exchanged over the course of a conversation, and the artifacts endpoint to retrieve structured data outputs.

Artifacts are structured data outputs (such as tables, charts, or JSON objects) generated during thread execution. Unlike plain text responses, artifacts contain formatted data that can be retrieved and rendered separately using their unique artifact_id.

history_and_artifacts.py
import os
import requests

BASE_URL = os.getenv("INVESTING_ASSISTANT_BASE_URL", "https://alfa.boosted.ai")
API_KEY = os.environ["INVESTING_ASSISTANT_API_KEY"]

def auth_headers():
    return {"x-api-key": API_KEY, "Content-Type": "application/json"}

def get_history(thread_id: str, *, start_index: int = 0, limit_num: int = 50) -> list[dict]:
    params = {"start_index": start_index, "limit_num": limit_num}
    resp = requests.get(
        f"{BASE_URL}/client/v1/threads/{thread_id}/messages",
        headers=auth_headers(),
        params=params,
        timeout=30,
    )
    resp.raise_for_status()
    return resp.json()["messages"]

def get_artifact(thread_id: str, artifact_id: str) -> dict:
    resp = requests.get(
        f"{BASE_URL}/client/v1/threads/{thread_id}/artifacts/{artifact_id}",
        headers=auth_headers(),
        timeout=30,
    )
    resp.raise_for_status()
    return resp.json()["artifact"]

Tie artifact IDs to the message_id returned by GET /client/v1/threads/{thread_id}/messages so you can render both the prose answer and the structured data.
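One way to build that mapping is to scan each message's text for embedded artifact markers. A sketch, assuming the marker format shown in the streaming examples later in this guide (a JSON object with type "output_artifact" wrapped in triple backticks):

```python
import json
import re

# Matches marker blocks like:
# ```{"type": "output_artifact", "artifact_id": "art_123"}```
_MARKER = re.compile(r"```(\{.*?\})```", re.DOTALL)

def artifact_ids_by_message(messages: list[dict]) -> dict[str, list[str]]:
    """Map each message_id to the artifact IDs embedded in its text."""
    mapping: dict[str, list[str]] = {}
    for msg in messages:
        ids = []
        for raw in _MARKER.findall(msg.get("message", "")):
            try:
                marker = json.loads(raw)
            except json.JSONDecodeError:
                continue  # Not a marker, just a fenced code span in the prose
            if marker.get("type") == "output_artifact" and marker.get("artifact_id"):
                ids.append(marker["artifact_id"])
        if ids:
            mapping[msg["message_id"]] = ids
    return mapping
```

Pass the output of the history endpoint through this helper, then fetch each ID with the artifacts endpoint to render prose and structured data side by side.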

Upload user files to a thread

Attach supporting documents directly to a thread so the model can cite them during follow-up prompts. The endpoint expects multipart form data and associates the uploaded blob with the specified thread.

upload_file.py
import os
import requests

BASE_URL = os.getenv("INVESTING_ASSISTANT_BASE_URL", "https://alfa.boosted.ai")
API_KEY = os.environ["INVESTING_ASSISTANT_API_KEY"]

def upload_user_file(thread_id: str, file_path: str) -> dict:
    with open(file_path, "rb") as fh:
        files = {"file": fh}
        resp = requests.post(
            f"{BASE_URL}/client/v1/threads/{thread_id}/files",
            headers={"x-api-key": API_KEY},
            files=files,
            timeout=60,
        )
    resp.raise_for_status()
    return resp.json()

# thread_id comes from an earlier POST /client/v1/threads call
response = upload_user_file(thread_id, "documents/q2.pdf")
print("Uploaded file:", response["file_id"])

Store the returned file_id next to the thread metadata so later POST /client/v1/threads/{thread_id}/messages calls can reference the uploaded evidence in user prompts.

Cancel a running task

Use the cancellation endpoint to stop the currently processing message whenever a user changes their mind. The API records a “User cancelled” entry in the thread so downstream reviewers can see why work halted.

cancel.py
import os
import requests

BASE_URL = os.getenv("INVESTING_ASSISTANT_BASE_URL", "https://alfa.boosted.ai")
API_KEY = os.environ["INVESTING_ASSISTANT_API_KEY"]

def cancel_thread_task(thread_id: str) -> None:
    resp = requests.delete(
        f"{BASE_URL}/client/v1/threads/{thread_id}/task",
        headers={"x-api-key": API_KEY},
        timeout=15,
    )
    resp.raise_for_status()

# thread_id comes from an earlier POST /client/v1/threads call
cancel_thread_task(thread_id)

Complete example

The following example demonstrates the full workflow: creating a thread, establishing an SSE stream, sending a message, and processing real-time events until completion.

How streaming works

The SSE endpoint (GET /client/v1/threads/stream/{thread_id}) delivers events as they occur in the thread. To capture all events, open the stream connection before calling POST /client/v1/threads/{thread_id}/messages. The server pushes events in this typical flow:

  1. message_work_in_progress logs may appear before or between chunks, indicating background processing.
  2. message_chunk events arrive incrementally as the AI generates its response. Each chunk contains the partial text under message_chunk.message and a final_chunk boolean flag.
  3. When final_chunk: true arrives, the complete message is ready.
  4. message events signal that a complete message has been assembled.
  5. task_status or execution_status events report progress. When status: "Complete" appears, the thread is idle.
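The chunk-assembly part of this flow can be sketched as a small pure helper that concatenates message_chunk events by message_id:

```python
def accumulate_chunks(events: list[dict]) -> dict[str, str]:
    """Assemble streamed message_chunk events into complete messages, keyed by message_id."""
    messages: dict[str, str] = {}
    for event_data in events:
        event = event_data.get("event", {})
        if event.get("event_type") != "message_chunk":
            continue  # Ignore work-in-progress logs, status updates, etc.
        chunk = event.get("message_chunk", {})
        msg_id = chunk.get("message_id", "")
        messages[msg_id] = messages.get(msg_id, "") + chunk.get("message", "")
    return messages
```

In a live client you would feed events into a structure like this as they arrive and treat a chunk with final_chunk: true as the signal that the entry for that message_id is complete.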

Each message_chunk event follows this structure:

{
  "event": {
    "event_type": "message_chunk",
    "message_chunk": {
      "message": "Based on recent filings, ",
      "message_id": "msg_abc123"
    },
    "final_chunk": false
  }
}

The message text is located at event.message_chunk.message. When final_chunk: true, no more chunks will arrive for that message. The example below uses Python’s threading module to handle the stream in the background while the main thread sends messages.

Script flow

The example script follows this sequence to ensure no events are missed:

  1. Create a thread using POST /client/v1/threads and capture the thread_id.
  2. Start a background thread that opens the SSE connection to /client/v1/threads/stream/{thread_id}.
  3. Wait for confirmation that the stream is connected using stream_connected.wait().
  4. Send the message via POST /client/v1/threads/{thread_id}/messages. This triggers server-side processing.
  5. Process events in the background thread: write message chunks to stdout, detect the final chunk, and exit when status: "Complete" arrives.

The threading synchronization ensures the SSE listener is ready before the message triggers any events. Without this coordination, the first few chunks might be lost if the stream connects slowly.

complete_example.py
import json
import sys
import threading
import time
import requests

BASE_URL = "https://alfa.boosted.ai"
API_KEY = "your-api-key-here"

stream_connected = threading.Event()
stream_error = threading.Event()

def create_thread(thread_name):
    url = f"{BASE_URL}/client/v1/threads"
    headers = {
        "x-api-key": API_KEY,
        "Content-Type": "application/json"
    }
    data = {"thread_name": thread_name}

    response = requests.post(url, headers=headers, json=data)
    response.raise_for_status()
    return response.json()["thread_id"]

def send_message(thread_id, message):
    url = f"{BASE_URL}/client/v1/threads/{thread_id}/messages"
    headers = {"x-api-key": API_KEY}
    data = {"message": message}

    print("\nSending message...")
    response = requests.post(url, headers=headers, data=data)
    response.raise_for_status()
    print("Message sent successfully!")
    return response.json()

def stream_thread_events(thread_id):
    url = f"{BASE_URL}/client/v1/threads/stream/{thread_id}"
    headers = {
        "x-api-key": API_KEY,
        "Accept": "text/event-stream",
        "Cache-Control": "no-cache"
    }

    try:
        print("\nConnecting to event stream...")
        response = requests.get(url, headers=headers, stream=True, timeout=None)
        response.raise_for_status()

        print("Connected to stream, waiting for events...\n")
        stream_connected.set()

        for line in response.iter_lines(decode_unicode=True):
            if line:
                if line.startswith(':'):
                    continue

                if line.startswith('data: '):
                    data = line[6:]
                    try:
                        event_data = json.loads(data)
                        event_type = event_data.get('event', {}).get('event_type', 'unknown')

                        if event_type == 'message_chunk':
                            chunk = event_data['event'].get('message_chunk', {})
                            sys.stdout.write(chunk.get('message', ''))
                            sys.stdout.flush()

                            if event_data['event'].get('final_chunk'):
                                print("\n-- final chunk --")
                        elif event_type == 'message':
                            print("\n\nComplete message received")
                        elif event_type == 'task_status' or event_type == 'execution_status':
                            status = event_data['event'].get('status', '')
                            if status == 'Complete':
                                print("\n\nTask completed!")
                                break
                        else:
                            print(f"\n[{event_type}]")
                    except json.JSONDecodeError:
                        print(f"Data: {data}")
                elif line.startswith('event: '):
                    event_type = line[7:]
                    if event_type == 'close':
                        print("\n\nStream closed by server")
                        break

    except requests.exceptions.RequestException as e:
        print(f"\nStream error: {e}")
        stream_error.set()
    except Exception as e:
        print(f"\nUnexpected error: {e}")
        stream_error.set()

def main():
    try:
        print("Creating a new thread...")
        thread_id = create_thread("Research Thread")
        print(f"Thread created with ID: {thread_id}")

        message = input("\nEnter your message: ")

        stream_thread = threading.Thread(
            target=stream_thread_events,
            args=(thread_id,),
            daemon=True
        )
        stream_thread.start()

        if not stream_connected.wait(timeout=10):
            print("Failed to connect to stream within 10 seconds")
            return

        if stream_error.is_set():
            print("Stream connection failed")
            return

        time.sleep(0.5)

        send_message(thread_id, message)

        stream_thread.join(timeout=120)

        if stream_thread.is_alive():
            print("\n\nStream still active after 120 seconds, exiting...")

    except requests.exceptions.RequestException as e:
        print(f"Error: {e}")
        sys.exit(1)
    except KeyboardInterrupt:
        print("\n\nInterrupted by user")
        sys.exit(0)

if __name__ == "__main__":
    if API_KEY == "your-api-key-here":
        print("Please set your API key in the script before running.")
        sys.exit(1)
    main()

Replace your-api-key-here with your actual API key. The example uses Python’s threading module to establish the SSE stream before sending the message, ensuring all events are captured.

Event types and structure

The stream emits several event types. Here are the most common:

Work in progress logs (background processing updates):

{
  "event": {
    "event_type": "message_work_in_progress",
    "log": "Analyzing financial statements..."
  }
}

Message chunks (incremental response text):

{
  "event": {
    "event_type": "message_chunk",
    "message_chunk": {
      "message": "Tesla reported revenue of ",
      "message_id": "msg_xyz789"
    },
    "final_chunk": false
  }
}

Final chunk (last piece of the message):

{
  "event": {
    "event_type": "message_chunk",
    "message_chunk": {
      "message": "$25.2B in Q3 2024.",
      "message_id": "msg_xyz789"
    },
    "final_chunk": true
  }
}

Complete message (with metadata and tools):

{
  "event": {
    "event_type": "message",
    "message": {
      "message_id": "msg_xyz789",
      "message": "```{\"type\": \"output_artifact\", \"artifact_id\": \"art_123\"}```\nAnalysis complete.",
      "message_metadata": {
        "tools_called": {"screen_stocks": []},
        "citations": []
      }
    }
  }
}

Artifacts are JSON strings embedded in the message text (wrapped in triple backticks). Use GET /client/v1/threads/{thread_id}/artifacts/{artifact_id} to fetch the actual artifact data.

Task status (progress updates):

{
  "event": {
    "event_type": "task_status",
    "status": "Complete",
    "log": "Analysis complete"
  }
}

The example script writes message chunks directly to stdout as they arrive by reading event.message_chunk.message, creating a real-time typing effect. Work-in-progress logs may appear before or between chunks. When final_chunk: true appears, the complete message is ready. The client must parse any artifact markers from the message text to extract artifact_id values. The script continues listening until it receives a task_status event with status: "Complete", signaling that all processing has finished.