How to Retry Requests in aiohttp Using tenacity
In this tutorial, you’ll learn how to implement retry logic in aiohttp, first with a hand-rolled retry loop and then with the tenacity library.
You’ll learn how to handle various HTTP errors, manage connection and timeout issues, and customize retry strategies.
Manual Retry Logic in aiohttp
aiohttp does not ship a retry option on ClientSession, so the simplest approach is to wrap the request in a loop, catch transient errors, and try again.
import aiohttp
import asyncio

async def fetch_with_retries(url):
    async with aiohttp.ClientSession() as session:
        for attempt in range(3):
            try:
                async with session.get(url) as response:
                    return await response.text()
            except aiohttp.ClientError as e:
                print(f"Attempt {attempt + 1} failed: {e}")
                await asyncio.sleep(1)
    return None

asyncio.run(fetch_with_retries('https://whateverdommain.com/'))
Output:
Attempt 1 failed: Cannot connect to host whateverdommain.com:443 ssl:default [getaddrinfo failed]
Attempt 2 failed: Cannot connect to host whateverdommain.com:443 ssl:default [getaddrinfo failed]
Attempt 3 failed: Cannot connect to host whateverdommain.com:443 ssl:default [getaddrinfo failed]
The code attempts to fetch a URL up to three times and prints an error message if a request fails.
It waits one second between attempts.
You can adjust the number of retry attempts to suit your needs.
for attempt in range(5): # Retry logic
Output:
Attempt 1 failed: Cannot connect to host whateverdommain.com:443 ssl:default [getaddrinfo failed]
Attempt 2 failed: Cannot connect to host whateverdommain.com:443 ssl:default [getaddrinfo failed]
Attempt 3 failed: Cannot connect to host whateverdommain.com:443 ssl:default [getaddrinfo failed]
Attempt 4 failed: Cannot connect to host whateverdommain.com:443 ssl:default [getaddrinfo failed]
Attempt 5 failed: Cannot connect to host whateverdommain.com:443 ssl:default [getaddrinfo failed]
Increasing the retry attempts provides more opportunities to recover from transient errors.
You can add delays between retries to avoid overwhelming the server and to give it time to recover.
await asyncio.sleep(2) # Increase delay to 2 seconds
Longer delays between retries can help reduce server load and improve the chances of a successful request.
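If you want the delay to grow after each failure instead of staying fixed, you can derive it from the attempt number. Here is a minimal sketch of exponential backoff built on the same manual loop; the fetch_with_backoff name and its default parameters are illustrative choices, not part of aiohttp:

import aiohttp
import asyncio

async def fetch_with_backoff(url, max_attempts=3, base_delay=1):
    async with aiohttp.ClientSession() as session:
        for attempt in range(max_attempts):
            try:
                async with session.get(url) as response:
                    return await response.text()
            except aiohttp.ClientError as e:
                print(f"Attempt {attempt + 1} failed: {e}")
                # Double the delay after each failure: 1s, 2s, 4s, ...
                await asyncio.sleep(base_delay * 2 ** attempt)
    return None

asyncio.run(fetch_with_backoff('https://whateverdommain.com/'))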
Using tenacity for the Retry Mechanism
You can integrate tenacity with aiohttp to enhance your retry logic.
from tenacity import retry, stop_after_attempt, wait_fixed
import aiohttp
import asyncio

@retry(stop=stop_after_attempt(3), wait=wait_fixed(1))
async def fetch_with_tenacity(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

try:
    asyncio.run(fetch_with_tenacity('https://whateverdommain.com/'))
except Exception as e:
    print(f"Failed after 3 retries: {e}")
Output:
Failed after 3 retries: RetryError[]
The tenacity library automatically retries the request up to three times, waiting one second between attempts. Once all attempts fail, it raises a RetryError that wraps the last exception, which is what the except block prints above.
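tenacity also ships ready-made wait strategies, so you don’t have to hand-code backoff. A sketch using wait_exponential; the attempt count and the cap are arbitrary choices:

from tenacity import retry, stop_after_attempt, wait_exponential
import aiohttp
import asyncio

# Wait roughly 1s, 2s, 4s between attempts, never more than 10s
@retry(stop=stop_after_attempt(4), wait=wait_exponential(multiplier=1, max=10))
async def fetch_with_exponential_wait(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()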
Retry on Specific HTTP Status Codes
You can retry requests based on specific HTTP status codes to handle server errors.
import aiohttp
import asyncio
from tenacity import retry, retry_if_exception, stop_after_attempt

def is_server_error(exception):
    return isinstance(exception, aiohttp.ClientResponseError) and exception.status in {500, 502, 503, 504}

@retry(retry=retry_if_exception(is_server_error), stop=stop_after_attempt(3))
async def fetch_on_server_error(url):
    async with aiohttp.ClientSession() as session:
        # raise_for_status=True turns 4xx/5xx responses into ClientResponseError
        async with session.get(url, raise_for_status=True) as response:
            return await response.text()

asyncio.run(fetch_on_server_error('http://localhost:8080'))
The code retries requests only for server errors (5xx) and leaves client errors (4xx) alone. Note that aiohttp does not raise an exception for non-2xx responses by default; passing raise_for_status=True converts them into ClientResponseError so the predicate can inspect the status code.
To test this locally, I used a small server script that always responds with a 500 error:
from http.server import BaseHTTPRequestHandler, HTTPServer

class ErrorHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        self.send_error(500, "Internal Server Error")

def run(server_class=HTTPServer, handler_class=ErrorHandler, port=8080):
    server_address = ('', port)
    httpd = server_class(server_address, handler_class)
    print(f'Starting server on port {port}...')
    httpd.serve_forever()

if __name__ == '__main__':
    run()
Retry on Connection and Timeout Errors
You can handle network-related errors by retrying requests when such exceptions occur.
import aiohttp
import asyncio
from tenacity import retry, retry_if_exception_type, stop_after_attempt

@retry(retry=retry_if_exception_type((aiohttp.ClientConnectionError, asyncio.TimeoutError)),
       stop=stop_after_attempt(3))
async def fetch_on_network_error(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

asyncio.run(fetch_on_network_error('https://whateverdommain.com/'))
The code retries requests when network-related errors occur, such as connection resets or timeouts.
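For asyncio.TimeoutError to ever fire, the request needs a timeout budget. A sketch using aiohttp’s ClientTimeout; the five-second total is an arbitrary choice:

import aiohttp
import asyncio
from tenacity import retry, retry_if_exception_type, stop_after_attempt

@retry(retry=retry_if_exception_type((aiohttp.ClientConnectionError, asyncio.TimeoutError)),
       stop=stop_after_attempt(3))
async def fetch_with_timeout(url):
    # Give the whole request (connect + read) five seconds before timing out
    timeout = aiohttp.ClientTimeout(total=5)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        async with session.get(url) as response:
            return await response.text()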
You can customize the retry logic to handle different exceptions.
import aiohttp
import asyncio
from tenacity import retry, retry_if_exception, stop_after_attempt

def is_specific_error(exception):
    return isinstance(exception, aiohttp.ClientResponseError) and exception.status == 429

@retry(retry=retry_if_exception(is_specific_error), stop=stop_after_attempt(3))
async def fetch_on_specific_error(url):
    async with aiohttp.ClientSession() as session:
        # raise_for_status=True is needed again so a 429 raises ClientResponseError
        async with session.get(url, raise_for_status=True) as response:
            return await response.text()

asyncio.run(fetch_on_specific_error('https://whateverdommain.com/'))
The code retries requests only when the server returns HTTP 429 (Too Many Requests).
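A 429 response often carries a Retry-After header that tells you how long to wait. tenacity’s wait strategies don’t see the response object, so honoring the header is easiest with a manual loop. A sketch; the function name and the one-second fallback are my own choices:

import aiohttp
import asyncio

async def fetch_respecting_retry_after(url, max_attempts=3):
    async with aiohttp.ClientSession() as session:
        for attempt in range(max_attempts):
            async with session.get(url) as response:
                if response.status != 429:
                    return await response.text()
                # Fall back to one second if the header is missing or not numeric
                try:
                    delay = float(response.headers.get('Retry-After', 1))
                except ValueError:
                    delay = 1
                print(f"Got 429, waiting {delay}s before the next attempt")
                await asyncio.sleep(delay)
    return None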
Customize Retry Behavior
You can write your own predicate function to decide whether a failure is worth retrying. Keep in mind that tenacity passes the raised exception to the predicate, not the response object, so the check inspects the ClientResponseError:
import aiohttp
import asyncio
from tenacity import retry, retry_if_exception, stop_after_attempt

def should_retry(exception):
    # Retry only when the server reported 503 Service Unavailable
    return isinstance(exception, aiohttp.ClientResponseError) and exception.status == 503

@retry(retry=retry_if_exception(should_retry), stop=stop_after_attempt(3))
async def fetch_with_custom_logic(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url, raise_for_status=True) as response:
            return await response.text()

asyncio.run(fetch_with_custom_logic('https://whateverdommain.com/'))
The code uses a custom function to retry requests only when the server returns a 503 status code.
You can enhance retry logic by adding custom headers or logging each retry attempt.
import logging
import asyncio
import aiohttp

logging.basicConfig(level=logging.INFO)

async def fetch_with_logging(url):
    async with aiohttp.ClientSession() as session:
        for attempt in range(3):
            try:
                async with session.get(url, headers={'Custom-Header': 'value'}) as response:
                    return await response.text()
            except aiohttp.ClientError as e:
                logging.info(f"Attempt {attempt + 1} failed: {e}")
                await asyncio.sleep(1)
    return None

asyncio.run(fetch_with_logging('https://whateverdommain.com/'))
The code logs each retry attempt and includes a custom header in the request.
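If you would rather keep tenacity in charge of the retries, it can do the per-attempt logging for you through its before_sleep hook. A sketch using before_sleep_log:

import logging
import asyncio
import aiohttp
from tenacity import retry, stop_after_attempt, wait_fixed, before_sleep_log

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# before_sleep_log records each failed attempt before tenacity sleeps and retries
@retry(stop=stop_after_attempt(3), wait=wait_fixed(1),
       before_sleep=before_sleep_log(logger, logging.INFO))
async def fetch_with_tenacity_logging(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url, headers={'Custom-Header': 'value'}) as response:
            return await response.text()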
Handle Retry for Concurrent Requests
You can handle retries for multiple concurrent requests by combining the fetch_with_retries function from earlier with asyncio.gather.
async def fetch_all(urls):
    # fetch_with_retries opens its own session, so we only need to gather the tasks
    tasks = [fetch_with_retries(url) for url in urls]
    return await asyncio.gather(*tasks)

urls = ['https://whateverdommain.com/', 'https://whateverdommain2.com/']
asyncio.run(fetch_all(urls))
Output:
Attempt 1 failed: Cannot connect to host whateverdommain.com:443 ssl:default [getaddrinfo failed]
Attempt 1 failed: Cannot connect to host whateverdommain2.com:443 ssl:default [getaddrinfo failed]
Attempt 2 failed: Cannot connect to host whateverdommain.com:443 ssl:default [getaddrinfo failed]
Attempt 2 failed: Cannot connect to host whateverdommain2.com:443 ssl:default [getaddrinfo failed]
Attempt 3 failed: Cannot connect to host whateverdommain.com:443 ssl:default [getaddrinfo failed]
Attempt 3 failed: Cannot connect to host whateverdommain2.com:443 ssl:default [getaddrinfo failed]
Each request retries independently of the others, and asyncio.gather collects the results once every task finishes.
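When the URL list is large, unlimited concurrency multiplies the retry traffic. One common pattern is to cap the number of in-flight requests with an asyncio.Semaphore; a sketch reusing fetch_with_retries, where the limit of five is an arbitrary choice:

import asyncio

async def fetch_all_limited(urls, max_concurrent=5):
    # Cap in-flight requests so retries don't pile up on a struggling server
    semaphore = asyncio.Semaphore(max_concurrent)

    async def limited_fetch(url):
        async with semaphore:
            return await fetch_with_retries(url)

    return await asyncio.gather(*(limited_fetch(url) for url in urls))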
Avoid Race Conditions and Ensure Thread Safety
To avoid race conditions when retrying concurrent requests, create the aiohttp ClientSession inside each coroutine so no state is shared between tasks:
import asyncio
import aiohttp
from tenacity import retry, stop_after_attempt

@retry(stop=stop_after_attempt(3))
async def fetch_safe(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

async def fetch_all_safe(urls):
    # Each fetch_safe call manages its own session, so nothing is shared between tasks
    tasks = [fetch_safe(url) for url in urls]
    return await asyncio.gather(*tasks)

urls = ['https://jsonplaceholder.typicode.com/posts/1', 'https://jsonplaceholder.typicode.com/posts/2']
asyncio.run(fetch_all_safe(urls))
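A session per coroutine is safe but opens a fresh connection pool for every request. Since a single ClientSession can be shared by tasks running on the same event loop, you can also pass one session in explicitly; a sketch:

import asyncio
import aiohttp
from tenacity import retry, stop_after_attempt

@retry(stop=stop_after_attempt(3))
async def fetch_shared(session, url):
    # The caller owns the session; this coroutine only borrows it
    async with session.get(url) as response:
        return await response.text()

async def fetch_all_shared(urls):
    # One session, one connection pool, shared by every task on this event loop
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_shared(session, url) for url in urls]
        return await asyncio.gather(*tasks)

urls = ['https://jsonplaceholder.typicode.com/posts/1', 'https://jsonplaceholder.typicode.com/posts/2']
asyncio.run(fetch_all_shared(urls))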