Async

Concurrent programming with an event loop is a relatively new concept in Python 3.x. This module aims to highlight how it could be used in the context of a scheduler which runs a fire-and-forget operation for starting jobs. In the real world, it takes time for a scheduler to start a job (e.g. hitting an API endpoint or asking the operating system for resources), so we assume that starting a job has some intrinsic delay.

import asyncio
from dataclasses import dataclass
from datetime import datetime
from uuid import uuid4

# Delays, in seconds, handed to ``asyncio.sleep`` when simulating job startup.
_DELAY_SMALL = 0.001  # short enough that awaiting it keeps the demo fast
_DELAY_LARGE = 60 * 60  # one hour; only used for a task that gets cancelled


@dataclass
class JobRecord:
    """Metadata captured for a single started job.

    Attributes:
        guid: Identifier of the job.
        queued_at: Timestamp taken before the startup delay.
        started_at: Timestamp taken after the startup delay.
    """

    # Field order matters: records are built positionally elsewhere.
    guid: str
    queued_at: datetime
    started_at: datetime


def _is_valid_record(record):
    """Return whether the record was queued strictly before it started.

    Args:
        record: Object exposing ``queued_at`` and ``started_at`` attributes
            that can be compared with ``<``.

    Returns:
        The result of ``queued_at < started_at``; equal timestamps are
        therefore reported as invalid.
    """
    queued, started = record.queued_at, record.started_at
    return queued < started


def _current_time():
    """Return the current local time as a timezone-naive ``datetime``."""
    # Passing tz=None is the explicit spelling of the naive default.
    return datetime.now(tz=None)


async def start_job(job_id, delay):
    """Simulate starting a job that takes ``delay`` seconds to come up.

    Args:
        job_id: Identifier stored as the record's ``guid``.
        delay: Value passed to ``asyncio.sleep`` between the two timestamps.

    Returns:
        A ``JobRecord`` whose ``queued_at`` is taken before the sleep and
        whose ``started_at`` is taken after it.
    """
    queued_at = _current_time()
    # Yield to the event loop so other jobs can make progress meanwhile.
    await asyncio.sleep(delay)
    return JobRecord(
        guid=job_id,
        queued_at=queued_at,
        started_at=_current_time(),
    )


async def schedule_jobs():
    """Schedule jobs concurrently and check the resulting records.

    Walks through three asyncio usage patterns in order:

    1. Awaiting a single coroutine directly to obtain its job record.
    2. Wrapping a coroutine in a task, cancelling it, and confirming that
       awaiting the cancelled task raises ``asyncio.CancelledError``.
    3. Starting a batch of jobs with ``asyncio.gather`` and checking that
       one valid record comes back per coroutine.

    Raises:
        AssertionError: If any of the checks above does not hold.
    """
    # Calling a coroutine function only creates a coroutine object;
    # nothing runs until it is awaited
    single_job = start_job(uuid4().hex, _DELAY_SMALL)
    assert asyncio.iscoroutine(single_job)

    # Grab a job record from the coroutine
    single_record = await single_job
    assert _is_valid_record(single_record)

    # Task is a wrapped coroutine which also represents a future
    single_task = asyncio.create_task(start_job(uuid4().hex, _DELAY_LARGE))
    assert asyncio.isfuture(single_task)

    # Unlike a bare coroutine object, a task exposes cancel(), so the
    # hour-long job can be abandoned instead of being waited on
    single_task.cancel()
    task_failed = False
    try:
        await single_task
    except asyncio.CancelledError:
        # Use the documented top-level name: the `asyncio.exceptions`
        # submodule is an implementation detail that is absent on Python 3.7
        assert single_task.cancelled()
        task_failed = True
    assert task_failed is True

    # Gather coroutines for batch start
    batch_jobs = [start_job(uuid4().hex, _DELAY_SMALL) for _ in range(10)]
    batch_records = await asyncio.gather(*batch_jobs)

    # We get the same amount of records as we have coroutines
    assert len(batch_records) == len(batch_jobs)
    assert all(_is_valid_record(record) for record in batch_records)


def main():
    """Run the job-scheduling demo to completion via ``asyncio.run``."""
    demo = schedule_jobs()
    asyncio.run(demo)


# Run the demo only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()