How to Create Periodic Tasks in aiohttp

Periodic tasks let you run scheduled operations such as data updates, cleanup routines, or notification delivery.

In this tutorial, you'll learn how to implement periodic tasks in aiohttp using plain asyncio, APScheduler, and aiocron, and how to persist task state so that tasks can resume after a restart.

 

 

Create Periodic Tasks Using asyncio

To create a basic periodic task, define a coroutine that performs the desired operation.

Here’s an example of a simple periodic task that fetches and logs the current Bitcoin price:

import asyncio
import contextlib
import datetime

import aiohttp
from aiohttp import web

async def fetch_bitcoin_price():
    # Request the current price and parse the USD rate (e.g. "54,509.06") as a float
    async with aiohttp.ClientSession() as session:
        async with session.get('https://api.coindesk.com/v1/bpi/currentprice/BTC.json') as response:
            data = await response.json()
            return float(data['bpi']['USD']['rate'].replace(',', ''))

async def periodic_bitcoin_price_check():
    while True:
        try:
            price = await fetch_bitcoin_price()
            print(f"Bitcoin price at {datetime.datetime.now()}: ${price:.2f}")
        except (aiohttp.ClientError, KeyError, ValueError) as exc:
            # A failed request must not end the loop; report it and retry on the next cycle
            print(f"Price check failed: {exc!r}")
        await asyncio.sleep(60)  # Check every 60 seconds

async def start_background_tasks(app):
    app['bitcoin_task'] = asyncio.create_task(periodic_bitcoin_price_check())

async def cleanup_background_tasks(app):
    app['bitcoin_task'].cancel()
    # Awaiting a cancelled task raises CancelledError, so suppress it during shutdown
    with contextlib.suppress(asyncio.CancelledError):
        await app['bitcoin_task']

app = web.Application()
app.on_startup.append(start_background_tasks)
app.on_cleanup.append(cleanup_background_tasks)
web.run_app(app)

Output:

======== Running on http://0.0.0.0:8080 ========
(Press CTRL+C to quit)
Bitcoin price at 2024-09-07 14:37:17.024518: $54509.06

This code creates a periodic task that fetches the current Bitcoin price every 60 seconds and prints it along with the timestamp.

The task starts when the application starts and is properly cleaned up when the application shuts down.
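
The loop above sleeps a fixed 60 seconds after each run, so the time spent on the request itself is added to every cycle. The following is a minimal sketch of one way to keep a steadier cadence and to stop the task cleanly, using only the standard library. The helper names `run_periodically` and `stop_task` and the `max_runs` parameter are hypothetical and are not part of asyncio or aiohttp.

```python
import asyncio
import contextlib


async def run_periodically(interval, job, *, max_runs=None):
    """Call `job` every `interval` seconds, measured from start to start.

    `max_runs` is a hypothetical parameter that exists only so the demo
    below can terminate; a real background task would leave it as None.
    """
    loop = asyncio.get_running_loop()
    next_run = loop.time()
    runs = 0
    while max_runs is None or runs < max_runs:
        try:
            await job()
        except Exception as exc:  # CancelledError is not an Exception subclass
            print(f"Job failed: {exc!r}")
        runs += 1
        # Schedule relative to the previous target, not to "now",
        # so the job's own duration does not accumulate as drift
        next_run += interval
        await asyncio.sleep(max(0.0, next_run - loop.time()))
    return runs


async def stop_task(task):
    """Cancel a background task and wait until it has actually finished."""
    task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await task


async def demo():
    calls = []

    async def job():
        calls.append(len(calls))

    # A bounded run returns the number of executions
    completed = await run_periodically(0.01, job, max_runs=3)

    # An unbounded run keeps going until it is cancelled
    task = asyncio.create_task(run_periodically(0.01, job))
    await asyncio.sleep(0.05)
    await stop_task(task)
    return completed, len(calls), task.cancelled()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In an aiohttp application, a coroutine like `run_periodically` would be started in an `on_startup` handler and `stop_task` would be called from `on_cleanup`, in the same way as the handlers shown earlier.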

 

Integrate APScheduler with aiohttp

For more advanced scheduling needs, use the APScheduler library:

import datetime

from aiohttp import web
from apscheduler.schedulers.asyncio import AsyncIOScheduler

async def send_daily_report():
    print(f"Sending daily report at {datetime.datetime.now()}")

async def update_exchange_rates():
    print(f"Updating exchange rates at {datetime.datetime.now()}")

async def start_scheduler(app):
    # Start the scheduler from on_startup so it runs on the application's event loop
    scheduler = AsyncIOScheduler()
    scheduler.add_job(send_daily_report, 'cron', hour=18)  # Daily at 6 PM
    scheduler.add_job(update_exchange_rates, 'interval', minutes=15)  # Every 15 minutes
    scheduler.start()
    app['scheduler'] = scheduler

async def cleanup_scheduler(app):
    app['scheduler'].shutdown()

app = web.Application()
app.on_startup.append(start_scheduler)
app.on_cleanup.append(cleanup_scheduler)
web.run_app(app)

Output:

======== Running on http://0.0.0.0:8080 ========
(Press CTRL+C to quit)
Updating exchange rates at 2024-09-07 10:30:00.123456
Updating exchange rates at 2024-09-07 10:45:00.234567
Updating exchange rates at 2024-09-07 11:00:00.345678
Sending daily report at 2024-09-07 18:00:00.456789
Updating exchange rates at 2024-09-07 18:15:00.567890

Beyond fixed intervals, APScheduler supports cron-like patterns and persistent job stores.

For example, cron triggers can express weekly or monthly schedules:

# send_weekly_newsletter and monthly_data_cleanup are placeholder coroutines you would define yourself
scheduler.add_job(send_weekly_newsletter, 'cron', day_of_week='mon', hour=9)  # Every Monday at 9 AM
scheduler.add_job(monthly_data_cleanup, 'cron', day=1, hour=0)  # First day of every month at midnight
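
To make the `day_of_week='mon', hour=9` trigger concrete, here is a small standard-library sketch of the date arithmetic behind "the next Monday at 9 AM". It only illustrates the idea and is not how APScheduler computes fire times; the function name `next_weekly_run` is hypothetical.

```python
import datetime


def next_weekly_run(now, weekday, hour):
    """Return the first datetime strictly after `now` on `weekday` at `hour`:00.

    `weekday` follows datetime.weekday(): Monday is 0 and Sunday is 6.
    """
    candidate = now.replace(hour=hour, minute=0, second=0, microsecond=0)
    # Move forward to the requested weekday (0 to 6 days ahead)
    candidate += datetime.timedelta(days=(weekday - now.weekday()) % 7)
    # If that moment is not in the future, the next run is a week later
    if candidate <= now:
        candidate += datetime.timedelta(days=7)
    return candidate


if __name__ == "__main__":
    saturday = datetime.datetime(2024, 9, 7, 14, 37)
    print(next_weekly_run(saturday, weekday=0, hour=9))  # 2024-09-09 09:00:00
```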

 

Implement Task Persistence

Store task state in databases

To implement task persistence, store task states in a database:

import asyncio

import aiosqlite

async def create_tasks_table():
    async with aiosqlite.connect('tasks.db') as db:
        await db.execute('''
            CREATE TABLE IF NOT EXISTS tasks (
                id TEXT PRIMARY KEY,
                state INTEGER
            )
        ''')
        await db.commit()

async def store_task_state(task_id, state):
    # INSERT OR REPLACE overwrites the previous row for this task id
    async with aiosqlite.connect('tasks.db') as db:
        await db.execute('INSERT OR REPLACE INTO tasks (id, state) VALUES (?, ?)', (task_id, state))
        await db.commit()

async def load_task_state(task_id):
    # Returns None when no state has been saved for this task yet
    async with aiosqlite.connect('tasks.db') as db:
        async with db.execute('SELECT state FROM tasks WHERE id = ?', (task_id,)) as cursor:
            row = await cursor.fetchone()
            return row[0] if row else None

async def periodic_data_processing():
    task_id = 'data_processing'
    # Resume from the saved count, or start from 0 on the first run
    processed_items = await load_task_state(task_id) or 0
    while True:
        processed_items += 10
        print(f"Processed {processed_items} items")
        await store_task_state(task_id, processed_items)
        await asyncio.sleep(60)

This code shows how to store and retrieve task states using an SQLite database so you can resume tasks from their last known state after the application restarts.
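
The resume behaviour can be checked without running the server. The sketch below makes two assumptions that differ from the code above: it swaps in the synchronous standard-library `sqlite3` module as a stand-in for aiosqlite, and it writes to a temporary database file. The function names mirror the ones above, while `open_db`, `process_batch`, and the `db_path` parameter are hypothetical additions for this example.

```python
import contextlib
import os
import sqlite3
import tempfile


def open_db(db_path):
    """Open the database and make sure the tasks table exists."""
    db = sqlite3.connect(db_path)
    db.execute('CREATE TABLE IF NOT EXISTS tasks (id TEXT PRIMARY KEY, state INTEGER)')
    return db


def store_task_state(db_path, task_id, state):
    with contextlib.closing(open_db(db_path)) as db:
        db.execute('INSERT OR REPLACE INTO tasks (id, state) VALUES (?, ?)', (task_id, state))
        db.commit()


def load_task_state(db_path, task_id):
    with contextlib.closing(open_db(db_path)) as db:
        row = db.execute('SELECT state FROM tasks WHERE id = ?', (task_id,)).fetchone()
        return row[0] if row else None


def process_batch(db_path, task_id='data_processing'):
    """One iteration of the loop: resume from the saved count, add 10, save."""
    processed = (load_task_state(db_path, task_id) or 0) + 10
    store_task_state(db_path, task_id, processed)
    return processed


def demo():
    with tempfile.TemporaryDirectory() as tmp:
        db_path = os.path.join(tmp, 'tasks.db')
        first = process_batch(db_path)   # no saved state yet, so it starts from 0
        second = process_batch(db_path)  # simulates a restart: the count is read back from disk
        return first, second


if __name__ == "__main__":
    print(demo())  # (10, 20)
```

Because every call opens a fresh connection, the second call sees only what the first one committed, which is the same situation a restarted application is in.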

To recover tasks after an application restart, make sure the table exists and start each task again on startup. The task loads its own saved state, so it continues from the last stored count:

async def initialize_tasks(app):
    await create_tasks_table()  # Ensure the table exists before any task reads it
    # Start the task unconditionally: on a first run the table is empty,
    # and periodic_data_processing() falls back to 0 when nothing is saved
    app['tasks'] = {
        'data_processing': asyncio.create_task(periodic_data_processing()),
    }

app = web.Application()
app.on_startup.append(initialize_tasks)

This method ensures that your periodic tasks can resume their operations from where they left off, even after the application restarts.

 

Using aiocron

For crontab-style schedules declared with a decorator, use the aiocron library:

import asyncio
import datetime

import aiocron
from aiohttp import web

# aiocron binds each job to the current event loop when the decorator runs,
# so create the loop first and pass the same loop to run_app below
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

@aiocron.crontab('0 9 * * 1-5')
async def weekday_morning_report():
    print(f"Generating weekday morning report at {datetime.datetime.now()}")

@aiocron.crontab('0 */4 * * *')
async def every_four_hours_task():
    print(f"Running task every four hours at {datetime.datetime.now()}")

async def handle(request):
    return web.Response(text="Hello, this is your aiohttp server!")

app = web.Application()
app.router.add_get('/', handle)

if __name__ == '__main__':
    web.run_app(app, loop=loop)

@aiocron.crontab('0 9 * * 1-5'): This decorator schedules the weekday_morning_report function to run at 9:00 AM every weekday (Monday to Friday).

@aiocron.crontab('0 */4 * * *'): This decorator schedules the every_four_hours_task function to run every four hours.
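
To see how the five fields of a crontab expression (minute, hour, day of month, month, day of week) select a moment, here is a simplified standard-library matcher. It is a sketch for illustration only, not aiocron's parser: it handles just `*`, `*/n`, `a-b`, single numbers, and comma lists, and it requires every field to match, whereas real cron treats day of month and day of week specially when both are restricted. The names `field_matches` and `cron_matches` are hypothetical.

```python
import datetime


def field_matches(field, value, minimum):
    """Check one cron field against a value.

    `minimum` is the smallest legal value for the field (0 or 1),
    which is where `*/n` steps start counting.
    """
    for part in field.split(','):
        if part == '*':
            return True
        if part.startswith('*/'):
            if (value - minimum) % int(part[2:]) == 0:
                return True
        elif '-' in part:
            low, high = map(int, part.split('-'))
            if low <= value <= high:
                return True
        elif int(part) == value:
            return True
    return False


def cron_matches(spec, moment):
    """Return True if `moment` falls on a minute selected by the five-field `spec`."""
    minute, hour, day, month, weekday = spec.split()
    # cron counts Sunday as 0, while datetime.weekday() counts Monday as 0
    cron_weekday = (moment.weekday() + 1) % 7
    return (
        field_matches(minute, moment.minute, 0)
        and field_matches(hour, moment.hour, 0)
        and field_matches(day, moment.day, 1)
        and field_matches(month, moment.month, 1)
        and field_matches(weekday, cron_weekday, 0)
    )


if __name__ == "__main__":
    monday_9am = datetime.datetime(2024, 9, 9, 9, 0)
    saturday_9am = datetime.datetime(2024, 9, 7, 9, 0)
    print(cron_matches('0 9 * * 1-5', monday_9am))    # True
    print(cron_matches('0 9 * * 1-5', saturday_9am))  # False
    print(cron_matches('0 */4 * * *', datetime.datetime(2024, 9, 7, 12, 0)))  # True
```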
