Implement Stream Responses with aiohttp in Python
Streaming responses in Python allow you to send data to clients in chunks and provide real-time updates.
In this tutorial, you’ll learn how to implement stream responses using the aiohttp library.
Create Basic Stream Response
To create a basic stream response, use the StreamResponse class from aiohttp.
from aiohttp import web

async def handle(request):
    response = web.StreamResponse()
    await response.prepare(request)
    await response.write(b"Hello, ")
    await response.write(b"World!")
    await response.write_eof()
    return response

app = web.Application()
app.router.add_get('/', handle)
web.run_app(app)
Output:
Hello, World!
This code sets up a basic HTTP server that streams “Hello, World!” in two separate chunks.
The response is prepared, written to in parts, and then finalized with write_eof().
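Note that the status code and response headers must be set before calling prepare(), because prepare() is what sends the status line and headers to the client. A minimal sketch (the header value here is just illustrative):

from aiohttp import web

async def handle(request):
    # Status and headers go in before prepare(); once prepare() runs,
    # the status line and headers have already gone out on the wire.
    response = web.StreamResponse(
        status=200,
        headers={"Content-Type": "text/plain; charset=utf-8"},
    )
    await response.prepare(request)
    await response.write(b"Hello, World!")
    await response.write_eof()
    return response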
Write Data in Chunks
You can use the write() method to send data in chunks:
async def handle(request):
    response = web.StreamResponse()
    await response.prepare(request)
    for i in range(5):
        await response.write(f"Chunk {i}\n".encode())
    await response.write_eof()
    return response
Output:
Chunk 0
Chunk 1
Chunk 2
Chunk 3
Chunk 4
This code sends five separate chunks of data, each on a new line.
The for loop shows how you can dynamically generate and send content.
To keep the event loop responsive between chunks, await asynchronous operations (such as asyncio.sleep()) instead of calling blocking functions:
import asyncio

async def handle(request):
    response = web.StreamResponse()
    await response.prepare(request)
    for i in range(5):
        await response.write(f"Async Chunk {i}\n".encode())
        await asyncio.sleep(1)
    await response.write_eof()
    return response
Output:
Async Chunk 0
Async Chunk 1
Async Chunk 2
Async Chunk 3
Async Chunk 4
This example introduces a 1-second delay between each chunk to simulate asynchronous data processing or retrieval.
Handle Client-Side Streaming
You can use aiohttp's ClientSession to consume streaming responses on the client side.
import aiohttp
import asyncio

async def fetch():
    async with aiohttp.ClientSession() as session:
        async with session.get('http://localhost:8080') as response:
            async for chunk in response.content.iter_chunked(1024):
                print(chunk.decode())

asyncio.run(fetch())
Output:
Async Chunk 0
Async Chunk 1
Async Chunk 2
Async Chunk 3
Async Chunk 4
This client-side code connects to the server and processes the streamed data in chunks of 1024 bytes.
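A common variation is writing the streamed bytes to disk as they arrive instead of printing them. Here is a minimal sketch; the URL and the stream_output.txt filename are placeholders:

import aiohttp
import asyncio

async def download(url, dest):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            # Write each chunk as soon as it arrives, so a large response
            # never has to sit fully in memory. A plain synchronous file
            # write is fine for a quick sketch.
            with open(dest, "wb") as f:
                async for chunk in response.content.iter_chunked(1024):
                    f.write(chunk)

asyncio.run(download("http://localhost:8080", "stream_output.txt"))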
You can process streamed data as it arrives, regardless of chunk size, using iter_any():
async def fetch():
    async with aiohttp.ClientSession() as session:
        async with session.get('http://localhost:8080') as response:
            async for chunk in response.content.iter_any():
                print(chunk.decode())

asyncio.run(fetch())
Output:
Async Chunk 0
Async Chunk 1
Async Chunk 2
Async Chunk 3
Async Chunk 4
Use Generators with Stream Responses
You can implement a generator function in the server to yield data for streaming.
async def data_generator():
    for i in range(5):
        yield f"Generated Chunk {i}\n".encode()
        await asyncio.sleep(1)

async def handle(request):
    response = web.StreamResponse()
    await response.prepare(request)
    async for chunk in data_generator():
        await response.write(chunk)
    await response.write_eof()
    return response
Output:
Generated Chunk 0
Generated Chunk 1
Generated Chunk 2
Generated Chunk 3
Generated Chunk 4
This method uses a generator to produce data on-the-fly, which is efficient for large datasets or real-time data.
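The same pattern scales to real workloads, such as streaming a large file without loading it all into memory. A sketch, assuming the third-party aiofiles package is installed and that large_file.bin is a placeholder path:

import aiofiles  # third-party: pip install aiofiles
from aiohttp import web

async def file_chunks(path, size=64 * 1024):
    # Yield the file in fixed-size pieces without reading it all at once.
    async with aiofiles.open(path, "rb") as f:
        while chunk := await f.read(size):
            yield chunk

async def handle(request):
    response = web.StreamResponse()
    await response.prepare(request)
    async for chunk in file_chunks("large_file.bin"):  # placeholder path
        await response.write(chunk)
    await response.write_eof()
    return response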
Stream JSON Responses
You can stream JSON objects one at a time:
import json

async def handle(request):
    response = web.StreamResponse()
    await response.prepare(request)
    data = [{"id": i, "name": f"Item {i}"} for i in range(5)]
    for item in data:
        await response.write(json.dumps(item).encode() + b'\n')
    await response.write_eof()
    return response
Output:
{"id": 0, "name": "Item 0"}
{"id": 1, "name": "Item 1"}
{"id": 2, "name": "Item 2"}
{"id": 3, "name": "Item 3"}
{"id": 4, "name": "Item 4"}
Each JSON object is sent on its own line, a format known as newline-delimited JSON (NDJSON). On the client side, you can then parse each object as it arrives:
async def fetch():
    async with aiohttp.ClientSession() as session:
        async with session.get('http://localhost:8080') as response:
            async for line in response.content:
                item = json.loads(line)
                print(f"Received: {item}")

asyncio.run(fetch())
Output:
Received: {'id': 0, 'name': 'Item 0'}
Received: {'id': 1, 'name': 'Item 1'}
Received: {'id': 2, 'name': 'Item 2'}
Received: {'id': 3, 'name': 'Item 3'}
Received: {'id': 4, 'name': 'Item 4'}
This client-side code parses and prints each JSON object in real time as it is received.
Simulate Real-time Data Streaming
To simulate a live data feed with periodic updates, you can use asyncio.sleep():
import asyncio
import random

async def handle(request):
    response = web.StreamResponse()
    await response.prepare(request)
    for _ in range(5):
        data = f"Temperature: {random.uniform(20, 30):.2f}°C\n"
        await response.write(data.encode())
        await asyncio.sleep(2)
    await response.write_eof()
    return response
Output:
Temperature: 23.72°C
Temperature: 25.26°C
Temperature: 25.62°C
Temperature: 23.49°C
Temperature: 26.25°C
This example simulates a temperature sensor sending updates every 2 seconds.
On the client side, you can process the real-time updates as they arrive:
async def fetch():
    async with aiohttp.ClientSession() as session:
        async with session.get('http://localhost:8080') as response:
            async for chunk in response.content.iter_any():
                print(f"Update: {chunk.decode().strip()}")

asyncio.run(fetch())
Output:
Update: Temperature: 29.79°C
Update: Temperature: 27.04°C
Update: Temperature: 28.63°C
Update: Temperature: 28.14°C
Update: Temperature: 21.34°C
The client processes and displays each temperature update as it’s received.
Compression in Streaming Responses
You can use zlib compression on the server side to reduce bandwidth usage in streaming responses.
import zlib

async def handle(request):
    response = web.StreamResponse()
    await response.prepare(request)
    for _ in range(5):
        data = f"Temperature: {random.uniform(20, 30):.2f}°C\n"
        compressed_data = zlib.compress(data.encode())
        await response.write(compressed_data)
        await asyncio.sleep(2)
    await response.write_eof()
    return response
This code compresses the data before sending it to reduce the amount of data transferred over the network.
If you open this endpoint in a web browser, you'll see gibberish: the raw compressed bytes.
Now let’s decompress the received data on the client side:
import zlib

async def fetch():
    async with aiohttp.ClientSession() as session:
        async with session.get('http://localhost:8080') as response:
            async for chunk in response.content.iter_any():
                decompressed_data = zlib.decompress(chunk)
                print(f"Update: {decompressed_data.decode().strip()}")

asyncio.run(fetch())
Output:
Update: Temperature: 20.39°C
Update: Temperature: 28.88°C
Update: Temperature: 23.57°C
Update: Temperature: 29.49°C
Update: Temperature: 23.35°C
The client decompresses each chunk and restores the original content. Keep in mind that this demo relies on each zlib-compressed chunk arriving in one piece; the network does not guarantee chunk boundaries, so for real use consider the built-in alternative below.
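aiohttp can also negotiate compression for you: calling enable_compression() on the response before prepare() applies gzip or deflate based on the client's Accept-Encoding header, and browsers and aiohttp clients decompress it transparently. A minimal sketch:

from aiohttp import web

async def handle(request):
    response = web.StreamResponse()
    # Must be called before prepare(); aiohttp negotiates gzip or deflate
    # with the client via the Accept-Encoding request header.
    response.enable_compression()
    await response.prepare(request)
    for i in range(5):
        await response.write(f"Compressed chunk {i}\n".encode())
    await response.write_eof()
    return response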
Cancellation and Cleanup
To detect and handle client disconnections gracefully, you can use a try/except block:
async def handle(request):
    response = web.StreamResponse()
    await response.prepare(request)
    try:
        for _ in range(5):
            data = f"Temperature: {random.uniform(20, 30):.2f}°C\n"
            await response.write(data.encode())
            await asyncio.sleep(2)
    except ConnectionResetError:
        print("Client disconnected unexpectedly.")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
    finally:
        try:
            await response.write_eof()
        except ConnectionResetError:
            print("Connection already closed, cannot write EOF.")
    return response
On the client side, use a try/except block as well:
async def fetch():
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get('http://localhost:8080') as response:
                async for chunk in response.content.iter_any():
                    print(f"Update: {chunk.decode().strip()}")
    except asyncio.CancelledError:
        print("Fetch operation was cancelled.")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")

try:
    asyncio.run(fetch())
except KeyboardInterrupt:
    print("Client operation was interrupted by the user.")
Output:
Update: Temperature: 21.46°C
Update: Temperature: 23.22°C
Fetch operation was cancelled.
This code catches connection errors when a client disconnects unexpectedly.
Rate Limiting and Throttling
You should control the rate of data sent to prevent overwhelming clients or the server.
import asyncio
import time

class RateLimiter:
    def __init__(self, rate_limit):
        self.rate_limit = rate_limit
        self.last_check = time.time()
        self.allowance = rate_limit

    async def wait(self):
        current = time.time()
        time_passed = current - self.last_check
        self.last_check = current
        self.allowance += time_passed * self.rate_limit
        if self.allowance > self.rate_limit:
            self.allowance = self.rate_limit
        if self.allowance < 1.0:
            await asyncio.sleep((1 - self.allowance) / self.rate_limit)
            self.allowance = 0.0
        else:
            self.allowance -= 1.0

async def handle(request):
    response = web.StreamResponse()
    await response.prepare(request)
    limiter = RateLimiter(1)  # 1 message per second
    for i in range(50):
        await limiter.wait()
        await response.write(f"Rate limited data {i}\n".encode())
    await response.write_eof()
    return response
Using our client:
async def fetch():
    async with aiohttp.ClientSession() as session:
        async with session.get('http://localhost:8080') as response:
            async for chunk in response.content.iter_any():
                print(chunk.decode().strip())

asyncio.run(fetch())
Output:
Rate limited data 0
Rate limited data 1
Rate limited data 2
Rate limited data 3
Rate limited data 4
This implementation sends data at a controlled rate of one message per second. Passing a higher rate to RateLimiter allows proportionally more messages per second.
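If you don't need burst handling, a fixed delay between writes is often enough; the token bucket above only matters when message production is uneven. A simpler sketch:

async def handle(request):
    response = web.StreamResponse()
    await response.prepare(request)
    rate = 5  # messages per second; tune as needed
    for i in range(50):
        await response.write(f"Throttled data {i}\n".encode())
        await asyncio.sleep(1 / rate)  # a fixed pause enforces the rate
    await response.write_eof()
    return response

Either approach keeps the stream smooth; the class-based limiter is simply more flexible when write rates vary.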