Using Chunked Transfer Encoding with aiohttp

Chunked Transfer Encoding in HTTP/1.1 allows you to send data in chunks without knowing the total content length.

In this tutorial, you’ll learn how to implement both server-side and client-side chunked transfer encoding using aiohttp in Python.

In HTTP/1.1, chunked transfer encoding is indicated by the “Transfer-Encoding: chunked” header.

Each chunk consists of its size in hexadecimal format, followed by a newline, the chunk data, and another newline.

 

 

Advantages Over Fixed-length Responses

Chunked transfer encoding offers several benefits over fixed-length responses:

Let’s compare chunked and non-chunked responses:

import asyncio
from aiohttp import web
async def chunked_response(request):
    response = web.StreamResponse()
    response.headers['Content-Type'] = 'text/plain'
    await response.prepare(request)
    for i in range(5):
        chunk = f"Chunk {i}\n".encode()
        await response.write(chunk)
        await asyncio.sleep(1)  # Simulate processing time
    await response.write_eof()
    return response
async def non_chunked_response(request):
    content = "".join([f"Chunk {i}\n" for i in range(5)])
    await asyncio.sleep(5)  # Simulate processing time
    return web.Response(text=content)
app = web.Application()
app.router.add_get('/chunked', chunked_response)
app.router.add_get('/non-chunked', non_chunked_response)
web.run_app(app)

You can check the response difference using curl:

>curl -N http://localhost:8080/chunked

Output (Chunked):

Chunk 0
Chunk 1
Chunk 2
Chunk 3
Chunk 4

For non-chunked:

>curl -N http://localhost:8080/non-chunked

Output (Non-chunked):

Chunk 0
Chunk 1
Chunk 2
Chunk 3
Chunk 4

The chunked response starts sending data immediately and continues to send chunks every second.

The non-chunked response waits for all data to be ready before sending which results in a 5-second delay before any data is received.

 

Client-Side Handling of Chunked Responses

Instead of using curl as a client, you can use aiohttp ClientSession to handle chunked responses on the client side:

import aiohttp
import asyncio
async def fetch_and_process_chunked_data():
    async with aiohttp.ClientSession() as session:
        async with session.get('http://localhost:8080/chunked') as response:
            async for chunk in response.content.iter_chunked(1024):
                processed_chunk = chunk.decode().upper()
                print(processed_chunk, end='')
asyncio.run(fetch_and_process_chunked_data())

Output:

CHUNK 0
CHUNK 1
CHUNK 2
CHUNK 3
CHUNK 4

The client receives and processes each chunk as it arrives.

 

Chunked Transfer Encoding with JSON Data

You can use chunked transfer encoding to stream JSON data:

import json
from aiohttp import web
import asyncio
async def stream_json(request):
    response = web.StreamResponse()
    response.headers['Content-Type'] = 'application/json'
    response.enable_chunked_encoding()
    await response.prepare(request)
    data = [
        {"id": 1, "name": "Tom"},
        {"id": 2, "name": "Adam"},
        {"id": 3, "name": "Sam"}
    ]
    await response.write(b'[\n')
    for i, item in enumerate(data):
        chunk = json.dumps(item).encode() + b'\n'
        if i < len(data) - 1:
            chunk += b','
        await response.write(chunk)
        await asyncio.sleep(1)  # Simulate processing time
    await response.write(b']')
    await response.write_eof()
    return response
app = web.Application()
app.router.add_get('/', stream_json)
web.run_app(app)

Let’s use curl to check the response:

>curl -N http://localhost:8080

Output:

[
{"id": 1, "name": "Tom"}
,
{"id": 2, "name": "Adam"}
,
{"id": 3, "name": "Sam"}
]

This example streams a JSON array by sending each object as a separate chunk.

The client can start processing the data as soon as the first object arrives.

Handle nested JSON objects and arrays in chunks

For nested JSON structures, you can use a recursive approach:

import json
import asyncio
from aiohttp import web
async def stream_nested_json(request):
    response = web.StreamResponse()
    response.headers['Content-Type'] = 'application/json'
    response.enable_chunked_encoding()
    await response.prepare(request)
    data = {
        "users": [
            {"id": 1, "name": "Tom", "roles": ["admin", "user"]},
            {"id": 2, "name": "Adam", "roles": ["user"]}
        ],
        "settings": {
            "theme": "dark",
            "notifications": True
        }
    }
    async def write_json(obj):
        if isinstance(obj, dict):
            await response.write(b'{')
            for i, (key, value) in enumerate(obj.items()):
                await response.write(json.dumps(key).encode() + b':')
                await write_json(value)
                if i < len(obj) - 1:
                    await response.write(b',')
            await response.write(b'}')
        elif isinstance(obj, list):
            await response.write(b'[')
            for i, item in enumerate(obj):
                await write_json(item)
                if i < len(obj) - 1:
                    await response.write(b',')
            await response.write(b']')
        else:
            await response.write(json.dumps(obj).encode())
        await asyncio.sleep(0.5)  # Simulate processing time
    await write_json(data)
    await response.write_eof()
    return response
app = web.Application()
app.router.add_get('/', stream_nested_json)
web.run_app(app)

Using curl to show response:

>curl -N http://localhost:8080

Output:

{"users":[{"id":1,"name":"Tom","roles":["admin","user"]},{"id":2,"name":"Adam","roles":["user"]}],"settings":{"theme":"dark","notifications":true}}

This method allows you to stream complex nested JSON structures where each element is sent as a separate chunk.

For more efficient JSON streaming, you can use libraries like ijson:

import ijson
from aiohttp import web
async def stream_large_json(request):
    response = web.StreamResponse()
    response.headers['Content-Type'] = 'application/json'
    response.enable_chunked_encoding()
    await response.prepare(request)
    large_json = b'{"items": [{"id": 1}, {"id": 2}, {"id": 3}]}'
    parser = ijson.parse(large_json)
    for prefix, event, value in parser:
        chunk = f"{prefix} {event} {value}\n".encode()
        await response.write(chunk)
        await asyncio.sleep(0.5)  # Simulate processing time
    await response.write_eof()
    return response
app = web.Application()
app.router.add_get('/', stream_large_json)
web.run_app(app)

Output:

 start_map None
 map_key items
items start_array None
items.item start_map None
items.item map_key id
items.item.id number 1
items.item end_map None
items.item start_map None
items.item map_key id
items.item.id number 2
items.item end_map None
items.item start_map None
items.item map_key id
items.item.id number 3
items.item end_map None
items end_array None
 end_map None

By using ijson, you can stream and parse very large JSON files without loading the entire file into memory.

 

Handle Chunked Transfer in WebSockets

WebSockets provide full-duplex communication, while chunked HTTP is unidirectional.

Here’s a comparison:

from aiohttp import web
import asyncio
async def websocket_handler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    for i in range(5):
        await ws.send_str(f"WebSocket message {i}")
        await asyncio.sleep(1)

    await ws.close()
    return ws
async def chunked_handler(request):
    response = web.StreamResponse()
    response.headers['Content-Type'] = 'text/plain'
    response.enable_chunked_encoding()
    await response.prepare(request)
    for i in range(5):
        await response.write(f"Chunked message {i}\n".encode())
        await asyncio.sleep(1)

    await response.write_eof()
    return response
app = web.Application()
app.router.add_get('/ws', websocket_handler)
app.router.add_get('/chunked', chunked_handler)
web.run_app(app)

To initiate a WebSocket connection, you can use a WebSocket client:

import asyncio
import websockets
async def connect():
    uri = "ws://127.0.0.1:8080/ws"
    async with websockets.connect(uri) as websocket:
        async for message in websocket:
            print(f"Received: {message}")
asyncio.run(connect())

WebSocket output:

Received: WebSocket message 0
Received: WebSocket message 1
Received: WebSocket message 2
Received: WebSocket message 3
Received: WebSocket message 4

Chunked HTTP output:

Chunked message 0
Chunked message 1
Chunked message 2
Chunked message 3
Chunked message 4
Leave a Reply

Your email address will not be published. Required fields are marked *