Agent Message System
The agent message system is Vertesia's real-time communication layer between agent workflows and client applications. It handles message creation, delivery, streaming, and rendering through a well-defined pipeline.
Use the ModernAgentConversation component for building agent UIs. It handles streaming, reconnection, plan visualization, file uploads, and message rendering out of the box. See Embedding the Conversation UI below.
Architecture Overview
Messages flow through the following pipeline:
User Input --> Signal (UserInput) --> Temporal Workflow
|
Agent/Tool Execution
|
postUpdateMessage activity
|
POST /runs/:runId/updates
|
Redis (list + pub/sub)
|
+--------------------------------------+
| Phase 1: GET /updates (gzip) |
| Phase 2: SSE /stream (real-time) |
+--------------------------------------+
|
ModernAgentConversation UI
Key participants:
- Temporal Workflows orchestrate agent execution and tool calls
- Redis stores message history (up to 1000 messages, 90-day TTL) and delivers real-time updates via pub/sub
- zeno-server exposes REST and SSE endpoints for message retrieval and streaming
@vertesia/clientprovides theWorkflowsApithat handles both historical fetch and real-time SSE@vertesia/uiprovidesModernAgentConversation, the recommended UI component
Message Types
Every agent message has a type field from the AgentMessageType enum (@vertesia/common):
| Type | Value | Description |
|---|---|---|
SYSTEM | 0 | Internal system messages (e.g., file processing status) |
THOUGHT | 1 | Agent reasoning and intermediate thinking |
PLAN | 2 | Plan proposals with structured tasks |
UPDATE | 3 | Progress updates during execution |
COMPLETE | 4 | Task or workstream completion |
WARNING | 5 | Non-fatal warnings |
ERROR | 6 | Error messages |
ANSWER | 7 | Final answers to the user |
QUESTION | 8 | User messages (displayed as questions in the conversation) |
REQUEST_INPUT | 9 | Agent requests user input, optionally with structured UX options |
IDLE | 10 | Agent is idle, waiting for input |
TERMINATED | 11 | Agent has been terminated |
STREAMING_CHUNK | 12 | Real-time LLM token streaming |
BATCH_PROGRESS | 13 | Progress updates for batch tool executions |
Message Formats
The system uses two message formats for different purposes:
CompactMessage-- the wire format, optimized for bandwidth (~85% smaller). Used between server and client over SSE/WebSocket.AgentMessage-- the UI format, with readable field names. Used by UI components and application code.
The client SDK converts wire messages to AgentMessage automatically, so application code always works with the friendlier format.
CompactMessage (wire format)
Used over the wire between server and client. You should not need to work with this directly.
interface CompactMessage {
t: AgentMessageType; // Message type
m?: string; // Message content
w?: string; // Workstream ID (omitted when "main")
d?: unknown; // Type-specific details
f?: 0 | 1; // Is final chunk (streaming only)
ts?: number; // Timestamp
i?: string; // Activity ID (for deduplication)
}
AgentMessage (UI format)
The format used by UI components and application code. This is what you receive in streamMessages callbacks and what ModernAgentConversation works with internally.
interface AgentMessage {
timestamp: number;
workflow_run_id: string;
type: AgentMessageType;
message: string;
details?: any;
workstream_id?: string;
}
Conversion Utilities
The @vertesia/common package provides utilities for working with both formats:
parseMessage(data)-- Accepts string or object, returnsCompactMessageregardless of input formattoAgentMessage(compact, runId)-- Converts wire format to UI format (done automatically by the client SDK)toCompactMessage(msg)-- Converts UI format to wire formatcreateCompactMessage(type, options)-- Convenience constructor for server-side code
Real-Time Streaming
The client uses a two-phase streaming strategy for optimal performance:
-
Phase 1 -- Historical fetch:
GET /runs/{workflowId}/{runId}/updatesreturns all past messages as gzip-compressed JSON. This is efficient because HTTP responses support compression while SSE streams do not. -
Phase 2 -- Real-time SSE:
GET /runs/{workflowId}/{runId}/stream?skipHistory=trueopens an SSE connection for live updates only. Thesinceparameter ensures no messages are lost between phases.
Reconnection is handled automatically with exponential backoff (1s base, 30s max, 10% jitter, up to 10 attempts).
LLM Token Streaming
During LLM generation, tokens are batched into STREAMING_CHUNK messages at 16ms intervals or 200 characters before being published to Redis. When the final THOUGHT or ANSWER message arrives with a matching activity_id, streaming chunks are replaced by the final message to avoid duplication.
Workstreams
Messages carry a workstream_id (defaults to "main"). This enables parallel agent execution where multiple workstreams run concurrently. The conversation only closes when the main workstream sends COMPLETE or TERMINATED -- other workstreams can complete independently.
Embedding the Conversation UI
The ModernAgentConversation component from @vertesia/ui is the recommended way to display agent conversations in your application. It handles all the complexity of message streaming, reconnection, plan visualization, file uploads, and message rendering.
Do not implement raw message streaming yourself. The ModernAgentConversation component handles streaming, chunk aggregation, deduplication, reconnection, plan extraction, optimistic updates, and many edge cases that are difficult to replicate correctly.
Basic Usage
Display an existing agent conversation:
import { ModernAgentConversation } from "@vertesia/ui/features";
function MyAgentView({ workflowId, runId }) {
return (
);
}
Starting a New Conversation
Provide a startWorkflow callback to let users initiate conversations:
import { ModernAgentConversation } from "@vertesia/ui/features";
import { useUserSession } from "@vertesia/ui/session";
function MyAgentView() {
const { client } = useUserSession();
const startWorkflow = async (message?: string) => {
const run = await client.interactions.executeAsync({
type: "conversation",
interaction: "MyAgent",
prompt_data: { task: message },
interactive: true,
});
return {
run_id: run.runId,
workflow_id: run.workflowId,
};
};
return (
);
}
Component Properties
- Name
run- Type
- AsyncExecutionResult | { workflow_id, run_id }
- Description
An existing workflow run to display. When provided, the component connects to the message stream and renders the conversation.
- Name
startWorkflow- Type
- (message?: string) => Promise<{ run_id, workflow_id }>
- Description
Callback to start a new workflow. When provided without
run, the component renders a start view with a message input. The returned IDs are used to connect to the new conversation.
- Name
interactive- Type
- boolean
- Modifier
- default: true
- Description
Whether users can send messages to the agent. When
false, user input is only shown when the agent sends aREQUEST_INPUTmessage.
- Name
title- Type
- string
- Modifier
- optional
- Description
Title displayed in the conversation header. Defaults to the workflow ID.
- Name
placeholder- Type
- string
- Modifier
- default: 'Type your message...'
- Description
Placeholder text for the message input field.
- Name
startButtonText- Type
- string
- Modifier
- default: 'Start Agent'
- Description
Label for the button that starts a new conversation.
- Name
initialMessage- Type
- string
- Modifier
- optional
- Description
An initial message displayed above the start view to give context to the user.
- Name
onClose- Type
- () => void
- Modifier
- optional
- Description
Called when the user clicks the close button.
- Name
isModal- Type
- boolean
- Modifier
- default: false
- Description
Adjusts layout for modal display (narrower max-width, different close button placement).
- Name
fullWidth- Type
- boolean
- Modifier
- default: false
- Description
When
true, the conversation area uses full width instead of a centered max-width layout.
- Name
resetWorkflow- Type
- () => void
- Modifier
- optional
- Description
Callback to reset the workflow. When provided, a reset button is shown in the header.
- Name
hideUserInput- Type
- boolean
- Modifier
- optional
- Description
Completely hide the user input area.
File Upload Support
The component supports drag-and-drop and button-based file uploads. Files are uploaded to the workflow's artifact storage and the agent is signaled via FileUploaded.
- Name
onFilesSelected- Type
- (files: File[]) => void
- Modifier
- optional
- Description
Custom handler for file selection. If omitted, the component handles uploads internally using artifact storage.
- Name
uploadedFiles- Type
- UploadedFile[]
- Modifier
- optional
- Description
External file upload state to display in the input area.
- Name
onRemoveFile- Type
- (fileId: string) => void
- Modifier
- optional
- Description
Called when the user removes an uploaded file.
- Name
acceptedFileTypes- Type
- string
- Modifier
- default: '.pdf,.doc,...'
- Description
Accepted MIME types / extensions for file uploads.
- Name
maxFiles- Type
- number
- Modifier
- default: 5
- Description
Maximum number of files that can be uploaded simultaneously.
- Name
isUploading- Type
- boolean
- Modifier
- default: false
- Description
Disables the send/start buttons while files are uploading.
Document Search Integration
You can integrate a custom document search UI using a render prop:
(
)}
selectedDocuments={selectedDocs}
onRemoveDocument={(docId) => removeDoc(docId)}
/>
Context and Attachments
- Name
getAttachedDocs- Type
- () => string[]
- Modifier
- optional
- Description
Returns document IDs to include as
store:references in messages.
- Name
onAttachmentsSent- Type
- () => void
- Modifier
- optional
- Description
Called after attachments are sent, allowing you to clear the attachment list.
- Name
getMessageContext- Type
- () => Record<string, unknown>
- Modifier
- optional
- Description
Returns additional metadata to include in every user signal. Useful for passing context like entity IDs.
Fusion Fragment Support
When building data-driven applications, you can provide data for fusion-fragment code blocks in agent responses:
Styling
- Name
inputContainerClassName- Type
- string
- Modifier
- optional
- Description
Additional Tailwind classes for the input container.
- Name
inputClassName- Type
- string
- Modifier
- optional
- Description
Additional Tailwind classes for the input field.
SDK-Only Integration
If you cannot use the React component (e.g., in a Node.js backend or non-React frontend), use the WorkflowsApi directly from @vertesia/client. See the SDK page for examples of streamMessages and sendSignal.
The SDK approach requires you to handle message parsing, deduplication, streaming chunk aggregation, and reconnection yourself. Prefer the ModernAgentConversation component whenever possible.
Key Source Files
| Component | Package | Path |
|---|---|---|
| Message types | @vertesia/common | composableai/packages/common/src/store/workflow.ts |
| Conversation state | @vertesia/common | composableai/packages/common/src/store/conversation-state.ts |
| Signals | @vertesia/common | composableai/packages/common/src/store/signals.ts |
| Client streaming API | @vertesia/client | composableai/packages/client/src/store/WorkflowsApi.ts |
| Conversation component | @vertesia/ui | composableai/packages/ui/src/features/agent/chat/ModernAgentConversation.tsx |
