Python asyncio Event Loop

Deep dive into Python's asyncio library, understanding event loops, coroutines, tasks, and async/await patterns with interactive visualizations.

What is asyncio?

asyncio is Python's built-in library for writing single-threaded concurrent code using the async/await syntax. It provides an event loop that schedules and runs asynchronous tasks, which makes it well suited to I/O-bound work and high-level structured network code.

The Event Loop: Heart of asyncio

Core Concepts

1. Coroutines

Coroutines are special functions defined with async def that can be paused and resumed:

import asyncio

async def hello_world():
    print("Hello")
    await asyncio.sleep(1)  # Pause here, let other tasks run
    print("World")

# Coroutines must be awaited or run by the event loop
asyncio.run(hello_world())

Key Points:

  • Defined with async def
  • Can use await to pause execution
  • Return coroutine objects when called (not executed immediately)
  • Must be run by an event loop

2. Event Loop

The event loop is the core of every asyncio application. It runs asynchronous tasks and callbacks, performs network I/O operations, and runs subprocesses.

import asyncio

async def task(name, delay):
    print(f"Task {name} starting")
    await asyncio.sleep(delay)
    print(f"Task {name} completed after {delay}s")
    return f"Result-{name}"

async def main():
    # The event loop runs multiple coroutines concurrently
    results = await asyncio.gather(
        task("A", 2),
        task("B", 1),
        task("C", 3)
    )
    print(f"All results: {results}")

# asyncio.run() creates an event loop, runs the coroutine, and closes the loop
asyncio.run(main())
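Besides coroutines, the loop also runs plain callbacks. The sketch below uses loop.call_soon() and loop.call_later() to show the order in which the loop gets to them. The list name and the delays are illustrative choices, and this is a demonstration rather than a recommended pattern.

```python
import asyncio

async def main():
    loop = asyncio.get_running_loop()
    order = []
    # Scheduled first, but only fires after roughly 0.01s
    loop.call_later(0.01, order.append, "call_later")
    # Runs on the loop's next iteration
    loop.call_soon(order.append, "call_soon")
    # Runs immediately: main() has not yielded to the loop yet
    order.append("main")
    # Yield to the loop long enough for both callbacks to fire
    await asyncio.sleep(0.1)
    return order

order = asyncio.run(main())
print(order)  # ['main', 'call_soon', 'call_later']
```

Nothing scheduled on the loop runs until the current coroutine reaches an await, which is why "main" is recorded first.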

3. Tasks

Tasks are used to schedule coroutines concurrently:

import asyncio
import time

async def fetch_data(id, delay):
    print(f"Fetching data {id}...")
    await asyncio.sleep(delay)
    return f"Data-{id}"

async def main():
    start = time.time()

    # Create tasks to run concurrently
    task1 = asyncio.create_task(fetch_data(1, 2))
    task2 = asyncio.create_task(fetch_data(2, 3))
    task3 = asyncio.create_task(fetch_data(3, 1))

    # Wait for all tasks
    results = await asyncio.gather(task1, task2, task3)

    print(f"Results: {results}")
    print(f"Total time: {time.time() - start:.2f}s")  # ~3s, not 6s!

asyncio.run(main())
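A Task is also an object you can inspect. As a small sketch (the compute coroutine is a made-up stand-in), the following shows that create_task() only schedules the coroutine, and that the result becomes available once the task has been awaited.

```python
import asyncio

async def compute():
    await asyncio.sleep(0.01)
    return 42

async def main():
    task = asyncio.create_task(compute())  # scheduled, not yet started
    done_before = task.done()
    value = await task                     # wait for the task to finish
    return done_before, task.done(), value

state = asyncio.run(main())
print(state)  # (False, True, 42)
```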

async/await Syntax

The async Keyword

  • async def: Defines a coroutine function
  • Returns a coroutine object when called
  • Can contain await expressions

The await Keyword

  • await: Pauses the coroutine until the awaited task completes
  • Can only be used inside async functions
  • Yields control back to the event loop

import asyncio

async def fetch_user(user_id):
    # Simulate API call
    await asyncio.sleep(1)
    return {"id": user_id, "name": f"User-{user_id}"}

async def fetch_posts(user_id):
    # Simulate database query
    await asyncio.sleep(0.5)
    return [f"Post-{i}" for i in range(3)]

async def get_user_data(user_id):
    # Concurrent execution using gather
    user, posts = await asyncio.gather(
        fetch_user(user_id),
        fetch_posts(user_id)
    )
    return {"user": user, "posts": posts}

# Run the async function
result = asyncio.run(get_user_data(123))

Common asyncio Patterns

1. Fire and Forget

import asyncio

async def background_task(name):
    await asyncio.sleep(2)
    print(f"Background task {name} completed")

async def main():
    # Create the task but don't await it immediately.
    # Keep a reference: the loop holds only weak references to tasks.
    task = asyncio.create_task(background_task("cleanup"))

    # Do other work
    print("Main work...")
    await asyncio.sleep(1)
    print("Main work done")

    # Let the background task finish before the loop closes
    await task

asyncio.run(main())
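The event loop keeps only weak references to tasks, so a fire-and-forget task that nothing else refers to may be garbage-collected before it finishes. One common remedy, sketched below, is to park each task in a set and remove it when it completes. The spawn helper and the background_tasks name are hypothetical, not part of asyncio.

```python
import asyncio

background_tasks = set()

def spawn(coro):
    """Hypothetical helper: start a task and hold a strong reference to it."""
    task = asyncio.create_task(coro)
    background_tasks.add(task)
    # Drop the reference once the task has finished
    task.add_done_callback(background_tasks.discard)
    return task

async def cleanup(log):
    await asyncio.sleep(0.01)
    log.append("cleanup done")

async def main():
    log = []
    spawn(cleanup(log))
    await asyncio.sleep(0.05)  # main work; the background task runs meanwhile
    return log, len(background_tasks)

log, pending = asyncio.run(main())
print(log, pending)  # ['cleanup done'] 0
```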

2. Timeout Handling

import asyncio

async def slow_operation():
    await asyncio.sleep(10)
    return "Complete"

async def main():
    try:
        # Wait maximum 3 seconds
        result = await asyncio.wait_for(slow_operation(), timeout=3.0)
        print(result)
    except asyncio.TimeoutError:
        print("Operation timed out!")

asyncio.run(main())
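When the timeout expires, wait_for() does more than raise: it cancels the awaited coroutine, which sees a CancelledError at its current await. The sketch below makes that visible with a flag. The flag and the short timeout are illustrative assumptions.

```python
import asyncio

was_cancelled = False

async def slow_operation():
    global was_cancelled
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        was_cancelled = True  # a real coroutine would release resources here
        raise                 # always re-raise so cancellation propagates

async def main():
    try:
        await asyncio.wait_for(slow_operation(), timeout=0.05)
    except asyncio.TimeoutError:
        return "timed out"
    return "completed"

outcome = asyncio.run(main())
print(outcome, was_cancelled)  # timed out True
```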

3. Producer-Consumer Pattern

import asyncio
import random

async def producer(queue, producer_id):
    for i in range(5):
        item = f"Item-{producer_id}-{i}"
        await queue.put(item)
        print(f"Producer {producer_id} added {item}")
        await asyncio.sleep(random.uniform(0.5, 1.5))

async def consumer(queue, consumer_id):
    while True:
        item = await queue.get()
        if item is None:  # Poison pill
            break
        print(f"Consumer {consumer_id} processed {item}")
        await asyncio.sleep(random.uniform(0.2, 0.8))
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=10)

    # Create producers and consumers
    producers = [asyncio.create_task(producer(queue, i)) for i in range(2)]
    consumers = [asyncio.create_task(consumer(queue, i)) for i in range(3)]

    # Wait for producers to finish
    await asyncio.gather(*producers)

    # Wait for queue to be processed
    await queue.join()

    # Stop consumers
    for _ in consumers:
        await queue.put(None)
    await asyncio.gather(*consumers)

asyncio.run(main())

4. Semaphore for Rate Limiting

import asyncio

async def rate_limited_request(semaphore, url):
    async with semaphore:  # Acquire semaphore
        print(f"Requesting {url}")
        await asyncio.sleep(1)  # Simulate API call
        return f"Response from {url}"

async def main():
    # Limit to 3 concurrent requests
    semaphore = asyncio.Semaphore(3)
    urls = [f"http://api.example.com/endpoint/{i}" for i in range(10)]
    tasks = [rate_limited_request(semaphore, url) for url in urls]
    results = await asyncio.gather(*tasks)
    print(f"Completed {len(results)} requests")

asyncio.run(main())
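To check that a semaphore really caps concurrency, you can count how many workers are inside the guarded section at once. This is a minimal sketch with a made-up state dictionary and simulated work, not a benchmark.

```python
import asyncio

async def worker(semaphore, state):
    async with semaphore:
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0.01)  # simulated request
        state["active"] -= 1

async def main(limit=3, jobs=10):
    semaphore = asyncio.Semaphore(limit)
    state = {"active": 0, "peak": 0}
    await asyncio.gather(*(worker(semaphore, state) for _ in range(jobs)))
    return state["peak"]

peak = asyncio.run(main())
print(peak)  # 3
```

Note that a semaphore bounds how many requests are in flight at once; it does not by itself enforce a requests-per-second rate.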

Real-World Example: Web Scraper

import asyncio
import aiohttp
from typing import List, Dict

async def fetch_page(session: aiohttp.ClientSession, url: str) -> Dict:
    """Fetch a single page"""
    try:
        # Per-request timeout; aiohttp expects a ClientTimeout object here
        request_timeout = aiohttp.ClientTimeout(total=5)
        async with session.get(url, timeout=request_timeout) as response:
            return {
                "url": url,
                "status": response.status,
                "content": await response.text(),
                "headers": dict(response.headers)
            }
    except asyncio.TimeoutError:
        return {"url": url, "error": "Timeout"}
    except Exception as e:
        return {"url": url, "error": str(e)}

async def fetch_all_pages(urls: List[str]) -> List[Dict]:
    """Fetch multiple pages concurrently"""
    connector = aiohttp.TCPConnector(limit=10)  # Limit connections
    timeout = aiohttp.ClientTimeout(total=30)

    async with aiohttp.ClientSession(
        connector=connector,
        timeout=timeout
    ) as session:
        tasks = [fetch_page(session, url) for url in urls]
        return await asyncio.gather(*tasks)

async def process_pages(urls: List[str]):
    """Fetch pages and print a success/failure summary"""
    print(f"Fetching {len(urls)} pages...")

    results = await fetch_all_pages(urls)

    successful = [r for r in results if "error" not in r]
    failed = [r for r in results if "error" in r]

    print(f"✅ Success: {len(successful)}")
    print(f"❌ Failed: {len(failed)}")

    for failure in failed:
        print(f"  - {failure['url']}: {failure['error']}")

    return results

# Usage
urls = [
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/2",
    "https://httpbin.org/status/200",
    "https://httpbin.org/status/404",
]

results = asyncio.run(process_pages(urls))
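gather() returns only once every page is in. If you would rather handle each result as soon as it arrives, asyncio.as_completed() yields awaitables in completion order. The sketch below avoids the network entirely: fake_fetch and its delays are hypothetical stand-ins for real HTTP calls.

```python
import asyncio

async def fake_fetch(url, delay):
    # Stand-in for an HTTP request that takes `delay` seconds
    await asyncio.sleep(delay)
    return url

async def main():
    jobs = [fake_fetch("slow", 0.05), fake_fetch("fast", 0.01)]
    finished = []
    for next_done in asyncio.as_completed(jobs):
        url = await next_done  # resolves in completion order
        finished.append(url)
    return finished

finished = asyncio.run(main())
print(finished)  # ['fast', 'slow']
```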

asyncio vs Threading

asyncio (cooperative) compared with threading (preemptive):

Concurrency Model
  • asyncio: Cooperative and single-threaded; tasks yield only at await points
  • Threading: Preemptive and multi-threaded; the OS can switch threads at any point
Best For
  • asyncio: I/O-bound tasks (network, disk, sockets)
  • Threading: Blocking-I/O libraries with no async port; CPU-bound work only when the code releases the GIL (for example inside C extensions)
Context Switch
  • asyncio: Very fast; switching between coroutines happens in user space
  • Threading: Slower; thread switches are mediated by the kernel
Memory Usage
  • asyncio: Low; one stack plus lightweight coroutine frames
  • Threading: Higher; each thread reserves its own stack (commonly 8 MB of virtual memory on Linux)
GIL Impact
  • asyncio: Not a factor; a single thread runs all coroutines
  • Threading: Limiting; the GIL serializes Python bytecode across threads
Debugging
  • asyncio: Easier; sequential reasoning and deterministic switch points
  • Threading: Harder; race conditions, deadlocks, locking overhead
Scalability
  • asyncio: 10,000+ concurrent tasks on one thread
  • Threading: Hundreds to a few thousand threads before the OS thrashes
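The scalability claim is easy to try for yourself. The sketch below starts 10,000 coroutines that each sleep for 0.1 seconds and waits for all of them on one thread. The task count and delay are arbitrary choices, and the elapsed time will vary by machine.

```python
import asyncio
import time

async def tick(i):
    await asyncio.sleep(0.1)  # simulated I/O wait
    return i

async def main(n=10_000):
    start = time.perf_counter()
    results = await asyncio.gather(*(tick(i) for i in range(n)))
    return len(results), time.perf_counter() - start

completed, elapsed = asyncio.run(main())
print(f"{completed} tasks finished in {elapsed:.2f}s")
```

Run sequentially, the same sleeps would take about 1,000 seconds; because the waits overlap, the concurrent version should finish in a small fraction of that.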

Advanced Features

1. Async Context Managers

import asyncio

class AsyncDatabase:
    async def __aenter__(self):
        print("Connecting to database...")
        await asyncio.sleep(0.5)
        self.connection = "Connected"
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("Closing database connection...")
        await asyncio.sleep(0.2)
        self.connection = None

    async def query(self, sql):
        await asyncio.sleep(0.1)
        return f"Results for: {sql}"

async def main():
    async with AsyncDatabase() as db:
        result = await db.query("SELECT * FROM users")
        print(result)

asyncio.run(main())
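For simple cases, the standard library's contextlib.asynccontextmanager decorator can replace the __aenter__/__aexit__ pair with a single async generator. A minimal sketch, with a hypothetical open_connection function and a log list used only to record the order of events:

```python
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def open_connection(log):
    log.append("connect")      # setup: runs on entering the block
    await asyncio.sleep(0)     # placeholder for a real async handshake
    try:
        yield "connection"     # value bound by "as"
    finally:
        log.append("close")    # teardown: runs even if the body raises

async def main():
    log = []
    async with open_connection(log) as conn:
        log.append(f"using {conn}")
    return log

log = asyncio.run(main())
print(log)  # ['connect', 'using connection', 'close']
```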

2. Async Iterators

import asyncio

class AsyncCounter:
    def __init__(self, stop):
        self.current = 0
        self.stop = stop

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.current < self.stop:
            await asyncio.sleep(0.1)  # Simulate async work
            self.current += 1
            return self.current
        raise StopAsyncIteration

async def main():
    async for num in AsyncCounter(5):
        print(f"Count: {num}")

asyncio.run(main())

3. Async Generators

import asyncio

async def fetch_paginated_data(pages):
    """Async generator for paginated API calls"""
    for page in range(1, pages + 1):
        await asyncio.sleep(0.5)  # Simulate API call
        yield {
            "page": page,
            "data": [f"item-{i}" for i in range(5)]
        }

async def process_data():
    async for page_data in fetch_paginated_data(3):
        print(f"Processing page {page_data['page']}")
        # Process data as it arrives
        for item in page_data['data']:
            print(f"  - {item}")

asyncio.run(process_data())
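Async generators also work inside comprehensions via "async for", which is handy when you just want to collect the results. A small sketch with an illustrative countdown generator:

```python
import asyncio

async def countdown(n):
    while n > 0:
        await asyncio.sleep(0)  # placeholder for real async work
        yield n
        n -= 1

async def main():
    # Async list comprehension: only valid inside an async function
    return [x * x async for x in countdown(3)]

squares = asyncio.run(main())
print(squares)  # [9, 4, 1]
```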

Common Pitfalls and Solutions

1. Blocking the Event Loop

Bad — Blocking sleep
async def bad_example():
    # Blocks the entire event loop!
    time.sleep(5)
    return "Done"
time.sleep is synchronous — every other coroutine stalls for 5s.
Good — Async sleep
async def good_example():
    # Yields to the loop while waiting
    await asyncio.sleep(5)
    return "Done"
asyncio.sleep is awaitable — the loop runs other tasks meanwhile.
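When a blocking call has no async equivalent, asyncio.to_thread() (Python 3.9+) runs it in a worker thread so the loop stays responsive. The sketch below is one way to see that: blocking_io and the heartbeat coroutine are made up for illustration, and the heartbeat keeps ticking while the blocking call is in progress.

```python
import asyncio
import time

def blocking_io():
    time.sleep(0.2)  # stands in for a blocking library call
    return "Done"

async def heartbeat(ticks):
    while True:
        ticks.append(time.perf_counter())
        await asyncio.sleep(0.02)

async def main():
    ticks = []
    beat = asyncio.create_task(heartbeat(ticks))
    result = await asyncio.to_thread(blocking_io)  # loop keeps running meanwhile
    beat.cancel()
    return result, len(ticks)

result, tick_count = asyncio.run(main())
print(result, tick_count)
```

If blocking_io() were called directly inside main(), the heartbeat would not tick at all until it returned.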

2. Forgetting to await

Bad — Missing await
async def fetch_data():
    await asyncio.sleep(1)
    return "data"

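async def bad_main():
    result = fetch_data()  # coroutine object, never executed
    print(result)  # <coroutine object ...>
Calling a coroutine function only creates a coroutine object; its body never runs, and Python emits a "coroutine ... was never awaited" RuntimeWarning.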
Good — Properly awaited
async def good_main():
    result = await fetch_data()  # actually runs it
    print(result)  # "data"
await runs the coroutine to completion and evaluates to its return value.

3. CPU-Bound Tasks

Bad — Tight loop in coroutine
async def cpu_intensive():
    # Hot loop never yields → loop is frozen
    for i in range(100_000_000):
        _ = i * i
    return "Done"
No await points means no other task gets a turn.
Good — Offload to a process pool
import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_intensive_sync():
    # Plain function: it runs in a separate process
    for i in range(100_000_000):
        _ = i * i
    return "Done"

async def cpu_intensive_async():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as ex:
        return await loop.run_in_executor(ex, cpu_intensive_sync)
run_in_executor lets the event loop keep serving I/O tasks.

Performance Tips

  1. Use asyncio.gather() for concurrent execution
  2. Limit concurrent connections with Semaphore
  3. Use connection pooling for network requests
  4. Avoid blocking operations in async functions
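  5. Use asyncio.create_task() for fire-and-forget tasks, and keep a reference to each task so it is not garbage-collected mid-run
  6. Enable debug mode with asyncio.run(main(), debug=True) to surface slow callbacks and never-awaited coroutines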
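In debug mode the loop logs a warning on the "asyncio" logger whenever a single step takes longer than loop.slow_callback_duration (0.1 seconds by default). The sketch below provokes one such warning and captures it. The ListHandler class and the sluggish coroutine are illustrative; in practice you would usually just read the log output.

```python
import asyncio
import logging
import time

class ListHandler(logging.Handler):
    """Collects formatted log messages in a list."""
    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())

async def sluggish():
    time.sleep(0.2)  # deliberately blocks the loop past the 0.1s threshold

handler = ListHandler()
logging.getLogger("asyncio").addHandler(handler)

asyncio.run(sluggish(), debug=True)

slow_reports = [m for m in handler.messages if "took" in m]
print(slow_reports)
```

Debug mode adds overhead, so it is best kept to development and test runs.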

When to Use asyncio

Good Use Cases
  • Web scraping and API calls
  • Network servers and clients
  • Database operations with async drivers
  • File I/O with aiofiles
  • WebSocket connections
  • Real-time data processing
Not Ideal For
  • CPU-intensive computations
  • Simple scripts with little I/O
  • Legacy code with blocking libraries
  • When threading or multiprocessing is simpler

Ecosystem and Libraries

Popular asyncio-compatible libraries:

  • aiohttp: HTTP client/server
  • aiofiles: Async file I/O
  • asyncpg: PostgreSQL driver
  • motor: MongoDB driver
  • redis (redis.asyncio): Redis client; the standalone aioredis project was merged into redis-py
  • fastapi: Modern web framework
  • httpx: HTTP client with async support

Conclusion

asyncio changes how Python handles concurrent I/O. By combining cooperative multitasking with an event loop, it can run thousands of concurrent operations on a single thread, which suits modern web applications, microservices, and other I/O-bound workloads.

Key takeaways:

  • Single-threaded concurrency through event loop
  • async/await syntax for clean asynchronous code
  • Perfect for I/O-bound operations
  • Massive scalability with minimal resources
  • Rich ecosystem of async libraries
