1
Fork 0

Session_05 files

This commit is contained in:
Jakub Polec 2025-06-26 15:46:49 +02:00
parent 067a4cce7c
commit 3de9fab5e2
20 changed files with 6679 additions and 0 deletions

View file

@ -0,0 +1,610 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Session 5: Event-Driven Trading Bot Foundation\n",
"\n",
"## Notebook 1: Event System Foundation\n",
"\n",
"**Learning Objectives:**\n",
"- Understand the Publisher-Subscriber pattern\n",
"- Build a basic event queue system\n",
"- Create event types for trading bots\n",
"- Test component communication through events\n",
"\n",
"**Why This Matters:**\n",
"Instead of components calling each other directly (tight coupling), they communicate through events (loose coupling). This makes your trading bot more professional, testable, and scalable.\n",
"\n",
"---"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Part 1: Basic Event System\n",
"\n",
"Let's start with the simplest possible event system - a basic Event class that carries information between components."
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Basic Event:\n",
"Event(price_update from market_data_handler at 19:06:23)\n",
"\n",
"Event Data:\n",
"{\n",
" \"event_type\": \"price_update\",\n",
" \"data\": {\n",
" \"symbol\": \"BTC/USDT\",\n",
" \"price\": 43250.5\n",
" },\n",
" \"timestamp\": \"2025-06-25T19:06:23.130343\",\n",
" \"source\": \"market_data_handler\"\n",
"}\n"
]
}
],
"source": [
"from datetime import datetime\n",
"from typing import Dict, Any, List, Callable\n",
"from dataclasses import dataclass\n",
"import json\n",
"\n",
"# Basic Event class - every message in our system\n",
"@dataclass\n",
"class Event:\n",
" \"\"\"Base class for all events in our trading system\"\"\"\n",
" event_type: str\n",
" data: Dict[str, Any]\n",
" timestamp: datetime = None\n",
" source: str = \"unknown\"\n",
" \n",
" def __post_init__(self):\n",
" if self.timestamp is None:\n",
" self.timestamp = datetime.now()\n",
" \n",
" def to_dict(self) -> Dict[str, Any]:\n",
" \"\"\"Convert event to dictionary for logging/debugging\"\"\"\n",
" return {\n",
" 'event_type': self.event_type,\n",
" 'data': self.data,\n",
" 'timestamp': self.timestamp.isoformat(),\n",
" 'source': self.source\n",
" }\n",
" \n",
" def __str__(self):\n",
" return f\"Event({self.event_type} from {self.source} at {self.timestamp.strftime('%H:%M:%S')})\"\n",
"\n",
"# Test our basic Event\n",
"test_event = Event(\n",
" event_type=\"price_update\",\n",
" data={\"symbol\": \"BTC/USDT\", \"price\": 43250.50},\n",
" source=\"market_data_handler\"\n",
")\n",
"\n",
"print(\"Basic Event:\")\n",
"print(test_event)\n",
"print(\"\\nEvent Data:\")\n",
"print(json.dumps(test_event.to_dict(), indent=2, default=str))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Part 2: Event Queue (Message Bus)\n",
"\n",
"Now we need a central place where all events go - the Event Queue. This is the \"post office\" of our trading bot."
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"🏗️ Event Queue created!\n",
"Initial stats: {'events_in_queue': 0, 'total_events_processed': 0, 'event_types_subscribed': [], 'subscribers_count': {}}\n"
]
}
],
"source": [
"from collections import deque\n",
"from threading import Lock\n",
"\n",
"class EventQueue:\n",
" \"\"\"Central event queue - the heart of our event-driven system\"\"\"\n",
" \n",
" def __init__(self):\n",
" self.events = deque() # Fast append/pop from both ends\n",
" self.subscribers = {} # {event_type: [callback_functions]}\n",
" self.lock = Lock() # Thread safety for production\n",
" self.event_history = [] # Keep history for debugging\n",
" \n",
" def subscribe(self, event_type: str, callback: Callable[[Event], None]):\n",
" \"\"\"Subscribe to specific event types\"\"\"\n",
" with self.lock:\n",
" if event_type not in self.subscribers:\n",
" self.subscribers[event_type] = []\n",
" self.subscribers[event_type].append(callback)\n",
" print(f\"✅ Subscribed to '{event_type}' events\")\n",
" \n",
" def publish(self, event: Event):\n",
" \"\"\"Publish an event to all subscribers\"\"\"\n",
" with self.lock:\n",
" # Add to queue\n",
" self.events.append(event)\n",
" self.event_history.append(event)\n",
" \n",
" # Notify subscribers immediately\n",
" if event.event_type in self.subscribers:\n",
" for callback in self.subscribers[event.event_type]:\n",
" try:\n",
" callback(event)\n",
" except Exception as e:\n",
" print(f\"❌ Error in callback for {event.event_type}: {e}\")\n",
" \n",
" print(f\"📤 Published: {event}\")\n",
" \n",
" def get_next_event(self) -> Event:\n",
" \"\"\"Get next event from queue (useful for batch processing)\"\"\"\n",
" with self.lock:\n",
" if self.events:\n",
" return self.events.popleft()\n",
" return None\n",
" \n",
" def get_stats(self) -> Dict[str, Any]:\n",
" \"\"\"Get queue statistics\"\"\"\n",
" with self.lock:\n",
" return {\n",
" 'events_in_queue': len(self.events),\n",
" 'total_events_processed': len(self.event_history),\n",
" 'event_types_subscribed': list(self.subscribers.keys()),\n",
" 'subscribers_count': {k: len(v) for k, v in self.subscribers.items()}\n",
" }\n",
"\n",
"# Create our central event queue\n",
"event_queue = EventQueue()\n",
"print(\"🏗️ Event Queue created!\")\n",
"print(f\"Initial stats: {event_queue.get_stats()}\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Part 3: Trading Event Types\n",
"\n",
"Let's create specific event types that our MARKET components will use to communicate."
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"🧪 Testing Trading Event Types:\n",
"\n",
"Market Event: Event(market_data from market_data at 19:06:23)\n",
"Signal Event: Event(signal from algorithm at 19:06:23)\n",
"Order Event: Event(order from order_manager at 19:06:23)\n",
"Risk Event: Event(risk from risk_controller at 19:06:23)\n"
]
}
],
"source": [
"# Specific event types for our trading system\n",
"\n",
"class MarketDataEvent(Event):\n",
" \"\"\"Market data events - price updates, order book changes\"\"\"\n",
" def __init__(self, symbol: str, price: float, volume: float = 0, source: str = \"market_data\"):\n",
" super().__init__(\n",
" event_type=\"market_data\",\n",
" data={\n",
" \"symbol\": symbol,\n",
" \"price\": price,\n",
" \"volume\": volume\n",
" },\n",
" source=source\n",
" )\n",
"\n",
"class SignalEvent(Event):\n",
" \"\"\"Trading signal events - buy/sell signals from algorithm\"\"\"\n",
" def __init__(self, symbol: str, signal_type: str, strength: float, source: str = \"algorithm\"):\n",
" super().__init__(\n",
" event_type=\"signal\",\n",
" data={\n",
" \"symbol\": symbol,\n",
" \"signal_type\": signal_type, # 'BUY', 'SELL', 'HOLD'\n",
" \"strength\": strength # 0.0 to 1.0\n",
" },\n",
" source=source\n",
" )\n",
"\n",
"class OrderEvent(Event):\n",
" \"\"\"Order events - order requests, fills, cancellations\"\"\"\n",
" def __init__(self, symbol: str, order_type: str, quantity: float, price: float = None, source: str = \"order_manager\"):\n",
" super().__init__(\n",
" event_type=\"order\",\n",
" data={\n",
" \"symbol\": symbol,\n",
" \"order_type\": order_type, # 'MARKET', 'LIMIT', 'STOP'\n",
" \"quantity\": quantity,\n",
" \"price\": price\n",
" },\n",
" source=source\n",
" )\n",
"\n",
"class RiskEvent(Event):\n",
" \"\"\"Risk management events - warnings, violations, limits\"\"\"\n",
" def __init__(self, risk_type: str, message: str, severity: str = \"WARNING\", source: str = \"risk_controller\"):\n",
" super().__init__(\n",
" event_type=\"risk\",\n",
" data={\n",
" \"risk_type\": risk_type,\n",
" \"message\": message,\n",
" \"severity\": severity # 'INFO', 'WARNING', 'ERROR', 'CRITICAL'\n",
" },\n",
" source=source\n",
" )\n",
"\n",
"# Test our trading event types\n",
"print(\"🧪 Testing Trading Event Types:\\n\")\n",
"\n",
"# Market data event\n",
"market_event = MarketDataEvent(\"BTC/USDT\", 43250.50, 1.25)\n",
"print(f\"Market Event: {market_event}\")\n",
"\n",
"# Signal event\n",
"signal_event = SignalEvent(\"BTC/USDT\", \"BUY\", 0.8)\n",
"print(f\"Signal Event: {signal_event}\")\n",
"\n",
"# Order event\n",
"order_event = OrderEvent(\"BTC/USDT\", \"MARKET\", 0.1)\n",
"print(f\"Order Event: {order_event}\")\n",
"\n",
"# Risk event\n",
"risk_event = RiskEvent(\"position_limit\", \"Position size exceeds 10% of portfolio\", \"WARNING\")\n",
"print(f\"Risk Event: {risk_event}\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Part 4: Simple Component Example\n",
"\n",
"Let's create two simple components that communicate through events - this demonstrates the publisher-subscriber pattern."
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"🏗️ Creating Components...\n",
"\n",
"✅ Subscribed to 'market_data' events\n",
"✅ Subscribed to 'signal' events\n",
"\n",
"📊 Current Event Queue Stats:\n",
"{\n",
" \"events_in_queue\": 0,\n",
" \"total_events_processed\": 0,\n",
" \"event_types_subscribed\": [\n",
" \"market_data\",\n",
" \"signal\"\n",
" ],\n",
" \"subscribers_count\": {\n",
" \"market_data\": 1,\n",
" \"signal\": 1\n",
" }\n",
"}\n"
]
}
],
"source": [
"class SimpleMarketDataHandler:\n",
" \"\"\"Simulates receiving market data and publishing events\"\"\"\n",
" \n",
" def __init__(self, event_queue: EventQueue):\n",
" self.event_queue = event_queue\n",
" self.name = \"market_data_handler\"\n",
" \n",
" def simulate_price_update(self, symbol: str, price: float):\n",
" \"\"\"Simulate receiving a price update from exchange\"\"\"\n",
" print(f\"📊 {self.name}: Received price update for {symbol}: ${price:,.2f}\")\n",
" \n",
" # Create and publish market data event\n",
" event = MarketDataEvent(symbol, price, source=self.name)\n",
" self.event_queue.publish(event)\n",
"\n",
"\n",
"class SimpleAlgorithm:\n",
" \"\"\"Simple algorithm that listens to market data and generates signals\"\"\"\n",
" \n",
" def __init__(self, event_queue: EventQueue):\n",
" self.event_queue = event_queue\n",
" self.name = \"simple_algorithm\"\n",
" self.last_price = None\n",
" \n",
" # Subscribe to market data events\n",
" self.event_queue.subscribe(\"market_data\", self.on_market_data)\n",
" \n",
" def on_market_data(self, event: Event):\n",
" \"\"\"Handle market data events\"\"\"\n",
" symbol = event.data[\"symbol\"]\n",
" price = event.data[\"price\"]\n",
" \n",
" print(f\"🤖 {self.name}: Processing {symbol} price: ${price:,.2f}\")\n",
" \n",
" # Simple momentum strategy\n",
" if self.last_price is not None:\n",
" price_change = (price - self.last_price) / self.last_price\n",
" \n",
" if price_change > 0.01: # 1% increase\n",
" signal = SignalEvent(symbol, \"BUY\", 0.7, source=self.name)\n",
" self.event_queue.publish(signal)\n",
" print(f\" 📈 Generated BUY signal (price up {price_change:.2%})\")\n",
" \n",
" elif price_change < -0.01: # 1% decrease\n",
" signal = SignalEvent(symbol, \"SELL\", 0.7, source=self.name)\n",
" self.event_queue.publish(signal)\n",
" print(f\" 📉 Generated SELL signal (price down {price_change:.2%})\")\n",
" \n",
" self.last_price = price\n",
"\n",
"\n",
"class SimpleRiskController:\n",
" \"\"\"Simple risk controller that monitors signals\"\"\"\n",
" \n",
" def __init__(self, event_queue: EventQueue):\n",
" self.event_queue = event_queue\n",
" self.name = \"risk_controller\"\n",
" \n",
" # Subscribe to signal events\n",
" self.event_queue.subscribe(\"signal\", self.on_signal)\n",
" \n",
" def on_signal(self, event: Event):\n",
" \"\"\"Validate trading signals\"\"\"\n",
" symbol = event.data[\"symbol\"]\n",
" signal_type = event.data[\"signal_type\"]\n",
" strength = event.data[\"strength\"]\n",
" \n",
" print(f\"🛡️ {self.name}: Validating {signal_type} signal for {symbol} (strength: {strength})\")\n",
" \n",
" # Simple validation - only allow strong signals\n",
" if strength >= 0.6:\n",
" print(f\" ✅ Signal APPROVED - strength sufficient ({strength})\")\n",
" # In real system, would publish order_request event here\n",
" else:\n",
" risk_event = RiskEvent(\n",
" \"weak_signal\", \n",
" f\"Signal strength too low: {strength} < 0.6\", \n",
" \"WARNING\",\n",
" source=self.name\n",
" )\n",
" self.event_queue.publish(risk_event)\n",
" print(f\" ❌ Signal REJECTED - strength too low\")\n",
"\n",
"\n",
"# Create our components\n",
"print(\"🏗️ Creating Components...\\n\")\n",
"\n",
"market_handler = SimpleMarketDataHandler(event_queue)\n",
"algorithm = SimpleAlgorithm(event_queue)\n",
"risk_controller = SimpleRiskController(event_queue)\n",
"\n",
"print(\"\\n📊 Current Event Queue Stats:\")\n",
"print(json.dumps(event_queue.get_stats(), indent=2))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Part 5: Event System in Action\n",
"\n",
"Now let's test our event-driven system by simulating market data updates!"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"🎬 DEMO: Event-Driven Trading Bot in Action!\n",
"\n",
"============================================================\n",
"\n",
"📡 Market Update #1:\n",
"----------------------------------------\n",
"📊 market_data_handler: Received price update for BTC/USDT: $43,000.00\n",
"🤖 simple_algorithm: Processing BTC/USDT price: $43,000.00\n",
"📤 Published: Event(market_data from market_data_handler at 19:06:23)\n",
"\n",
"📡 Market Update #2:\n",
"----------------------------------------\n",
"📊 market_data_handler: Received price update for BTC/USDT: $43,450.00\n",
"🤖 simple_algorithm: Processing BTC/USDT price: $43,450.00\n"
]
},
{
"ename": "KeyboardInterrupt",
"evalue": "",
"output_type": "error",
"traceback": [
"\u001b[31m---------------------------------------------------------------------------\u001b[39m",
"\u001b[31mKeyboardInterrupt\u001b[39m Traceback (most recent call last)",
"\u001b[36mCell\u001b[39m\u001b[36m \u001b[39m\u001b[32mIn[5]\u001b[39m\u001b[32m, line 20\u001b[39m\n\u001b[32m 17\u001b[39m \u001b[38;5;28mprint\u001b[39m(\u001b[33m\"\u001b[39m\u001b[33m-\u001b[39m\u001b[33m\"\u001b[39m * \u001b[32m40\u001b[39m)\n\u001b[32m 19\u001b[39m \u001b[38;5;66;03m# This will trigger the entire event chain!\u001b[39;00m\n\u001b[32m---> \u001b[39m\u001b[32m20\u001b[39m \u001b[43mmarket_handler\u001b[49m\u001b[43m.\u001b[49m\u001b[43msimulate_price_update\u001b[49m\u001b[43m(\u001b[49m\u001b[43msymbol\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mprice\u001b[49m\u001b[43m)\u001b[49m\n\u001b[32m 22\u001b[39m \u001b[38;5;66;03m# Small delay to make it readable\u001b[39;00m\n\u001b[32m 23\u001b[39m time.sleep(\u001b[32m0.5\u001b[39m)\n",
"\u001b[36mCell\u001b[39m\u001b[36m \u001b[39m\u001b[32mIn[4]\u001b[39m\u001b[32m, line 14\u001b[39m, in \u001b[36mSimpleMarketDataHandler.simulate_price_update\u001b[39m\u001b[34m(self, symbol, price)\u001b[39m\n\u001b[32m 12\u001b[39m \u001b[38;5;66;03m# Create and publish market data event\u001b[39;00m\n\u001b[32m 13\u001b[39m event = MarketDataEvent(symbol, price, source=\u001b[38;5;28mself\u001b[39m.name)\n\u001b[32m---> \u001b[39m\u001b[32m14\u001b[39m \u001b[38;5;28;43mself\u001b[39;49m\u001b[43m.\u001b[49m\u001b[43mevent_queue\u001b[49m\u001b[43m.\u001b[49m\u001b[43mpublish\u001b[49m\u001b[43m(\u001b[49m\u001b[43mevent\u001b[49m\u001b[43m)\u001b[49m\n",
"\u001b[36mCell\u001b[39m\u001b[36m \u001b[39m\u001b[32mIn[2]\u001b[39m\u001b[32m, line 32\u001b[39m, in \u001b[36mEventQueue.publish\u001b[39m\u001b[34m(self, event)\u001b[39m\n\u001b[32m 30\u001b[39m \u001b[38;5;28;01mfor\u001b[39;00m callback \u001b[38;5;129;01min\u001b[39;00m \u001b[38;5;28mself\u001b[39m.subscribers[event.event_type]:\n\u001b[32m 31\u001b[39m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[32m---> \u001b[39m\u001b[32m32\u001b[39m \u001b[43mcallback\u001b[49m\u001b[43m(\u001b[49m\u001b[43mevent\u001b[49m\u001b[43m)\u001b[49m\n\u001b[32m 33\u001b[39m \u001b[38;5;28;01mexcept\u001b[39;00m \u001b[38;5;167;01mException\u001b[39;00m \u001b[38;5;28;01mas\u001b[39;00m e:\n\u001b[32m 34\u001b[39m \u001b[38;5;28mprint\u001b[39m(\u001b[33mf\u001b[39m\u001b[33m\"\u001b[39m\u001b[33m❌ Error in callback for \u001b[39m\u001b[38;5;132;01m{\u001b[39;00mevent.event_type\u001b[38;5;132;01m}\u001b[39;00m\u001b[33m: \u001b[39m\u001b[38;5;132;01m{\u001b[39;00me\u001b[38;5;132;01m}\u001b[39;00m\u001b[33m\"\u001b[39m)\n",
"\u001b[36mCell\u001b[39m\u001b[36m \u001b[39m\u001b[32mIn[4]\u001b[39m\u001b[32m, line 41\u001b[39m, in \u001b[36mSimpleAlgorithm.on_market_data\u001b[39m\u001b[34m(self, event)\u001b[39m\n\u001b[32m 39\u001b[39m \u001b[38;5;28;01mif\u001b[39;00m price_change > \u001b[32m0.01\u001b[39m: \u001b[38;5;66;03m# 1% increase\u001b[39;00m\n\u001b[32m 40\u001b[39m signal = SignalEvent(symbol, \u001b[33m\"\u001b[39m\u001b[33mBUY\u001b[39m\u001b[33m\"\u001b[39m, \u001b[32m0.7\u001b[39m, source=\u001b[38;5;28mself\u001b[39m.name)\n\u001b[32m---> \u001b[39m\u001b[32m41\u001b[39m \u001b[38;5;28;43mself\u001b[39;49m\u001b[43m.\u001b[49m\u001b[43mevent_queue\u001b[49m\u001b[43m.\u001b[49m\u001b[43mpublish\u001b[49m\u001b[43m(\u001b[49m\u001b[43msignal\u001b[49m\u001b[43m)\u001b[49m\n\u001b[32m 42\u001b[39m \u001b[38;5;28mprint\u001b[39m(\u001b[33mf\u001b[39m\u001b[33m\"\u001b[39m\u001b[33m 📈 Generated BUY signal (price up \u001b[39m\u001b[38;5;132;01m{\u001b[39;00mprice_change\u001b[38;5;132;01m:\u001b[39;00m\u001b[33m.2%\u001b[39m\u001b[38;5;132;01m}\u001b[39;00m\u001b[33m)\u001b[39m\u001b[33m\"\u001b[39m)\n\u001b[32m 44\u001b[39m \u001b[38;5;28;01melif\u001b[39;00m price_change < -\u001b[32m0.01\u001b[39m: \u001b[38;5;66;03m# 1% decrease\u001b[39;00m\n",
"\u001b[36mCell\u001b[39m\u001b[36m \u001b[39m\u001b[32mIn[2]\u001b[39m\u001b[32m, line 23\u001b[39m, in \u001b[36mEventQueue.publish\u001b[39m\u001b[34m(self, event)\u001b[39m\n\u001b[32m 21\u001b[39m \u001b[38;5;28;01mdef\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[34mpublish\u001b[39m(\u001b[38;5;28mself\u001b[39m, event: Event):\n\u001b[32m 22\u001b[39m \u001b[38;5;250m \u001b[39m\u001b[33;03m\"\"\"Publish an event to all subscribers\"\"\"\u001b[39;00m\n\u001b[32m---> \u001b[39m\u001b[32m23\u001b[39m \u001b[43m \u001b[49m\u001b[38;5;28;43;01mwith\u001b[39;49;00m\u001b[43m \u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[43m.\u001b[49m\u001b[43mlock\u001b[49m\u001b[43m:\u001b[49m\n\u001b[32m 24\u001b[39m \u001b[43m \u001b[49m\u001b[38;5;66;43;03m# Add to queue\u001b[39;49;00m\n\u001b[32m 25\u001b[39m \u001b[43m \u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[43m.\u001b[49m\u001b[43mevents\u001b[49m\u001b[43m.\u001b[49m\u001b[43mappend\u001b[49m\u001b[43m(\u001b[49m\u001b[43mevent\u001b[49m\u001b[43m)\u001b[49m\n\u001b[32m 26\u001b[39m \u001b[43m \u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[43m.\u001b[49m\u001b[43mevent_history\u001b[49m\u001b[43m.\u001b[49m\u001b[43mappend\u001b[49m\u001b[43m(\u001b[49m\u001b[43mevent\u001b[49m\u001b[43m)\u001b[49m\n",
"\u001b[31mKeyboardInterrupt\u001b[39m: "
]
}
],
"source": [
"import time\n",
"\n",
"print(\"🎬 DEMO: Event-Driven Trading Bot in Action!\\n\")\n",
"print(\"=\" * 60)\n",
"\n",
"# Simulate market data updates\n",
"test_prices = [\n",
" (\"BTC/USDT\", 43000.00), # Starting price\n",
" (\"BTC/USDT\", 43450.00), # 1.05% increase -> should trigger BUY\n",
" (\"BTC/USDT\", 43200.00), # -0.58% change -> no signal\n",
" (\"BTC/USDT\", 42750.00), # -1.04% decrease -> should trigger SELL\n",
" (\"BTC/USDT\", 43100.00), # 0.82% increase -> no signal\n",
"]\n",
"\n",
"for i, (symbol, price) in enumerate(test_prices, 1):\n",
" print(f\"\\n📡 Market Update #{i}:\")\n",
" print(\"-\" * 40)\n",
" \n",
" # This will trigger the entire event chain!\n",
" market_handler.simulate_price_update(symbol, price)\n",
" \n",
" # Small delay to make it readable\n",
" time.sleep(0.5)\n",
"\n",
"print(\"\\n\" + \"=\" * 60)\n",
"print(\"📈 Final Event Queue Statistics:\")\n",
"print(json.dumps(event_queue.get_stats(), indent=2))\n",
"\n",
"print(f\"\\n📋 Total Events Processed: {len(event_queue.event_history)}\")\n",
"print(\"Event Types:\")\n",
"event_types = {}\n",
"for event in event_queue.event_history:\n",
" event_types[event.event_type] = event_types.get(event.event_type, 0) + 1\n",
"\n",
"for event_type, count in event_types.items():\n",
" print(f\" • {event_type}: {count} events\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Part 6: Your Turn - Practice Exercise\n",
"\n",
"**Challenge:** Create a simple `PortfolioTracker` component that:\n",
"1. Subscribes to `order` events\n",
"2. Keeps track of positions for each symbol\n",
"3. Publishes `portfolio_update` events when positions change\n",
"\n",
"**Hint:** You'll need to create a `PortfolioEvent` class first!"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# TODO: Create PortfolioEvent class\n",
"class PortfolioEvent(Event):\n",
" \"\"\"Portfolio update events\"\"\"\n",
" def __init__(self, symbol: str, position: float, avg_price: float, unrealized_pnl: float, source: str = \"portfolio_tracker\"):\n",
" # YOUR CODE HERE\n",
" pass\n",
"\n",
"# TODO: Create PortfolioTracker class\n",
"class PortfolioTracker:\n",
" \"\"\"Tracks portfolio positions and P&L\"\"\"\n",
" \n",
" def __init__(self, event_queue: EventQueue):\n",
" # YOUR CODE HERE\n",
" pass\n",
" \n",
" def on_order(self, event: Event):\n",
" \"\"\"Handle order events and update positions\"\"\"\n",
" # YOUR CODE HERE\n",
" pass\n",
"\n",
"# Test your implementation\n",
"# portfolio_tracker = PortfolioTracker(event_queue)\n",
"\n",
"# # Simulate some orders\n",
"# test_order = OrderEvent(\"BTC/USDT\", \"MARKET\", 0.1, 43000, source=\"test\")\n",
"# event_queue.publish(test_order)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Congratulations!\n",
"\n",
"You've just built the foundation of a professional event-driven trading system! Here's what you accomplished:\n",
"\n",
"✅ **Event System**: Created a robust event queue with publisher-subscriber pattern \n",
"✅ **Trading Events**: Built specific event types for market data, signals, orders, and risk \n",
"✅ **Component Communication**: Made components talk through events (loose coupling) \n",
"✅ **Real Demo**: Saw the entire chain work from market data → algorithm → risk control \n",
"\n",
"## Key Takeaways:\n",
"\n",
"1. **Loose Coupling**: Components don't know about each other - they only know about events\n",
"2. **Scalability**: Easy to add new components by subscribing to events\n",
"3. **Testability**: Each component can be tested independently\n",
"4. **Professional**: This is how real trading firms build their systems\n",
"\n",
"## Next Steps:\n",
"\n",
"In the next notebook, we'll build a complete **Order Management System** that uses this event foundation to handle market, limit, and stop orders professionally!\n",
"\n",
"---\n",
"*\"The best trading bots are not just profitable - they're architecturally sound.\"* 🏗️💰"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "venv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.13.3"
}
},
"nbformat": 4,
"nbformat_minor": 4
}

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,990 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Session 5: Event-Driven Trading Bot\n",
"\n",
"## Notebook 3: Portfolio Management & Risk Controls\n",
"\n",
"**Learning Objectives:**\n",
"- Build a real-time Portfolio Manager (the \"Keeper\")\n",
"- Implement comprehensive Risk Controls\n",
"- Track positions, balances, and P&L in real-time\n",
"- Apply risk management rules (position sizing, daily limits)\n",
"- Integrate portfolio and risk systems with OMS\n",
"\n",
"**Why This Matters:**\n",
"The Portfolio Manager is your \"financial brain\" - it knows exactly what you own, what it's worth, and how much you've made or lost. Risk Controls are your \"safety net\" - they prevent catastrophic losses by enforcing rules. Together, they keep your trading bot profitable and safe.\n",
"\n",
"---"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Quick Setup: Import Previous Systems\n",
"\n",
"Let's import the event system and OMS from our previous notebooks:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Import our systems from previous notebooks\n",
"from datetime import datetime, timedelta\n",
"from typing import Dict, Any, List, Callable, Optional\n",
"from dataclasses import dataclass, field\n",
"from collections import deque, defaultdict\n",
"from threading import Lock\n",
"from enum import Enum\n",
"import json\n",
"import uuid\n",
"import time\n",
"\n",
"# Event System (from Notebook 1)\n",
"@dataclass\n",
"class Event:\n",
" event_type: str\n",
" data: Dict[str, Any]\n",
" timestamp: datetime = None\n",
" source: str = \"unknown\"\n",
" \n",
" def __post_init__(self):\n",
" if self.timestamp is None:\n",
" self.timestamp = datetime.now()\n",
"\n",
"class EventQueue:\n",
" def __init__(self):\n",
" self.events = deque()\n",
" self.subscribers = {}\n",
" self.lock = Lock()\n",
" self.event_history = []\n",
" \n",
" def subscribe(self, event_type: str, callback: Callable[[Event], None]):\n",
" with self.lock:\n",
" if event_type not in self.subscribers:\n",
" self.subscribers[event_type] = []\n",
" self.subscribers[event_type].append(callback)\n",
" \n",
" def publish(self, event: Event):\n",
" with self.lock:\n",
" self.events.append(event)\n",
" self.event_history.append(event)\n",
" \n",
" if event.event_type in self.subscribers:\n",
" for callback in self.subscribers[event.event_type]:\n",
" try:\n",
" callback(event)\n",
" except Exception as e:\n",
" print(f\"❌ Error in callback for {event.event_type}: {e}\")\n",
"\n",
"# Order System (from Notebook 2) - simplified\n",
"class OrderState(Enum):\n",
" PENDING = \"PENDING\"\n",
" FILLED = \"FILLED\"\n",
" CANCELLED = \"CANCELLED\"\n",
"\n",
"class OrderSide(Enum):\n",
" BUY = \"BUY\"\n",
" SELL = \"SELL\"\n",
"\n",
"@dataclass\n",
"class Order:\n",
" symbol: str\n",
" side: OrderSide\n",
" quantity: float\n",
" price: float\n",
" order_id: str = field(default_factory=lambda: str(uuid.uuid4()))\n",
" state: OrderState = OrderState.PENDING\n",
" filled_quantity: float = 0.0\n",
" average_fill_price: float = 0.0\n",
" fees_paid: float = 0.0\n",
" created_at: datetime = field(default_factory=datetime.now)\n",
"\n",
"# Create our event queue\n",
"event_queue = EventQueue()\n",
"print(\"🏗️ Event system and Order classes ready for Portfolio & Risk integration!\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Part 1: Position and Portfolio Data Structures\n",
"\n",
"Let's create the core data structures to track positions and portfolio state:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"@dataclass\n",
"class Position:\n",
" \"\"\"Represents a position in a specific symbol\"\"\"\n",
" symbol: str\n",
" quantity: float = 0.0 # Current position size (+ for long, - for short)\n",
" average_price: float = 0.0 # Average cost basis\n",
" market_price: float = 0.0 # Current market price\n",
" total_cost: float = 0.0 # Total amount invested\n",
" realized_pnl: float = 0.0 # Profit/Loss from closed positions\n",
" fees_paid: float = 0.0 # Total fees paid\n",
" last_updated: datetime = field(default_factory=datetime.now)\n",
" \n",
" @property\n",
" def market_value(self) -> float:\n",
" \"\"\"Current market value of position\"\"\"\n",
" return abs(self.quantity) * self.market_price\n",
" \n",
" @property\n",
" def unrealized_pnl(self) -> float:\n",
" \"\"\"Unrealized profit/loss\"\"\"\n",
" if self.quantity == 0:\n",
" return 0.0\n",
" \n",
" if self.quantity > 0: # Long position\n",
" return (self.market_price - self.average_price) * self.quantity\n",
" else: # Short position\n",
" return (self.average_price - self.market_price) * abs(self.quantity)\n",
" \n",
" @property\n",
" def total_pnl(self) -> float:\n",
" \"\"\"Total P&L (realized + unrealized)\"\"\"\n",
" return self.realized_pnl + self.unrealized_pnl\n",
" \n",
" @property\n",
" def pnl_percentage(self) -> float:\n",
" \"\"\"P&L as percentage of invested capital\"\"\"\n",
" if self.total_cost == 0:\n",
" return 0.0\n",
" return (self.total_pnl / self.total_cost) * 100\n",
" \n",
" def update_market_price(self, new_price: float):\n",
" \"\"\"Update current market price\"\"\"\n",
" self.market_price = new_price\n",
" self.last_updated = datetime.now()\n",
" \n",
" def add_trade(self, quantity: float, price: float, fees: float = 0.0):\n",
" \"\"\"Add a trade to this position\"\"\"\n",
" trade_value = quantity * price\n",
" \n",
" if self.quantity == 0:\n",
" # Opening new position\n",
" self.quantity = quantity\n",
" self.average_price = price\n",
" self.total_cost = abs(trade_value)\n",
" else:\n",
" # Modifying existing position\n",
" if (self.quantity > 0 and quantity > 0) or (self.quantity < 0 and quantity < 0):\n",
" # Adding to position\n",
" total_value = (self.quantity * self.average_price) + trade_value\n",
" self.quantity += quantity\n",
" self.average_price = total_value / self.quantity if self.quantity != 0 else 0\n",
" self.total_cost += abs(trade_value)\n",
" else:\n",
" # Reducing or closing position\n",
" if abs(quantity) >= abs(self.quantity):\n",
" # Closing position completely\n",
" self.realized_pnl += (price - self.average_price) * self.quantity\n",
" self.quantity = 0\n",
" self.average_price = 0\n",
" else:\n",
" # Partial close\n",
" close_pnl = (price - self.average_price) * abs(quantity)\n",
" self.realized_pnl += close_pnl\n",
" self.quantity += quantity # quantity is negative for close\n",
" \n",
" self.fees_paid += fees\n",
" self.last_updated = datetime.now()\n",
" \n",
" def to_dict(self) -> Dict[str, Any]:\n",
" return {\n",
" 'symbol': self.symbol,\n",
" 'quantity': self.quantity,\n",
" 'average_price': self.average_price,\n",
" 'market_price': self.market_price,\n",
" 'market_value': self.market_value,\n",
" 'total_cost': self.total_cost,\n",
" 'unrealized_pnl': self.unrealized_pnl,\n",
" 'realized_pnl': self.realized_pnl,\n",
" 'total_pnl': self.total_pnl,\n",
" 'pnl_percentage': f\"{self.pnl_percentage:.2f}%\",\n",
" 'fees_paid': self.fees_paid\n",
" }\n",
" \n",
" def __str__(self):\n",
" direction = \"LONG\" if self.quantity > 0 else \"SHORT\" if self.quantity < 0 else \"FLAT\"\n",
" return f\"{direction} {abs(self.quantity):.4f} {self.symbol} @ ${self.average_price:.2f} (P&L: ${self.total_pnl:.2f})\"\n",
"\n",
"\n",
"# Test Position class\n",
"print(\"🧪 Testing Position class:\\n\")\n",
"\n",
"btc_position = Position(\"BTC/USDT\")\n",
"print(f\"New position: {btc_position}\")\n",
"\n",
"# Add a buy trade\n",
"btc_position.add_trade(0.1, 43000, 4.30) # Buy 0.1 BTC at $43,000\n",
"btc_position.update_market_price(43500) # Market moves up\n",
"print(f\"After buy: {btc_position}\")\n",
"print(f\"Position details: {json.dumps(btc_position.to_dict(), indent=2, default=str)}\")\n",
"\n",
"# Partial sell\n",
"btc_position.add_trade(-0.03, 43600, 1.31) # Sell 0.03 BTC at $43,600\n",
"print(f\"\\nAfter partial sell: {btc_position}\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Part 2: Portfolio Manager (The \"Keeper\")\n",
"\n",
"Now let's build the Portfolio Manager that tracks all positions and overall account health:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"class PortfolioManager:\n",
" \"\"\"The Keeper - manages all positions and portfolio state\"\"\"\n",
" \n",
" def __init__(self, event_queue: EventQueue, initial_balance: float = 100000.0):\n",
" self.event_queue = event_queue\n",
" self.name = \"portfolio_manager\"\n",
" \n",
" # Portfolio state\n",
" self.positions: Dict[str, Position] = {} # symbol -> Position\n",
" self.cash_balance = initial_balance\n",
" self.initial_balance = initial_balance\n",
" \n",
" # Portfolio tracking\n",
" self.total_trades = 0\n",
" self.total_fees_paid = 0.0\n",
" self.daily_pnl = 0.0\n",
" self.max_drawdown = 0.0\n",
" self.peak_value = initial_balance\n",
" \n",
" # Market prices (for position valuation)\n",
" self.market_prices: Dict[str, float] = {}\n",
" \n",
" # Subscribe to relevant events\n",
" self.event_queue.subscribe(\"order_fill\", self.on_order_fill)\n",
" self.event_queue.subscribe(\"market_data\", self.on_market_data)\n",
" \n",
" print(f\"💼 {self.name}: Portfolio initialized with ${initial_balance:,.2f}\")\n",
" \n",
" def on_market_data(self, event: Event):\n",
" \"\"\"Update market prices for position valuation\"\"\"\n",
" symbol = event.data[\"symbol\"]\n",
" price = event.data[\"price\"]\n",
" \n",
" self.market_prices[symbol] = price\n",
" \n",
" # Update position market prices\n",
" if symbol in self.positions:\n",
" old_pnl = self.positions[symbol].total_pnl\n",
" self.positions[symbol].update_market_price(price)\n",
" new_pnl = self.positions[symbol].total_pnl\n",
" \n",
" # Update daily P&L\n",
" self.daily_pnl += (new_pnl - old_pnl)\n",
" \n",
" # Check for new peak/drawdown\n",
" current_value = self.total_portfolio_value\n",
" if current_value > self.peak_value:\n",
" self.peak_value = current_value\n",
" else:\n",
" drawdown = (self.peak_value - current_value) / self.peak_value * 100\n",
" self.max_drawdown = max(self.max_drawdown, drawdown)\n",
" \n",
" # Publish portfolio update event\n",
" self._publish_portfolio_update(\"price_update\", f\"{symbol} price updated to ${price:,.2f}\")\n",
" \n",
" def on_order_fill(self, event: Event):\n",
" \"\"\"Process order fills and update positions\"\"\"\n",
" symbol = event.data[\"symbol\"]\n",
" side = event.data[\"side\"]\n",
" fill_quantity = event.data[\"fill_quantity\"]\n",
" fill_price = event.data[\"fill_price\"]\n",
" fees = event.data.get(\"fees\", 0.0)\n",
" \n",
" print(f\"💼 {self.name}: Processing fill - {side} {fill_quantity} {symbol} @ ${fill_price:,.2f}\")\n",
" \n",
" # Calculate trade quantity (negative for sells)\n",
" trade_quantity = fill_quantity if side == \"BUY\" else -fill_quantity\n",
" trade_value = fill_quantity * fill_price\n",
" \n",
" # Update cash balance\n",
" if side == \"BUY\":\n",
" self.cash_balance -= (trade_value + fees)\n",
" else:\n",
" self.cash_balance += (trade_value - fees)\n",
" \n",
" # Update or create position\n",
" if symbol not in self.positions:\n",
" self.positions[symbol] = Position(symbol)\n",
" # Set initial market price if we have it\n",
" if symbol in self.market_prices:\n",
" self.positions[symbol].update_market_price(self.market_prices[symbol])\n",
" \n",
" # Add trade to position\n",
" self.positions[symbol].add_trade(trade_quantity, fill_price, fees)\n",
" \n",
" # Update statistics\n",
" self.total_trades += 1\n",
" self.total_fees_paid += fees\n",
" \n",
" # Publish portfolio update\n",
" message = f\"Position updated: {self.positions[symbol]}\"\n",
" self._publish_portfolio_update(\"trade_executed\", message)\n",
" \n",
" print(f\" 💰 Cash balance: ${self.cash_balance:,.2f}\")\n",
" print(f\" 📊 Position: {self.positions[symbol]}\")\n",
" \n",
" def _publish_portfolio_update(self, update_type: str, message: str):\n",
" \"\"\"Publish portfolio update event\"\"\"\n",
" portfolio_event = Event(\n",
" event_type=\"portfolio_update\",\n",
" data={\n",
" \"update_type\": update_type,\n",
" \"message\": message,\n",
" \"portfolio_value\": self.total_portfolio_value,\n",
" \"cash_balance\": self.cash_balance,\n",
" \"total_pnl\": self.total_pnl,\n",
" \"daily_pnl\": self.daily_pnl,\n",
" \"positions\": {symbol: pos.to_dict() for symbol, pos in self.positions.items()}\n",
" },\n",
" source=self.name\n",
" )\n",
" self.event_queue.publish(portfolio_event)\n",
" \n",
" @property\n",
" def total_portfolio_value(self) -> float:\n",
" \"\"\"Total portfolio value (cash + positions)\"\"\"\n",
" positions_value = sum(pos.market_value for pos in self.positions.values())\n",
" return self.cash_balance + positions_value\n",
" \n",
" @property\n",
" def total_pnl(self) -> float:\n",
" \"\"\"Total profit/loss\"\"\"\n",
" return self.total_portfolio_value - self.initial_balance\n",
" \n",
" @property\n",
" def total_pnl_percentage(self) -> float:\n",
" \"\"\"Total P&L as percentage\"\"\"\n",
" return (self.total_pnl / self.initial_balance) * 100\n",
" \n",
" def get_position(self, symbol: str) -> Optional[Position]:\n",
" \"\"\"Get position for a specific symbol\"\"\"\n",
" return self.positions.get(symbol)\n",
" \n",
" def get_portfolio_summary(self) -> Dict[str, Any]:\n",
" \"\"\"Get comprehensive portfolio summary\"\"\"\n",
" return {\n",
" \"cash_balance\": self.cash_balance,\n",
" \"total_portfolio_value\": self.total_portfolio_value,\n",
" \"total_pnl\": self.total_pnl,\n",
" \"total_pnl_percentage\": f\"{self.total_pnl_percentage:.2f}%\",\n",
" \"daily_pnl\": self.daily_pnl,\n",
" \"max_drawdown\": f\"{self.max_drawdown:.2f}%\",\n",
" \"total_trades\": self.total_trades,\n",
" \"total_fees_paid\": self.total_fees_paid,\n",
" \"open_positions\": len([p for p in self.positions.values() if p.quantity != 0]),\n",
" \"positions\": {symbol: pos.to_dict() for symbol, pos in self.positions.items() if pos.quantity != 0}\n",
" }\n",
"\n",
"# Create portfolio manager\n",
"portfolio = PortfolioManager(event_queue, initial_balance=100000.0)\n",
"print(f\"\\n📊 Initial Portfolio Summary:\")\n",
"print(json.dumps(portfolio.get_portfolio_summary(), indent=2, default=str))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Part 3: Risk Controller\n",
"\n",
"Now let's build the Risk Controller that enforces trading rules and protects our capital:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"class RiskController:\n",
" \"\"\"Risk management system - the safety net for our trading bot\"\"\"\n",
" \n",
" def __init__(self, event_queue: EventQueue, portfolio: PortfolioManager):\n",
" self.event_queue = event_queue\n",
" self.portfolio = portfolio\n",
" self.name = \"risk_controller\"\n",
" \n",
" # Risk limits\n",
" self.max_position_size_pct = 25.0 # Max 25% of portfolio in one position\n",
" self.max_daily_loss_pct = 5.0 # Max 5% daily loss\n",
" self.max_total_drawdown_pct = 15.0 # Max 15% total drawdown\n",
" self.min_cash_reserve_pct = 10.0 # Keep at least 10% in cash\n",
" self.max_leverage = 1.0 # No leverage for now\n",
" \n",
" # Risk tracking\n",
" self.daily_start_value = portfolio.total_portfolio_value\n",
" self.risk_violations = []\n",
" self.blocked_orders = 0\n",
" self.risk_warnings = 0\n",
" \n",
" # Subscribe to events that need risk checking\n",
" self.event_queue.subscribe(\"order_request\", self.on_order_request)\n",
" self.event_queue.subscribe(\"portfolio_update\", self.on_portfolio_update)\n",
" \n",
" print(f\"🛡️ {self.name}: Risk management initialized\")\n",
" print(f\" Max position size: {self.max_position_size_pct}%\")\n",
" print(f\" Max daily loss: {self.max_daily_loss_pct}%\")\n",
" print(f\" Max drawdown: {self.max_total_drawdown_pct}%\")\n",
" \n",
" def on_order_request(self, event: Event):\n",
" \"\"\"Check order requests against risk rules\"\"\"\n",
" order = event.data[\"order\"]\n",
" \n",
" print(f\"🛡️ {self.name}: Evaluating order risk - {order}\")\n",
" \n",
" # Run all risk checks\n",
" risk_checks = [\n",
" self._check_position_size_limit(order),\n",
" self._check_cash_availability(order),\n",
" self._check_daily_loss_limit(),\n",
" self._check_total_drawdown_limit(),\n",
" self._check_minimum_cash_reserve(order)\n",
" ]\n",
" \n",
" # Evaluate results\n",
" violations = [check for check in risk_checks if not check[\"approved\"]]\n",
" warnings = [check for check in risk_checks if check[\"approved\"] and check.get(\"warning\")]\n",
" \n",
" if violations:\n",
" # Block the order\n",
" self.blocked_orders += 1\n",
" violation_messages = [v[\"message\"] for v in violations]\n",
" \n",
" risk_event = Event(\n",
" event_type=\"risk_violation\",\n",
" data={\n",
" \"order_id\": order.order_id,\n",
" \"violations\": violation_messages,\n",
" \"severity\": \"CRITICAL\",\n",
" \"action\": \"ORDER_BLOCKED\"\n",
" },\n",
" source=self.name\n",
" )\n",
" self.event_queue.publish(risk_event)\n",
" \n",
" print(f\" ❌ ORDER BLOCKED - Risk violations:\")\n",
" for msg in violation_messages:\n",
" print(f\" • {msg}\")\n",
" \n",
" self.risk_violations.extend(violation_messages)\n",
" \n",
" elif warnings:\n",
" # Allow order but issue warnings\n",
" self.risk_warnings += len(warnings)\n",
" warning_messages = [w[\"message\"] for w in warnings]\n",
" \n",
" risk_event = Event(\n",
" event_type=\"risk_warning\",\n",
" data={\n",
" \"order_id\": order.order_id,\n",
" \"warnings\": warning_messages,\n",
" \"severity\": \"WARNING\",\n",
" \"action\": \"ORDER_APPROVED_WITH_WARNINGS\"\n",
" },\n",
" source=self.name\n",
" )\n",
" self.event_queue.publish(risk_event)\n",
" \n",
" print(f\" ⚠️ ORDER APPROVED (with warnings):\")\n",
" for msg in warning_messages:\n",
" print(f\" • {msg}\")\n",
" \n",
" else:\n",
" # Approve order\n",
" risk_event = Event(\n",
" event_type=\"risk_approved\",\n",
" data={\n",
" \"order_id\": order.order_id,\n",
" \"severity\": \"INFO\",\n",
" \"action\": \"ORDER_APPROVED\"\n",
" },\n",
" source=self.name\n",
" )\n",
" self.event_queue.publish(risk_event)\n",
" \n",
" print(f\" ✅ ORDER APPROVED - All risk checks passed\")\n",
" \n",
" def on_portfolio_update(self, event: Event):\n",
" \"\"\"Monitor portfolio for ongoing risk issues\"\"\"\n",
" update_type = event.data[\"update_type\"]\n",
" \n",
" if update_type == \"price_update\":\n",
" # Check if we're approaching risk limits\n",
" current_drawdown = self.portfolio.max_drawdown\n",
" daily_loss_pct = self._calculate_daily_loss_percentage()\n",
" \n",
" if current_drawdown > self.max_total_drawdown_pct * 0.8: # 80% of limit\n",
" print(f\"⚠️ {self.name}: Approaching drawdown limit ({current_drawdown:.1f}% / {self.max_total_drawdown_pct}%)\")\n",
" \n",
" if daily_loss_pct > self.max_daily_loss_pct * 0.8: # 80% of limit\n",
" print(f\"⚠️ {self.name}: Approaching daily loss limit ({daily_loss_pct:.1f}% / {self.max_daily_loss_pct}%)\")\n",
" \n",
" def _check_position_size_limit(self, order: Order) -> Dict[str, Any]:\n",
" \"\"\"Check if order would exceed position size limits\"\"\"\n",
" symbol = order.symbol\n",
" order_value = order.quantity * order.price\n",
" portfolio_value = self.portfolio.total_portfolio_value\n",
" \n",
" # Calculate position value after this order\n",
" current_position = self.portfolio.get_position(symbol)\n",
" current_value = current_position.market_value if current_position else 0\n",
" \n",
" if order.side == OrderSide.BUY:\n",
" new_position_value = current_value + order_value\n",
" else:\n",
" new_position_value = max(0, current_value - order_value)\n",
" \n",
" position_pct = (new_position_value / portfolio_value) * 100\n",
" \n",
" if position_pct > self.max_position_size_pct:\n",
" return {\n",
" \"approved\": False,\n",
" \"message\": f\"Position size limit exceeded: {position_pct:.1f}% > {self.max_position_size_pct}%\"\n",
" }\n",
" elif position_pct > self.max_position_size_pct * 0.8:\n",
" return {\n",
" \"approved\": True,\n",
" \"warning\": True,\n",
" \"message\": f\"Large position warning: {position_pct:.1f}% of portfolio\"\n",
" }\n",
" \n",
" return {\"approved\": True, \"message\": f\"Position size OK: {position_pct:.1f}%\"}\n",
" \n",
" def _check_cash_availability(self, order: Order) -> Dict[str, Any]:\n",
" \"\"\"Check if we have enough cash for the order\"\"\"\n",
" if order.side == OrderSide.SELL:\n",
" return {\"approved\": True, \"message\": \"Sell order - no cash required\"}\n",
" \n",
" order_value = order.quantity * order.price\n",
" estimated_fees = order_value * 0.001 # Estimate 0.1% fees\n",
" total_needed = order_value + estimated_fees\n",
" \n",
" if total_needed > self.portfolio.cash_balance:\n",
" return {\n",
" \"approved\": False,\n",
" \"message\": f\"Insufficient cash: need ${total_needed:,.2f}, have ${self.portfolio.cash_balance:,.2f}\"\n",
" }\n",
" \n",
" return {\"approved\": True, \"message\": \"Cash availability OK\"}\n",
" \n",
" def _check_daily_loss_limit(self) -> Dict[str, Any]:\n",
" \"\"\"Check daily loss limits\"\"\"\n",
" daily_loss_pct = self._calculate_daily_loss_percentage()\n",
" \n",
" if daily_loss_pct > self.max_daily_loss_pct:\n",
" return {\n",
" \"approved\": False,\n",
" \"message\": f\"Daily loss limit exceeded: {daily_loss_pct:.1f}% > {self.max_daily_loss_pct}%\"\n",
" }\n",
" elif daily_loss_pct > self.max_daily_loss_pct * 0.8:\n",
" return {\n",
" \"approved\": True,\n",
" \"warning\": True,\n",
" \"message\": f\"Approaching daily loss limit: {daily_loss_pct:.1f}%\"\n",
" }\n",
" \n",
" return {\"approved\": True, \"message\": \"Daily loss within limits\"}\n",
" \n",
" def _check_total_drawdown_limit(self) -> Dict[str, Any]:\n",
" \"\"\"Check total drawdown limits\"\"\"\n",
" current_drawdown = self.portfolio.max_drawdown\n",
" \n",
" if current_drawdown > self.max_total_drawdown_pct:\n",
" return {\n",
" \"approved\": False,\n",
" \"message\": f\"Maximum drawdown exceeded: {current_drawdown:.1f}% > {self.max_total_drawdown_pct}%\"\n",
" }\n",
" \n",
" return {\"approved\": True, \"message\": \"Drawdown within limits\"}\n",
" \n",
" def _check_minimum_cash_reserve(self, order: Order) -> Dict[str, Any]:\n",
" \"\"\"Check minimum cash reserve requirements\"\"\"\n",
" if order.side == OrderSide.SELL:\n",
" return {\"approved\": True, \"message\": \"Sell order - increases cash\"}\n",
" \n",
" order_value = order.quantity * order.price\n",
" remaining_cash = self.portfolio.cash_balance - order_value\n",
" portfolio_value = self.portfolio.total_portfolio_value\n",
" cash_pct = (remaining_cash / portfolio_value) * 100\n",
" \n",
" if cash_pct < self.min_cash_reserve_pct:\n",
" return {\n",
" \"approved\": False,\n",
" \"message\": f\"Minimum cash reserve violated: {cash_pct:.1f}% < {self.min_cash_reserve_pct}%\"\n",
" }\n",
" \n",
" return {\"approved\": True, \"message\": \"Cash reserve OK\"}\n",
" \n",
" def _calculate_daily_loss_percentage(self) -> float:\n",
" \"\"\"Calculate daily loss as percentage\"\"\"\n",
" current_value = self.portfolio.total_portfolio_value\n",
" daily_change = current_value - self.daily_start_value\n",
" \n",
" if daily_change >= 0:\n",
" return 0.0 # No loss\n",
" \n",
" return abs(daily_change / self.daily_start_value) * 100\n",
" \n",
" def get_risk_summary(self) -> Dict[str, Any]:\n",
" \"\"\"Get risk management summary\"\"\"\n",
" return {\n",
" \"risk_limits\": {\n",
" \"max_position_size_pct\": self.max_position_size_pct,\n",
" \"max_daily_loss_pct\": self.max_daily_loss_pct,\n",
" \"max_total_drawdown_pct\": self.max_total_drawdown_pct,\n",
" \"min_cash_reserve_pct\": self.min_cash_reserve_pct\n",
" },\n",
" \"current_status\": {\n",
" \"daily_loss_pct\": self._calculate_daily_loss_percentage(),\n",
" \"total_drawdown_pct\": self.portfolio.max_drawdown,\n",
" \"cash_reserve_pct\": (self.portfolio.cash_balance / self.portfolio.total_portfolio_value) * 100\n",
" },\n",
" \"statistics\": {\n",
" \"blocked_orders\": self.blocked_orders,\n",
" \"risk_warnings\": self.risk_warnings,\n",
" \"total_violations\": len(self.risk_violations)\n",
" }\n",
" }\n",
"\n",
"# Create risk controller\n",
"risk_controller = RiskController(event_queue, portfolio)\n",
"print(f\"\\n🛡 Risk Controller Summary:\")\n",
"print(json.dumps(risk_controller.get_risk_summary(), indent=2, default=str))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Part 4: Market Data Simulator\n",
"\n",
"Let's create a simple market data simulator to test our portfolio and risk systems:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"class MarketDataSimulator:\n",
" \"\"\"Simple market data simulator for testing\"\"\"\n",
" \n",
" def __init__(self, event_queue: EventQueue):\n",
" self.event_queue = event_queue\n",
" self.name = \"market_simulator\"\n",
" \n",
" # Initial prices\n",
" self.prices = {\n",
" \"BTC/USDT\": 43000.0,\n",
" \"ETH/USDT\": 2650.0,\n",
" \"BNB/USDT\": 310.0\n",
" }\n",
" \n",
" def update_price(self, symbol: str, new_price: float):\n",
" \"\"\"Update price and publish market data event\"\"\"\n",
" old_price = self.prices.get(symbol, 0)\n",
" self.prices[symbol] = new_price\n",
" change_pct = ((new_price - old_price) / old_price * 100) if old_price > 0 else 0\n",
" \n",
" market_event = Event(\n",
" event_type=\"market_data\",\n",
" data={\n",
" \"symbol\": symbol,\n",
" \"price\": new_price,\n",
" \"volume\": 100.0, # Mock volume\n",
" \"change_pct\": change_pct\n",
" },\n",
" source=self.name\n",
" )\n",
" \n",
" self.event_queue.publish(market_event)\n",
" print(f\"📈 {symbol}: ${old_price:,.2f} → ${new_price:,.2f} ({change_pct:+.2f}%)\")\n",
" \n",
" def simulate_order_fill(self, symbol: str, side: str, quantity: float, price: float, fees: float = None):\n",
" \"\"\"Simulate an order fill\"\"\"\n",
" if fees is None:\n",
" fees = quantity * price * 0.001 # 0.1% default fee\n",
" \n",
" fill_event = Event(\n",
" event_type=\"order_fill\",\n",
" data={\n",
" \"order_id\": str(uuid.uuid4()),\n",
" \"symbol\": symbol,\n",
" \"side\": side,\n",
" \"fill_quantity\": quantity,\n",
" \"fill_price\": price,\n",
" \"fees\": fees\n",
" },\n",
" source=self.name\n",
" )\n",
" \n",
" self.event_queue.publish(fill_event)\n",
"\n",
"# Create market simulator\n",
"market_sim = MarketDataSimulator(event_queue)\n",
"print(f\"📊 Market simulator ready with initial prices: {market_sim.prices}\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Part 5: Complete Demo - Portfolio & Risk in Action\n",
"\n",
"Let's test our complete system with various trading scenarios!"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"print(\"🎬 DEMO: Portfolio & Risk Management System\\n\")\n",
"print(\"=\" * 70)\n",
"\n",
"# Initial state\n",
"print(\"\\n📊 Initial State:\")\n",
"print(f\"Portfolio Value: ${portfolio.total_portfolio_value:,.2f}\")\n",
"print(f\"Cash Balance: ${portfolio.cash_balance:,.2f}\")\n",
"\n",
"# Test 1: Normal trade (should pass all risk checks)\n",
"print(\"\\n📊 Test 1: Normal BTC Purchase\")\n",
"print(\"-\" * 40)\n",
"btc_order = Order(\"BTC/USDT\", OrderSide.BUY, 0.5, 43000) # $21,500 order (21.5% of portfolio)\n",
"request_event = Event(\n",
" event_type=\"order_request\",\n",
" data={\"order\": btc_order},\n",
" source=\"demo\"\n",
")\n",
"event_queue.publish(request_event)\n",
"\n",
"# Simulate the fill\n",
"time.sleep(0.5)\n",
"market_sim.simulate_order_fill(\"BTC/USDT\", \"BUY\", 0.5, 43000)\n",
"\n",
"time.sleep(0.5)\n",
"\n",
"# Test 2: Price update (profit scenario)\n",
"print(\"\\n📊 Test 2: BTC Price Increase\")\n",
"print(\"-\" * 30)\n",
"market_sim.update_price(\"BTC/USDT\", 44500) # +3.49% increase\n",
"\n",
"time.sleep(0.5)\n",
"\n",
"# Test 3: Large order (should trigger position size warning)\n",
"print(\"\\n📊 Test 3: Large ETH Order (Position Size Warning)\")\n",
"print(\"-\" * 50)\n",
"eth_order = Order(\"ETH/USDT\", OrderSide.BUY, 8.0, 2650) # $21,200 order\n",
"request_event = Event(\n",
" event_type=\"order_request\",\n",
" data={\"order\": eth_order},\n",
" source=\"demo\"\n",
")\n",
"event_queue.publish(request_event)\n",
"\n",
"time.sleep(0.5)\n",
"market_sim.simulate_order_fill(\"ETH/USDT\", \"BUY\", 8.0, 2650)\n",
"\n",
"time.sleep(0.5)\n",
"\n",
"# Test 4: Excessive order (should be blocked)\n",
"print(\"\\n📊 Test 4: Excessive Order (Should be BLOCKED)\")\n",
"print(\"-\" * 45)\n",
"huge_order = Order(\"BTC/USDT\", OrderSide.BUY, 1.0, 44500) # $44,500 order (too large)\n",
"request_event = Event(\n",
" event_type=\"order_request\",\n",
" data={\"order\": huge_order},\n",
" source=\"demo\"\n",
")\n",
"event_queue.publish(request_event)\n",
"\n",
"time.sleep(0.5)\n",
"\n",
"# Test 5: Market crash (trigger risk warnings)\n",
"print(\"\\n📊 Test 5: Market Crash Simulation\")\n",
"print(\"-\" * 35)\n",
"market_sim.update_price(\"BTC/USDT\", 39000) # -12.4% crash\n",
"market_sim.update_price(\"ETH/USDT\", 2300) # -13.2% crash\n",
"\n",
"time.sleep(0.5)\n",
"\n",
"# Test 6: Try to trade during high losses (should be blocked)\n",
"print(\"\\n📊 Test 6: Order During High Losses (Should be BLOCKED)\")\n",
"print(\"-\" * 55)\n",
"risky_order = Order(\"BNB/USDT\", OrderSide.BUY, 50, 310) # $15,500 order\n",
"request_event = Event(\n",
" event_type=\"order_request\",\n",
" data={\"order\": risky_order},\n",
" source=\"demo\"\n",
")\n",
"event_queue.publish(request_event)\n",
"\n",
"time.sleep(0.5)\n",
"\n",
"print(\"\\n\" + \"=\" * 70)\n",
"print(\"📈 Final Portfolio Summary:\")\n",
"print(json.dumps(portfolio.get_portfolio_summary(), indent=2, default=str))\n",
"\n",
"print(\"\\n🛡 Final Risk Summary:\")\n",
"print(json.dumps(risk_controller.get_risk_summary(), indent=2, default=str))\n",
"\n",
"print(f\"\\n📊 Total Events Processed: {len(event_queue.event_history)}\")\n",
"print(\"\\n🎯 Key Achievements:\")\n",
"print(f\" • Tracked {len(portfolio.positions)} positions in real-time\")\n",
"print(f\" • Processed {portfolio.total_trades} trades\")\n",
"print(f\" • Blocked {risk_controller.blocked_orders} risky orders\")\n",
"print(f\" • Issued {risk_controller.risk_warnings} risk warnings\")\n",
"print(f\" • Current P&L: ${portfolio.total_pnl:,.2f} ({portfolio.total_pnl_percentage:+.2f}%)\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Part 6: Your Turn - Advanced Portfolio Features\n",
"\n",
"**Challenge:** Extend the system with advanced portfolio and risk features!\n",
"\n",
"Choose one or more of these challenges:\n",
"\n",
"1. **Performance Metrics**: Add Sharpe ratio, Sortino ratio, maximum drawdown duration\n",
"2. **Advanced Risk Controls**: Position correlation limits, sector exposure limits\n",
"3. **Dynamic Risk Adjustment**: Adjust risk limits based on market volatility\n",
"4. **Portfolio Rebalancing**: Automatic rebalancing when positions drift too far\n",
"5. **Risk Reports**: Generate detailed risk reports with recommendations"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Challenge 1: Performance Metrics\n",
"def calculate_sharpe_ratio(portfolio: PortfolioManager, risk_free_rate: float = 0.02) -> float:\n",
" \"\"\"Calculate Sharpe ratio for the portfolio\"\"\"\n",
" # YOUR CODE HERE\n",
" # Hint: You'll need to track daily returns over time\n",
" pass\n",
"\n",
"# Challenge 2: Correlation-based Risk Control\n",
"def check_position_correlation(portfolio: PortfolioManager, new_symbol: str, max_correlation: float = 0.7) -> bool:\n",
" \"\"\"Check if new position would create excessive correlation risk\"\"\"\n",
" # YOUR CODE HERE\n",
" # Hint: You could use simple sector groupings (BTC/ETH = crypto, etc.)\n",
" pass\n",
"\n",
"# Challenge 3: Dynamic Risk Adjustment\n",
"def adjust_risk_limits_for_volatility(risk_controller: RiskController, market_volatility: float):\n",
" \"\"\"Adjust risk limits based on market conditions\"\"\"\n",
" # YOUR CODE HERE\n",
" # Hint: Reduce position limits when volatility is high\n",
" pass\n",
"\n",
"# Challenge 4: Portfolio Rebalancing\n",
"def suggest_rebalancing_trades(portfolio: PortfolioManager, target_weights: Dict[str, float]) -> List[Order]:\n",
" \"\"\"Suggest trades to rebalance portfolio to target weights\"\"\"\n",
" # YOUR CODE HERE\n",
" pass\n",
"\n",
"# Challenge 5: Risk Report Generation\n",
"def generate_risk_report(portfolio: PortfolioManager, risk_controller: RiskController) -> Dict[str, Any]:\n",
" \"\"\"Generate comprehensive risk report\"\"\"\n",
" # YOUR CODE HERE\n",
" pass\n",
"\n",
"# Test your implementations here\n",
"print(\"🧪 Test your advanced features here!\")\n",
"\n",
"# Example tests:\n",
"# sharpe = calculate_sharpe_ratio(portfolio)\n",
"# rebalance_trades = suggest_rebalancing_trades(portfolio, {\"BTC/USDT\": 0.6, \"ETH/USDT\": 0.3, \"CASH\": 0.1})\n",
"# risk_report = generate_risk_report(portfolio, risk_controller)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Congratulations!\n",
"\n",
"You've built a professional-grade Portfolio Management and Risk Control system! Here's what you accomplished:\n",
"\n",
"✅ **Real-time Portfolio Tracking**: Positions, P&L, balances updated instantly \n",
"✅ **Comprehensive Risk Controls**: Position size, daily loss, drawdown, cash reserve limits \n",
"✅ **Event-driven Integration**: Seamless communication with order management system \n",
"✅ **Professional Risk Management**: Order blocking, warnings, violation tracking \n",
"✅ **Market Simulation**: Realistic testing environment with price movements \n",
"✅ **Complete Statistics**: Detailed reporting and monitoring capabilities \n",
"\n",
"## Key Professional Features:\n",
"\n",
"1. **Position Management**: Real-time tracking with average cost basis and P&L calculation\n",
"2. **Risk Controls**: Multi-layered protection against excessive losses\n",
"3. **Event Integration**: Loose coupling through event system\n",
"4. **Real-time Updates**: Instant portfolio updates as market prices change\n",
"5. **Comprehensive Monitoring**: Statistics, warnings, and violation tracking\n",
"\n",
"## Next Steps:\n",
"\n",
"In the next notebook, we'll add the **Market Data Handler** and **Algorithm Engine** to complete our MARKET architecture and create a fully functional trading bot!\n",
"\n",
"---\n",
"*\"Risk management is not about avoiding risk - it's about taking the right risks at the right size.\"* 🛡️💰"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "venv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.13.3"
}
},
"nbformat": 4,
"nbformat_minor": 4
}

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,226 @@
import ccxt
import pandas as pd
import time
import os
from dotenv import load_dotenv
load_dotenv()
api_key = os.getenv('BINANCE_API_KEY')
api_secret = os.getenv('BINANCE_SECRET_KEY')
exchange = ccxt.binance({
'apiKey': api_key,
'secret': api_secret,
'enableRateLimit': True,
'options': {
'defaultType': 'future',
}
})
symbol = 'APEUSDT'
pause_time = 60
vol_repeat = 11
vol_time = 5
pos_size = 100
params = {'timeInForce': 'GTC'}
target = 35
max_loss = -55
vol_decimal = 0.4
timeframe = '4h'
limit = 100
sma = 20
def ask_bid(symbol=symbol):
orderbook = exchange.fetch_order_book(symbol)
bid = orderbook['bids'][0][0]
ask = orderbook['asks'][0][0]
print(f'Ask price for {symbol}: {ask}')
return ask, bid
def df_sma(symbol=symbol, timeframe=timeframe, limit=limit, sma=sma):
print('Starting technical indicators...')
bars = exchange.fetch_ohlcv(symbol, timeframe=timeframe, limit=limit)
df = pd.DataFrame(bars, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
df[f'sma{sma}_{timeframe}'] = df['close'].rolling(sma).mean()
_, bid = ask_bid(symbol)
df.loc[df[f'sma{sma}_{timeframe}'] > bid, 'sig'] = 'SELL'
df.loc[df[f'sma{sma}_{timeframe}'] < bid, 'sig'] = 'BUY'
df['support'] = df[:-2]['close'].min()
df['resistance'] = df[:-2]['close'].max()
df['prev_close'] = df['close'].shift(1)
df['bullish_confirmation'] = df['close'] > df['prev_close']
return df
def open_positions(symbol=symbol):
positions = exchange.fetch_positions([symbol])
for pos in positions:
if pos['symbol'] == symbol and float(pos['contracts']) != 0:
side = pos['side']
size = pos['contracts']
is_long = side == 'long'
print(f'Open position: {side} {size} contracts')
return positions, True, size, is_long, 0, {}
return [], False, 0, None, 0, {}
def kill_switch(symbol=symbol):
print(f'Starting kill switch for {symbol}')
while True:
_, has_position, position_size, is_long, _, _ = open_positions(symbol)
if not has_position:
break
print(f'Closing position: {position_size} contracts, long: {is_long}')
exchange.cancel_all_orders(symbol)
ask, bid = ask_bid(symbol)
position_size = float(position_size)
if is_long:
exchange.create_limit_sell_order(symbol, position_size, ask, params)
print(f'SELL to close: {position_size} {symbol} at {ask}')
else:
exchange.create_limit_buy_order(symbol, position_size, bid, params)
print(f'BUY to close: {position_size} {symbol} at {bid}')
print('Waiting 30 seconds for fill...')
time.sleep(30)
def sleep_on_close(symbol=symbol, pause_time=pause_time):
closed_orders = exchange.fetch_closed_orders(symbol)
for order in reversed(closed_orders):
if order['status'] == 'closed':
order_time = int(order['timestamp'] / 1000)
current_time = int(exchange.fetch_order_book(symbol)['timestamp'] / 1000)
time_since_trade = (current_time - order_time) / 60
print(f'Last trade was {time_since_trade:.1f} minutes ago')
if time_since_trade < pause_time - 1:
print(f'Sleeping 60 seconds (recent trade)')
time.sleep(60)
else:
print('No sleep needed')
break
print(f'Sleep check complete for {symbol}')
def ob(symbol=symbol, vol_repeat=vol_repeat, vol_time=vol_time):
print(f'Analyzing order book volume for {symbol}...')
volume_data = []
for i in range(vol_repeat):
orderbook = exchange.fetch_order_book(symbol)
bid_volume = sum([bid[1] for bid in orderbook['bids']])
ask_volume = sum([ask[1] for ask in orderbook['asks']])
volume_data.append({'bid_vol': bid_volume, 'ask_vol': ask_volume})
if i < vol_repeat - 1:
time.sleep(vol_time)
print(f'Sample {i+1}: Bid Vol: {bid_volume}, Ask Vol: {ask_volume}')
df = pd.DataFrame(volume_data)
total_bid_vol = df['bid_vol'].sum()
total_ask_vol = df['ask_vol'].sum()
minutes = (vol_time * vol_repeat) / 60
print(f'Total volume over {minutes:.1f} minutes:')
print(f'Bid: {total_bid_vol}, Ask: {total_ask_vol}')
if total_bid_vol > total_ask_vol:
control_ratio = total_ask_vol / total_bid_vol
print(f'Bulls in control: {control_ratio:.3f}')
else:
control_ratio = total_bid_vol / total_ask_vol
print(f'Bears in control: {control_ratio:.3f}')
_, has_position, _, is_long, _, _ = open_positions(symbol)
if has_position:
volume_under_threshold = control_ratio < vol_decimal
position_type = 'long' if is_long else 'short'
print(f'In {position_type} position. Volume under threshold: {volume_under_threshold}')
return volume_under_threshold
else:
print('Not in position')
return None
def pnl_close(symbol=symbol, target=target, max_loss=max_loss):
print(f'Checking PnL for {symbol}...')
positions = exchange.fetch_positions([symbol])
position = None
for pos in positions:
if pos['symbol'] == symbol and float(pos['contracts']) != 0:
position = pos
break
if position is None:
print('No position open')
return False, False, 0, None
side = position['side']
size = position['contracts']
entry_price = float(position['entryPrice'])
leverage = float(position['leverage'])
is_long = side == 'long'
_, current_price = ask_bid(symbol)
if is_long:
price_diff = current_price - entry_price
else:
price_diff = entry_price - current_price
try:
pnl_percent = ((price_diff / entry_price) * leverage) * 100
except:
pnl_percent = 0
print(f'PnL: {pnl_percent:.2f}% (Entry: {entry_price}, Current: {current_price})')
if pnl_percent > target:
print(f'Target hit! Checking volume before closing...')
volume_under_threshold = ob(symbol)
if volume_under_threshold:
print(f'Volume too low, waiting 30s...')
time.sleep(30)
else:
print(f'Closing profitable position!')
kill_switch(symbol)
return True, True, size, is_long
elif pnl_percent <= max_loss:
print(f'Max loss hit! Closing position immediately.')
kill_switch(symbol)
return True, True, size, is_long
elif pnl_percent > 0:
print(f'In profit but target not reached')
else:
print(f'In loss but within acceptable range')
# Stop loss check using 15m SMA
if True: # Position exists
df_15m = df_sma(symbol, '15m', 100, 20)
sma_15m = df_15m.iloc[-1][f'sma{sma}_15m']
stop_loss_level = sma_15m * 1.008
print(f'SMA stop loss level: {stop_loss_level}')
print(f'PnL check complete for {symbol}')
return False, True, size, is_long

View file

@ -0,0 +1,144 @@
import pandas as pd
from datetime import datetime, time
from pytz import timezone
from time import sleep
import math
def in_timeframe():
now = datetime.now(timezone('US/Eastern')).time()
day = datetime.today().weekday()
return (now >= time(9,30) and now < time(16)) and (day < 5 and day >= 0)
def get_position(exchange, symbol):
positions = exchange.fetch_positions([symbol])
for pos in positions:
if pos['symbol'] == symbol and float(pos['contracts']) != 0:
position_info = pos
in_position = True
long = pos['side'] == 'long'
return position_info, in_position, long
return {}, False, None
def get_candle_df(exchange, symbol, timeframe, limit=55):
ohlcv = pd.DataFrame(
exchange.fetch_ohlcv(symbol, timeframe, limit=limit),
columns=['time', 'open', 'high', 'low', 'close', 'volume']
).set_index('time')
return ohlcv
def calc_tr(df):
df['True_Range'] = df.ta.atr()
return df['True_Range'].iloc[-1]
def calc_atr(df, length=None):
df['ATR'] = df.ta.atr(length=length)
def calc_sup_res(df, length=20):
df['support'] = df['low'].rolling(window=length).min()
df['resistance'] = df['high'].rolling(window=length).max()
return df['support'].iloc[-1], df['resistance'].iloc[-1]
def hit_target(price, tp, sl, long: bool):
if long:
if price >= tp:
print('TAKE PROFIT REACHED, CLOSING POSITION')
return True
elif price <= sl:
print('STOP LOSS REACHED, CLOSING POSITION')
return True
else:
if price <= tp:
print('TAKE PROFIT REACHED, CLOSING POSITION')
return True
elif price >= sl:
print('STOP LOSS REACHED, CLOSING POSITION')
return True
return False
def close_position(exchange, symbol):
exchange.cancel_all_orders(symbol)
position, in_position, long = get_position(exchange, symbol)
while in_position:
ticker = exchange.fetch_ticker(symbol)
if long:
bid = ticker['bid']
exchange.create_limit_sell_order(
symbol,
position['contracts'],
bid,
{'timeInForce': 'GTC', 'reduceOnly': True}
)
print(f'SELL to close: {position["contracts"]} {symbol} at {bid}')
else:
ask = ticker['ask']
exchange.create_limit_buy_order(
symbol,
position['contracts'],
ask,
{'timeInForce': 'GTC', 'reduceOnly': True}
)
print(f'BUY to close: {position["contracts"]} {symbol} at {ask}')
sleep(30)
position, in_position, long = get_position(exchange, symbol)
exchange.cancel_all_orders(symbol)
sleep(60)
def end_of_trading_week():
now = datetime.now(timezone('US/Eastern')).time()
day = datetime.today().weekday()
return day == 4 and now >= time(16) and now < time(16,1)
def get_extreme_of_consolidation(df, percent):
for index, row in df.iloc[::-1].iterrows():
if (row['True_Range'] / row['close']) * 100 > percent:
consolidation_data = df[df.index > index]
return consolidation_data.low.min(), consolidation_data.high.max()
return df.low.min(), df.high.max()
def calc_stoch_rsi(df, lookback=14):
rsi = df.ta.rsi(length=lookback)
min_rsi = rsi.tail(lookback).min()
max_rsi = rsi.tail(lookback).max()
df['stoch_rsi'] = (rsi.iloc[-1] - min_rsi) / (max_rsi - min_rsi)
def calc_nadarya(df, bandwidth=8, source='close'):
src = df[source]
out = []
for i, v1 in src.items():
tsum = 0
sumw = 0
for j, v2 in src.items():
w = math.exp(-(math.pow(i-j, 2) / (bandwidth * bandwidth * 2)))
tsum += v2 * w
sumw += w
out.append(tsum / sumw)
df['nadarya'] = out
d = df['nadarya'].rolling(window=2).apply(lambda x: x.iloc[1] - x.iloc[0])
df['nadarya_buy'] = (d > 0) & (d.shift(1) < 0)
df['nadarya_sell'] = (d < 0) & (d.shift(1) > 0)
return df['nadarya_buy'].iloc[-1], df['nadarya_sell'].iloc[-1]
def is_oversold(rsi, window=14, times=1, target=10):
rsi_window = rsi.tail(window)
rsi_crossed = [
v for ind, v in enumerate(rsi_window.values)
if v <= target and rsi_window.values[ind-1] >= target and ind > 0
]
return len(rsi_crossed) >= times
def is_overbought(rsi, window=14, times=1, target=90):
rsi_window = rsi.tail(window)
rsi_crossed = [
v for ind, v in enumerate(rsi_window.values)
if v >= target and rsi_window.values[ind-1] <= target and ind > 0
]
return len(rsi_crossed) >= times

View file

2
Session_05/bots/.env Normal file
View file

@ -0,0 +1,2 @@
BINANCE_API_KEY = "isV8Eo73qCBkrySBWXg4OcHlqp3BxVH9K6p5K5Qv63Bk9DLkwuUNRmSfh6Fq9YCH"
BINANCE_SECRET_KEY = "LB6T8AEuD210xM6Cx3MuOckDbZb4iLu5JiXhrJiuxOOPy9Q1hf0vVjsnlNbvtXO3"

174
Session_05/bots/Breakout.py Normal file
View file

@ -0,0 +1,174 @@
'''
Breakout Trading Strategy Overview:
Support/Resistance Calculation:
- Uses 3 days of 15-minute data (289 periods) for range analysis
- Calculates support (lowest close) and resistance (highest close) levels
- Establishes key levels based on recent price action
Breakout Detection:
- BULLISH breakout: Current bid > recent resistance level
- BEARISH breakdown: Current bid < recent support level
- Confirms momentum before placing trades
Entry Strategy:
- LONG: Place buy order 0.1% above broken resistance (retest entry)
- SHORT: Place sell order 0.1% below broken support (retest entry)
- Waits for price to break key levels before entering
Retest Logic:
- Enters on pullbacks to previously broken levels
- Resistance becomes support after bullish breakout
- Support becomes resistance after bearish breakdown
- Higher probability entries on retests vs initial breakouts
Risk Management:
- Profit target: 9% gain
- Stop loss: 8% loss
- Position size management with limits
- Trade pause system to avoid overtrading
Technical Setup:
- Timeframe: 15 minutes
- Lookback: 3 days (289 periods)
- Uses close-based support/resistance (more reliable than wicks)
- Breakout confirmation with 0.1% threshold for noise filtering
WARNING: Do not run without thorough backtesting and understanding!
'''
import ccxt
import time, schedule
import Functions.funcs as func
import os
from dotenv import load_dotenv
load_dotenv()
api_key = os.getenv('BINANCE_API_KEY')
api_secret = os.getenv('BINANCE_SECRET_KEY')
exchange = ccxt.binance({
'apiKey': api_key,
'secret': api_secret,
'enableRateLimit': True,
'options': {
'defaultType': 'future',
}
})
# Configuration
symbol = 'BTCUSDT'
pos_size = 30
target = 9
max_loss = -8
pause_time = 10
vol_repeat = 11
vol_time = 5
vol_decimal = 0.4
params = {'timeInForce': 'GTC'}
def get_support_resistance(symbol):
"""Calculate support and resistance from 3 days of 15m data"""
df = func.df_sma(symbol, '15m', 289, 20) # 3 days of 15m data
support = df['close'].min()
resistance = df['close'].max()
# Also get recent support/resistance for comparison
recent_support = df['support'].iloc[-1]
recent_resistance = df['resis'].iloc[-1]
print(f'Levels - Support: {support:.2f} | Resistance: {resistance:.2f}')
print(f'Recent - Support: {recent_support:.2f} | Resistance: {recent_resistance:.2f}')
return support, resistance, recent_support, recent_resistance, df
def detect_breakout(symbol):
"""Detect if price is breaking above resistance or below support"""
support, resistance, recent_support, recent_resistance, df = get_support_resistance(symbol)
ask, bid = func.ask_bid(symbol)
buy_breakout = False
sell_breakdown = False
breakout_price = False
breakdown_price = False
# Check for breakout above recent resistance
if bid > recent_resistance:
print(f'BULLISH BREAKOUT detected - Bid {bid:.2f} > Resistance {recent_resistance:.2f}')
buy_breakout = True
breakout_price = recent_resistance * 1.001 # 0.1% above broken resistance
# Check for breakdown below recent support
elif bid < recent_support:
print(f'BEARISH BREAKDOWN detected - Bid {bid:.2f} < Support {recent_support:.2f}')
sell_breakdown = True
breakdown_price = recent_support * 0.999 # 0.1% below broken support
else:
print(f'Price {bid:.2f} within range [{recent_support:.2f} - {recent_resistance:.2f}]')
return buy_breakout, sell_breakdown, breakout_price, breakdown_price
def bot():
try:
# Check PnL and manage existing positions
func.pnl_close(symbol, target, max_loss)
# Check sleep on close (pause between trades)
func.sleep_on_close(symbol, pause_time)
# Get current prices
ask, bid = func.ask_bid(symbol)
# Detect breakout signals
buy_breakout, sell_breakdown, breakout_price, breakdown_price = detect_breakout(symbol)
# Get position info
_, in_position, position_size, _ = func.open_positions(symbol)
current_size = int(position_size) if position_size else 0
print(f'Status - In position: {in_position} | Size: {current_size} | Price: {bid:.2f}')
print(f'Signals - Breakout: {buy_breakout} | Breakdown: {sell_breakdown}')
# Only trade if not in position and size within limits
if not in_position and current_size < pos_size:
exchange.cancel_all_orders(symbol)
# Refresh prices after canceling orders
ask, bid = func.ask_bid(symbol)
if buy_breakout and breakout_price:
print(f'Placing LONG order - Size: {pos_size} at {breakout_price:.2f}')
exchange.create_limit_buy_order(symbol, pos_size, breakout_price, params)
print('BUY order placed - Sleeping 2 minutes...')
time.sleep(120)
elif sell_breakdown and breakdown_price:
print(f'Placing SHORT order - Size: {pos_size} at {breakdown_price:.2f}')
exchange.create_limit_sell_order(symbol, pos_size, breakdown_price, params)
print('SELL order placed - Sleeping 2 minutes...')
time.sleep(120)
else:
print('No breakout signal - Sleeping 1 minute...')
time.sleep(60)
else:
if in_position:
print('Already in position - Monitoring...')
else:
print('Position size at limit - No new trades')
except Exception as e:
print(f'Bot error: {e}')
schedule.every(28).seconds.do(bot)
while True:
try:
schedule.run_pending()
except Exception as e:
print(f'Schedule error: {e} - Sleeping 30 seconds')
time.sleep(30)

View file

@ -0,0 +1,137 @@
'''
Consolidation Pop Strategy Overview:
Consolidation Detection:
- Identifies sideways price movement using True Range analysis
- Consolidation confirmed when TR deviation < 0.7% of current price
- Uses recent 20-candle period to establish consolidation range
Range Trading Logic:
- LONG: Buy in lower 1/3 of consolidation range (near support)
- SHORT: Sell in upper 1/3 of consolidation range (near resistance)
- Anticipates breakouts in either direction from established ranges
Entry Strategy:
- Waits for price to enter specific zones within consolidation
- Lower 1/3: Expects bounce from support with breakout potential
- Upper 1/3: Expects rejection from resistance with breakdown potential
- Contrarian approach: Buy weakness, sell strength within range
Risk Management:
- Take profit: 0.3% gain from entry
- Stop loss: 0.25% loss from entry
- Quick scalping approach for range-bound markets
Technical Setup:
- Timeframe: 5 minutes (adjustable for 1m, 3m, 15m, 1h)
- Lookback: 20 candles for range establishment
- Consolidation threshold: 0.7% TR deviation from price
- Targets small, quick moves within established ranges
Market Conditions:
- Best suited for sideways, range-bound markets
- Avoids trending markets where consolidation patterns break down
- Designed for short-term momentum bursts within ranges
WARNING: Do not run without thorough backtesting and understanding!
'''
import ccxt, time, schedule
from Functions.functions import *
import os
from dotenv import load_dotenv
load_dotenv()
api_key = os.getenv('BINANCE_API_KEY')
api_secret = os.getenv('BINANCE_SECRET_KEY')
exchange = ccxt.binance({
'apiKey': api_key,
'secret': api_secret,
'enableRateLimit': True,
'options': {
'defaultType': 'future',
}
})
# Configuration
timeframe = '5m'
limit = 20
symbol = 'ETHUSDT'
size = 1
tp_percent = 0.3
sl_percent = 0.25
consolidation_percent = 0.7
params = {'timeInForce': 'GTC'}
def bot():
try:
# Check current position status
position_info, in_position, is_long = get_position(exchange, symbol)
if in_position:
print('Already in position - monitoring...')
return
# Get market data
candles = get_candle_df(exchange, symbol, timeframe, limit)
tr = calc_tr(candles)
current_price = exchange.fetch_ticker(symbol)['bid']
# Calculate TR deviation from current price
tr_deviation = (tr / candles['close'].iloc[-1]) * 100
print(f'Price: {current_price:.2f} | TR: {tr:.2f} | TR Deviation: {tr_deviation:.2f}%')
# Only trade during consolidation periods
if tr_deviation < consolidation_percent:
print('CONSOLIDATION detected - Analyzing range...')
# Get consolidation range extremes
low, high = get_extreme_of_consolidation(candles, consolidation_percent)
range_size = high - low
lower_third = low + (range_size / 3)
upper_third = high - (range_size / 3)
print(f'Range: {low:.2f} - {high:.2f} | Size: {range_size:.2f}')
print(f'Lower 1/3: {lower_third:.2f} | Upper 1/3: {upper_third:.2f}')
# LONG setup: Buy in lower 1/3 of consolidation (near support)
if current_price <= lower_third:
stop_loss = current_price * (1 - (sl_percent / 100))
take_profit = current_price * (1 + (tp_percent / 100))
print(f'LONG setup - Price in lower 1/3 of range')
print(f'Entry: {current_price:.2f} | SL: {stop_loss:.2f} | TP: {take_profit:.2f}')
exchange.create_limit_buy_order(symbol, size, current_price, params)
print('BUY order placed - Expecting consolidation pop upward')
# SHORT setup: Sell in upper 1/3 of consolidation (near resistance)
elif current_price >= upper_third:
stop_loss = current_price * (1 + (sl_percent / 100))
take_profit = current_price * (1 - (tp_percent / 100))
print(f'SHORT setup - Price in upper 1/3 of range')
print(f'Entry: {current_price:.2f} | SL: {stop_loss:.2f} | TP: {take_profit:.2f}')
exchange.create_limit_sell_order(symbol, size, current_price, params)
print('SELL order placed - Expecting consolidation pop downward')
else:
print(f'Price in middle zone ({current_price:.2f}) - No trade')
else:
print(f'TR deviation too high ({tr_deviation:.2f}%) - Not consolidating')
except Exception as e:
print(f'Bot error: {e}')
schedule.every(20).seconds.do(bot)
while True:
try:
schedule.run_pending()
except Exception as e:
print(f'Schedule error: {e} - Sleeping 30 seconds')
time.sleep(30)

View file

@ -0,0 +1,233 @@
'''
Correlation Trading Strategy Overview:
Lead Asset Analysis:
- Monitors ETH price movements for breakouts
- Uses True Range and support/resistance levels as breakout signals
- Detects when ETH moves outside recent trading range
Correlation Logic:
- When ETH breaks above resistance/ATR: Look for LONG opportunities in lagging altcoins
- When ETH breaks below support/ATR: Look for SHORT opportunities in lagging altcoins
- Identifies the most "lagging" altcoin (least price movement) to trade
Entry Strategy:
- Calculates percentage change from last candle for each altcoin
- Trades the altcoin with minimal movement (highest correlation lag)
- Assumes lagging altcoins will catch up to ETH's direction
Risk Management:
- Stop loss: 0.2% against position
- Take profit: 0.25% in favor of position
- Quick execution to capture correlation momentum
Altcoin Universe:
- ADAUSDT, DOTUSDT, MANAUSDT, XRPUSDT, UNIUSDT, SOLUSDT
- All major altcoins that typically correlate with ETH movements
Technical Setup:
- Timeframe: 15 minutes (900 seconds)
- Lookback period: 20 candles
- Uses ATR and support/resistance for breakout confirmation
WARNING: Do not run without thorough backtesting and understanding!
'''
import ccxt, time, schedule
from Functions.functions import *
import os
from dotenv import load_dotenv
load_dotenv()
api_key = os.getenv('BINANCE_API_KEY')
api_secret = os.getenv('BINANCE_SECRET_KEY')
exchange = ccxt.binance({
'apiKey': api_key,
'secret': api_secret,
'enableRateLimit': True,
'options': {
'defaultType': 'future',
}
})
# Configuration
timeframe = '15m'
data_range = 20
sl_percent = 0.2
tp_percent = 0.25
size = 1
params = {'timeInForce': 'GTC'}
# Lead symbol to monitor
lead_symbol = 'ETHUSDT'
# Altcoins to trade based on correlation
alt_coins = ['ADAUSDT', 'DOTUSDT', 'MANAUSDT', 'XRPUSDT', 'UNIUSDT', 'SOLUSDT']
def get_eth_signal():
"""Get ETH breakout signal and current price"""
try:
# Get ETH candle data
candles = get_candle_df(exchange, lead_symbol, timeframe, data_range)
# Calculate technical indicators
calc_atr(candles)
support, resistance = calc_sup_res(candles, data_range)
# Get current price
ticker = exchange.fetch_ticker(lead_symbol)
current_price = ticker['bid']
last_close = candles['close'].iloc[-1]
atr = candles['ATR'].iloc[-1]
print(f'ETH - Price: {current_price:.2f} | Last Close: {last_close:.2f} | ATR: {atr:.2f}')
print(f'Support: {support:.2f} | Resistance: {resistance:.2f}')
# Check for breakout signals
if current_price > last_close + atr or current_price > resistance:
return 'BULLISH', current_price
elif current_price < last_close - atr or current_price < support:
return 'BEARISH', current_price
else:
return 'NEUTRAL', current_price
except Exception as e:
print(f'Error getting ETH signal: {e}')
return 'NEUTRAL', 0
def find_most_lagging_altcoin(eth_price):
"""Find the altcoin with the least movement (most lagging)"""
try:
coin_data = {}
for coin in alt_coins:
try:
# Get current price and recent candle data
ticker = exchange.fetch_ticker(coin)
current_price = ticker['bid']
candles = get_candle_df(exchange, coin, timeframe, 5) # Last 5 candles
recent_close = candles['close'].iloc[-1]
# Calculate percentage change from recent close
pct_change = abs((current_price - recent_close) / recent_close) * 100
coin_data[coin] = pct_change
print(f'{coin}: Current: {current_price:.4f} | Recent Close: {recent_close:.4f} | Change: {pct_change:.2f}%')
except Exception as e:
print(f'Error getting data for {coin}: {e}')
continue
if coin_data:
# Find coin with minimum movement (most lagging)
most_lagging = min(coin_data, key=coin_data.get)
lag_amount = coin_data[most_lagging]
print(f'Most lagging coin: {most_lagging} (Change: {lag_amount:.2f}%)')
return most_lagging
else:
return None
except Exception as e:
print(f'Error finding lagging altcoin: {e}')
return None
def place_correlation_trade(signal, target_coin):
"""Place trade on the most lagging altcoin"""
try:
if not target_coin:
print('No target coin identified')
return
# Check if already in position for this coin
_, in_position, _, _ = get_position(exchange, target_coin)
if in_position:
print(f'Already in position for {target_coin}')
return
# Get current price for target coin
ticker = exchange.fetch_ticker(target_coin)
current_price = ticker['bid'] if signal == 'BULLISH' else ticker['ask']
# Calculate stop loss and take profit
if signal == 'BULLISH':
stop_loss = current_price * (1 - (sl_percent / 100))
take_profit = current_price * (1 + (tp_percent / 100))
print(f'Placing LONG on {target_coin} at {current_price:.4f}')
print(f'Stop Loss: {stop_loss:.4f} | Take Profit: {take_profit:.4f}')
exchange.create_limit_buy_order(target_coin, size, current_price, params)
elif signal == 'BEARISH':
stop_loss = current_price * (1 + (sl_percent / 100))
take_profit = current_price * (1 - (tp_percent / 100))
print(f'Placing SHORT on {target_coin} at {current_price:.4f}')
print(f'Stop Loss: {stop_loss:.4f} | Take Profit: {take_profit:.4f}')
exchange.create_limit_sell_order(target_coin, size, current_price, params)
# Note: In a full implementation, you'd want to place actual stop loss and take profit orders
# This would require additional order management logic
except Exception as e:
print(f'Error placing trade: {e}')
def manage_existing_positions():
"""Check and manage existing positions"""
for coin in alt_coins:
try:
position_info, in_position, _ = get_position(exchange, coin)
if in_position:
# Here you would implement PnL checking and position management
# For now, just log that we have a position
print(f'Managing position in {coin}')
# You could call a PnL management function here
# hit_target() or similar logic from other strategies
except Exception as e:
print(f'Error managing position for {coin}: {e}')
def bot():
try:
print('\n---- CORRELATION STRATEGY ACTIVE ----')
# Step 1: Manage existing positions
manage_existing_positions()
# Step 2: Check ETH for breakout signal
eth_signal, eth_price = get_eth_signal()
if eth_signal == 'NEUTRAL':
print('ETH: No breakout signal detected')
return
print(f'ETH BREAKOUT: {eth_signal} signal at {eth_price:.2f}')
# Step 3: Find most lagging altcoin
target_coin = find_most_lagging_altcoin(eth_price)
if target_coin:
# Step 4: Place correlation trade
place_correlation_trade(eth_signal, target_coin)
else:
print('No suitable target coin found')
print('=' * 50)
except Exception as e:
print(f'Bot error: {e}')
schedule.every(20).seconds.do(bot)
while True:
try:
schedule.run_pending()
except Exception as e:
print(f'Schedule error: {e} - Sleeping 30 seconds')
time.sleep(30)

View file

@ -0,0 +1,119 @@
'''
Engulfing Candle Strategy Overview:
Pattern Recognition:
- Identifies engulfing candle patterns on 15-minute timeframe
- BULLISH engulfing: Current bid > previous candle close
- BEARISH engulfing: Current bid < previous candle close
Entry Conditions:
- LONG: Bid > last close AND bid > 20-period SMA (trend confirmation)
- SHORT: Bid < last close AND bid < 20-period SMA (trend confirmation)
- Uses dual confirmation: price action + trend direction
Order Management:
- Entry orders placed at 0.1% offset from current bid/ask
- Cancels all existing orders before placing new ones
- 2-minute pause after order placement
Exit Conditions:
- Profit target: 9% gain
- Stop loss: 8% loss
- Managed through external PnL function
Technical Setup:
- Timeframe: 15 minutes
- SMA period: 20
- Combines reversal pattern with trend following for higher probability trades
WARNING: Do not run without thorough backtesting and understanding!
'''
import ccxt
import time, schedule
import Functions.funcs as func
import os
from dotenv import load_dotenv
load_dotenv()
api_key = os.getenv('BINANCE_API_KEY')
api_secret = os.getenv('BINANCE_SECRET_KEY')
exchange = ccxt.binance({
'apiKey': api_key,
'secret': api_secret,
'enableRateLimit': True,
'options': {
'defaultType': 'future',
}
})
symbol = 'BTCUSDT'
pos_size = 1
target = 9
max_loss = -8
params = {'timeInForce': 'GTC'}
def bot():
# Check PnL and manage existing positions
func.pnl_close(symbol, target, max_loss)
# Get market data and indicators
timeframe = '15m'
limit = 97
sma = 20
df = func.df_sma(symbol, timeframe, limit, sma)
# Get current prices
ask, bid = func.ask_bid(symbol)
# Get position info
_, in_position, current_size, _ = func.open_positions(symbol)
current_size = int(current_size) if current_size else 0
# Get technical levels
sma_20_15m = df.iloc[-1][f'sma{sma}_{timeframe}']
last_close = df.iloc[-1]['close']
print(f'Position: {in_position} | Size: {current_size} | Bid: {bid:.2f} | SMA: {sma_20_15m:.2f} | Last Close: {last_close:.2f}')
if not in_position:
# BULLISH engulfing: bid above last close + above SMA (uptrend)
if bid > last_close and bid > sma_20_15m:
print('BULLISH engulfing pattern detected - Opening LONG')
exchange.cancel_all_orders(symbol)
# Refresh prices and place order
ask, bid = func.ask_bid(symbol)
order_price = bid * 0.999 # 0.1% below bid
exchange.create_limit_buy_order(symbol, pos_size, order_price, params)
print(f'BUY order placed at {order_price:.2f} - Sleeping 2 minutes')
time.sleep(120)
# BEARISH engulfing: bid below last close + below SMA (downtrend)
elif bid < last_close and bid < sma_20_15m:
print('BEARISH engulfing pattern detected - Opening SHORT')
exchange.cancel_all_orders(symbol)
# Refresh prices and place order
ask, bid = func.ask_bid(symbol)
order_price = ask * 1.001 # 0.1% above ask
exchange.create_limit_sell_order(symbol, pos_size, order_price, params)
print(f'SELL order placed at {order_price:.2f} - Sleeping 2 minutes')
time.sleep(120)
else:
print('No engulfing pattern or trend confirmation - No trade')
else:
print('Already in position - Monitoring...')
schedule.every(28).seconds.do(bot)
while True:
try:
schedule.run_pending()
except Exception as e:
print(f'Error: {e} - Sleeping 30 seconds')
time.sleep(30)

View file

@ -0,0 +1,205 @@
'''
Bollinger Bands Volatility Breakout Strategy Overview:
Volatility Detection:
- Uses BTC Bollinger Bands on 1-minute timeframe to gauge market volatility
- "Tight bands" indicate low volatility periods (compression before expansion)
- Strategy activates only during low volatility conditions
Breakout Anticipation:
- Places BOTH buy and sell orders simultaneously when bands are tight
- Anticipates volatility expansion (breakout) in either direction
- Uses 11th level bid/ask for better execution probability
Dual Order Logic:
- BUY order: Positioned to catch upward breakout
- SELL order: Positioned to catch downward breakout
- One order gets filled, the other gets canceled when volatility returns
- Position size split in half to accommodate dual orders
Risk Management:
- Profit target: 5% gain
- Stop loss: 10% loss
- Automatic position closure when volatility returns (bands widen)
- Maximum 1 position with controlled exposure
Technical Setup:
- Reference: BTC Bollinger Bands (market volatility proxy)
- Trading Symbol: WIF (or configurable)
- Timeframe: 1 minute (500 periods for BB calculation)
- Position sizing: 50% of normal size per order (dual orders)
- Leverage: 3x (adjustable)
Strategy Logic:
- Low volatility (tight bands): Place dual orders expecting breakout
- High volatility (wide bands): Close all positions and orders
- Uses BTC as volatility gauge but trades altcoins for better moves
WARNING: Do not run without thorough backtesting and understanding!
'''
import Functions.funcs as func
import eth_account
import time
import schedule
import os
from dotenv import load_dotenv
load_dotenv()
# Configuration
symbol = 'WIF'
timeframe = '15m'
sma_window = 20
lookback_days = 1
size = 1
target = 5
max_loss = -10
leverage = 3
max_positions = 1
def get_account():
"""Initialize trading account from private key"""
secret = os.getenv('PRIVATE_KEY')
return eth_account.Account.from_key(secret)
def get_position_info(symbol, account):
"""Get current position and setup information"""
try:
positions, in_position, position_size, pos_symbol, entry_price, pnl_percent, is_long, num_positions = func.get_position_andmaxpos(
symbol, account, max_positions
)
print(f'Position Status - In position: {in_position} | Size: {position_size} | PnL: {pnl_percent}%')
# Setup leverage and get adjusted position size
leverage_set, pos_size = func.adjust_leverage_size_signal(symbol, leverage, account)
# Split position size for dual orders
dual_order_size = pos_size / 2
return in_position, dual_order_size, positions
except Exception as e:
print(f'Error getting position info: {e}')
return False, size / 2, []
def check_bollinger_volatility():
"""Check if Bollinger Bands are tight (low volatility condition)"""
try:
# Get BTC data for volatility analysis
snapshot_data = func.get_ohlcv2('BTC', '1m', 500)
df = func.process_data_to_df(snapshot_data)
# Calculate Bollinger Bands
bb_data = func.calculate_bollinger_bands(df)
bands_tight = bb_data[1] # Boolean indicating if bands are tight
print(f'BTC Bollinger Bands Analysis - Tight (Low Volatility): {bands_tight}')
return bands_tight
except Exception as e:
print(f'Error checking Bollinger volatility: {e}')
return False
def get_market_levels(symbol):
"""Get current market levels for order placement"""
try:
ask, bid, l2_data = func.ask_bid(symbol)
# Get 11th level for better execution
bid_11 = float(l2_data[0][10]['px'])
ask_11 = float(l2_data[1][10]['px'])
print(f'Market Levels - Bid: {bid:.4f} | Ask: {ask:.4f}')
print(f'11th Level - Bid: {bid_11:.4f} | Ask: {ask_11:.4f}')
return bid_11, ask_11
except Exception as e:
print(f'Error getting market levels: {e}')
return 0, 0
def place_dual_orders(symbol, bid_11, ask_11, position_size, account):
"""Place both buy and sell orders for breakout capture"""
try:
func.cancel_all_orders(account)
print('Canceled all existing orders')
# Place buy order (upward breakout)
func.limit_order(symbol, True, position_size, bid_11, False, account)
print(f'BUY order placed: {position_size} at {bid_11:.4f}')
# Place sell order (downward breakout)
func.limit_order(symbol, False, position_size, ask_11, False, account)
print(f'SELL order placed: {position_size} at {ask_11:.4f}')
print('Dual orders active - Ready for volatility breakout')
except Exception as e:
print(f'Error placing dual orders: {e}')
def manage_volatility_exit(account):
"""Close all positions and orders when volatility returns"""
try:
print('High volatility detected - Closing all positions and orders')
func.cancel_all_orders(account)
func.close_all_positions(account)
print('All positions and orders closed')
except Exception as e:
print(f'Error managing volatility exit: {e}')
def bot():
try:
print(f'\n---- Bollinger Bands Volatility Strategy for {symbol} ----')
# Initialize account
account = get_account()
# Get position information
in_position, dual_order_size, positions = get_position_info(symbol, account)
# Check existing positions first
if in_position:
func.cancel_all_orders(account)
print('Managing existing position - checking PnL')
func.pnl_close(symbol, target, max_loss, account)
return
# Check Bollinger Bands volatility condition
bands_tight = check_bollinger_volatility()
if not in_position and bands_tight:
print('LOW VOLATILITY detected - Placing dual breakout orders')
# Get market levels
bid_11, ask_11 = get_market_levels(symbol)
if bid_11 > 0 and ask_11 > 0:
place_dual_orders(symbol, bid_11, ask_11, dual_order_size, account)
else:
print('Invalid market data - skipping dual orders')
elif not bands_tight:
print('HIGH VOLATILITY detected - Managing exits')
manage_volatility_exit(account)
else:
print(f'Position status: {in_position} | Volatility condition: {"Low" if bands_tight else "High"}')
print('=' * 60)
except Exception as e:
print(f'Bot error: {e}')
schedule.every(30).seconds.do(bot)
while True:
try:
schedule.run_pending()
time.sleep(10)
except Exception as e:
print(f'Schedule error: {e} - Sleeping 30 seconds')
time.sleep(30)

View file

@ -0,0 +1,191 @@
'''
Supply & Demand Zone Trading Strategy Overview:
Zone Identification:
- Calculates supply zones (resistance areas where selling pressure exists)
- Calculates demand zones (support areas where buying pressure exists)
- Uses historical price action analysis to identify key levels
- Zones represent areas where institutional orders likely cluster
Entry Strategy:
- BUY: Places orders at demand zone levels (support areas)
- SELL: Places orders at supply zone levels (resistance areas)
- Chooses the zone closest to current price for higher probability fills
- Uses average of multiple zone levels for more reliable entries
Position Management:
- Supports partial position building (adds to positions under target size)
- Maximum 1 full position at a time with partial fills allowed
- Automatic position sizing adjustments based on existing exposure
- Leverage: 3x (adjustable)
Risk Management:
- Profit target: 5% gain
- Stop loss: 10% loss
- Continuous PnL monitoring for existing positions
- Order cancellation and replacement system
Technical Setup:
- Symbol: WIF (configurable)
- Timeframe: 15 minutes for zone calculation
- SMA confirmation: 20-period moving average
- Lookback: 1 day for zone identification
- Platform: Hyper Liquid (on-chain execution)
Zone Logic:
- Supply zones: Areas where price previously dropped (selling pressure)
- Demand zones: Areas where price previously bounced (buying pressure)
- Strategy assumes these levels will act as support/resistance again
WARNING: Do not run without thorough backtesting and understanding!
'''
import Functions.funcs as func
import eth_account
import time
import pandas as pd
import schedule
import os
from dotenv import load_dotenv
load_dotenv()
# Configuration
symbol = 'WIF'
timeframe = '15m'
sma_window = 20
lookback_days = 1
size = 1
target = 5
max_loss = -10
leverage = 3
max_positions = 1
def get_account():
"""Initialize trading account from private key"""
secret = os.getenv('PRIVATE_KEY')
return eth_account.Account.from_key(secret)
def setup_account():
"""Setup account with leverage"""
account = get_account()
func.adjust_leverage_size_signal(symbol, leverage, account)
return account
def get_position_info(symbol, account):
"""Get current position information and calculate adjusted size"""
try:
positions, in_position, position_size, pos_symbol, entry_price, pnl_percent, is_long = func.get_position(symbol, account)
print(f'Position Status - In position: {in_position} | Size: {position_size} | PnL: {pnl_percent}%')
# Calculate adjusted position size for partial fills
if 0 < position_size < size:
adjusted_size = size - position_size
print(f'Partial position detected - Current: {position_size} | Needed: {adjusted_size}')
return False, adjusted_size, positions # Treat as not in position for adding
else:
return in_position, size, positions
except Exception as e:
print(f'Error getting position info: {e}')
return False, size, []
def get_supply_demand_zones(symbol):
"""Calculate supply and demand zones and determine best entry"""
try:
# Get supply and demand zones
sd_df = func.supply_demand_zones_hl(symbol, timeframe, lookback_days)
print('Supply & Demand Zones:')
print(sd_df)
# Convert to numeric
sd_df[f'{timeframe}_dz'] = pd.to_numeric(sd_df[f'{timeframe}_dz'], errors='coerce')
sd_df[f'{timeframe}_sz'] = pd.to_numeric(sd_df[f'{timeframe}_sz'], errors='coerce')
# Calculate average zone levels
demand_zone_avg = float(sd_df[f'{timeframe}_dz'].mean())
supply_zone_avg = float(sd_df[f'{timeframe}_sz'].mean())
# Get current price and SMA confirmation
current_price = func.ask_bid(symbol)[0]
latest_sma = func.get_latest_sma(symbol, timeframe, sma_window, 2)
print(f'Market Data - Price: {current_price:.4f} | SMA: {latest_sma:.4f}')
print(f'Zones - Demand: {demand_zone_avg:.4f} | Supply: {supply_zone_avg:.4f}')
# Calculate distance to each zone
distance_to_demand = abs(current_price - demand_zone_avg)
distance_to_supply = abs(current_price - supply_zone_avg)
print(f'Distances - To Demand: {distance_to_demand:.4f} | To Supply: {distance_to_supply:.4f}')
# Choose closest zone for entry
if distance_to_demand < distance_to_supply:
return 'BUY', demand_zone_avg, 'demand'
else:
return 'SELL', supply_zone_avg, 'supply'
except Exception as e:
print(f'Error calculating supply/demand zones: {e}')
return None, None, None
def place_zone_trade(signal, entry_price, zone_type, position_size, account):
"""Place trade order at supply or demand zone"""
try:
func.cancel_all_orders(account)
print('Canceled all existing orders')
if signal == 'BUY':
func.limit_order(symbol, True, position_size, entry_price, False, account)
print(f'LONG order placed at {zone_type} zone: {position_size} at {entry_price:.4f}')
elif signal == 'SELL':
func.limit_order(symbol, False, position_size, entry_price, False, account)
print(f'SHORT order placed at {zone_type} zone: {position_size} at {entry_price:.4f}')
except Exception as e:
print(f'Error placing zone trade: {e}')
def bot():
try:
print(f'\n---- Supply & Demand Zone Strategy for {symbol} ----')
# Setup account
account = get_account()
# Get position information
in_position, position_size, positions = get_position_info(symbol, account)
# Check PnL for existing positions
if in_position:
print('Managing existing position')
func.pnl_close(symbol, target, max_loss, account)
return
# Look for new trade opportunities
print('Analyzing supply & demand zones...')
signal, entry_price, zone_type = get_supply_demand_zones(symbol)
if signal and entry_price:
place_zone_trade(signal, entry_price, zone_type, position_size, account)
else:
print('No valid zone signals found')
print('=' * 60)
except Exception as e:
print(f'Bot error: {e}')
# Initial setup
account = setup_account()
schedule.every(30).seconds.do(bot)
while True:
try:
schedule.run_pending()
time.sleep(10)
except Exception as e:
print(f'Schedule error: {e} - Sleeping 30 seconds')
time.sleep(30)

View file

@ -0,0 +1,173 @@
'''
VWAP Trading Strategy Overview:
VWAP Signal Logic:
- Uses Volume Weighted Average Price as primary trend indicator
- Price > VWAP: Bullish bias (70% chance to go long)
- Price < VWAP: Bearish bias (30% chance to go long)
- Incorporates randomization to avoid predictable patterns
Order Placement Strategy:
- Uses 11th level bid/ask for better fill probability
- Avoids competing at best bid/offer for improved execution
- Places limit orders to capture spread while getting fills
Position Management:
- Maximum 1 position at a time
- Leverage: 3x (adjustable)
- Profit target: 5% gain
- Stop loss: 10% loss
- Dynamic position sizing based on leverage
Risk Management:
- Automatic position and PnL monitoring
- Order cancellation and re-placement system
- Maximum position limits to control exposure
- Ethereum account-based execution with private key security
Technical Setup:
- Symbol: LINK (configurable)
- Timeframe: 1 minute
- VWAP calculation: Real-time volume weighted average
- Execution: DeFi/on-chain trading platform
WARNING: Do not run without thorough backtesting and understanding!
'''
import Functions.funcs as func
import eth_account
import time, random
import schedule
import os
from dotenv import load_dotenv
load_dotenv()
# Configuration
symbol = 'LINK'
timeframe = '1m'
sma_window = 20
lookback_days = 1
size = 1
target = 5
max_loss = -10
leverage = 3
max_positions = 1
def get_account():
"""Initialize trading account from private key"""
secret = os.getenv('PRIVATE_KEY')
return eth_account.Account.from_key(secret)
def get_vwap_signal(symbol):
"""Calculate VWAP signal and determine trade direction"""
try:
ask, bid, l2_data = func.ask_bid(symbol)
# Get 11th level bid and ask for better execution
bid_11 = float(l2_data[0][10]['px'])
ask_11 = float(l2_data[1][10]['px'])
# Get current VWAP
latest_vwap = func.calculate_vwap_with_symbol(symbol)[1]
print(f'Market Data - Bid: {bid:.4f} | Ask: {ask:.4f} | VWAP: {latest_vwap:.4f}')
print(f'11th Level - Bid: {bid_11:.4f} | Ask: {ask_11:.4f}')
# Randomization factor for signal variation
random_chance = random.random()
# VWAP-based signal logic
if bid > latest_vwap:
# Price above VWAP - bullish bias
go_long = random_chance <= 0.7 # 70% chance
signal_strength = "Strong Bullish" if go_long else "Weak Bullish"
print(f'Price > VWAP: {signal_strength} (Random: {random_chance:.2f})')
else:
# Price below VWAP - bearish bias
go_long = random_chance <= 0.3 # 30% chance
signal_strength = "Contrarian Long" if go_long else "Bearish"
print(f'Price < VWAP: {signal_strength} (Random: {random_chance:.2f})')
return go_long, bid_11, ask_11, latest_vwap
except Exception as e:
print(f'Error getting VWAP signal: {e}')
return False, 0, 0, 0
def manage_position(symbol, account):
"""Check and manage existing positions"""
try:
positions, in_position, position_size, pos_symbol, entry_price, pnl_percent, is_long, num_positions = func.get_position_andmaxpos(
symbol, account, max_positions
)
print(f'Position Status - In position: {in_position} | Size: {position_size} | PnL: {pnl_percent}%')
if in_position:
func.cancel_all_orders(account)
print('Managing existing position - checking PnL')
func.pnl_close(symbol, target, max_loss, account)
return True
else:
print('No position - ready for new trade')
return False
except Exception as e:
print(f'Error managing position: {e}')
return False
def place_trade(symbol, go_long, bid_11, ask_11, account):
"""Place trade order based on signal"""
try:
# Adjust leverage and position size
leverage_set, position_size = func.adjust_leverage_size_signal(symbol, leverage, account)
func.cancel_all_orders(account)
print('Canceled all existing orders')
if go_long:
# Place buy order at 11th bid level
func.limit_order(symbol, True, position_size, bid_11, False, account)
print(f'LONG order placed: {position_size} at {bid_11:.4f}')
else:
# Place sell order at 11th ask level
func.limit_order(symbol, False, position_size, ask_11, False, account)
print(f'SHORT order placed: {position_size} at {ask_11:.4f}')
except Exception as e:
print(f'Error placing trade: {e}')
def bot():
try:
print(f'\n---- VWAP Strategy Running for {symbol} ----')
# Initialize account
account = get_account()
# Check existing positions first
in_position = manage_position(symbol, account)
if not in_position:
# Get VWAP signal and market data
go_long, bid_11, ask_11, vwap = get_vwap_signal(symbol)
if bid_11 > 0 and ask_11 > 0: # Valid market data
place_trade(symbol, go_long, bid_11, ask_11, account)
else:
print('Invalid market data - skipping trade')
print('=' * 50)
except Exception as e:
print(f'Bot error: {e}')
schedule.every(3).seconds.do(bot)
while True:
try:
schedule.run_pending()
time.sleep(10)
except Exception as e:
print(f'Schedule error: {e} - Sleeping 30 seconds')
time.sleep(30)

View file

@ -0,0 +1,341 @@
'''
Market Maker Strategy Overview:
Market Making Logic:
- Places buy orders near recent lows, sell orders near recent highs
- Uses dynamic range based on recent high-low spread
- Only trades within configured percentage of recent range extremes
Risk Management:
- ATR-based volatility filtering (no trading if volatility too high)
- Emergency position size limits with automatic closure
- Time-based position limits (auto-close after time limit)
- Stop losses with trailing functionality
Entry Conditions:
- BUY: When bid drops below (low + range_percentage)
- SELL: When ask rises above (high - range_percentage)
- Range calculated from recent N-period high/low
- Filtered out during high volatility periods
Exit Conditions:
- Profit target: 0.4% gain from entry
- Stop loss: 0.1% loss from entry
- Time limit: 2 hours maximum hold time
- Emergency size limit: $1100 maximum risk
Technical Filters:
- Uses 5-minute timeframe with 180-period lookback (15 hours)
- ATR volatility filter prevents trading in choppy markets
- Recent high/low trend analysis prevents bad entries
WARNING: This strategy is complex and has not been fully backtested.
Use only for educational purposes without proper testing.
WARNING: Do not run without thorough backtesting and understanding!
'''
import ccxt
import pandas as pd
import time
from datetime import datetime, timedelta
import schedule
import warnings
import os
from dotenv import load_dotenv
warnings.filterwarnings("ignore")
load_dotenv()
api_key = os.getenv('BINANCE_API_KEY')
api_secret = os.getenv('BINANCE_SECRET_KEY')
exchange = ccxt.binance({
'apiKey': api_key,
'secret': api_secret,
'enableRateLimit': True,
'options': {
'defaultType': 'future',
}
})
# Configuration
size = 4200
symbol = 'DYDXUSDT'
perc_from_lh = 0.35
close_seconds = 60 * 47 # 47 minutes
trade_pause_mins = 15
max_lh = 1250
timeframe = '5m'
num_bars = 180
max_risk = 1100
sl_perc = 0.1
exit_perc = 0.004
max_tr = 550
quartile = 0.33
time_limit = 120 # minutes
sleep_time = 30
params = {'timeInForce': 'GTC'}
def get_bid_ask(symbol=symbol):
orderbook = exchange.fetch_order_book(symbol)
bid = orderbook['bids'][0][0]
ask = orderbook['asks'][0][0]
bid_vol = orderbook['bids'][0][1]
ask_vol = orderbook['asks'][0][1]
return ask, bid, ask_vol, bid_vol
def get_open_positions(symbol=symbol):
positions = exchange.fetch_positions([symbol])
for pos in positions:
if pos['symbol'] == symbol and float(pos['contracts']) != 0:
return {
'side': pos['side'],
'size': float(pos['contracts']),
'entry_price': float(pos['entryPrice']),
'is_long': pos['side'] == 'long',
'has_position': True
}
return {
'side': None,
'size': 0,
'entry_price': 0,
'is_long': None,
'has_position': False
}
def size_kill(symbol=symbol):
position = get_open_positions(symbol)
if position['has_position']:
position_cost = abs(position['size'] * position['entry_price'])
if position_cost > max_risk:
print(f'EMERGENCY: Position size ${position_cost} exceeds max risk ${max_risk}')
kill_switch(symbol)
print('Emergency closure complete. Sleeping 72 hours...')
time.sleep(260000)
def kill_switch(symbol=symbol):
print('KILL SWITCH ACTIVATED')
while True:
position = get_open_positions(symbol)
if not position['has_position']:
break
exchange.cancel_all_orders(symbol)
ask, bid, _, _ = get_bid_ask(symbol)
if position['is_long']:
exchange.create_limit_sell_order(symbol, position['size'], ask, params)
print(f'SELL to close: {position["size"]} at {ask}')
else:
exchange.create_limit_buy_order(symbol, position['size'], bid, params)
print(f'BUY to close: {position["size"]} at {bid}')
time.sleep(30)
def get_order_status(symbol=symbol):
open_orders = exchange.fetch_open_orders(symbol)
has_stop_loss = any(order['type'] == 'stop_market' for order in open_orders)
has_close_order = any(order['type'] == 'limit' for order in open_orders)
has_entry_order = len(open_orders) > 0 and not get_open_positions(symbol)['has_position']
return {
'has_both': has_stop_loss and has_close_order,
'needs_stop_loss': not has_stop_loss,
'needs_close_order': not has_close_order,
'has_entry_pending': has_entry_order
}
def calculate_atr(df, period=7):
df['prev_close'] = df['close'].shift(1)
df['high_low'] = abs(df['high'] - df['low'])
df['high_prev_close'] = abs(df['high'] - df['prev_close'])
df['low_prev_close'] = abs(df['low'] - df['prev_close'])
df['tr'] = df[['high_low', 'high_prev_close', 'low_prev_close']].max(axis=1)
df['atr'] = df['tr'].rolling(period).mean()
return df
def get_pnl(symbol=symbol):
position = get_open_positions(symbol)
if not position['has_position']:
return "No position", 0, 0
ask, bid, _, _ = get_bid_ask(symbol)
current_price = bid
entry_price = position['entry_price']
if position['is_long']:
price_diff = current_price - entry_price
else:
price_diff = entry_price - current_price
try:
pnl_percent = (price_diff / entry_price) * 100
except:
pnl_percent = 0
pnl_text = f'PnL: {pnl_percent:.2f}%'
print(pnl_text)
return pnl_text, ask, bid
def create_stop_order(symbol, reference_price, direction):
position = get_open_positions(symbol)
position_size = position['size'] if position['has_position'] else size
if direction == 'SELL': # Stop for long position
stop_price = reference_price * 1.001
exchange.create_order(
symbol, 'stop_market', 'sell', position_size, None,
params={'stopPrice': stop_price}
)
print(f'Stop loss created: SELL at {stop_price}')
elif direction == 'BUY': # Stop for short position
stop_price = reference_price * 0.999
exchange.create_order(
symbol, 'stop_market', 'buy', position_size, None,
params={'stopPrice': stop_price}
)
print(f'Stop loss created: BUY at {stop_price}')
def bot():
print('\n---- MARKET MAKER ACTIVE ----\n')
# Check PnL and get current prices
pnl_info = get_pnl(symbol)
ask = pnl_info[1]
bid = pnl_info[2]
# Emergency size check
size_kill()
# Get market data
bars = exchange.fetch_ohlcv(symbol, timeframe=timeframe, limit=num_bars)
df = pd.DataFrame(bars[:-1], columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
# Calculate technical indicators
df = calculate_atr(df)
# Get range data
low = df['low'].min()
high = df['high'].max()
range_size = high - low
avg_price = (high + low) / 2
print(f'Range: {low:.2f} - {high:.2f} | Size: {range_size:.2f} | Avg: {avg_price:.2f}')
# Check if range is too large (high volatility)
if range_size > max_lh:
print('RANGE TOO LARGE - No trading')
kill_switch()
return
# Check recent trend (no new highs/lows in last 17 bars)
recent_lows = df['low'].tail(17).tolist()
no_trading = any(low >= recent_low for recent_low in recent_lows)
if no_trading:
print('Recent trend unfavorable - No trading')
return
# Get position and order status
position = get_open_positions(symbol)
order_status = get_order_status(symbol)
# Calculate entry levels
open_range = range_size * perc_from_lh
sell_to_open_limit = high - open_range
buy_to_open_limit = low + open_range
print(f'Entry levels - Buy below: {buy_to_open_limit:.2f}, Sell above: {sell_to_open_limit:.2f}')
# Calculate exit price for existing position
if position['has_position']:
if position['is_long']:
exit_price = position['entry_price'] * (1 + exit_perc)
else:
exit_price = position['entry_price'] * (1 - exit_perc)
# Check time limit for existing positions
current_time = int(time.time())
# Main trading logic
if not position['has_position'] and not order_status['has_entry_pending']:
# Look for entry opportunities
if ask > sell_to_open_limit:
print('SELLING to open - Price above sell level')
exchange.cancel_all_orders(symbol)
# Create sell order
sell_price = ask * 1.0009
exchange.create_limit_sell_order(symbol, size, sell_price, params)
print(f'SELL order placed at {sell_price}')
# Create stop loss
create_stop_order(symbol, high, 'SELL')
time.sleep(300) # 5 minute pause
elif bid < buy_to_open_limit:
print('BUYING to open - Price below buy level')
exchange.cancel_all_orders(symbol)
# Create buy order
buy_price = bid * 0.9991
exchange.create_limit_buy_order(symbol, size, buy_price, params)
print(f'BUY order placed at {buy_price}')
# Create stop loss
create_stop_order(symbol, low, 'BUY')
time.sleep(300) # 5 minute pause
else:
print('Price in middle of range - No entry')
elif position['has_position']:
# Manage existing position
if order_status['needs_close_order']:
# Create close order at profit target
if position['is_long']:
exchange.create_limit_sell_order(symbol, position['size'], exit_price, params)
print(f'Close order: SELL at {exit_price}')
else:
exchange.create_limit_buy_order(symbol, position['size'], exit_price, params)
print(f'Close order: BUY at {exit_price}')
if order_status['needs_stop_loss']:
# Create missing stop loss
if position['is_long']:
create_stop_order(symbol, low, 'BUY')
else:
create_stop_order(symbol, high, 'SELL')
else:
print('All orders in place - Monitoring...')
time.sleep(12)
print('=' * 40)
schedule.every(25).seconds.do(bot)
while True:
try:
schedule.run_pending()
time.sleep(15)
except Exception as e:
print(f'Bot error: {e}')
print('Sleeping 75 seconds...')
time.sleep(75)

View file

@ -0,0 +1,261 @@
'''
Multi-Ticker Mean Reversion Strategy Overview:
Strategy Logic:
- Trades multiple cryptocurrencies using mean reversion around 15m SMA
- Uses 4h SMA for trend direction (only trade with the trend)
- BULLISH trend (4h): Only LONG when price drops below 15m SMA
- BEARISH trend (4h): Only SHORT when price rises above 15m SMA
- 5m confirmation: Requires bullish/bearish candle confirmation
Entry Conditions:
- LONG: 4h bullish + price < 15m SMA + 5m bullish confirmation
- SHORT: 4h bearish + price > 15m SMA + 5m bearish confirmation
- Entry at 0.8% deviation from 15m SMA
Exit Conditions:
- Profit target: 9% gain
- Stop loss: 8% loss
- Automatic order cancellation every 30 minutes
Risk Management:
- Position sizing based on account balance
- Multiple symbol exposure with individual PnL tracking
- Leverage: 10x (configurable)
WARNING: This strategy has not been backtested. Do not run live without proper testing.
'''
import ccxt
import pandas as pd
import numpy as np
from datetime import datetime
import time, schedule
import random
import os
from dotenv import load_dotenv
load_dotenv()
api_key = os.getenv('BINANCE_API_KEY')
api_secret = os.getenv('BINANCE_SECRET_KEY')
exchange = ccxt.binance({
'apiKey': api_key,
'secret': api_secret,
'enableRateLimit': True,
'options': {
'defaultType': 'future',
}
})
pos_size = 30
target = 9
max_loss = -8
leverage = 10
timeframe = '15m'
limit = 97
sma = 20
params = {'timeInForce': 'GTC'}
# Popular crypto symbols for Binance futures
SYMBOLS = [
'BTCUSDT', 'ETHUSDT', 'ADAUSDT', 'DOTUSDT', 'LINKUSDT',
'LTCUSDT', 'BCHUSDT', 'XLMUSDT', 'EOSUSDT', 'TRXUSDT',
'ETCUSDT', 'DASHUSDT', 'XMRUSDT', 'ZECUSDT', 'XRPUSDT',
'BNBUSDT', 'SOLUSDT', 'AVAXUSDT', 'MATICUSDT', 'UNIUSDT',
'SUSHIUSDT', 'AAVEUSDT', 'COMPUSDT', 'MKRUSDT', 'YFIUSDT'
]
def ask_bid(symbol):
orderbook = exchange.fetch_order_book(symbol)
bid = orderbook['bids'][0][0]
ask = orderbook['asks'][0][0]
return ask, bid
def get_open_positions():
positions = exchange.fetch_positions()
active_positions = []
for pos in positions:
if pos['symbol'] in SYMBOLS and float(pos['contracts']) != 0:
active_positions.append({
'symbol': pos['symbol'],
'side': pos['side'],
'size': float(pos['contracts']),
'entry_price': float(pos['entryPrice']),
'is_long': pos['side'] == 'long'
})
return active_positions
def kill_switch(symbol):
print(f'Closing position for {symbol}')
positions = exchange.fetch_positions([symbol])
for pos in positions:
if pos['symbol'] == symbol and float(pos['contracts']) != 0:
size = float(pos['contracts'])
is_long = pos['side'] == 'long'
exchange.cancel_all_orders(symbol)
ask, bid = ask_bid(symbol)
if is_long:
exchange.create_limit_sell_order(symbol, size, ask, params)
print(f'SELL to close: {symbol}')
else:
exchange.create_limit_buy_order(symbol, size, bid, params)
print(f'BUY to close: {symbol}')
time.sleep(5)
break
def kill_switch_all():
print('Closing all positions...')
active_positions = get_open_positions()
for pos in active_positions:
kill_switch(pos['symbol'])
def get_sma_data(symbol, timeframe, limit, sma_period):
try:
bars = exchange.fetch_ohlcv(symbol, timeframe=timeframe, limit=limit)
df = pd.DataFrame(bars, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
df[f'sma{sma_period}_{timeframe}'] = df['close'].rolling(sma_period).mean()
# Add signal based on price vs SMA
_, bid = ask_bid(symbol)
df.loc[df[f'sma{sma_period}_{timeframe}'] > bid, 'sig'] = 'SELL'
df.loc[df[f'sma{sma_period}_{timeframe}'] < bid, 'sig'] = 'BUY'
# Add bullish confirmation (current close > previous close)
df['prev_close'] = df['close'].shift(1)
df['bullish_confirmation'] = df['close'] > df['prev_close']
return df
except Exception as e:
print(f'Error getting SMA data for {symbol}: {e}')
return pd.DataFrame()
def pnl_close(symbol):
positions = exchange.fetch_positions([symbol])
for pos in positions:
if pos['symbol'] == symbol and float(pos['contracts']) != 0:
entry_price = float(pos['entryPrice'])
leverage_used = float(pos['leverage'])
is_long = pos['side'] == 'long'
_, current_price = ask_bid(symbol)
if is_long:
price_diff = current_price - entry_price
else:
price_diff = entry_price - current_price
try:
pnl_percent = ((price_diff / entry_price) * leverage_used) * 100
except:
pnl_percent = 0
print(f'{symbol} PnL: {pnl_percent:.2f}%')
if pnl_percent >= target:
print(f'{symbol} hit profit target: {target}%')
kill_switch(symbol)
elif pnl_percent <= max_loss:
print(f'{symbol} hit stop loss: {max_loss}%')
kill_switch(symbol)
break
def get_current_time_info():
now = datetime.now()
comp24time = int(now.strftime('%H%M'))
return comp24time
def cancel_all_orders_scheduled():
comp24time = get_current_time_info()
# Cancel orders every 30 minutes
cancel_times = [0, 30, 100, 130, 200, 230, 300, 330, 400, 430, 500, 530,
600, 630, 700, 730, 800, 830, 900, 930, 1000, 1030, 1100,
1130, 1200, 1230, 1300, 1330, 1400, 1430, 1500, 1530, 1600,
1630, 1700, 1730, 1800, 1830, 1900, 1930, 2000, 2030, 2100,
2130, 2200, 2230, 2300, 2330]
if comp24time in cancel_times:
print('Cancelling all pending orders...')
for symbol in SYMBOLS:
try:
exchange.cancel_all_orders(symbol)
except:
pass
def bot():
# Check and close positions first
active_positions = get_open_positions()
for pos in active_positions:
pnl_close(pos['symbol'])
# Cancel orders if scheduled
cancel_all_orders_scheduled()
# Select random symbol to analyze
symbol = random.choice(SYMBOLS)
try:
# Set leverage
exchange.set_leverage(leverage, symbol)
# Get SMA data for different timeframes
df_4h = get_sma_data(symbol, '4h', 31, sma)
df_15m = get_sma_data(symbol, '15m', 97, sma)
df_5m = get_sma_data(symbol, '5m', 100, sma)
if df_4h.empty or df_15m.empty or df_5m.empty:
return
# Get current market data
ask, bid = ask_bid(symbol)
# Get signals
sig_4h = df_4h['sig'].iloc[-1]
sma_15m = df_15m[f'sma{sma}_15m'].iloc[-1]
bullish_confirmation_5m = df_5m['bullish_confirmation'].iloc[-1]
# Calculate entry levels
sma_minus_008 = sma_15m * 0.992 # 0.8% below SMA
sma_plus_008 = sma_15m * 1.008 # 0.8% above SMA
# Check if already in position
in_position = any(pos['symbol'] == symbol for pos in active_positions)
# Mean reversion strategy
if not in_position:
if sig_4h == 'BUY': # Bullish 4h trend
# Look for longs when price drops below 15m SMA
if bid <= sma_15m and bullish_confirmation_5m:
exchange.cancel_all_orders(symbol)
exchange.create_limit_buy_order(symbol, pos_size, sma_minus_008, params)
print(f'BUY to open: {symbol} at {sma_minus_008}')
elif sig_4h == 'SELL': # Bearish 4h trend
# Look for shorts when price rises above 15m SMA
if bid >= sma_15m and not bullish_confirmation_5m:
exchange.cancel_all_orders(symbol)
exchange.create_limit_sell_order(symbol, pos_size, sma_plus_008, params)
print(f'SELL to open: {symbol} at {sma_plus_008}')
except Exception as e:
print(f'Error processing {symbol}: {e}')
schedule.every(10).seconds.do(bot)
while True:
try:
schedule.run_pending()
except Exception as e:
print(f'Bot error: {e}. Sleeping 7 seconds...')
time.sleep(7)

View file

@ -0,0 +1,300 @@
'''
SMA + Order Book Strategy Overview:
Direction Signal:
- Uses 20-day SMA on daily timeframe to determine market bias
- BUY when price > daily SMA, SELL when price < daily SMA
Entry Logic:
- Uses 15-minute SMA with offset levels for precise entries
- Places dual limit orders around 15m SMA (±0.1% to ±0.3%)
- Only enters when price aligns with daily SMA direction
Risk Management:
- Profit target: 8% gain
- Stop loss: 9% loss
- Order book volume analysis confirms exits
- Position size management with incremental entries
Volume Confirmation:
- Analyzes bid/ask volume ratio over 55 seconds
- Delays exits if volume ratio < 0.4 (insufficient confirmation)
WARNING: Do not run without thorough backtesting and understanding!
'''
import ccxt
import pandas as pd
import time, schedule
import os
from dotenv import load_dotenv
load_dotenv()
api_key = os.getenv('BINANCE_API_KEY')
api_secret = os.getenv('BINANCE_SECRET_KEY')
exchange = ccxt.binance({
'apiKey': api_key,
'secret': api_secret,
'enableRateLimit': True,
'options': {
'defaultType': 'future',
}
})
symbol = 'BTCUSDT'
pos_size = 30
params = {'timeInForce': 'GTC'}
target = 8
max_loss = -9
vol_decimal = 0.4
def ask_bid():
orderbook = exchange.fetch_order_book(symbol)
bid = orderbook['bids'][0][0]
ask = orderbook['asks'][0][0]
return ask, bid
def daily_sma():
print('Calculating daily SMA...')
bars = exchange.fetch_ohlcv(symbol, timeframe='1d', limit=100)
df = pd.DataFrame(bars, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
df['sma20_d'] = df['close'].rolling(20).mean()
_, bid = ask_bid()
df.loc[df['sma20_d'] > bid, 'sig'] = 'SELL'
df.loc[df['sma20_d'] < bid, 'sig'] = 'BUY'
return df
def f15_sma():
print('Calculating 15m SMA...')
bars = exchange.fetch_ohlcv(symbol, timeframe='15m', limit=100)
df = pd.DataFrame(bars, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
df['sma20_15'] = df['close'].rolling(20).mean()
df['bp_1'] = df['sma20_15'] * 1.001
df['bp_2'] = df['sma20_15'] * 0.997
df['sp_1'] = df['sma20_15'] * 0.999
df['sp_2'] = df['sma20_15'] * 1.003
return df
def open_positions():
positions = exchange.fetch_positions([symbol])
for pos in positions:
if pos['symbol'] == symbol and float(pos['contracts']) != 0:
side = pos['side']
size = pos['contracts']
is_long = side == 'long'
return positions, True, size, is_long
return [], False, 0, None
def kill_switch():
print('Starting kill switch...')
while True:
_, has_position, position_size, is_long = open_positions()
if not has_position:
break
exchange.cancel_all_orders(symbol)
ask, bid = ask_bid()
position_size = float(position_size)
if is_long:
exchange.create_limit_sell_order(symbol, position_size, ask, params)
print(f'SELL to close: {position_size} {symbol} at {ask}')
else:
exchange.create_limit_buy_order(symbol, position_size, bid, params)
print(f'BUY to close: {position_size} {symbol} at {bid}')
time.sleep(30)
def sleep_on_close():
closed_orders = exchange.fetch_closed_orders(symbol)
for order in reversed(closed_orders):
if order['status'] == 'closed':
order_time = int(order['timestamp'] / 1000)
current_time = int(exchange.fetch_order_book(symbol)['timestamp'] / 1000)
time_since_trade = (current_time - order_time) / 60
if time_since_trade < 59:
print(f'Recent trade {time_since_trade:.1f}m ago, sleeping 60s...')
time.sleep(60)
else:
print(f'Last trade {time_since_trade:.1f}m ago, no sleep needed')
break
print('Sleep check complete')
def ob():
print('Analyzing order book volume...')
volume_data = []
for i in range(11):
orderbook = exchange.fetch_order_book(symbol)
bid_volume = sum([bid[1] for bid in orderbook['bids']])
ask_volume = sum([ask[1] for ask in orderbook['asks']])
volume_data.append({'bid_vol': bid_volume, 'ask_vol': ask_volume})
if i < 10:
time.sleep(5)
print(f'Sample {i+1}: Bid: {bid_volume}, Ask: {ask_volume}')
df = pd.DataFrame(volume_data)
total_bid_vol = df['bid_vol'].sum()
total_ask_vol = df['ask_vol'].sum()
print(f'Total volumes - Bid: {total_bid_vol}, Ask: {total_ask_vol}')
if total_bid_vol > total_ask_vol:
control_ratio = total_ask_vol / total_bid_vol
print(f'Bulls in control: {control_ratio:.3f}')
else:
control_ratio = total_bid_vol / total_ask_vol
print(f'Bears in control: {control_ratio:.3f}')
_, has_position, _, is_long = open_positions()
if has_position:
volume_under_threshold = control_ratio < vol_decimal
position_type = 'long' if is_long else 'short'
print(f'In {position_type} position. Volume under threshold: {volume_under_threshold}')
return volume_under_threshold
else:
print('Not in position')
return None
def pnl_close():
print('Checking PnL...')
positions = exchange.fetch_positions([symbol])
position = None
for pos in positions:
if pos['symbol'] == symbol and float(pos['contracts']) != 0:
position = pos
break
if position is None:
print('No position open')
return False, False, 0, None
side = position['side']
size = position['contracts']
entry_price = float(position['entryPrice'])
leverage = float(position['leverage'])
is_long = side == 'long'
_, current_price = ask_bid()
if is_long:
price_diff = current_price - entry_price
else:
price_diff = entry_price - current_price
try:
pnl_percent = ((price_diff / entry_price) * leverage) * 100
except:
pnl_percent = 0
print(f'PnL: {pnl_percent:.2f}% (Entry: {entry_price}, Current: {current_price})')
if pnl_percent > target:
print('Target hit! Checking volume...')
volume_under_threshold = ob()
if volume_under_threshold:
print(f'Volume too low, waiting 30s...')
time.sleep(30)
else:
print('Closing profitable position!')
kill_switch()
return True, True, size, is_long
elif pnl_percent <= max_loss:
print('Max loss hit! Closing position.')
kill_switch()
return True, True, size, is_long
elif pnl_percent > 0:
print('In profit but target not reached')
else:
print('In loss but within acceptable range')
# SMA stop loss check
if True:
df_15m = f15_sma()
sma_15m = df_15m.iloc[-1]['sma20_15']
stop_loss_level = sma_15m * 1.008
print(f'SMA stop level: {stop_loss_level}')
return False, True, size, is_long
def bot():
pnl_close()
sleep_on_close()
df_daily = daily_sma()
df_15m = f15_sma()
ask, bid = ask_bid()
signal = df_daily.iloc[-1]['sig']
open_size = pos_size / 2
_, in_position, current_size, _ = open_positions()
current_size = float(current_size) if current_size else 0
current_price = bid
sma_15m = df_15m.iloc[-1]['sma20_15']
if not in_position and current_size < pos_size:
exchange.cancel_all_orders(symbol)
if signal == 'BUY' and current_price > sma_15m:
print('Making BUY orders')
bp_1 = df_15m.iloc[-1]['bp_1']
bp_2 = df_15m.iloc[-1]['bp_2']
print(f'Buy prices: {bp_1}, {bp_2}')
exchange.create_limit_buy_order(symbol, open_size, bp_1, params)
exchange.create_limit_buy_order(symbol, open_size, bp_2, params)
print('Orders placed, sleeping 2 minutes...')
time.sleep(120)
elif signal == 'SELL' and current_price < sma_15m:
print('Making SELL orders')
sp_1 = df_15m.iloc[-1]['sp_1']
sp_2 = df_15m.iloc[-1]['sp_2']
print(f'Sell prices: {sp_1}, {sp_2}')
exchange.create_limit_sell_order(symbol, open_size, sp_1, params)
exchange.create_limit_sell_order(symbol, open_size, sp_2, params)
print('Orders placed, sleeping 2 minutes...')
time.sleep(120)
else:
print('Price not aligned with SMA, sleeping 10 minutes...')
time.sleep(600)
else:
print('Already in position or at size limit')
schedule.every(28).seconds.do(bot)
while True:
try:
schedule.run_pending()
except Exception as e:
print(f'ERROR: {e}. Sleeping 30 seconds...')
time.sleep(30)

View file

@ -0,0 +1,72 @@
'''
Stochastic RSI + Nadarya Strategy Overview:
Entry Signals:
- LONG: Nadarya buy signal OR Stochastic RSI shows oversold (below 10)
- SHORT: Nadarya sell signal OR Stochastic RSI shows overbought (above 90)
Exit Signals:
- Close LONG: Nadarya sell signal OR Stochastic RSI overbought 2+ times
- Close SHORT: Nadarya buy signal OR Stochastic RSI oversold 2+ times
Uses dual confirmation system combining trend-following (Nadarya) with
momentum oscillator (Stochastic RSI) for improved entry/exit timing.
WARNING: Do not run without thorough backtesting and understanding!
'''
import ccxt, schedule
from Functions.functions import *
import time
import os
from dotenv import load_dotenv
load_dotenv()
api_key = os.getenv('BINANCE_API_KEY')
api_secret = os.getenv('BINANCE_SECRET_KEY')
exchange = ccxt.binance({
'apiKey': api_key,
'secret': api_secret,
'enableRateLimit': True,
'options': {
'defaultType': 'future',
}
})
rsi_targets = [10, 90]
rsi_window = 14
timeframe = '1h'
symbol = 'ETHUSDT'
size = 1000
params = {'timeInForce': 'GTC'}
def bot():
position_info, in_position, long = get_position(exchange, symbol)
candles = get_candle_df(exchange, symbol, timeframe)
nadarya_buy_signal, nadarya_sell_signal = calc_nadarya(candles)
calc_stoch_rsi(candles)
bid = exchange.fetch_ticker(symbol)['bid']
if not in_position:
if nadarya_buy_signal or is_oversold(candles['stoch_rsi'], rsi_window, 1, rsi_targets[0]):
exchange.create_limit_buy_order(symbol, size, bid, params)
elif nadarya_sell_signal or is_overbought(candles['stoch_rsi'], rsi_window, 1, rsi_targets[1]):
exchange.create_limit_sell_order(symbol, size, bid, params)
elif in_position:
if long:
if nadarya_sell_signal or is_overbought(candles['stoch_rsi'], rsi_window, times=2, target=rsi_targets[1]):
close_position(exchange, symbol)
else:
if nadarya_buy_signal or is_oversold(candles['stoch_rsi'], rsi_window, times=2, target=rsi_targets[0]):
close_position(exchange, symbol)
schedule.every(1).seconds.do(bot)
while True:
try:
schedule.run_pending()
except Exception as e:
print(f'ERROR RUNNING BOT: {e}. Sleeping 30 seconds...')
time.sleep(30)

84
Session_05/bots/Turtle.py Normal file
View file

@ -0,0 +1,84 @@
'''
Turtle Strategy Overview:
- Works on multiple timeframes: 1m, 5m, 15m, 1h, 4h
- Enter LONG when 55-bar high is broken with upward momentum (limit order at bid)
- Enter SHORT when 55-bar low is broken with downward momentum (limit order at bid)
- Only trades during market hours: 9:30 AM - 4:00 PM EST, Monday-Friday
- Exit before 4:00 PM on Fridays if still in position
Exit Conditions:
- Take Profit: 0.2% gain from entry price
- Stop Loss: 2x ATR (Average True Range) from entry price
WARNING: Do not run without thorough backtesting and understanding!
'''
import ccxt, time, schedule
from Functions.functions import *
import os
from dotenv import load_dotenv
load_dotenv()
api_key = os.getenv('BINANCE_API_KEY')
api_secret = os.getenv('BINANCE_SECRET_KEY')
exchange = ccxt.binance({
'apiKey': api_key,
'secret': api_secret,
'enableRateLimit': True,
'options': {
'defaultType': 'future',
}
})
timeframe = '1m'
symbol = 'ETHUSDT'
size = 1
params = {'timeInForce': 'GTC'}
take_profit_percent = 0.2
def bot():
if in_timeframe():
position_info, in_position, long = get_position(exchange, symbol)
candles = get_candle_df(exchange, symbol, timeframe, limit=55)
ticker = exchange.fetch_ticker(symbol)
bid = ticker['bid']
open_price = ticker['open']
if not in_position:
min_price = candles['low'].min()
max_price = candles['high'].max()
if bid <= min_price and open_price > min_price:
exchange.create_limit_sell_order(symbol, size, bid, params)
elif bid >= max_price and open_price < max_price:
exchange.create_limit_buy_order(symbol, size, bid, params)
elif in_position:
calc_atr(candles)
entry_price = float(position_info['entryPrice'])
atr = candles['ATR'].iloc[-1]
if long:
take_profit_price = entry_price * (1 + (take_profit_percent / 100))
stop_loss_price = entry_price - (atr * 2)
else:
take_profit_price = entry_price * (1 - (take_profit_percent / 100))
stop_loss_price = entry_price + (atr * 2)
if hit_target(bid, take_profit_price, stop_loss_price, long):
close_position(exchange, symbol)
elif end_of_trading_week():
close_position(exchange, symbol)
schedule.every(60).seconds.do(bot)
while True:
try:
schedule.run_pending()
except Exception as e:
print(f'ERROR RUNNING BOT: {e}. Sleeping 30 seconds...')
time.sleep(30)