Caching Methods for aiohttp in Python

In this tutorial, you’ll learn various caching methods for aiohttp in Python.

We’ll cover in-memory caching, Redis caching, filesystem caching, HTTP caching headers, and caching middleware.

 

 

In-memory Caching

Using the aiocache library

To implement in-memory caching with aiohttp, you can use the aiocache library. First, install it using pip:

pip install aiocache

Now, let’s create a simple aiohttp server with in-memory caching:

import time

from aiocache import Cache
from aiocache.serializers import JsonSerializer
from aiohttp import web
# Module-level cache shared by all requests: aiocache's MEMORY backend,
# with values passed through JsonSerializer.
cache = Cache(Cache.MEMORY, serializer=JsonSerializer())
async def get_data(request):
    """Serve the demo payload, reusing a cached copy when one exists.

    On a cache miss the payload is built, stored under a fixed key with a
    60-second TTL, and returned. On a hit the cached payload is returned
    as-is, so its timestamp stays the same until the entry expires.
    """
    key = "example_key"
    payload = await cache.get(key)
    if payload is None:
        # Simulate data fetching
        payload = {"message": "Hello, World!", "timestamp": time.time()}
        await cache.set(key, payload, ttl=60)
    return web.json_response(payload)
# Create the application and expose get_data at the root URL.
app = web.Application()
app.add_routes([web.get('/', get_data)])

if __name__ == '__main__':
    # Start the server only when this file is executed as a script.
    web.run_app(app)

Output (first request):

{
    "message": "Hello, World!",
    "timestamp": 1725266615.340784
}

Output (subsequent requests within 60 seconds):

{
    "message": "Hello, World!",
    "timestamp": 1725266615.340784
}

The first request fetches and caches the data, while subsequent requests within 60 seconds return the cached data without recalculation.

Custom in-memory cache

You can also implement a simple custom in-memory cache using Python’s built-in dict:

import time
from aiohttp import web
class SimpleCache:
    """Minimal in-process TTL cache backed by a plain dict.

    Every entry is stored as a ``(value, expiry)`` tuple, where ``expiry``
    is the ``time.time()`` reading after which the entry is no longer served.
    """

    def __init__(self):
        self.cache = {}

    async def get(self, key):
        """Return the live value for ``key``, or ``None`` if absent or expired."""
        try:
            value, deadline = self.cache[key]
        except KeyError:
            return None
        if deadline > time.time():
            return value
        # The entry has expired: remove it so it is not kept around.
        del self.cache[key]
        return None

    async def set(self, key, value, ttl):
        """Store ``value`` under ``key`` for ``ttl`` seconds from now."""
        self.cache[key] = (value, time.time() + ttl)
# Single module-level cache instance used by the get_data handler.
cache = SimpleCache()
async def get_data(request):
    """Return the demo payload, regenerating it only on a cache miss.

    A freshly built payload is stored under a fixed key for 60 seconds;
    while that entry is live, the stored payload is returned unchanged.
    """
    key = "example_key"
    payload = await cache.get(key)
    if payload is None:
        payload = {"message": "Hello, World!", "timestamp": time.time()}
        await cache.set(key, payload, ttl=60)
    return web.json_response(payload)
# Create the application and expose get_data at the root URL.
app = web.Application()
app.add_routes([web.get('/', get_data)])

if __name__ == '__main__':
    # Start the server only when this file is executed as a script.
    web.run_app(app)

The output will be similar to the previous example, with the first request generating new data and subsequent requests within 60 seconds returning cached data.

 

Redis Caching

To use Redis for caching with aiohttp, you’ll need to install redis-py:

pip install redis

Now, let’s implement Redis caching:

from aiohttp import web
import json
import time
from redis.asyncio import Redis

async def create_redis_pool():
    """Create and return a Redis client for ``redis://localhost``."""
    client = Redis.from_url('redis://localhost')
    return client

async def get_data(request):
    """Serve the demo payload from Redis, populating it on a miss.

    The payload is stored as a JSON string under a fixed key with a
    60-second expiry (``SETEX``); hits are decoded back before responding.
    """
    client = request.app['redis']
    key = "example_key"
    raw = await client.get(key)
    if raw is not None:
        # Cache hit: the stored value is the JSON text written below.
        return web.json_response(json.loads(raw))

    data = {"message": "Hello, World!", "timestamp": time.time()}
    await client.setex(key, 60, json.dumps(data))
    return web.json_response(data)

async def init_app():
    """Build the application, attach a Redis client, and register routes.

    The client is stored under ``app['redis']`` for handlers to use and is
    closed again when the application shuts down, so its connections are
    not left open.

    Returns:
        The configured ``web.Application``.
    """
    app = web.Application()
    app['redis'] = await create_redis_pool()

    async def close_redis(app):
        # redis-py >= 5.0.1 names this aclose(); on older releases call
        # close() instead.
        await app['redis'].aclose()

    # Release the Redis connections when the server stops.
    app.on_cleanup.append(close_redis)
    app.router.add_get('/', get_data)
    return app
# Start the server only when executed as a script. run_app is handed the
# init_app() coroutine, which builds the application.
if __name__ == '__main__':
    web.run_app(init_app())

The output will be similar to the in-memory caching examples, but the data is now stored in Redis.

Caching Response Data in Redis

You can cache entire response data in Redis:

from aiohttp import web
import json
import time
from redis.asyncio import Redis
async def create_redis_pool():
    """Create and return a Redis client for ``redis://localhost``."""
    client = Redis.from_url('redis://localhost')
    return client
async def get_data(request):
    """Serve a response whose body, headers, and status are cached in Redis.

    On a miss the response is built, and a JSON snapshot of its body,
    headers, and status is stored under a fixed key for 60 seconds. On a
    hit the response is rebuilt from that snapshot.
    """
    client = request.app['redis']
    key = "response_cache"
    raw = await client.get(key)
    if raw is not None:
        # Cache hit: rebuild the response from the stored snapshot.
        snapshot = json.loads(raw)
        return web.Response(
            body=snapshot["body"],
            headers=snapshot["headers"],
            status=snapshot["status"]
        )

    data = {"message": "Hello, World!", "timestamp": time.time()}
    response = web.json_response(data)
    snapshot = {
        "body": json.dumps(data),
        "headers": dict(response.headers),
        "status": response.status
    }
    await client.setex(key, 60, json.dumps(snapshot))
    return response
async def init_app():
    """Build the application, attach a Redis client, and register routes.

    The client is stored under ``app['redis']`` for handlers to use and is
    closed again when the application shuts down, so its connections are
    not left open.

    Returns:
        The configured ``web.Application``.
    """
    app = web.Application()
    app['redis'] = await create_redis_pool()

    async def close_redis(app):
        # redis-py >= 5.0.1 names this aclose(); on older releases call
        # close() instead.
        await app['redis'].aclose()

    # Release the Redis connections when the server stops.
    app.on_cleanup.append(close_redis)
    app.router.add_get('/', get_data)
    return app
# Start the server only when executed as a script. run_app is handed the
# init_app() coroutine, which builds the application.
if __name__ == '__main__':
    web.run_app(init_app())

This method caches the entire response, including its headers and status code.

 

Filesystem Caching

You can implement file-based caching for aiohttp using the aiofiles library:

pip install aiofiles

Now, let’s create a simple file-based cache:

import aiofiles
import json
import os
import time
from aiohttp import web
# Directory (relative to the working directory) that holds one JSON file per
# cache key.
CACHE_DIR = "cache"
async def get_cached_data(key):
    """Return the cached value for ``key``, or ``None`` on any kind of miss.

    A miss is reported when the cache file does not exist, cannot be parsed
    as JSON, or holds an entry whose ``expiry`` timestamp has passed.
    """
    file_path = os.path.join(CACHE_DIR, f"{key}.json")
    try:
        async with aiofiles.open(file_path, mode='r') as f:
            content = await f.read()
    except FileNotFoundError:
        # Opening directly avoids a separate (blocking) existence check and
        # the race where the file disappears between the check and the open.
        return None

    try:
        data = json.loads(content)
    except json.JSONDecodeError:
        # The file may be read while a writer is still filling it; treat an
        # unparseable file as a miss instead of failing the request.
        return None

    if data["expiry"] > time.time():
        return data["value"]
    return None

async def set_cached_data(key, value, ttl):
    """Write ``value`` to the cache file for ``key``, valid for ``ttl`` seconds.

    The cache directory is created on demand. The file stores the value
    together with its absolute expiry timestamp.
    """
    os.makedirs(CACHE_DIR, exist_ok=True)
    target = os.path.join(CACHE_DIR, f"{key}.json")
    record = {"value": value, "expiry": time.time() + ttl}
    async with aiofiles.open(target, mode='w') as fh:
        await fh.write(json.dumps(record))

async def get_data(request):
    """Return the demo payload, using the file cache for up to 60 seconds."""
    key = "example_key"
    payload = await get_cached_data(key)
    if payload is None:
        # Miss: build a fresh payload and persist it for later requests.
        payload = {"message": "Hello, World!", "timestamp": time.time()}
        await set_cached_data(key, payload, ttl=60)
    return web.json_response(payload)
# Create the application and expose get_data at the root URL.
app = web.Application()
app.add_routes([web.get('/', get_data)])

if __name__ == '__main__':
    # Start the server only when this file is executed as a script.
    web.run_app(app)

This implementation stores cache data in JSON files within a “cache” directory.

 

HTTP Caching Headers

You can use Cache-Control headers to control caching behavior:

from aiohttp import web
import time
async def get_data(request):
    """Return the demo payload with a ``Cache-Control`` header attached.

    The header value ``public, max-age=60`` is sent with every response.
    """
    payload = {"message": "Hello, World!", "timestamp": time.time()}
    return web.json_response(
        payload,
        headers={'Cache-Control': 'public, max-age=60'},
    )
# Create the application and expose get_data at the root URL.
app = web.Application()
app.add_routes([web.get('/', get_data)])

if __name__ == '__main__':
    # Start the server only when this file is executed as a script.
    web.run_app(app)

This sets the Cache-Control header to allow public caching for 60 seconds.

Using ETag and If-None-Match for Conditional Requests

You can implement ETag-based caching for conditional requests:

from aiohttp import web
import hashlib
import json
import time
# Built once at import time so the representation, and therefore its ETag,
# stays the same across requests. Generating a new timestamp per request
# would give every response a different ETag and no request could ever
# receive a 304.
_PAYLOAD = {"message": "Hello, World!", "timestamp": time.time()}


def _if_none_match_hits(header_value, etag):
    """Return True when an ``If-None-Match`` header value matches ``etag``.

    Args:
        header_value: Raw header text, or ``None`` when the header is absent.
        etag: The current entity tag, including its surrounding quotes.
    """
    if not header_value:
        return False
    for candidate in header_value.split(','):
        candidate = candidate.strip()
        if candidate == '*':
            # "*" matches any current representation.
            return True
        # If-None-Match uses weak comparison, so a W/ prefix is ignored.
        if candidate.startswith('W/'):
            candidate = candidate[2:]
        if candidate == etag:
            return True
    return False


async def get_data(request):
    """Serve the demo payload with an ETag, answering 304 on a match.

    The ETag is the quoted MD5 hex digest of the JSON-encoded payload. When
    the request's ``If-None-Match`` header matches it, an empty 304 response
    carrying the same ETag is returned instead of the body.
    """
    data = _PAYLOAD
    digest = hashlib.md5(json.dumps(data).encode()).hexdigest()
    # Entity tags are quoted strings on the wire.
    etag = f'"{digest}"'
    if _if_none_match_hits(request.headers.get('If-None-Match'), etag):
        return web.Response(status=304, headers={'ETag': etag})
    response = web.json_response(data)
    response.headers['ETag'] = etag
    return response
# Create the application and expose get_data at the root URL.
app = web.Application()
app.add_routes([web.get('/', get_data)])

if __name__ == '__main__':
    # Start the server only when this file is executed as a script.
    web.run_app(app)

This method generates an ETag for the response and returns a 304 Not Modified status if the client’s ETag matches.

 

Caching Middleware

You can create a custom caching middleware for aiohttp:

from aiohttp import web
import time
class CacheMiddleware:
    """In-memory cache for successful GET responses.

    Register the bound ``middleware`` method with the application:
    ``web.Application(middlewares=[CacheMiddleware().middleware])``.

    Args:
        ttl: Number of seconds a cached response is served (default 60).
    """

    def __init__(self, ttl=60):
        self.ttl = ttl
        self.cache = {}

    @web.middleware
    async def middleware(self, request, handler):
        """Serve a cached copy of a GET response, or call ``handler`` and cache it."""
        if request.method != 'GET':
            return await handler(request)

        # Include the query string so /items?page=1 and /items?page=2 do not
        # share one cache entry.
        key = request.path_qs
        entry = self.cache.get(key)
        if entry is not None:
            if entry['expiry'] > time.time():
                return web.Response(**entry['data'])
            # Expired: drop it so stale entries do not accumulate.
            del self.cache[key]

        response = await handler(request)
        # Only plain web.Response objects are cached; other response types
        # (e.g. streaming ones) are passed through untouched.
        if isinstance(response, web.Response) and response.status == 200:
            self.cache[key] = {
                'expiry': time.time() + self.ttl,
                'data': {
                    'body': response.body,
                    'status': response.status,
                    'headers': dict(response.headers)
                }
            }
        return response

    async def __call__(self, app, handler):
        """Legacy old-style factory entry point, kept for backward compatibility.

        Allows ``middlewares=[CacheMiddleware()]`` to keep working; new code
        should register ``.middleware`` instead.
        """
        async def bound(request):
            return await self.middleware(request, handler)
        return bound


async def hello(request):
    """Return a greeting that embeds the current time."""
    return web.Response(text=f"Hello, World! {time.time()}")


cache_middleware = CacheMiddleware()
app = web.Application(middlewares=[cache_middleware.middleware])
app.router.add_get('/', hello)
if __name__ == '__main__':
    web.run_app(app)

This middleware caches successful (status 200) GET responses for 60 seconds.

 

Cache Invalidation Methods

Time-based Invalidation

Time-based invalidation is the simplest form of cache invalidation.

This method automatically invalidates cache entries after their TTL expires.

The above examples show how to implement time-based invalidation.

Event-driven Invalidation

Event-driven invalidation allows you to clear cache entries based on specific events:

from aiohttp import web
import time
class EventDrivenCache:
    """Dict-backed cache whose entries stay until explicitly invalidated."""

    def __init__(self):
        self.cache = {}

    async def get(self, key):
        """Return the value stored for ``key``, or ``None`` if there is none."""
        return self.cache.get(key)

    async def set(self, key, value):
        """Store ``value`` under ``key``, replacing any previous value."""
        self.cache[key] = value

    async def invalidate(self, key):
        """Remove ``key`` from the cache; a missing key is ignored."""
        self.cache.pop(key, None)
async def get_data(request):
    """Return the demo payload, building it only when it is not cached.

    The payload stays cached under a fixed key until it is invalidated.
    """
    store = request.app['cache']
    key = "example_key"
    payload = await store.get(key)
    if payload is None:
        payload = {"message": "Hello, World!", "timestamp": time.time()}
        await store.set(key, payload)
    return web.json_response(payload)
async def update_data(request):
    """Invalidate the cached demo payload so the next read rebuilds it."""
    await request.app['cache'].invalidate("example_key")
    return web.Response(text="Cache invalidated")
# Create the application, attach the shared cache, and register both routes.
app = web.Application()
app['cache'] = EventDrivenCache()
app.add_routes([
    web.get('/', get_data),
    web.get('/update', update_data),
])

if __name__ == '__main__':
    # Start the server only when this file is executed as a script.
    web.run_app(app)

This example allows you to invalidate the cache by making a GET request to /update.

Selective Cache Clearing

Selective cache clearing allows you to clear specific groups of cache entries:

from aiohttp import web
import time
class SelectiveCache:
    """Dict-backed cache whose entries carry a group label.

    Each entry is stored as a ``(value, group)`` tuple so that all entries
    of one group can be removed together.
    """

    def __init__(self):
        self.cache = {}

    async def get(self, key):
        """Return the stored ``(value, group)`` tuple for ``key``, or ``None``."""
        return self.cache.get(key)

    async def set(self, key, value, group=None):
        """Store ``value`` under ``key``, labelled with ``group``."""
        entry = (value, group)
        self.cache[key] = entry

    async def clear_group(self, group):
        """Drop every entry whose group label equals ``group``."""
        # A new dict is built and assigned rather than mutating in place.
        self.cache = {
            key: entry
            for key, entry in self.cache.items()
            if entry[1] != group
        }
async def get_data(request):
    """Return a per-key greeting, caching it under the requested group.

    The ``key`` and ``group`` query parameters both default to ``default``.
    On a hit, only the value part of the cached ``(value, group)`` tuple is
    returned.
    """
    store = request.app['cache']
    key = request.query.get('key', 'default')
    group = request.query.get('group', 'default')
    entry = await store.get(key)
    if entry is not None:
        return web.json_response(entry[0])

    data = {"message": f"Hello, {key}!", "timestamp": time.time()}
    await store.set(key, data, group)
    return web.json_response(data)
async def clear_group(request):
    """Clear every cache entry in the group named by the ``group`` query parameter."""
    group = request.query.get('group', 'default')
    await request.app['cache'].clear_group(group)
    return web.Response(text=f"Group {group} cleared from cache")
# Create the application, attach the shared cache, and register both routes.
app = web.Application()
app['cache'] = SelectiveCache()
app.add_routes([
    web.get('/', get_data),
    web.post('/clear', clear_group),
])

if __name__ == '__main__':
    # Start the server only when this file is executed as a script.
    web.run_app(app)

This method allows you to clear cache entries by group.

 

Cache Serialization and Deserialization

To store and retrieve complex data structures in the cache, you can use JSON serialization:

import json
import time
from aiohttp import web
class JSONCache:
    """Dict-backed cache that stores every value as a JSON string."""

    def __init__(self):
        self.cache = {}

    async def get(self, key):
        """Return the decoded value for ``key``, or ``None`` if nothing is stored."""
        raw = self.cache.get(key)
        return json.loads(raw) if raw else None

    async def set(self, key, value):
        """Serialize ``value`` to JSON and store it under ``key``."""
        encoded = json.dumps(value)
        self.cache[key] = encoded

async def get_data(request):
    """Return a nested demo payload, cached per request path.

    The payload is built once per path and then served from the cache.
    """
    store = request.app['cache']
    key = request.path
    payload = await store.get(key)
    if payload is None:
        payload = {"message": "Complex data structure", "nested": {"a": 1, "b": 2}, "timestamp": time.time()}
        await store.set(key, payload)
    return web.json_response(payload)
# Create the application, attach the shared cache, and register the route.
app = web.Application()
app['cache'] = JSONCache()
app.add_routes([web.get('/', get_data)])

if __name__ == '__main__':
    # Start the server only when this file is executed as a script.
    web.run_app(app)

Here we use JSON to serialize and deserialize cache data so you can cache complex data structures.

Leave a Reply

Your email address will not be published. Required fields are marked *