Python Async Await Explained With Real Examples

Most Python async tutorials stop at asyncio.sleep() and call it a day. This one covers the event loop internals, real concurrent I/O patterns, production pitfalls like race conditions and forgotten awaits, and how to actually test async code — because async bugs in production are a different beast than anything a basic tutorial prepares you for.

Understanding Async/Await Fundamentals: From Coroutines to the Event Loop

What Are Coroutines and How async/await Keywords Work

The single biggest misconception: async def does not run your function. It returns a coroutine object. Nothing executes until something drives that coroutine — either await or the event loop.

# WRONG — this does nothing
import asyncio

async def fetch_data():
    print("Fetching...")
    await asyncio.sleep(1)
    return {"status": "ok"}

result = fetch_data()  # Returns <coroutine object fetch_data at 0x...>
print(result)          # Prints the coroutine object; when it is discarded Python warns:
                       # RuntimeWarning: coroutine 'fetch_data' was never awaited

# RIGHT: drive the coroutine properly
import asyncio

async def fetch_data():
    print("Fetching...")
    await asyncio.sleep(1)
    return {"status": "ok"}

result = asyncio.run(fetch_data())  # Actually executes
print(result)  # {"status": "ok"}

The await keyword does two things: it suspends the current coroutine until the awaited operation finishes, and it hands control back to the event loop, which can run other tasks in the meantime. This is fundamentally different from time.sleep(), which blocks the entire thread, so no other coroutine gets to run during that pause.

import asyncio
import time

# Blocks the event loop for 2 seconds — every other task waits
async def bad_sleep():
    time.sleep(2)  # Never do this inside an async function

# Suspends for 2 seconds — other tasks run freely during this time
async def good_sleep():
    await asyncio.sleep(2)
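
To make the difference measurable, here is a minimal sketch that runs a background "heartbeat" task while another coroutine pauses. The names heartbeat and count_ticks_during are hypothetical helpers invented for this illustration, and the tick counts in the non-blocking case depend on timing.

```python
import asyncio
import time

async def heartbeat(ticks: list[float], interval: float = 0.05) -> None:
    """Record a timestamp every `interval` seconds until cancelled."""
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(interval)

async def count_ticks_during(blocking: bool, pause: float = 0.3) -> int:
    """Return how many heartbeat ticks were recorded by the end of the pause."""
    ticks: list[float] = []
    beat = asyncio.create_task(heartbeat(ticks))
    await asyncio.sleep(0)  # Yield once so the heartbeat records its first tick
    if blocking:
        time.sleep(pause)           # Blocks the thread: the heartbeat is frozen
    else:
        await asyncio.sleep(pause)  # Suspends: the heartbeat keeps ticking
    count = len(ticks)
    beat.cancel()
    return count

async def main() -> None:
    print("ticks with time.sleep:   ", await count_ticks_during(blocking=True))
    print("ticks with asyncio.sleep:", await count_ticks_during(blocking=False))

asyncio.run(main())
```

With the blocking pause only the initial tick is recorded, because the loop never gets a chance to resume the heartbeat. With the awaited pause the heartbeat keeps firing.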

The Event Loop Mechanism: How Python Schedules Async Tasks

The event loop is a single-threaded scheduler. It keeps a queue of ready tasks and runs them one at a time. When a task hits an await on an I/O operation, the loop marks it as suspended and picks up the next ready task. No threads, no parallelism — pure cooperative multitasking.

import asyncio

async def task_a():
    print("A: start")
    await asyncio.sleep(1)  # Yields control here
    print("A: done")

async def task_b():
    print("B: start")
    await asyncio.sleep(0.5)
    print("B: done")

async def main():
    # Both tasks are scheduled before either runs
    await asyncio.gather(task_a(), task_b())

asyncio.run(main())
# Output:
# A: start
# B: start
# B: done   (B's sleep was shorter)
# A: done

For intermediate use cases where you need explicit loop control — say, integrating with legacy sync code — you can manage the loop manually. But asyncio.run() is the right default for everything else:

import asyncio

async def my_coroutine():
    await asyncio.sleep(0.1)
    return "done"

# Manual loop management (legacy pattern, avoid unless necessary)
loop = asyncio.new_event_loop()
result = loop.run_until_complete(my_coroutine())
loop.close()
print(result)

# Modern standard — prefer this always
result = asyncio.run(my_coroutine())

Critical Rule: Where await Can and Cannot Live

await only works inside an async def function. Put it anywhere else in a regular module and Python raises a SyntaxError at compile time, before any code runs. The subtler mistake is calling an async function without awaiting it inside an async context: Python won't crash, but the coroutine never runs, and the only hint is a RuntimeWarning ("coroutine ... was never awaited") when the object is discarded.

import asyncio

async def get_user(user_id: int) -> dict:
    await asyncio.sleep(0.1)  # Simulate DB call
    return {"id": user_id, "name": "Alice"}

async def main():
    # WRONG — coroutine object, not the result
    user = get_user(1)
    print(user)  # <coroutine object get_user at 0x...>

    # RIGHT
    user = await get_user(1)
    print(user)  # {"id": 1, "name": "Alice"}

asyncio.run(main())
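
Both failure modes can be observed directly. The sketch below is an illustration, not part of the original example: it compiles a snippet with a misplaced await to capture the SyntaxError, then records the warning Python emits for a discarded coroutine. The names source and forgot_await are made up for the demo, and the immediate warning relies on CPython finalizing the unused coroutine promptly (gc.collect() is there as a safety net).

```python
import asyncio
import gc
import warnings

# 'await' inside a plain def is rejected at compile time, before any code runs
source = "def handler():\n    await something()\n"
try:
    compile(source, "<demo>", "exec")
    compile_error = None
except SyntaxError as exc:
    compile_error = exc.msg
print(compile_error)

async def get_user(user_id: int) -> dict:
    return {"id": user_id}

async def forgot_await() -> list[str]:
    """Call get_user() without await and collect the warnings Python emits."""
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        get_user(1)   # Coroutine object is created and immediately discarded
        gc.collect()  # Make sure the discarded coroutine is finalized
    return [str(w.message) for w in caught]

messages = asyncio.run(forgot_await())
print(messages)
```

Running your test suite with warnings turned into errors (for example python -W error) is one way to catch forgotten awaits before they reach production.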

Practical Async/Await Patterns: Concurrent I/O Examples

Fetching Multiple URLs Concurrently

This is where async pays off. Sequential HTTP requests are slow because each one waits for a server response before starting the next. With asyncio.gather(), all requests are in flight at the same time and you collect the results, in input order, once they're all done.

import asyncio
import httpx
import time

URLS = [
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
]

# Sequential — takes ~3 seconds
async def fetch_sequential(urls):
    results = []
    async with httpx.AsyncClient() as client:
        for url in urls:
            response = await client.get(url, timeout=10)
            results.append(response.status_code)
    return results

# Concurrent with gather — takes ~1 second
async def fetch_concurrent(urls):
    async with httpx.AsyncClient() as client:
        tasks = [client.get(url, timeout=10) for url in urls]
        responses = await asyncio.gather(*tasks)
    return [r.status_code for r in responses]

# Concurrent with timeout handling per request
async def fetch_with_timeout(urls):
    async with httpx.AsyncClient() as client:
        async def safe_get(url):
            try:
                return await asyncio.wait_for(client.get(url), timeout=5.0)
            except asyncio.TimeoutError:
                return None

        results = await asyncio.gather(*[safe_get(u) for u in urls])
    return results

async def main():
    start = time.monotonic()
    await fetch_sequential(URLS)
    print(f"Sequential: {time.monotonic() - start:.2f}s")

    start = time.monotonic()
    await fetch_concurrent(URLS)
    print(f"Concurrent: {time.monotonic() - start:.2f}s")

asyncio.run(main())
# Sequential: ~3.0s
# Concurrent: ~1.0s
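
If you want to reproduce the timing difference without httpx or network access, the following sketch swaps the HTTP call for asyncio.sleep(). fake_request and timed are hypothetical stand-ins, and the 0.2 second delay is an arbitrary assumption.

```python
import asyncio
import time

async def fake_request(delay: float) -> int:
    """Hypothetical stand-in for client.get(): waits, then returns a status code."""
    await asyncio.sleep(delay)
    return 200

async def timed(concurrent: bool, n: int = 3, delay: float = 0.2) -> float:
    """Run n fake requests sequentially or concurrently and return elapsed seconds."""
    start = time.monotonic()
    if concurrent:
        await asyncio.gather(*[fake_request(delay) for _ in range(n)])
    else:
        for _ in range(n):
            await fake_request(delay)
    return time.monotonic() - start

async def main() -> None:
    print(f"Sequential: {await timed(concurrent=False):.2f}s")  # roughly n * delay
    print(f"Concurrent: {await timed(concurrent=True):.2f}s")   # roughly delay

asyncio.run(main())
```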

Managing Database Queries and File Operations Asynchronously

Async context managers (async with) are the standard pattern for resources that need cleanup: database connections, file handles, network sessions. The __aenter__ and __aexit__ methods are coroutines, so both setup and teardown can await I/O without blocking the event loop.

import asyncio
import aiosqlite
import aiofiles

# Async database query
async def get_users():
    async with aiosqlite.connect("app.db") as db:
        async with db.execute("SELECT id, name FROM users LIMIT 10") as cursor:
            rows = await cursor.fetchall()
    return rows

# Async file read
async def read_config(path: str) -> str:
    async with aiofiles.open(path, mode="r") as f:
        return await f.read()

# Connection pool pattern (asyncpg for PostgreSQL)
# Pitfall: opening a new connection per query
async def bad_pattern():
    import asyncpg
    for _ in range(100):
        conn = await asyncpg.connect("postgresql://localhost/mydb")
        await conn.fetch("SELECT 1")
        await conn.close()  # 100 connect/disconnect cycles: slow and wasteful

# Right: create the pool once, reuse its connections for every query
# (in a real app the pool is created at startup and closed at shutdown)
async def good_pattern():
    import asyncpg
    async with asyncpg.create_pool(
        "postgresql://localhost/mydb", min_size=2, max_size=10
    ) as pool:
        for _ in range(100):
            async with pool.acquire() as conn:
                result = await conn.fetch("SELECT 1")
    return result

Building a Rate-Limited Async API Consumer

Real-world scraping and API consumption need rate limiting. Without it, you'll either get throttled or hammer a server into the ground. asyncio.Semaphore caps concurrent requests cleanly.

import asyncio
import logging
import httpx

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

async def fetch_with_retry(client: httpx.AsyncClient, url: str, retries: int = 3) -> dict:
    for attempt in range(retries):
        try:
            response = await client.get(url, timeout=10)
            response.raise_for_status()
            return response.json()
        except (httpx.HTTPStatusError, httpx.RequestError) as e:
            if attempt == retries - 1:
                break  # No point sleeping after the final attempt
            wait = 2 ** attempt  # Exponential backoff: 1s, 2s, ...
            logger.warning(f"Attempt {attempt + 1} failed for {url}: {e}. Retrying in {wait}s.")
            await asyncio.sleep(wait)
    logger.error(f"All retries exhausted for {url}")
    return {}

async def bounded_fetch(semaphore: asyncio.Semaphore, client: httpx.AsyncClient, url: str) -> dict:
    async with semaphore:  # Max N concurrent requests at a time
        return await fetch_with_retry(client, url)

async def scrape_urls(urls: list[str], max_concurrent: int = 5) -> list[dict]:
    semaphore = asyncio.Semaphore(max_concurrent)
    async with httpx.AsyncClient() as client:
        tasks = [bounded_fetch(semaphore, client, url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
    return [r for r in results if not isinstance(r, Exception)]
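
To check that the semaphore really caps concurrency, a small offline sketch can track how many jobs are in flight at once. run_bounded and job are hypothetical names, and asyncio.sleep() stands in for the HTTP call.

```python
import asyncio

async def run_bounded(n_jobs: int, max_concurrent: int) -> int:
    """Run n_jobs fake requests and return the peak number in flight at once."""
    semaphore = asyncio.Semaphore(max_concurrent)
    in_flight = 0
    peak = 0

    async def job() -> None:
        nonlocal in_flight, peak
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # Stand-in for the HTTP call
            in_flight -= 1

    await asyncio.gather(*[job() for _ in range(n_jobs)])
    return peak

print(asyncio.run(run_bounded(n_jobs=20, max_concurrent=5)))  # 5
```

All 20 jobs are scheduled up front, but only five ever hold the semaphore at the same time. The rest wait inside async with until a slot frees up.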

Advanced Patterns and Production Pitfalls: Where Async Code Breaks

Debugging Race Conditions, Deadlocks, and Forgotten Awaits

Shared mutable state in async code is a race condition waiting to happen. Even though async is single-threaded, task switches at await points mean two tasks can interleave reads and writes on the same variable.

import asyncio

# WRONG — race condition on shared counter
counter = 0

async def increment_bad():
    global counter
    value = counter       # Task A reads 0
    await asyncio.sleep(0)  # Task B runs, also reads 0, writes 1
    counter = value + 1   # Task A writes 1 — lost the update from Task B

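async def run_bad():
    await asyncio.gather(*[increment_bad() for _ in range(100)])
    print(f"Expected 100, got: {counter}")  # Far fewer than 100: every task read 0 before any wrote

# RIGHT: hold an asyncio.Lock() across the whole read-modify-write
safe_counter = 0

async def increment_safe(lock: asyncio.Lock):
    global safe_counter
    async with lock:
        value = safe_counter
        await asyncio.sleep(0)  # A task switch here can no longer interleave updates
        safe_counter = value + 1

async def run_safe():
    lock = asyncio.Lock()  # Create the lock inside the running event loop
    await asyncio.gather(*[increment_safe(lock) for _ in range(100)])
    print(f"Got: {safe_counter}")  # 100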

Forgotten task references cause silent failures. The event loop keeps only weak references to tasks, so if you call asyncio.create_task() and don't store the result, the task can be garbage collected mid-execution.

import asyncio

async def background_job():
    await asyncio.sleep(1)
    print("Job complete")

async def main():
    # WRONG — task may be garbage collected before completing
    asyncio.create_task(background_job())
    await asyncio.sleep(2)

    # RIGHT — keep a reference
    task = asyncio.create_task(background_job())
    await asyncio.sleep(2)
    await task  # Or store in a set and discard on completion
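
The "store in a set" option mentioned in the last comment can be sketched like this. spawn and background_tasks are hypothetical names chosen for the illustration, and background_job is modified to record its result in a list so the outcome is easy to inspect.

```python
import asyncio

background_tasks: set[asyncio.Task] = set()

def spawn(coro) -> asyncio.Task:
    """Schedule coro and hold a strong reference until it finishes."""
    task = asyncio.create_task(coro)
    background_tasks.add(task)
    # Drop the reference once the task is done so the set does not grow forever
    task.add_done_callback(background_tasks.discard)
    return task

async def background_job(results: list[str]) -> None:
    await asyncio.sleep(0.05)
    results.append("Job complete")

async def main() -> list[str]:
    results: list[str] = []
    spawn(background_job(results))  # Fire and forget, but safely referenced
    await asyncio.sleep(0.1)        # Other work happens here
    return results

print(asyncio.run(main()))
```

This keeps fire-and-forget ergonomics while making sure something owns the task for its whole lifetime.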

Async Context Managers and Generators

You can write your own async context managers with __aenter__ and __aexit__, or use contextlib.asynccontextmanager for simpler cases. Exception propagation in __aexit__ works exactly like the sync version: return a truthy value to suppress the exception; a falsy return (including None) lets it propagate.

import asyncio
from contextlib import asynccontextmanager

class AsyncDBConnection:
    async def __aenter__(self):
        print("Opening connection")
        await asyncio.sleep(0.1)  # Simulate async connect
        self.connected = True
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("Closing connection")
        await asyncio.sleep(0.05)  # Simulate async close
        self.connected = False
        return False  # Don't suppress exceptions

# Simpler pattern with asynccontextmanager
@asynccontextmanager
async def managed_connection():
    print("Connecting...")
    await asyncio.sleep(0.1)
    try:
        yield {"conn": "active"}
    finally:
        print("Disconnecting...")  # Always runs, even on exception
        await asyncio.sleep(0.05)

# Async generator
async def paginated_results(page_count: int):
    for page in range(page_count):
        await asyncio.sleep(0.1)  # Simulate API call
        yield [f"item_{page}_{i}" for i in range(10)]

async def main():
    async with managed_connection() as conn:
        print(f"Using: {conn}")

    # Async comprehension
    all_items = [item async for page in paginated_results(3) for item in page]
    print(f"Total items: {len(all_items)}")

asyncio.run(main())
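
The suppression rule for __aexit__ is easier to trust once you see it in action. In this sketch, SuppressTimeout is a hypothetical context manager that swallows TimeoutError and lets everything else through.

```python
import asyncio

class SuppressTimeout:
    """Hypothetical async context manager that swallows TimeoutError only."""

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await asyncio.sleep(0)  # Cleanup is allowed to await
        # Truthy return suppresses the exception; falsy lets it propagate
        return exc_type is not None and issubclass(exc_type, TimeoutError)

async def demo() -> list[str]:
    log = []
    async with SuppressTimeout():
        raise TimeoutError("slow backend")
    log.append("TimeoutError suppressed")

    try:
        async with SuppressTimeout():
            raise ValueError("bad input")
    except ValueError:
        log.append("ValueError propagated")
    return log

print(asyncio.run(demo()))
# ['TimeoutError suppressed', 'ValueError propagated']
```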

Exception Handling and Propagation in Concurrent Tasks

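By default, if one task in asyncio.gather() raises an exception, that exception propagates immediately to the caller awaiting gather() and the results of the other tasks are lost. The other tasks are not cancelled, though; they keep running in the background. Use return_exceptions=True to collect all results and inspect failures individually.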

import asyncio
import logging

logger = logging.getLogger(__name__)

async def risky_task(n: int):
    await asyncio.sleep(0.1)
    if n == 2:
        raise ValueError(f"Task {n} failed")
    return f"result_{n}"

async def main():
    # WRONG — one failure kills all results
    try:
        results = await asyncio.gather(risky_task(1), risky_task(2), risky_task(3))
    except ValueError as e:
        print(f"Lost everything: {e}")

    # RIGHT — inspect each result
    results = await asyncio.gather(
        risky_task(1),
        risky_task(2),
        risky_task(3),
        return_exceptions=True
    )

    for i, result in enumerate(results):
        if isinstance(result, Exception):
            logger.error(f"Task {i + 1} failed: {result}")
            # Handle or re-raise selectively — don't silently swallow
        else:
            print(f"Task {i + 1} succeeded: {result}")

asyncio.run(main())

asyncio.CancelledError is special — it inherits from BaseException in Python 3.8+, not Exception. Never catch it silently. If you catch it for cleanup, always re-raise it.
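
A minimal sketch of the catch, clean up, re-raise pattern looks like this. worker and the events list are hypothetical, and the 10 second sleep is just a placeholder for long-running work.

```python
import asyncio

async def worker(events: list[str]) -> None:
    try:
        await asyncio.sleep(10)  # Placeholder for long-running work
    except asyncio.CancelledError:
        events.append("cleanup")  # Release resources here
        raise  # Re-raise so the task is actually marked as cancelled

async def main() -> list[str]:
    events: list[str] = []
    task = asyncio.create_task(worker(events))
    await asyncio.sleep(0)  # Let the worker start and reach its sleep
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        events.append("cancelled" if task.cancelled() else "not cancelled")
    return events

print(asyncio.run(main()))
# ['cleanup', 'cancelled']
```

If worker swallowed the exception instead of re-raising it, the task would finish normally and task.cancelled() would report False, which breaks timeouts and shutdown logic that rely on cancellation.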

Integration, Performance, and Testing: Moving Async to Production

Async vs. Threading vs. Multiprocessing: Choosing the Right Tool

Async isn't a universal speed-up. The model only helps when your code spends time waiting on I/O. CPU-bound work — image processing, data crunching — needs real parallelism via multiprocessing or concurrent.futures.ProcessPoolExecutor.

| Workload Type | Best Tool | Why |
| --- | --- | --- |
| Network I/O (HTTP, sockets) | async/await | Low overhead, thousands of concurrent connections |
| File/DB I/O | async/await | Frees event loop during wait time |
| CPU-bound (parsing, math) | ProcessPoolExecutor | Bypasses GIL, true parallelism |
| Blocking legacy libraries | ThreadPoolExecutor via loop.run_in_executor() | Offloads blocking work without rewriting library |
| Mixed I/O + CPU | async + ProcessPoolExecutor | Async for I/O, processes for compute |

import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_heavy(n: int) -> int:
    return sum(i * i for i in range(n))  # Blocks thread

async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        # Offload CPU work to a process, keep event loop free
        result = await loop.run_in_executor(pool, cpu_heavy, 10_000_000)
    print(result)

if __name__ == "__main__":  # Guard needed: worker processes may re-import this module
    asyncio.run(main())
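
The "blocking legacy libraries" row can be sketched in a similar way with threads. Here blocking_call is a hypothetical stand-in for something like requests.get(), and passing None to run_in_executor() selects the loop's default thread pool.

```python
import asyncio
import time

def blocking_call(seconds: float) -> str:
    """Hypothetical stand-in for a blocking library call such as requests.get()."""
    time.sleep(seconds)
    return "done"

async def main() -> tuple[list[str], float]:
    loop = asyncio.get_running_loop()
    start = time.monotonic()
    # None selects the loop's default ThreadPoolExecutor
    results = await asyncio.gather(
        loop.run_in_executor(None, blocking_call, 0.2),
        loop.run_in_executor(None, blocking_call, 0.2),
        loop.run_in_executor(None, blocking_call, 0.2),
    )
    return results, time.monotonic() - start

results, elapsed = asyncio.run(main())
print(results, f"{elapsed:.2f}s")
```

The three calls overlap in separate threads, so the total is close to a single call's duration instead of three. On Python 3.9+, asyncio.to_thread(blocking_call, 0.2) is a shorter way to get the same effect.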

Testing Async Code with pytest-asyncio

Standard pytest can't run coroutines directly. Install pytest-asyncio and mark your async tests. In the plugin's default strict mode, every async test needs the @pytest.mark.asyncio marker; setting asyncio_mode = "auto" in pyproject.toml lets it collect async tests without the marker.

# pyproject.toml
# [tool.pytest.ini_options]
# asyncio_mode = "auto"

import pytest
import asyncio
from unittest.mock import AsyncMock, patch

# The function under test
async def fetch_user(user_id: int) -> dict:
    await asyncio.sleep(0.1)  # Simulate network call
    return {"id": user_id, "name": "Alice"}

# Test with pytest-asyncio
@pytest.mark.asyncio
async def test_fetch_user_returns_correct_id():
    result = await fetch_user(42)
    assert result["id"] == 42
    assert result["name"] == "Alice"

# Mocking async dependencies
@pytest.mark.asyncio
async def test_fetch_with_mock():
    # Patch the name in this module; under pytest the test module is not "__main__"
    with patch(f"{__name__}.fetch_user", new_callable=AsyncMock) as mock_fetch:
        mock_fetch.return_value = {"id": 1, "name": "Mocked"}
        result = await fetch_user(1)
        assert result["name"] == "Mocked"
        mock_fetch.assert_called_once_with(1)

# Testing exception handling
@pytest.mark.asyncio
async def test_gather_handles_partial_failure():
    async def ok():
        return "success"

    async def fail():
        raise RuntimeError("boom")

    results = await asyncio.gather(ok(), fail(), return_exceptions=True)
    assert results[0] == "success"
    assert isinstance(results[1], RuntimeError)

For more on structuring Python projects for testability, see Python Project Structure Best Practices. The official Python asyncio documentation and the pytest documentation are the authoritative references for keeping up with API changes.

Async Best Practices Checklist

These are the rules that separate code that works in development from code that survives production traffic. Also check out Python Performance Tips for Backend Developers for complementary patterns.

  • Always use asyncio.run() as your top-level entry point — never loop.run_until_complete() in new code
  • Never call time.sleep(), requests.get(), or any blocking library inside an async function
  • Store references to tasks created with asyncio.create_task() — dangling tasks get garbage collected
  • Use return_exceptions=True in asyncio.gather() for any production batch operation
  • Protect shared mutable state with asyncio.Lock() — even in single-threaded async, task switches happen
  • Re-raise CancelledError — catching it silently breaks task cancellation
  • Use connection pools (asyncpg, httpx.AsyncClient) — don't open a new connection per operation
  • Cap concurrency with asyncio.Semaphore when hitting external APIs

Frequently Asked Questions

Q: Does using async/await make my Python code faster automatically?

A: No. Async only helps when your code spends time waiting on I/O — network calls, file reads, database queries. If you rewrite a CPU-bound function with async def, it runs at exactly the same speed. The benefit comes from running multiple I/O waits concurrently instead of sequentially.

Q: Can I use async/await with libraries like requests or SQLAlchemy that aren't async?

A: You can, but you have to offload them to a thread pool with loop.run_in_executor() — otherwise they block the event loop. The better long-term approach is switching to async-native libraries: httpx instead of requests, asyncpg or SQLAlchemy 2.x async mode instead of synchronous SQLAlchemy.

Q: What's the difference between asyncio.gather() and asyncio.create_task()?

A: create_task() wraps a coroutine in a Task and schedules it on the event loop right away, so it starts running the next time the current coroutine yields, even if you never await it. gather() schedules multiple awaitables and waits for all of them, returning results in the order they were passed in. Use create_task() when you want fire-and-forget or need a handle to cancel a task; use gather() when you need all results together.
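
The scheduling difference shows up clearly in a small sketch. step and the log list are hypothetical and exist only to record the order of events.

```python
import asyncio

async def step(name: str, log: list[str]) -> str:
    log.append(f"{name} started")
    await asyncio.sleep(0.01)
    return name

async def main() -> list[str]:
    log: list[str] = []
    coro = step("coroutine", log)                  # Plain coroutine object: not scheduled
    task = asyncio.create_task(step("task", log))  # Task: scheduled on the loop
    await asyncio.sleep(0)                         # Yield once to the event loop
    log.append("after first yield")
    await coro                                     # Only now does the coroutine start
    await task
    return log

print(asyncio.run(main()))
# ['task started', 'after first yield', 'coroutine started']
```

The task begins as soon as main() yields, while the bare coroutine does nothing until it is awaited.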

Wrap-up

Async/await in Python is a cooperative concurrency model — powerful for I/O-bound workloads, irrelevant for CPU-bound ones. The common production failures (race conditions, garbage-collected tasks, swallowed exceptions) all have straightforward fixes once you understand the event loop's scheduling model. Start with asyncio.run(), use asyncio.gather(return_exceptions=True) for any batch of tasks, and always reach for async-native libraries when doing I/O in an async context.
