Streaming

Subscribe to thread updates using bidirectional gRPC streaming or Server-Sent Events (SSE).

Overview

The Thread API provides two methods for streaming real-time thread events: gRPC bidirectional streaming and REST API Server-Sent Events (SSE). Both methods deliver incremental message chunks, artifacts, and completion events as they occur during thread execution.

Choose the method that best fits your infrastructure:

  • gRPC streaming
  • SSE streaming

Server-Sent Events (SSE)

The SSE endpoint (GET /v2/threads/stream/{thread_id}) establishes a persistent HTTP connection that streams events as they occur. Open the stream connection before calling POST /v2/threads/{thread_id}/messages to capture all events.

How SSE streaming works

The server pushes events in this typical flow:

  1. message_work_in_progress logs may appear before or between chunks, indicating background processing.
  2. message_chunk events arrive incrementally as the AI generates its response. Each chunk contains the partial text under message_chunk.message and a final_chunk boolean flag.
  3. When final_chunk: true arrives, the complete message is ready.
  4. message events signal that a complete message has been assembled.
  5. task_status or execution_status events report progress. When status: "Complete" appears, the thread is idle.
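The flow above can be sketched as a small parser that accumulates chunks into complete messages. This is illustrative only: it handles just the `data:` lines and keys buffers by `message_id` as in the event payloads documented below.

```python
import json
from collections import defaultdict

def handle_sse_lines(lines):
    """Accumulate message_chunk events into complete messages, keyed by message_id."""
    buffers = defaultdict(str)
    completed = []
    for line in lines:
        if not line.startswith("data: "):
            continue  # this sketch ignores SSE comments and `event:` lines
        event = json.loads(line[len("data: "):]).get("event", {})
        event_type = event.get("event_type")
        if event_type == "message_chunk":
            chunk = event.get("message_chunk", {})
            buffers[chunk.get("message_id")] += chunk.get("message", "")
            if event.get("final_chunk"):
                # final_chunk: true means the full message is assembled
                completed.append(buffers.pop(chunk.get("message_id")))
        elif event_type in ("task_status", "execution_status"):
            if event.get("status") == "Complete":
                break  # thread is idle; stop reading
    return completed
```

A real client would feed this from `response.iter_lines()`, as in the complete example below.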

Basic usage

```python
import os
import requests

BASE_URL = os.getenv("ALFA_API_BASE_URL", "https://sandbox.api.boosted.ai")
ACCESS_TOKEN = os.environ["ALFA_ACCESS_TOKEN"]

def auth_headers():
    return {"Authorization": f"Bearer {ACCESS_TOKEN}"}

def send_message(thread_id: str, message: str) -> dict:
    resp = requests.post(
        f"{BASE_URL}/v2/threads/{thread_id}/messages",
        headers=auth_headers(),
        data={"message": message},
        timeout=30,
    )
    resp.raise_for_status()
    return resp.json()
```

If the SSE channel closes before completion, call GET /v2/threads/{thread_id}/progress to check status. Issue DELETE /v2/threads/{thread_id}/task when a user manually stops work.
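Both recovery calls can be wrapped in small helpers. This is a sketch that reuses the base-URL and bearer-token conventions from the snippet above; the shape of the progress response is not specified here, so the helpers simply return the parsed body or raise on HTTP errors.

```python
import os
import requests

BASE_URL = os.getenv("ALFA_API_BASE_URL", "https://sandbox.api.boosted.ai")
# .get() so the module imports cleanly; a real client must set the token
ACCESS_TOKEN = os.environ.get("ALFA_ACCESS_TOKEN", "")

def auth_headers():
    return {"Authorization": f"Bearer {ACCESS_TOKEN}"}

def check_progress(thread_id: str) -> dict:
    """Poll thread status after an SSE disconnect."""
    resp = requests.get(
        f"{BASE_URL}/v2/threads/{thread_id}/progress",
        headers=auth_headers(),
        timeout=30,
    )
    resp.raise_for_status()
    return resp.json()

def cancel_task(thread_id: str) -> None:
    """Stop in-flight work when a user manually cancels."""
    resp = requests.delete(
        f"{BASE_URL}/v2/threads/{thread_id}/task",
        headers=auth_headers(),
        timeout=30,
    )
    resp.raise_for_status()
```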

Complete SSE example

The following example demonstrates the full workflow: creating a thread, establishing an SSE stream, sending a message, and processing real-time events until completion.

The script uses Python’s threading module to establish the SSE stream before sending the message, ensuring all events are captured:

complete_sse_example.py
```python
import json
import os
import sys
import threading
import time

import requests

BASE_URL = "https://sandbox.api.boosted.ai"
ACCESS_TOKEN = os.environ.get("ALFA_ACCESS_TOKEN", "your-access-token-here")

stream_connected = threading.Event()
stream_error = threading.Event()

def create_thread(thread_name):
    url = f"{BASE_URL}/v2/threads"
    headers = {
        "Authorization": f"Bearer {ACCESS_TOKEN}",
        "Content-Type": "application/json"
    }
    data = {"thread_name": thread_name}

    response = requests.post(url, headers=headers, json=data)
    response.raise_for_status()
    return response.json()["thread_id"]

def send_message(thread_id, message):
    url = f"{BASE_URL}/v2/threads/{thread_id}/messages"
    headers = {"Authorization": f"Bearer {ACCESS_TOKEN}"}
    data = {"message": message}

    print("\nSending message...")
    response = requests.post(url, headers=headers, data=data)
    response.raise_for_status()
    print("Message sent successfully!")
    return response.json()

def stream_thread_events(thread_id):
    url = f"{BASE_URL}/v2/threads/stream/{thread_id}"
    headers = {
        "Authorization": f"Bearer {ACCESS_TOKEN}",
        "Accept": "text/event-stream",
        "Cache-Control": "no-cache"
    }

    try:
        print("\nConnecting to event stream...")
        response = requests.get(url, headers=headers, stream=True, timeout=None)
        response.raise_for_status()

        print("Connected to stream, waiting for events...\n")
        stream_connected.set()

        for line in response.iter_lines(decode_unicode=True):
            if not line:
                continue
            if line.startswith(':'):
                continue  # SSE comment / keepalive

            if line.startswith('data: '):
                data = line[6:]
                try:
                    event_data = json.loads(data)
                    event_type = event_data.get('event', {}).get('event_type', 'unknown')

                    if event_type == 'message_chunk':
                        chunk = event_data['event'].get('message_chunk', {})
                        sys.stdout.write(chunk.get('message', ''))
                        sys.stdout.flush()

                        if event_data['event'].get('final_chunk'):
                            print("\n-- final chunk --")
                    elif event_type == 'message':
                        print("\n\nComplete message received")
                    elif event_type in ('task_status', 'execution_status'):
                        status = event_data['event'].get('status', '')
                        if status == 'Complete':
                            print("\n\nTask completed!")
                            break
                    else:
                        print(f"\n[{event_type}]")
                except json.JSONDecodeError:
                    print(f"Data: {data}")
            elif line.startswith('event: '):
                event_type = line[7:]
                if event_type == 'close':
                    print("\n\nStream closed by server")
                    break

    except requests.exceptions.RequestException as e:
        print(f"\nStream error: {e}")
        stream_error.set()
    except Exception as e:
        print(f"\nUnexpected error: {e}")
        stream_error.set()

def main():
    try:
        print("Creating a new thread...")
        thread_id = create_thread("Research Thread")
        print(f"Thread created with ID: {thread_id}")

        message = input("\nEnter your message: ")

        stream_thread = threading.Thread(
            target=stream_thread_events,
            args=(thread_id,),
            daemon=True
        )
        stream_thread.start()

        if not stream_connected.wait(timeout=10):
            print("Failed to connect to stream within 10 seconds")
            return

        if stream_error.is_set():
            print("Stream connection failed")
            return

        time.sleep(0.5)

        send_message(thread_id, message)

        stream_thread.join(timeout=120)

        if stream_thread.is_alive():
            print("\n\nStream still active after 120 seconds, exiting...")

    except requests.exceptions.RequestException as e:
        print(f"Error: {e}")
        sys.exit(1)
    except KeyboardInterrupt:
        print("\n\nInterrupted by user")
        sys.exit(0)

if __name__ == "__main__":
    if ACCESS_TOKEN == "your-access-token-here":
        print("Please set your ALFA_ACCESS_TOKEN environment variable before running.")
        sys.exit(1)
    main()
```

Set the ALFA_ACCESS_TOKEN environment variable with your OAuth2 access token obtained via token exchange. See the authorization guide for details. The threading synchronization ensures the SSE listener is ready before the message triggers any events, preventing lost chunks.

SSE event types

The stream emits several event types. Here are the most common:

Work in progress logs (background processing updates):

```json
{
  "event": {
    "event_type": "message_work_in_progress",
    "log": "Analyzing financial statements..."
  }
}
```

Message chunks (incremental response text):

```json
{
  "event": {
    "event_type": "message_chunk",
    "message_chunk": {
      "message": "Tesla reported revenue of ",
      "message_id": "msg_xyz789"
    },
    "final_chunk": false
  }
}
```

Final chunk (last piece of the message):

```json
{
  "event": {
    "event_type": "message_chunk",
    "message_chunk": {
      "message": "$25.2B in Q3 2024.",
      "message_id": "msg_xyz789"
    },
    "final_chunk": true
  }
}
```

Complete message (with metadata and citations):

```json
{
  "event": {
    "event_type": "message",
    "message": {
      "message_id": "msg_xyz789",
      "message": "Apple reported revenue of $124.3B in Q1 2025, beating analyst expectations.",
      "message_metadata": {
        "tools_called": {},
        "citations": [
          {
            "citation_id": "cit_abc123",
            "citation_type": "web",
            "name": "Apple Q1 2025 Revenue Report",
            "summary": "Apple exceeded analyst expectations with record Q1 revenue.",
            "last_updated_at": "2026-02-19T10:00:00Z",
            "link": "https://example.com/aapl-q1-results",
            "last_fetched_at": "2026-02-19T09:30:00Z",
            "snippet_highlight_start": 120,
            "snippet_highlight_end": 350,
            "source_type": "web"
          }
        ]
      }
    }
  }
}
```

Artifacts, when present, are JSON strings embedded in the message text (wrapped in triple backticks). Use GET /v2/threads/{thread_id}/artifacts/{artifact_id} to fetch the actual artifact data.
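A sketch of pulling artifact_id values out of those embedded blocks so they can be passed to the artifacts endpoint. The exact shape of the embedded JSON is not specified above, so treating it as an object with an `artifact_id` key is an assumption.

```python
import json
import re

FENCE = "`" * 3  # triple backticks delimiting embedded artifact JSON
ARTIFACT_RE = re.compile(FENCE + r"(?:json)?\s*(\{.*?\})\s*" + FENCE, re.DOTALL)

def extract_artifact_ids(message_text: str) -> list:
    """Find fenced JSON blocks in a message and collect their artifact_id values.

    Each returned id can then be fetched via
    GET /v2/threads/{thread_id}/artifacts/{artifact_id}.
    """
    ids = []
    for block in ARTIFACT_RE.findall(message_text):
        try:
            payload = json.loads(block)
        except json.JSONDecodeError:
            continue  # skip embedded blocks that are not valid JSON
        if "artifact_id" in payload:  # assumed key name, see note above
            ids.append(payload["artifact_id"])
    return ids
```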

Task status (progress updates):

```json
{
  "event": {
    "event_type": "task_status",
    "status": "Complete",
    "log": "Analysis complete"
  }
}
```

The message text is located at event.message_chunk.message. When final_chunk: true appears, the complete message is ready. The client must parse any artifact markers from the message text to extract artifact_id values. The script continues listening until it receives a task_status event with status: "Complete", signaling that all processing has finished.

SSE citation types

Complete messages include a citations array in message_metadata. Each citation has a citation_type field that determines which additional fields are present. All citations share these common fields:

| Field | Type | Description |
| --- | --- | --- |
| citation_id | string | Unique identifier for the citation |
| citation_type | string | One of: web, news_development, text |
| name | string | Display name of the source |
| summary | string \| null | Brief summary of the cited content |
| last_updated_at | datetime \| null | When the source was last updated |
| source_category | string \| null | Category of the source |
| inline_offset | integer \| null | Position offset in the message text |
| is_snippet | boolean | Whether the citation references a text snippet |
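One way to consume the citations array is to branch on citation_type. A minimal sketch using only the fields documented here; the helper name and display format are illustrative:

```python
def describe_citation(citation: dict) -> str:
    """Produce a short display string for a citation based on its type."""
    ctype = citation.get("citation_type")
    name = citation.get("name", "Unknown source")
    if ctype == "web":
        return f"{name} ({citation.get('link', '')})"
    if ctype == "news_development":
        return f"{name} ({citation.get('num_articles', 0)} articles)"
    if ctype == "text":
        start = citation.get("snippet_highlight_start")
        end = citation.get("snippet_highlight_end")
        return f"{name} (snippet {start}-{end})"
    return name  # unknown types fall back to the display name
```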

Web citation (citation_type: "web"):

References a web page with a direct link and snippet highlight positions.

```json
{
  "citation_id": "cit_001",
  "citation_type": "web",
  "name": "Apple Q1 2025 Revenue Report",
  "summary": "Apple exceeded analyst expectations with record Q1 revenue.",
  "last_updated_at": "2026-02-19T10:00:00Z",
  "link": "https://example.com/aapl-q1-results",
  "last_fetched_at": "2026-02-19T09:30:00Z",
  "snippet_highlight_start": 120,
  "snippet_highlight_end": 350,
  "source_type": "web"
}
```

News development citation (citation_type: "news_development"):

References a news development that aggregates multiple related articles.

```json
{
  "citation_id": "cit_002",
  "citation_type": "news_development",
  "name": "Apple Announces New MacBook Pro Lineup",
  "summary": "Coverage of Apple's latest product announcement spanning 15 articles.",
  "last_updated_at": "2026-02-18T14:00:00Z",
  "num_articles": 15,
  "development_start": "2026-02-15T08:00:00Z"
}
```

Document citation (citation_type: "text"):

References a document such as an SEC filing, with snippet highlight positions.

```json
{
  "citation_id": "cit_003",
  "citation_type": "text",
  "name": "AAPL 10-K Filing 2025",
  "summary": "Annual report filing for Apple Inc.",
  "last_updated_at": "2026-01-30T12:00:00Z",
  "snippet_highlight_start": 500,
  "snippet_highlight_end": 780,
  "is_snippet": true
}
```

gRPC bidirectional streaming

Protocol buffer definition

Download and compile the following protocol buffer file to generate client stubs:

thread_service.proto
```protobuf
syntax = "proto3";

package thread_service.v1;

import "google/protobuf/wrappers.proto";
import "google/protobuf/timestamp.proto";

option java_package = "com.gbi.protobuf_defs.generated.thread_service.v1";
option go_package = "github.com/GBI-Core/protobuf-defs/gobuild/thread_service/v1";

service ThreadService {

  /////////////////////////////////////////////////////////////////
  // Thread API
  /////////////////////////////////////////////////////////////////
  // Streams thread events from multiple threads
  rpc StreamThreadEvents(stream StreamThreadEventsRequest)
      returns (stream StreamThreadEventsResponse);
}

/////////////////////////////////////////////////////////////////
// Common messages
/////////////////////////////////////////////////////////////////

message UUID {
  string id = 1;
}

message Citation {
  message WebCitation {
    string link = 1;
    google.protobuf.Timestamp last_fetched_at = 2;
    int64 snippet_highlight_start = 3;
    int64 snippet_highlight_end = 4;
    string source_type = 5;
  }

  message NewsDevelopmentCitation {
    int64 num_articles = 1;
    google.protobuf.Timestamp development_start = 2;
  }

  message NewsArticleCitation {
    string link = 1;
    UUID article_id = 2;
  }

  message DocumentCitation {
    int64 snippet_highlight_start = 1;
    int64 snippet_highlight_end = 2;
    bool is_snippet = 3;
  }

  message FiscalQuarterEarningsCitation {
    int64 snippet_highlight_start = 1;
    int64 snippet_highlight_end = 2;
    bool is_snippet = 3;
    google.protobuf.Timestamp call_datetime = 4;
    google.protobuf.Timestamp report_datetime = 5;
    google.protobuf.Int32Value fiscal_year = 6;
    google.protobuf.Int32Value fiscal_quarter = 7;
  }

  message VariableCitation {
    enum VariablePeriodicity {
      VARIABLE_PERIODICITY_UNSPECIFIED = 0;
      VARIABLE_PERIODICITY_DAILY = 1;
      VARIABLE_PERIODICITY_MONTHLY = 2;
      VARIABLE_PERIODICITY_QUARTERLY = 3;
      VARIABLE_PERIODICITY_YEARLY = 4;
    }

    message FunctionMetadata {
      string name = 1;
      string description = 2;
    }

    message VariableMetadata {
      string name = 1;
      string description = 2;
      string var_id = 3;
    }

    string description = 1;
    string data_provider = 2;
    google.protobuf.Timestamp as_of_timestamp = 3;
    google.protobuf.Timestamp date_range_start = 4;
    google.protobuf.Timestamp date_range_end = 5;
    string period_start = 6;
    string period_end = 7;
    string formula = 8;
    repeated FunctionMetadata functions_used = 9;
    repeated VariableMetadata components_used = 10;
    VariablePeriodicity variable_periodicity = 11;
  }

  UUID citation_id = 1;
  string name = 2;
  string citation_type = 3;
  google.protobuf.Timestamp last_updated_at = 4;
  string summary = 8;

  oneof citation {
    WebCitation web_citation = 5;
    NewsDevelopmentCitation news_development_citation = 6;
    DocumentCitation document_citation = 7;
    NewsArticleCitation news_article_citation = 9;
    FiscalQuarterEarningsCitation fiscal_quarter_earnings_citation = 10;
    VariableCitation variable_citation = 11;
  }
}

message ThreadData {
  enum ErrorType {
    ERROR_TYPE_UNKNOWN = 0;
    ERROR_TYPE_INTERNAL = 1;
  }

  message MessageEvent {
    UUID message_id = 1;
    string message = 2;
    bool is_user_message = 3;
    google.protobuf.Timestamp message_time = 4;
    repeated Citation citations = 5;
  }

  message MessageChunkEvent {
    UUID message_id = 1;
    string message = 2;
    bool final_chunk = 3;
    repeated Citation citations = 4;
  }

  message WorkInProgressEvent {
    string log = 1;
  }

  message ThreadNameEvent {
    string thread_name = 1;
  }

  message WorkerErrorEvent {
    ErrorType error_type = 1;
  }

  message MessageCancelledEvent {
    string cancellation_message = 1;
  }

  oneof payload {
    MessageEvent message_event = 1;
    MessageChunkEvent message_chunk_event = 2;
    WorkInProgressEvent work_in_progress_event = 3;
    ThreadNameEvent thread_name_event = 4;
    WorkerErrorEvent worker_error_event = 5;
    MessageCancelledEvent message_cancelled_event = 6;
  }
}

/////////////////////////////////////////////////////////////////
// REQUEST/RESPONSE MESSAGES
/////////////////////////////////////////////////////////////////

message StreamThreadEventsRequest {
  // The token for the user that is requesting access to the thread
  string user_token = 1;
  // The thread id the user is requesting access to
  UUID thread_id = 2;
  bool subscribe = 3;
}

message StreamThreadEventsResponse {
  // The user(s) that requested access to the thread
  repeated UUID user_ids = 1;
  // The thread id the data corresponds to
  UUID thread_id = 2;
  // The data for the thread
  ThreadData thread_data = 3;
}
```

Building the client

Generate client code from the proto file using the gRPC tools for your language.

Python:

```shell
python -m grpc_tools.protoc \
    -I./proto \
    --python_out=. \
    --grpc_python_out=. \
    --pyi_out=. \
    <path_to_proto_file>
```

This generates:

  • thread_service_pb2.py: Protocol buffer message classes
  • thread_service_pb2_grpc.py: gRPC service stub classes
  • thread_service_pb2.pyi: Type stubs for type checking

Go:

```shell
protoc \
    -I./proto \
    --go_out=. \
    --go_opt=paths=source_relative \
    --go-grpc_out=. \
    --go-grpc_opt=paths=source_relative \
    <path_to_proto_file>
```

This generates Go code locally. Update the go_package option in the proto file to match your module path, or import the generated code using the path specified in go_package.

gRPC connection setup

Establish a secure gRPC channel with keepalive settings to maintain the connection:

```python
import grpc

from thread_service.v1 import thread_service_pb2_grpc

GRPC_SERVER = "sandbox.api.boosted.ai:443"
SERVICE_ACCOUNT_TOKEN = "<YOUR SERVICE ACCOUNT TOKEN>"

# Create secure channel with keepalive options
credentials = grpc.ssl_channel_credentials()
options = [
    ('grpc.keepalive_time_ms', 10000),                       # Send keepalive ping every 10 seconds
    ('grpc.keepalive_timeout_ms', 5000),                     # Wait 5 seconds for ping ack
    ('grpc.keepalive_permit_without_calls', True),           # Allow keepalive pings when no calls are in flight
    ('grpc.http2.max_pings_without_data', 0),                # No limit on pings without data
    ('grpc.http2.min_time_between_pings_ms', 10000),         # Min 10 seconds between pings
    ('grpc.http2.min_ping_interval_without_data_ms', 5000),  # Min 5 seconds between pings without data
]
channel = grpc.secure_channel(GRPC_SERVER, credentials, options=options)
stub = thread_service_pb2_grpc.ThreadServiceStub(channel)
metadata = [("authorization", f"Bearer {SERVICE_ACCOUNT_TOKEN}")]
```

The keepalive settings ensure the connection remains active even during periods of inactivity, preventing timeouts.

Complete gRPC example

The following example demonstrates subscribing to thread events via gRPC streaming:

grpc_streaming_example.py
```python
import time
from collections.abc import Iterator

import grpc

from thread_service.v1 import thread_service_pb2, thread_service_pb2_grpc


def test_streaming():
    # Configuration - update these
    GRPC_SERVER = "sandbox.api.boosted.ai:443"
    SERVICE_ACCOUNT_TOKEN = "<YOUR SERVICE ACCOUNT TOKEN>"
    THREAD_ID = "<YOUR THREAD ID>"
    USER_TOKEN = "<YOUR USER TOKEN>"

    # Connect
    credentials = grpc.ssl_channel_credentials()
    options = [
        ('grpc.keepalive_time_ms', 10000),
        ('grpc.keepalive_timeout_ms', 5000),
        ('grpc.keepalive_permit_without_calls', True),
        ('grpc.http2.max_pings_without_data', 0),
        ('grpc.http2.min_time_between_pings_ms', 10000),
        ('grpc.http2.min_ping_interval_without_data_ms', 5000),
    ]
    channel = grpc.secure_channel(GRPC_SERVER, credentials, options=options)
    stub = thread_service_pb2_grpc.ThreadServiceStub(channel)
    metadata = [("authorization", f"Bearer {SERVICE_ACCOUNT_TOKEN}")]

    print(f"Connecting to {GRPC_SERVER}")

    def request_generator():
        """Send the subscription request, then keep the stream alive."""
        yield thread_service_pb2.StreamThreadEventsRequest(
            thread_id=thread_service_pb2.UUID(id=THREAD_ID),
            user_token=USER_TOKEN,
            subscribe=True,
        )
        # Keep stream alive
        while True:
            time.sleep(60)

    try:
        # Start streaming
        stream: Iterator[thread_service_pb2.StreamThreadEventsResponse] = stub.StreamThreadEvents(
            request_generator(), metadata=metadata
        )
        print(f"Subscribed to thread {THREAD_ID}")
        print("Listening for events... (Ctrl+C to stop)")

        # Receive events
        for response in stream:
            print("\nEvent received!")
            print(f"  Thread: {response.thread_id.id}")
            print(f"  Users: {[u.id for u in response.user_ids]}")
            print(f"  Data: {response.thread_data}")

    except KeyboardInterrupt:
        print("\nStopped")
    except grpc.RpcError as e:
        print(f"Error: {e.code()}: {e.details()}")
    finally:
        channel.close()


if __name__ == "__main__":
    test_streaming()
```

How gRPC streaming works

  1. Create a secure channel with SSL credentials and keepalive options to maintain the connection.
  2. Create a request generator that yields the initial subscription request with thread_id, user_token, and subscribe=True. The generator continues running to keep the stream alive.
  3. Call the streaming RPC using the generated stub, passing the request generator and authentication metadata.
  4. Iterate over responses as they arrive. Each StreamThreadEventsResponse contains:
    • thread_id: The thread the event belongs to
    • user_ids: List of user IDs that requested access
    • thread_data: A ThreadData message containing the event payload

The thread_data field is a ThreadData message with a oneof payload that can be one of:

  • message_event: A complete message with citations
  • message_chunk_event: An incremental message chunk with optional citations (check final_chunk for completion)
  • work_in_progress_event: Background processing status
  • thread_name_event: Thread name updates
  • worker_error_event: Error notifications
  • message_cancelled_event: Cancellation confirmations

The request generator must continue yielding or sleeping to keep the bidirectional stream alive. If the generator exits, the stream will close.
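On the client side, the oneof can be dispatched with protobuf's WhichOneof. A minimal sketch; the helper name and output strings are illustrative, and the function works on any object exposing WhichOneof, including the generated ThreadData class:

```python
def describe_event(thread_data) -> str:
    """Return a short description of a ThreadData message by its oneof payload."""
    which = thread_data.WhichOneof("payload")
    if which == "message_chunk_event":
        chunk = thread_data.message_chunk_event
        suffix = " (final)" if chunk.final_chunk else ""
        return f"chunk: {chunk.message}{suffix}"
    if which == "message_event":
        msg = thread_data.message_event
        return f"message: {msg.message} ({len(msg.citations)} citation(s))"
    if which == "work_in_progress_event":
        return f"wip: {thread_data.work_in_progress_event.log}"
    if which == "thread_name_event":
        return f"renamed: {thread_data.thread_name_event.thread_name}"
    if which == "worker_error_event":
        return f"error: {thread_data.worker_error_event.error_type}"
    if which == "message_cancelled_event":
        return f"cancelled: {thread_data.message_cancelled_event.cancellation_message}"
    return "empty"  # no payload field is set
```

A streaming loop would call this as `print(describe_event(response.thread_data))` for each response.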

gRPC citation types

Complete messages and message chunks both include a citations array. Each citation carries common metadata plus a type-specific object determined by citation_type:

| Field | Type | Description |
| --- | --- | --- |
| citation_id | UUID | Unique identifier for the citation |
| name | string | Display name of the source |
| citation_type | string | One of: web, news_development, news_article, document, fiscal_quarter_earnings, variable |
| last_updated_at | Timestamp | When the source was last updated |
| summary | string | Brief summary of the cited content |

Web citation (web_citation):

References a web page with a direct link and snippet highlight positions.

```json
{
  "citationId": {"id": "cit-001"},
  "name": "Apple Q1 2025 Revenue Report",
  "citationType": "web",
  "lastUpdatedAt": "2026-02-19T10:00:00Z",
  "summary": "Apple exceeded analyst expectations with record Q1 revenue.",
  "webCitation": {
    "link": "https://example.com/aapl-q1-results",
    "lastFetchedAt": "2026-02-19T09:30:00Z",
    "snippetHighlightStart": 120,
    "snippetHighlightEnd": 350,
    "sourceType": "web"
  }
}
```

News development citation (news_development_citation):

References a news development that aggregates multiple related articles.

```json
{
  "citationId": {"id": "cit-002"},
  "name": "Apple Announces New MacBook Pro Lineup",
  "citationType": "news_development",
  "lastUpdatedAt": "2026-02-18T14:00:00Z",
  "summary": "Coverage of Apple's latest product announcement spanning 15 articles.",
  "newsDevelopmentCitation": {
    "numArticles": 15,
    "developmentStart": "2026-02-15T08:00:00Z"
  }
}
```

News article citation (news_article_citation):

References a specific news article with a direct link and article identifier.

```json
{
  "citationId": {"id": "cit-003"},
  "name": "Apple Q1 Results Beat Expectations",
  "citationType": "news_article",
  "lastUpdatedAt": "2026-02-17T16:30:00Z",
  "summary": "Apple exceeded analyst expectations for Q1 2026 earnings.",
  "newsArticleCitation": {
    "link": "https://example.com/aapl-q1-beat",
    "articleId": {"id": "art-456"}
  }
}
```

Document citation (document_citation):

References a document such as an SEC filing, with snippet highlight positions.

```json
{
  "citationId": {"id": "cit-004"},
  "name": "AAPL 10-K Filing 2025",
  "citationType": "document",
  "lastUpdatedAt": "2026-01-30T12:00:00Z",
  "summary": "Annual report filing for Apple Inc.",
  "documentCitation": {
    "snippetHighlightStart": 500,
    "snippetHighlightEnd": 780,
    "isSnippet": true
  }
}
```

Fiscal quarter earnings citation (fiscal_quarter_earnings_citation):

References an earnings call transcript or report for a specific fiscal quarter.

```json
{
  "citationId": {"id": "cit-005"},
  "name": "AAPL Q4 2025 Earnings Call",
  "citationType": "fiscal_quarter_earnings",
  "lastUpdatedAt": "2026-01-28T18:00:00Z",
  "summary": "Apple Q4 2025 earnings call transcript and financial report.",
  "fiscalQuarterEarningsCitation": {
    "snippetHighlightStart": 200,
    "snippetHighlightEnd": 450,
    "isSnippet": true,
    "callDatetime": "2025-10-30T17:00:00Z",
    "reportDatetime": "2025-10-30T16:30:00Z",
    "fiscalYear": 2025,
    "fiscalQuarter": 4
  }
}
```

Variable citation (variable_citation):

References a computed financial variable or metric, including its formula, data provider, time range, and periodicity.

```json
{
  "citationId": {"id": "cit-006"},
  "name": "AAPL Revenue Growth",
  "citationType": "variable",
  "lastUpdatedAt": "2026-02-19T08:00:00Z",
  "summary": "Year-over-year revenue growth for Apple Inc.",
  "variableCitation": {
    "description": "Revenue growth rate",
    "dataProvider": "financial_data",
    "asOfTimestamp": "2026-02-19T00:00:00Z",
    "dateRangeStart": "2025-02-19T00:00:00Z",
    "dateRangeEnd": "2026-02-19T00:00:00Z",
    "periodStart": "Q1 2025",
    "periodEnd": "Q4 2025",
    "formula": "revenue_growth(AAPL)",
    "functionsUsed": [
      {"name": "revenue_growth", "description": "Calculates YoY revenue growth"}
    ],
    "componentsUsed": [
      {"name": "Revenue", "description": "Total revenue", "varId": "var-rev-001"}
    ],
    "variablePeriodicity": "VARIABLE_PERIODICITY_QUARTERLY"
  }
}
```

Authentication

gRPC streaming uses Bearer token authentication via metadata. Include your service account token in the metadata:

```python
SERVICE_ACCOUNT_TOKEN = "<YOUR SERVICE ACCOUNT TOKEN>"
metadata = [("authorization", f"Bearer {SERVICE_ACCOUNT_TOKEN}")]
stream = stub.StreamThreadEvents(request_generator(), metadata=metadata)
```

Obtain your service account token from your Boosted account. See the authorization guide for details. Ensure your token has permission to access the specified thread.