{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# AsyncIO Tutorial - Asynchronous Programming in Python\n", "\n", "This notebook covers the fundamentals of asynchronous programming in Python using the `asyncio` library.\n", "\n", "## What is AsyncIO?\n", "\n", "`asyncio` is a standard-library module for writing **concurrent** code using the **async/await** syntax. It's particularly useful for:\n", "- I/O-bound operations (file reading, network requests)\n", "- Operations that involve waiting\n", "- Building scalable network applications\n", "\n", "**Key Concepts:**\n", "- **Coroutine**: The awaitable object returned by calling a function defined with `async def`\n", "- **Event Loop**: The core of asyncio that schedules and runs coroutines and tasks\n", "- **await**: Suspends the current coroutine until the awaited coroutine, task, or future completes, so the event loop can run other work in the meantime" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 1. Basic Async/Await Syntax" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import asyncio\n", "import time\n", "\n", "# Basic async function\n", "async def say_hello():\n", "    print(\"Hello\")\n", "    await asyncio.sleep(1)  # Non-blocking sleep\n", "    print(\"World!\")\n", "\n", "# Running an async function: top-level await works in Jupyter/IPython;\n", "# in a regular script, use asyncio.run(say_hello()) instead\n", "await say_hello()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 2. 
Comparing Synchronous vs Asynchronous Execution" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Synchronous version - blocks execution\n", "def sync_task(name, delay):\n", "    print(f\"Task {name} started\")\n", "    time.sleep(delay)  # Blocking sleep\n", "    print(f\"Task {name} completed after {delay} seconds\")\n", "\n", "# Asynchronous version - non-blocking\n", "async def async_task(name, delay):\n", "    print(f\"Task {name} started\")\n", "    await asyncio.sleep(delay)  # Non-blocking sleep\n", "    print(f\"Task {name} completed after {delay} seconds\")\n", "\n", "# Demonstrate synchronous execution: the delays add up (about 2 + 1 + 1 = 4 seconds)\n", "print(\"=== Synchronous Execution ===\")\n", "start_time = time.time()\n", "sync_task(\"A\", 2)\n", "sync_task(\"B\", 1)\n", "sync_task(\"C\", 1)\n", "print(f\"Total time: {time.time() - start_time:.2f} seconds\\n\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Demonstrate asynchronous execution\n", "print(\"=== Asynchronous Execution ===\")\n", "start_time = time.time()\n", "\n", "# Run tasks concurrently: total time is about the longest delay (2 seconds)\n", "await asyncio.gather(\n", "    async_task(\"A\", 2),\n", "    async_task(\"B\", 1),\n", "    async_task(\"C\", 1)\n", ")\n", "\n", "print(f\"Total time: {time.time() - start_time:.2f} seconds\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 3. 
Different Ways to Run Async Code" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "async def simple_coroutine():\n", "    await asyncio.sleep(0.5)\n", "    return \"Coroutine completed!\"\n", "\n", "# Method 1: Direct await (in Jupyter/IPython)\n", "result = await simple_coroutine()\n", "print(f\"Result: {result}\")\n", "\n", "# Method 2: Using asyncio.create_task() for concurrent execution\n", "task1 = asyncio.create_task(simple_coroutine())\n", "task2 = asyncio.create_task(simple_coroutine())\n", "\n", "results = await asyncio.gather(task1, task2)\n", "print(f\"Task results: {results}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 4. Real-World Example: Fetching Data from Multiple URLs" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import asyncio\n", "\n", "# This example simulates HTTP requests, so it needs no third-party packages.\n", "# A real version could use an async HTTP client such as aiohttp (pip install aiohttp).\n", "\n", "async def fetch_data(session, url):\n", "    \"\"\"Simulate fetching data from a URL\"\"\"\n", "    print(f\"Fetching {url}...\")\n", "\n", "    # Simulate network delay\n", "    await asyncio.sleep(1)\n", "\n", "    # Simulate response\n", "    return f\"Data from {url}\"\n", "\n", "async def fetch_multiple_urls():\n", "    urls = [\n", "        \"https://api.example1.com/data\",\n", "        \"https://api.example2.com/data\",\n", "        \"https://api.example3.com/data\",\n", "        \"https://api.example4.com/data\"\n", "    ]\n", "\n", "    # Placeholder for a real client session (unused in this simulation)\n", "    session = None\n", "\n", "    # Create one coroutine per URL\n", "    tasks = [fetch_data(session, url) for url in urls]\n", "\n", "    # Run all coroutines concurrently\n", "    results = await asyncio.gather(*tasks)\n", "\n", "    return results\n", "\n", "# Execute the function\n", "start_time = time.time()\n", "data = await fetch_multiple_urls()\n", "end_time = time.time()\n", "\n", "print(\"\\nResults:\")\n", "for item in data:\n", "    print(f\"- {item}\")\n", "print(f\"\\nTotal time: {end_time - start_time:.2f} seconds\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 5. Error Handling in Async Code" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "async def task_that_might_fail(name, should_fail=False):\n", "    await asyncio.sleep(1)\n", "\n", "    if should_fail:\n", "        raise ValueError(f\"Task {name} failed!\")\n", "\n", "    return f\"Task {name} succeeded\"\n", "\n", "# Example 1: Basic try/except\n", "async def handle_single_task():\n", "    try:\n", "        result = await task_that_might_fail(\"A\", should_fail=True)\n", "        print(result)\n", "    except ValueError as e:\n", "        print(f\"Caught error: {e}\")\n", "\n", "await handle_single_task()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Example 2: Handling errors in concurrent tasks\n", "async def handle_multiple_tasks():\n", "    tasks = [\n", "        task_that_might_fail(\"A\", should_fail=False),\n", "        task_that_might_fail(\"B\", should_fail=True),\n", "        task_that_might_fail(\"C\", should_fail=False)\n", "    ]\n", "\n", "    # With return_exceptions=True, gather returns exceptions as results\n", "    # instead of raising the first one\n", "    results = await asyncio.gather(*tasks, return_exceptions=True)\n", "\n", "    for i, result in enumerate(results):\n", "        if isinstance(result, Exception):\n", "            print(f\"Task {i} failed: {result}\")\n", "        else:\n", "            print(f\"Task {i} succeeded: {result}\")\n", "\n", "await handle_multiple_tasks()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 6. 
Using asyncio.wait() for More Control" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "async def long_running_task(name, duration):\n", "    print(f\"Starting {name} (will take {duration}s)\")\n", "    await asyncio.sleep(duration)\n", "    print(f\"Finished {name}\")\n", "    return f\"{name} result\"\n", "\n", "# Using asyncio.wait() with timeout\n", "async def demo_wait_with_timeout():\n", "    tasks = [\n", "        asyncio.create_task(long_running_task(\"Fast\", 1)),\n", "        asyncio.create_task(long_running_task(\"Medium\", 3)),\n", "        asyncio.create_task(long_running_task(\"Slow\", 5))\n", "    ]\n", "\n", "    # Wait up to 2 seconds; wait() returns two sets and does not raise on timeout\n", "    done, pending = await asyncio.wait(tasks, timeout=2.0)\n", "\n", "    print(f\"\\nCompleted tasks: {len(done)}\")\n", "    print(f\"Pending tasks: {len(pending)}\")\n", "\n", "    # Get results from completed tasks\n", "    for task in done:\n", "        result = task.result()\n", "        print(f\"Result: {result}\")\n", "\n", "    # Cancel pending tasks\n", "    for task in pending:\n", "        task.cancel()\n", "        print(f\"Cancelled task: {task.get_name()}\")\n", "\n", "    # Wait for the cancellations to finish so no task is left running\n", "    await asyncio.gather(*pending, return_exceptions=True)\n", "\n", "await demo_wait_with_timeout()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 7. 
Async Context Managers" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class AsyncResource:\n", "    def __init__(self, name):\n", "        self.name = name\n", "\n", "    async def __aenter__(self):\n", "        print(f\"Acquiring resource: {self.name}\")\n", "        await asyncio.sleep(0.1)  # Simulate setup time\n", "        return self\n", "\n", "    async def __aexit__(self, exc_type, exc_val, exc_tb):\n", "        print(f\"Releasing resource: {self.name}\")\n", "        await asyncio.sleep(0.1)  # Simulate cleanup time\n", "\n", "    async def do_work(self):\n", "        print(f\"Working with {self.name}\")\n", "        await asyncio.sleep(1)\n", "        return f\"Work completed with {self.name}\"\n", "\n", "# Using async context manager\n", "async def demo_async_context_manager():\n", "    async with AsyncResource(\"Database Connection\") as resource:\n", "        result = await resource.do_work()\n", "        print(result)\n", "    # Resource is automatically released here\n", "\n", "await demo_async_context_manager()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 8. 
Async Generators and Async Iteration" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Async generator\n", "async def async_number_generator(max_num):\n", "    \"\"\"Generate numbers asynchronously\"\"\"\n", "    for i in range(max_num):\n", "        print(f\"Generating {i}\")\n", "        await asyncio.sleep(0.5)  # Simulate async work\n", "        yield i\n", "\n", "# Using async generator\n", "async def demo_async_generator():\n", "    print(\"=== Async Generator Demo ===\")\n", "    async for number in async_number_generator(5):\n", "        print(f\"Received: {number}\")\n", "\n", "await demo_async_generator()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Async iterator class\n", "class AsyncRange:\n", "    def __init__(self, start, stop):\n", "        self.start = start\n", "        self.stop = stop\n", "\n", "    def __aiter__(self):\n", "        return self\n", "\n", "    async def __anext__(self):\n", "        if self.start >= self.stop:\n", "            raise StopAsyncIteration\n", "\n", "        await asyncio.sleep(0.2)  # Simulate async work\n", "        value = self.start\n", "        self.start += 1\n", "        return value\n", "\n", "# Using async iterator\n", "async def demo_async_iterator():\n", "    print(\"\\n=== Async Iterator Demo ===\")\n", "    async for value in AsyncRange(1, 6):\n", "        print(f\"Value: {value}\")\n", "\n", "await demo_async_iterator()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 9. 
Limiting Concurrency with Semaphores" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Using semaphore to limit concurrent operations\n", "async def limited_task(semaphore, task_id):\n", "    async with semaphore:\n", "        print(f\"Task {task_id} started\")\n", "        await asyncio.sleep(2)  # Simulate work\n", "        print(f\"Task {task_id} completed\")\n", "        return f\"Result from task {task_id}\"\n", "\n", "async def demo_semaphore():\n", "    # Only allow 2 concurrent tasks\n", "    semaphore = asyncio.Semaphore(2)\n", "\n", "    # Create 5 coroutines; gather() schedules them as tasks\n", "    tasks = [\n", "        limited_task(semaphore, i) for i in range(1, 6)\n", "    ]\n", "\n", "    print(\"Starting tasks with semaphore (max 2 concurrent)\")\n", "    start_time = time.time()\n", "\n", "    # 5 tasks of 2 seconds each, 2 at a time: about 6 seconds in total\n", "    results = await asyncio.gather(*tasks)\n", "\n", "    print(f\"\\nAll tasks completed in {time.time() - start_time:.2f} seconds\")\n", "    print(\"Results:\", results)\n", "\n", "await demo_semaphore()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 10. 
Common Patterns and Best Practices" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Pattern 1: Timeout handling\n", "async def operation_with_timeout():\n", "    try:\n", "        # This operation takes 3 seconds, so the 2-second timeout fires first\n", "        await asyncio.wait_for(\n", "            asyncio.sleep(3),\n", "            timeout=2.0\n", "        )\n", "        return \"Operation completed\"\n", "    except asyncio.TimeoutError:\n", "        return \"Operation timed out\"\n", "\n", "result = await operation_with_timeout()\n", "print(f\"Result: {result}\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Pattern 2: Retry mechanism\n", "import random\n", "\n", "async def unreliable_operation():\n", "    \"\"\"Simulates an operation that fails randomly\"\"\"\n", "    await asyncio.sleep(0.5)\n", "\n", "    if random.random() < 0.7:  # 70% chance of failure\n", "        raise RuntimeError(\"Operation failed!\")\n", "\n", "    return \"Success!\"\n", "\n", "async def retry_operation(max_retries=3):\n", "    \"\"\"Retry an operation with exponential backoff\"\"\"\n", "    for attempt in range(max_retries):\n", "        try:\n", "            result = await unreliable_operation()\n", "            print(f\"Operation succeeded on attempt {attempt + 1}\")\n", "            return result\n", "        except Exception as e:\n", "            print(f\"Attempt {attempt + 1} failed: {e}\")\n", "\n", "            if attempt < max_retries - 1:\n", "                # Exponential backoff: 1s, 2s, 4s, ...\n", "                delay = 2 ** attempt\n", "                print(f\"Retrying in {delay} seconds...\")\n", "                await asyncio.sleep(delay)\n", "\n", "    raise RuntimeError(\"All retry attempts failed\")\n", "\n", "# Test retry mechanism\n", "try:\n", "    result = await retry_operation()\n", "    print(f\"Final result: {result}\")\n", "except Exception as e:\n", "    print(f\"Operation ultimately failed: {e}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 11. 
Performance Comparison: Sync vs Async" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import time\n", "\n", "# Simulate I/O bound operations\n", "def sync_io_operation(duration):\n", "    \"\"\"Simulate a blocking I/O operation\"\"\"\n", "    time.sleep(duration)\n", "    return f\"Sync operation completed in {duration}s\"\n", "\n", "async def async_io_operation(duration):\n", "    \"\"\"Simulate a non-blocking I/O operation\"\"\"\n", "    await asyncio.sleep(duration)\n", "    return f\"Async operation completed in {duration}s\"\n", "\n", "# Performance test\n", "async def performance_comparison():\n", "    operations = [0.5, 0.3, 0.7, 0.2, 0.4]  # Different operation durations\n", "\n", "    print(\"=== Performance Comparison ===\")\n", "\n", "    # Synchronous execution: the durations add up (about 2.1 seconds).\n", "    # time.sleep() blocks the event loop; it is used here only for comparison.\n", "    print(\"\\nSynchronous execution:\")\n", "    start = time.time()\n", "    for duration in operations:\n", "        result = sync_io_operation(duration)\n", "        print(f\" {result}\")\n", "    sync_time = time.time() - start\n", "    print(f\"Total sync time: {sync_time:.2f} seconds\")\n", "\n", "    # Asynchronous execution: bounded by the longest duration (about 0.7 seconds)\n", "    print(\"\\nAsynchronous execution:\")\n", "    start = time.time()\n", "    tasks = [async_io_operation(duration) for duration in operations]\n", "    results = await asyncio.gather(*tasks)\n", "    for result in results:\n", "        print(f\" {result}\")\n", "    async_time = time.time() - start\n", "    print(f\"Total async time: {async_time:.2f} seconds\")\n", "\n", "    # Performance improvement\n", "    improvement = ((sync_time - async_time) / sync_time) * 100\n", "    print(f\"\\nPerformance improvement: {improvement:.1f}%\")\n", "\n", "await performance_comparison()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Summary\n", "\n", "This notebook covered the essential concepts of asyncio:\n", "\n", "1. **Basic async/await syntax** - Foundation of asynchronous programming\n", "2. **Concurrent execution** - Running multiple operations concurrently with `asyncio.gather()` and tasks\n", "3. 
**Error handling** - Managing exceptions in async code\n", "4. **Control flow** - Using `asyncio.wait()`, timeouts, and cancellation\n", "5. **Resource management** - Async context managers\n", "6. **Data generation** - Async generators and iterators\n", "7. **Concurrency control** - Semaphores for limiting concurrent operations\n", "8. **Best practices** - Timeouts, retries, and a sync vs. async performance comparison\n", "\n", "## When to Use AsyncIO\n", "\n", "**Good for:**\n", "- I/O-bound operations (file reading, network requests, database queries)\n", "- Applications with many concurrent users\n", "- Real-time applications (chat, gaming, live updates)\n", "- Web scraping with multiple requests\n", "\n", "**Not ideal for:**\n", "- CPU-intensive computations (use multiprocessing instead)\n", "- Simple scripts with minimal I/O\n", "- Applications where blocking behavior is acceptable" ] } ], "metadata": { "kernelspec": { "display_name": "venv", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.13.3" } }, "nbformat": 4, "nbformat_minor": 4 }