aiohttp Exception Handling: Comprehensive Tutorial

In this tutorial, you’ll learn how to handle exceptions in aiohttp: catching client-side errors, defining custom exceptions, propagating and re-raising errors, writing error-handling middleware, cleaning up on shutdown, and dealing with interrupted streaming responses.

Handle Client-side Exceptions

To handle specific client-side exceptions, you can catch them individually.

Here’s an example:

import aiohttp
import asyncio
async def fetch(url):
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=1)) as response:
                return await response.json()
    except aiohttp.ClientConnectorError:
        print("Connection error occurred")
    except asyncio.TimeoutError:
        print("Request timed out")
    except aiohttp.InvalidURL:
        print("Invalid URL provided")
    except aiohttp.ContentTypeError:
        print("Unexpected content type in response")
asyncio.run(fetch("https://microsoft.com"))

Output:

Request timed out

In this case, the request timed out because we set a very short timeout of 1 second.

If we raise the timeout to 7 seconds, the request completes, but a different exception is raised:

Unexpected content type in response

This error occurs because the website doesn’t return JSON as we expect, so response.json() raises ContentTypeError.

This example shows how to handle different types of client-side exceptions.
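
The timeout branch can be exercised without a network call. The following is a minimal sketch, not aiohttp code: it assumes a hypothetical slow_operation coroutine in place of the HTTP request and uses asyncio.wait_for in place of aiohttp's ClientTimeout. It returns None on timeout instead of printing, so the caller can tell a failure from a result.

```python
import asyncio


async def slow_operation(delay):
    # Hypothetical stand-in for a slow HTTP request.
    await asyncio.sleep(delay)
    return "done"


async def run_with_timeout(delay, timeout):
    try:
        return await asyncio.wait_for(slow_operation(delay), timeout=timeout)
    except asyncio.TimeoutError:
        # Return a sentinel instead of printing so the caller can react.
        return None


print(asyncio.run(run_with_timeout(delay=0.5, timeout=0.05)))  # None
print(asyncio.run(run_with_timeout(delay=0.01, timeout=0.5)))  # done
```

The same except asyncio.TimeoutError clause is what catches the aiohttp timeout in the example above.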

Custom Exception Handling

You can create custom exception classes to handle application-specific errors:

import aiohttp
import asyncio


class APIError(Exception):
    def __init__(self, status_code, message):
        self.status_code = status_code
        self.message = message
        super().__init__(f"API Error: {status_code} - {message}")
async def fetch_api_data(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            if response.status >= 400:
                raise APIError(response.status, await response.text())
            return await response.json()
try:
    data = asyncio.run(fetch_api_data("https://api.github.com/nonexistent"))
except APIError as e:
    print(f"Caught custom exception: {e}")

Output:

Caught custom exception: API Error: 404 - {"message":"Not Found","documentation_url":"https://docs.github.com/rest","status":"404"}

This example shows how to create and use a custom APIError exception.

It provides more context about API-specific errors, including the status code and error message.
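
A custom exception can also wrap a lower-level error without losing it. The sketch below assumes a hypothetical parse_body helper (not part of aiohttp) and uses Python's raise ... from syntax so the original error stays available on __cause__:

```python
import json


class APIError(Exception):
    def __init__(self, status_code, message):
        self.status_code = status_code
        self.message = message
        super().__init__(f"API Error: {status_code} - {message}")


def parse_body(status_code, body):
    """Hypothetical helper: decode a response body that should be JSON."""
    try:
        return json.loads(body)
    except json.JSONDecodeError as exc:
        # "from exc" stores the original error on __cause__ for debugging.
        raise APIError(status_code, "response body is not valid JSON") from exc


try:
    parse_body(200, "<html>not json</html>")
except APIError as e:
    print(f"Caught: {e}")
    print(f"Caused by: {type(e.__cause__).__name__}")
```

Callers only need to catch APIError, while the traceback still shows the underlying decoding failure.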

Implement __str__ and __repr__ methods

You can improve your custom exceptions by implementing __str__ and __repr__ methods:

class APIError(Exception):
    def __init__(self, status_code, message):
        self.status_code = status_code
        self.message = message
        super().__init__(f"API Error: {status_code} - {message}")

    def __str__(self):
        return f"APIError(status_code={self.status_code}, message='{self.message}')"

    def __repr__(self):
        return self.__str__()
error = APIError(404, "Not Found")
print(str(error))
print(repr(error))

Output:

APIError(status_code=404, message='Not Found')
APIError(status_code=404, message='Not Found')

By implementing __str__ and __repr__ methods, you provide a clear and informative representation of your custom exception.
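
The two methods do not have to return the same text. As one possible variation (a sketch, not the only reasonable design), __str__ can stay human-readable while __repr__ gives an unambiguous form for debugging:

```python
class APIError(Exception):
    def __init__(self, status_code, message):
        self.status_code = status_code
        self.message = message
        super().__init__(f"API Error: {status_code} - {message}")

    def __str__(self):
        # Readable text for logs and end users.
        return f"API Error: {self.status_code} - {self.message}"

    def __repr__(self):
        # Unambiguous form for debugging; !r quotes and escapes the message.
        return f"APIError(status_code={self.status_code!r}, message={self.message!r})"


error = APIError(404, "Not Found")
print(str(error))   # API Error: 404 - Not Found
print(repr(error))  # APIError(status_code=404, message='Not Found')
```

Using !r for the message means quotes or newlines inside it are escaped rather than breaking the output.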

Exception Propagation in aiohttp Applications

You can allow exceptions to bubble up to be handled at a higher level:

import aiohttp
import asyncio


async def fetch_data(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            if response.status != 200:
                raise aiohttp.ClientResponseError(
                    response.request_info,
                    response.history,
                    status=response.status,
                    message=f"HTTP Error {response.status}",
                )
            return await response.json()
async def main():
    try:
        data = await fetch_data("https://api.github.com/nonexistent")
    except aiohttp.ClientResponseError as e:
        print(f"Caught exception in main: {e}")
asyncio.run(main())

Output:

Caught exception in main: 404, message='HTTP Error 404'

This example shows how exceptions can bubble up from fetch_data to main: fetch_data raises ClientResponseError without catching it, and main handles it.

Catch and re-raise exceptions

Sometimes you might want to catch an exception, perform some action, and then re-raise it:

import aiohttp
import asyncio


async def fetch_data(url):
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                return await response.json()
    except aiohttp.ClientError as e:
        print(f"Logging error: {e}")
        raise
async def main():
    try:
        data = await fetch_data("https://github.com/nonexistent")
    except aiohttp.ClientError as e:
        print(f"Handling error in main: {e}")

asyncio.run(main())

Output:

Logging error: 0, message='Attempt to decode JSON with unexpected mimetype: text/html; charset=utf-8', url=URL('https://github.com/nonexistent')
Handling error in main: 0, message='Attempt to decode JSON with unexpected mimetype: text/html; charset=utf-8', url=URL('https://github.com/nonexistent')

This code catches the exception in fetch_data, logs it, and then re-raises it to be handled in main. This allows for both local and global error handling.
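
In a real application, the print call would usually be replaced by the logging module. The following sketch shows the same catch, log, and re-raise pattern using only the standard library; flaky_operation is a hypothetical stand-in for the request, and ConnectionError stands in for aiohttp.ClientError:

```python
import asyncio
import logging

logging.basicConfig(level=logging.ERROR, format="%(levelname)s %(name)s: %(message)s")
logger = logging.getLogger("fetcher")


async def flaky_operation():
    # Hypothetical stand-in for a request that fails.
    raise ConnectionError("connection reset")


async def fetch_data():
    try:
        return await flaky_operation()
    except ConnectionError:
        # logger.exception logs at ERROR level and appends the traceback.
        logger.exception("Request failed")
        raise  # bare raise keeps the original exception and traceback


async def main():
    try:
        await fetch_data()
    except ConnectionError as e:
        return f"Handled in main: {e}"


print(asyncio.run(main()))  # Handled in main: connection reset
```

A bare raise is preferable to raise e here because it re-raises the active exception unchanged.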

Error Handling Middleware

You can create middleware to handle exceptions across your entire aiohttp application:

from aiohttp import web

@web.middleware
async def error_middleware(request, handler):
    try:
        response = await handler(request)
        return response
    except web.HTTPException:
        raise
    except Exception as ex:
        print(f"Unhandled exception: {ex}")
        return web.json_response({'error': 'Internal Server Error'}, status=500)

This middleware re-raises aiohttp's own HTTP exceptions (such as a 404 Not Found) unchanged and converts every other unhandled exception into a JSON response with a 500 status code.

To use the middleware, add it to your aiohttp application:

app = web.Application(middlewares=[error_middleware])
async def hello(request):
    return web.Response(text="Hello, World!")
app.router.add_get('/', hello)
web.run_app(app)

This code adds the error_middleware to the application, ensuring all requests pass through it.

You can customize error responses in your middleware:

@web.middleware
async def error_middleware(request, handler):
    try:
        response = await handler(request)
        return response
    except web.HTTPException as ex:
        return web.json_response({'error': ex.reason}, status=ex.status)
    except Exception as ex:
        print(f"Unhandled exception: {ex}")
        return web.json_response({'error': 'Internal Server Error'}, status=500)

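This version converts HTTP exceptions into JSON responses that keep the original status code and reason, while any other exception still produces a generic 500 response. Note that web.HTTPException also covers redirects such as web.HTTPFound, so catch web.HTTPError instead if redirects should pass through unchanged.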

Graceful Shutdown and Cleanup

You can handle cancellation exceptions during shutdown:

import asyncio
async def long_running_task():
    try:
        while True:
            await asyncio.sleep(1)
            print("Working...")
    except asyncio.CancelledError:
        print("Task was cancelled, cleaning up...")
        await asyncio.sleep(0.5)  # Simulate cleanup
        print("Cleanup complete")
        raise  # Re-raise the CancelledError
async def main():
    task = asyncio.create_task(long_running_task())
    await asyncio.sleep(3)  # Let the task run for a bit
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("Main: task was cancelled")
asyncio.run(main())

Output:

Working...
Working...
Task was cancelled, cleaning up...
Cleanup complete
Main: task was cancelled

This example shows how to handle CancelledError to perform cleanup operations before the task is fully cancelled.
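
An application usually has several tasks to stop at shutdown. As a rough sketch of one way to do that (the worker coroutine and its names are illustrative), each task can be cancelled and then awaited together with asyncio.gather:

```python
import asyncio


async def worker(name, cleaned_up):
    try:
        while True:
            await asyncio.sleep(0.01)
    except asyncio.CancelledError:
        cleaned_up.append(name)  # record that cleanup ran
        raise


async def main():
    cleaned_up = []
    tasks = [asyncio.create_task(worker(f"worker-{i}", cleaned_up)) for i in range(3)]
    await asyncio.sleep(0.05)  # let the workers start

    for task in tasks:
        task.cancel()
    # return_exceptions=True collects each CancelledError instead of raising the first.
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return cleaned_up, results


cleaned_up, results = asyncio.run(main())
print(sorted(cleaned_up))  # ['worker-0', 'worker-1', 'worker-2']
print(all(isinstance(r, asyncio.CancelledError) for r in results))  # True
```

Awaiting the cancelled tasks matters: cancel() only requests cancellation, and the cleanup code runs when the task is next scheduled.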

Using try…finally for resource management

Here’s an example of using try/finally for proper resource management:

import aiohttp
import asyncio


async def fetch_and_process(url):
    session = aiohttp.ClientSession()
    try:
        async with session.get(url) as response:
            data = await response.text()
            return process_data(data)
    finally:
        await session.close()
def process_data(data):
    return len(data)
result = asyncio.run(fetch_and_process("https://example.com"))
print(f"Processed data length: {result}")

Output:

Processed data length: 1256

This example shows how to use try/finally to ensure that the ClientSession is properly closed, regardless of whether an exception occurs or not.
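
The example above only exercises the success path. To see that the finally block also runs when the request fails, here is a small sketch using a hypothetical FakeSession class (a stand-in, not an aiohttp type) whose get method always raises:

```python
import asyncio


class FakeSession:
    """Hypothetical stand-in for a resource with an async close() method."""

    def __init__(self):
        self.closed = False

    async def get(self, url):
        raise ConnectionError(f"cannot reach {url}")

    async def close(self):
        self.closed = True


async def fetch(session, url):
    try:
        return await session.get(url)
    finally:
        # Runs on success, on error, and on cancellation.
        await session.close()


async def main():
    session = FakeSession()
    try:
        await fetch(session, "https://example.com")
    except ConnectionError as e:
        print(f"Request failed: {e}")
    print(f"Session closed: {session.closed}")
    return session.closed


closed = asyncio.run(main())
```

The exception still reaches the caller; finally only guarantees that cleanup happens first.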

Handling Exceptions in Streaming Responses

Deal with incomplete data (Interruption before completion)

When dealing with streaming responses, you need to handle potential incomplete data:

import aiohttp
import asyncio


async def stream_data(url):
    async with aiohttp.ClientSession() as session:
        try:
            async with session.get(url) as response:
                async for chunk in response.content.iter_chunked(1024):
                    process_chunk(chunk)
        except aiohttp.ClientPayloadError:
            print("Data stream was interrupted")
def process_chunk(chunk):
    print(f"Processed chunk of size {len(chunk)}")
asyncio.run(stream_data("https://example.com"))

This code handles ClientPayloadError, which can occur if the data stream is interrupted before completion.
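
When a stream is interrupted, it often helps to know how much data arrived and that it is incomplete. The sketch below illustrates this with a hypothetical chunk_source async generator in place of response.content.iter_chunked, and ConnectionError in place of ClientPayloadError:

```python
import asyncio


async def chunk_source():
    # Hypothetical stream: yields two chunks, then the connection drops.
    yield b"a" * 1024
    yield b"b" * 512
    raise ConnectionError("stream interrupted")


async def read_stream():
    received = bytearray()
    complete = False
    try:
        async for chunk in chunk_source():
            received.extend(chunk)
        complete = True
    except ConnectionError:
        # Keep what arrived, but flag it so callers do not treat it as complete.
        pass
    return bytes(received), complete


data, complete = asyncio.run(read_stream())
print(f"Received {len(data)} bytes, complete={complete}")  # Received 1536 bytes, complete=False
```

Returning an explicit completeness flag keeps partial data from being mistaken for a full response.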

Manage connection drops during streaming

To manage connection drops during streaming, you can retry the request a limited number of times:

import aiohttp
import asyncio


async def stream_with_retry(url, max_retries=3):
    for attempt in range(max_retries):
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as response:
                    async for chunk in response.content.iter_chunked(1024):
                        process_chunk(chunk)
            break  # If we get here, streaming completed successfully
        except aiohttp.ClientError as e:
            print(f"Attempt {attempt + 1} failed: {e}")
            if attempt == max_retries - 1:
                print("Max retries reached, giving up")
                raise
def process_chunk(chunk):
    print(f"Processed chunk of size {len(chunk)}")
asyncio.run(stream_with_retry("https://example.com"))

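This example implements a retry mechanism for streaming data so you can recover from temporary connection issues. Each retry restarts the download from the beginning, so chunks processed before the failure are processed again.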
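
The retry loop above retries immediately. A common refinement is to wait longer between attempts. The following is a minimal sketch of exponential backoff, assuming a hypothetical flaky_download coroutine and using ConnectionError as a stand-in for aiohttp.ClientError; the delays are kept tiny so the example runs quickly:

```python
import asyncio


async def retry_with_backoff(operation, max_retries=3, base_delay=0.01):
    for attempt in range(max_retries):
        try:
            return await operation()
        except ConnectionError as e:
            if attempt == max_retries - 1:
                raise  # out of retries: let the caller handle it
            delay = base_delay * 2 ** attempt  # 0.01, 0.02, 0.04, ...
            print(f"Attempt {attempt + 1} failed: {e}; retrying in {delay:.2f}s")
            await asyncio.sleep(delay)


calls = 0


async def flaky_download():
    # Hypothetical operation that fails twice, then succeeds.
    global calls
    calls += 1
    if calls < 3:
        raise ConnectionError("connection dropped")
    return "payload"


result = asyncio.run(retry_with_backoff(flaky_download))
print(result, calls)  # payload 3
```

In practice the base delay would be closer to a second, and the final failure is re-raised so the caller can decide what to do.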
