Using Chunked Transfer Encoding with aiohttp
Chunked transfer encoding in HTTP/1.1 allows you to send data in chunks without knowing the total content length in advance.
In this tutorial, you’ll learn how to implement both server-side and client-side chunked transfer encoding using aiohttp in Python.
In HTTP/1.1, chunked transfer encoding is indicated by the “Transfer-Encoding: chunked” header.
Each chunk consists of its size in hexadecimal, followed by a CRLF, the chunk data, and another CRLF; a final zero-length chunk signals the end of the body.
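For illustration (aiohttp generates this framing for you; you never write it by hand), a body delivered as the two pieces "Hello" and "World!" would travel roughly like this, where every line break is a CRLF and the sizes 5 and 6 are hexadecimal byte counts:

5
Hello
6
World!
0

The empty line after the final zero-length chunk terminates the message.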
Advantages Over Fixed-length Responses
Chunked transfer encoding offers several benefits over fixed-length responses: the server can start sending data before the whole payload is ready, the total content length does not have to be known or buffered up front, and long-running responses can be delivered incrementally as they are produced.
Let’s compare chunked and non-chunked responses:
import asyncio
from aiohttp import web

async def chunked_response(request):
    response = web.StreamResponse()
    response.headers['Content-Type'] = 'text/plain'
    await response.prepare(request)
    for i in range(5):
        chunk = f"Chunk {i}\n".encode()
        await response.write(chunk)
        await asyncio.sleep(1)  # Simulate processing time
    await response.write_eof()
    return response

async def non_chunked_response(request):
    content = "".join([f"Chunk {i}\n" for i in range(5)])
    await asyncio.sleep(5)  # Simulate processing time
    return web.Response(text=content)

app = web.Application()
app.router.add_get('/chunked', chunked_response)
app.router.add_get('/non-chunked', non_chunked_response)
web.run_app(app)
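Note that when no Content-Length is set on a StreamResponse, aiohttp generally falls back to chunked framing for HTTP/1.1 on its own; you can also request it explicitly with response.enable_chunked_encoding(), as the later examples do.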
You can check the difference using curl (the -N flag disables curl's output buffering so chunks appear as they arrive):
>curl -N http://localhost:8080/chunked
Output (Chunked):
Chunk 0
Chunk 1
Chunk 2
Chunk 3
Chunk 4
For non-chunked:
>curl -N http://localhost:8080/non-chunked
Output (Non-chunked):
Chunk 0
Chunk 1
Chunk 2
Chunk 3
Chunk 4
The chunked response starts sending data immediately and continues to send chunks every second.
The non-chunked response waits for all data to be ready before sending anything, which results in a 5-second delay before any data is received.
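You can also ask curl to report timing details; exact numbers will vary, but the chunked endpoint should deliver its first byte almost immediately, while the non-chunked endpoint holds everything back for roughly five seconds:

>curl -s -o /dev/null -w "first byte: %{time_starttransfer}s, total: %{time_total}s\n" http://localhost:8080/chunked
>curl -s -o /dev/null -w "first byte: %{time_starttransfer}s, total: %{time_total}s\n" http://localhost:8080/non-chunked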
Client-Side Handling of Chunked Responses
Instead of using curl as a client, you can use aiohttp ClientSession to handle chunked responses on the client side:
import aiohttp
import asyncio

async def fetch_and_process_chunked_data():
    async with aiohttp.ClientSession() as session:
        async with session.get('http://localhost:8080/chunked') as response:
            async for chunk in response.content.iter_chunked(1024):
                processed_chunk = chunk.decode().upper()
                print(processed_chunk, end='')

asyncio.run(fetch_and_process_chunked_data())
Output:
CHUNK 0
CHUNK 1
CHUNK 2
CHUNK 3
CHUNK 4
The client receives and processes each chunk as it arrives.
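iter_chunked(1024) yields pieces of at most 1024 bytes. If you would rather receive whatever bytes have arrived as soon as they arrive, the response stream also exposes iter_any(); a minimal sketch against the same /chunked endpoint:

import aiohttp
import asyncio

async def fetch_chunks_as_they_arrive():
    async with aiohttp.ClientSession() as session:
        async with session.get('http://localhost:8080/chunked') as response:
            # iter_any() yields data as soon as it is read from the socket,
            # without waiting to fill a fixed-size buffer.
            async for chunk in response.content.iter_any():
                print(chunk.decode(), end='')

asyncio.run(fetch_chunks_as_they_arrive())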
Chunked Transfer Encoding with JSON Data
You can use chunked transfer encoding to stream JSON data:
import json
from aiohttp import web
import asyncio

async def stream_json(request):
    response = web.StreamResponse()
    response.headers['Content-Type'] = 'application/json'
    response.enable_chunked_encoding()
    await response.prepare(request)

    data = [
        {"id": 1, "name": "Tom"},
        {"id": 2, "name": "Adam"},
        {"id": 3, "name": "Sam"}
    ]

    await response.write(b'[\n')
    for i, item in enumerate(data):
        chunk = json.dumps(item).encode() + b'\n'
        if i < len(data) - 1:
            chunk += b','
        await response.write(chunk)
        await asyncio.sleep(1)  # Simulate processing time
    await response.write(b']')
    await response.write_eof()
    return response

app = web.Application()
app.router.add_get('/', stream_json)
web.run_app(app)
Let’s use curl to check the response:
>curl -N http://localhost:8080
Output:
[ {"id": 1, "name": "Tom"} , {"id": 2, "name": "Adam"} , {"id": 3, "name": "Sam"} ]
This example streams a JSON array by sending each object as a separate chunk.
The client can start processing the data as soon as the first object arrives.
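On the client side you can take advantage of the fact that the server above writes one JSON object per line. The following is a minimal sketch tied to that specific line-per-object format (it is not a general-purpose JSON stream parser), decoding each object as its line arrives:

import aiohttp
import asyncio
import json

async def consume_streamed_json():
    async with aiohttp.ClientSession() as session:
        async with session.get('http://localhost:8080/') as response:
            # Iterating the response content yields it line by line.
            async for raw_line in response.content:
                # Lines look like '[', '{"id": 1, ...}', ',{"id": 2, ...}' or ']'
                line = raw_line.decode().strip().lstrip(',')
                if line in ('[', ']', ''):
                    continue  # skip the array brackets
                item = json.loads(line)
                print(f"Got item: {item}")

asyncio.run(consume_streamed_json())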
Handle Nested JSON Objects and Arrays in Chunks
For nested JSON structures, you can use a recursive approach:
import json
import asyncio
from aiohttp import web

async def stream_nested_json(request):
    response = web.StreamResponse()
    response.headers['Content-Type'] = 'application/json'
    response.enable_chunked_encoding()
    await response.prepare(request)

    data = {
        "users": [
            {"id": 1, "name": "Tom", "roles": ["admin", "user"]},
            {"id": 2, "name": "Adam", "roles": ["user"]}
        ],
        "settings": {
            "theme": "dark",
            "notifications": True
        }
    }

    async def write_json(obj):
        if isinstance(obj, dict):
            await response.write(b'{')
            for i, (key, value) in enumerate(obj.items()):
                await response.write(json.dumps(key).encode() + b':')
                await write_json(value)
                if i < len(obj) - 1:
                    await response.write(b',')
            await response.write(b'}')
        elif isinstance(obj, list):
            await response.write(b'[')
            for i, item in enumerate(obj):
                await write_json(item)
                if i < len(obj) - 1:
                    await response.write(b',')
            await response.write(b']')
        else:
            await response.write(json.dumps(obj).encode())
            await asyncio.sleep(0.5)  # Simulate processing time

    await write_json(data)
    await response.write_eof()
    return response

app = web.Application()
app.router.add_get('/', stream_nested_json)
web.run_app(app)
Using curl to show response:
>curl -N http://localhost:8080
Output:
{"users":[{"id":1,"name":"Tom","roles":["admin","user"]},{"id":2,"name":"Adam","roles":["user"]}],"settings":{"theme":"dark","notifications":true}}
This method allows you to stream complex nested JSON structures where each element is sent as a separate chunk.
To stream and parse large JSON documents incrementally, you can use a library such as ijson:
import asyncio
import io

import ijson
from aiohttp import web

async def stream_large_json(request):
    response = web.StreamResponse()
    response.headers['Content-Type'] = 'text/plain'
    response.enable_chunked_encoding()
    await response.prepare(request)

    large_json = b'{"items": [{"id": 1}, {"id": 2}, {"id": 3}]}'
    # ijson expects a file-like source, so wrap the bytes in BytesIO
    parser = ijson.parse(io.BytesIO(large_json))
    for prefix, event, value in parser:
        chunk = f"{prefix} {event} {value}\n".encode()
        await response.write(chunk)
        await asyncio.sleep(0.5)  # Simulate processing time

    await response.write_eof()
    return response

app = web.Application()
app.router.add_get('/', stream_large_json)
web.run_app(app)
Output:
start_map None
map_key items
items start_array None
items.item start_map None
items.item map_key id
items.item.id number 1
items.item end_map None
items.item start_map None
items.item map_key id
items.item.id number 2
items.item end_map None
items.item start_map None
items.item map_key id
items.item.id number 3
items.item end_map None
items end_array None
end_map None
By using ijson, you can stream and parse very large JSON files without loading the entire file into memory.
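The parse-event stream above is fairly low level. If you only care about the objects under a given prefix, ijson.items() yields them fully built, one at a time. Here is a minimal sketch of a handler using items() (the io.BytesIO wrapper stands in for a real file or stream, and the /items route is just for illustration):

import asyncio
import io

import ijson
from aiohttp import web

async def stream_items(request):
    response = web.StreamResponse()
    response.headers['Content-Type'] = 'text/plain'
    response.enable_chunked_encoding()
    await response.prepare(request)

    large_json = b'{"items": [{"id": 1}, {"id": 2}, {"id": 3}]}'
    # items() yields each object under the "items" array fully built,
    # without materializing the whole document at once.
    for item in ijson.items(io.BytesIO(large_json), 'items.item'):
        await response.write(f"{item}\n".encode())
        await asyncio.sleep(0.5)  # Simulate processing time

    await response.write_eof()
    return response

app = web.Application()
app.router.add_get('/items', stream_items)
web.run_app(app)

Keep in mind that this loop parses synchronously; for genuinely large inputs you would feed ijson from a source that is read incrementally rather than a fully in-memory bytes object.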
Handle Chunked Transfer in WebSockets
WebSockets provide full-duplex communication between client and server, while a chunked HTTP response streams data in one direction only, from server to client.
Here’s a comparison:
from aiohttp import web
import asyncio

async def websocket_handler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    for i in range(5):
        await ws.send_str(f"WebSocket message {i}")
        await asyncio.sleep(1)
    await ws.close()
    return ws

async def chunked_handler(request):
    response = web.StreamResponse()
    response.headers['Content-Type'] = 'text/plain'
    response.enable_chunked_encoding()
    await response.prepare(request)
    for i in range(5):
        await response.write(f"Chunked message {i}\n".encode())
        await asyncio.sleep(1)
    await response.write_eof()
    return response

app = web.Application()
app.router.add_get('/ws', websocket_handler)
app.router.add_get('/chunked', chunked_handler)
web.run_app(app)
To connect to the WebSocket endpoint, you can use a WebSocket client library such as websockets:
import asyncio
import websockets

async def connect():
    uri = "ws://127.0.0.1:8080/ws"
    async with websockets.connect(uri) as websocket:
        async for message in websocket:
            print(f"Received: {message}")

asyncio.run(connect())
WebSocket output:
Received: WebSocket message 0
Received: WebSocket message 1
Received: WebSocket message 2
Received: WebSocket message 3
Received: WebSocket message 4
Chunked HTTP output:
Chunked message 0
Chunked message 1
Chunked message 2
Chunked message 3
Chunked message 4
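If you would rather not pull in the separate websockets package, aiohttp's own client can speak WebSocket as well. A rough equivalent of the client above using ClientSession.ws_connect():

import aiohttp
import asyncio

async def connect_with_aiohttp():
    async with aiohttp.ClientSession() as session:
        async with session.ws_connect('http://localhost:8080/ws') as ws:
            # ws yields WSMessage objects; only print the text frames.
            async for msg in ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    print(f"Received: {msg.data}")

asyncio.run(connect_with_aiohttp())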