
Session_02

Jakub Polec 2025-06-13 20:00:45 +02:00
parent 49768cffaf
commit cd61e85689
9 changed files with 2444 additions and 0 deletions

BIN
.DS_Store vendored Normal file

Binary file not shown.

BIN
Session_02/.DS_Store vendored Normal file

Binary file not shown.


@@ -0,0 +1,199 @@
import asyncio
import json
from datetime import datetime
from websockets import connect
import random
from rich.console import Console
from rich.table import Table
from rich.panel import Panel
from rich.live import Live
from rich.layout import Layout
from rich.text import Text
from rich.align import Align
from rich import box

console = Console()

symbols = ['btcusdt', 'ethusdt', 'solusdt', 'bnbusdt', 'dogeusdt', 'wifusdt']
websocket_url_base = 'wss://fstream.binance.com/ws/'

# Store latest data for each symbol
funding_data = {symbol: None for symbol in symbols}
print_lock = asyncio.Lock()

def get_funding_style(yearly_rate):
    """Return style based on funding rate"""
    if yearly_rate > 50:
        return "bold white on red"
    elif yearly_rate > 30:
        return "bold black on yellow"
    elif yearly_rate > 5:
        return "bold white on blue"
    elif yearly_rate < -10:
        return "bold white on green"
    else:
        return "bold black on bright_green"

def create_funding_table():
    """Create a rich table with current funding data"""
    table = Table(
        title="🚀 Binance Perpetual Funding Rates",
        title_style="bold cyan",
        box=box.ROUNDED,
        show_header=True,
        header_style="bold magenta"
    )
    table.add_column("Symbol", style="cyan", width=12)
    table.add_column("Funding Rate", justify="right", width=15)
    table.add_column("Yearly Rate", justify="right", width=15)
    table.add_column("Status", justify="center", width=20)
    table.add_column("Last Update", style="dim", width=12)

    for symbol in symbols:
        data = funding_data[symbol]
        if data:
            symbol_display = data['symbol_display']
            funding_rate = data['funding_rate']
            yearly_rate = data['yearly_rate']
            last_update = data['last_update']

            # Create status indicator
            if yearly_rate > 30:
                status = Text("🔥 HIGH", style="bold red")
            elif yearly_rate > 5:
                status = Text("📈 POSITIVE", style="bold yellow")
            elif yearly_rate < -10:
                status = Text("📉 NEGATIVE", style="bold green")
            else:
                status = Text("😐 NEUTRAL", style="dim")

            # Style the rates
            funding_text = Text(f"{funding_rate:.4f}%", style=get_funding_style(yearly_rate))
            yearly_text = Text(f"{yearly_rate:+.2f}%", style=get_funding_style(yearly_rate))

            table.add_row(
                symbol_display,
                funding_text,
                yearly_text,
                status,
                last_update
            )
        else:
            table.add_row(
                symbol.replace('usdt', '').upper(),
                Text("Loading...", style="dim"),
                Text("Loading...", style="dim"),
                Text("⏳ WAITING", style="dim"),
                Text("--:--:--", style="dim")
            )
    return table

def create_layout():
    """Create the main layout"""
    layout = Layout()

    # Header
    header = Panel(
        Align.center(
            Text("Binance Funding Rate Monitor", style="bold white"),
            vertical="middle"
        ),
        height=3,
        style="blue"
    )

    # Main table
    table = create_funding_table()

    # Footer with info
    footer_text = Text()
    footer_text.append("🟥 >50% ", style="bold white on red")
    footer_text.append("🟨 >30% ", style="bold black on yellow")
    footer_text.append("🟦 >5% ", style="bold white on blue")
    footer_text.append("🟩 <-10% ", style="bold white on green")
    footer_text.append("💚 Normal", style="bold black on bright_green")

    footer = Panel(
        Align.center(footer_text),
        title="Legend",
        height=3,
        style="dim"
    )

    layout.split_column(
        Layout(header, name="header", size=3),
        Layout(table, name="main"),
        Layout(footer, name="footer", size=3)
    )
    return layout

async def binance_funding_stream(symbol):
    """Connect to Binance WebSocket and stream funding data"""
    global funding_data, print_lock
    websocket_url = f'{websocket_url_base}{symbol}@markPrice'
    while True:  # Reconnection loop
        try:
            async with connect(websocket_url) as websocket:
                while True:
                    try:
                        message = await websocket.recv()
                        data = json.loads(message)
                        event_time = datetime.fromtimestamp(data['E'] / 1000).strftime("%H:%M:%S")
                        symbol_display = data['s'].replace('USDT', '').upper()
                        funding_rate = float(data['r']) * 100  # Convert to percentage
                        yearly_funding_rate = funding_rate * 3 * 365  # 3 times per day, 365 days
                        async with print_lock:
                            funding_data[symbol] = {
                                'symbol_display': symbol_display,
                                'funding_rate': funding_rate,
                                'yearly_rate': yearly_funding_rate,
                                'last_update': event_time
                            }
                    except json.JSONDecodeError:
                        continue
                    except Exception as e:
                        console.print(f"[red]Error processing data for {symbol}: {e}[/red]")
                        break
        except Exception as e:
            console.print(f"[red]Connection error for {symbol}: {e}. Retrying in 5 seconds...[/red]")
            await asyncio.sleep(5)

async def display_loop():
    """Main display loop using Rich Live"""
    with Live(create_layout(), refresh_per_second=2, screen=True) as live:
        while True:
            live.update(create_layout())
            await asyncio.sleep(0.5)

async def main():
    """Main function to run all tasks"""
    console.print("[bold green]🚀 Starting Binance Funding Rate Monitor...[/bold green]")
    console.print("[yellow]Press Ctrl+C to exit[/yellow]\n")

    # Start WebSocket streams
    stream_tasks = [binance_funding_stream(symbol) for symbol in symbols]
    # Start display
    display_task = display_loop()

    # Run all tasks
    try:
        await asyncio.gather(*stream_tasks, display_task)
    except KeyboardInterrupt:
        console.print("\n[bold red]Shutting down...[/bold red]")

if __name__ == "__main__":
    asyncio.run(main())


@@ -0,0 +1,148 @@
import asyncio
import json
import os
from datetime import datetime
import pytz
from websockets import connect
from rich.console import Console
from rich.text import Text
from rich.panel import Panel
from rich.align import Align
from rich.columns import Columns
from rich.rule import Rule

LIQ_LIMIT = 0

console = Console()
websocket_url = 'wss://fstream.binance.com/ws/!forceOrder@arr'

def format_usd(amount):
    """Format USD amount with appropriate suffixes"""
    if amount >= 1_000_000:
        return f"${amount/1_000_000:.2f}M"
    elif amount >= 1_000:
        return f"${amount/1_000:.1f}K"
    else:
        return f"${amount:.0f}"

async def binance_liquidation(uri):
    # Print header
    console.print(Rule("[bold cyan]🔥 BINANCE LIQUIDATION MONITOR 🔥[/bold cyan]", style="cyan"))
    console.print()

    async with connect(uri) as websocket:
        while True:
            try:
                msg = await websocket.recv()
                order_data = json.loads(msg)['o']
                symbol = order_data['s'].replace('USDT', '')
                side = order_data['S']
                timestamp = int(order_data['T'])
                filled_quantity = float(order_data['z'])
                price = float(order_data['p'])
                usd_size = filled_quantity * price

                est = pytz.timezone("US/Eastern")
                time_est = datetime.fromtimestamp(timestamp / 1000, est).strftime('%H:%M:%S')

                if usd_size >= LIQ_LIMIT:
                    liquidation_type = '📈 LONG LIQ' if side == 'SELL' else '📉 SHORT LIQ'
                    symbol_clean = symbol[:4]
                    formatted_usd = format_usd(usd_size)

                    # Choose colors and styling based on liquidation type and size
                    if side == 'SELL':  # Long liquidation
                        border_color = "bright_green"
                        text_style = "bright_green"
                        emoji_prefix = "🟢"
                    else:  # Short liquidation
                        border_color = "bright_red"
                        text_style = "bright_red"
                        emoji_prefix = "🔴"

                    # Create content sections
                    type_text = Text(liquidation_type, style=f"bold {text_style}")
                    symbol_text = Text(f"{symbol_clean}", style="bold yellow")
                    time_text = Text(f"{time_est}", style="white")
                    amount_text = Text(formatted_usd, style=f"bold {text_style}")

                    # Create the main content
                    content_parts = [
                        emoji_prefix,
                        type_text,
                        symbol_text,
                        time_text,
                        amount_text
                    ]

                    if usd_size > 250000:
                        # Mega liquidation - very prominent
                        title = "💎 MEGA LIQUIDATION 💎"
                        panel = Panel(
                            Align.center(Columns(content_parts, padding=(0, 2))),
                            title=title,
                            title_align="center",
                            border_style=border_color,
                            style=f"bold {text_style}",
                            padding=(1, 2)
                        )
                        for _ in range(4):
                            console.print(panel)
                    elif usd_size > 100000:
                        # Large liquidation - prominent
                        title = "🚨 LARGE LIQUIDATION 🚨"
                        panel = Panel(
                            Align.center(Columns(content_parts, padding=(0, 1))),
                            title=title,
                            title_align="center",
                            border_style=border_color,
                            style=f"bold {text_style}",
                            padding=(0, 2)
                        )
                        for _ in range(2):
                            console.print(panel)
                    elif usd_size > 25000:
                        # Medium liquidation - boxed
                        panel = Panel(
                            Columns(content_parts, padding=(0, 1)),
                            border_style=border_color,
                            style=f"bold {text_style}",
                            padding=(0, 1)
                        )
                        console.print(panel)
                    else:
                        # Small liquidation - simple line
                        line_content = Text()
                        for i, part in enumerate(content_parts):
                            if i > 0:
                                line_content.append(" ")
                            line_content.append(part)
                        console.print(
                            Panel(
                                line_content,
                                border_style=border_color,
                                style=text_style,
                                padding=(0, 1),
                                expand=False
                            )
                        )
                    console.print()  # Spacing

            except Exception as e:
                error_panel = Panel(
                    f"❌ Connection error: {str(e)}\n⏳ Reconnecting in 5 seconds...",
                    title="Error",
                    border_style="yellow",
                    style="yellow"
                )
                console.print(error_panel)
                await asyncio.sleep(5)

if __name__ == "__main__":
    try:
        asyncio.run(binance_liquidation(websocket_url))
    except KeyboardInterrupt:
        console.print(Rule("[bold red]👋 Liquidation monitor stopped[/bold red]", style="red"))


@@ -0,0 +1,214 @@
import asyncio
import json
import os
from datetime import datetime
import pytz
from websockets import connect
from rich.console import Console
from rich.table import Table
from rich.panel import Panel
from rich.text import Text
from rich.align import Align
from rich.layout import Layout
from rich.live import Live
from rich import box
from rich.progress import Progress, BarColumn, TextColumn, SpinnerColumn
from rich.columns import Columns
from rich.rule import Rule
from collections import defaultdict, deque
import time
import statistics

# Initialize Rich console
console = Console()

# List of symbols you want to track
symbols = ['btcusdt', 'ethusdt', 'solusdt', 'bnbusdt', 'dogeusdt', 'wifusdt']
websocket_url_base = 'wss://fstream.binance.com/ws/'

def get_trade_style(usd_size, trade_type):
    """Get enhanced styling for trade based on size and type"""
    if usd_size >= 1000000:  # $1M+
        return {
            'emoji': '💎',
            'color': 'bold bright_magenta' if trade_type == 'SELL' else 'bold bright_cyan',
            'border': 'heavy',
            'title': '🌊 MEGA WHALE DETECTED 🌊',
            'bg_color': 'on_blue' if trade_type == 'BUY' else 'on_red'
        }
    elif usd_size >= 500000:  # $500K+
        return {
            'emoji': '🏦',
            'color': 'bold magenta' if trade_type == 'SELL' else 'bold cyan',
            'border': 'double',
            'title': '🔥 MASSIVE WHALE 🔥',
            'bg_color': None
        }
    elif usd_size >= 100000:  # $100K+
        return {
            'emoji': '💰',
            'color': 'bold red' if trade_type == 'SELL' else 'bold green',
            'border': 'rounded',
            'title': '⚡️ WHALE ALERT ⚡️',
            'bg_color': None
        }
    else:  # $15K+
        return {
            'emoji': '💵',
            'color': 'red' if trade_type == 'SELL' else 'green',
            'border': 'ascii',
            'title': None,
            'bg_color': None
        }

async def binance_trade_stream(url, symbol):
    """Enhanced WebSocket connection for individual symbol"""
    async with connect(url) as websocket:
        while True:
            try:
                message = await websocket.recv()
                data = json.loads(message)

                # Parse trade data
                price = float(data['p'])
                quantity = float(data['q'])
                trade_time = int(data['T'])
                is_buyer_maker = data['m']

                # Calculate USD size
                usd_size = price * quantity

                if usd_size >= 15000:
                    # Determine trade type
                    trade_type = 'SELL' if is_buyer_maker else "BUY"

                    # Format time
                    est = pytz.timezone('US/Eastern')
                    readable_trade_time = datetime.fromtimestamp(trade_time / 1000, est).strftime('%H:%M:%S')
                    display_symbol = symbol.upper().replace('USDT', '')

                    # Get styling
                    style_info = get_trade_style(usd_size, trade_type)

                    # Create enhanced trade notification
                    trade_text = Text()
                    trade_text.append(f"[{readable_trade_time}] ", style="dim bright_white")
                    trade_text.append(f"{style_info['emoji']} ", style="bold")
                    trade_text.append(f"{trade_type:<4} ", style=f"bold {'bright_green' if trade_type == 'BUY' else 'bright_red'}")
                    trade_text.append(f"{display_symbol:<4} ", style="bold bright_cyan")
                    trade_text.append(f"${usd_size:,.0f} ", style="bold bright_yellow")
                    trade_text.append(f"@ ${price:,.2f} ", style="bright_white")

                    # Create panel for large trades (100K+)
                    if usd_size >= 100000 and style_info['title']:
                        enhanced_content = Text()
                        enhanced_content.append(f"{style_info['emoji']} TRADE SIZE: ${usd_size:,.0f}\n", style="bold bright_yellow")
                        enhanced_content.append(f"💲 PRICE: ${price:,.2f}\n", style="bright_white")
                        enhanced_content.append(f"📊 SYMBOL: {display_symbol}\n", style="bold bright_cyan")
                        enhanced_content.append(f"⏰ TIME: {readable_trade_time}", style="dim")

                        panel = Panel(
                            Align.center(enhanced_content),
                            title=style_info['title'],
                            subtitle=f"🐋 {trade_type} ORDER",
                            box=getattr(box, style_info['border'].upper()),
                            border_style=style_info['color'],
                            padding=(1, 2)
                        )
                        console.print(panel)

                        # Add separator for mega trades
                        if usd_size >= 1000000:
                            console.print(Rule("🌊🌊🌊", style="bold bright_blue"))
                    else:
                        console.print(trade_text)

            except Exception as e:
                console.print(f"[red]❌ Error in {symbol}: {e}[/red]")
                await asyncio.sleep(5)

async def dashboard_updater():
    """Periodically update the dashboard"""
    while True:
        try:
            await asyncio.sleep(1)  # Update every second
            # You could implement a live dashboard here using Rich Live
            # For now, we'll just continue with the stream-based approach
        except Exception as e:
            console.print(f"[red]Dashboard error: {e}[/red]")

async def main():
    """Enhanced main function"""
    console.clear()

    # Enhanced startup sequence
    startup_progress = Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
    )

    with startup_progress:
        task = startup_progress.add_task("🐋 Initializing Whale Monitor...", total=100)
        for i in range(100):
            await asyncio.sleep(0.02)
            startup_progress.update(task, advance=1)
            if i == 20:
                startup_progress.update(task, description="🌐 Connecting to Binance...")
            elif i == 50:
                startup_progress.update(task, description="📊 Setting up data streams...")
            elif i == 80:
                startup_progress.update(task, description="🎯 Calibrating whale detection...")

    console.clear()

    # Display enhanced startup message
    startup_panel = Panel(
        Align.center(
            Text("🐋 BINANCE WHALE MONITOR ACTIVE 🐋\n\n") +
            Text("💎 Tracking: ", style="bright_green") +
            Text(", ".join([s.upper().replace('USDT', '') for s in symbols]), style="bright_yellow") +
            Text("\n🎯 Minimum Trade Size: $15,000", style="bright_white") +
            Text("\n⚡️ Real-time WebSocket Streams Connected", style="bright_cyan") +
            Text("\n\n🚀 Ready to catch whales!", style="bold bright_magenta")
        ),
        title="🌊 WHALE MONITOR INITIALIZED 🌊",
        border_style="bright_green",
        box=box.DOUBLE,
        padding=(1, 2)
    )
    console.print(startup_panel)
    console.print()

    # Create tasks for each symbol trade stream
    tasks = []

    # Add WebSocket tasks
    for symbol in symbols:
        stream_url = f"{websocket_url_base}{symbol}@aggTrade"
        tasks.append(binance_trade_stream(stream_url, symbol))

    # Add dashboard updater
    tasks.append(dashboard_updater())

    await asyncio.gather(*tasks)

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        console.print("\n")
        shutdown_panel = Panel(
            Align.center("🐋 Whale Monitor shutting down...\n👋 Thanks for whale watching!"),
            title="🌊 SHUTDOWN COMPLETE 🌊",
            border_style="bright_yellow",
            box=box.DOUBLE
        )
        console.print(shutdown_panel)
    except Exception as e:
        console.print(f"[red]❌ Critical Error: {e}[/red]")


@@ -0,0 +1,625 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# AsyncIO Tutorial - Asynchronous Programming in Python\n",
"\n",
"This notebook covers the fundamentals of asynchronous programming in Python using the `asyncio` library.\n",
"\n",
"## What is AsyncIO?\n",
"\n",
"AsyncIO is a library to write **concurrent** code using the **async/await** syntax. It's particularly useful for:\n",
"- I/O-bound operations (file reading, network requests)\n",
"- Operations that involve waiting\n",
"- Building scalable network applications\n",
"\n",
"**Key Concepts:**\n",
"- **Coroutine**: A function defined with `async def`\n",
"- **Event Loop**: The core of asyncio that manages and executes coroutines\n",
"- **await**: Used to call coroutines and wait for their completion"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 1. Basic Async/Await Syntax"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import asyncio\n",
"import time\n",
"\n",
"# Basic async function\n",
"async def say_hello():\n",
" print(\"Hello\")\n",
" await asyncio.sleep(1) # Non-blocking sleep\n",
" print(\"World!\")\n",
"\n",
"# Running an async function\n",
"await say_hello()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 2. Comparing Synchronous vs Asynchronous Execution"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Synchronous version - blocks execution\n",
"def sync_task(name, delay):\n",
" print(f\"Task {name} started\")\n",
" time.sleep(delay) # Blocking sleep\n",
" print(f\"Task {name} completed after {delay} seconds\")\n",
"\n",
"# Asynchronous version - non-blocking\n",
"async def async_task(name, delay):\n",
" print(f\"Task {name} started\")\n",
" await asyncio.sleep(delay) # Non-blocking sleep\n",
" print(f\"Task {name} completed after {delay} seconds\")\n",
"\n",
"# Demonstrate synchronous execution\n",
"print(\"=== Synchronous Execution ===\")\n",
"start_time = time.time()\n",
"sync_task(\"A\", 2)\n",
"sync_task(\"B\", 1)\n",
"sync_task(\"C\", 1)\n",
"print(f\"Total time: {time.time() - start_time:.2f} seconds\\n\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Demonstrate asynchronous execution\n",
"print(\"=== Asynchronous Execution ===\")\n",
"start_time = time.time()\n",
"\n",
"# Run tasks concurrently\n",
"await asyncio.gather(\n",
" async_task(\"A\", 2),\n",
" async_task(\"B\", 1),\n",
" async_task(\"C\", 1)\n",
")\n",
"\n",
"print(f\"Total time: {time.time() - start_time:.2f} seconds\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 3. Different Ways to Run Async Code"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"async def simple_coroutine():\n",
" await asyncio.sleep(0.5)\n",
" return \"Coroutine completed!\"\n",
"\n",
"# Method 1: Direct await (in Jupyter/IPython)\n",
"result = await simple_coroutine()\n",
"print(f\"Result: {result}\")\n",
"\n",
"# Method 2: Using asyncio.create_task() for concurrent execution\n",
"task1 = asyncio.create_task(simple_coroutine())\n",
"task2 = asyncio.create_task(simple_coroutine())\n",
"\n",
"results = await asyncio.gather(task1, task2)\n",
"print(f\"Task results: {results}\")"
]
},
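{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Method 3 (sketch): asyncio.run() is the usual entry point for a standalone\n",
"# script. It cannot be executed here because Jupyter already runs an event\n",
"# loop, so asyncio.run() would raise RuntimeError; in a notebook you await\n",
"# the coroutine directly instead. Shown for reference only.\n",
"\n",
"script_example = '''\n",
"import asyncio\n",
"\n",
"async def main():\n",
"    await asyncio.sleep(0.5)\n",
"    return \"Coroutine completed!\"\n",
"\n",
"if __name__ == \"__main__\":\n",
"    print(asyncio.run(main()))\n",
"'''\n",
"print(script_example)"
]
},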
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 4. Real-World Example: Fetching Data from Multiple URLs"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import aiohttp\n",
"import asyncio\n",
"\n",
"# Note: You might need to install aiohttp: pip install aiohttp\n",
"# For this example, we'll simulate HTTP requests\n",
"\n",
"async def fetch_data(session, url):\n",
" \"\"\"Simulate fetching data from a URL\"\"\"\n",
" print(f\"Fetching {url}...\")\n",
" \n",
" # Simulate network delay\n",
" await asyncio.sleep(1)\n",
" \n",
" # Simulate response\n",
" return f\"Data from {url}\"\n",
"\n",
"async def fetch_multiple_urls():\n",
" urls = [\n",
" \"https://api.example1.com/data\",\n",
" \"https://api.example2.com/data\", \n",
" \"https://api.example3.com/data\",\n",
" \"https://api.example4.com/data\"\n",
" ]\n",
" \n",
" # Create a session (simulated)\n",
" session = None\n",
" \n",
" # Create tasks for all URLs\n",
" tasks = [fetch_data(session, url) for url in urls]\n",
" \n",
" # Execute all tasks concurrently\n",
" results = await asyncio.gather(*tasks)\n",
" \n",
" return results\n",
"\n",
"# Execute the function\n",
"start_time = time.time()\n",
"data = await fetch_multiple_urls()\n",
"end_time = time.time()\n",
"\n",
"print(\"\\nResults:\")\n",
"for item in data:\n",
" print(f\"- {item}\")\n",
"print(f\"\\nTotal time: {end_time - start_time:.2f} seconds\")"
]
},
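{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Minimal sketch of the same pattern with real HTTP requests via aiohttp.\n",
"# Assumptions: aiohttp is installed (pip install aiohttp) and the notebook\n",
"# has network access; the URL below is only a placeholder.\n",
"\n",
"async def fetch_real(session, url):\n",
"    async with session.get(url) as response:\n",
"        body = await response.text()\n",
"        return url, response.status, len(body)\n",
"\n",
"async def fetch_real_urls(urls):\n",
"    async with aiohttp.ClientSession() as session:\n",
"        tasks = [fetch_real(session, url) for url in urls]\n",
"        return await asyncio.gather(*tasks)\n",
"\n",
"try:\n",
"    results = await fetch_real_urls([\"https://example.com\"] * 3)\n",
"    for url, status, size in results:\n",
"        print(f\"{url} -> HTTP {status}, {size} bytes\")\n",
"except Exception as e:\n",
"    print(f\"Skipping live request demo: {e}\")"
]
},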
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 5. Error Handling in Async Code"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"async def task_that_might_fail(name, should_fail=False):\n",
" await asyncio.sleep(1)\n",
" \n",
" if should_fail:\n",
" raise ValueError(f\"Task {name} failed!\")\n",
" \n",
" return f\"Task {name} succeeded\"\n",
"\n",
"# Example 1: Basic try/except\n",
"async def handle_single_task():\n",
" try:\n",
" result = await task_that_might_fail(\"A\", should_fail=True)\n",
" print(result)\n",
" except ValueError as e:\n",
" print(f\"Caught error: {e}\")\n",
"\n",
"await handle_single_task()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Example 2: Handling errors in concurrent tasks\n",
"async def handle_multiple_tasks():\n",
" tasks = [\n",
" task_that_might_fail(\"A\", should_fail=False),\n",
" task_that_might_fail(\"B\", should_fail=True),\n",
" task_that_might_fail(\"C\", should_fail=False)\n",
" ]\n",
" \n",
" # Method 1: gather with return_exceptions=True\n",
" results = await asyncio.gather(*tasks, return_exceptions=True)\n",
" \n",
" for i, result in enumerate(results):\n",
" if isinstance(result, Exception):\n",
" print(f\"Task {i} failed: {result}\")\n",
" else:\n",
" print(f\"Task {i} succeeded: {result}\")\n",
"\n",
"await handle_multiple_tasks()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 6. Using asyncio.wait() for More Control"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"async def long_running_task(name, duration):\n",
" print(f\"Starting {name} (will take {duration}s)\")\n",
" await asyncio.sleep(duration)\n",
" print(f\"Finished {name}\")\n",
" return f\"{name} result\"\n",
"\n",
"# Using asyncio.wait() with timeout\n",
"async def demo_wait_with_timeout():\n",
" tasks = [\n",
" asyncio.create_task(long_running_task(\"Fast\", 1)),\n",
" asyncio.create_task(long_running_task(\"Medium\", 3)),\n",
" asyncio.create_task(long_running_task(\"Slow\", 5))\n",
" ]\n",
" \n",
" # Wait for tasks with a timeout of 2 seconds\n",
" done, pending = await asyncio.wait(tasks, timeout=2.0)\n",
" \n",
" print(f\"\\nCompleted tasks: {len(done)}\")\n",
" print(f\"Pending tasks: {len(pending)}\")\n",
" \n",
" # Get results from completed tasks\n",
" for task in done:\n",
" result = await task\n",
" print(f\"Result: {result}\")\n",
" \n",
" # Cancel pending tasks\n",
" for task in pending:\n",
" task.cancel()\n",
" print(f\"Cancelled task: {task}\")\n",
"\n",
"await demo_wait_with_timeout()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 7. Async Context Managers"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"class AsyncResource:\n",
" def __init__(self, name):\n",
" self.name = name\n",
" \n",
" async def __aenter__(self):\n",
" print(f\"Acquiring resource: {self.name}\")\n",
" await asyncio.sleep(0.1) # Simulate setup time\n",
" return self\n",
" \n",
" async def __aexit__(self, exc_type, exc_val, exc_tb):\n",
" print(f\"Releasing resource: {self.name}\")\n",
" await asyncio.sleep(0.1) # Simulate cleanup time\n",
" \n",
" async def do_work(self):\n",
" print(f\"Working with {self.name}\")\n",
" await asyncio.sleep(1)\n",
" return f\"Work completed with {self.name}\"\n",
"\n",
"# Using async context manager\n",
"async def demo_async_context_manager():\n",
" async with AsyncResource(\"Database Connection\") as resource:\n",
" result = await resource.do_work()\n",
" print(result)\n",
" # Resource is automatically released here\n",
"\n",
"await demo_async_context_manager()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 8. Async Generators and Async Iteration"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Async generator\n",
"async def async_number_generator(max_num):\n",
" \"\"\"Generate numbers asynchronously\"\"\"\n",
" for i in range(max_num):\n",
" print(f\"Generating {i}\")\n",
" await asyncio.sleep(0.5) # Simulate async work\n",
" yield i\n",
"\n",
"# Using async generator\n",
"async def demo_async_generator():\n",
" print(\"=== Async Generator Demo ===\")\n",
" async for number in async_number_generator(5):\n",
" print(f\"Received: {number}\")\n",
"\n",
"await demo_async_generator()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Async iterator class\n",
"class AsyncRange:\n",
" def __init__(self, start, stop):\n",
" self.start = start\n",
" self.stop = stop\n",
" \n",
" def __aiter__(self):\n",
" return self\n",
" \n",
" async def __anext__(self):\n",
" if self.start >= self.stop:\n",
" raise StopAsyncIteration\n",
" \n",
" await asyncio.sleep(0.2) # Simulate async work\n",
" value = self.start\n",
" self.start += 1\n",
" return value\n",
"\n",
"# Using async iterator\n",
"async def demo_async_iterator():\n",
" print(\"\\n=== Async Iterator Demo ===\")\n",
" async for value in AsyncRange(1, 6):\n",
" print(f\"Value: {value}\")\n",
"\n",
"await demo_async_iterator()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 9. Limiting Concurrency with Semaphores"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Using semaphore to limit concurrent operations\n",
"async def limited_task(semaphore, task_id):\n",
" async with semaphore:\n",
" print(f\"Task {task_id} started\")\n",
" await asyncio.sleep(2) # Simulate work\n",
" print(f\"Task {task_id} completed\")\n",
" return f\"Result from task {task_id}\"\n",
"\n",
"async def demo_semaphore():\n",
" # Only allow 2 concurrent tasks\n",
" semaphore = asyncio.Semaphore(2)\n",
" \n",
" # Create 5 tasks\n",
" tasks = [\n",
" limited_task(semaphore, i) for i in range(1, 6)\n",
" ]\n",
" \n",
" print(\"Starting tasks with semaphore (max 2 concurrent)\")\n",
" start_time = time.time()\n",
" \n",
" results = await asyncio.gather(*tasks)\n",
" \n",
" print(f\"\\nAll tasks completed in {time.time() - start_time:.2f} seconds\")\n",
" print(\"Results:\", results)\n",
"\n",
"await demo_semaphore()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 10. Common Patterns and Best Practices"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Pattern 1: Timeout handling\n",
"async def operation_with_timeout():\n",
" try:\n",
" # This operation takes 3 seconds\n",
" result = await asyncio.wait_for(\n",
" asyncio.sleep(3), \n",
" timeout=2.0\n",
" )\n",
" return \"Operation completed\"\n",
" except asyncio.TimeoutError:\n",
" return \"Operation timed out\"\n",
"\n",
"result = await operation_with_timeout()\n",
"print(f\"Result: {result}\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Pattern 2: Retry mechanism\n",
"async def unreliable_operation():\n",
" \"\"\"Simulates an operation that fails randomly\"\"\"\n",
" import random\n",
" await asyncio.sleep(0.5)\n",
" \n",
" if random.random() < 0.7: # 70% chance of failure\n",
" raise Exception(\"Operation failed!\")\n",
" \n",
" return \"Success!\"\n",
"\n",
"async def retry_operation(max_retries=3):\n",
" \"\"\"Retry an operation with exponential backoff\"\"\"\n",
" for attempt in range(max_retries):\n",
" try:\n",
" result = await unreliable_operation()\n",
" print(f\"Operation succeeded on attempt {attempt + 1}\")\n",
" return result\n",
" except Exception as e:\n",
" print(f\"Attempt {attempt + 1} failed: {e}\")\n",
" \n",
" if attempt < max_retries - 1:\n",
" # Exponential backoff\n",
" delay = 2 ** attempt\n",
" print(f\"Retrying in {delay} seconds...\")\n",
" await asyncio.sleep(delay)\n",
" \n",
" raise Exception(\"All retry attempts failed\")\n",
"\n",
"# Test retry mechanism\n",
"try:\n",
" result = await retry_operation()\n",
" print(f\"Final result: {result}\")\n",
"except Exception as e:\n",
" print(f\"Operation ultimately failed: {e}\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 11. Performance Comparison: Sync vs Async"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import time\n",
"\n",
"# Simulate I/O bound operations\n",
"def sync_io_operation(duration):\n",
" \"\"\"Simulate a blocking I/O operation\"\"\"\n",
" time.sleep(duration)\n",
" return f\"Sync operation completed in {duration}s\"\n",
"\n",
"async def async_io_operation(duration):\n",
" \"\"\"Simulate a non-blocking I/O operation\"\"\"\n",
" await asyncio.sleep(duration)\n",
" return f\"Async operation completed in {duration}s\"\n",
"\n",
"# Performance test\n",
"async def performance_comparison():\n",
" operations = [0.5, 0.3, 0.7, 0.2, 0.4] # Different operation durations\n",
" \n",
" print(\"=== Performance Comparison ===\")\n",
" \n",
" # Synchronous execution\n",
" print(\"\\nSynchronous execution:\")\n",
" start = time.time()\n",
" for duration in operations:\n",
" result = sync_io_operation(duration)\n",
" print(f\" {result}\")\n",
" sync_time = time.time() - start\n",
" print(f\"Total sync time: {sync_time:.2f} seconds\")\n",
" \n",
" # Asynchronous execution\n",
" print(\"\\nAsynchronous execution:\")\n",
" start = time.time()\n",
" tasks = [async_io_operation(duration) for duration in operations]\n",
" results = await asyncio.gather(*tasks)\n",
" for result in results:\n",
" print(f\" {result}\")\n",
" async_time = time.time() - start\n",
" print(f\"Total async time: {async_time:.2f} seconds\")\n",
" \n",
" # Performance improvement\n",
" improvement = ((sync_time - async_time) / sync_time) * 100\n",
" print(f\"\\nPerformance improvement: {improvement:.1f}%\")\n",
"\n",
"await performance_comparison()"
]
},
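{
"cell_type": "markdown",
"metadata": {},
"source": [
"A quick sketch (assuming Python 3.9+ for `asyncio.to_thread`): when you must call blocking, synchronous code from async code, run it in a worker thread so it does not stall the event loop."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# asyncio.to_thread() runs a blocking function in a thread and awaits the result.\n",
"def blocking_call(duration):\n",
"    time.sleep(duration)  # Deliberately blocking\n",
"    return f\"Blocking call finished after {duration}s\"\n",
"\n",
"async def demo_to_thread():\n",
"    start = time.time()\n",
"    results = await asyncio.gather(\n",
"        asyncio.to_thread(blocking_call, 1),\n",
"        asyncio.to_thread(blocking_call, 1),\n",
"        async_io_operation(1)\n",
"    )\n",
"    for r in results:\n",
"        print(f\"  {r}\")\n",
"    print(f\"Total time: {time.time() - start:.2f} seconds\")\n",
"\n",
"await demo_to_thread()"
]
},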
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Summary\n",
"\n",
"This notebook covered the essential concepts of asyncio:\n",
"\n",
"1. **Basic async/await syntax** - Foundation of asynchronous programming\n",
"2. **Concurrent execution** - Running multiple operations simultaneously\n",
"3. **Error handling** - Managing exceptions in async code\n",
"4. **Control flow** - Using `asyncio.wait()`, timeouts, and cancellation\n",
"5. **Resource management** - Async context managers\n",
"6. **Data generation** - Async generators and iterators\n",
"7. **Concurrency control** - Semaphores for limiting parallel operations\n",
"8. **Communication** - Queues for producer-consumer patterns\n",
"9. **Best practices** - Timeouts, retries, and performance optimization\n",
"\n",
"## When to Use AsyncIO\n",
"\n",
"**Good for:**\n",
"- I/O-bound operations (file reading, network requests, database queries)\n",
"- Applications with many concurrent users\n",
"- Real-time applications (chat, gaming, live updates)\n",
"- Web scraping with multiple requests\n",
"\n",
"**Not ideal for:**\n",
"- CPU-intensive computations (use multiprocessing instead)\n",
"- Simple scripts with minimal I/O\n",
"- Applications where blocking behavior is acceptable"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "venv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.13.3"
}
},
"nbformat": 4,
"nbformat_minor": 4
}


@@ -0,0 +1,509 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Python Error Handling - try, except, and Beyond\n",
"\n",
"This notebook covers Python's error handling mechanisms from basic try/except blocks to advanced exception handling patterns.\n",
"\n",
"## Learning Objectives\n",
"- Understand Python's exception hierarchy\n",
"- Master try, except, else, and finally blocks\n",
"- Learn to handle multiple exception types\n",
"- Create and use custom exceptions\n",
"- Follow error handling best practices"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 1. Understanding Exceptions\n",
"\n",
"An **exception** is an event that occurs during program execution that disrupts the normal flow of instructions."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Common exceptions without handling\n",
"print(\"=== Common Exception Types ===\")\n",
"\n",
"def show_exceptions():\n",
" examples = [\n",
" (\"ZeroDivisionError\", lambda: 10 / 0),\n",
" (\"IndexError\", lambda: [1, 2, 3][5]),\n",
" (\"KeyError\", lambda: {'a': 1}['b']),\n",
" (\"TypeError\", lambda: \"hello\" + 5),\n",
" (\"ValueError\", lambda: int(\"not_a_number\"))\n",
" ]\n",
" \n",
" for name, func in examples:\n",
" try:\n",
" func()\n",
" except Exception as e:\n",
" print(f\"❌ {name}: {e}\")\n",
"\n",
"show_exceptions()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 2. Basic try/except Syntax"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Basic try/except structure\n",
"def safe_divide(a, b):\n",
" try:\n",
" result = a / b\n",
" print(f\"✅ {a} ÷ {b} = {result}\")\n",
" return result\n",
" except ZeroDivisionError:\n",
" print(f\"❌ Cannot divide {a} by zero!\")\n",
" return None\n",
"\n",
"# Test the function\n",
"safe_divide(10, 2)\n",
"safe_divide(10, 0)\n",
"\n",
"print()\n",
"\n",
"# Capturing exception details\n",
"def analyze_exception(func):\n",
" try:\n",
" result = func()\n",
" print(f\"✅ Success: {result}\")\n",
" except Exception as e:\n",
" print(f\"❌ {type(e).__name__}: {e}\")\n",
"\n",
"analyze_exception(lambda: 10 / 2) # Success\n",
"analyze_exception(lambda: 10 / 0) # ZeroDivisionError\n",
"analyze_exception(lambda: int(\"abc\")) # ValueError"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 3. Multiple Exception Types"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Handling different exceptions separately\n",
"def robust_calculator(expression):\n",
" try:\n",
" result = eval(expression) # Note: eval is dangerous in real applications!\n",
" print(f\"✅ {expression} = {result}\")\n",
" return result\n",
" \n",
" except ZeroDivisionError:\n",
" print(f\"❌ Division by zero in: {expression}\")\n",
" \n",
" except NameError as e:\n",
" print(f\"❌ Undefined variable in: {expression}\")\n",
" \n",
" except (TypeError, ValueError) as e:\n",
" print(f\"❌ Type/Value error in: {expression} - {e}\")\n",
"\n",
"# Test with various expressions\n",
"test_expressions = [\"10 + 5\", \"20 / 0\", \"x + 5\", \"'hello' + 5\"]\n",
"\n",
"for expr in test_expressions:\n",
" robust_calculator(expr)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 4. The Complete try/except/else/finally Structure"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Complete structure demonstration\n",
"def file_processor(filename):\n",
" file_handle = None\n",
" \n",
" try:\n",
" print(f\"📂 Opening: {filename}\")\n",
" file_handle = open(filename, 'r')\n",
" content = file_handle.read()\n",
" print(f\"📖 Read {len(content)} characters\")\n",
" \n",
" except FileNotFoundError:\n",
" print(f\"❌ File not found: {filename}\")\n",
" return None\n",
" \n",
" except PermissionError:\n",
" print(f\"❌ Permission denied: {filename}\")\n",
" return None\n",
" \n",
" else:\n",
" # Runs only if no exception occurred\n",
" print(\"✅ File processing successful\")\n",
" return content\n",
" \n",
" finally:\n",
" # Always runs for cleanup\n",
" if file_handle and not file_handle.closed:\n",
" print(\"🔒 Closing file\")\n",
" file_handle.close()\n",
"\n",
"# Create a test file\n",
"with open(\"test.txt\", \"w\") as f:\n",
" f.write(\"Hello, World!\")\n",
"\n",
"# Test with existing and non-existing files\n",
"file_processor(\"test.txt\")\n",
"print()\n",
"file_processor(\"nonexistent.txt\")"
]
},
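{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Sketch of an alternative: a 'with' statement closes the file automatically,\n",
"# so the explicit finally/close bookkeeping above is no longer needed.\n",
"def file_processor_with_context_manager(filename):\n",
"    try:\n",
"        with open(filename, 'r') as file_handle:\n",
"            content = file_handle.read()\n",
"            print(f\"📖 Read {len(content)} characters\")\n",
"            return content\n",
"    except FileNotFoundError:\n",
"        print(f\"❌ File not found: {filename}\")\n",
"        return None\n",
"    except PermissionError:\n",
"        print(f\"❌ Permission denied: {filename}\")\n",
"        return None\n",
"\n",
"file_processor_with_context_manager(\"test.txt\")\n",
"file_processor_with_context_manager(\"nonexistent.txt\")"
]
},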
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 5. Raising Exceptions"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Raising exceptions manually\n",
"def validate_age(age):\n",
" if not isinstance(age, (int, float)):\n",
" raise TypeError(f\"Age must be a number, got {type(age).__name__}\")\n",
" \n",
" if age < 0:\n",
" raise ValueError(\"Age cannot be negative\")\n",
" \n",
" if age > 150:\n",
" raise ValueError(\"Age seems unrealistic (over 150)\")\n",
" \n",
" print(f\"✅ Valid age: {age}\")\n",
" return age\n",
"\n",
"# Test age validation\n",
"test_ages = [25, -5, \"thirty\", 200]\n",
"\n",
"for age in test_ages:\n",
" try:\n",
" validate_age(age)\n",
" except (TypeError, ValueError) as e:\n",
" print(f\"❌ {type(e).__name__}: {e}\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 6. Custom Exceptions"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Creating custom exception classes\n",
"class BankingError(Exception):\n",
" \"\"\"Base exception for banking operations\"\"\"\n",
" def __init__(self, message, account_id=None):\n",
" super().__init__(message)\n",
" self.account_id = account_id\n",
"\n",
"class InsufficientFundsError(BankingError):\n",
" \"\"\"Raised when account has insufficient funds\"\"\"\n",
" def __init__(self, required, available, account_id):\n",
" message = f\"Need ${required}, have ${available}\"\n",
" super().__init__(message, account_id)\n",
" self.required = required\n",
" self.available = available\n",
"\n",
"class AccountFrozenError(BankingError):\n",
" \"\"\"Raised when account is frozen\"\"\"\n",
" pass\n",
"\n",
"# Banking system using custom exceptions\n",
"class BankAccount:\n",
" def __init__(self, account_id, balance=0):\n",
" self.account_id = account_id\n",
" self.balance = balance\n",
" self.is_frozen = False\n",
" \n",
" def withdraw(self, amount):\n",
" if self.is_frozen:\n",
" raise AccountFrozenError(f\"Account {self.account_id} is frozen\", self.account_id)\n",
" \n",
" if amount > self.balance:\n",
" raise InsufficientFundsError(amount, self.balance, self.account_id)\n",
" \n",
" self.balance -= amount\n",
" print(f\"✅ Withdrew ${amount}. New balance: ${self.balance}\")\n",
"\n",
"# Test the banking system\n",
"account = BankAccount(\"ACC001\", 100)\n",
"\n",
"try:\n",
" account.withdraw(50) # Should work\n",
" account.withdraw(100) # Should fail - insufficient funds\n",
"except BankingError as e:\n",
" print(f\"❌ Banking error: {e}\")\n",
" print(f\" Account: {e.account_id}\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 7. Exception Chaining"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Exception chaining with 'from' keyword\n",
"class DataProcessingError(Exception):\n",
" pass\n",
"\n",
"def load_config(filename):\n",
" try:\n",
" with open(filename, 'r') as f:\n",
" import json\n",
" return json.loads(f.read())\n",
" except FileNotFoundError as e:\n",
" raise DataProcessingError(f\"Config file missing: {filename}\") from e\n",
" except json.JSONDecodeError as e:\n",
" raise DataProcessingError(f\"Invalid JSON in: {filename}\") from e\n",
"\n",
"def process_data(config_file):\n",
" try:\n",
" config = load_config(config_file)\n",
" print(f\"✅ Loaded config: {config}\")\n",
" except DataProcessingError as e:\n",
" print(f\"❌ Processing failed: {e}\")\n",
" print(f\" Original cause: {e.__cause__}\")\n",
"\n",
"# Create test files\n",
"import json\n",
"with open(\"valid.json\", \"w\") as f:\n",
" json.dump({\"setting\": \"value\"}, f)\n",
"\n",
"with open(\"invalid.json\", \"w\") as f:\n",
" f.write(\"{ invalid json }\")\n",
"\n",
"# Test exception chaining\n",
"process_data(\"valid.json\") # Should work\n",
"process_data(\"missing.json\") # FileNotFoundError -> DataProcessingError\n",
"process_data(\"invalid.json\") # JSONDecodeError -> DataProcessingError"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 8. Best Practices"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Best practices demonstration\n",
"import logging\n",
"from functools import wraps\n",
"\n",
"logging.basicConfig(level=logging.INFO, format='%(levelname)s - %(message)s')\n",
"logger = logging.getLogger(__name__)\n",
"\n",
"# ✅ Good: Specific exception handling\n",
"def good_file_reader(filename):\n",
" try:\n",
" with open(filename, 'r') as f:\n",
" content = f.read()\n",
" logger.info(f\"Read {filename} successfully\")\n",
" return content\n",
" except FileNotFoundError:\n",
" logger.warning(f\"File not found: {filename}\")\n",
" return None\n",
" except PermissionError:\n",
" logger.error(f\"Permission denied: {filename}\")\n",
" return None\n",
" except Exception as e:\n",
" logger.error(f\"Unexpected error: {e}\")\n",
" raise # Re-raise unexpected errors\n",
"\n",
"# Exception handling decorator\n",
"def handle_exceptions(default_return=None, log_errors=True):\n",
" def decorator(func):\n",
" @wraps(func)\n",
" def wrapper(*args, **kwargs):\n",
" try:\n",
" return func(*args, **kwargs)\n",
" except Exception as e:\n",
" if log_errors:\n",
" logger.error(f\"Error in {func.__name__}: {e}\")\n",
" return default_return\n",
" return wrapper\n",
" return decorator\n",
"\n",
"@handle_exceptions(default_return=0)\n",
"def safe_divide(a, b):\n",
" return a / b\n",
"\n",
"# Test best practices\n",
"print(\"=== Testing Best Practices ===\")\n",
"result = good_file_reader(\"test.txt\")\n",
"print(f\"File content: {result[:20] if result else 'None'}...\")\n",
"\n",
"print(f\"Safe divide 10/2: {safe_divide(10, 2)}\")\n",
"print(f\"Safe divide 10/0: {safe_divide(10, 0)}\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 9. Debugging Exceptions"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Using traceback for debugging\n",
"import traceback\n",
"import sys\n",
"\n",
"def debug_function():\n",
" def level_1():\n",
" return level_2()\n",
" \n",
" def level_2():\n",
" data = {\"key\": \"value\"}\n",
" return data[\"missing_key\"] # KeyError\n",
" \n",
" try:\n",
" level_1()\n",
" except Exception as e:\n",
" print(f\"❌ Exception: {e}\")\n",
" print(\"\\n🔍 Traceback:\")\n",
" traceback.print_exc()\n",
" \n",
" # Get exception info\n",
" exc_type, exc_value, exc_traceback = sys.exc_info()\n",
" print(f\"\\n📊 Exception details:\")\n",
" print(f\" Type: {exc_type.__name__}\")\n",
" print(f\" Value: {exc_value}\")\n",
"\n",
"debug_function()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Summary\n",
"\n",
"### Key Concepts Covered:\n",
"\n",
"🎯 **Basic Structure:**\n",
"- `try`: Code that might raise an exception\n",
"- `except`: Handle specific exceptions\n",
"- `else`: Runs only if no exception occurred\n",
"- `finally`: Always runs for cleanup\n",
"\n",
"🛠️ **Advanced Features:**\n",
"- Multiple exception handling\n",
"- Custom exception classes\n",
"- Exception chaining with `from`\n",
"- Re-raising exceptions with `raise`\n",
"\n",
"📋 **Best Practices:**\n",
"- Use specific exception types\n",
"- Don't suppress exceptions silently\n",
"- Log errors appropriately\n",
"- Clean up resources properly\n",
"- Create meaningful custom exceptions\n",
"\n",
"### Exception Hierarchy (Common Types):\n",
"```\n",
"Exception\n",
" ├── ArithmeticError\n",
" │ └── ZeroDivisionError\n",
" ├── AttributeError\n",
" ├── LookupError\n",
" │ ├── IndexError\n",
" │ └── KeyError\n",
" ├── NameError\n",
" ├── OSError\n",
" │ ├── FileNotFoundError\n",
" │ └── PermissionError\n",
" ├── TypeError\n",
" └── ValueError\n",
"```\n",
"\n",
"### Remember:\n",
"- **Catch specific exceptions** rather than using broad `except Exception`\n",
"- **Always clean up resources** using `finally` or context managers\n",
"- **Log errors meaningfully** to help with debugging\n",
"- **Don't hide failures** - make them visible and actionable"
]
}
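,
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Quick sketch illustrating the hierarchy above: catching a base class\n",
"# (LookupError) also catches its subclasses IndexError and KeyError.\n",
"for bad_lookup in (lambda: [1, 2, 3][10], lambda: {'a': 1}['z']):\n",
"    try:\n",
"        bad_lookup()\n",
"    except LookupError as e:\n",
"        print(f\"Caught {type(e).__name__} via LookupError: {e}\")"
]
}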
],
"metadata": {
"kernelspec": {
"display_name": "venv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.13.3"
}
},
"nbformat": 4,
"nbformat_minor": 4
}


@@ -0,0 +1,749 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Python Logging Tutorial\n",
"\n",
"This notebook provides a comprehensive guide to Python's logging module, covering basic concepts, advanced configurations, and best practices.\n",
"\n",
"## Table of Contents\n",
"1. [Introduction to Logging](#introduction)\n",
"2. [Basic Logging](#basic-logging)\n",
"3. [Logging Levels](#logging-levels)\n",
"4. [Configuring Loggers](#configuring-loggers)\n",
"5. [Handlers and Formatters](#handlers-formatters)\n",
"6. [Logging to Files](#logging-to-files)\n",
"7. [Advanced Configuration](#advanced-configuration)\n",
"8. [Best Practices](#best-practices)\n",
"9. [Real-world Examples](#real-world-examples)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 1. Introduction to Logging {#introduction}\n",
"\n",
"Logging is a means of tracking events that happen when software runs. It's essential for:\n",
"- **Debugging**: Understanding what went wrong\n",
"- **Monitoring**: Tracking application behavior\n",
"- **Auditing**: Recording important events\n",
"- **Performance**: Identifying bottlenecks\n",
"\n",
"Python's `logging` module provides a flexible framework for emitting log messages from Python programs."
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import logging\n",
"import sys\n",
"from datetime import datetime"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 2. Basic Logging {#basic-logging}\n",
"\n",
"The simplest way to start logging is to use the module-level functions provided by the logging module."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Basic logging example\n",
"logging.warning('This is a warning message')\n",
"logging.error('This is an error message')\n",
"logging.critical('This is a critical message')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Why don't we see debug and info messages?\n",
"logging.debug('This debug message will not appear')\n",
"logging.info('This info message will not appear either')\n",
"\n",
"print(f\"Current logging level: {logging.getLogger().getEffectiveLevel()}\")\n",
"print(f\"WARNING level value: {logging.WARNING}\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 3. Logging Levels {#logging-levels}\n",
"\n",
"Python logging has five standard levels:\n",
"\n",
"| Level | Numeric Value | When to Use |\n",
"|-------|---------------|-------------|\n",
"| DEBUG | 10 | Detailed information for diagnosing problems |\n",
"| INFO | 20 | General information about program execution |\n",
"| WARNING | 30 | Something unexpected happened, but software still works |\n",
"| ERROR | 40 | Serious problem occurred, software couldn't perform function |\n",
"| CRITICAL | 50 | Very serious error, program may not continue |\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Configure basic logging to see all levels\n",
"logging.basicConfig(level=logging.DEBUG, format='%(levelname)s: %(message)s')\n",
"\n",
"# Now all levels will be displayed\n",
"logging.debug('Debug message - detailed diagnostic info')\n",
"logging.info('Info message - general information')\n",
"logging.warning('Warning message - something unexpected')\n",
"logging.error('Error message - serious problem')\n",
"logging.critical('Critical message - very serious error')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 4. Configuring Loggers {#configuring-loggers}\n",
"\n",
"For more control, create and configure your own logger instances."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Create a custom logger\n",
"logger = logging.getLogger('my_app')\n",
"logger.setLevel(logging.DEBUG)\n",
"\n",
"# Create console handler\n",
"console_handler = logging.StreamHandler()\n",
"console_handler.setLevel(logging.INFO)\n",
"\n",
"# Create formatter\n",
"formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')\n",
"console_handler.setFormatter(formatter)\n",
"\n",
"# Add handler to logger\n",
"logger.addHandler(console_handler)\n",
"\n",
"# Test the logger\n",
"logger.debug('This debug message will not appear (handler level is INFO)')\n",
"logger.info('This info message will appear')\n",
"logger.warning('This warning will appear')\n",
"logger.error('This error will appear')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 5. Handlers and Formatters {#handlers-formatters}\n",
"\n",
"Handlers determine where log messages go, and formatters determine how they look."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Different formatter examples\n",
"logger2 = logging.getLogger('formatted_logger')\n",
"logger2.setLevel(logging.DEBUG)\n",
"\n",
"# Simple formatter\n",
"simple_handler = logging.StreamHandler()\n",
"simple_formatter = logging.Formatter('%(levelname)s: %(message)s')\n",
"simple_handler.setFormatter(simple_formatter)\n",
"\n",
"# Detailed formatter\n",
"detailed_handler = logging.StreamHandler()\n",
"detailed_formatter = logging.Formatter(\n",
" '%(asctime)s | %(name)s | %(levelname)-8s | %(filename)s:%(lineno)d | %(message)s'\n",
")\n",
"detailed_handler.setFormatter(detailed_formatter)\n",
"\n",
"logger2.addHandler(simple_handler)\n",
"logger2.info('Message with simple formatting')\n",
"\n",
"# Remove simple handler and add detailed handler\n",
"logger2.removeHandler(simple_handler)\n",
"logger2.addHandler(detailed_handler)\n",
"logger2.info('Message with detailed formatting')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Common Format Attributes\n",
"\n",
"| Attribute | Description |\n",
"|-----------|-------------|\n",
"| %(asctime)s | Human-readable time |\n",
"| %(levelname)s | Log level name |\n",
"| %(name)s | Logger name |\n",
"| %(message)s | Log message |\n",
"| %(filename)s | Filename |\n",
"| %(lineno)d | Line number |\n",
"| %(funcName)s | Function name |\n",
"| %(process)d | Process ID |\n",
"| %(thread)d | Thread ID |"
]
},
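{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Quick sketch combining several attributes from the table above\n",
"# (funcName, lineno, process, thread) in a single formatter.\n",
"attr_logger = logging.getLogger('attribute_demo')\n",
"attr_logger.setLevel(logging.DEBUG)\n",
"\n",
"attr_handler = logging.StreamHandler()\n",
"attr_handler.setFormatter(logging.Formatter(\n",
"    '%(asctime)s | %(levelname)-8s | %(funcName)s:%(lineno)d | pid=%(process)d tid=%(thread)d | %(message)s'\n",
"))\n",
"attr_logger.addHandler(attr_handler)\n",
"\n",
"def do_something():\n",
"    attr_logger.info('Message showing function, line, process and thread info')\n",
"\n",
"do_something()"
]
},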
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 6. Logging to Files {#logging-to-files}\n",
"\n",
"File logging is crucial for production applications."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Create a file logger\n",
"file_logger = logging.getLogger('file_logger')\n",
"file_logger.setLevel(logging.DEBUG)\n",
"\n",
"# Create file handler\n",
"file_handler = logging.FileHandler('app.log')\n",
"file_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')\n",
"file_handler.setFormatter(file_formatter)\n",
"\n",
"file_logger.addHandler(file_handler)\n",
"\n",
"# Log some messages\n",
"file_logger.info('Application started')\n",
"file_logger.warning('This is a warning')\n",
"file_logger.error('An error occurred')\n",
"\n",
"print(\"Messages logged to 'app.log' file\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Rotating file handler to prevent log files from getting too large\n",
"from logging.handlers import RotatingFileHandler\n",
"\n",
"rotating_logger = logging.getLogger('rotating_logger')\n",
"rotating_logger.setLevel(logging.DEBUG)\n",
"\n",
"# Create rotating file handler (max 1MB, keep 3 backup files)\n",
"rotating_handler = RotatingFileHandler(\n",
" 'rotating_app.log', \n",
" maxBytes=1024*1024, # 1MB\n",
" backupCount=3\n",
")\n",
"rotating_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')\n",
"rotating_handler.setFormatter(rotating_formatter)\n",
"\n",
"rotating_logger.addHandler(rotating_handler)\n",
"\n",
"# Simulate some log entries\n",
"for i in range(5):\n",
" rotating_logger.info(f'Log entry number {i+1}')\n",
"\n",
"print(\"Messages logged to 'rotating_app.log' with rotation\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 7. Advanced Configuration {#advanced-configuration}\n",
"\n",
"Use dictConfig for complex logging setups."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import logging.config\n",
"\n",
"# Advanced logging configuration using dictConfig\n",
"LOGGING_CONFIG = {\n",
" 'version': 1,\n",
" 'disable_existing_loggers': False,\n",
" 'formatters': {\n",
" 'standard': {\n",
" 'format': '%(asctime)s [%(levelname)s] %(name)s: %(message)s'\n",
" },\n",
" 'detailed': {\n",
" 'format': '%(asctime)s [%(levelname)s] %(name)s:%(lineno)d: %(message)s'\n",
" }\n",
" },\n",
" 'handlers': {\n",
" 'console': {\n",
" 'level': 'INFO',\n",
" 'class': 'logging.StreamHandler',\n",
" 'formatter': 'standard',\n",
" 'stream': 'ext://sys.stdout'\n",
" },\n",
" 'file': {\n",
" 'level': 'DEBUG',\n",
" 'class': 'logging.FileHandler',\n",
" 'formatter': 'detailed',\n",
" 'filename': 'advanced_app.log',\n",
" 'mode': 'a'\n",
" }\n",
" },\n",
" 'loggers': {\n",
" 'my_app': {\n",
" 'handlers': ['console', 'file'],\n",
" 'level': 'DEBUG',\n",
" 'propagate': False\n",
" }\n",
" },\n",
" 'root': {\n",
" 'level': 'WARNING',\n",
" 'handlers': ['console']\n",
" }\n",
"}\n",
"\n",
"# Apply the configuration\n",
"logging.config.dictConfig(LOGGING_CONFIG)\n",
"\n",
"# Get the configured logger\n",
"advanced_logger = logging.getLogger('my_app')\n",
"\n",
"# Test the advanced logger\n",
"advanced_logger.debug('Debug message - only in file')\n",
"advanced_logger.info('Info message - in both console and file')\n",
"advanced_logger.warning('Warning message')\n",
"advanced_logger.error('Error message')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 8. Best Practices {#best-practices}\n",
"\n",
"Here are some logging best practices to follow:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# 1. Use appropriate log levels\n",
"logger = logging.getLogger(__name__) # Use __name__ for logger names\n",
"\n",
"def process_user_data(user_id, data):\n",
" \"\"\"Example function demonstrating good logging practices\"\"\"\n",
" logger.info(f\"Processing data for user {user_id}\")\n",
" \n",
" try:\n",
" # Simulate processing\n",
" if not data:\n",
" logger.warning(f\"No data provided for user {user_id}\")\n",
" return None\n",
" \n",
" # Process data\n",
" result = len(data) # Simple processing\n",
" logger.debug(f\"Processed {result} items for user {user_id}\")\n",
" \n",
" return result\n",
" \n",
" except Exception as e:\n",
" logger.error(f\"Error processing data for user {user_id}: {e}\", exc_info=True)\n",
" raise\n",
" \n",
" finally:\n",
" logger.info(f\"Finished processing for user {user_id}\")\n",
"\n",
"# Test the function\n",
"process_user_data(123, ['item1', 'item2', 'item3'])\n",
"process_user_data(456, [])"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# 2. Use structured logging for better analysis\n",
"import json\n",
"\n",
"class JSONFormatter(logging.Formatter):\n",
" def format(self, record):\n",
" log_entry = {\n",
" 'timestamp': self.formatTime(record),\n",
" 'level': record.levelname,\n",
" 'logger': record.name,\n",
" 'message': record.getMessage(),\n",
" 'module': record.module,\n",
" 'function': record.funcName,\n",
" 'line': record.lineno\n",
" }\n",
" \n",
" if record.exc_info:\n",
" log_entry['exception'] = self.formatException(record.exc_info)\n",
" \n",
" return json.dumps(log_entry)\n",
"\n",
"# Create a logger with JSON formatting\n",
"json_logger = logging.getLogger('json_logger')\n",
"json_handler = logging.StreamHandler()\n",
"json_handler.setFormatter(JSONFormatter())\n",
"json_logger.addHandler(json_handler)\n",
"json_logger.setLevel(logging.INFO)\n",
"\n",
"json_logger.info('User login successful')\n",
"json_logger.warning('Login attempt from suspicious IP')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# 3. Use logging context managers for consistent formatting\n",
"import contextlib\n",
"from contextvars import ContextVar\n",
"\n",
"# Context variable for request ID\n",
"request_id: ContextVar[str] = ContextVar('request_id', default='')\n",
"\n",
"class ContextualFormatter(logging.Formatter):\n",
" def format(self, record):\n",
" # Add request ID to the log record\n",
" record.request_id = request_id.get()\n",
" return super().format(record)\n",
"\n",
"# Set up contextual logger\n",
"contextual_logger = logging.getLogger('contextual')\n",
"contextual_handler = logging.StreamHandler()\n",
"contextual_formatter = ContextualFormatter(\n",
" '[%(request_id)s] %(asctime)s - %(levelname)s - %(message)s'\n",
")\n",
"contextual_handler.setFormatter(contextual_formatter)\n",
"contextual_logger.addHandler(contextual_handler)\n",
"contextual_logger.setLevel(logging.INFO)\n",
"\n",
"@contextlib.contextmanager\n",
"def request_context(req_id):\n",
" \"\"\"Context manager to set request ID for logging\"\"\"\n",
" token = request_id.set(req_id)\n",
" try:\n",
" yield\n",
" finally:\n",
" request_id.reset(token)\n",
"\n",
"# Use the contextual logger\n",
"with request_context('REQ-001'):\n",
" contextual_logger.info('Processing request')\n",
" contextual_logger.warning('Request taking longer than expected')\n",
"\n",
"with request_context('REQ-002'):\n",
" contextual_logger.info('Processing another request')\n",
" contextual_logger.error('Request failed')"
]
},
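  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# 4. Prefer lazy %-style arguments over pre-built message strings\n",
    "# (a small illustrative sketch; 'lazy_example' is just a placeholder logger name)\n",
    "lazy_logger = logging.getLogger('lazy_example')\n",
    "lazy_logger.setLevel(logging.INFO)\n",
    "\n",
    "items = ['a', 'b', 'c']\n",
    "\n",
    "# An f-string builds the full message even when the level is disabled:\n",
    "lazy_logger.debug(f\"Loaded {len(items)} items\")  # string built, then discarded\n",
    "\n",
    "# Passing arguments defers formatting until the record is actually emitted:\n",
    "lazy_logger.debug(\"Loaded %d items\", len(items))  # formatting skipped at INFO level\n",
    "lazy_logger.info(\"Loaded %d items\", len(items))   # formatted and logged\n",
    "\n",
    "# The same lazy style works inside exception handlers too:\n",
    "try:\n",
    "    items[10]\n",
    "except IndexError:\n",
    "    lazy_logger.error(\"Lookup failed for index %d\", 10, exc_info=True)"
   ]
  },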
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 9. Real-world Examples {#real-world-examples}\n",
"\n",
"Here are some practical examples of logging in real applications."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Example 1: Web API logging\n",
"import time\n",
"import uuid\n",
"\n",
"# Set up API logger\n",
"api_logger = logging.getLogger('api')\n",
"api_handler = logging.StreamHandler()\n",
"api_formatter = logging.Formatter(\n",
" '%(asctime)s - %(name)s - %(levelname)s - %(message)s'\n",
")\n",
"api_handler.setFormatter(api_formatter)\n",
"api_logger.addHandler(api_handler)\n",
"api_logger.setLevel(logging.INFO)\n",
"\n",
"def api_endpoint(endpoint, user_id, request_data):\n",
" \"\"\"Simulate an API endpoint with proper logging\"\"\"\n",
" request_id = str(uuid.uuid4())[:8]\n",
" start_time = time.time()\n",
" \n",
" api_logger.info(\n",
" f\"[{request_id}] {endpoint} - User: {user_id} - Request started\"\n",
" )\n",
" \n",
" try:\n",
" # Simulate processing\n",
" time.sleep(0.1) # Simulate work\n",
" \n",
" if 'error' in request_data:\n",
" raise ValueError(\"Invalid request data\")\n",
" \n",
" # Simulate success\n",
" response = {\"status\": \"success\", \"data\": \"processed\"}\n",
" \n",
" duration = time.time() - start_time\n",
" api_logger.info(\n",
" f\"[{request_id}] {endpoint} - User: {user_id} - \"\n",
" f\"Request completed successfully in {duration:.3f}s\"\n",
" )\n",
" \n",
" return response\n",
" \n",
" except Exception as e:\n",
" duration = time.time() - start_time\n",
" api_logger.error(\n",
" f\"[{request_id}] {endpoint} - User: {user_id} - \"\n",
" f\"Request failed in {duration:.3f}s: {e}\",\n",
" exc_info=True\n",
" )\n",
" raise\n",
"\n",
"# Test the API endpoint\n",
"api_endpoint('/users/profile', 'user123', {'name': 'John'})\n",
"try:\n",
" api_endpoint('/users/profile', 'user456', {'error': 'invalid'})\n",
"except ValueError:\n",
" pass # Expected error"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Example 2: Database operation logging\n",
"db_logger = logging.getLogger('database')\n",
"db_handler = logging.StreamHandler()\n",
"db_formatter = logging.Formatter(\n",
" '%(asctime)s - %(name)s - %(levelname)s - %(message)s'\n",
")\n",
"db_handler.setFormatter(db_formatter)\n",
"db_logger.addHandler(db_handler)\n",
"db_logger.setLevel(logging.DEBUG)\n",
"\n",
"class DatabaseManager:\n",
" def __init__(self):\n",
" self.logger = logging.getLogger('database.manager')\n",
" \n",
" def connect(self):\n",
" self.logger.info(\"Establishing database connection\")\n",
" # Simulate connection\n",
" time.sleep(0.05)\n",
" self.logger.info(\"Database connection established\")\n",
" \n",
" def execute_query(self, query, params=None):\n",
" query_id = str(uuid.uuid4())[:8]\n",
" start_time = time.time()\n",
" \n",
" self.logger.debug(f\"[{query_id}] Executing query: {query}\")\n",
" if params:\n",
" self.logger.debug(f\"[{query_id}] Query parameters: {params}\")\n",
" \n",
" try:\n",
" # Simulate query execution\n",
" time.sleep(0.02)\n",
" \n",
" if 'DROP' in query.upper():\n",
" raise Exception(\"DROP operations are not allowed\")\n",
" \n",
" duration = time.time() - start_time\n",
" self.logger.info(\n",
" f\"[{query_id}] Query executed successfully in {duration:.3f}s\"\n",
" )\n",
" \n",
" return {\"rows\": 5, \"affected\": 1}\n",
" \n",
" except Exception as e:\n",
" duration = time.time() - start_time\n",
" self.logger.error(\n",
" f\"[{query_id}] Query failed in {duration:.3f}s: {e}\"\n",
" )\n",
" raise\n",
" \n",
" def close(self):\n",
" self.logger.info(\"Closing database connection\")\n",
"\n",
"# Test database operations\n",
"db = DatabaseManager()\n",
"db.connect()\n",
"db.execute_query(\"SELECT * FROM users WHERE id = ?\", [123])\n",
"db.execute_query(\"UPDATE users SET last_login = NOW() WHERE id = ?\", [123])\n",
"\n",
"try:\n",
" db.execute_query(\"DROP TABLE users\")\n",
"except Exception:\n",
" pass # Expected error\n",
" \n",
"db.close()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Example 3: Application startup and shutdown logging\n",
"app_logger = logging.getLogger('application')\n",
"app_handler = logging.StreamHandler()\n",
"app_formatter = logging.Formatter(\n",
" '%(asctime)s - %(name)s - %(levelname)s - %(message)s'\n",
")\n",
"app_handler.setFormatter(app_formatter)\n",
"app_logger.addHandler(app_handler)\n",
"app_logger.setLevel(logging.INFO)\n",
"\n",
"class Application:\n",
" def __init__(self, name, version):\n",
" self.name = name\n",
" self.version = version\n",
" self.logger = logging.getLogger(f'application.{name.lower()}')\n",
" self.running = False\n",
" \n",
" def startup(self):\n",
" \"\"\"Application startup with comprehensive logging\"\"\"\n",
" self.logger.info(f\"Starting {self.name} v{self.version}\")\n",
" \n",
" try:\n",
" # Log system information\n",
" import platform\n",
" self.logger.info(f\"Python version: {platform.python_version()}\")\n",
" self.logger.info(f\"Platform: {platform.platform()}\")\n",
" \n",
" # Initialize components\n",
" self.logger.info(\"Initializing components...\")\n",
" \n",
" components = ['Database', 'Cache', 'API Server', 'Background Tasks']\n",
" for component in components:\n",
" self.logger.info(f\"Initializing {component}...\")\n",
" time.sleep(0.01) # Simulate initialization\n",
" self.logger.info(f\"{component} initialized successfully\")\n",
" \n",
" self.running = True\n",
" self.logger.info(f\"{self.name} started successfully\")\n",
" \n",
" except Exception as e:\n",
" self.logger.critical(f\"Failed to start {self.name}: {e}\", exc_info=True)\n",
" raise\n",
" \n",
" def shutdown(self):\n",
" \"\"\"Application shutdown with proper cleanup logging\"\"\"\n",
" self.logger.info(f\"Shutting down {self.name}...\")\n",
" \n",
" try:\n",
" # Cleanup components in reverse order\n",
" components = ['Background Tasks', 'API Server', 'Cache', 'Database']\n",
" for component in components:\n",
" self.logger.info(f\"Stopping {component}...\")\n",
" time.sleep(0.01) # Simulate cleanup\n",
" self.logger.info(f\"{component} stopped\")\n",
" \n",
" self.running = False\n",
" self.logger.info(f\"{self.name} shutdown completed\")\n",
" \n",
" except Exception as e:\n",
" self.logger.error(f\"Error during shutdown: {e}\", exc_info=True)\n",
" \n",
"# Test application lifecycle\n",
"app = Application(\"MyWebApp\", \"1.2.3\")\n",
"app.startup()\n",
"time.sleep(0.1) # Simulate running\n",
"app.shutdown()"
]
},
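  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Example 4: Putting it together - a reusable setup helper\n",
    "# (an illustrative sketch; the logger name, file name, and size limits are placeholders)\n",
    "import logging.handlers\n",
    "\n",
    "def setup_logging(app_name, log_file='app.log', console_level=logging.INFO):\n",
    "    \"\"\"Attach a console handler and a rotating file handler to one application logger\"\"\"\n",
    "    logger = logging.getLogger(app_name)\n",
    "    logger.setLevel(logging.DEBUG)  # capture everything; each handler filters its own output\n",
    "    logger.propagate = False        # avoid duplicate lines via the root logger\n",
    "    \n",
    "    formatter = logging.Formatter(\n",
    "        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'\n",
    "    )\n",
    "    \n",
    "    console_handler = logging.StreamHandler()\n",
    "    console_handler.setLevel(console_level)\n",
    "    console_handler.setFormatter(formatter)\n",
    "    logger.addHandler(console_handler)\n",
    "    \n",
    "    file_handler = logging.handlers.RotatingFileHandler(\n",
    "        log_file, maxBytes=1_000_000, backupCount=3\n",
    "    )\n",
    "    file_handler.setLevel(logging.DEBUG)\n",
    "    file_handler.setFormatter(formatter)\n",
    "    logger.addHandler(file_handler)\n",
    "    \n",
    "    return logger\n",
    "\n",
    "# Test the helper\n",
    "demo_logger = setup_logging('demo_app', log_file='demo_app.log')\n",
    "demo_logger.info('Application configured')\n",
    "\n",
    "try:\n",
    "    1 / 0\n",
    "except ZeroDivisionError:\n",
    "    demo_logger.error('Calculation failed', exc_info=True)"
   ]
  },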
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Summary\n",
"\n",
"This notebook covered:\n",
"\n",
"1. **Basic logging concepts** and module-level functions\n",
"2. **Logging levels** and when to use each one\n",
"3. **Custom loggers** with handlers and formatters\n",
"4. **File logging** with rotation capabilities\n",
"5. **Advanced configuration** using dictConfig\n",
"6. **Best practices** for production applications\n",
"7. **Real-world examples** from web APIs, databases, and application lifecycle\n",
"\n",
"### Key Takeaways:\n",
"\n",
"- Use appropriate log levels for different types of information\n",
"- Configure loggers with proper formatters for consistency\n",
"- Use file handlers with rotation for production systems\n",
"- Include context information (request IDs, user IDs) in log messages\n",
"- Log both successful operations and errors with appropriate detail\n",
"- Structure your logs for easy parsing and analysis\n",
"- Use exc_info=True for exception logging to get stack traces\n",
"\n",
"Remember: Good logging is essential for maintaining and debugging applications in production!"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "venv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.13.3"
}
},
"nbformat": 4,
"nbformat_minor": 4
}