> ## Documentation Index
> Fetch the complete documentation index at: https://motiadev-docs-deployment-guide.mintlify.site/llms.txt
> Use this file to discover all available pages before exploring further.

# Stream Real-Time Data

> How to push live updates to connected clients using the Stream module.

## Goal

Push real-time updates (chat messages, notifications, live dashboards) to connected clients over WebSocket.

## Steps

### 1. Enable the Stream module

```yaml title="iii-config.yaml" theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
# Registers the Stream module together with the adapter it uses for storage.
modules:
  - class: modules::stream::StreamModule
    config:
      # Presumably resolved from the STREAM_PORT environment variable with 3112 as the
      # fallback; the client example on this page connects on port 3112.
      port: ${STREAM_PORT:3112}
      host: localhost
      adapter:
        class: modules::stream::adapters::KvStore
        config:
          store_method: file_based  # Options: in_memory, file_based
          file_path: ./data/stream_store  # required for file_based
        # Commented-out alternative adapter; presumably swapped in for the KvStore
        # class/config pair above to back streams with Redis.
        # class: modules::stream::adapters::RedisAdapter
        # config:
        #   redis_url: redis://localhost:6379
```

### 2. Write to a stream

<Tabs>
  <Tab title="Node / TypeScript">
    ```typescript title="stream-writer.ts" theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
    import { registerWorker, Logger } from 'iii-sdk'

    // Register this worker at III_URL, falling back to the local default address.
    const iii = registerWorker(process.env.III_URL ?? 'ws://localhost:49134')

    // chat::send — stores one chat message in the 'chat' stream under the room's group.
    // Reads input.roomId, input.text and input.author; returns { messageId }.
    iii.registerFunction({ id: 'chat::send' }, async (input) => {
      const logger = new Logger()
      // A fresh UUID is used as the stream item id.
      const messageId = crypto.randomUUID()

      await iii.trigger({
        function_id: 'stream::set',
        payload: {
          stream_name: 'chat',
          group_id: input.roomId,
          item_id: messageId,
          data: { text: input.text, author: input.author },
        },
      })

      logger.info('Message sent to stream', { messageId })
      return { messageId }
    })

    // Then call from another function or worker
    const { messageId } = await iii.trigger({
      function_id: 'chat::send',
      payload: { roomId: 'room-123', text: 'Hello world', author: 'alice' },
    })
    // The handler's `logger` is local to chat::send and not visible here, so log via console.
    console.log('Sent message', { messageId })
    ```
  </Tab>

  <Tab title="Python">
    ```python title="stream_writer.py" theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
    import os
    import uuid

    from iii import Logger, register_worker

    # Register this worker at III_URL, falling back to the local default address.
    iii = register_worker(os.environ.get("III_URL", "ws://localhost:49134"))


    def send_message(input):
        """Handler for ``chat::send``: write one chat message to the ``chat`` stream.

        Reads ``roomId``, ``text`` and ``author`` from ``input``, stores the message
        through ``stream::set`` under a freshly generated UUID, and returns
        ``{"messageId": <uuid>}``.
        """
        logger = Logger()
        # A fresh UUID is used as the stream item id.
        message_id = str(uuid.uuid4())

        iii.trigger({
            "function_id": "stream::set",
            "payload": {
                "stream_name": "chat",
                "group_id": input["roomId"],
                "item_id": message_id,
                "data": {"text": input["text"], "author": input["author"]},
            },
        })

        logger.info("Message sent to stream", {"messageId": message_id})
        return {"messageId": message_id}


    # Expose the handler under the function id that callers trigger.
    iii.register_function({"id": "chat::send"}, send_message)

    # Then call from another function or worker
    result = iii.trigger({
        "function_id": "chat::send",
        "payload": {"roomId": "room-123", "text": "Hello world", "author": "alice"},
    })
    print("Sent message:", result["messageId"])
    ```
  </Tab>

  <Tab title="Rust">
    ```rust title="stream_writer.rs" theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
    use iii_sdk::{register_worker, InitOptions, Logger, RegisterFunctionMessage, TriggerRequest};
    use serde_json::{json, Value};
    use tokio::signal;

    // Registers the `chat::send` function, then waits for Ctrl-C.
    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        // III_URL overrides the default local address.
        let url = std::env::var("III_URL").unwrap_or_else(|_| "ws://127.0.0.1:49134".to_string());
        let iii = register_worker(&url, InitOptions::default());

        // The handler closure is `move`, so it captures its own clone and `iii` stays usable here.
        let iii_clone = iii.clone();
        iii.register_function(
            RegisterFunctionMessage::with_id("chat::send".into()),
            move |input: Value| {
                // Clone per invocation so the async block can own a handle.
                let iii = iii_clone.clone();
                async move {
                    let logger = Logger::new();
                    // A fresh UUID is used as the stream item id.
                    let message_id = uuid::Uuid::new_v4().to_string();
                    // Missing or non-string fields fall back to the empty string.
                    let room_id = input["roomId"].as_str().unwrap_or("");
                    let text = input["text"].as_str().unwrap_or("");
                    let author = input["author"].as_str().unwrap_or("");

                    // Store the message in the "chat" stream under the room's group.
                    iii.trigger(TriggerRequest {
                        function_id: "stream::set".into(),
                        payload: json!({
                            "stream_name": "chat",
                            "group_id": room_id,
                            "item_id": message_id,
                            "data": { "text": text, "author": author },
                        }),
                        action: None,
                        timeout_ms: None,
                    })
                    .await?;

                    logger.info("Message sent to stream", Some(json!({ "messageId": message_id })));
                    Ok(json!({ "messageId": message_id }))
                }
            },
        );

        // Wait for Ctrl-C before returning; presumably this keeps the worker alive to serve calls.
        signal::ctrl_c().await?;
        Ok(())
    }

    // Then call from another function or worker.
    // NOTE: this is a fragment, not top-level code — it needs an `iii` handle in scope and must
    // sit inside an async function that returns a `Result` (it uses `.await?`).
    let result = iii
        .trigger(TriggerRequest {
            function_id: "chat::send".into(),
            payload: json!({ "roomId": "room-123", "text": "Hello world", "author": "alice" }),
            action: None,
            timeout_ms: None,
        })
        .await?;
    println!("Sent message: {}", result["messageId"]);
    ```
  </Tab>
</Tabs>

### 3. Read from a stream

<Tabs>
  <Tab title="Node / TypeScript">
    ```typescript title="stream-reader.ts" theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
    // chat::list — returns the items stored in the 'chat' stream for one room.
    // Reads input.roomId. Uses the `iii` handle and `Logger` import from the writer example.
    iii.registerFunction({ id: 'chat::list' }, async (input) => {
      const logger = new Logger()

      const messages = await iii.trigger({
        function_id: 'stream::list',
        payload: { stream_name: 'chat', group_id: input.roomId },
      })

      logger.info('Messages retrieved', { count: messages.length })
      return messages
    })

    // Then call from another function or worker
    const messages = await iii.trigger({
      function_id: 'chat::list',
      payload: { roomId: 'room-123' },
    })
    // The handler's `logger` is local to chat::list and not visible here, so log via console.
    console.log('Messages', { messages })
    ```
  </Tab>

  <Tab title="Python">
    ```python title="stream_reader.py" theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
    def list_messages(input):
        """Handler for ``chat::list``: return the items stored for one chat room.

        Reads ``roomId`` from ``input``, queries ``stream::list`` for the ``chat``
        stream with that group id, logs how many items came back, and returns the
        result unchanged. Relies on ``iii`` and ``Logger`` being in scope, as set up
        in the writer example.
        """
        logger = Logger()

        messages = iii.trigger({
            "function_id": "stream::list",
            "payload": {"stream_name": "chat", "group_id": input["roomId"]},
        })

        # len() assumes the result is a sized collection — TODO confirm against the SDK.
        logger.info("Messages retrieved", {"count": len(messages)})
        return messages


    # Expose the handler under the function id that callers trigger.
    iii.register_function({"id": "chat::list"}, list_messages)

    # Then call from another function or worker
    messages = iii.trigger({
        "function_id": "chat::list",
        "payload": {"roomId": "room-123"},
    })
    print("Messages:", messages)
    ```
  </Tab>

  <Tab title="Rust">
    ```rust title="stream_reader.rs" theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
    // Fragment: registers `chat::list`. It needs an `iii` handle and the imports from the
    // writer example in scope, and must sit inside an async function returning a `Result`.
    let iii_clone = iii.clone();
    iii.register_function(
        RegisterFunctionMessage::with_id("chat::list".into()),
        move |input: Value| {
            // Clone per invocation so the async block can own a handle.
            let iii = iii_clone.clone();
            async move {
                let logger = Logger::new();
                // A missing or non-string roomId falls back to the empty string.
                let room_id = input["roomId"].as_str().unwrap_or("");

                // Fetch the items stored in the "chat" stream for this room's group.
                let messages = iii
                    .trigger(TriggerRequest {
                        function_id: "stream::list".into(),
                        payload: json!({ "stream_name": "chat", "group_id": room_id }),
                        action: None,
                        timeout_ms: None,
                    })
                    .await?;

                // The count is logged as 0 when the result is not a JSON array.
                logger.info("Messages retrieved", Some(json!({ "count": messages.as_array().map(|a| a.len()).unwrap_or(0) })));
                Ok(messages)
            }
        },
    );

    // Then call from another function or worker
    let messages = iii
        .trigger(TriggerRequest {
            function_id: "chat::list".into(),
            payload: json!({ "roomId": "room-123" }),
            action: None,
            timeout_ms: None,
        })
        .await?;
    println!("Messages: {:?}", messages);
    ```
  </Tab>
</Tabs>

### 4. Connect a client

Clients connect to the stream WebSocket endpoint to receive live updates:

```javascript title="client.js" theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
// Open a socket on port 3112; the path segments mirror the stream_name ('chat')
// and group_id ('room-123') used in the write example.
const ws = new WebSocket('ws://localhost:3112/stream/chat/room-123')

// Parse each incoming frame as JSON and log it.
ws.onmessage = ({ data }) => {
  const update = JSON.parse(data)
  console.log('New message:', update)
}
```

## Result

Any data written to the stream via `stream::set` is immediately pushed to all connected WebSocket clients subscribed to that stream and group. No polling needed.

{/* <Info title="See also">
For stream operations like `update`, `delete`, `listGroups`, and authentication, see the [Stream module reference](../modules/module-stream).
</Info> */}
