crypto_bot_training/Session_02/asyncio_examples.ipynb
2025-06-13 20:00:45 +02:00


{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# AsyncIO Tutorial - Asynchronous Programming in Python\n",
"\n",
"This notebook covers the fundamentals of asynchronous programming in Python using the `asyncio` library.\n",
"\n",
"## What is AsyncIO?\n",
"\n",
"`asyncio` is a standard-library module for writing **concurrent** code using the **async/await** syntax. It's particularly useful for:\n",
"- I/O-bound operations (file reading, network requests)\n",
"- Operations that involve waiting\n",
"- Building scalable network applications\n",
"\n",
"**Key Concepts:**\n",
"- **Coroutine**: The object returned by calling a function defined with `async def`; it does not run until it is awaited or scheduled\n",
"- **Event Loop**: The core of asyncio; it schedules coroutines and resumes them when the operations they are waiting on finish\n",
"- **await**: Suspends the current coroutine until the awaited coroutine, task, or future completes, letting the event loop run other work in the meantime"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 1. Basic Async/Await Syntax"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import asyncio\n",
"import time\n",
"\n",
"# Basic async function\n",
"async def say_hello():\n",
"    print(\"Hello\")\n",
"    await asyncio.sleep(1)  # Non-blocking sleep\n",
"    print(\"World!\")\n",
"\n",
"# Running an async function: top-level await works in Jupyter/IPython.\n",
"# In a regular script, use asyncio.run(say_hello()) instead.\n",
"await say_hello()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 2. Comparing Synchronous vs Asynchronous Execution"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Synchronous version - blocks execution\n",
"def sync_task(name, delay):\n",
"    print(f\"Task {name} started\")\n",
"    time.sleep(delay)  # Blocking sleep\n",
"    print(f\"Task {name} completed after {delay} seconds\")\n",
"\n",
"# Asynchronous version - non-blocking\n",
"async def async_task(name, delay):\n",
"    print(f\"Task {name} started\")\n",
"    await asyncio.sleep(delay)  # Non-blocking sleep\n",
"    print(f\"Task {name} completed after {delay} seconds\")\n",
"\n",
"# Demonstrate synchronous execution\n",
"print(\"=== Synchronous Execution ===\")\n",
"start_time = time.time()\n",
"sync_task(\"A\", 2)\n",
"sync_task(\"B\", 1)\n",
"sync_task(\"C\", 1)\n",
"print(f\"Total time: {time.time() - start_time:.2f} seconds\\n\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Demonstrate asynchronous execution\n",
"print(\"=== Asynchronous Execution ===\")\n",
"start_time = time.time()\n",
"\n",
"# Run tasks concurrently\n",
"await asyncio.gather(\n",
"    async_task(\"A\", 2),\n",
"    async_task(\"B\", 1),\n",
"    async_task(\"C\", 1),\n",
")\n",
"\n",
"print(f\"Total time: {time.time() - start_time:.2f} seconds\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 3. Different Ways to Run Async Code"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"async def simple_coroutine():\n",
"    await asyncio.sleep(0.5)\n",
"    return \"Coroutine completed!\"\n",
"\n",
"# Method 1: Direct await (in Jupyter/IPython)\n",
"result = await simple_coroutine()\n",
"print(f\"Result: {result}\")\n",
"\n",
"# Method 2: Using asyncio.create_task() for concurrent execution\n",
"task1 = asyncio.create_task(simple_coroutine())\n",
"task2 = asyncio.create_task(simple_coroutine())\n",
"\n",
"results = await asyncio.gather(task1, task2)\n",
"print(f\"Task results: {results}\")\n",
"\n",
"# Method 3 (regular scripts only): asyncio.run(simple_coroutine())\n",
"# asyncio.run() raises RuntimeError in Jupyter because an event loop is already running."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 4. Real-World Example: Fetching Data from Multiple URLs"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import asyncio\n",
"import time\n",
"\n",
"# This example simulates HTTP requests, so no third-party package is required.\n",
"# A real implementation could use aiohttp (pip install aiohttp) and pass an\n",
"# aiohttp.ClientSession as `session`.\n",
"\n",
"async def fetch_data(session, url):\n",
"    \"\"\"Simulate fetching data from a URL\"\"\"\n",
"    print(f\"Fetching {url}...\")\n",
"\n",
"    # Simulate network delay\n",
"    await asyncio.sleep(1)\n",
"\n",
"    # Simulate response\n",
"    return f\"Data from {url}\"\n",
"\n",
"async def fetch_multiple_urls():\n",
"    urls = [\n",
"        \"https://api.example1.com/data\",\n",
"        \"https://api.example2.com/data\",\n",
"        \"https://api.example3.com/data\",\n",
"        \"https://api.example4.com/data\",\n",
"    ]\n",
"\n",
"    # No real session is needed for the simulation\n",
"    session = None\n",
"\n",
"    # Create one coroutine per URL\n",
"    tasks = [fetch_data(session, url) for url in urls]\n",
"\n",
"    # Run them all concurrently\n",
"    results = await asyncio.gather(*tasks)\n",
"\n",
"    return results\n",
"\n",
"# Execute the function\n",
"start_time = time.time()\n",
"data = await fetch_multiple_urls()\n",
"end_time = time.time()\n",
"\n",
"print(\"\\nResults:\")\n",
"for item in data:\n",
"    print(f\"- {item}\")\n",
"print(f\"\\nTotal time: {end_time - start_time:.2f} seconds\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 5. Error Handling in Async Code"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"async def task_that_might_fail(name, should_fail=False):\n",
"    await asyncio.sleep(1)\n",
"\n",
"    if should_fail:\n",
"        raise ValueError(f\"Task {name} failed!\")\n",
"\n",
"    return f\"Task {name} succeeded\"\n",
"\n",
"# Example 1: Basic try/except\n",
"async def handle_single_task():\n",
"    try:\n",
"        result = await task_that_might_fail(\"A\", should_fail=True)\n",
"        print(result)\n",
"    except ValueError as e:\n",
"        print(f\"Caught error: {e}\")\n",
"\n",
"await handle_single_task()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Example 2: Handling errors in concurrent tasks\n",
"async def handle_multiple_tasks():\n",
"    tasks = [\n",
"        task_that_might_fail(\"A\", should_fail=False),\n",
"        task_that_might_fail(\"B\", should_fail=True),\n",
"        task_that_might_fail(\"C\", should_fail=False),\n",
"    ]\n",
"\n",
"    # With return_exceptions=True, gather returns exceptions as results\n",
"    # instead of raising the first one, so every task gets to finish\n",
"    results = await asyncio.gather(*tasks, return_exceptions=True)\n",
"\n",
"    for i, result in enumerate(results):\n",
"        if isinstance(result, Exception):\n",
"            print(f\"Task {i} failed: {result}\")\n",
"        else:\n",
"            print(f\"Task {i} succeeded: {result}\")\n",
"\n",
"await handle_multiple_tasks()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 6. Using asyncio.wait() for More Control"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"async def long_running_task(name, duration):\n",
"    print(f\"Starting {name} (will take {duration}s)\")\n",
"    await asyncio.sleep(duration)\n",
"    print(f\"Finished {name}\")\n",
"    return f\"{name} result\"\n",
"\n",
"# Using asyncio.wait() with timeout\n",
"async def demo_wait_with_timeout():\n",
"    tasks = [\n",
"        asyncio.create_task(long_running_task(\"Fast\", 1)),\n",
"        asyncio.create_task(long_running_task(\"Medium\", 3)),\n",
"        asyncio.create_task(long_running_task(\"Slow\", 5)),\n",
"    ]\n",
"\n",
"    # Wait for tasks with a timeout of 2 seconds; wait() does not cancel\n",
"    # unfinished tasks, it only reports them as pending\n",
"    done, pending = await asyncio.wait(tasks, timeout=2.0)\n",
"\n",
"    print(f\"\\nCompleted tasks: {len(done)}\")\n",
"    print(f\"Pending tasks: {len(pending)}\")\n",
"\n",
"    # Get results from completed tasks\n",
"    for task in done:\n",
"        result = await task\n",
"        print(f\"Result: {result}\")\n",
"\n",
"    # Cancel pending tasks\n",
"    for task in pending:\n",
"        task.cancel()\n",
"        print(f\"Cancelled task: {task}\")\n",
"\n",
"    # Let the cancellations finish; return_exceptions=True keeps\n",
"    # CancelledError from propagating out of gather\n",
"    await asyncio.gather(*pending, return_exceptions=True)\n",
"\n",
"await demo_wait_with_timeout()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 7. Async Context Managers"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"class AsyncResource:\n",
"    def __init__(self, name):\n",
"        self.name = name\n",
"\n",
"    async def __aenter__(self):\n",
"        print(f\"Acquiring resource: {self.name}\")\n",
"        await asyncio.sleep(0.1)  # Simulate setup time\n",
"        return self\n",
"\n",
"    async def __aexit__(self, exc_type, exc_val, exc_tb):\n",
"        print(f\"Releasing resource: {self.name}\")\n",
"        await asyncio.sleep(0.1)  # Simulate cleanup time\n",
"\n",
"    async def do_work(self):\n",
"        print(f\"Working with {self.name}\")\n",
"        await asyncio.sleep(1)\n",
"        return f\"Work completed with {self.name}\"\n",
"\n",
"# Using async context manager\n",
"async def demo_async_context_manager():\n",
"    async with AsyncResource(\"Database Connection\") as resource:\n",
"        result = await resource.do_work()\n",
"        print(result)\n",
"    # Resource is automatically released here\n",
"\n",
"await demo_async_context_manager()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 8. Async Generators and Async Iteration"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Async generator\n",
"async def async_number_generator(max_num):\n",
"    \"\"\"Generate numbers asynchronously\"\"\"\n",
"    for i in range(max_num):\n",
"        print(f\"Generating {i}\")\n",
"        await asyncio.sleep(0.5)  # Simulate async work\n",
"        yield i\n",
"\n",
"# Using async generator\n",
"async def demo_async_generator():\n",
"    print(\"=== Async Generator Demo ===\")\n",
"    async for number in async_number_generator(5):\n",
"        print(f\"Received: {number}\")\n",
"\n",
"await demo_async_generator()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Async iterator class\n",
"class AsyncRange:\n",
"    def __init__(self, start, stop):\n",
"        self.start = start\n",
"        self.stop = stop\n",
"\n",
"    def __aiter__(self):\n",
"        return self\n",
"\n",
"    async def __anext__(self):\n",
"        if self.start >= self.stop:\n",
"            raise StopAsyncIteration\n",
"\n",
"        await asyncio.sleep(0.2)  # Simulate async work\n",
"        value = self.start\n",
"        self.start += 1\n",
"        return value\n",
"\n",
"# Using async iterator\n",
"async def demo_async_iterator():\n",
"    print(\"\\n=== Async Iterator Demo ===\")\n",
"    async for value in AsyncRange(1, 6):\n",
"        print(f\"Value: {value}\")\n",
"\n",
"await demo_async_iterator()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 9. Limiting Concurrency with Semaphores"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Using semaphore to limit concurrent operations\n",
"async def limited_task(semaphore, task_id):\n",
"    # Only as many tasks as the semaphore allows can be inside this block at once\n",
"    async with semaphore:\n",
"        print(f\"Task {task_id} started\")\n",
"        await asyncio.sleep(2)  # Simulate work\n",
"        print(f\"Task {task_id} completed\")\n",
"        return f\"Result from task {task_id}\"\n",
"\n",
"async def demo_semaphore():\n",
"    # Only allow 2 concurrent tasks\n",
"    semaphore = asyncio.Semaphore(2)\n",
"\n",
"    # Create 5 coroutines that share the semaphore\n",
"    tasks = [\n",
"        limited_task(semaphore, i) for i in range(1, 6)\n",
"    ]\n",
"\n",
"    print(\"Starting tasks with semaphore (max 2 concurrent)\")\n",
"    start_time = time.time()\n",
"\n",
"    # 5 tasks of 2 seconds each, 2 at a time: expect roughly 6 seconds\n",
"    results = await asyncio.gather(*tasks)\n",
"\n",
"    print(f\"\\nAll tasks completed in {time.time() - start_time:.2f} seconds\")\n",
"    print(\"Results:\", results)\n",
"\n",
"await demo_semaphore()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 10. Common Patterns and Best Practices"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Pattern 1: Timeout handling\n",
"async def operation_with_timeout():\n",
"    try:\n",
"        # sleep(3) stands in for an operation that takes 3 seconds;\n",
"        # wait_for cancels it once the 2-second timeout expires\n",
"        await asyncio.wait_for(\n",
"            asyncio.sleep(3),\n",
"            timeout=2.0\n",
"        )\n",
"        return \"Operation completed\"\n",
"    except asyncio.TimeoutError:\n",
"        return \"Operation timed out\"\n",
"\n",
"result = await operation_with_timeout()\n",
"print(f\"Result: {result}\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import random\n",
"\n",
"# Pattern 2: Retry mechanism\n",
"async def unreliable_operation():\n",
"    \"\"\"Simulates an operation that fails randomly\"\"\"\n",
"    await asyncio.sleep(0.5)\n",
"\n",
"    if random.random() < 0.7:  # 70% chance of failure\n",
"        raise Exception(\"Operation failed!\")\n",
"\n",
"    return \"Success!\"\n",
"\n",
"async def retry_operation(max_retries=3):\n",
"    \"\"\"Retry an operation with exponential backoff\"\"\"\n",
"    for attempt in range(max_retries):\n",
"        try:\n",
"            result = await unreliable_operation()\n",
"            print(f\"Operation succeeded on attempt {attempt + 1}\")\n",
"            return result\n",
"        except Exception as e:\n",
"            print(f\"Attempt {attempt + 1} failed: {e}\")\n",
"\n",
"            if attempt < max_retries - 1:\n",
"                # Exponential backoff: 1 second, then 2, then 4, ...\n",
"                delay = 2 ** attempt\n",
"                print(f\"Retrying in {delay} seconds...\")\n",
"                await asyncio.sleep(delay)\n",
"\n",
"    raise Exception(\"All retry attempts failed\")\n",
"\n",
"# Test retry mechanism\n",
"try:\n",
"    result = await retry_operation()\n",
"    print(f\"Final result: {result}\")\n",
"except Exception as e:\n",
"    print(f\"Operation ultimately failed: {e}\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 11. Performance Comparison: Sync vs Async"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import time\n",
"\n",
"# Simulate I/O bound operations\n",
"def sync_io_operation(duration):\n",
"    \"\"\"Simulate a blocking I/O operation\"\"\"\n",
"    time.sleep(duration)\n",
"    return f\"Sync operation completed in {duration}s\"\n",
"\n",
"async def async_io_operation(duration):\n",
"    \"\"\"Simulate a non-blocking I/O operation\"\"\"\n",
"    await asyncio.sleep(duration)\n",
"    return f\"Async operation completed in {duration}s\"\n",
"\n",
"# Performance test\n",
"async def performance_comparison():\n",
"    operations = [0.5, 0.3, 0.7, 0.2, 0.4]  # Different operation durations\n",
"\n",
"    print(\"=== Performance Comparison ===\")\n",
"\n",
"    # Synchronous execution\n",
"    print(\"\\nSynchronous execution:\")\n",
"    start = time.time()\n",
"    for duration in operations:\n",
"        result = sync_io_operation(duration)\n",
"        print(f\"  {result}\")\n",
"    sync_time = time.time() - start\n",
"    print(f\"Total sync time: {sync_time:.2f} seconds\")\n",
"\n",
"    # Asynchronous execution\n",
"    print(\"\\nAsynchronous execution:\")\n",
"    start = time.time()\n",
"    tasks = [async_io_operation(duration) for duration in operations]\n",
"    results = await asyncio.gather(*tasks)\n",
"    for result in results:\n",
"        print(f\"  {result}\")\n",
"    async_time = time.time() - start\n",
"    print(f\"Total async time: {async_time:.2f} seconds\")\n",
"\n",
"    # Performance improvement\n",
"    improvement = ((sync_time - async_time) / sync_time) * 100\n",
"    print(f\"\\nPerformance improvement: {improvement:.1f}%\")\n",
"\n",
"await performance_comparison()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Summary\n",
"\n",
"This notebook covered the essential concepts of asyncio:\n",
"\n",
"1. **Basic async/await syntax** - Foundation of asynchronous programming\n",
"2. **Concurrent execution** - Running multiple operations concurrently with `asyncio.gather()` and tasks\n",
"3. **Error handling** - Managing exceptions in async code\n",
"4. **Control flow** - Using `asyncio.wait()`, timeouts, and cancellation\n",
"5. **Resource management** - Async context managers\n",
"6. **Data generation** - Async generators and iterators\n",
"7. **Concurrency control** - Semaphores for limiting concurrent operations\n",
"8. **Best practices** - Timeouts, retries, and a sync vs async performance comparison\n",
"\n",
"## When to Use AsyncIO\n",
"\n",
"**Good for:**\n",
"- I/O-bound operations (file reading, network requests, database queries)\n",
"- Applications with many concurrent users\n",
"- Real-time applications (chat, gaming, live updates)\n",
"- Web scraping with multiple requests\n",
"\n",
"**Not ideal for:**\n",
"- CPU-intensive computations (use multiprocessing instead)\n",
"- Simple scripts with minimal I/O\n",
"- Applications where blocking behavior is acceptable"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "venv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.13.3"
}
},
"nbformat": 4,
"nbformat_minor": 4
}