Caching Methods for aiohttp in Python
In this tutorial, you’ll learn various caching methods for aiohttp in Python.
We’ll cover in-memory caching, Redis caching, filesystem caching, HTTP caching headers, caching middleware, cache invalidation, and cache serialization.
In-memory Caching
Using the aiocache library
To implement in-memory caching with aiohttp, you can use the aiocache library. First, install it using pip:
pip install aiocache
Now, let’s create a simple aiohttp server with in-memory caching:
import time

from aiohttp import web
from aiocache import Cache
from aiocache.serializers import JsonSerializer

cache = Cache(Cache.MEMORY, serializer=JsonSerializer())

async def get_data(request):
    key = "example_key"
    cached_data = await cache.get(key)
    if cached_data is None:
        # Simulate data fetching
        data = {"message": "Hello, World!", "timestamp": time.time()}
        await cache.set(key, data, ttl=60)
        return web.json_response(data)
    return web.json_response(cached_data)

app = web.Application()
app.router.add_get('/', get_data)

if __name__ == '__main__':
    web.run_app(app)
Output (first request):
{ "message": "Hello, World!", "timestamp": 1725266615.340784 }
Output (subsequent requests within 60 seconds):
{ "message": "Hello, World!", "timestamp": 1725266615.340784 }
The first request fetches and caches the data, while subsequent requests within 60 seconds return the cached data without recalculation.
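To see this from the client side, you can hit the endpoint twice and compare the timestamps. Here is a small sketch that assumes the server above is running locally on aiohttp’s default port 8080:

import asyncio
import aiohttp

async def main():
    async with aiohttp.ClientSession() as session:
        # First request populates the cache
        async with session.get('http://localhost:8080/') as resp:
            first = await resp.json()
        # Second request is served from the cache
        async with session.get('http://localhost:8080/') as resp:
            second = await resp.json()
    # Timestamps match as long as the entry is cached (up to 60 seconds)
    print(first["timestamp"] == second["timestamp"])

asyncio.run(main())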
Custom in-memory cache
You can also implement a simple custom in-memory cache using Python’s built-in dict:
import time

from aiohttp import web

class SimpleCache:
    def __init__(self):
        self.cache = {}

    async def get(self, key):
        if key in self.cache:
            value, expiry = self.cache[key]
            if expiry > time.time():
                return value
            del self.cache[key]
        return None

    async def set(self, key, value, ttl):
        expiry = time.time() + ttl
        self.cache[key] = (value, expiry)

cache = SimpleCache()

async def get_data(request):
    key = "example_key"
    cached_data = await cache.get(key)
    if cached_data is None:
        data = {"message": "Hello, World!", "timestamp": time.time()}
        await cache.set(key, data, ttl=60)
        return web.json_response(data)
    return web.json_response(cached_data)

app = web.Application()
app.router.add_get('/', get_data)

if __name__ == '__main__':
    web.run_app(app)
The output will be similar to the previous example, with the first request generating new data and subsequent requests within 60 seconds returning cached data.
Redis Caching
To use Redis for caching with aiohttp, you’ll need the redis-py package, which includes an asyncio client:
pip install redis
Now, let’s implement Redis caching:
import json
import time

from aiohttp import web
from redis.asyncio import Redis

async def create_redis_pool():
    return Redis.from_url('redis://localhost')

async def get_data(request):
    redis = request.app['redis']
    key = "example_key"
    cached_data = await redis.get(key)
    if cached_data is None:
        data = {"message": "Hello, World!", "timestamp": time.time()}
        await redis.setex(key, 60, json.dumps(data))
        return web.json_response(data)
    return web.json_response(json.loads(cached_data))

async def init_app():
    app = web.Application()
    app['redis'] = await create_redis_pool()
    app.router.add_get('/', get_data)
    return app

if __name__ == '__main__':
    web.run_app(init_app())
The output will be similar to the in-memory caching examples, but the data is now stored in Redis.
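If you want to confirm what is stored, you can inspect the key directly. A quick sketch, assuming Redis is running locally and the server has already handled at least one request so that example_key exists:

import asyncio
from redis.asyncio import Redis

async def main():
    redis = Redis.from_url('redis://localhost')
    value = await redis.get('example_key')   # the JSON string written by setex
    ttl = await redis.ttl('example_key')     # seconds until the key expires
    print(value, ttl)
    await redis.aclose()  # on older redis-py versions use close() instead

asyncio.run(main())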
Caching Response Data in Redis
You can cache entire response data in Redis:
import json
import time

from aiohttp import web
from redis.asyncio import Redis

async def create_redis_pool():
    return Redis.from_url('redis://localhost')

async def get_data(request):
    redis = request.app['redis']
    key = "response_cache"
    cached_response = await redis.get(key)
    if cached_response is None:
        data = {"message": "Hello, World!", "timestamp": time.time()}
        response = web.json_response(data)
        await redis.setex(key, 60, json.dumps({
            "body": json.dumps(data),
            "headers": dict(response.headers),
            "status": response.status
        }))
        return response
    cached_data = json.loads(cached_response)
    return web.Response(
        body=cached_data["body"].encode('utf-8'),
        headers=cached_data["headers"],
        status=cached_data["status"]
    )

async def init_app():
    app = web.Application()
    app['redis'] = await create_redis_pool()
    app.router.add_get('/', get_data)
    return app

if __name__ == '__main__':
    web.run_app(init_app())
This method caches the entire response including headers and status code.
Filesystem Caching
You can implement file-based caching for aiohttp using the aiofiles library:
pip install aiofiles
Now, let’s create a simple file-based cache:
import json
import os
import time

import aiofiles
from aiohttp import web

CACHE_DIR = "cache"

async def get_cached_data(key):
    file_path = os.path.join(CACHE_DIR, f"{key}.json")
    if os.path.exists(file_path):
        async with aiofiles.open(file_path, mode='r') as f:
            content = await f.read()
            data = json.loads(content)
            if data["expiry"] > time.time():
                return data["value"]
    return None

async def set_cached_data(key, value, ttl):
    os.makedirs(CACHE_DIR, exist_ok=True)
    file_path = os.path.join(CACHE_DIR, f"{key}.json")
    data = {
        "value": value,
        "expiry": time.time() + ttl
    }
    async with aiofiles.open(file_path, mode='w') as f:
        await f.write(json.dumps(data))

async def get_data(request):
    key = "example_key"
    cached_data = await get_cached_data(key)
    if cached_data is None:
        data = {"message": "Hello, World!", "timestamp": time.time()}
        await set_cached_data(key, data, ttl=60)
        return web.json_response(data)
    return web.json_response(cached_data)

app = web.Application()
app.router.add_get('/', get_data)

if __name__ == '__main__':
    web.run_app(app)
This implementation stores cache data in JSON files within a “cache” directory.
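Each entry is a plain JSON document on disk, so you can inspect it directly. Here is a short sketch, assuming the server above has handled at least one request and was started from the current working directory (so cache/example_key.json exists):

import json
import time

with open('cache/example_key.json') as f:
    entry = json.load(f)

print(entry['value'])                                 # the cached payload
print(entry['expiry'] - time.time(), 'seconds left')  # remaining time to live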
HTTP Caching Headers
You can use Cache-Control headers to control caching behavior:
import time

from aiohttp import web

async def get_data(request):
    data = {"message": "Hello, World!", "timestamp": time.time()}
    response = web.json_response(data)
    response.headers['Cache-Control'] = 'public, max-age=60'
    return response

app = web.Application()
app.router.add_get('/', get_data)

if __name__ == '__main__':
    web.run_app(app)
This sets the Cache-Control header to allow public caching for 60 seconds.
Using ETag and If-None-Match for Conditional Requests
You can implement ETag-based caching for conditional requests:
import hashlib
import json
import time

from aiohttp import web

async def get_data(request):
    data = {"message": "Hello, World!", "timestamp": time.time()}
    etag = hashlib.md5(json.dumps(data).encode()).hexdigest()

    if request.headers.get('If-None-Match') == etag:
        return web.Response(status=304)

    response = web.json_response(data)
    response.headers['ETag'] = etag
    return response

app = web.Application()
app.router.add_get('/', get_data)

if __name__ == '__main__':
    web.run_app(app)
This method generates an ETag for the response and returns a 304 Not Modified status if the client’s ETag matches.
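On the client side, the flow is to read the ETag from a normal response and then replay the request with If-None-Match. Keep in mind that this particular handler embeds time.time() in the payload, so its ETag changes on every request; with a stable payload the second request below would return 304 Not Modified. A sketch of the client flow, assuming the server runs on the default port 8080:

import asyncio
import aiohttp

async def main():
    async with aiohttp.ClientSession() as session:
        async with session.get('http://localhost:8080/') as resp:
            etag = resp.headers['ETag']
            print(resp.status, etag)
        # Replay the request with the stored ETag; 304 means the cached copy is still valid
        headers = {'If-None-Match': etag}
        async with session.get('http://localhost:8080/', headers=headers) as resp:
            print(resp.status)

asyncio.run(main())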
Caching Middleware
You can create a custom caching middleware for aiohttp:
import time

from aiohttp import web

class CacheMiddleware:
    def __init__(self, ttl=60):
        self.cache = {}
        self.ttl = ttl

    @web.middleware
    async def middleware(self, request, handler):
        if request.method == 'GET':
            cached_response = self.cache.get(request.path)
            if cached_response and cached_response['expiry'] > time.time():
                return web.Response(**cached_response['data'])

        response = await handler(request)

        if request.method == 'GET' and response.status == 200:
            self.cache[request.path] = {
                'expiry': time.time() + self.ttl,
                'data': {
                    'body': response.body,
                    'status': response.status,
                    'headers': dict(response.headers)
                }
            }
        return response

async def hello(request):
    return web.Response(text=f"Hello, World! {time.time()}")

cache_middleware = CacheMiddleware()
app = web.Application(middlewares=[cache_middleware.middleware])
app.router.add_get('/', hello)

if __name__ == '__main__':
    web.run_app(app)
This middleware caches successful GET responses (status 200) for 60 seconds.
Cache Invalidation Methods
Time-based Invalidation
Time-based invalidation is the simplest form of cache invalidation.
This method automatically invalidates cache entries after their TTL expires.
The above examples show how to implement time-based invalidation.
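Stripped of the aiohttp plumbing, the pattern is simply to store an absolute expiry time next to each value and treat expired entries as misses:

import time

cache = {}

def set_with_ttl(key, value, ttl):
    # Store the value together with its absolute expiry time
    cache[key] = (value, time.time() + ttl)

def get_if_fresh(key):
    entry = cache.get(key)
    if entry is None:
        return None
    value, expiry = entry
    if expiry <= time.time():
        # The entry outlived its TTL: drop it and report a miss
        del cache[key]
        return None
    return value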
Event-driven Invalidation
Event-driven invalidation allows you to clear cache entries based on specific events:
import time

from aiohttp import web

class EventDrivenCache:
    def __init__(self):
        self.cache = {}

    async def get(self, key):
        return self.cache.get(key)

    async def set(self, key, value):
        self.cache[key] = value

    async def invalidate(self, key):
        if key in self.cache:
            del self.cache[key]

async def get_data(request):
    cache = request.app['cache']
    key = "example_key"
    cached_data = await cache.get(key)
    if cached_data is None:
        data = {"message": "Hello, World!", "timestamp": time.time()}
        await cache.set(key, data)
        return web.json_response(data)
    return web.json_response(cached_data)

async def update_data(request):
    cache = request.app['cache']
    key = "example_key"
    await cache.invalidate(key)
    return web.Response(text="Cache invalidated")

app = web.Application()
app['cache'] = EventDrivenCache()
app.router.add_get('/', get_data)
app.router.add_get('/update', update_data)

if __name__ == '__main__':
    web.run_app(app)
This example allows you to invalidate the cache by making a GET request to /update.
Selective Cache Clearing
Selective cache clearing allows you to clear specific groups of cache entries:
import time

from aiohttp import web

class SelectiveCache:
    def __init__(self):
        self.cache = {}

    async def get(self, key):
        return self.cache.get(key)

    async def set(self, key, value, group=None):
        self.cache[key] = (value, group)

    async def clear_group(self, group):
        self.cache = {k: v for k, v in self.cache.items() if v[1] != group}

async def get_data(request):
    cache = request.app['cache']
    key = request.query.get('key', 'default')
    group = request.query.get('group', 'default')
    cached_data = await cache.get(key)
    if cached_data is None:
        data = {"message": f"Hello, {key}!", "timestamp": time.time()}
        await cache.set(key, data, group)
        return web.json_response(data)
    return web.json_response(cached_data[0])

async def clear_group(request):
    cache = request.app['cache']
    group = request.query.get('group', 'default')
    await cache.clear_group(group)
    return web.Response(text=f"Group {group} cleared from cache")

app = web.Application()
app['cache'] = SelectiveCache()
app.router.add_get('/', get_data)
app.router.add_post('/clear', clear_group)

if __name__ == '__main__':
    web.run_app(app)
This method lets you clear cache entries by group by sending a POST request to /clear with the group name as a query parameter, as shown in the sketch below.
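The following client sketch (assuming the server runs locally on port 8080, with hypothetical key and group names) shows the effect: after the accounts group is cleared, only the entry that belonged to it is regenerated.

import asyncio
import aiohttp

BASE = 'http://localhost:8080'

async def main():
    async with aiohttp.ClientSession() as session:
        # Populate two entries in different groups
        await session.get(BASE + '/', params={'key': 'users', 'group': 'accounts'})
        await session.get(BASE + '/', params={'key': 'prices', 'group': 'catalog'})

        # Clear only the "accounts" group
        await session.post(BASE + '/clear', params={'group': 'accounts'})

        # "users" gets a fresh timestamp; "prices" is still served from the cache
        async with session.get(BASE + '/', params={'key': 'users', 'group': 'accounts'}) as resp:
            print(await resp.json())
        async with session.get(BASE + '/', params={'key': 'prices', 'group': 'catalog'}) as resp:
            print(await resp.json())

asyncio.run(main())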
Cache Serialization and Deserialization
To store and retrieve complex data structures in the cache, you can use JSON serialization:
import json
import time

from aiohttp import web

class JSONCache:
    def __init__(self):
        self.cache = {}

    async def get(self, key):
        serialized_data = self.cache.get(key)
        if serialized_data:
            return json.loads(serialized_data)
        return None

    async def set(self, key, value):
        self.cache[key] = json.dumps(value)

async def get_data(request):
    cache = request.app['cache']
    key = request.path
    cached_data = await cache.get(key)
    if cached_data is None:
        data = {
            "message": "Complex data structure",
            "nested": {"a": 1, "b": 2},
            "timestamp": time.time()
        }
        await cache.set(key, data)
        return web.json_response(data)
    return web.json_response(cached_data)

app = web.Application()
app['cache'] = JSONCache()
app.router.add_get('/', get_data)

if __name__ == '__main__':
    web.run_app(app)
Here we use JSON to serialize and deserialize cache data so you can cache complex data structures.