Streaming

Subscribe to thread updates using bidirectional gRPC streaming or Server-Sent Events (SSE).

Overview

The Thread API provides two methods for streaming real-time thread events: gRPC bidirectional streaming and REST API Server-Sent Events (SSE). Both methods deliver incremental message chunks, artifacts, and completion events as they occur during thread execution.

Choose the method that best fits your infrastructure:

  • gRPC streaming
  • SSE streaming

Server-Sent Events (SSE)

The SSE endpoint (GET /v2/threads/stream/{thread_id}) establishes a persistent HTTP connection that streams events as they occur. Open the stream connection before calling POST /v2/threads/{thread_id}/messages to capture all events.

How SSE streaming works

The server pushes events in this typical flow:

  1. message_work_in_progress logs may appear before or between chunks, indicating background processing.
  2. message_chunk events arrive incrementally as the AI generates its response. Each chunk contains the partial text under message_chunk.message and a final_chunk boolean flag.
  3. When final_chunk: true arrives, the complete message is ready.
  4. message events signal that a complete message has been assembled.
  5. task_status or execution_status events report progress. When status: "Complete" appears, the thread is idle.
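Condensed into code, the flow above amounts to reading `data:` lines and dispatching on `event_type`. The following is a minimal, illustrative sketch of that dispatch logic only (connection handling is covered in the complete example later on this page); `handle_sse_line` is a hypothetical helper name:

```python
import json


def handle_sse_line(line: str) -> bool:
    """Process one SSE line; return True once the thread reports Complete."""
    if not line.startswith("data: "):
        return False  # SSE comments/keep-alives and other lines are skipped
    event = json.loads(line[len("data: "):]).get("event", {})
    etype = event.get("event_type")
    if etype == "message_work_in_progress":
        print(f"[wip] {event.get('log', '')}")
    elif etype == "message_chunk":
        # Partial text lives under message_chunk.message
        print(event.get("message_chunk", {}).get("message", ""), end="")
        if event.get("final_chunk"):
            print()  # complete message assembled
    elif etype in ("task_status", "execution_status"):
        return event.get("status") == "Complete"
    return False
```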

Basic usage

```python
import os

import requests

BASE_URL = os.getenv("ALFA_API_BASE_URL", "https://sandbox.api.boosted.ai")
ACCESS_TOKEN = os.environ["ALFA_ACCESS_TOKEN"]


def auth_headers():
    return {"Authorization": f"Bearer {ACCESS_TOKEN}"}


def send_message(thread_id: str, message: str) -> dict:
    resp = requests.post(
        f"{BASE_URL}/v2/threads/{thread_id}/messages",
        headers=auth_headers(),
        data={"message": message},
        timeout=30,
    )
    resp.raise_for_status()
    return resp.json()
```

If the SSE channel closes before completion, call GET /v2/threads/{thread_id}/progress to check status. Issue DELETE /v2/threads/{thread_id}/task when a user manually stops work.
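As a sketch, those recovery and cancellation calls might look like the following (the endpoints are the ones above; the helper names and the assumption that both return standard JSON/error codes are illustrative):

```python
import os

import requests

BASE_URL = os.getenv("ALFA_API_BASE_URL", "https://sandbox.api.boosted.ai")


def auth_headers():
    # Token lookup is deferred so the module imports cleanly without credentials.
    return {"Authorization": f"Bearer {os.environ.get('ALFA_ACCESS_TOKEN', '')}"}


def check_progress(thread_id: str) -> dict:
    # Recover after a dropped SSE connection by polling thread status.
    resp = requests.get(
        f"{BASE_URL}/v2/threads/{thread_id}/progress",
        headers=auth_headers(),
        timeout=30,
    )
    resp.raise_for_status()
    return resp.json()


def cancel_task(thread_id: str) -> None:
    # Stop in-flight work when a user manually cancels.
    resp = requests.delete(
        f"{BASE_URL}/v2/threads/{thread_id}/task",
        headers=auth_headers(),
        timeout=30,
    )
    resp.raise_for_status()
```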

Complete SSE example

The following example demonstrates the full workflow: creating a thread, establishing an SSE stream, sending a message, and processing real-time events until completion.

The script uses Python’s threading module to establish the SSE stream before sending the message, ensuring all events are captured:

complete_sse_example.py
```python
import json
import os
import sys
import threading
import time

import requests

BASE_URL = "https://sandbox.api.boosted.ai"
ACCESS_TOKEN = os.environ.get("ALFA_ACCESS_TOKEN", "your-access-token-here")

stream_connected = threading.Event()
stream_error = threading.Event()


def create_thread(thread_name):
    url = f"{BASE_URL}/v2/threads"
    headers = {
        "Authorization": f"Bearer {ACCESS_TOKEN}",
        "Content-Type": "application/json",
    }
    data = {"thread_name": thread_name}

    response = requests.post(url, headers=headers, json=data)
    response.raise_for_status()
    return response.json()["thread_id"]


def send_message(thread_id, message):
    url = f"{BASE_URL}/v2/threads/{thread_id}/messages"
    headers = {"Authorization": f"Bearer {ACCESS_TOKEN}"}
    data = {"message": message}

    print("\nSending message...")
    response = requests.post(url, headers=headers, data=data)
    response.raise_for_status()
    print("Message sent successfully!")
    return response.json()


def stream_thread_events(thread_id):
    url = f"{BASE_URL}/v2/threads/stream/{thread_id}"
    headers = {
        "Authorization": f"Bearer {ACCESS_TOKEN}",
        "Accept": "text/event-stream",
        "Cache-Control": "no-cache",
    }

    try:
        print("\nConnecting to event stream...")
        response = requests.get(url, headers=headers, stream=True, timeout=None)
        response.raise_for_status()

        print("Connected to stream, waiting for events...\n")
        stream_connected.set()

        for line in response.iter_lines(decode_unicode=True):
            if not line:
                continue
            if line.startswith(':'):  # SSE comment / keep-alive
                continue

            if line.startswith('data: '):
                data = line[len('data: '):]
                try:
                    event_data = json.loads(data)
                    event_type = event_data.get('event', {}).get('event_type', 'unknown')

                    if event_type == 'message_chunk':
                        chunk = event_data['event'].get('message_chunk', {})
                        sys.stdout.write(chunk.get('message', ''))
                        sys.stdout.flush()

                        if event_data['event'].get('final_chunk'):
                            print("\n-- final chunk --")
                    elif event_type == 'message':
                        print("\n\nComplete message received")
                    elif event_type in ('task_status', 'execution_status'):
                        status = event_data['event'].get('status', '')
                        if status == 'Complete':
                            print("\n\nTask completed!")
                            break
                    else:
                        print(f"\n[{event_type}]")
                except json.JSONDecodeError:
                    print(f"Data: {data}")
            elif line.startswith('event: '):
                event_type = line[len('event: '):]
                if event_type == 'close':
                    print("\n\nStream closed by server")
                    break

    except requests.exceptions.RequestException as e:
        print(f"\nStream error: {e}")
        stream_error.set()
    except Exception as e:
        print(f"\nUnexpected error: {e}")
        stream_error.set()


def main():
    try:
        print("Creating a new thread...")
        thread_id = create_thread("Research Thread")
        print(f"Thread created with ID: {thread_id}")

        message = input("\nEnter your message: ")

        stream_thread = threading.Thread(
            target=stream_thread_events,
            args=(thread_id,),
            daemon=True,
        )
        stream_thread.start()

        if not stream_connected.wait(timeout=10):
            print("Failed to connect to stream within 10 seconds")
            return

        if stream_error.is_set():
            print("Stream connection failed")
            return

        time.sleep(0.5)

        send_message(thread_id, message)

        stream_thread.join(timeout=120)

        if stream_thread.is_alive():
            print("\n\nStream still active after 120 seconds, exiting...")

    except requests.exceptions.RequestException as e:
        print(f"Error: {e}")
        sys.exit(1)
    except KeyboardInterrupt:
        print("\n\nInterrupted by user")
        sys.exit(0)


if __name__ == "__main__":
    if ACCESS_TOKEN == "your-access-token-here":
        print("Please set your ALFA_ACCESS_TOKEN environment variable before running.")
        sys.exit(1)
    main()
```

Set the ALFA_ACCESS_TOKEN environment variable with your OAuth2 access token obtained via token exchange. See the authorization guide for details. The threading synchronization ensures the SSE listener is ready before the message triggers any events, preventing lost chunks.

SSE event types

The stream emits several event types. Here are the most common:

Work in progress logs (background processing updates):

```json
{
  "event": {
    "event_type": "message_work_in_progress",
    "log": "Analyzing financial statements..."
  }
}
```

Message chunks (incremental response text):

```json
{
  "event": {
    "event_type": "message_chunk",
    "message_chunk": {
      "message": "Tesla reported revenue of ",
      "message_id": "msg_xyz789"
    },
    "final_chunk": false
  }
}
```

Final chunk (last piece of the message):

```json
{
  "event": {
    "event_type": "message_chunk",
    "message_chunk": {
      "message": "$25.2B in Q3 2024.",
      "message_id": "msg_xyz789"
    },
    "final_chunk": true
  }
}
```

Complete message (with metadata and tools):

```json
{
  "event": {
    "event_type": "message",
    "message": {
      "message_id": "msg_xyz789",
      "message": "```{\"type\": \"output_artifact\", \"artifact_id\": \"art_123\"}```\nAnalysis complete.",
      "message_metadata": {
        "tools_called": {"screen_stocks": []},
        "citations": []
      }
    }
  }
}
```

Artifacts are JSON strings embedded in the message text (wrapped in triple backticks). Use GET /v2/threads/{thread_id}/artifacts/{artifact_id} to fetch the actual artifact data.
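A sketch of extracting those artifact IDs from a completed message, based on the marker format shown in the example above (`extract_artifact_ids` is an illustrative helper, not part of the API):

```python
import json
import re


def extract_artifact_ids(message_text: str) -> list:
    """Return artifact_id values from markers embedded in message text.

    Markers are JSON strings wrapped in triple backticks, e.g.
    {"type": "output_artifact", "artifact_id": "art_123"} between ``` fences.
    """
    ids = []
    for block in re.findall(r"```(.*?)```", message_text, re.DOTALL):
        try:
            marker = json.loads(block)
        except json.JSONDecodeError:
            continue  # not a JSON artifact marker (e.g. an ordinary code fence)
        if isinstance(marker, dict) and marker.get("type") == "output_artifact" \
                and "artifact_id" in marker:
            ids.append(marker["artifact_id"])
    return ids
```

Each returned ID can then be passed to GET /v2/threads/{thread_id}/artifacts/{artifact_id} to fetch the artifact data.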

Task status (progress updates):

```json
{
  "event": {
    "event_type": "task_status",
    "status": "Complete",
    "log": "Analysis complete"
  }
}
```

The message text is located at event.message_chunk.message. When final_chunk: true appears, the complete message is ready. The client must parse any artifact markers from the message text to extract artifact_id values. The script continues listening until it receives a task_status event with status: "Complete", signaling that all processing has finished.

gRPC bidirectional streaming

Protocol buffer definition

Download and compile the following protocol buffer file to generate client stubs:

thread_service.proto
```protobuf
syntax = "proto3";

package thread_service.v1;
import "google/protobuf/timestamp.proto";
option java_package = "com.gbi.protobuf_defs.generated.thread_service.v1";
option go_package = "github.com/GBI-Core/protobuf-defs/gobuild/thread_service/v1";

service ThreadService {
  /////////////////////////////////////////////////////////////////
  // Thread API
  /////////////////////////////////////////////////////////////////
  // Streams thread events from multiple threads
  rpc StreamThreadEvents(stream StreamThreadEventsRequest)
      returns (stream StreamThreadEventsResponse);
}

/////////////////////////////////////////////////////////////////
// Common messages
/////////////////////////////////////////////////////////////////

message UUID {
  string id = 1;
}

message Citation {
  message WebCitation {
    string link = 1;
    google.protobuf.Timestamp last_fetched_at = 2;
    int64 snippet_highlight_start = 3;
    int64 snippet_highlight_end = 4;
    string source_type = 5;
  }
  message NewsDevelopmentCitation {
    int64 num_articles = 1;
    google.protobuf.Timestamp development_start = 2;
  }
  message DocumentCitation {
    int64 snippet_highlight_start = 1;
    int64 snippet_highlight_end = 2;
    bool is_snippet = 3;
  }

  UUID citation_id = 1;
  string name = 2;
  string citation_type = 3;
  google.protobuf.Timestamp last_updated_at = 4;

  oneof citation {
    WebCitation web_citation = 5;
    NewsDevelopmentCitation news_development_citation = 6;
    DocumentCitation document_citation = 7;
  }
}

message ThreadData {
  enum ErrorType {
    ERROR_TYPE_UNKNOWN = 0;
    ERROR_TYPE_INTERNAL = 1;
  }

  message MessageEvent {
    UUID message_id = 1;
    string message = 2;
    bool is_user_message = 3;
    google.protobuf.Timestamp message_time = 4;
    repeated Citation citations = 5;
  }

  message MessageChunkEvent {
    UUID message_id = 1;
    string message = 2;
    bool final_chunk = 3;
    repeated Citation citations = 4;
  }

  message WorkInProgressEvent {
    string log = 1;
  }

  message ThreadNameEvent {
    string thread_name = 1;
  }

  message WorkerErrorEvent {
    ErrorType error_type = 1;
  }

  message MessageCancelledEvent {
    string cancellation_message = 1;
  }

  oneof payload {
    MessageEvent message_event = 1;
    MessageChunkEvent message_chunk_event = 2;
    WorkInProgressEvent work_in_progress_event = 3;
    ThreadNameEvent thread_name_event = 4;
    WorkerErrorEvent worker_error_event = 5;
    MessageCancelledEvent message_cancelled_event = 6;
  }
}

/////////////////////////////////////////////////////////////////
// REQUEST/RESPONSE MESSAGES
/////////////////////////////////////////////////////////////////

message StreamThreadEventsRequest {
  // The token for the user that is requesting access to the thread
  string user_token = 1;
  // The thread id the user is requesting access to
  UUID thread_id = 2;
  bool subscribe = 3;
}

message StreamThreadEventsResponse {
  // The user(s) that requested access to the thread
  repeated UUID user_ids = 1;
  // The thread id the data corresponds to
  UUID thread_id = 2;
  // The data for the thread
  ThreadData thread_data = 3;
}
```

Building the client

Generate client code from the proto file using the gRPC tools for your language.

Python:

```shell
python -m grpc_tools.protoc \
  -I./proto \
  --python_out=. \
  --grpc_python_out=. \
  --pyi_out=. \
  <path_to_proto_file>
```

This generates:

  • thread_service_pb2.py: Protocol buffer message classes
  • thread_service_pb2_grpc.py: gRPC service stub classes
  • thread_service_pb2.pyi: Type stubs for type checking

Go:

```shell
protoc \
  -I./proto \
  --go_out=. \
  --go_opt=paths=source_relative \
  --go-grpc_out=. \
  --go-grpc_opt=paths=source_relative \
  <path_to_proto_file>
```

This generates Go code locally. Update the go_package option in the proto file to match your module path, or import the generated code using the path specified in go_package.

gRPC connection setup

Establish a secure gRPC channel with keepalive settings to maintain the connection:

```python
import grpc

# Generated stubs from thread_service.proto (see "Building the client")
from thread_service.v1 import thread_service_pb2_grpc

GRPC_SERVER = "sandbox.api.boosted.ai:443"
SERVICE_ACCOUNT_TOKEN = "<YOUR SERVICE ACCOUNT TOKEN>"

# Create secure channel with keepalive options
credentials = grpc.ssl_channel_credentials()
options = [
    ('grpc.keepalive_time_ms', 10000),                       # Send keepalive ping every 10 seconds
    ('grpc.keepalive_timeout_ms', 5000),                     # Wait 5 seconds for ping ack
    ('grpc.keepalive_permit_without_calls', True),           # Allow keepalive pings when no calls
    ('grpc.http2.max_pings_without_data', 0),                # No limit on pings without data
    ('grpc.http2.min_time_between_pings_ms', 10000),         # Min 10 seconds between pings
    ('grpc.http2.min_ping_interval_without_data_ms', 5000),  # Min 5 seconds between pings without data
]
channel = grpc.secure_channel(GRPC_SERVER, credentials, options=options)
stub = thread_service_pb2_grpc.ThreadServiceStub(channel)
metadata = [("authorization", f"Bearer {SERVICE_ACCOUNT_TOKEN}")]
```

The keepalive settings ensure the connection remains active even during periods of inactivity, preventing timeouts.

Complete gRPC example

The following example demonstrates subscribing to thread events via gRPC streaming:

grpc_streaming_example.py
```python
import time
from collections.abc import Iterator

import grpc

from thread_service.v1 import thread_service_pb2, thread_service_pb2_grpc


def test_streaming():
    # Configuration - update these
    GRPC_SERVER = "sandbox.api.boosted.ai:443"
    SERVICE_ACCOUNT_TOKEN = "<YOUR SERVICE ACCOUNT TOKEN>"
    THREAD_ID = "<YOUR THREAD ID>"
    USER_TOKEN = "<YOUR USER TOKEN>"

    # Connect
    credentials = grpc.ssl_channel_credentials()
    options = [
        ('grpc.keepalive_time_ms', 10000),
        ('grpc.keepalive_timeout_ms', 5000),
        ('grpc.keepalive_permit_without_calls', True),
        ('grpc.http2.max_pings_without_data', 0),
        ('grpc.http2.min_time_between_pings_ms', 10000),
        ('grpc.http2.min_ping_interval_without_data_ms', 5000),
    ]
    channel = grpc.secure_channel(GRPC_SERVER, credentials, options=options)
    stub = thread_service_pb2_grpc.ThreadServiceStub(channel)
    metadata = [("authorization", f"Bearer {SERVICE_ACCOUNT_TOKEN}")]

    print(f"Connecting to {GRPC_SERVER}")

    def request_generator():
        """Send subscription request."""
        yield thread_service_pb2.StreamThreadEventsRequest(
            thread_id=thread_service_pb2.UUID(id=THREAD_ID),
            user_token=USER_TOKEN,
            subscribe=True,
        )
        # Keep stream alive
        while True:
            time.sleep(60)

    try:
        # Start streaming
        stream: Iterator[thread_service_pb2.StreamThreadEventsResponse] = stub.StreamThreadEvents(
            request_generator(), metadata=metadata
        )
        print(f"Subscribed to thread {THREAD_ID}")
        print("Listening for events... (Ctrl+C to stop)")

        # Receive events
        for response in stream:
            print("\nEvent received!")
            print(f"  Thread: {response.thread_id.id}")
            print(f"  Users: {[u.id for u in response.user_ids]}")
            print(f"  Data: {response.thread_data}")

    except KeyboardInterrupt:
        print("\nStopped")
    except grpc.RpcError as e:
        print(f"Error: {e.code()}: {e.details()}")
    finally:
        channel.close()


if __name__ == "__main__":
    test_streaming()
```

How gRPC streaming works

  1. Create a secure channel with SSL credentials and keepalive options to maintain the connection.
  2. Create a request generator that yields the initial subscription request with thread_id, user_token, and subscribe=True. The generator continues running to keep the stream alive.
  3. Call the streaming RPC using the generated stub, passing the request generator and authentication metadata.
  4. Iterate over responses as they arrive. Each StreamThreadEventsResponse contains:
    • thread_id: The thread the event belongs to
    • user_ids: List of user IDs that requested access
    • thread_data: A ThreadData message containing the event payload

The thread_data field is a ThreadData message with a oneof payload that can be one of:

  • message_event: A complete message with citations
  • message_chunk_event: An incremental message chunk (check final_chunk for completion)
  • work_in_progress_event: Background processing status
  • thread_name_event: Thread name updates
  • worker_error_event: Error notifications
  • message_cancelled_event: Cancellation confirmations

The request generator must continue yielding or sleeping to keep the bidirectional stream alive. If the generator exits, the stream will close.
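Sketched in Python, dispatching on that oneof might look like the following (it relies on protobuf's standard `WhichOneof` method; `handle_thread_data` is an illustrative helper, not part of the API):

```python
def handle_thread_data(thread_data):
    # WhichOneof returns the name of the payload field that is set,
    # or None if no payload is present.
    kind = thread_data.WhichOneof("payload")
    if kind == "message_chunk_event":
        chunk = thread_data.message_chunk_event
        print(chunk.message, end="", flush=True)
        if chunk.final_chunk:
            print("\n-- message complete --")
    elif kind == "message_event":
        print(f"\nComplete message: {thread_data.message_event.message}")
    elif kind == "work_in_progress_event":
        print(f"[wip] {thread_data.work_in_progress_event.log}")
    elif kind == "thread_name_event":
        print(f"[thread renamed] {thread_data.thread_name_event.thread_name}")
    elif kind == "worker_error_event":
        print(f"[error] {thread_data.worker_error_event.error_type}")
    elif kind == "message_cancelled_event":
        print(f"[cancelled] {thread_data.message_cancelled_event.cancellation_message}")
    return kind
```

In the receive loop of the example above, `handle_thread_data(response.thread_data)` could replace the raw `print` of `response.thread_data`.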

Authentication

gRPC streaming uses Bearer token authentication via metadata. Include your service account token in the metadata:

```python
SERVICE_ACCOUNT_TOKEN = "<YOUR SERVICE ACCOUNT TOKEN>"
metadata = [("authorization", f"Bearer {SERVICE_ACCOUNT_TOKEN}")]
stream = stub.StreamThreadEvents(request_generator(), metadata=metadata)
```

Obtain your service account token from your Boosted account. See the authorization guide for details. Ensure your token has permission to access the specified thread.