aiohttp Exception Handling: Comprehensive Tutorial
In this tutorial, you’ll learn how to handle exceptions in aiohttp.
You’ll explore various types of exceptions, implement custom error handling, and manage errors in aiohttp applications.
Handle Client-side Exceptions
To handle specific client-side exceptions, you can catch them individually.
Here’s an example:
import aiohttp import asyncio async def fetch(url): try: async with aiohttp.ClientSession() as session: async with session.get(url, timeout=aiohttp.ClientTimeout(total=1)) as response: return await response.json() except aiohttp.ClientConnectorError: print("Connection error occurred") except asyncio.TimeoutError: print("Request timed out") except aiohttp.InvalidURL: print("Invalid URL provided") except aiohttp.ContentTypeError: print("Unexpected content type in response") asyncio.run(fetch("https://microsoft.com"))
Output:
Request timed out
In this case, the request timed out because we set a very short timeout of 1 second.
If we change the timeout to 7 seconds, the timeout issue should be resolved:
Unexpected content type in response
As you can see, this error because the website doesn’t return json as we expect.
This example shows how to handle different types of client-side exceptions.
Custom Exception Handling
You can create custom exception classes to handle application-specific errors:
class APIError(Exception): def __init__(self, status_code, message): self.status_code = status_code self.message = message super().__init__(f"API Error: {status_code} - {message}") async def fetch_api_data(url): async with aiohttp.ClientSession() as session: async with session.get(url) as response: if response.status >= 400: raise APIError(response.status, await response.text()) return await response.json() try: data = asyncio.run(fetch_api_data("https://api.github.com/nonexistent")) except APIError as e: print(f"Caught custom exception: {e}")
Output:
Caught custom exception: API Error: 404 - {"message":"Not Found","documentation_url":"https://docs.github.com/rest","status":"404"}
This example shows how to create and use a custom APIError
exception.
It provides more context about API-specific errors, including the status code and error message.
Implement str and repr methods
You can improve your custom exceptions by implementing __str__
and __repr__
methods:
class APIError(Exception): def __init__(self, status_code, message): self.status_code = status_code self.message = message super().__init__(f"API Error: {status_code} - {message}") def __str__(self): return f"APIError(status_code={self.status_code}, message='{self.message}')" def __repr__(self): return self.__str__() error = APIError(404, "Not Found") print(str(error)) print(repr(error))
Output:
APIError(status_code=404, message='Not Found') APIError(status_code=404, message='Not Found')
By implementing __str__
and __repr__
methods, you provide a clear and informative representation of your custom exception.
Exception propagation in aiohttp applications
You can allow exceptions to bubble up to be handled at a higher level:
async def fetch_data(url): async with aiohttp.ClientSession() as session: async with session.get(url) as response: if response.status != 200: raise aiohttp.ClientResponseError( response.request_info, response.history, status=response.status, message=f"HTTP Error {response.status}", ) return await response.json() async def main(): try: data = await fetch_data("https://api.github.com/nonexistent") except aiohttp.ClientResponseError as e: print(f"Caught exception in main: {e}") asyncio.run(main())
Output:
Caught exception in main: 404, message='HTTP Error 404'
This example shows how exceptions can bubble up from fetch_data
to main
.
Catch and re-raise exceptions
Sometimes you might want to catch an exception, perform some action, and then re-raise it:
async def fetch_data(url): try: async with aiohttp.ClientSession() as session: async with session.get(url) as response: return await response.json() except aiohttp.ClientError as e: print(f"Logging error: {e}") raise async def main(): try: data = await fetch_data("https://github.com/nonexistent") except aiohttp.ClientError as e: print(f"Handling error in main: {e}") asyncio.run(main())
Output:
Logging error: 0, message='Attempt to decode JSON with unexpected mimetype: text/html; charset=utf-8', url=URL('https://github.com/nonexistent') Handling error in main: 0, message='Attempt to decode JSON with unexpected mimetype: text/html; charset=utf-8', url=URL('https://github.com/nonexistent')
This code catches the exception in fetch_data
, logs it, and then re-raises it to be handled in main
. This allows for both local and global error handling.
Error Handling Middleware
You can create middleware to handle exceptions across your entire aiohttp application:
from aiohttp import web @web.middleware async def error_middleware(request, handler): try: response = await handler(request) return response except web.HTTPException: raise except Exception as ex: print(f"Unhandled exception: {ex}") return web.json_response({'error': 'Internal Server Error'}, status=500)
This middleware catches all unhandled exceptions and returns a JSON response with a 500 status code.
To use the middleware, add it to your aiohttp application:
app = web.Application(middlewares=[error_middleware]) async def hello(request): return web.Response(text="Hello, World!") app.router.add_get('/', hello) web.run_app(app)
This code adds the error_middleware
to the application, ensuring all requests pass through it.
You can customize error responses in your middleware:
@web.middleware async def error_middleware(request, handler): try: response = await handler(request) return response except web.HTTPException as ex: return web.json_response({'error': ex.reason}, status=ex.status) except Exception as ex: print(f"Unhandled exception: {ex}") return web.json_response({'error': 'Internal Server Error'}, status=500)
This middleware provides custom JSON responses for different types of errors.
Graceful Shutdown and Cleanup
You can handle cancellation exceptions during shutdown:
import asyncio async def long_running_task(): try: while True: await asyncio.sleep(1) print("Working...") except asyncio.CancelledError: print("Task was cancelled, cleaning up...") await asyncio.sleep(0.5) # Simulate cleanup print("Cleanup complete") raise # Re-raise the CancelledError async def main(): task = asyncio.create_task(long_running_task()) await asyncio.sleep(3) # Let the task run for a bit task.cancel() try: await task except asyncio.CancelledError: print("Main: task was cancelled") asyncio.run(main())
Output:
Working... Working... Task was cancelled, cleaning up... Cleanup complete Main: task was cancelled
This example shows how to handle CancelledError
to perform cleanup operations before the task is fully cancelled.
Using try…finally for resource management
Here’s an example of using try
/finally
for proper resource management:
async def fetch_and_process(url): session = aiohttp.ClientSession() try: async with session.get(url) as response: data = await response.text() return process_data(data) finally: await session.close() def process_data(data): return len(data) result = asyncio.run(fetch_and_process("https://example.com")) print(f"Processed data length: {result}")
Output:
Processed data length: 1256
This example shows how to use try
/finally
to ensure that the ClientSession
is properly closed, regardless of whether an exception occurs or not.
Handling Exceptions in Streaming Responses
Deal with incomplete data (Interruption before completion)
When dealing with streaming responses, you need to handle potential incomplete data:
async def stream_data(url): async with aiohttp.ClientSession() as session: try: async with session.get(url) as response: async for chunk in response.content.iter_chunked(1024): process_chunk(chunk) except aiohttp.ClientPayloadError: print("Data stream was interrupted") def process_chunk(chunk): print(f"Processed chunk of size {len(chunk)}") asyncio.run(stream_data("https://example.com"))
This code handles ClientPayloadError
, which can occur if the data stream is interrupted before completion.
Manage connection drops during streaming
To manage connection drops during streaming:
async def stream_with_retry(url, max_retries=3): for attempt in range(max_retries): try: async with aiohttp.ClientSession() as session: async with session.get(url) as response: async for chunk in response.content.iter_chunked(1024): process_chunk(chunk) break # If we get here, streaming completed successfully except aiohttp.ClientError as e: print(f"Attempt {attempt + 1} failed: {e}") if attempt == max_retries - 1: print("Max retries reached, giving up") raise def process_chunk(chunk): print(f"Processed chunk of size {len(chunk)}") asyncio.run(stream_with_retry("https://example.com"))
This example implements a retry mechanism for streaming data so you can recover from temporary connection issues.
Mokhtar is the founder of LikeGeeks.com. He is a seasoned technologist and accomplished author, with expertise in Linux system administration and Python development. Since 2010, Mokhtar has built an impressive career, transitioning from system administration to Python development in 2015. His work spans large corporations to freelance clients around the globe. Alongside his technical work, Mokhtar has authored some insightful books in his field. Known for his innovative solutions, meticulous attention to detail, and high-quality work, Mokhtar continually seeks new challenges within the dynamic field of technology.