Implement Server-Sent Events (SSE) with AIOHTTP in Python

Server-sent Events (SSE) enable real-time, unidirectional communication from servers to clients over HTTP.

This tutorial guides you through implementing SSE using AIOHTTP.

You’ll learn to create SSE endpoints, send events, handle client reconnections, generate events asynchronously, manage multiple clients, and more.

 

 

Create an SSE Endpoint with aiohttp

To create an SSE endpoint, you need to define an async handler function.

Here’s how you can do it:

from aiohttp import web
import asyncio
async def sse_handler(request):
    response = web.StreamResponse()
    await response.prepare(request)
    for i in range(5):
        await response.write(f"data: Event {i}\n\n".encode('utf-8'))
        await asyncio.sleep(1)
    return response
app = web.Application()
app.router.add_get('/events', sse_handler)
if __name__ == '__main__':
    web.run_app(app)

Output:

======== Running on http://0.0.0.0:8080 ========
(Press CTRL+C to quit)

This code creates a basic SSE endpoint that sends five events, one per second.

If you access the SSE stream at /events, you’ll get a downloaded file with the following content:

data: Event 0

data: Event 1

data: Event 2

data: Event 3

data: Event 4

Set the appropriate content type

To ensure the client recognizes the response as an SSE stream, you need to set the correct content type which is text/event-stream:

async def sse_handler(request):
    response = web.StreamResponse()
    response.headers['Content-Type'] = 'text/event-stream'
    response.headers['Cache-Control'] = 'no-cache'
    response.headers['Connection'] = 'keep-alive'
    await response.prepare(request)

    # Rest of the handler code...

If you access /events now, you’ll get the same output but this time it will be printed directly on the browser because the client treats the response as an SSE stream

 

Send Events from the Server

Format data for SSE

SSE has a specific format for events. You can include event types, IDs, and comments:

import time
async def sse_handler(request):
    response = web.StreamResponse()
    response.headers['Content-Type'] = 'text/event-stream'
    response.headers['Cache-Control'] = 'no-cache'
    response.headers['Connection'] = 'keep-alive'
    await response.prepare(request)
    await response.write(b": This is a comment\n")
    await response.write(b"retry: 10000\n\n")
    for i in range(3):
        event_data = f"id: {i}\nevent: update\ndata: {{'time': {time.time()}}}\n\n"
        await response.write(event_data.encode('utf-8'))
        await asyncio.sleep(1)
    return response

Output:

: This is a comment
retry: 10000

id: 0
event: update
data: {'time': 1725794933.2915308}

id: 1
event: update
data: {'time': 1725794934.2951744}

id: 2
event: update
data: {'time': 1725794935.302102}

This code shows how to include comments, set a retry interval, and structure events with IDs and event types.

Send multiple events over time

To handle and send multiple events over time, you can use asyncio.gather():

import asyncio
import random
async def generate_temperature():
    while True:
        yield random.uniform(20, 30)
        await asyncio.sleep(2)
async def generate_humidity():
    while True:
        yield random.uniform(30, 60)
        await asyncio.sleep(3)
async def sse_handler(request):
    response = web.StreamResponse()
    response.headers['Content-Type'] = 'text/event-stream'
    await response.prepare(request)
    async def send_events():
        temp_gen = generate_temperature()
        humid_gen = generate_humidity()
        for _ in range(5):
            temp = await anext(temp_gen)
            humid = await anext(humid_gen)
            await response.write(f"data: {{'temperature': {temp:.2f}, 'humidity': {humid:.2f}}}\n\n".encode('utf-8'))

    await send_events()
    return response

Output:

data: {'temperature': 24.98, 'humidity': 47.48}

data: {'temperature': 26.09, 'humidity': 51.04}

data: {'temperature': 29.63, 'humidity': 50.43}

data: {'temperature': 26.20, 'humidity': 57.97}

data: {'temperature': 27.58, 'humidity': 30.31}

This code shows how to send multiple events (temperature and humidity) over time using asynchronous generators.

 

Client Reconnection Handling

Implement Last-Event-ID header support

To support client reconnection, implement the Last-Event-ID header:

async def sse_handler(request):
    response = web.StreamResponse()
    response.headers['Content-Type'] = 'text/event-stream'
    await response.prepare(request)
    last_id = request.headers.get('Last-Event-ID', '0')
    start_id = int(last_id) + 1
    for i in range(start_id, start_id + 5):
        await response.write(f"id: {i}\ndata: Event {i}\n\n".encode('utf-8'))
        await asyncio.sleep(1)
    return response

Using the following client to connect to the SSE server:

import aiohttp
import asyncio
import os
def get_last_event_id(file_path):
  if os.path.exists(file_path):
      with open(file_path, 'r') as file:
          return file.read().strip()
  return None

# Function to save the last event ID to a file
def save_last_event_id(file_path, last_event_id):
  with open(file_path, 'w') as file:
      file.write(last_event_id)
last_event_id_file = 'last_event_id.txt'
async def sse_client():
  last_event_id = get_last_event_id(last_event_id_file)
  headers = {}
  if last_event_id:
      headers['Last-Event-ID'] = last_event_id
  async with aiohttp.ClientSession() as session:
      async with session.get('http://localhost:8080/events', headers=headers) as response:
          async for line in response.content:
              if line.startswith(b'id:'):
                  last_event_id = line.decode('utf-8').strip().split(': ')[1]
                  save_last_event_id(last_event_id_file, last_event_id)
              elif line.startswith(b'data:'):
                  data = line.decode('utf-8').strip().split(': ')[1]
                  print(f"Received event: {data}")
if __name__ == '__main__':
  try:
      asyncio.run(sse_client())
  except KeyboardInterrupt:
      print("Client stopped.")

Output (first connection):

Received event: Event 1
Received event: Event 2
Received event: Event 3
Received event: Event 4
Received event: Event 5

Output (reconnection with Last-Event-ID: 2):

Received event: Event 3
Received event: Event 4
Received event: Event 5
Received event: Event 6
Received event: Event 7

This code handles client reconnections by using the Last-Event-ID header to resume the event stream from where the client left off.

 

Integrate with Other Asynchronous Data Sources

You can integrate SSE with other asynchronous data sources:

import aiohttp
import asyncio
async def fetch_stock_price(symbol):
    async with aiohttp.ClientSession() as session:
        url = f"https://api.example.com/stock/{symbol}"
        async with session.get(url) as response:
            data = await response.json()
            return data['price']
async def sse_handler(request):
    response = web.StreamResponse()
    response.headers['Content-Type'] = 'text/event-stream'
    await response.prepare(request)
    symbols = ['AAPL', 'GOOGL', 'MSFT']
    for _ in range(3):
        prices = await asyncio.gather(*[fetch_stock_price(symbol) for symbol in symbols])
        event_data = {symbol: price for symbol, price in zip(symbols, prices)}
        await response.write(f"data: {event_data}\n\n".encode('utf-8'))
        await asyncio.sleep(5)
    return response

Output:

data: {'AAPL': 150.25, 'GOOGL': 2750.80, 'MSFT': 305.15}

data: {'AAPL': 151.10, 'GOOGL': 2755.50, 'MSFT': 306.20}

data: {'AAPL': 150.75, 'GOOGL': 2748.30, 'MSFT': 305.90}

This code integrates SSE with an asynchronous stock price API and fetches real-time stock prices to the client.

 

Multiple Client Handling

Manage concurrent connections

To manage concurrent connections, you can use a set to keep track of active clients:

import asyncio
from aiohttp import web
class SSEServer:
    def __init__(self):
        self.clients = set()
    async def handle_client(self, request):
        response = web.StreamResponse()
        response.headers['Content-Type'] = 'text/event-stream'
        await response.prepare(request)
        self.clients.add(response)
        try:
            while True:
                await asyncio.sleep(10)
        finally:
            self.clients.remove(response)
        return response
    async def broadcast(self, message):
        for client in self.clients:
            await client.write(f"data: {message}\n\n".encode('utf-8'))
sse_server = SSEServer()
app = web.Application()
app.router.add_get('/events', sse_server.handle_client)
if __name__ == '__main__':
    web.run_app(app)

This code creates an SSEServer class that manages concurrent connections and provides a method to broadcast messages to all connected clients.

Broadcast events to all connected clients

You can use the broadcast method defined above which iterates through active clients to send events to all of them:

import asyncio
import random
async def generate_weather():
    while True:
        temperature = random.uniform(20, 30)
        humidity = random.uniform(30, 60)
        await sse_server.broadcast(f"{{'temperature': {temperature:.2f}, 'humidity': {humidity:.2f}}}")
        await asyncio.sleep(5)
async def start_background_tasks(app):
    app['weather_task'] = asyncio.create_task(generate_weather())
async def cleanup_background_tasks(app):
    app['weather_task'].cancel()
    await app['weather_task']
app = web.Application()
app.router.add_get('/events', sse_server.handle_client)
app.on_startup.append(start_background_tasks)
app.on_cleanup.append(cleanup_background_tasks)
if __name__ == '__main__':
    web.run_app(app)

Output (for multiple connected clients):

data: {'temperature': 24.56, 'humidity': 45.23}

data: {'temperature': 27.18, 'humidity': 38.91}

data: {'temperature': 22.73, 'humidity': 52.45}

This code shows how to broadcast weather updates to all connected clients every 5 seconds and send real-time updates to all of them simultaneously.

Leave a Reply

Your email address will not be published. Required fields are marked *