Implement Server-Sent Events (SSE) with AIOHTTP in Python
Server-Sent Events (SSE) enable real-time, unidirectional communication from servers to clients over HTTP.
This tutorial guides you through implementing SSE using AIOHTTP.
You’ll learn to create SSE endpoints, send events, handle client reconnections, generate events asynchronously, manage multiple clients, and more.
Create an SSE Endpoint with aiohttp
To create an SSE endpoint, you need to define an async handler function.
Here’s how you can do it:
from aiohttp import web
import asyncio

async def sse_handler(request):
    response = web.StreamResponse()
    await response.prepare(request)
    for i in range(5):
        await response.write(f"data: Event {i}\n\n".encode('utf-8'))
        await asyncio.sleep(1)
    return response

app = web.Application()
app.router.add_get('/events', sse_handler)

if __name__ == '__main__':
    web.run_app(app)
Output:
======== Running on http://0.0.0.0:8080 ========
(Press CTRL+C to quit)
This code creates a basic SSE endpoint that sends five events, one per second.
If you access the SSE stream at /events in a browser, the response is downloaded as a file, because no SSE content type has been set yet. The file has the following content:
data: Event 0

data: Event 1

data: Event 2

data: Event 3

data: Event 4
Set the appropriate content type
To ensure the client recognizes the response as an SSE stream, set the Content-Type header to text/event-stream. The example also disables caching and asks for the connection to be kept alive:
async def sse_handler(request):
    response = web.StreamResponse()
    response.headers['Content-Type'] = 'text/event-stream'
    response.headers['Cache-Control'] = 'no-cache'
    response.headers['Connection'] = 'keep-alive'
    await response.prepare(request)
    # Rest of the handler code...
If you access /events now, you’ll get the same output, but this time it is displayed directly in the browser because the client treats the response as an SSE stream.
Send Events from the Server
Format data for SSE
SSE has a specific format for events. You can include event types, IDs, and comments:
import asyncio
import time
from aiohttp import web

async def sse_handler(request):
    response = web.StreamResponse()
    response.headers['Content-Type'] = 'text/event-stream'
    response.headers['Cache-Control'] = 'no-cache'
    response.headers['Connection'] = 'keep-alive'
    await response.prepare(request)
    # Lines starting with a colon are comments and are ignored by clients
    await response.write(b": This is a comment\n")
    # Ask the client to wait 10 seconds (10000 ms) before reconnecting
    await response.write(b"retry: 10000\n\n")
    for i in range(3):
        # Each event carries an ID, an event type, and a data payload
        event_data = f"id: {i}\nevent: update\ndata: {{'time': {time.time()}}}\n\n"
        await response.write(event_data.encode('utf-8'))
        await asyncio.sleep(1)
    return response
Output:
: This is a comment
retry: 10000

id: 0
event: update
data: {'time': 1725794933.2915308}

id: 1
event: update
data: {'time': 1725794934.2951744}

id: 2
event: update
data: {'time': 1725794935.302102}
This code shows how to include comments, set a retry interval, and structure events with IDs and event types.
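The field layout used above can be wrapped in a small helper so that each handler does not have to build the strings by hand. The following is a minimal sketch, not part of aiohttp: the name format_sse and its parameters are assumptions made for illustration. It also JSON-encodes non-string payloads, which differs from the single-quoted dictionary text in the example above and is easier to parse on the client side.

```python
import json


def format_sse(data, event=None, event_id=None, retry=None):
    """Serialize one SSE event to bytes (hypothetical helper, not an aiohttp API)."""
    lines = []
    if retry is not None:
        lines.append(f"retry: {int(retry)}")
    if event_id is not None:
        lines.append(f"id: {event_id}")
    if event is not None:
        lines.append(f"event: {event}")
    # Non-string payloads are JSON-encoded so clients can parse them reliably.
    text = data if isinstance(data, str) else json.dumps(data)
    # Each line of a multi-line payload needs its own "data:" field.
    for chunk in text.split("\n"):
        lines.append(f"data: {chunk}")
    # A blank line terminates the event.
    return ("\n".join(lines) + "\n\n").encode("utf-8")


if __name__ == "__main__":
    print(format_sse({"time": 1725794933.29}, event="update", event_id=0).decode())
```

Inside a handler, the result can be passed straight to the response, for example await response.write(format_sse({"time": time.time()}, event="update", event_id=i)).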
Send multiple events over time
To send readings from several sources over time, you can use asynchronous generators. The built-in anext() used here requires Python 3.10 or later:
import asyncio
import random
from aiohttp import web

async def generate_temperature():
    while True:
        yield random.uniform(20, 30)
        await asyncio.sleep(2)

async def generate_humidity():
    while True:
        yield random.uniform(30, 60)
        await asyncio.sleep(3)

async def sse_handler(request):
    response = web.StreamResponse()
    response.headers['Content-Type'] = 'text/event-stream'
    await response.prepare(request)

    async def send_events():
        temp_gen = generate_temperature()
        humid_gen = generate_humidity()
        for _ in range(5):
            # Pull one reading from each generator, then send both in one event
            temp = await anext(temp_gen)
            humid = await anext(humid_gen)
            await response.write(f"data: {{'temperature': {temp:.2f}, 'humidity': {humid:.2f}}}\n\n".encode('utf-8'))

    await send_events()
    return response
Output:
data: {'temperature': 24.98, 'humidity': 47.48}

data: {'temperature': 26.09, 'humidity': 51.04}

data: {'temperature': 29.63, 'humidity': 50.43}

data: {'temperature': 26.20, 'humidity': 57.97}

data: {'temperature': 27.58, 'humidity': 30.31}
This code shows how to send multiple events (temperature and humidity) over time using asynchronous generators.
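The handler above reads the two generators one after the other, so each combined event waits for both sleep intervals. If each source should emit at its own pace, one possible approach is to merge the generators through an asyncio.Queue. The sketch below uses only the standard library; merge and collect are hypothetical names, and collect stands in for the loop that would write to the SSE response.

```python
import asyncio
import random


async def temperature(interval=2.0):
    while True:
        yield {"temperature": round(random.uniform(20, 30), 2)}
        await asyncio.sleep(interval)


async def humidity(interval=3.0):
    while True:
        yield {"humidity": round(random.uniform(30, 60), 2)}
        await asyncio.sleep(interval)


async def merge(*generators):
    """Yield items from several async generators as soon as each one is ready."""
    queue = asyncio.Queue()

    async def pump(gen):
        async for item in gen:
            await queue.put(item)

    tasks = [asyncio.create_task(pump(gen)) for gen in generators]
    try:
        while True:
            yield await queue.get()
    finally:
        # Stop the producers once the consumer stops iterating.
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)


async def collect(n, *generators):
    """Take the first n merged readings (stands in for the SSE write loop)."""
    readings = []
    merged = merge(*generators)
    try:
        async for reading in merged:
            readings.append(reading)
            if len(readings) == n:
                break
    finally:
        await merged.aclose()
    return readings


if __name__ == "__main__":
    print(asyncio.run(collect(4, temperature(0.2), humidity(0.3))))
```

With this design each reading becomes its own event, so a client would receive temperature and humidity updates separately rather than as one combined payload.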
Client Reconnection Handling
Implement Last-Event-ID header support
To support client reconnection, implement the Last-Event-ID header:
import asyncio
from aiohttp import web

async def sse_handler(request):
    response = web.StreamResponse()
    response.headers['Content-Type'] = 'text/event-stream'
    await response.prepare(request)
    # A reconnecting client sends the ID of the last event it received
    last_id = request.headers.get('Last-Event-ID', '0')
    start_id = int(last_id) + 1
    for i in range(start_id, start_id + 5):
        await response.write(f"id: {i}\ndata: Event {i}\n\n".encode('utf-8'))
        await asyncio.sleep(1)
    return response
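The handler above can resume from any ID because its events are generated from the counter itself, and it assumes the header always holds a valid integer. For events that cannot be regenerated, a server would need to remember recent events and tolerate a missing or malformed header. The following sketch shows one way to do that with the standard library; parse_last_event_id and EventBuffer are hypothetical helpers, not aiohttp APIs.

```python
from collections import deque


def parse_last_event_id(headers, default=0):
    """Return the Last-Event-ID header as an int, or default if missing or malformed."""
    raw = headers.get("Last-Event-ID")
    try:
        return int(raw)
    except (TypeError, ValueError):
        return default


class EventBuffer:
    """Keep the most recent events so reconnecting clients can catch up."""

    def __init__(self, maxlen=100):
        self._events = deque(maxlen=maxlen)  # oldest events fall off automatically
        self._next_id = 1

    def append(self, data):
        """Store an event and return the ID assigned to it."""
        event_id = self._next_id
        self._next_id += 1
        self._events.append((event_id, data))
        return event_id

    def events_after(self, last_id):
        """Return the buffered events the client has not seen yet, oldest first."""
        return [(eid, data) for eid, data in self._events if eid > last_id]
```

A handler could call parse_last_event_id(request.headers) and write everything returned by events_after() before switching to live events. Events older than the buffer size are lost, which is a trade-off of this approach.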
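Use the following client to connect to the SSE server. It stores the last received event ID in a file and sends it back in the Last-Event-ID header on the next connection: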
import aiohttp
import asyncio
import os

# Read the last event ID saved by a previous run, if any
def get_last_event_id(file_path):
    if os.path.exists(file_path):
        with open(file_path, 'r') as file:
            return file.read().strip()
    return None

# Save the last event ID to a file
def save_last_event_id(file_path, last_event_id):
    with open(file_path, 'w') as file:
        file.write(last_event_id)

last_event_id_file = 'last_event_id.txt'

async def sse_client():
    last_event_id = get_last_event_id(last_event_id_file)
    headers = {}
    if last_event_id:
        headers['Last-Event-ID'] = last_event_id
    async with aiohttp.ClientSession() as session:
        async with session.get('http://localhost:8080/events', headers=headers) as response:
            async for line in response.content:
                if line.startswith(b'id:'):
                    # Split on the first colon only, so values containing ':' stay intact
                    last_event_id = line.decode('utf-8').split(':', 1)[1].strip()
                    save_last_event_id(last_event_id_file, last_event_id)
                elif line.startswith(b'data:'):
                    data = line.decode('utf-8').split(':', 1)[1].strip()
                    print(f"Received event: {data}")

if __name__ == '__main__':
    try:
        asyncio.run(sse_client())
    except KeyboardInterrupt:
        print("Client stopped.")
Output (first connection):
Received event: Event 1
Received event: Event 2
Received event: Event 3
Received event: Event 4
Received event: Event 5
Output (reconnection with Last-Event-ID: 2):
Received event: Event 3
Received event: Event 4
Received event: Event 5
Received event: Event 6
Received event: Event 7
This code handles client reconnections by using the Last-Event-ID header to resume the event stream from where the client left off.
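The client above inspects one line at a time, which is enough for single-line events. A fuller client would group lines into events, join multi-line data fields, and skip comment lines. The function below is a simplified sketch of those rules using only the standard library; parse_sse is a hypothetical name, it works on already decoded text lines, and it ignores the retry field.

```python
def parse_sse(lines):
    """Yield one dict per event from an iterable of decoded text lines."""
    event = {"id": None, "event": "message", "data": []}
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line dispatches the event collected so far.
            if event["data"]:
                yield {**event, "data": "\n".join(event["data"])}
            # The last seen ID carries over; the event type resets to the default.
            event = {"id": event["id"], "event": "message", "data": []}
            continue
        if line.startswith(":"):
            continue  # comment line, often used as a keep-alive
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # a single leading space after the colon is dropped
        if field == "data":
            event["data"].append(value)
        elif field in ("id", "event"):
            event[field] = value


if __name__ == "__main__":
    sample = ["id: 1\n", "event: update\n", "data: first\n", "data: second\n", "\n"]
    for parsed in parse_sse(sample):
        print(parsed)
```

To use it with the aiohttp client shown earlier, the same field handling would run inside the async for loop after decoding each line, since this sketch expects a regular (synchronous) iterable.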
Integrate with Other Asynchronous Data Sources
You can integrate SSE with other asynchronous data sources:
import aiohttp
import asyncio
from aiohttp import web

async def fetch_stock_price(symbol):
    async with aiohttp.ClientSession() as session:
        # Placeholder URL: replace it with a real stock price API
        url = f"https://api.example.com/stock/{symbol}"
        async with session.get(url) as response:
            data = await response.json()
            return data['price']

async def sse_handler(request):
    response = web.StreamResponse()
    response.headers['Content-Type'] = 'text/event-stream'
    await response.prepare(request)
    symbols = ['AAPL', 'GOOGL', 'MSFT']
    for _ in range(3):
        # Fetch all prices concurrently, then send them as a single event
        prices = await asyncio.gather(*[fetch_stock_price(symbol) for symbol in symbols])
        event_data = {symbol: price for symbol, price in zip(symbols, prices)}
        await response.write(f"data: {event_data}\n\n".encode('utf-8'))
        await asyncio.sleep(5)
    return response
Output:
data: {'AAPL': 150.25, 'GOOGL': 2750.80, 'MSFT': 305.15}

data: {'AAPL': 151.10, 'GOOGL': 2755.50, 'MSFT': 306.20}

data: {'AAPL': 150.75, 'GOOGL': 2748.30, 'MSFT': 305.90}
This code integrates SSE with an asynchronous stock price API: it fetches the prices of all symbols concurrently and streams them to the client every 5 seconds.
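With a plain asyncio.gather() call, a single failed request raises inside the handler and ends the stream. One way to keep streaming is to pass return_exceptions=True and skip the symbols that failed or timed out. The sketch below uses a stand-in fetch function with made-up symbols and prices instead of a real API; fetch_price and fetch_all are hypothetical names.

```python
import asyncio


async def fetch_price(symbol):
    """Stand-in for a real API call; the symbols and prices are made up."""
    await asyncio.sleep(0.01)
    if symbol == "BAD":
        raise ValueError(f"unknown symbol {symbol}")
    return {"AAPL": 150.25, "MSFT": 305.15}[symbol]


async def fetch_all(symbols, fetch=fetch_price, timeout=1.0):
    """Fetch all symbols concurrently; skip the ones that fail or time out."""
    results = await asyncio.gather(
        *(asyncio.wait_for(fetch(symbol), timeout) for symbol in symbols),
        return_exceptions=True,  # failures come back as values instead of raising
    )
    return {
        symbol: result
        for symbol, result in zip(symbols, results)
        if not isinstance(result, Exception)
    }


if __name__ == "__main__":
    print(asyncio.run(fetch_all(["AAPL", "BAD", "MSFT"])))
```

In the handler, fetch_all(symbols, fetch=fetch_stock_price) could replace the direct gather() call, so an event is still sent with whichever prices were available.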
Multiple Client Handling
Manage concurrent connections
To manage concurrent connections, you can use a set to keep track of active clients:
import asyncio
from aiohttp import web

class SSEServer:
    def __init__(self):
        self.clients = set()

    async def handle_client(self, request):
        response = web.StreamResponse()
        response.headers['Content-Type'] = 'text/event-stream'
        await response.prepare(request)
        self.clients.add(response)
        try:
            # Keep the connection open; events are written by broadcast()
            while True:
                await asyncio.sleep(10)
        finally:
            self.clients.remove(response)
        return response

    async def broadcast(self, message):
        # Iterate over a snapshot: clients may connect or disconnect while we await
        for client in list(self.clients):
            await client.write(f"data: {message}\n\n".encode('utf-8'))

sse_server = SSEServer()
app = web.Application()
app.router.add_get('/events', sse_server.handle_client)

if __name__ == '__main__':
    web.run_app(app)
This code creates an SSEServer class that manages concurrent connections and provides a method to broadcast messages to all connected clients.
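In the class above, broadcast() writes to each response in turn, so one slow client delays all the others. A common alternative is to give every client its own queue and let each handler drain its queue at its own pace. The following is a minimal sketch of that idea using only asyncio; Broadcaster and its method names are assumptions made for illustration.

```python
import asyncio


class Broadcaster:
    """Fan out messages to per-client queues (hypothetical helper)."""

    def __init__(self, max_pending=10):
        self.max_pending = max_pending
        self.queues = set()

    def subscribe(self):
        """Register a new client and return the queue it should read from."""
        queue = asyncio.Queue(maxsize=self.max_pending)
        self.queues.add(queue)
        return queue

    def unsubscribe(self, queue):
        self.queues.discard(queue)

    def publish(self, message):
        """Queue a message for every client; return how many clients missed it."""
        dropped = 0
        for queue in self.queues:
            try:
                queue.put_nowait(message)
            except asyncio.QueueFull:
                dropped += 1  # slow client: skip it rather than block everyone
        return dropped
```

A handler would call subscribe() after preparing the response, loop on await queue.get() and write each message, and call unsubscribe() in a finally block. Dropping messages for a full queue is one policy among several; disconnecting the slow client is another.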
Broadcast events to all connected clients
You can use the broadcast method defined above, which iterates through the active clients and sends the event to each of them:
import asyncio
import contextlib
import random
from aiohttp import web

# sse_server is the SSEServer instance created in the previous section

async def generate_weather():
    while True:
        temperature = random.uniform(20, 30)
        humidity = random.uniform(30, 60)
        await sse_server.broadcast(f"{{'temperature': {temperature:.2f}, 'humidity': {humidity:.2f}}}")
        await asyncio.sleep(5)

async def start_background_tasks(app):
    app['weather_task'] = asyncio.create_task(generate_weather())

async def cleanup_background_tasks(app):
    app['weather_task'].cancel()
    # Awaiting a cancelled task raises CancelledError; suppress it for a clean shutdown
    with contextlib.suppress(asyncio.CancelledError):
        await app['weather_task']

app = web.Application()
app.router.add_get('/events', sse_server.handle_client)
app.on_startup.append(start_background_tasks)
app.on_cleanup.append(cleanup_background_tasks)

if __name__ == '__main__':
    web.run_app(app)
Output (for multiple connected clients):
data: {'temperature': 24.56, 'humidity': 45.23}

data: {'temperature': 27.18, 'humidity': 38.91}

data: {'temperature': 22.73, 'humidity': 52.45}
This code starts a background task that broadcasts a weather update every 5 seconds, so every connected client receives the same real-time events.
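If a client disconnects abruptly, writing to its response can raise an error, and in a background task like the one above that error would stop the broadcast loop. One possible safeguard is to write to all clients concurrently and drop the ones whose write fails. The sketch below is standalone: broadcast here is a free function rather than the SSEServer method, and FakeClient is a test double that only mimics the async write() call of a response object.

```python
import asyncio


async def broadcast(clients, payload):
    """Write payload to every client concurrently; remove clients whose write fails."""
    targets = list(clients)  # snapshot: the set may change while we await
    results = await asyncio.gather(
        *(client.write(payload) for client in targets),
        return_exceptions=True,
    )
    delivered = 0
    for client, result in zip(targets, results):
        if isinstance(result, Exception):
            clients.discard(client)  # treat a failed write as a disconnect
        else:
            delivered += 1
    return delivered


class FakeClient:
    """Test double exposing the same async write() shape as a stream response."""

    def __init__(self, fail=False):
        self.fail = fail
        self.received = []

    async def write(self, data):
        if self.fail:
            raise ConnectionResetError("client went away")
        self.received.append(data)


async def demo():
    ok, gone = FakeClient(), FakeClient(fail=True)
    clients = {ok, gone}
    delivered = await broadcast(clients, b"data: hi\n\n")
    return delivered, len(clients)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The same idea could be folded into SSEServer.broadcast() by passing self.clients as the set, so that one failed client does not interrupt the weather task.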
Mokhtar is the founder of LikeGeeks.com. He is a seasoned technologist and accomplished author, with expertise in Linux system administration and Python development. Since 2010, Mokhtar has built an impressive career, transitioning from system administration to Python development in 2015. His work spans large corporations to freelance clients around the globe. Alongside his technical work, Mokhtar has authored some insightful books in his field. Known for his innovative solutions, meticulous attention to detail, and high-quality work, Mokhtar continually seeks new challenges within the dynamic field of technology.