# Session 5: Event-Driven Trading Bot Foundation

## Notebook 1: Event System Foundation

**Learning Objectives:**
- Understand the Publisher-Subscriber pattern
- Build a basic event queue system
- Create event types for trading bots
- Test component communication through events

**Why This Matters:**
Instead of components calling each other directly (tight coupling), they communicate through events (loose coupling). This makes your trading bot more professional, testable, and scalable.

---

## Part 1: Basic Event System

Let's start with the simplest possible event system - a basic Event class that carries information between components.

In [1]:
from datetime import datetime
from typing import Dict, Any, List, Callable
from dataclasses import dataclass
import json

# Basic Event class - every message in our system
@dataclass
class Event:
 """Base class for all events in our trading system"""
 event_type: str
 data: Dict[str, Any]
 timestamp: datetime = None
 source: str = "unknown"
 
 def __post_init__(self):
 if self.timestamp is None:
 self.timestamp = datetime.now()
 
 def to_dict(self) -> Dict[str, Any]:
 """Convert event to dictionary for logging/debugging"""
 return {
 'event_type': self.event_type,
 'data': self.data,
 'timestamp': self.timestamp.isoformat(),
 'source': self.source
 }
 
 def __str__(self):
 return f"Event({self.event_type} from {self.source} at {self.timestamp.strftime('%H:%M:%S')})"

# Test our basic Event
test_event = Event(
 event_type="price_update",
 data={"symbol": "BTC/USDT", "price": 43250.50},
 source="market_data_handler"
)

print("Basic Event:")
print(test_event)
print("\nEvent Data:")
print(json.dumps(test_event.to_dict(), indent=2, default=str))

Basic Event:
Event(price_update from market_data_handler at 19:06:23)

Event Data:
{
 "event_type": "price_update",
 "data": {
 "symbol": "BTC/USDT",
 "price": 43250.5
 },
 "timestamp": "2025-06-25T19:06:23.130343",
 "source": "market_data_handler"
}


## Part 2: Event Queue (Message Bus)

Now we need a central place where all events go - the Event Queue. This is the "post office" of our trading bot.

In [2]:
from collections import deque
from threading import Lock

class EventQueue:
 """Central event queue - the heart of our event-driven system"""
 
 def __init__(self):
 self.events = deque() # Fast append/pop from both ends
 self.subscribers = {} # {event_type: [callback_functions]}
 self.lock = Lock() # Thread safety for production
 self.event_history = [] # Keep history for debugging
 
 def subscribe(self, event_type: str, callback: Callable[[Event], None]):
 """Subscribe to specific event types"""
 with self.lock:
 if event_type not in self.subscribers:
 self.subscribers[event_type] = []
 self.subscribers[event_type].append(callback)
 print(f"โœ… Subscribed to '{event_type}' events")
 
 def publish(self, event: Event):
 """Publish an event to all subscribers"""
 with self.lock:
 # Add to queue
 self.events.append(event)
 self.event_history.append(event)
 
 # Notify subscribers immediately
 if event.event_type in self.subscribers:
 for callback in self.subscribers[event.event_type]:
 try:
 callback(event)
 except Exception as e:
 print(f"โŒ Error in callback for {event.event_type}: {e}")
 
 print(f"๐Ÿ“ค Published: {event}")
 
 def get_next_event(self) -> Event:
 """Get next event from queue (useful for batch processing)"""
 with self.lock:
 if self.events:
 return self.events.popleft()
 return None
 
 def get_stats(self) -> Dict[str, Any]:
 """Get queue statistics"""
 with self.lock:
 return {
 'events_in_queue': len(self.events),
 'total_events_processed': len(self.event_history),
 'event_types_subscribed': list(self.subscribers.keys()),
 'subscribers_count': {k: len(v) for k, v in self.subscribers.items()}
 }

# Create our central event queue
event_queue = EventQueue()
print("๐Ÿ—๏ธ Event Queue created!")
print(f"Initial stats: {event_queue.get_stats()}")

๐Ÿ—๏ธ Event Queue created!
Initial stats: {'events_in_queue': 0, 'total_events_processed': 0, 'event_types_subscribed': [], 'subscribers_count': {}}


## Part 3: Trading Event Types

Let's create specific event types that our MARKET components will use to communicate.

In [3]:
# Specific event types for our trading system

class MarketDataEvent(Event):
 """Market data events - price updates, order book changes"""
 def __init__(self, symbol: str, price: float, volume: float = 0, source: str = "market_data"):
 super().__init__(
 event_type="market_data",
 data={
 "symbol": symbol,
 "price": price,
 "volume": volume
 },
 source=source
 )

class SignalEvent(Event):
 """Trading signal events - buy/sell signals from algorithm"""
 def __init__(self, symbol: str, signal_type: str, strength: float, source: str = "algorithm"):
 super().__init__(
 event_type="signal",
 data={
 "symbol": symbol,
 "signal_type": signal_type, # 'BUY', 'SELL', 'HOLD'
 "strength": strength # 0.0 to 1.0
 },
 source=source
 )

class OrderEvent(Event):
 """Order events - order requests, fills, cancellations"""
 def __init__(self, symbol: str, order_type: str, quantity: float, price: float = None, source: str = "order_manager"):
 super().__init__(
 event_type="order",
 data={
 "symbol": symbol,
 "order_type": order_type, # 'MARKET', 'LIMIT', 'STOP'
 "quantity": quantity,
 "price": price
 },
 source=source
 )

class RiskEvent(Event):
 """Risk management events - warnings, violations, limits"""
 def __init__(self, risk_type: str, message: str, severity: str = "WARNING", source: str = "risk_controller"):
 super().__init__(
 event_type="risk",
 data={
 "risk_type": risk_type,
 "message": message,
 "severity": severity # 'INFO', 'WARNING', 'ERROR', 'CRITICAL'
 },
 source=source
 )

# Test our trading event types
print("๐Ÿงช Testing Trading Event Types:\n")

# Market data event
market_event = MarketDataEvent("BTC/USDT", 43250.50, 1.25)
print(f"Market Event: {market_event}")

# Signal event
signal_event = SignalEvent("BTC/USDT", "BUY", 0.8)
print(f"Signal Event: {signal_event}")

# Order event
order_event = OrderEvent("BTC/USDT", "MARKET", 0.1)
print(f"Order Event: {order_event}")

# Risk event
risk_event = RiskEvent("position_limit", "Position size exceeds 10% of portfolio", "WARNING")
print(f"Risk Event: {risk_event}")

๐Ÿงช Testing Trading Event Types:

Market Event: Event(market_data from market_data at 19:06:23)
Signal Event: Event(signal from algorithm at 19:06:23)
Order Event: Event(order from order_manager at 19:06:23)
Risk Event: Event(risk from risk_controller at 19:06:23)


## Part 4: Simple Component Example

Let's create two simple components that communicate through events - this demonstrates the publisher-subscriber pattern.

In [4]:
class SimpleMarketDataHandler:
 """Simulates receiving market data and publishing events"""
 
 def __init__(self, event_queue: EventQueue):
 self.event_queue = event_queue
 self.name = "market_data_handler"
 
 def simulate_price_update(self, symbol: str, price: float):
 """Simulate receiving a price update from exchange"""
 print(f"๐Ÿ“Š {self.name}: Received price update for {symbol}: ${price:,.2f}")
 
 # Create and publish market data event
 event = MarketDataEvent(symbol, price, source=self.name)
 self.event_queue.publish(event)


class SimpleAlgorithm:
 """Simple algorithm that listens to market data and generates signals"""
 
 def __init__(self, event_queue: EventQueue):
 self.event_queue = event_queue
 self.name = "simple_algorithm"
 self.last_price = None
 
 # Subscribe to market data events
 self.event_queue.subscribe("market_data", self.on_market_data)
 
 def on_market_data(self, event: Event):
 """Handle market data events"""
 symbol = event.data["symbol"]
 price = event.data["price"]
 
 print(f"๐Ÿค– {self.name}: Processing {symbol} price: ${price:,.2f}")
 
 # Simple momentum strategy
 if self.last_price is not None:
 price_change = (price - self.last_price) / self.last_price
 
 if price_change > 0.01: # 1% increase
 signal = SignalEvent(symbol, "BUY", 0.7, source=self.name)
 self.event_queue.publish(signal)
 print(f" ๐Ÿ“ˆ Generated BUY signal (price up {price_change:.2%})")
 
 elif price_change < -0.01: # 1% decrease
 signal = SignalEvent(symbol, "SELL", 0.7, source=self.name)
 self.event_queue.publish(signal)
 print(f" ๐Ÿ“‰ Generated SELL signal (price down {price_change:.2%})")
 
 self.last_price = price


class SimpleRiskController:
 """Simple risk controller that monitors signals"""
 
 def __init__(self, event_queue: EventQueue):
 self.event_queue = event_queue
 self.name = "risk_controller"
 
 # Subscribe to signal events
 self.event_queue.subscribe("signal", self.on_signal)
 
 def on_signal(self, event: Event):
 """Validate trading signals"""
 symbol = event.data["symbol"]
 signal_type = event.data["signal_type"]
 strength = event.data["strength"]
 
 print(f"๐Ÿ›ก๏ธ {self.name}: Validating {signal_type} signal for {symbol} (strength: {strength})")
 
 # Simple validation - only allow strong signals
 if strength >= 0.6:
 print(f" โœ… Signal APPROVED - strength sufficient ({strength})")
 # In real system, would publish order_request event here
 else:
 risk_event = RiskEvent(
 "weak_signal", 
 f"Signal strength too low: {strength} < 0.6", 
 "WARNING",
 source=self.name
 )
 self.event_queue.publish(risk_event)
 print(f" โŒ Signal REJECTED - strength too low")


# Create our components
print("๐Ÿ—๏ธ Creating Components...\n")

market_handler = SimpleMarketDataHandler(event_queue)
algorithm = SimpleAlgorithm(event_queue)
risk_controller = SimpleRiskController(event_queue)

print("\n๐Ÿ“Š Current Event Queue Stats:")
print(json.dumps(event_queue.get_stats(), indent=2))

๐Ÿ—๏ธ Creating Components...

โœ… Subscribed to 'market_data' events
โœ… Subscribed to 'signal' events

๐Ÿ“Š Current Event Queue Stats:
{
 "events_in_queue": 0,
 "total_events_processed": 0,
 "event_types_subscribed": [
 "market_data",
 "signal"
 ],
 "subscribers_count": {
 "market_data": 1,
 "signal": 1
 }
}


## Part 5: Event System in Action

Now let's test our event-driven system by simulating market data updates!

In [5]:
import time

print("๐ŸŽฌ DEMO: Event-Driven Trading Bot in Action!\n")
print("=" * 60)

# Simulate market data updates
test_prices = [
 ("BTC/USDT", 43000.00), # Starting price
 ("BTC/USDT", 43450.00), # 1.05% increase -> should trigger BUY
 ("BTC/USDT", 43200.00), # -0.58% change -> no signal
 ("BTC/USDT", 42750.00), # -1.04% decrease -> should trigger SELL
 ("BTC/USDT", 43100.00), # 0.82% increase -> no signal
]

for i, (symbol, price) in enumerate(test_prices, 1):
 print(f"\n๐Ÿ“ก Market Update #{i}:")
 print("-" * 40)
 
 # This will trigger the entire event chain!
 market_handler.simulate_price_update(symbol, price)
 
 # Small delay to make it readable
 time.sleep(0.5)

print("\n" + "=" * 60)
print("๐Ÿ“ˆ Final Event Queue Statistics:")
print(json.dumps(event_queue.get_stats(), indent=2))

print(f"\n๐Ÿ“‹ Total Events Processed: {len(event_queue.event_history)}")
print("Event Types:")
event_types = {}
for event in event_queue.event_history:
 event_types[event.event_type] = event_types.get(event.event_type, 0) + 1

for event_type, count in event_types.items():
 print(f" โ€ข {event_type}: {count} events")

๐ŸŽฌ DEMO: Event-Driven Trading Bot in Action!


๐Ÿ“ก Market Update #1:
----------------------------------------
๐Ÿ“Š market_data_handler: Received price update for BTC/USDT: $43,000.00
๐Ÿค– simple_algorithm: Processing BTC/USDT price: $43,000.00
๐Ÿ“ค Published: Event(market_data from market_data_handler at 19:06:23)

๐Ÿ“ก Market Update #2:
----------------------------------------
๐Ÿ“Š market_data_handler: Received price update for BTC/USDT: $43,450.00
๐Ÿค– simple_algorithm: Processing BTC/USDT price: $43,450.00


KeyboardInterrupt: 

## Part 6: Your Turn - Practice Exercise

**Challenge:** Create a simple `PortfolioTracker` component that:
1. Subscribes to `order` events
2. Keeps track of positions for each symbol
3. Publishes `portfolio_update` events when positions change

**Hint:** You'll need to create a `PortfolioEvent` class first!

In [None]:
# TODO: Create PortfolioEvent class
class PortfolioEvent(Event):
 """Portfolio update events"""
 def __init__(self, symbol: str, position: float, avg_price: float, unrealized_pnl: float, source: str = "portfolio_tracker"):
 # YOUR CODE HERE
 pass

# TODO: Create PortfolioTracker class
class PortfolioTracker:
 """Tracks portfolio positions and P&L"""
 
 def __init__(self, event_queue: EventQueue):
 # YOUR CODE HERE
 pass
 
 def on_order(self, event: Event):
 """Handle order events and update positions"""
 # YOUR CODE HERE
 pass

# Test your implementation
# portfolio_tracker = PortfolioTracker(event_queue)

# # Simulate some orders
# test_order = OrderEvent("BTC/USDT", "MARKET", 0.1, 43000, source="test")
# event_queue.publish(test_order)

## Congratulations!

You've just built the foundation of a professional event-driven trading system! Here's what you accomplished:

โœ… **Event System**: Created a robust event queue with publisher-subscriber pattern 
โœ… **Trading Events**: Built specific event types for market data, signals, orders, and risk 
โœ… **Component Communication**: Made components talk through events (loose coupling) 
โœ… **Real Demo**: Saw the entire chain work from market data โ†’ algorithm โ†’ risk control 

## Key Takeaways:

1. **Loose Coupling**: Components don't know about each other - they only know about events
2. **Scalability**: Easy to add new components by subscribing to events
3. **Testability**: Each component can be tested independently
4. **Professional**: This is how real trading firms build their systems

## Next Steps:

In the next notebook, we'll build a complete **Order Management System** that uses this event foundation to handle market, limit, and stop orders professionally!

---
*"The best trading bots are not just profitable - they're architecturally sound."* ๐Ÿ—๏ธ๐Ÿ’ฐ