The Sse response type enables streaming events to clients using the Server-Sent Events (SSE) protocol. SSE is a simple HTTP-based protocol for pushing updates from the server to the client.
Basic usage
Return a stream of events wrapped in Sse:
use axum::{
response::sse::{Event, Sse},
routing::get,
Router,
};
use futures_util::stream::{self, Stream};
use std::{convert::Infallible, time::Duration};
use tokio_stream::StreamExt as _;
async fn sse_handler() -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
// A stream that repeats an event every second
let stream = stream::repeat_with(|| Event::default().data("hi!"))
.map(Ok)
.throttle(Duration::from_secs(1));
Sse::new(stream).keep_alive(KeepAlive::default())
}
let app = Router::new().route("/sse", get(sse_handler));
Example from axum/src/response/sse.rs:6-26 and examples/sse/src/main.rs.
How it works
IntoResponse implementation
The Sse type sets proper headers and wraps the stream in a body:
impl<S, E> IntoResponse for Sse<S>
where
S: Stream<Item = Result<Event, E>> + Send + 'static,
E: Into<BoxError>,
{
fn into_response(self) -> Response {
(
[
(http::header::CONTENT_TYPE, "text/event-stream"),
(http::header::CACHE_CONTROL, "no-cache"),
],
Body::new(SseBody { /* ... */ }),
)
.into_response()
}
}
See implementation in axum/src/response/sse.rs:90-107.
Response fields
Automatically set to text/event-stream
Automatically set to no-cache
A stream of SSE events formatted according to the SSE protocol
Event API
Creating events
use axum::response::sse::Event;
// Simple data event
let event = Event::default().data("message");
// Event with all fields
let event = Event::default()
.event("update") // Event type
.data("payload") // Event data
.id("unique-id") // Event ID
.retry(Duration::from_secs(30)); // Retry timeout
Event fields
The event payload. Corresponds to MessageEvent.data in the browser. Newlines are automatically handled with multiple data: fields.
The event type. Used with addEventListener(type, ...) in the browser. Defaults to "message" if not set.
Event identifier. Sets MessageEvent.lastEventId in the browser. Cannot contain newlines or null characters.
Reconnection timeout hint for the client in milliseconds.
Comment field (ignored by most clients). Can be called multiple times. Cannot contain newlines.
JSON data
use serde::Serialize;
#[derive(Serialize)]
struct Update {
count: i32,
}
let event = Event::default()
.json_data(Update { count: 42 })
.unwrap();
See axum/src/response/sse.rs:246-277.
Custom data writer
For advanced use cases, write data incrementally:
use std::fmt::Write;
let mut writer = Event::default().into_data_writer();
write!(&mut writer, "custom {}", "data").unwrap();
let event = writer.into_event();
See axum/src/response/sse.rs:179-222.
Keep-alive
Keep connections alive by sending periodic messages:
use axum::response::sse::KeepAlive;
use std::time::Duration;
let stream = /* your event stream */;
Sse::new(stream).keep_alive(
KeepAlive::new()
.interval(Duration::from_secs(15))
.text("keep-alive-text")
)
KeepAlive configuration
Time between keep-alive messages. Default is 15 seconds.
Custom keep-alive message text. Default is an empty comment.
Custom keep-alive event. Allows full control over the keep-alive message.
See implementation in axum/src/response/sse.rs:514-573.
Complete example
use axum::{
response::sse::{Event, KeepAlive, Sse},
routing::get,
Router,
};
use futures_util::stream::{self, Stream};
use std::{convert::Infallible, time::Duration};
use tokio_stream::StreamExt as _;
#[tokio::main]
async fn main() {
let app = Router::new().route("/events", get(event_stream));
let listener = tokio::net::TcpListener::bind("127.0.0.1:3000")
.await
.unwrap();
axum::serve(listener, app).await;
}
async fn event_stream() -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
let stream = stream::repeat_with(|| {
Event::default()
.event("update")
.data(format!("timestamp: {}", std::time::SystemTime::now()))
})
.map(Ok)
.throttle(Duration::from_secs(1));
Sse::new(stream).keep_alive(
KeepAlive::new()
.interval(Duration::from_secs(5))
.text("keep-alive")
)
}
Browser usage
const eventSource = new EventSource('/events');
// Listen for default "message" events
eventSource.onmessage = (event) => {
console.log('Received:', event.data);
};
// Listen for custom event types
eventSource.addEventListener('update', (event) => {
console.log('Update:', event.data);
});
eventSource.onerror = (error) => {
console.error('SSE error:', error);
};
See also