Agent Message System

The agent message system is Vertesia's real-time communication layer between agent workflows and client applications. It handles message creation, delivery, streaming, and rendering through a well-defined pipeline.

Architecture Overview

Messages flow through the following pipeline:

User Input --> Signal (UserInput) --> Temporal Workflow
                                          |
                                  Agent/Tool Execution
                                          |
                              postUpdateMessage activity
                                          |
                          POST /runs/:runId/updates
                                          |
                             Redis (list + pub/sub)
                                          |
                  +--------------------------------------+
                  | Phase 1: GET /updates (gzip)         |
                  | Phase 2: SSE /stream (real-time)     |
                  +--------------------------------------+
                                          |
                       ModernAgentConversation UI

Key participants:

  • Temporal Workflows orchestrate agent execution and tool calls
  • Redis stores message history (up to 1000 messages, 90-day TTL) and delivers real-time updates via pub/sub
  • zeno-server exposes REST and SSE endpoints for message retrieval and streaming
  • @vertesia/client provides the WorkflowsApi that handles both historical fetch and real-time SSE
  • @vertesia/ui provides ModernAgentConversation, the recommended UI component

Message Types

Every agent message has a type field from the AgentMessageType enum (@vertesia/common):

Type             Value  Description
SYSTEM           0      Internal system messages (e.g., file processing status)
THOUGHT          1      Agent reasoning and intermediate thinking
PLAN             2      Plan proposals with structured tasks
UPDATE           3      Progress updates during execution
COMPLETE         4      Task or workstream completion
WARNING          5      Non-fatal warnings
ERROR            6      Error messages
ANSWER           7      Final answers to the user
QUESTION         8      User messages (displayed as questions in the conversation)
REQUEST_INPUT    9      Agent requests user input, optionally with structured UX options
IDLE             10     Agent is idle, waiting for input
TERMINATED       11     Agent has been terminated
STREAMING_CHUNK  12     Real-time LLM token streaming
BATCH_PROGRESS   13     Progress updates for batch tool executions
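
When logging or debugging wire traffic, it can help to turn the numeric type back into its name. The sketch below uses a plain object as a local stand-in for the enum values listed above; the typeName helper is hypothetical and not part of @vertesia/common.

```typescript
// Local stand-in mirroring the values in the table above. In application
// code, import AgentMessageType from @vertesia/common instead.
const AgentMessageType = {
    SYSTEM: 0,
    THOUGHT: 1,
    PLAN: 2,
    UPDATE: 3,
    COMPLETE: 4,
    WARNING: 5,
    ERROR: 6,
    ANSWER: 7,
    QUESTION: 8,
    REQUEST_INPUT: 9,
    IDLE: 10,
    TERMINATED: 11,
    STREAMING_CHUNK: 12,
    BATCH_PROGRESS: 13,
} as const;

// Reverse lookup table: numeric value -> type name.
const typeNames = new Map<number, string>(
    Object.entries(AgentMessageType).map(([name, value]): [number, string] => [value, name]),
);

// Hypothetical helper: readable name for a numeric message type.
function typeName(t: number): string {
    return typeNames.get(t) ?? `UNKNOWN(${t})`;
}
```

For example, typeName(7) returns "ANSWER", and a value outside the table falls back to an "UNKNOWN(...)" label rather than throwing.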

Message Formats

The system uses two message formats for different purposes:

  • CompactMessage -- the wire format, optimized for bandwidth (~85% smaller). Used between server and client over SSE/WebSocket.
  • AgentMessage -- the UI format, with readable field names. Used by UI components and application code.

The client SDK converts wire messages to AgentMessage automatically, so application code always works with the friendlier format.

CompactMessage (wire format)

Used over the wire between server and client. You should not need to work with this directly.

interface CompactMessage {
    t: AgentMessageType;  // Message type
    m?: string;           // Message content
    w?: string;           // Workstream ID (omitted when "main")
    d?: unknown;          // Type-specific details
    f?: 0 | 1;           // Is final chunk (streaming only)
    ts?: number;          // Timestamp
    i?: string;           // Activity ID (for deduplication)
}

AgentMessage (UI format)

The format used by UI components and application code. This is what you receive in streamMessages callbacks and what ModernAgentConversation works with internally.

interface AgentMessage {
    timestamp: number;
    workflow_run_id: string;
    type: AgentMessageType;
    message: string;
    details?: any;
    workstream_id?: string;
}

Conversion Utilities

The @vertesia/common package provides utilities for working with both formats:

  • parseMessage(data) -- Accepts string or object, returns CompactMessage regardless of input format
  • toAgentMessage(compact, runId) -- Converts wire format to UI format (done automatically by the client SDK)
  • toCompactMessage(msg) -- Converts UI format to wire format
  • createCompactMessage(type, options) -- Convenience constructor for server-side code
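
To make the relationship between the two formats concrete, here is a minimal sketch of what a wire-to-UI conversion might look like. It is not the implementation of toAgentMessage: the interfaces are local copies of the ones shown above, the function name toAgentMessageSketch is hypothetical, and the defaults for missing fields are assumptions.

```typescript
// Local stand-ins for the interfaces shown above; real code should import
// the types from @vertesia/common.
interface CompactMessage {
    t: number;   // Message type
    m?: string;  // Message content
    w?: string;  // Workstream ID (omitted when "main")
    d?: unknown; // Type-specific details
    f?: 0 | 1;   // Is final chunk (streaming only)
    ts?: number; // Timestamp
    i?: string;  // Activity ID
}

interface AgentMessage {
    timestamp: number;
    workflow_run_id: string;
    type: number;
    message: string;
    details?: unknown;
    workstream_id?: string;
}

// Hypothetical re-implementation for illustration only. The defaults chosen
// here ("" for a missing message, "main" for a missing workstream, Date.now()
// for a missing timestamp) are assumptions, not confirmed SDK behavior.
function toAgentMessageSketch(compact: CompactMessage, runId: string): AgentMessage {
    return {
        timestamp: compact.ts ?? Date.now(),
        workflow_run_id: runId,
        type: compact.t,
        message: compact.m ?? "",
        details: compact.d,
        workstream_id: compact.w ?? "main",
    };
}
```

The short single-letter keys are where the wire format saves bandwidth; the conversion only renames fields and fills in values that were omitted on the wire.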

Real-Time Streaming

The client uses a two-phase streaming strategy for optimal performance:

  1. Phase 1 -- Historical fetch: GET /runs/{workflowId}/{runId}/updates returns all past messages as gzip-compressed JSON. This is efficient because a regular HTTP response can be compressed as a whole, whereas SSE streams are typically delivered uncompressed.

  2. Phase 2 -- Real-time SSE: GET /runs/{workflowId}/{runId}/stream?skipHistory=true opens an SSE connection for live updates only. The since parameter ensures no messages are lost between phases.

Reconnection is handled automatically with exponential backoff (1s base, 30s max, 10% jitter, up to 10 attempts).
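
The client handles this for you, but if you implement your own SSE consumer, the retry schedule can be approximated as follows. This is a sketch under stated assumptions: the text above gives the base, cap, jitter, and attempt limit, while the doubling factor and the symmetric (plus or minus) jitter are guesses. The reconnectDelay function is hypothetical.

```typescript
const BASE_MS = 1000;     // 1s base delay
const MAX_MS = 30000;     // 30s cap
const JITTER = 0.1;       // 10% jitter
const MAX_ATTEMPTS = 10;  // give up after 10 attempts

// Returns the delay in milliseconds before retry number `attempt`
// (0-based), or null once the attempt limit is reached.
// Assumption: the delay doubles on each attempt and jitter is applied
// symmetrically around the capped value.
function reconnectDelay(attempt: number, random: () => number = Math.random): number | null {
    if (attempt >= MAX_ATTEMPTS) return null;
    const capped = Math.min(BASE_MS * Math.pow(2, attempt), MAX_MS);
    const jitter = capped * JITTER * (random() * 2 - 1);
    return Math.round(capped + jitter);
}
```

Passing the random source as a parameter keeps the function deterministic in tests. Jitter spreads reconnect attempts from many clients so they do not all hit the server at the same instant.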

LLM Token Streaming

During LLM generation, tokens are batched into STREAMING_CHUNK messages before being published to Redis. A batch is flushed every 16 ms or once 200 characters have accumulated, whichever comes first. When the final THOUGHT or ANSWER message arrives with a matching activity_id, the streaming chunks are replaced by the final message to avoid duplication.
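
The batching rule can be illustrated with a small buffer that flushes on either a size or a time threshold. This is a hypothetical sketch, not the server's code: the ChunkBatcher class and its publish callback are invented for illustration, and flushing on whichever threshold is reached first is an assumption.

```typescript
// Hypothetical token batcher: collects tokens and hands them to `publish`
// when the buffer reaches maxChars or intervalMs elapses after the first
// buffered token (assumed semantics).
class ChunkBatcher {
    private buffer = "";
    private timer: ReturnType<typeof setTimeout> | undefined;
    private readonly publish: (chunk: string) => void;
    private readonly maxChars: number;
    private readonly intervalMs: number;

    constructor(publish: (chunk: string) => void, maxChars = 200, intervalMs = 16) {
        this.publish = publish;
        this.maxChars = maxChars;
        this.intervalMs = intervalMs;
    }

    push(token: string): void {
        this.buffer += token;
        if (this.buffer.length >= this.maxChars) {
            // Size threshold reached: flush immediately.
            this.flush();
        } else if (this.timer === undefined) {
            // First token of a new batch: start the time threshold.
            this.timer = setTimeout(() => this.flush(), this.intervalMs);
        }
    }

    flush(): void {
        if (this.timer !== undefined) {
            clearTimeout(this.timer);
            this.timer = undefined;
        }
        if (this.buffer.length === 0) return;
        const chunk = this.buffer;
        this.buffer = "";
        this.publish(chunk);
    }
}
```

Batching trades a few milliseconds of latency for far fewer published messages than sending one message per token. Call flush() once generation ends so the last partial batch is not left in the buffer.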

Workstreams

Messages carry a workstream_id (defaults to "main"). This enables parallel agent execution where multiple workstreams run concurrently. The conversation only closes when the main workstream sends COMPLETE or TERMINATED -- other workstreams can complete independently.
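
The closing rule can be expressed as a small predicate. The sketch below is illustrative only: closesConversation is a hypothetical helper, the numeric constants are copied from the message type table, and treating a missing workstream_id as "main" follows the default described above.

```typescript
// Values copied from the AgentMessageType table.
const COMPLETE = 4;
const TERMINATED = 11;

// Minimal shape needed for the check; AgentMessage satisfies it.
interface WorkstreamMessage {
    type: number;
    workstream_id?: string;
}

// Hypothetical helper: true only when the main workstream reports
// COMPLETE or TERMINATED. Other workstreams finishing do not close
// the conversation.
function closesConversation(msg: WorkstreamMessage): boolean {
    const workstream = msg.workstream_id ?? "main";
    return workstream === "main" && (msg.type === COMPLETE || msg.type === TERMINATED);
}
```

A COMPLETE message from a side workstream therefore returns false, while the same message type without a workstream_id, or with "main", returns true.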

Embedding the Conversation UI

The ModernAgentConversation component from @vertesia/ui is the recommended way to display agent conversations in your application. It handles all the complexity of message streaming, reconnection, plan visualization, file uploads, and message rendering.

Basic Usage

Display an existing agent conversation:

import { ModernAgentConversation } from "@vertesia/ui/features";

function MyAgentView({ workflowId, runId }) {
    return (
        <ModernAgentConversation
            run={{ workflow_id: workflowId, run_id: runId }}
        />
    );
}

Starting a New Conversation

Provide a startWorkflow callback to let users initiate conversations:

import { ModernAgentConversation } from "@vertesia/ui/features";
import { useUserSession } from "@vertesia/ui/session";

function MyAgentView() {
    const { client } = useUserSession();

    const startWorkflow = async (message?: string) => {
        const run = await client.interactions.executeAsync({
            type: "conversation",
            interaction: "MyAgent",
            prompt_data: { task: message },
            interactive: true,
        });
        return {
            run_id: run.runId,
            workflow_id: run.workflowId,
        };
    };

    return (
        <ModernAgentConversation startWorkflow={startWorkflow} />
    );
}

Component Properties

  • Name
    run
    Type
    AsyncExecutionResult | { workflow_id, run_id }
    Description

    An existing workflow run to display. When provided, the component connects to the message stream and renders the conversation.

  • Name
    startWorkflow
    Type
    (message?: string) => Promise<{ run_id, workflow_id }>
    Description

    Callback to start a new workflow. When provided without run, the component renders a start view with a message input. The returned IDs are used to connect to the new conversation.

  • Name
    interactive
    Type
    boolean
    Modifier
    default: true
    Description

    Whether users can send messages to the agent. When false, user input is only shown when the agent sends a REQUEST_INPUT message.

  • Name
    title
    Type
    string
    Modifier
    optional
    Description

    Title displayed in the conversation header. Defaults to the workflow ID.

  • Name
    placeholder
    Type
    string
    Modifier
    default: 'Type your message...'
    Description

    Placeholder text for the message input field.

  • Name
    startButtonText
    Type
    string
    Modifier
    default: 'Start Agent'
    Description

    Label for the button that starts a new conversation.

  • Name
    initialMessage
    Type
    string
    Modifier
    optional
    Description

    An initial message displayed above the start view to give context to the user.

  • Name
    onClose
    Type
    () => void
    Modifier
    optional
    Description

    Called when the user clicks the close button.

  • Name
    isModal
    Type
    boolean
    Modifier
    default: false
    Description

    Adjusts layout for modal display (narrower max-width, different close button placement).

  • Name
    fullWidth
    Type
    boolean
    Modifier
    default: false
    Description

    When true, the conversation area uses full width instead of a centered max-width layout.

  • Name
    resetWorkflow
    Type
    () => void
    Modifier
    optional
    Description

    Callback to reset the workflow. When provided, a reset button is shown in the header.

  • Name
    hideUserInput
    Type
    boolean
    Modifier
    optional
    Description

    Completely hide the user input area.

File Upload Support

The component supports drag-and-drop and button-based file uploads. Files are uploaded to the workflow's artifact storage and the agent is signaled via FileUploaded.


  • Name
    onFilesSelected
    Type
    (files: File[]) => void
    Modifier
    optional
    Description

    Custom handler for file selection. If omitted, the component handles uploads internally using artifact storage.

  • Name
    uploadedFiles
    Type
    UploadedFile[]
    Modifier
    optional
    Description

    External file upload state to display in the input area.

  • Name
    onRemoveFile
    Type
    (fileId: string) => void
    Modifier
    optional
    Description

    Called when the user removes an uploaded file.

  • Name
    acceptedFileTypes
    Type
    string
    Modifier
    default: '.pdf,.doc,...'
    Description

    Accepted MIME types / extensions for file uploads.

  • Name
    maxFiles
    Type
    number
    Modifier
    default: 5
    Description

    Maximum number of files that can be uploaded simultaneously.

  • Name
    isUploading
    Type
    boolean
    Modifier
    default: false
    Description

    Disables the send/start buttons while files are uploading.

Document Search Integration

You can integrate a custom document search UI using a render prop:

 (
        
    )}
    selectedDocuments={selectedDocs}
    onRemoveDocument={(docId) => removeDoc(docId)}
/>

Context and Attachments

  • Name
    getAttachedDocs
    Type
    () => string[]
    Modifier
    optional
    Description

    Returns document IDs to include as store: references in messages.

  • Name
    onAttachmentsSent
    Type
    () => void
    Modifier
    optional
    Description

    Called after attachments are sent, allowing you to clear the attachment list.

  • Name
    getMessageContext
    Type
    () => Record<string, unknown>
    Modifier
    optional
    Description

    Returns additional metadata to include in every user signal. Useful for passing context like entity IDs.

Fusion Fragment Support

When building data-driven applications, you can provide data for fusion-fragment code blocks in agent responses:


Styling

  • Name
    inputContainerClassName
    Type
    string
    Modifier
    optional
    Description

    Additional Tailwind classes for the input container.

  • Name
    inputClassName
    Type
    string
    Modifier
    optional
    Description

    Additional Tailwind classes for the input field.

SDK-Only Integration

If you cannot use the React component (e.g., in a Node.js backend or non-React frontend), use the WorkflowsApi directly from @vertesia/client. See the SDK page for examples of streamMessages and sendSignal.

Key Source Files

Component               Package           Path
Message types           @vertesia/common  composableai/packages/common/src/store/workflow.ts
Conversation state      @vertesia/common  composableai/packages/common/src/store/conversation-state.ts
Signals                 @vertesia/common  composableai/packages/common/src/store/signals.ts
Client streaming API    @vertesia/client  composableai/packages/client/src/store/WorkflowsApi.ts
Conversation component  @vertesia/ui      composableai/packages/ui/src/features/agent/chat/ModernAgentConversation.tsx
