Python Async Await Explained With Real Examples
Most Python async tutorials stop at asyncio.sleep() and call it a day. This one covers the event loop internals, real concurrent I/O patterns, production pitfalls like race conditions and forgotten awaits, and how to actually test async code — because async bugs in production are a different beast than anything a basic tutorial prepares you for.
Understanding Async/Await Fundamentals: From Coroutines to the Event Loop

What Are Coroutines and How async/await Keywords Work
The single biggest misconception: async def does not run your function. It returns a coroutine object. Nothing executes until something drives that coroutine — either await or the event loop.
# WRONG — this does nothing
import asyncio
async def fetch_data():
print("Fetching...")
await asyncio.sleep(1)
return {"status": "ok"}
result = fetch_data() # Returns <coroutine object fetch_data at 0x...>
print(result) # RuntimeWarning: coroutine 'fetch_data' was never awaited
# RIGHT — drive the coroutine properly
import asyncio
async def fetch_data():
print("Fetching...")
await asyncio.sleep(1)
return {"status": "ok"}
result = asyncio.run(fetch_data()) # Actually executes
print(result) # {"status": "ok"}
The await keyword does two things: it suspends the current coroutine and hands control back to the event loop, which can then run other tasks. This is fundamentally different from time.sleep(), which blocks the entire thread — no other coroutine gets to run during that pause.
import asyncio
import time
# Blocks the event loop for 2 seconds — every other task waits
async def bad_sleep():
time.sleep(2) # Never do this inside an async function
# Suspends for 2 seconds — other tasks run freely during this time
async def good_sleep():
await asyncio.sleep(2)
The Event Loop Mechanism: How Python Schedules Async Tasks
The event loop is a single-threaded scheduler. It keeps a queue of ready tasks and runs them one at a time. When a task hits an await on an I/O operation, the loop marks it as suspended and picks up the next ready task. No threads, no parallelism — pure cooperative multitasking.
import asyncio
async def task_a():
print("A: start")
await asyncio.sleep(1) # Yields control here
print("A: done")
async def task_b():
print("B: start")
await asyncio.sleep(0.5)
print("B: done")
async def main():
# Both tasks are scheduled before either runs
await asyncio.gather(task_a(), task_b())
asyncio.run(main())
# Output:
# A: start
# B: start
# B: done (B's sleep was shorter)
# A: done
For intermediate use cases where you need explicit loop control — say, integrating with legacy sync code — you can manage the loop manually. But asyncio.run() is the right default for everything else:
import asyncio
async def my_coroutine():
await asyncio.sleep(0.1)
return "done"
# Manual loop management (legacy pattern, avoid unless necessary)
loop = asyncio.new_event_loop()
result = loop.run_until_complete(my_coroutine())
loop.close()
print(result)
# Modern standard — prefer this always
result = asyncio.run(my_coroutine())
Critical Rule: Where await Can and Cannot Live
await only works inside an async def function. Put it anywhere else and Python throws a SyntaxError immediately. The subtler mistake is calling an async function without awaiting it inside an async context — Python won't crash right away, but the coroutine silently never runs.
import asyncio
async def get_user(user_id: int) -> dict:
await asyncio.sleep(0.1) # Simulate DB call
return {"id": user_id, "name": "Alice"}
async def main():
# WRONG — coroutine object, not the result
user = get_user(1)
print(user) # <coroutine object get_user at 0x...>
# RIGHT
user = await get_user(1)
print(user) # {"id": 1, "name": "Alice"}
asyncio.run(main())
Practical Async/Await Patterns: Concurrent I/O Examples
Fetching Multiple URLs Concurrently
This is where async pays off. Sequential HTTP requests are slow because each one waits for a server response before starting the next. With asyncio.gather(), all requests fire simultaneously and you collect results when they're all done.
import asyncio
import httpx
import time
URLS = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
]
# Sequential — takes ~3 seconds
async def fetch_sequential(urls):
results = []
async with httpx.AsyncClient() as client:
for url in urls:
response = await client.get(url, timeout=10)
results.append(response.status_code)
return results
# Concurrent with gather — takes ~1 second
async def fetch_concurrent(urls):
async with httpx.AsyncClient() as client:
tasks = [client.get(url, timeout=10) for url in urls]
responses = await asyncio.gather(*tasks)
return [r.status_code for r in responses]
# Concurrent with timeout handling per request
async def fetch_with_timeout(urls):
async with httpx.AsyncClient() as client:
async def safe_get(url):
try:
return await asyncio.wait_for(client.get(url), timeout=5.0)
except asyncio.TimeoutError:
return None
results = await asyncio.gather(*[safe_get(u) for u in urls])
return results
async def main():
start = time.monotonic()
await fetch_sequential(URLS)
print(f"Sequential: {time.monotonic() - start:.2f}s")
start = time.monotonic()
await fetch_concurrent(URLS)
print(f"Concurrent: {time.monotonic() - start:.2f}s")
asyncio.run(main())
# Sequential: ~3.0s
# Concurrent: ~1.0s
Managing Database Queries and File Operations Asynchronously
Async context managers (async with) are the standard pattern for resources that need cleanup — database connections, file handles, network sessions. The __aenter__ and __aexit__ methods are awaitable, so teardown is non-blocking.
import asyncio
import aiosqlite
import aiofiles
# Async database query
async def get_users():
async with aiosqlite.connect("app.db") as db:
async with db.execute("SELECT id, name FROM users LIMIT 10") as cursor:
rows = await cursor.fetchall()
return rows
# Async file read
async def read_config(path: str) -> str:
async with aiofiles.open(path, mode="r") as f:
return await f.read()
# Connection pool pattern (asyncpg for PostgreSQL)
# Pool exhaustion pitfall: creating a new connection per query
async def bad_pattern():
import asyncpg
for _ in range(100):
conn = await asyncpg.connect("postgresql://localhost/mydb")
await conn.fetch("SELECT 1")
await conn.close() # 100 connect/disconnect cycles — slow and wasteful
# Right: create the pool once, reuse connections
async def good_pattern():
import asyncpg
pool = await asyncpg.create_pool("postgresql://localhost/mydb", min_size=2, max_size=10)
async with pool.acquire() as conn:
result = await conn.fetch("SELECT 1")
await pool.close()
return result
Building a Rate-Limited Async API Consumer
Real-world scraping and API consumption need rate limiting. Without it, you'll either get throttled or hammer a server into the ground. asyncio.Semaphore caps concurrent requests cleanly.
import asyncio
import logging
import httpx
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
async def fetch_with_retry(client: httpx.AsyncClient, url: str, retries: int = 3) -> dict:
for attempt in range(retries):
try:
response = await client.get(url, timeout=10)
response.raise_for_status()
return response.json()
except (httpx.HTTPStatusError, httpx.RequestError) as e:
wait = 2 ** attempt # Exponential backoff: 1s, 2s, 4s
logger.warning(f"Attempt {attempt + 1} failed for {url}: {e}. Retrying in {wait}s.")
await asyncio.sleep(wait)
logger.error(f"All retries exhausted for {url}")
return {}
async def bounded_fetch(semaphore: asyncio.Semaphore, client: httpx.AsyncClient, url: str) -> dict:
async with semaphore: # Max N concurrent requests at a time
return await fetch_with_retry(client, url)
async def scrape_urls(urls: list[str], max_concurrent: int = 5) -> list[dict]:
semaphore = asyncio.Semaphore(max_concurrent)
async with httpx.AsyncClient() as client:
tasks = [bounded_fetch(semaphore, client, url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
return [r for r in results if not isinstance(r, Exception)]
Advanced Patterns and Production Pitfalls: Where Async Code Breaks

Debugging Race Conditions, Deadlocks, and Forgotten Awaits
Shared mutable state in async code is a race condition waiting to happen. Even though async is single-threaded, task switches at await points mean two tasks can interleave reads and writes on the same variable.
import asyncio
# WRONG — race condition on shared counter
counter = 0
async def increment_bad():
global counter
value = counter # Task A reads 0
await asyncio.sleep(0) # Task B runs, also reads 0, writes 1
counter = value + 1 # Task A writes 1 — lost the update from Task B
async def run_bad():
await asyncio.gather(*[increment_bad() for _ in range(100)])
print(f"Expected 100, got: {counter}") # Will be less than 100
# RIGHT — use asyncio.Lock()
lock = asyncio.Lock()
safe_counter = 0
async def increment_safe():
global safe_counter
async with lock:
safe_counter += 1
async def run_safe():
await asyncio.gather(*[increment_safe() for _ in range(100)])
print(f"Got: {safe_counter}") # Always 100
Forgotten task references cause silent failures. If you call asyncio.create_task() and don't store the result, the task can be garbage collected mid-execution.
import asyncio
async def background_job():
await asyncio.sleep(1)
print("Job complete")
async def main():
# WRONG — task may be garbage collected before completing
asyncio.create_task(background_job())
await asyncio.sleep(2)
# RIGHT — keep a reference
task = asyncio.create_task(background_job())
await asyncio.sleep(2)
await task # Or store in a set and discard on completion
Async Context Managers and Generators
You can write your own async context managers with __aenter__ and __aexit__, or use contextlib.asynccontextmanager for simpler cases. Exception propagation in __aexit__ works exactly like the sync version — return True to suppress, anything else re-raises.
import asyncio
from contextlib import asynccontextmanager
class AsyncDBConnection:
async def __aenter__(self):
print("Opening connection")
await asyncio.sleep(0.1) # Simulate async connect
self.connected = True
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("Closing connection")
await asyncio.sleep(0.05) # Simulate async close
self.connected = False
return False # Don't suppress exceptions
# Simpler pattern with asynccontextmanager
@asynccontextmanager
async def managed_connection():
print("Connecting...")
await asyncio.sleep(0.1)
try:
yield {"conn": "active"}
finally:
print("Disconnecting...") # Always runs, even on exception
await asyncio.sleep(0.05)
# Async generator
async def paginated_results(page_count: int):
for page in range(page_count):
await asyncio.sleep(0.1) # Simulate API call
yield [f"item_{page}_{i}" for i in range(10)]
async def main():
async with managed_connection() as conn:
print(f"Using: {conn}")
# Async comprehension
all_items = [item async for page in paginated_results(3) for item in page]
print(f"Total items: {len(all_items)}")
asyncio.run(main())
Exception Handling and Propagation in Concurrent Tasks
By default, if one task in asyncio.gather() raises an exception, the whole gather raises immediately and other tasks' results are lost. Use return_exceptions=True to collect all results and inspect failures individually.
import asyncio
import logging
logger = logging.getLogger(__name__)
async def risky_task(n: int):
await asyncio.sleep(0.1)
if n == 2:
raise ValueError(f"Task {n} failed")
return f"result_{n}"
async def main():
# WRONG — one failure kills all results
try:
results = await asyncio.gather(risky_task(1), risky_task(2), risky_task(3))
except ValueError as e:
print(f"Lost everything: {e}")
# RIGHT — inspect each result
results = await asyncio.gather(
risky_task(1),
risky_task(2),
risky_task(3),
return_exceptions=True
)
for i, result in enumerate(results):
if isinstance(result, Exception):
logger.error(f"Task {i + 1} failed: {result}")
# Handle or re-raise selectively — don't silently swallow
else:
print(f"Task {i + 1} succeeded: {result}")
asyncio.run(main())
asyncio.CancelledError is special — it inherits from BaseException in Python 3.8+, not Exception. Never catch it silently. If you catch it for cleanup, always re-raise it.
Integration, Performance, and Testing: Moving Async to Production

Async vs. Threading vs. Multiprocessing: Choosing the Right Tool
Async isn't a universal speed-up. The model only helps when your code spends time waiting on I/O. CPU-bound work — image processing, data crunching — needs real parallelism via multiprocessing or concurrent.futures.ProcessPoolExecutor.
| Workload Type | Best Tool | Why |
|---|---|---|
| Network I/O (HTTP, sockets) | async/await | Low overhead, thousands of concurrent connections |
| File/DB I/O | async/await | Frees event loop during wait time |
| CPU-bound (parsing, math) | ProcessPoolExecutor | Bypasses GIL, true parallelism |
| Blocking legacy libraries | ThreadPoolExecutor via loop.run_in_executor() | Offloads blocking work without rewriting library |
| Mixed I/O + CPU | async + ProcessPoolExecutor | Async for I/O, processes for compute |
import asyncio
from concurrent.futures import ProcessPoolExecutor
def cpu_heavy(n: int) -> int:
return sum(i * i for i in range(n)) # Blocks thread
async def main():
loop = asyncio.get_running_loop()
with ProcessPoolExecutor() as pool:
# Offload CPU work to a process, keep event loop free
result = await loop.run_in_executor(pool, cpu_heavy, 10_000_000)
print(result)
asyncio.run(main())
Testing Async Code with pytest-asyncio
Standard pytest can't run coroutines directly. Install pytest-asyncio and mark your async tests. From pytest-asyncio 0.21+, you need to configure the asyncio mode explicitly in pyproject.toml.
# pyproject.toml
# [tool.pytest.ini_options]
# asyncio_mode = "auto"
import pytest
import asyncio
from unittest.mock import AsyncMock, patch
# The function under test
async def fetch_user(user_id: int) -> dict:
await asyncio.sleep(0.1) # Simulate network call
return {"id": user_id, "name": "Alice"}
# Test with pytest-asyncio
@pytest.mark.asyncio
async def test_fetch_user_returns_correct_id():
result = await fetch_user(42)
assert result["id"] == 42
assert result["name"] == "Alice"
# Mocking async dependencies
@pytest.mark.asyncio
async def test_fetch_with_mock():
with patch("__main__.fetch_user", new_callable=AsyncMock) as mock_fetch:
mock_fetch.return_value = {"id": 1, "name": "Mocked"}
result = await fetch_user(1)
assert result["name"] == "Mocked"
mock_fetch.assert_called_once_with(1)
# Testing exception handling
@pytest.mark.asyncio
async def test_gather_handles_partial_failure():
async def ok():
return "success"
async def fail():
raise RuntimeError("boom")
results = await asyncio.gather(ok(), fail(), return_exceptions=True)
assert results[0] == "success"
assert isinstance(results[1], RuntimeError)
For more on structuring Python projects for testability, see Python Project Structure Best Practices. The official Python asyncio documentation and the pytest documentation are the authoritative references for keeping up with API changes.
Async Best Practices Checklist
These are the rules that separate code that works in development from code that survives production traffic. Also check out Python Performance Tips for Backend Developers for complementary patterns.
- Always use
asyncio.run()as your top-level entry point — neverloop.run_until_complete()in new code - Never call
time.sleep(),requests.get(), or any blocking library inside an async function - Store references to tasks created with
asyncio.create_task()— dangling tasks get garbage collected - Use
return_exceptions=Trueinasyncio.gather()for any production batch operation - Protect shared mutable state with
asyncio.Lock()— even in single-threaded async, task switches happen - Re-raise
CancelledError— catching it silently breaks task cancellation - Use connection pools (
asyncpg,httpx.AsyncClient) — don't open a new connection per operation - Cap concurrency with
asyncio.Semaphorewhen hitting external APIs
Frequently Asked Questions
Q: Does using async/await make my Python code faster automatically?
A: No. Async only helps when your code spends time waiting on I/O — network calls, file reads, database queries. If you rewrite a CPU-bound function with async def, it runs at exactly the same speed. The benefit comes from running multiple I/O waits concurrently instead of sequentially.
Q: Can I use async/await with libraries like requests or SQLAlchemy that aren't async?
A: You can, but you have to offload them to a thread pool with loop.run_in_executor() — otherwise they block the event loop. The better long-term approach is switching to async-native libraries: httpx instead of requests, asyncpg or SQLAlchemy 2.x async mode instead of synchronous SQLAlchemy.
Q: What's the difference between asyncio.gather() and asyncio.create_task()?
A: create_task() schedules a coroutine as a task immediately — it starts running even before you await it. gather() schedules multiple coroutines and waits for all of them, returning results in order. Use create_task() when you want fire-and-forget or need a handle to cancel a task; use gather() when you need all results together.
Wrap-up
Async/await in Python is a cooperative concurrency model — powerful for I/O-bound workloads, irrelevant for CPU-bound ones. The common production failures (race conditions, garbage-collected tasks, swallowed exceptions) all have straightforward fixes once you understand the event loop's scheduling model. Start with asyncio.run(), use asyncio.gather(return_exceptions=True) for any batch of tasks, and always reach for async-native libraries when doing I/O in an async context.
References
- Simplest async/await example possible in Python - Stack Overflow
- Basic Async Python with Asyncio
- await in python
- Await Async Tutorial with Real Examples and Simple Explanations
- Python Asynchronous Programming Tutorial: Asyncio, async & await ...
- Async Python pitfalls: common mistakes and solutions - LinkedIn
Comments
Post a Comment