Implement Stream Responses with aiohttp in Python

Streaming responses in Python allow you to send data to clients in chunks and provide real-time updates.

In this tutorial, you’ll learn how to implement stream responses using the aiohttp library.

 

 

Create Basic Stream Response

To create a basic stream response, use the StreamResponse class from aiohttp.

from aiohttp import web


async def handle(request):
    response = web.StreamResponse()
    # Send the status line and headers; this must happen before the first write()
    await response.prepare(request)
    await response.write(b"Hello, ")
    await response.write(b"World!")
    # Signal the end of the body
    await response.write_eof()
    return response


app = web.Application()
app.router.add_get('/', handle)
web.run_app(app)  # listens on port 8080 by default

Output:

Hello, World!

This code sets up a basic HTTP server that streams “Hello, World!” in two separate chunks.

The response is prepared, written to in parts, and then finalized with write_eof().
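
A streamed response usually needs a status and headers as well, and these have to be set before prepare() sends them. The following is a minimal sketch, assuming a plain-text body. The handler name handle_with_headers, the make_app() helper, and the Cache-Control value are illustrative choices, not part of the code above.

```python
from aiohttp import web


async def handle_with_headers(request):
    # Status and headers are fixed once prepare() runs, so set them first.
    response = web.StreamResponse(
        status=200,
        headers={"Cache-Control": "no-cache"},  # illustrative header
    )
    response.content_type = "text/plain"
    response.charset = "utf-8"
    await response.prepare(request)
    await response.write("Hello, ".encode("utf-8"))
    await response.write("World!".encode("utf-8"))
    await response.write_eof()
    return response


def make_app():
    # Hypothetical helper that wires the handler into an application.
    app = web.Application()
    app.router.add_get("/", handle_with_headers)
    return app

# To serve it: web.run_app(make_app())
```

Declaring the content type and charset up front tells the client how to decode the bytes it receives in each chunk.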

 

Write Data in Chunks

You can use the write() method to send data in chunks:

async def handle(request):
    response = web.StreamResponse()
    await response.prepare(request)
    for i in range(5):
        await response.write(f"Chunk {i}\n".encode())
    await response.write_eof()
    return response

Output:

Chunk 0
Chunk 1
Chunk 2
Chunk 3
Chunk 4

This code sends five separate chunks of data, each on a new line.

The for loop shows how you can dynamically generate and send content.

Between writes, you can await other asynchronous operations. While the handler waits, the event loop stays free to serve other clients.

import asyncio
async def handle(request):
    response = web.StreamResponse()
    await response.prepare(request)
    for i in range(5):
        await response.write(f"Async Chunk {i}\n".encode())
        await asyncio.sleep(1)
    await response.write_eof()
    return response

Output:

Async Chunk 0
Async Chunk 1
Async Chunk 2
Async Chunk 3
Async Chunk 4

This example introduces a 1-second delay between each chunk to simulate asynchronous data processing or retrieval.
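
To see why awaiting between writes does not block the server, here is a minimal sketch that uses only asyncio. The fake_stream function is a hypothetical stand-in for a streaming handler, and appending to a list stands in for response.write().

```python
import asyncio


async def fake_stream(name, events, delay=0.05):
    """Stand-in for a streaming handler: record a chunk, then wait."""
    for i in range(3):
        events.append(f"{name}{i}")  # stands in for response.write(...)
        await asyncio.sleep(delay)   # yields control to the event loop


async def main():
    events = []
    # Run two "handlers" at once, as the server would for two clients.
    await asyncio.gather(
        fake_stream("a", events),
        fake_stream("b", events),
    )
    return events


events = asyncio.run(main())
```

The recorded events interleave: the second stream writes its first chunk while the first stream is still sleeping, instead of waiting for it to finish.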

 

Handle Client-Side Streaming

You can use aiohttp's ClientSession to handle streaming responses on the client side.

import aiohttp
import asyncio


async def fetch():
    async with aiohttp.ClientSession() as session:
        async with session.get('http://localhost:8080') as response:
            # Read the body in pieces of at most 1024 bytes
            async for chunk in response.content.iter_chunked(1024):
                print(chunk.decode())


asyncio.run(fetch())

Output:

Async Chunk 0

Async Chunk 1

Async Chunk 2

Async Chunk 3

Async Chunk 4

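This client-side code connects to the server and processes the streamed data in chunks of up to 1024 bytes. The blank lines in the output appear because each chunk already ends with a newline and print() adds another.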

You can process streamed data as it arrives using iter_any() regardless of chunk size:

async def fetch():
    async with aiohttp.ClientSession() as session:
        async with session.get('http://localhost:8080') as response:
            async for chunk in response.content.iter_any():
                print(chunk.decode())
asyncio.run(fetch())

Output:

Async Chunk 0

Async Chunk 1

Async Chunk 2

Async Chunk 3

Async Chunk 4
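
Chunk boundaries do not respect character boundaries, so calling chunk.decode() on each chunk can raise UnicodeDecodeError when a multi-byte character such as ° is split across two chunks. One way around this, sketched here with the standard library's incremental decoder, is to keep decoder state between chunks. The function name decode_chunks is a hypothetical helper, and the chunks are passed as a plain list so the sketch runs without a server.

```python
import codecs


def decode_chunks(chunks, encoding="utf-8"):
    """Decode an iterable of byte chunks, tolerating split characters."""
    decoder = codecs.getincrementaldecoder(encoding)()
    pieces = []
    for chunk in chunks:
        # Incomplete trailing bytes are buffered until the next chunk.
        pieces.append(decoder.decode(chunk))
    # final=True raises if the stream ends in the middle of a character.
    pieces.append(decoder.decode(b"", final=True))
    return "".join(pieces)


# The degree sign is two bytes in UTF-8 (0xC2 0xB0); split it across chunks.
data = "21.5\u00b0C\n".encode("utf-8")
split_at = data.index(b"\xb0")
text = decode_chunks([data[:split_at], data[split_at:]])
```

In a client loop, the same decoder object could be created once before the async for and fed each chunk, printing decoder.decode(chunk) instead of chunk.decode().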

 

Use Generators with Stream Responses

You can implement an asynchronous generator function on the server to yield data for streaming.

async def data_generator():
    # Asynchronous generator: yields one encoded chunk per second
    for i in range(5):
        yield f"Generated Chunk {i}\n".encode()
        await asyncio.sleep(1)


async def handle(request):
    response = web.StreamResponse()
    await response.prepare(request)
    # Write each chunk as soon as the generator produces it
    async for chunk in data_generator():
        await response.write(chunk)
    await response.write_eof()
    return response

Output:

Generated Chunk 0
Generated Chunk 1
Generated Chunk 2
Generated Chunk 3
Generated Chunk 4

This method uses an asynchronous generator to produce data on the fly, so the full dataset never has to be held in memory. That suits large datasets and real-time data.
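
If the consumer stops iterating before the generator is exhausted, for example because the client went away, any resource the generator holds should be released in a finally block. The following is a minimal sketch of that pattern using only asyncio. The names numbered_chunks and take_first and the events list are illustrative, and the consumer stands in for a handler that stops early.

```python
import asyncio


async def numbered_chunks(count, events):
    """Yield encoded chunks and record when cleanup runs."""
    try:
        for i in range(count):
            yield f"Generated Chunk {i}\n".encode()
            await asyncio.sleep(0)  # give other tasks a turn
    finally:
        # Runs on normal exhaustion and when the generator is closed early.
        events.append("closed")


async def take_first(n, events):
    """Consume only the first n chunks, then close the generator."""
    gen = numbered_chunks(5, events)
    received = []
    try:
        async for chunk in gen:
            received.append(chunk)
            if len(received) == n:
                break
    finally:
        # Close explicitly so the generator's finally block runs now.
        await gen.aclose()
    return received


events = []
chunks = asyncio.run(take_first(2, events))
```

Calling aclose() makes the cleanup deterministic instead of leaving it to garbage collection.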

 

Stream JSON Responses

You can stream JSON objects one at a time:

import json
async def handle(request):
    response = web.StreamResponse()
    await response.prepare(request)
    data = [{"id": i, "name": f"Item {i}"} for i in range(5)]
    for item in data:
        await response.write(json.dumps(item).encode() + b'\n')
    await response.write_eof()
    return response

Output:

{"id": 0, "name": "Item 0"}
{"id": 1, "name": "Item 1"}
{"id": 2, "name": "Item 2"}
{"id": 3, "name": "Item 3"}
{"id": 4, "name": "Item 4"}

Each JSON object is sent on its own line, a format commonly called newline-delimited JSON (NDJSON).

On the client side, you can parse each JSON object as it arrives:

async def fetch():
    async with aiohttp.ClientSession() as session:
        async with session.get('http://localhost:8080') as response:
            async for line in response.content:
                item = json.loads(line)
                print(f"Received: {item}")
asyncio.run(fetch())

Output:

Received: {'id': 0, 'name': 'Item 0'}
Received: {'id': 1, 'name': 'Item 1'}
Received: {'id': 2, 'name': 'Item 2'}
Received: {'id': 3, 'name': 'Item 3'}
Received: {'id': 4, 'name': 'Item 4'}

This client-side code processes each JSON object in real time, as soon as its line is received.
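
Keeping the parsing separate from the network code makes it easier to test and to tolerate blank lines. Here is a minimal sketch, assuming the input is any asynchronous iterable of byte lines. The names parse_ndjson and fake_stream are hypothetical, and fake_stream stands in for response.content.

```python
import asyncio
import json


async def parse_ndjson(lines):
    """Yield one parsed object per non-empty line of an async byte stream."""
    async for raw in lines:
        raw = raw.strip()
        if not raw:
            continue  # skip blank lines
        yield json.loads(raw)


async def fake_stream():
    # Stand-in for response.content, which yields one line at a time.
    yield b'{"id": 0, "name": "Item 0"}\n'
    yield b"\n"
    yield b'{"id": 1, "name": "Item 1"}\n'


async def collect():
    return [item async for item in parse_ndjson(fake_stream())]


items = asyncio.run(collect())
```

In the client above, the loop could then read `async for item in parse_ndjson(response.content):`.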

 

Simulate Real-time Data Streaming

To simulate a live data feed with periodic updates, you can use asyncio.sleep():

import asyncio
import random
async def handle(request):
    response = web.StreamResponse()
    await response.prepare(request)
    for _ in range(5):
        data = f"Temperature: {random.uniform(20, 30):.2f}°C\n"
        await response.write(data.encode())
        await asyncio.sleep(2)
    await response.write_eof()
    return response

Output:

Temperature: 23.72°C
Temperature: 25.26°C
Temperature: 25.62°C
Temperature: 23.49°C
Temperature: 26.25°C

This example simulates a temperature sensor sending updates every 2 seconds.

On the client side, you can process the updates as they arrive:

async def fetch():
    async with aiohttp.ClientSession() as session:
        async with session.get('http://localhost:8080') as response:
            async for chunk in response.content.iter_any():
                print(f"Update: {chunk.decode().strip()}")
asyncio.run(fetch())

Output:

Update: Temperature: 29.79°C
Update: Temperature: 27.04°C
Update: Temperature: 28.63°C
Update: Temperature: 28.14°C
Update: Temperature: 21.34°C

The client processes and displays each temperature update as it’s received.
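
iter_any() yields whatever bytes are available, so one chunk may hold part of an update or several updates at once. A minimal sketch of reassembling complete lines from arbitrary chunks follows. The class name LineBuffer is hypothetical, and the chunks are fed by hand so the sketch runs without a server.

```python
class LineBuffer:
    """Collect byte chunks and hand back only complete lines."""

    def __init__(self):
        self._pending = b""

    def feed(self, chunk):
        """Add a chunk and return the list of complete lines it finished."""
        self._pending += chunk
        # The last element is the unfinished tail (empty if chunk ended on \n).
        *lines, self._pending = self._pending.split(b"\n")
        return lines


buffer = LineBuffer()
first = buffer.feed(b"Temperature: 23.7")  # no newline yet
second = buffer.feed(b"2\nTemperature: 25.26\nTemp")
```

The first call returns no lines because the update is incomplete. The second call returns two complete lines and keeps the trailing fragment for the next chunk.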

 

Compression in Streaming Responses

You can use zlib compression on the server side to reduce bandwidth usage in streaming responses.

import asyncio
import random
import zlib


async def handle(request):
    response = web.StreamResponse()
    await response.prepare(request)
    for _ in range(5):
        data = f"Temperature: {random.uniform(20, 30):.2f}°C\n"
        # Each message is compressed on its own, as a complete zlib stream
        compressed_data = zlib.compress(data.encode())
        await response.write(compressed_data)
        await asyncio.sleep(2)
    await response.write_eof()
    return response

This code compresses the data before sending it to reduce the amount of data transferred over the network.

If you try to show the results in a web browser, you’ll see gibberish text (compressed data).

Now let’s decompress the received data on the client side:

import zlib


async def fetch():
    async with aiohttp.ClientSession() as session:
        async with session.get('http://localhost:8080') as response:
            async for chunk in response.content.iter_any():
                # Assumes each chunk holds exactly one compressed message
                decompressed_data = zlib.decompress(chunk)
                print(f"Update: {decompressed_data.decode().strip()}")


asyncio.run(fetch())

Output:

Update: Temperature: 20.39°C
Update: Temperature: 28.88°C
Update: Temperature: 23.57°C
Update: Temperature: 29.49°C
Update: Temperature: 23.35°C

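The client decompresses the received data and restores the original content. This works only while each chunk read by the client holds exactly one compressed message, which the 2-second pause between writes makes likely but does not guarantee.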
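
A more robust variant treats the whole response as one continuous zlib stream, so it no longer matters how the network splits the bytes. The sketch below uses zlib.compressobj() and zlib.decompressobj() from the standard library. The function names compress_stream and decompress_stream are hypothetical, and the byte splitting is simulated by hand rather than coming from a real connection.

```python
import zlib


def compress_stream(messages):
    """Yield compressed chunks that together form one zlib stream."""
    compressor = zlib.compressobj()
    for message in messages:
        data = compressor.compress(message)
        # Z_SYNC_FLUSH pushes out everything buffered so far, so the
        # receiver can decode this message without waiting for more input.
        data += compressor.flush(zlib.Z_SYNC_FLUSH)
        yield data
    yield compressor.flush()  # finish the stream


def decompress_stream(chunks):
    """Yield decompressed bytes for each incoming chunk, however it is split."""
    decompressor = zlib.decompressobj()
    for chunk in chunks:
        data = decompressor.decompress(chunk)
        if data:
            yield data


messages = [b"Temperature: 20.39\n", b"Temperature: 28.88\n"]
wire = b"".join(compress_stream(messages))
# Re-split the bytes at an arbitrary point to mimic network chunking.
received = b"".join(decompress_stream([wire[:7], wire[7:]]))
```

On the server, each chunk from compress_stream would be passed to response.write(); on the client, one decompressor object would be fed every chunk from iter_any(). aiohttp also provides response.enable_compression(), which uses standard Content-Encoding so that HTTP clients can decompress the body themselves.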

 

Cancellation and Cleanup

To detect and handle client disconnections gracefully, you can use a try-except block:

async def handle(request):
    response = web.StreamResponse()
    await response.prepare(request)
    try:
        for _ in range(5):
            data = f"Temperature: {random.uniform(20, 30):.2f}°C\n"
            await response.write(data.encode())
            await asyncio.sleep(2)
    except ConnectionResetError:
        print("Client disconnected unexpectedly.")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
    finally:
        try:
            await response.write_eof()
        except ConnectionResetError:
            print("Connection already closed, cannot write EOF.")
    return response

On the client side, you should use a try-except block too:

async def fetch():
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get('http://localhost:8080') as response:
                async for chunk in response.content.iter_any():
                    print(f"Update: {chunk.decode().strip()}")
    except asyncio.CancelledError:
        print("Fetch operation was cancelled.")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
try:
    asyncio.run(fetch())
except KeyboardInterrupt:
    print("Client operation was interrupted by the user.")

Output:

Update: Temperature: 21.46°C
Update: Temperature: 23.22°C
Fetch operation was cancelled.

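The server handler catches ConnectionResetError when a client disconnects unexpectedly and still attempts write_eof() in the finally block. The client reports a cancelled fetch, as in the output above, where it was stopped after two updates.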
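
Depending on the aiohttp version and configuration, a disconnect may also surface as the handler task being cancelled. Since Python 3.8, asyncio.CancelledError is not a subclass of Exception, so an `except Exception` clause does not catch it. The following is a minimal sketch of cleaning up on cancellation, using only asyncio. The stream_updates function is a hypothetical stand-in for a handler loop, and task.cancel() simulates the client going away.

```python
import asyncio


async def stream_updates(sent, cleanup_log):
    """Stand-in for a streaming handler loop that may be cancelled."""
    try:
        for i in range(5):
            sent.append(i)  # stands in for response.write(...)
            await asyncio.sleep(0.05)
    except asyncio.CancelledError:
        cleanup_log.append("cancelled")
        raise  # re-raise so the caller sees the cancellation
    finally:
        cleanup_log.append("cleanup")


async def main():
    sent, cleanup_log = [], []
    task = asyncio.create_task(stream_updates(sent, cleanup_log))
    await asyncio.sleep(0.01)  # let the first "write" happen
    task.cancel()              # simulate the client going away
    try:
        await task
    except asyncio.CancelledError:
        pass
    return sent, cleanup_log


sent, cleanup_log = asyncio.run(main())
```

Re-raising CancelledError after logging keeps the task's cancelled state visible to whatever is awaiting it, while the finally block still releases resources.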

 

Rate Limiting and Throttling

You should control the rate of data sent to prevent overwhelming clients or the server.

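import asyncio
import time


class RateLimiter:
    """Token bucket: allows up to rate_limit messages per second."""

    def __init__(self, rate_limit):
        self.rate_limit = rate_limit
        self.last_check = time.monotonic()
        self.allowance = rate_limit

    async def wait(self):
        current = time.monotonic()
        time_passed = current - self.last_check
        self.last_check = current
        # Refill tokens for the elapsed time, capped at the bucket size
        self.allowance += time_passed * self.rate_limit
        if self.allowance > self.rate_limit:
            self.allowance = self.rate_limit
        if self.allowance < 1.0:
            # Not enough for one message: sleep until a full token accrues
            await asyncio.sleep((1 - self.allowance) / self.rate_limit)
            self.allowance = 0.0
            # Restart the clock so the time slept is not credited again
            self.last_check = time.monotonic()
        else:
            self.allowance -= 1.0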

async def handle(request):
    response = web.StreamResponse()
    await response.prepare(request)
    limiter = RateLimiter(1)  # 1 message per second
    for i in range(50):
        await limiter.wait()
        await response.write(f"Rate limited data {i}\n".encode())
    await response.write_eof()
    return response

Using our client:

async def fetch():
    async with aiohttp.ClientSession() as session:
        async with session.get('http://localhost:8080') as response:
            async for chunk in response.content.iter_any():
                print(f"{chunk.decode().strip()}")
asyncio.run(fetch())

Output (first five lines):

Rate limited data 0
Rate limited data 1
Rate limited data 2
Rate limited data 3
Rate limited data 4

This implementation ensures that data is sent at a controlled rate of one message per second.

If you pass a higher value to RateLimiter, the server sends more messages per second.
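
When a token bucket is more than you need, fixed-interval pacing is a simpler alternative. This is a different technique from the RateLimiter class: it enforces a minimum gap between items and allows no bursts. The sketch below is illustrative only, and the names paced and demo are hypothetical.

```python
import asyncio
import time


async def paced(items, interval):
    """Yield items no faster than one per `interval` seconds."""
    next_time = time.monotonic()
    for item in items:
        delay = next_time - time.monotonic()
        if delay > 0:
            await asyncio.sleep(delay)
        yield item
        # Base the next slot on whichever is later, the planned slot or
        # now, so a slow consumer does not trigger a burst afterwards.
        next_time = max(next_time, time.monotonic()) + interval


async def demo():
    start = time.monotonic()
    stamps = []
    async for _ in paced(range(3), 0.05):
        stamps.append(time.monotonic() - start)
    return stamps


stamps = asyncio.run(demo())
```

In a handler, the loop would become `async for i in paced(range(50), 1.0):` followed by the response.write() call.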
