SSE Chatbot example
The FastHTML SSE chatbot example
"""Example from https://github.com/fabge/fasthtml-sse/"""
from fasthtml.common import *
from claudette import *
import asyncio
from starlette.responses import StreamingResponse
# Set up the app, including daisyui and tailwind and the htmx sse extension for the chat component
tlink = (Script(src="https://cdn.tailwindcss.com"),)
dlink = Link(
    rel="stylesheet",
    href="https://cdn.jsdelivr.net/npm/daisyui@4.11.1/dist/full.min.css",
)
sselink = Script(src="https://unpkg.com/htmx-ext-sse@2.2.1/sse.js")
app = FastHTML(hdrs=(tlink, dlink, picolink, sselink), live=True)

# Set up a chat model client and list of messages (https://claudette.answer.ai/)
cli = Client(models[-1])
# System prompt passed to the client on every call.
sp = """You are a helpful and concise assistant."""
# Module-level conversation history: a list of {"role": ..., "content": ...} dicts.
messages = []
# @app.get("/{fname:path}.{ext:static}")
# def static(fname: str, ext: str):
# return FileResponse(f"{fname}.{ext}")
# Send messages to the chat model and yield the responses
async def message_generator():
    """Stream the assistant's reply as server-sent events.

    Calls the chat client with every message except the trailing one, appends
    each streamed chunk to the trailing message's content, and yields the
    chunk as an SSE ``message`` event. Once the stream is exhausted a final
    ``close`` event is yielded.
    """
    print("message_generator", messages)
    # The last entry is left out of the request; it accumulates the reply below.
    r = cli(messages[:-1], sp=sp, stream=True)
    for chunk in r:
        messages[-1]["content"] += chunk
        # NOTE(review): a chunk containing "\n" would split the SSE data field — confirm.
        yield f"event: message\ndata: {chunk}\n\n"
        await asyncio.sleep(0.5)
    yield f"event: close\ndata: \n\n"
# Chat message component (renders a chat bubble)
# Now with a unique ID for the content and the message
def ChatMessage(msg_idx, **kwargs):
    """Render the chat bubble for ``messages[msg_idx]``.

    Args:
        msg_idx: Index of the message in the module-level ``messages`` list.
        **kwargs: Extra attributes forwarded to the inner content ``Div``.

    Returns:
        A ``Div`` holding a role header and the message content; both the
        outer and the content element get ids derived from ``msg_idx``.
    """
    msg = messages[msg_idx]
    bubble_class = (
        "chat-bubble-primary" if msg["role"] == "user" else "chat-bubble-secondary"
    )
    chat_class = "chat-end" if msg["role"] == "user" else "chat-start"
    return Div(
        Div(msg["role"], cls="chat-header"),
        Div(
            msg["content"],
            id=f"chat-content-{msg_idx}",  # Target if updating the content
            cls=f"chat-bubble {bubble_class}",
            **kwargs,
        ),
        id=f"chat-message-{msg_idx}",  # Target if replacing the whole message
        cls=f"chat {chat_class}",
    )
# The input field for the user message. Also used to clear the
# input field after sending a message via an OOB swap
def ChatInput():
    """Return the text input for the user's message.

    The element carries ``hx_swap_oob="true"`` so that returning it from a
    handler replaces the existing ``#msg-input`` out of band.
    """
    return Input(
        type="text",
        name="msg",
        id="msg-input",
        placeholder="Type a message",
        cls="input input-bordered w-full",
        hx_swap_oob="true",
    )
# The main screen
@app.route("/")
def get():
    """Render the main screen: heading, current chat history, and input form.

    Returns:
        A ``(Title, Body)`` pair. The form posts to ``/send-message`` and
        appends the response to ``#chatlist``.
    """
    page = Body(
        H1("Chatbot SSE (server-sent events) Demo"),
        Div(
            # ChatMessage takes an index into `messages`, not the message dict itself.
            *[ChatMessage(msg_idx) for msg_idx in range(len(messages))],
            id="chatlist",
            cls="chat-box h-[73vh] overflow-y-auto",
        ),
        Form(
            Group(ChatInput(), Button("Send", cls="btn btn-primary")),
            hx_post="/send-message",
            hx_target="#chatlist",
            hx_swap="beforeend",
            cls="flex space-x-2 mt-2",
        ),
        cls="p-4 max-w-lg mx-auto",
    )
    return Title("Chatbot Demo"), page
@app.get("/get-message")
async def get_message():
    """Return the output of ``message_generator`` as a ``text/event-stream`` response."""
    return StreamingResponse(message_generator(), media_type="text/event-stream")
@app.post("/send-message")
async def send_message(msg: str):
    """Record the user's message and return both chat bubbles.

    Args:
        msg: Text submitted from the form's ``msg`` field.

    Returns:
        A ``(user_msg, assistant_msg)`` pair of ``Div`` elements. The assistant
        bubble starts empty and carries the SSE attributes that connect it to
        ``/get-message``.
    """
    messages.append({"role": "user", "content": msg})
    user_msg = Div(ChatMessage(len(messages) - 1))
    # Empty assistant entry; message_generator fills its content chunk by chunk.
    messages.append({"role": "assistant", "content": ""})
    # The returned assistant message uses the SSE extension, connect to the /get-message endpoint and get all messages until the close event
    assistant_msg = Div(
        ChatMessage(
            len(messages) - 1,
            hx_ext="sse",
            sse_connect="/get-message",
            sse_swap="message",
            sse_close="close",
            hx_swap="beforeend show:bottom",
        )
    )
    # NOTE(review): ChatInput() is not returned, so the input is not reset via OOB swap — confirm intent.
    return user_msg, assistant_msg
# Entry point; `serve` comes from one of the star imports above (presumably fasthtml.common, starting the web server — confirm).
serve()
Corresponding HTML:
<html><head>
<title>My</template></body></title>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1, viewport-fit=cover">
<script src="https://unpkg.com/htmx.org@next/dist/htmx.min.js"></script>
<script src="https://cdn.jsdelivr.net/gh/answerdotai/fasthtml-js@main/fasthtml.js"></script>
<script src="https://cdn.jsdelivr.net/gh/answerdotai/surreal@main/surreal.js"></script>
<script src="https://cdn.jsdelivr.net/gh/gnat/css-scope-inline@main/script.js"></script>
<script src="https://cdn.tailwindcss.com"></script>
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/daisyui@4.11.1/dist/full.min.css">
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/@picocss/pico@latest/css/pico.min.css">
<style>:root { --pico-font-size: 100%; }</style>
<script src="https://unpkg.com/htmx-ext-sse@2.2.1/sse.js"></script>
<style>/* ! tailwindcss v3.4.5 | MIT License | https://tailwindcss.com */*,::after,::before{box-sizing:border-box;border-width:0;border-style:solid;border-color:#e5e7eb}::after,::before{--tw-content:''}:host,html{line-height:1.5;-webkit-text-size-adjust:100%;-moz-tab-size:4;tab-size:4;font-family:ui-sans-serif, system-ui, sans-serif, "Apple Color Emoji", "Segoe UI Emoji", "Segoe UI Symbol", "Noto Color Emoji";font-feature-settings:normal;font-variation-settings:normal;-webkit-tap-highlight-color:transparent}body{margin:0;line-height:inherit}hr{height:0;color:inherit;border-top-width:1px}abbr:where([title]){-webkit-text-decoration:underline dotted;text-decoration:underline dotted}h1,h2,h3,h4,h5,h6{font-size:inherit;font-weight:inherit}a{color:inherit;text-decoration:inherit}b,strong{font-weight:bolder}code,kbd,pre,samp{font-family:ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, "Liberation Mono", "Courier New", monospace;font-feature-settings:normal;font-variation-settings:normal;font-size:1em}small{font-size:80%}sub,sup{font-size:75%;line-height:0;position:relative;vertical-align:baseline}sub{bottom:-.25em}sup{top:-.5em}table{text-indent:0;border-color:inherit;border-collapse:collapse}button,input,optgroup,select,textarea{font-family:inherit;font-feature-settings:inherit;font-variation-settings:inherit;font-size:100%;font-weight:inherit;line-height:inherit;letter-spacing:inherit;color:inherit;margin:0;padding:0}button,select{text-transform:none}button,input:where([type=button]),input:where([type=reset]),input:where([type=submit]){-webkit-appearance:button;background-color:transparent;background-image:none}:-moz-focusring{outline:auto}:-moz-ui-invalid{box-shadow:none}progress{vertical-align:baseline}::-webkit-inner-spin-button,::-webkit-outer-spin-button{height:auto}[type=search]{-webkit-appearance:textfield;outline-offset:-2px}::-webkit-search-decoration{-webkit-appearance:none}::-webkit-file-upload-button{-webkit-appearance:button;font:inherit}summary{displa
y:list-item}blockquote,dd,dl,figure,h1,h2,h3,h4,h5,h6,hr,p,pre{margin:0}fieldset{margin:0;padding:0}legend{padding:0}menu,ol,ul{list-style:none;margin:0;padding:0}dialog{padding:0}textarea{resize:vertical}input::placeholder,textarea::placeholder{opacity:1;color:#9ca3af}[role=button],button{cursor:pointer}:disabled{cursor:default}audio,canvas,embed,iframe,img,object,svg,video{display:block;vertical-align:middle}img,video{max-width:100%;height:auto}[hidden]{display:none}*, ::before, ::after{--tw-border-spacing-x:0;--tw-border-spacing-y:0;--tw-translate-x:0;--tw-translate-y:0;--tw-rotate:0;--tw-skew-x:0;--tw-skew-y:0;--tw-scale-x:1;--tw-scale-y:1;--tw-pan-x: ;--tw-pan-y: ;--tw-pinch-zoom: ;--tw-scroll-snap-strictness:proximity;--tw-gradient-from-position: ;--tw-gradient-via-position: ;--tw-gradient-to-position: ;--tw-ordinal: ;--tw-slashed-zero: ;--tw-numeric-figure: ;--tw-numeric-spacing: ;--tw-numeric-fraction: ;--tw-ring-inset: ;--tw-ring-offset-width:0px;--tw-ring-offset-color:#fff;--tw-ring-color:rgb(59 130 246 / 0.5);--tw-ring-offset-shadow:0 0 #0000;--tw-ring-shadow:0 0 #0000;--tw-shadow:0 0 #0000;--tw-shadow-colored:0 0 #0000;--tw-blur: ;--tw-brightness: ;--tw-contrast: ;--tw-grayscale: ;--tw-hue-rotate: ;--tw-invert: ;--tw-saturate: ;--tw-sepia: ;--tw-drop-shadow: ;--tw-backdrop-blur: ;--tw-backdrop-brightness: ;--tw-backdrop-contrast: ;--tw-backdrop-grayscale: ;--tw-backdrop-hue-rotate: ;--tw-backdrop-invert: ;--tw-backdrop-opacity: ;--tw-backdrop-saturate: ;--tw-backdrop-sepia: ;--tw-contain-size: ;--tw-contain-layout: ;--tw-contain-paint: ;--tw-contain-style: }::backdrop{--tw-border-spacing-x:0;--tw-border-spacing-y:0;--tw-translate-x:0;--tw-translate-y:0;--tw-rotate:0;--tw-skew-x:0;--tw-skew-y:0;--tw-scale-x:1;--tw-scale-y:1;--tw-pan-x: ;--tw-pan-y: ;--tw-pinch-zoom: ;--tw-scroll-snap-strictness:proximity;--tw-gradient-from-position: ;--tw-gradient-via-position: ;--tw-gradient-to-position: ;--tw-ordinal: ;--tw-slashed-zero: ;--tw-numeric-figure: 
;--tw-numeric-spacing: ;--tw-numeric-fraction: ;--tw-ring-inset: ;--tw-ring-offset-width:0px;--tw-ring-offset-color:#fff;--tw-ring-color:rgb(59 130 246 / 0.5);--tw-ring-offset-shadow:0 0 #0000;--tw-ring-shadow:0 0 #0000;--tw-shadow:0 0 #0000;--tw-shadow-colored:0 0 #0000;--tw-blur: ;--tw-brightness: ;--tw-contrast: ;--tw-grayscale: ;--tw-hue-rotate: ;--tw-invert: ;--tw-saturate: ;--tw-sepia: ;--tw-drop-shadow: ;--tw-backdrop-blur: ;--tw-backdrop-brightness: ;--tw-backdrop-contrast: ;--tw-backdrop-grayscale: ;--tw-backdrop-hue-rotate: ;--tw-backdrop-invert: ;--tw-backdrop-opacity: ;--tw-backdrop-saturate: ;--tw-backdrop-sepia: ;--tw-contain-size: ;--tw-contain-layout: ;--tw-contain-paint: ;--tw-contain-style: }.mx-auto{margin-left:auto;margin-right:auto}.mt-2{margin-top:0.5rem}.flex{display:flex}.h-\[73vh\]{height:73vh}.w-full{width:100%}.max-w-lg{max-width:32rem}.space-x-2 > :not([hidden]) ~ :not([hidden]){--tw-space-x-reverse:0;margin-right:calc(0.5rem * var(--tw-space-x-reverse));margin-left:calc(0.5rem * calc(1 - var(--tw-space-x-reverse)))}.overflow-y-auto{overflow-y:auto}.p-4{padding:1rem}</style><style> .htmx-indicator{opacity:0} .htmx-request .htmx-indicator{opacity:1; transition: opacity 200ms ease-in;} .htmx-request.htmx-indicator{opacity:1; transition: opacity 200ms ease-in;} </style></head>
<body live="" class="p-4 max-w-lg mx-auto">
<h1>Chatbot SSE (server-sent events) Demo</h1>
<div id="chatlist" class="chat-box h-[73vh] overflow-y-auto"> <div>
<div id="chat-message-0" class="chat chat-end">
<div class="chat-header">user</div>
<div id="chat-content-0" class="chat-bubble chat-bubble-primary">Hi</div>
</div>
</div>
<div>
<div id="chat-message-1" class="chat chat-start">
<div class="chat-header">assistant</div>
<div hx-ext="sse" sse-connect="/get-message" sse-swap="message" sse-close="close" hx-swap="beforeend show:bottom" id="chat-content-1" class="chat-bubble chat-bubble-secondary">Hello! How can I assist you today?</div>
</div>
</div>
<div>
<div id="chat-message-2" class="chat chat-end">
<div class="chat-header">user</div>
<div id="chat-content-2" class="chat-bubble chat-bubble-primary">how to create a fasthtml app?</div>
</div>
</div>
</div>
<form enctype="multipart/form-data" hx-post="/send-message" hx-target="#chatlist" hx-swap="beforeend" class="flex space-x-2 mt-2">
<fieldset role="group">
<input type="text" name="msg" placeholder="Type a message" hx-swap-oob="true" id="msg-input" class="input input-bordered w-full">
<button class="btn btn-primary">Send</button>
</fieldset>
</form>
</body></html>
Using htmx SSE for message streaming
The htmx server-sent-events
(SSE) extension allows you to connect to an EventSource directly from HTML, enabling real-time updates to your webpage.
SSE overview
Server-Sent Events (SSE) is a technology that allows a server to push updates to the client over a single HTTP connection. Unlike WebSockets, SSE is uni-directional, meaning the server can send data to the client, but the client cannot send data back over the same connection.
In contrast, WebSockets provide a bi-directional, real-time communication channel. Imagine a scenario where the client not only receives messages from the server but also needs to notify the server immediately whenever there is a client-side update. Here, WebSockets are ideal because they allow bidirectional communication—both the client and server can send and receive messages in real time.
Key Attributes
- `hx-ext="sse"`: Installs the SSE extension on the HTML element.
- `sse-connect="<url>"`: Specifies the URL of the SSE server.
- `sse-swap="<message-name>"`: Specifies the name of the message to swap into the DOM.
- `hx-swap`: Controls the swap strategy for the content.
Let’s break down the HTML code and explain how the SSE extension is used.
Explanation
- HTML Head Section:
  - Includes the necessary scripts for htmx, FastHTML, and other libraries.
  - Includes the `sse.js` script to enable the SSE extension.
- Body Section:
  - Contains a `div` with the id `chatlist` that holds the chat messages.
  - Each chat message is wrapped in a `div` with a unique id and appropriate classes for styling.
- SSE Integration:
  - The `div` with id `chat-content-1` is where the SSE extension is applied.
  - `hx-ext="sse"`: Installs the SSE extension on this element.
  - `sse-connect="/get-message"`: Connects to the `/get-message` endpoint on the server to receive SSE messages.
  - `sse-swap="message"`: Specifies that the content of the SSE message should be swapped into this element.
  - `sse-close="close"`: Specifies that the connection should be closed when a `close` event is received.
  - `hx-swap="beforeend show:bottom"`: Controls the swap strategy, appending new content to the end and ensuring it is visible through automatic scrolling.
Server-Side getting message from the LLM
Here’s the relevant server-side code that sends SSE messages:
async def message_generator():
    """Stream the chat model's reply as SSE ``message`` events, then a ``close`` event."""
    print("message_generator", messages)
    # The last entry is left out of the request; it accumulates the reply below.
    r = cli(messages[:-1], sp=sp, stream=True)
    for chunk in r:
        messages[-1]["content"] += chunk
        yield f"event: message\ndata: {chunk}\n\n"
        await asyncio.sleep(0.5)
    yield f"event: close\ndata: \n\n"
@app.get("/get-message")
async def get_message():
    """Return the output of ``message_generator`` as a ``text/event-stream`` response."""
    return StreamingResponse(message_generator(), media_type="text/event-stream")
Understanding yield
When a function contains a yield
statement, it becomes a generator function. Instead of returning a single value and terminating, a generator function can yield multiple values, one at a time, pausing its state between each yield and resuming from where it left off when the next value is requested.
Let’s look at a simple example to understand how yield
works:
def simple_generator():
    """Yield 1, 2 and 3, one value per ``next()`` call."""
    yield 1
    yield 2
    yield 3


gen = simple_generator()
print(next(gen))  # Output: 1
print(next(gen))  # Output: 2
print(next(gen))  # Output: 3
In this example:
- The `simple_generator` function yields three values: 1, 2, and 3.
- Each call to `next(gen)` resumes the function from where it last yielded a value.
Using yield
with message_generator
In the message_generator
function, yield
is used to send chunks of data as server-sent events (SSE) to the client. Here’s a detailed breakdown:
async def message_generator():
    """Stream the chat model's reply as SSE ``message`` events, then a ``close`` event."""
    print("message_generator", messages)
    # The last entry is left out of the request; it accumulates the reply below.
    r = cli(messages[:-1], sp=sp, stream=True)
    for chunk in r:
        messages[-1]["content"] += chunk
        yield f"event: message\ndata: {chunk}\n\n"
        await asyncio.sleep(0.5)
    yield f"event: close\ndata: \n\n"
1. Print the Messages List

   print("message_generator", messages)

   This prints the current state of the `messages` list for debugging.

2. Call the Chat Model Client

   r = cli(messages[:-1], sp=sp, stream=True)

   This calls the chat model client with all messages except the last one (which is a placeholder for the response). The `sp` parameter is the system prompt, and `stream=True` indicates that the response will be streamed.

3. Process the Streamed Response

   for chunk in r:
       messages[-1]["content"] += chunk
       yield f"event: message\ndata: {chunk}\n\n"
       await asyncio.sleep(0.5)

   - `for chunk in r:`: Iterates over each chunk of the streamed response.
   - `messages[-1]["content"] += chunk`: Appends the chunk to the content of the last message (the placeholder for the response).
   - `yield f"event: message\ndata: {chunk}\n\n"`: Yields a server-sent event with the chunk of data. This sends the chunk to the client.
   - `await asyncio.sleep(0.5)`: Pauses for 0.5 seconds between chunks to simulate streaming.

4. Close the Stream

   yield f"event: close\ndata: \n\n"

   This yields a closing event to indicate the end of the stream.
Example with Detailed Steps
Let’s assume the messages
list initially contains:
messages = [
    {"role": "user", "content": "Hello"},
    {"role": "assistant", "content": "Hi, how can I help you?"},
    {"role": "user", "content": "Tell me a joke"},
    {"role": "assistant", "content": ""},  # Placeholder for the response
]
When message_generator
is called, the following steps occur:
1. Print the Messages List

   print("message_generator", messages)

   Output:

   message_generator [{'role': 'user', 'content': 'Hello'}, {'role': 'assistant', 'content': 'Hi, how can I help you?'}, {'role': 'user', 'content': 'Tell me a joke'}, {'role': 'assistant', 'content': ''}]

2. Call the Chat Model Client

   r = cli(messages[:-1], sp=sp, stream=True)

   The `cli` client is called with:

   messages[:-1] = [
       {"role": "user", "content": "Hello"},
       {"role": "assistant", "content": "Hi, how can I help you?"},
       {"role": "user", "content": "Tell me a joke"}
   ]

3. Process the Streamed Response

   Let’s assume the chat model returns the following chunks:

   r = ["Why did the scarecrow win an award?", "Because he was outstanding in his field!"]

   The loop processes each chunk:

   First chunk:

   messages[-1]["content"] += "Why did the scarecrow win an award?"
   yield f"event: message\ndata: Why did the scarecrow win an award?\n\n"
   await asyncio.sleep(0.5)

   Updated `messages`:

   messages = [
       {"role": "user", "content": "Hello"},
       {"role": "assistant", "content": "Hi, how can I help you?"},
       {"role": "user", "content": "Tell me a joke"},
       {"role": "assistant", "content": "Why did the scarecrow win an award?"}
   ]

   Second chunk:

   messages[-1]["content"] += "Because he was outstanding in his field!"
   yield f"event: message\ndata: Because he was outstanding in his field!\n\n"
   await asyncio.sleep(0.5)

   Updated `messages`:

   messages = [
       {"role": "user", "content": "Hello"},
       {"role": "assistant", "content": "Hi, how can I help you?"},
       {"role": "user", "content": "Tell me a joke"},
       {"role": "assistant", "content": "Why did the scarecrow win an award?Because he was outstanding in his field!"}
   ]

4. Close the Stream

   yield f"event: close\ndata: \n\n"

   This final yield indicates the end of the server-sent events stream, signaling to the client that no more data will be sent.
The `get_message` Endpoint
- This endpoint returns a `StreamingResponse` that streams the output of the `message_generator` function with the media type `text/event-stream`.
Server-Side sending message to the client
@app.post("/send-message")
async def send_message(msg: str):
    """Record the user's message and return the user and assistant chat bubbles."""
    messages.append({"role": "user", "content": msg})
    user_msg = Div(ChatMessage(len(messages) - 1))
    messages.append({"role": "assistant", "content": ""})
    # The returned assistant message uses the SSE extension, connect to the /get-message endpoint and get all messages until the close event
    assistant_msg = Div(
        ChatMessage(
            len(messages) - 1,
            hx_ext="sse",
            sse_connect="/get-message",
            sse_swap="message",
            sse_close="close",
            hx_swap="beforeend show:bottom",
        )
    )
    return user_msg, assistant_msg
Function Overview
The send_message
function is an asynchronous endpoint that handles POST requests to the /send-message
URL. It processes the user’s message, updates the messages
list, and prepares the response for both the user and the assistant using the Server-Sent Events (SSE) extension.
Detailed Explanation
1. Append User Message:

   messages.append({"role": "user", "content": msg})

   - This line appends the user’s message to the `messages` list.
   - The message is stored as a dictionary with the role `"user"` and the content of the message.

2. Create User Message Div:

   user_msg = Div(ChatMessage(len(messages) - 1))

   - This line creates a `Div` element for the user’s message using the `ChatMessage` function.
   - `ChatMessage(len(messages) - 1)` generates the HTML for the user’s message, where `len(messages) - 1` is the index of the last message (the user’s message).

3. Append Placeholder for Assistant’s Response:

   messages.append({"role": "assistant", "content": ""})

   - This line appends a placeholder for the assistant’s response to the `messages` list.
   - The placeholder is stored as a dictionary with the role `"assistant"` and an empty content string.

4. Create Assistant Message Div with SSE:

   assistant_msg = Div(
       ChatMessage(
           len(messages) - 1,
           hx_ext="sse",
           sse_connect="/get-message",
           sse_swap="message",
           sse_close="close",
           hx_swap="beforeend show:bottom",
       )
   )

   - This line creates a `Div` element for the assistant’s response using the `ChatMessage` function.
   - `ChatMessage(len(messages) - 1, ...)` generates the HTML for the assistant’s message, where `len(messages) - 1` is the index of the last message (the placeholder for the assistant’s response).
   - The `Div` element includes several attributes to enable SSE:
     - `hx_ext="sse"`: Installs the SSE extension on this element.
     - `sse_connect="/get-message"`: Connects to the `/get-message` endpoint to receive SSE messages.
     - `sse_swap="message"`: Specifies that the content of the SSE message should be swapped into this element.
     - `sse_close="close"`: Specifies that the connection should be closed when a `close` event is received.
     - `hx_swap="beforeend show:bottom"`: Controls the swap strategy, appending new content to the end and ensuring it is visible.

5. Return User and Assistant Messages:

   return user_msg, assistant_msg

   - This line returns the `Div` elements for both the user’s message and the assistant’s response.
   - These elements are sent back to the client, where they are rendered in the chat interface.
How the Client and Server Work Together
- Client-Side:
  - When the page loads, the `div` with `hx-ext="sse"` connects to the `/get-message` endpoint.
  - The SSE connection is established, and the client starts listening for messages.
- Server-Side:
  - The `message_generator` function generates messages and sends them to the client as SSE messages.
  - Each chunk of the response is sent as an SSE message with the event name `message`.
  - The client receives these messages and swaps their content into the `div` with `sse-swap="message"`.
- Real-Time Updates:
  - As the server sends chunks of the response, the client updates the chat bubble in real-time.
  - When the server sends the `close` event, the SSE connection is closed.
Summary
Whenever the server responds to the client, it simultaneously sends two messages: one user message (the question) and one assistant message (the answer, initially an empty placeholder). In the assistant message, we add the SSE-related parameters:
hx_ext="sse",
sse_connect="/get-message",
sse_swap="message",
sse_close="close",
hx_swap="beforeend show:bottom",
[{'role': 'user', 'content': 'Hi'}, {'role': 'assistant', 'content': ''}]
This enables that element to receive updates on-demand from the server through the established connection to the /get-message
endpoint.
The server then streams the assistant message to complete its content.
<div hx-ext="sse" sse-connect="/get-message" sse-swap="message" sse-close="close" hx-swap="beforeend show:bottom" id="chat-content-1" class="chat-bubble chat-bubble-secondary">Hello! How can I assist you today?</div>
Adapting the WebSocket version to SSE
- Start with the main screen `/` route
  - Create form for input
  - Create input field with oob for reset after send
- Create `/send-message` route
  - create user message
  - append an empty assistant message
  - create assistant message with sse connected to `/get-message` route
  - return both of them with input reset
- Create `/get-message` route
  - Create a `message_generator` to generate messages
  - Create a `StreamingResponse` to stream the messages