1
Fork 0

initial trading bot

This commit is contained in:
rizary 2025-07-15 08:17:22 +07:00
parent b114d29716
commit a870b88581
Signed by untrusted user who does not match committer: rizary
GPG key ID: 2CE8D69D02F1CEB5
14 changed files with 4727 additions and 0 deletions

73
trading_bot/Dockerfile Normal file
View file

@ -0,0 +1,73 @@
# Trading Bot Dockerfile
# Multi-stage build for optimized production image
FROM python:3.11-slim as builder
# Set environment variables
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
ENV PIP_NO_CACHE_DIR=1
ENV PIP_DISABLE_PIP_VERSION_CHECK=1
# Install system dependencies
RUN apt-get update && apt-get install -y \
build-essential \
curl \
git \
&& rm -rf /var/lib/apt/lists/*
# Create and set working directory
WORKDIR /app
# Copy requirements and install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
# Production stage
FROM python:3.11-slim as production
# Set environment variables
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
ENV PYTHONPATH=/app
# Install minimal runtime dependencies
RUN apt-get update && apt-get install -y \
curl \
&& rm -rf /var/lib/apt/lists/*
# Create non-root user for security
RUN groupadd -r tradingbot && useradd -r -g tradingbot tradingbot
# Create application directory
WORKDIR /app
# Copy Python packages from builder stage
COPY --from=builder /usr/local/lib/python3.11/site-packages /usr/local/lib/python3.11/site-packages
COPY --from=builder /usr/local/bin /usr/local/bin
# Copy application code
COPY --chown=tradingbot:tradingbot . .
# Create necessary directories
RUN mkdir -p /app/logs /app/data && \
chown -R tradingbot:tradingbot /app/logs /app/data
# Create volume mount points
VOLUME ["/app/logs", "/app/data"]
# Switch to non-root user
USER tradingbot
# Expose port (if needed for monitoring)
EXPOSE 8000
# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD python -c "import requests; requests.get('http://localhost:8000/health')" || exit 1
# Default command
CMD ["python", "main.py"]

348
trading_bot/README.md Normal file
View file

@ -0,0 +1,348 @@
# 🤖 Binance Trading Bot
Advanced automated cryptocurrency trading bot for Binance exchange with real-time WebSocket connectivity, technical indicators, and comprehensive risk management.
## 🚀 Features
### Core Trading Features
- **Real-time Data**: WebSocket connection to Binance for live market data
- **Technical Indicators**: Bollinger Bands, Moving Averages (SMA/EMA), RSI, MACD
- **Multiple Strategies**: Bollinger Bands + MA, Mean Reversion, MA Crossover
- **Automated Execution**: Buy/sell orders with configurable position sizing
- **Paper Trading**: Safe testing mode without real money
### Risk Management
- **Stop-Loss & Take-Profit**: Configurable risk/reward ratios
- **Position Sizing**: Dynamic position sizing based on risk parameters
- **Daily Loss Limits**: Automatic trading halt on daily loss thresholds
- **Maximum Drawdown Protection**: Portfolio-level risk controls
- **Emergency Stop**: Manual and automatic trading halt capabilities
### Monitoring & Analytics
- **Performance Tracking**: Comprehensive trade and portfolio analytics
- **Real-time Logging**: Structured logging with multiple log levels
- **Health Monitoring**: WebSocket connection health and reconnection logic
- **Trade History**: Persistent storage of all trading activities
### Deployment & Operations
- **Docker Support**: Containerized deployment with docker-compose
- **Environment Configuration**: Flexible configuration via environment variables
- **Database Integration**: SQLite and PostgreSQL support
- **Optional Monitoring**: Prometheus and Grafana integration
## 📋 Prerequisites
- Docker and Docker Compose (recommended)
- OR Python 3.11+ with pip
- Binance account with API keys
- Basic understanding of cryptocurrency trading
## 🛠️ Quick Setup (Docker - Recommended)
### 1. Clone and Setup
```bash
git clone <repository-url>
cd trading_bot
# Copy and configure environment file
cp env_example .env
nano .env # Edit with your configuration
```
### 2. Configure Environment
Edit `.env` file with your settings:
```bash
# Essential Settings
BINANCE_API_KEY=your_api_key_here
BINANCE_SECRET_KEY=your_secret_key_here
BINANCE_TESTNET=true # Start with testnet!
# Trading Configuration
TRADING_SYMBOL=BTCUSDT
POSITION_SIZE=0.001
PAPER_TRADING=true # Keep this true for testing!
ENABLE_TRADING=false # Set to true only when ready
# Risk Management
STOP_LOSS_PERCENT=2.0
TAKE_PROFIT_PERCENT=3.0
MAX_DAILY_LOSS=5.0
```
### 3. Start the Bot
```bash
# Start in paper trading mode
docker-compose up -d
# View logs
docker-compose logs -f trading-bot
# Stop the bot
docker-compose down
```
## 🐍 Manual Setup (Python)
### 1. Setup Python Environment with pyenv
#### Install pyenv (if not already installed)
**On Linux/macOS:**
```bash
# Install pyenv
curl https://pyenv.run | bash
# Add to your shell profile (~/.bashrc, ~/.zshrc, etc.)
export PYENV_ROOT="$HOME/.pyenv"
export PATH="$PYENV_ROOT/bin:$PATH"
eval "$(pyenv init -)"
eval "$(pyenv virtualenv-init -)"
# Reload your shell
source ~/.bashrc # or ~/.zshrc
```
**On Windows:**
```powershell
# Install pyenv-win using git
git clone https://github.com/pyenv-win/pyenv-win.git %USERPROFILE%\.pyenv
# Add to your PATH (in PowerShell)
$env:PYENV = "$env:USERPROFILE\.pyenv\pyenv-win"
$env:PATH = "$env:PYENV\bin;$env:PYENV\shims;$env:PATH"
# Or set permanently in system environment variables
```
#### Install and Configure Python
```bash
# Install Python 3.11 (recommended)
pyenv install 3.11.0
# Create virtual environment for the trading bot
pyenv virtualenv 3.11.0 trading-bot-env
# Activate the environment
pyenv activate trading-bot-env
# OR set it as local environment for this directory
pyenv local trading-bot-env
```
### 2. Install Dependencies
```bash
# Make sure you're in the trading bot environment
pyenv activate trading-bot-env # if not already activated
# Install required packages
pip install -r requirements.txt
```
### 3. Configure Environment
```bash
cp env_example .env
# Edit .env with your configuration
```
### 4. Run the Bot
```bash
# Ensure environment is activated
pyenv activate trading-bot-env
# Run the trading bot
python main.py
```
### 5. Deactivate Environment (when done)
```bash
pyenv deactivate
```
## 📊 Configuration Guide
### Trading Strategies
#### 1. Bollinger Bands + Moving Average (Default)
```bash
STRATEGY=BOLLINGER_MA
BOLLINGER_PERIOD=20
BOLLINGER_STD=2.0
MA_PERIOD=50
```
- Buys when price touches lower Bollinger Band and is above MA
- Sells when price touches upper Bollinger Band and is below MA
#### 2. Bollinger Bands Only
```bash
STRATEGY=BOLLINGER_ONLY
```
- Pure mean reversion strategy
- Buys at lower band, sells at upper band
#### 3. Moving Average Crossover
```bash
STRATEGY=MA_CROSSOVER
```
- Buys on golden cross (fast MA crosses above slow MA)
- Sells on death cross (fast MA crosses below slow MA)
#### 4. Mean Reversion
```bash
STRATEGY=MEAN_REVERSION
```
- Combines MA, RSI, and price distance from mean
- Buys oversold conditions, sells overbought conditions
### Risk Management Configuration
```bash
# Position Sizing
POSITION_SIZE=0.001 # Size per trade
MAX_POSITIONS=1 # Maximum concurrent positions
# Risk Limits
STOP_LOSS_PERCENT=2.0 # 2% stop loss
TAKE_PROFIT_PERCENT=3.0 # 3% take profit
MAX_DAILY_LOSS=5.0 # 5% daily loss limit
MAX_PORTFOLIO_RISK=10.0 # 10% max portfolio exposure
```
## 🔧 Advanced Configuration
### WebSocket Settings
```bash
WS_RECONNECT_DELAY=5 # Reconnection delay in seconds
WS_PING_INTERVAL=20 # Ping interval for connection health
WS_TIMEOUT=30 # WebSocket timeout
```
### Logging Configuration
```bash
LOG_LEVEL=INFO # DEBUG, INFO, WARNING, ERROR, CRITICAL
LOG_FILE=logs/trading_bot.log
LOG_MAX_SIZE=10485760 # 10MB log rotation
LOG_BACKUP_COUNT=5 # Keep 5 backup files
```
### Database Options
```bash
# SQLite (default)
DATABASE_URL=sqlite:///data/trading_bot.db
# PostgreSQL (advanced)
DATABASE_URL=postgresql://user:password@localhost:5432/trading_bot
```
## 📈 Monitoring
### Health Checks
- WebSocket connection status
- Order execution monitoring
- Risk limit monitoring
- Performance tracking
### Log Analysis
```bash
# View real-time logs
docker-compose logs -f trading-bot
# Search for specific events
docker-compose logs trading-bot | grep "SIGNAL"
docker-compose logs trading-bot | grep "TRADE"
docker-compose logs trading-bot | grep "ERROR"
```
### Performance Metrics
The bot tracks:
- Total trades and win rate
- PnL and portfolio value
- Risk metrics and drawdown
- Trade execution statistics
## 🚨 Safety Guidelines
### Before Live Trading
1. **Test Extensively**: Use paper trading for weeks/months
2. **Verify Strategy**: Backtest your strategy on historical data
3. **Start Small**: Begin with minimal position sizes
4. **Monitor Closely**: Watch the bot's behavior carefully
5. **Have Exit Plan**: Know how to stop the bot quickly
### API Key Security
1. **Read-Only Testing**: Start with read-only API keys
2. **IP Restrictions**: Restrict API keys to your IP address
3. **No Withdrawals**: Never enable withdrawal permissions
4. **Regular Rotation**: Rotate API keys regularly
### Risk Management
1. **Daily Limits**: Set and respect daily loss limits
2. **Position Sizing**: Never risk more than you can afford to lose
3. **Stop Losses**: Always use stop-loss orders
4. **Diversification**: Don't put all funds in one strategy
## 🐛 Troubleshooting
### Common Issues
#### WebSocket Connection Problems
```bash
# Check network connectivity
docker-compose logs trading-bot | grep "WebSocket"
# Restart WebSocket connection
docker-compose restart trading-bot
```
#### API Permission Errors
```bash
# Verify API key permissions
# Ensure API key has spot trading enabled (if not paper trading)
# Check IP restrictions
```
#### High Memory Usage
```bash
# Check container resources
docker stats trading-bot
# Adjust log levels
LOG_LEVEL=WARNING # In .env file
```
### Log Levels
- **DEBUG**: Detailed technical information
- **INFO**: General operational information
- **WARNING**: Important warnings and issues
- **ERROR**: Error conditions that need attention
- **CRITICAL**: Critical errors requiring immediate action
## 📝 Contributing
1. Fork the repository
2. Create a feature branch
3. Make your changes
4. Add tests if applicable
5. Submit a pull request
## 🔗 Related Projects
- [CCXT](https://github.com/ccxt/ccxt) - Cryptocurrency trading library
- [pandas-ta](https://github.com/twopirllc/pandas-ta) - Technical analysis indicators
- [Binance API](https://binance-docs.github.io/apidocs/) - Official Binance API documentation
## 📄 License
This project is for educational purposes only. Use at your own risk.
## 🆘 Support
For questions and support:
1. Check the troubleshooting section
2. Review the logs for error messages
3. Ensure your configuration is correct
4. Test with paper trading first
## ⚠️ Final Warning
**Cryptocurrency trading is extremely risky. This bot is provided as-is for educational purposes. The developers are not responsible for any financial losses. Always trade responsibly and never invest more than you can afford to lose.**

682
trading_bot/bot_core/bot.py Normal file
View file

@ -0,0 +1,682 @@
"""
Main Trading Bot
Orchestrates all components including WebSocket, indicators, orders, and risk management
"""
import asyncio
import json
import time
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, field
import ccxt
import pandas as pd
import numpy as np
from .config import Config
from .logger import TradingLogger
from .websocket_manager import WebSocketManager
from .indicators import IndicatorManager
from .order_manager import OrderManager
from .risk_manager import RiskManager
from .position_manager import PositionManager
from .performance_tracker import PerformanceTracker
@dataclass
class TradingSignal:
"""Trading signal data structure"""
symbol: str
signal_type: str # 'BUY', 'SELL', 'HOLD'
strength: float # Signal strength 0-1
price: float
timestamp: datetime
indicators: Dict[str, Any] = field(default_factory=dict)
reason: str = ""
class TradingBot:
"""
Main trading bot class that orchestrates all components
"""
def __init__(self, config: Config, logger: TradingLogger):
self.config = config
self.logger = logger
# Core components
self.exchange: ccxt.Exchange = None
self.websocket_manager: WebSocketManager = None
self.indicator_manager: IndicatorManager = None
self.order_manager: OrderManager = None
self.risk_manager: RiskManager = None
self.position_manager: PositionManager = None
self.performance_tracker: PerformanceTracker = None
# Bot state
self.running = False
self.initialized = False
self.last_signal: Optional[TradingSignal] = None
self.market_data: Dict[str, Any] = {}
self.current_price: float = 0.0
# Performance tracking
self.trade_count = 0
self.successful_trades = 0
self.failed_trades = 0
self.total_pnl = 0.0
self.session_start = datetime.now()
# Data storage
self.price_history: List[Dict] = []
self.signal_history: List[TradingSignal] = []
# Async tasks
self.tasks: List[asyncio.Task] = []
async def initialize(self):
"""Initialize all bot components"""
if self.initialized:
return
self.logger.info("🔧 Initializing trading bot components...")
try:
# Initialize exchange
await self._initialize_exchange()
# Initialize components
await self._initialize_components()
# Validate configuration
await self._validate_setup()
self.initialized = True
self.logger.info("✅ Bot initialization completed successfully")
except Exception as e:
self.logger.error(f"❌ Bot initialization failed: {e}")
raise
async def _initialize_exchange(self):
"""Initialize CCXT exchange connection"""
try:
self.logger.info("🔌 Initializing Binance exchange connection...")
# Create exchange instance
exchange_config = self.config.get_binance_config()
self.exchange = ccxt.binance(exchange_config)
# Test connection
if not self.config.is_paper_trading():
await self.exchange.load_markets()
balance = await self.exchange.fetch_balance()
self.logger.info(f"✅ Exchange connected. Available balance: {balance.get('USDT', {}).get('free', 0)} USDT")
else:
self.logger.info("📄 Paper trading mode - Exchange connection simulated")
except Exception as e:
self.logger.error(f"❌ Exchange initialization failed: {e}")
raise
async def _initialize_components(self):
"""Initialize all bot components"""
# WebSocket manager for real-time data
self.websocket_manager = WebSocketManager(
self.config,
self.logger,
self._on_websocket_message
)
# Indicator manager for technical analysis
self.indicator_manager = IndicatorManager(
self.config,
self.logger
)
# Order manager for trade execution
self.order_manager = OrderManager(
self.config,
self.logger,
self.exchange
)
# Risk manager for position sizing and risk control
self.risk_manager = RiskManager(
self.config,
self.logger
)
# Position manager for tracking positions
self.position_manager = PositionManager(
self.config,
self.logger,
self.exchange
)
# Performance tracker for metrics
self.performance_tracker = PerformanceTracker(
self.config,
self.logger
)
self.logger.info("✅ All components initialized")
async def _validate_setup(self):
"""Validate bot setup and configuration"""
# Check symbol exists
if not self.config.is_paper_trading():
markets = await self.exchange.load_markets()
if self.config.SYMBOL not in markets:
raise ValueError(f"Symbol {self.config.SYMBOL} not found in markets")
# Validate minimum balance for trading
if not self.config.is_paper_trading() and self.config.ENABLE_TRADING:
balance = await self.exchange.fetch_balance()
usdt_balance = balance.get('USDT', {}).get('free', 0)
min_balance = self.config.POSITION_SIZE * self.current_price * 1.1 # 10% buffer
if usdt_balance < min_balance:
self.logger.warning(f"⚠️ Low balance: {usdt_balance} USDT (minimum recommended: {min_balance})")
self.logger.info("✅ Setup validation completed")
async def start(self):
"""Start the trading bot"""
if not self.initialized:
await self.initialize()
if self.running:
self.logger.warning("⚠️ Bot is already running")
return
self.running = True
self.logger.info("🚀 Starting trading bot...")
try:
# Start WebSocket connection
await self.websocket_manager.start()
# Start background tasks
self.tasks = [
asyncio.create_task(self._trading_loop()),
asyncio.create_task(self._monitoring_loop()),
asyncio.create_task(self._risk_monitoring_loop()),
asyncio.create_task(self._performance_update_loop())
]
# Wait for all tasks
await asyncio.gather(*self.tasks)
except Exception as e:
self.logger.error(f"❌ Bot encountered error: {e}")
await self.stop()
raise
async def stop(self):
"""Stop the trading bot gracefully"""
if not self.running:
return
self.logger.info("🛑 Stopping trading bot...")
self.running = False
# Cancel all tasks
for task in self.tasks:
task.cancel()
# Stop WebSocket
if self.websocket_manager:
await self.websocket_manager.stop()
# Cancel all open orders
if self.order_manager:
await self.order_manager.cancel_all_orders()
# Close all positions if configured
if self.position_manager and self.config.ENABLE_TRADING:
await self.position_manager.close_all_positions()
# Save performance data
if self.performance_tracker:
await self.performance_tracker.save_session_data()
self.logger.info("✅ Bot stopped successfully")
async def _trading_loop(self):
"""Main trading loop"""
self.logger.info("🎯 Starting trading loop...")
while self.running:
try:
# Check if we have sufficient data
if not self.market_data or not self.current_price:
await asyncio.sleep(1)
continue
# Generate trading signal
signal = await self._generate_trading_signal()
if signal:
self.last_signal = signal
self.signal_history.append(signal)
# Execute signal if conditions are met
await self._execute_signal(signal)
# Update positions
await self.position_manager.update_positions()
# Check for position exits
await self._check_position_exits()
# Brief pause before next iteration
await asyncio.sleep(1)
except Exception as e:
self.logger.error(f"❌ Error in trading loop: {e}")
await asyncio.sleep(5)
async def _monitoring_loop(self):
"""Monitor bot health and performance"""
self.logger.info("📊 Starting monitoring loop...")
while self.running:
try:
# Log current status
positions = await self.position_manager.get_positions()
open_orders = await self.order_manager.get_open_orders()
status = {
'current_price': self.current_price,
'positions': len(positions),
'open_orders': len(open_orders),
'last_signal': self.last_signal.signal_type if self.last_signal else 'NONE'
}
self.logger.debug(f"📈 Status: {status}")
# Check for stale data
if self.market_data:
last_update = self.market_data.get('timestamp', 0)
if time.time() - last_update > 60: # 1 minute
self.logger.warning("⚠️ Market data is stale, checking connection...")
await self.websocket_manager.reconnect()
await asyncio.sleep(30) # Monitor every 30 seconds
except Exception as e:
self.logger.error(f"❌ Error in monitoring loop: {e}")
await asyncio.sleep(10)
async def _risk_monitoring_loop(self):
"""Monitor risk limits and safety checks"""
self.logger.info("🛡️ Starting risk monitoring loop...")
while self.running:
try:
# Check daily loss limit
daily_pnl = await self.performance_tracker.get_daily_pnl()
if daily_pnl < -self.config.MAX_DAILY_LOSS:
self.logger.warning(f"⚠️ Daily loss limit exceeded: {daily_pnl:.2f}%")
await self.position_manager.close_all_positions()
await self.order_manager.cancel_all_orders()
# Check maximum positions
positions = await self.position_manager.get_positions()
if len(positions) > self.config.MAX_POSITIONS:
self.logger.warning(f"⚠️ Maximum positions exceeded: {len(positions)}")
# Check for stuck orders
await self.order_manager.check_stuck_orders()
await asyncio.sleep(60) # Check every minute
except Exception as e:
self.logger.error(f"❌ Error in risk monitoring: {e}")
await asyncio.sleep(30)
async def _performance_update_loop(self):
"""Update performance metrics"""
self.logger.info("📈 Starting performance update loop...")
while self.running:
try:
# Update performance metrics
await self.performance_tracker.update_metrics()
# Log performance summary
if self.trade_count > 0:
win_rate = (self.successful_trades / self.trade_count) * 100
self.logger.performance({
'total_trades': self.trade_count,
'win_rate': f"{win_rate:.1f}%",
'total_pnl': f"{self.total_pnl:.2f}",
'uptime': str(datetime.now() - self.session_start)
})
await asyncio.sleep(self.config.PERFORMANCE_UPDATE_INTERVAL)
except Exception as e:
self.logger.error(f"❌ Error in performance update: {e}")
await asyncio.sleep(30)
async def _generate_trading_signal(self) -> Optional[TradingSignal]:
"""Generate trading signal based on indicators"""
try:
# Get latest price data
if len(self.price_history) < self.config.LOOKBACK_PERIODS:
return None
# Create DataFrame from price history
df = pd.DataFrame(self.price_history[-self.config.LOOKBACK_PERIODS:])
# Calculate indicators
indicators = await self.indicator_manager.calculate_indicators(df)
# Generate signal based on strategy
signal = await self._evaluate_strategy(indicators)
if signal:
self.logger.signal(signal.signal_type, signal.symbol, {
'price': signal.price,
'strength': signal.strength,
'indicators': signal.indicators
})
return signal
except Exception as e:
self.logger.error(f"❌ Error generating signal: {e}")
return None
async def _evaluate_strategy(self, indicators: Dict[str, Any]) -> Optional[TradingSignal]:
"""Evaluate trading strategy based on indicators"""
try:
if self.config.STRATEGY == 'BOLLINGER_MA':
return await self._bollinger_ma_strategy(indicators)
elif self.config.STRATEGY == 'BOLLINGER_ONLY':
return await self._bollinger_only_strategy(indicators)
elif self.config.STRATEGY == 'MA_CROSSOVER':
return await self._ma_crossover_strategy(indicators)
elif self.config.STRATEGY == 'MEAN_REVERSION':
return await self._mean_reversion_strategy(indicators)
else:
self.logger.warning(f"⚠️ Unknown strategy: {self.config.STRATEGY}")
return None
except Exception as e:
self.logger.error(f"❌ Error evaluating strategy: {e}")
return None
async def _bollinger_ma_strategy(self, indicators: Dict[str, Any]) -> Optional[TradingSignal]:
"""Bollinger Bands + Moving Average strategy"""
bb_upper = indicators.get('bb_upper')
bb_lower = indicators.get('bb_lower')
bb_middle = indicators.get('bb_middle')
ma_value = indicators.get('ma_value')
if not all([bb_upper, bb_lower, bb_middle, ma_value]):
return None
current_price = self.current_price
# Buy signal: Price near lower band and above MA
if current_price <= bb_lower * 1.01 and current_price > ma_value:
return TradingSignal(
symbol=self.config.SYMBOL,
signal_type='BUY',
strength=0.8,
price=current_price,
timestamp=datetime.now(),
indicators=indicators,
reason="Price at lower Bollinger Band and above MA"
)
# Sell signal: Price near upper band and below MA
elif current_price >= bb_upper * 0.99 and current_price < ma_value:
return TradingSignal(
symbol=self.config.SYMBOL,
signal_type='SELL',
strength=0.8,
price=current_price,
timestamp=datetime.now(),
indicators=indicators,
reason="Price at upper Bollinger Band and below MA"
)
return None
async def _bollinger_only_strategy(self, indicators: Dict[str, Any]) -> Optional[TradingSignal]:
"""Bollinger Bands only strategy"""
bb_upper = indicators.get('bb_upper')
bb_lower = indicators.get('bb_lower')
bb_squeeze = indicators.get('bb_squeeze', False)
if not all([bb_upper, bb_lower]):
return None
current_price = self.current_price
# Buy signal: Price touches lower band
if current_price <= bb_lower:
strength = 0.9 if bb_squeeze else 0.7
return TradingSignal(
symbol=self.config.SYMBOL,
signal_type='BUY',
strength=strength,
price=current_price,
timestamp=datetime.now(),
indicators=indicators,
reason="Price at lower Bollinger Band"
)
# Sell signal: Price touches upper band
elif current_price >= bb_upper:
strength = 0.9 if bb_squeeze else 0.7
return TradingSignal(
symbol=self.config.SYMBOL,
signal_type='SELL',
strength=strength,
price=current_price,
timestamp=datetime.now(),
indicators=indicators,
reason="Price at upper Bollinger Band"
)
return None
async def _ma_crossover_strategy(self, indicators: Dict[str, Any]) -> Optional[TradingSignal]:
"""Moving Average crossover strategy"""
ma_fast = indicators.get('ma_fast')
ma_slow = indicators.get('ma_slow')
ma_fast_prev = indicators.get('ma_fast_prev')
ma_slow_prev = indicators.get('ma_slow_prev')
if not all([ma_fast, ma_slow, ma_fast_prev, ma_slow_prev]):
return None
# Golden cross: fast MA crosses above slow MA
if ma_fast > ma_slow and ma_fast_prev <= ma_slow_prev:
return TradingSignal(
symbol=self.config.SYMBOL,
signal_type='BUY',
strength=0.8,
price=self.current_price,
timestamp=datetime.now(),
indicators=indicators,
reason="Golden cross - fast MA above slow MA"
)
# Death cross: fast MA crosses below slow MA
elif ma_fast < ma_slow and ma_fast_prev >= ma_slow_prev:
return TradingSignal(
symbol=self.config.SYMBOL,
signal_type='SELL',
strength=0.8,
price=self.current_price,
timestamp=datetime.now(),
indicators=indicators,
reason="Death cross - fast MA below slow MA"
)
return None
async def _mean_reversion_strategy(self, indicators: Dict[str, Any]) -> Optional[TradingSignal]:
"""Mean reversion strategy"""
bb_middle = indicators.get('bb_middle')
rsi = indicators.get('rsi')
if not all([bb_middle, rsi]):
return None
current_price = self.current_price
distance_from_mean = abs(current_price - bb_middle) / bb_middle
# Buy signal: Price below mean and RSI oversold
if current_price < bb_middle and rsi < 30 and distance_from_mean > 0.02:
return TradingSignal(
symbol=self.config.SYMBOL,
signal_type='BUY',
strength=0.7,
price=current_price,
timestamp=datetime.now(),
indicators=indicators,
reason="Mean reversion - price below mean and RSI oversold"
)
# Sell signal: Price above mean and RSI overbought
elif current_price > bb_middle and rsi > 70 and distance_from_mean > 0.02:
return TradingSignal(
symbol=self.config.SYMBOL,
signal_type='SELL',
strength=0.7,
price=current_price,
timestamp=datetime.now(),
indicators=indicators,
reason="Mean reversion - price above mean and RSI overbought"
)
return None
async def _execute_signal(self, signal: TradingSignal):
"""Execute trading signal"""
try:
# Check if trading is enabled
if not self.config.ENABLE_TRADING:
self.logger.info(f"📄 Paper trading: {signal.signal_type} signal for {signal.symbol}")
return
# Risk checks
if not await self.risk_manager.check_signal_risk(signal):
self.logger.warning(f"⚠️ Signal failed risk check: {signal.signal_type} {signal.symbol}")
return
# Check for existing positions
current_positions = await self.position_manager.get_positions()
if signal.signal_type == 'BUY':
# Don't buy if already long
if any(pos.side == 'long' for pos in current_positions):
self.logger.debug("📈 Already long - skipping BUY signal")
return
# Calculate position size
position_size = await self.risk_manager.calculate_position_size(signal)
# Place buy order
await self.order_manager.place_buy_order(
symbol=signal.symbol,
quantity=position_size,
price=signal.price
)
elif signal.signal_type == 'SELL':
# Don't sell if already short
if any(pos.side == 'short' for pos in current_positions):
self.logger.debug("📉 Already short - skipping SELL signal")
return
# Calculate position size
position_size = await self.risk_manager.calculate_position_size(signal)
# Place sell order
await self.order_manager.place_sell_order(
symbol=signal.symbol,
quantity=position_size,
price=signal.price
)
except Exception as e:
self.logger.error(f"❌ Error executing signal: {e}")
async def _check_position_exits(self):
"""Check if positions should be closed"""
positions = await self.position_manager.get_positions()
for position in positions:
# Check stop loss and take profit
should_close, reason = await self.risk_manager.check_position_exit(position, self.current_price)
if should_close:
await self.position_manager.close_position(position, reason)
# Update trade statistics
if reason.startswith('TAKE_PROFIT'):
self.successful_trades += 1
elif reason.startswith('STOP_LOSS'):
self.failed_trades += 1
async def _on_websocket_message(self, message: Dict[str, Any]):
"""Handle WebSocket messages"""
try:
# Update market data
self.market_data = message
# Extract price information
if 'price' in message:
self.current_price = float(message['price'])
# Store price history
price_data = {
'timestamp': message.get('timestamp', time.time()),
'price': self.current_price,
'volume': message.get('volume', 0)
}
self.price_history.append(price_data)
# Keep only recent data
if len(self.price_history) > self.config.LOOKBACK_PERIODS * 2:
self.price_history = self.price_history[-self.config.LOOKBACK_PERIODS:]
# Update order manager with market data
await self.order_manager.update_market_data(message)
except Exception as e:
self.logger.error(f"❌ Error processing WebSocket message: {e}")
def get_status(self) -> Dict[str, Any]:
"""Get current bot status"""
return {
'running': self.running,
'initialized': self.initialized,
'current_price': self.current_price,
'trade_count': self.trade_count,
'successful_trades': self.successful_trades,
'failed_trades': self.failed_trades,
'total_pnl': self.total_pnl,
'uptime': str(datetime.now() - self.session_start),
'last_signal': self.last_signal.signal_type if self.last_signal else None
}

View file

@ -0,0 +1,198 @@
"""
Configuration Management for Trading Bot
Handles environment variables with validation and defaults
"""
import os
import logging
from typing import Optional, Union
from dataclasses import dataclass, field
from dotenv import load_dotenv
@dataclass
class Config:
"""Trading bot configuration with environment variable support"""
# Load environment variables
def __post_init__(self):
load_dotenv()
self._load_config()
self._validate_config()
def _load_config(self):
"""Load configuration from environment variables"""
# Binance API Configuration
self.BINANCE_API_KEY: str = os.getenv('BINANCE_API_KEY', '')
self.BINANCE_SECRET_KEY: str = os.getenv('BINANCE_SECRET_KEY', '')
self.BINANCE_TESTNET: bool = os.getenv('BINANCE_TESTNET', 'false').lower() == 'true'
# Trading Configuration
self.SYMBOL: str = os.getenv('TRADING_SYMBOL', 'BTCUSDT')
self.POSITION_SIZE: float = float(os.getenv('POSITION_SIZE', '0.001'))
self.LEVERAGE: int = int(os.getenv('LEVERAGE', '1'))
self.MAX_POSITIONS: int = int(os.getenv('MAX_POSITIONS', '1'))
# Risk Management
self.STOP_LOSS_PERCENT: float = float(os.getenv('STOP_LOSS_PERCENT', '2.0'))
self.TAKE_PROFIT_PERCENT: float = float(os.getenv('TAKE_PROFIT_PERCENT', '3.0'))
self.MAX_DAILY_LOSS: float = float(os.getenv('MAX_DAILY_LOSS', '5.0'))
self.MAX_PORTFOLIO_RISK: float = float(os.getenv('MAX_PORTFOLIO_RISK', '10.0'))
# Technical Indicators
self.BOLLINGER_PERIOD: int = int(os.getenv('BOLLINGER_PERIOD', '20'))
self.BOLLINGER_STD: float = float(os.getenv('BOLLINGER_STD', '2.0'))
self.MA_PERIOD: int = int(os.getenv('MA_PERIOD', '50'))
self.MA_TYPE: str = os.getenv('MA_TYPE', 'SMA') # SMA, EMA
# Trading Strategy
self.STRATEGY: str = os.getenv('STRATEGY', 'BOLLINGER_MA')
self.TIMEFRAME: str = os.getenv('TIMEFRAME', '1m')
self.LOOKBACK_PERIODS: int = int(os.getenv('LOOKBACK_PERIODS', '100'))
# WebSocket Configuration
self.WS_RECONNECT_DELAY: int = int(os.getenv('WS_RECONNECT_DELAY', '5'))
self.WS_PING_INTERVAL: int = int(os.getenv('WS_PING_INTERVAL', '20'))
self.WS_TIMEOUT: int = int(os.getenv('WS_TIMEOUT', '30'))
# Logging Configuration
self.LOG_LEVEL: str = os.getenv('LOG_LEVEL', 'INFO')
self.LOG_FILE: str = os.getenv('LOG_FILE', 'logs/trading_bot.log')
self.LOG_MAX_SIZE: int = int(os.getenv('LOG_MAX_SIZE', '10485760')) # 10MB
self.LOG_BACKUP_COUNT: int = int(os.getenv('LOG_BACKUP_COUNT', '5'))
# Database Configuration (for tracking)
self.DATABASE_URL: str = os.getenv('DATABASE_URL', 'sqlite:///trading_bot.db')
self.TRACK_PERFORMANCE: bool = os.getenv('TRACK_PERFORMANCE', 'true').lower() == 'true'
# Notification Configuration
self.TELEGRAM_BOT_TOKEN: str = os.getenv('TELEGRAM_BOT_TOKEN', '')
self.TELEGRAM_CHAT_ID: str = os.getenv('TELEGRAM_CHAT_ID', '')
self.ENABLE_NOTIFICATIONS: bool = os.getenv('ENABLE_NOTIFICATIONS', 'false').lower() == 'true'
# Safety Features
self.PAPER_TRADING: bool = os.getenv('PAPER_TRADING', 'true').lower() == 'true'
self.ENABLE_TRADING: bool = os.getenv('ENABLE_TRADING', 'false').lower() == 'true'
self.TRADING_HOURS_START: str = os.getenv('TRADING_HOURS_START', '00:00')
self.TRADING_HOURS_END: str = os.getenv('TRADING_HOURS_END', '23:59')
# Performance Monitoring
self.PERFORMANCE_UPDATE_INTERVAL: int = int(os.getenv('PERFORMANCE_UPDATE_INTERVAL', '60'))
self.SAVE_TRADES_TO_FILE: bool = os.getenv('SAVE_TRADES_TO_FILE', 'true').lower() == 'true'
self.TRADES_FILE_PATH: str = os.getenv('TRADES_FILE_PATH', 'data/trades.json')
# Advanced Features
self.ENABLE_VOLUME_ANALYSIS: bool = os.getenv('ENABLE_VOLUME_ANALYSIS', 'false').lower() == 'true'
self.VOLUME_THRESHOLD: float = float(os.getenv('VOLUME_THRESHOLD', '1.5'))
self.ENABLE_SENTIMENT_ANALYSIS: bool = os.getenv('ENABLE_SENTIMENT_ANALYSIS', 'false').lower() == 'true'
# Exchange Rate Limits
self.RATE_LIMIT_REQUESTS: int = int(os.getenv('RATE_LIMIT_REQUESTS', '1200'))
self.RATE_LIMIT_INTERVAL: int = int(os.getenv('RATE_LIMIT_INTERVAL', '60'))
# Backtesting Configuration
self.BACKTEST_START_DATE: str = os.getenv('BACKTEST_START_DATE', '2024-01-01')
self.BACKTEST_END_DATE: str = os.getenv('BACKTEST_END_DATE', '2024-12-31')
self.BACKTEST_INITIAL_CAPITAL: float = float(os.getenv('BACKTEST_INITIAL_CAPITAL', '10000'))
def _validate_config(self):
"""Validate critical configuration parameters"""
# Validate API keys if not in paper trading mode
if not self.PAPER_TRADING and self.ENABLE_TRADING:
if not self.BINANCE_API_KEY:
raise ValueError("BINANCE_API_KEY is required for live trading")
if not self.BINANCE_SECRET_KEY:
raise ValueError("BINANCE_SECRET_KEY is required for live trading")
# Validate risk parameters
if self.STOP_LOSS_PERCENT <= 0:
raise ValueError("STOP_LOSS_PERCENT must be positive")
if self.TAKE_PROFIT_PERCENT <= 0:
raise ValueError("TAKE_PROFIT_PERCENT must be positive")
if self.POSITION_SIZE <= 0:
raise ValueError("POSITION_SIZE must be positive")
# Validate indicator parameters
if self.BOLLINGER_PERIOD <= 0:
raise ValueError("BOLLINGER_PERIOD must be positive")
if self.BOLLINGER_STD <= 0:
raise ValueError("BOLLINGER_STD must be positive")
if self.MA_PERIOD <= 0:
raise ValueError("MA_PERIOD must be positive")
# Validate symbol format
if not self.SYMBOL or len(self.SYMBOL) < 3:
raise ValueError("Invalid SYMBOL format")
# Validate timeframe
valid_timeframes = ['1m', '3m', '5m', '15m', '30m', '1h', '2h', '4h', '6h', '8h', '12h', '1d', '3d', '1w', '1M']
if self.TIMEFRAME not in valid_timeframes:
raise ValueError(f"Invalid TIMEFRAME. Must be one of: {valid_timeframes}")
# Validate MA type
valid_ma_types = ['SMA', 'EMA']
if self.MA_TYPE not in valid_ma_types:
raise ValueError(f"Invalid MA_TYPE. Must be one of: {valid_ma_types}")
# Validate strategy
valid_strategies = ['BOLLINGER_MA', 'BOLLINGER_ONLY', 'MA_CROSSOVER', 'MEAN_REVERSION']
if self.STRATEGY not in valid_strategies:
raise ValueError(f"Invalid STRATEGY. Must be one of: {valid_strategies}")
def get_binance_config(self) -> dict:
"""Get Binance exchange configuration"""
config = {
'apiKey': self.BINANCE_API_KEY,
'secret': self.BINANCE_SECRET_KEY,
'enableRateLimit': True,
'options': {
'defaultType': 'spot', # or 'future' for futures
}
}
if self.BINANCE_TESTNET:
config['sandbox'] = True
config['urls'] = {
'api': 'https://testnet.binance.vision/api',
'stream': 'wss://testnet.binance.vision/ws',
}
return config
def get_websocket_config(self) -> dict:
"""Get WebSocket configuration"""
return {
'reconnect_delay': self.WS_RECONNECT_DELAY,
'ping_interval': self.WS_PING_INTERVAL,
'timeout': self.WS_TIMEOUT,
}
def get_risk_config(self) -> dict:
"""Get risk management configuration"""
return {
'stop_loss_percent': self.STOP_LOSS_PERCENT,
'take_profit_percent': self.TAKE_PROFIT_PERCENT,
'max_daily_loss': self.MAX_DAILY_LOSS,
'max_portfolio_risk': self.MAX_PORTFOLIO_RISK,
'max_positions': self.MAX_POSITIONS,
}
def get_indicator_config(self) -> dict:
"""Get technical indicator configuration"""
return {
'bollinger_period': self.BOLLINGER_PERIOD,
'bollinger_std': self.BOLLINGER_STD,
'ma_period': self.MA_PERIOD,
'ma_type': self.MA_TYPE,
'lookback_periods': self.LOOKBACK_PERIODS,
}
def is_paper_trading(self) -> bool:
"""Check if bot is in paper trading mode"""
return self.PAPER_TRADING or not self.ENABLE_TRADING
def __str__(self) -> str:
"""String representation of configuration"""
return f"Config(symbol={self.SYMBOL}, strategy={self.STRATEGY}, paper_trading={self.PAPER_TRADING})"

View file

@ -0,0 +1,668 @@
"""
Technical Indicators Manager
Calculates Bollinger Bands, Moving Averages, and other technical indicators
"""
import numpy as np
import pandas as pd
from typing import Dict, List, Optional, Any, Tuple
from dataclasses import dataclass
from datetime import datetime
from .config import Config
from .logger import TradingLogger
@dataclass
class IndicatorResult:
"""Result container for indicator calculations"""
name: str
value: float
timestamp: datetime
parameters: Dict[str, Any]
metadata: Dict[str, Any] = None
class IndicatorManager:
"""
Manager for technical indicator calculations
"""
def __init__(self, config: Config, logger: TradingLogger):
self.config = config
self.logger = logger
# Indicator cache
self.indicator_cache: Dict[str, Any] = {}
self.last_calculation = None
# Parameters
self.bollinger_period = config.BOLLINGER_PERIOD
self.bollinger_std = config.BOLLINGER_STD
self.ma_period = config.MA_PERIOD
self.ma_type = config.MA_TYPE
self.logger.info(f"📊 Indicator manager initialized with BB({self.bollinger_period}, {self.bollinger_std}) and {self.ma_type}({self.ma_period})")
async def calculate_indicators(self, df: pd.DataFrame) -> Dict[str, Any]:
"""
Calculate all configured indicators
Args:
df: DataFrame with OHLCV data
Returns:
Dictionary of indicator values
"""
if df.empty or len(df) < max(self.bollinger_period, self.ma_period):
self.logger.warning("⚠️ Insufficient data for indicator calculation")
return {}
try:
indicators = {}
# Calculate Bollinger Bands
bb_result = self._calculate_bollinger_bands(df)
indicators.update(bb_result)
# Calculate Moving Averages
ma_result = self._calculate_moving_averages(df)
indicators.update(ma_result)
# Calculate RSI
rsi_result = self._calculate_rsi(df)
indicators.update(rsi_result)
# Calculate additional indicators
additional_result = self._calculate_additional_indicators(df)
indicators.update(additional_result)
# Cache results
self.indicator_cache = indicators
self.last_calculation = datetime.now()
# Log indicator values
self.logger.indicator("ALL_INDICATORS", self.config.SYMBOL, indicators)
return indicators
except Exception as e:
self.logger.error(f"❌ Error calculating indicators: {e}")
return {}
def _calculate_bollinger_bands(self, df: pd.DataFrame) -> Dict[str, Any]:
"""
Calculate Bollinger Bands
Args:
df: DataFrame with price data
Returns:
Dictionary with BB values
"""
try:
# Use close prices for calculation
close_prices = df['close'] if 'close' in df.columns else df['price']
# Calculate moving average (middle band)
sma = close_prices.rolling(window=self.bollinger_period).mean()
# Calculate standard deviation
std = close_prices.rolling(window=self.bollinger_period).std()
# Calculate upper and lower bands
upper_band = sma + (std * self.bollinger_std)
lower_band = sma - (std * self.bollinger_std)
# Get current values
current_price = close_prices.iloc[-1]
current_upper = upper_band.iloc[-1]
current_lower = lower_band.iloc[-1]
current_middle = sma.iloc[-1]
# Calculate band width and squeeze
band_width = (current_upper - current_lower) / current_middle
band_width_avg = ((upper_band - lower_band) / sma).rolling(window=20).mean().iloc[-1]
# Detect squeeze (bands tightening)
squeeze_threshold = 0.02 # 2% of price
is_squeeze = band_width < squeeze_threshold
# Calculate %B (position within bands)
percent_b = (current_price - current_lower) / (current_upper - current_lower)
# Calculate band position
if current_price > current_upper:
position = "ABOVE_UPPER"
elif current_price < current_lower:
position = "BELOW_LOWER"
elif current_price > current_middle:
position = "UPPER_HALF"
else:
position = "LOWER_HALF"
return {
'bb_upper': current_upper,
'bb_lower': current_lower,
'bb_middle': current_middle,
'bb_width': band_width,
'bb_width_avg': band_width_avg,
'bb_squeeze': is_squeeze,
'bb_percent_b': percent_b,
'bb_position': position,
'bb_signal': self._get_bollinger_signal(current_price, current_upper, current_lower, current_middle, is_squeeze)
}
except Exception as e:
self.logger.error(f"❌ Error calculating Bollinger Bands: {e}")
return {}
def _get_bollinger_signal(self, price: float, upper: float, lower: float, middle: float, squeeze: bool) -> str:
"""Get Bollinger Bands trading signal"""
# Squeeze breakout strategy
if squeeze:
return "SQUEEZE_WAIT"
# Mean reversion strategy
if price <= lower:
return "BUY_OVERSOLD"
elif price >= upper:
return "SELL_OVERBOUGHT"
elif price < middle:
return "BELOW_MEAN"
elif price > middle:
return "ABOVE_MEAN"
else:
return "NEUTRAL"
def _calculate_moving_averages(self, df: pd.DataFrame) -> Dict[str, Any]:
"""
Calculate Moving Averages
Args:
df: DataFrame with price data
Returns:
Dictionary with MA values
"""
try:
close_prices = df['close'] if 'close' in df.columns else df['price']
# Calculate primary moving average
if self.ma_type == 'EMA':
ma_value = close_prices.ewm(span=self.ma_period).mean().iloc[-1]
ma_values = close_prices.ewm(span=self.ma_period).mean()
else: # SMA
ma_value = close_prices.rolling(window=self.ma_period).mean().iloc[-1]
ma_values = close_prices.rolling(window=self.ma_period).mean()
# Calculate fast and slow MAs for crossover
fast_period = max(10, self.ma_period // 2)
slow_period = self.ma_period
if self.ma_type == 'EMA':
ma_fast = close_prices.ewm(span=fast_period).mean().iloc[-1]
ma_slow = close_prices.ewm(span=slow_period).mean().iloc[-1]
ma_fast_prev = close_prices.ewm(span=fast_period).mean().iloc[-2]
ma_slow_prev = close_prices.ewm(span=slow_period).mean().iloc[-2]
else:
ma_fast = close_prices.rolling(window=fast_period).mean().iloc[-1]
ma_slow = close_prices.rolling(window=slow_period).mean().iloc[-1]
ma_fast_prev = close_prices.rolling(window=fast_period).mean().iloc[-2]
ma_slow_prev = close_prices.rolling(window=slow_period).mean().iloc[-2]
# Calculate MA slope
ma_slope = (ma_values.iloc[-1] - ma_values.iloc[-5]) / ma_values.iloc[-5] # 5-period slope
# Determine trend
current_price = close_prices.iloc[-1]
if current_price > ma_value and ma_slope > 0:
trend = "BULLISH"
elif current_price < ma_value and ma_slope < 0:
trend = "BEARISH"
else:
trend = "SIDEWAYS"
# Get crossover signal
crossover_signal = self._get_ma_crossover_signal(ma_fast, ma_slow, ma_fast_prev, ma_slow_prev)
return {
'ma_value': ma_value,
'ma_fast': ma_fast,
'ma_slow': ma_slow,
'ma_fast_prev': ma_fast_prev,
'ma_slow_prev': ma_slow_prev,
'ma_slope': ma_slope,
'ma_trend': trend,
'ma_crossover_signal': crossover_signal,
'ma_signal': self._get_ma_signal(current_price, ma_value, trend)
}
except Exception as e:
self.logger.error(f"❌ Error calculating Moving Averages: {e}")
return {}
def _get_ma_crossover_signal(self, fast: float, slow: float, fast_prev: float, slow_prev: float) -> str:
"""Get MA crossover signal"""
# Golden cross
if fast > slow and fast_prev <= slow_prev:
return "GOLDEN_CROSS"
# Death cross
elif fast < slow and fast_prev >= slow_prev:
return "DEATH_CROSS"
# Above
elif fast > slow:
return "FAST_ABOVE_SLOW"
# Below
elif fast < slow:
return "FAST_BELOW_SLOW"
else:
return "NEUTRAL"
def _get_ma_signal(self, price: float, ma_value: float, trend: str) -> str:
"""Get MA trading signal"""
if price > ma_value and trend == "BULLISH":
return "BUY_ABOVE_MA"
elif price < ma_value and trend == "BEARISH":
return "SELL_BELOW_MA"
elif price > ma_value and trend == "SIDEWAYS":
return "ABOVE_MA_SIDEWAYS"
elif price < ma_value and trend == "SIDEWAYS":
return "BELOW_MA_SIDEWAYS"
else:
return "NEUTRAL"
def _calculate_rsi(self, df: pd.DataFrame, period: int = 14) -> Dict[str, Any]:
"""
Calculate RSI (Relative Strength Index)
Args:
df: DataFrame with price data
period: RSI period
Returns:
Dictionary with RSI values
"""
try:
close_prices = df['close'] if 'close' in df.columns else df['price']
# Calculate price changes
delta = close_prices.diff()
# Separate gains and losses
gains = delta.where(delta > 0, 0)
losses = -delta.where(delta < 0, 0)
# Calculate average gains and losses
avg_gains = gains.rolling(window=period).mean()
avg_losses = losses.rolling(window=period).mean()
# Calculate RS and RSI
rs = avg_gains / avg_losses
rsi = 100 - (100 / (1 + rs))
current_rsi = rsi.iloc[-1]
# Determine RSI signal
if current_rsi < 30:
rsi_signal = "OVERSOLD"
elif current_rsi > 70:
rsi_signal = "OVERBOUGHT"
elif current_rsi < 50:
rsi_signal = "BEARISH"
else:
rsi_signal = "BULLISH"
return {
'rsi': current_rsi,
'rsi_signal': rsi_signal,
'rsi_oversold': current_rsi < 30,
'rsi_overbought': current_rsi > 70
}
except Exception as e:
self.logger.error(f"❌ Error calculating RSI: {e}")
return {}
def _calculate_additional_indicators(self, df: pd.DataFrame) -> Dict[str, Any]:
"""
Calculate additional technical indicators
Args:
df: DataFrame with price data
Returns:
Dictionary with additional indicator values
"""
try:
indicators = {}
# MACD
macd_result = self._calculate_macd(df)
indicators.update(macd_result)
# Volume indicators
if 'volume' in df.columns:
volume_result = self._calculate_volume_indicators(df)
indicators.update(volume_result)
# Volatility indicators
volatility_result = self._calculate_volatility_indicators(df)
indicators.update(volatility_result)
# Support/Resistance
support_resistance = self._calculate_support_resistance(df)
indicators.update(support_resistance)
return indicators
except Exception as e:
self.logger.error(f"❌ Error calculating additional indicators: {e}")
return {}
def _calculate_macd(self, df: pd.DataFrame) -> Dict[str, Any]:
"""Calculate MACD indicator"""
try:
close_prices = df['close'] if 'close' in df.columns else df['price']
# Calculate MACD
exp1 = close_prices.ewm(span=12).mean()
exp2 = close_prices.ewm(span=26).mean()
macd = exp1 - exp2
signal = macd.ewm(span=9).mean()
histogram = macd - signal
return {
'macd': macd.iloc[-1],
'macd_signal': signal.iloc[-1],
'macd_histogram': histogram.iloc[-1],
'macd_bullish': macd.iloc[-1] > signal.iloc[-1],
'macd_cross': self._detect_macd_cross(macd, signal)
}
except Exception as e:
self.logger.error(f"❌ Error calculating MACD: {e}")
return {}
def _detect_macd_cross(self, macd: pd.Series, signal: pd.Series) -> str:
"""Detect MACD crossover"""
if len(macd) < 2 or len(signal) < 2:
return "INSUFFICIENT_DATA"
current_macd = macd.iloc[-1]
current_signal = signal.iloc[-1]
prev_macd = macd.iloc[-2]
prev_signal = signal.iloc[-2]
if current_macd > current_signal and prev_macd <= prev_signal:
return "BULLISH_CROSS"
elif current_macd < current_signal and prev_macd >= prev_signal:
return "BEARISH_CROSS"
else:
return "NO_CROSS"
def _calculate_volume_indicators(self, df: pd.DataFrame) -> Dict[str, Any]:
"""Calculate volume-based indicators"""
try:
volume = df['volume']
close_prices = df['close'] if 'close' in df.columns else df['price']
# Volume moving average
volume_ma = volume.rolling(window=20).mean()
# Volume ratio
volume_ratio = volume.iloc[-1] / volume_ma.iloc[-1]
# On-Balance Volume (OBV)
obv = self._calculate_obv(close_prices, volume)
return {
'volume_current': volume.iloc[-1],
'volume_ma': volume_ma.iloc[-1],
'volume_ratio': volume_ratio,
'volume_high': volume_ratio > 1.5,
'obv': obv.iloc[-1],
'obv_trend': self._get_obv_trend(obv)
}
except Exception as e:
self.logger.error(f"❌ Error calculating volume indicators: {e}")
return {}
def _calculate_obv(self, close_prices: pd.Series, volume: pd.Series) -> pd.Series:
"""Calculate On-Balance Volume"""
price_change = close_prices.diff()
obv = pd.Series(index=close_prices.index, dtype=float)
obv.iloc[0] = volume.iloc[0]
for i in range(1, len(close_prices)):
if price_change.iloc[i] > 0:
obv.iloc[i] = obv.iloc[i-1] + volume.iloc[i]
elif price_change.iloc[i] < 0:
obv.iloc[i] = obv.iloc[i-1] - volume.iloc[i]
else:
obv.iloc[i] = obv.iloc[i-1]
return obv
def _get_obv_trend(self, obv: pd.Series) -> str:
"""Get OBV trend"""
if len(obv) < 5:
return "INSUFFICIENT_DATA"
recent_obv = obv.iloc[-5:]
trend = np.polyfit(range(len(recent_obv)), recent_obv, 1)[0]
if trend > 0:
return "BULLISH"
elif trend < 0:
return "BEARISH"
else:
return "NEUTRAL"
def _calculate_volatility_indicators(self, df: pd.DataFrame) -> Dict[str, Any]:
"""Calculate volatility indicators"""
try:
close_prices = df['close'] if 'close' in df.columns else df['price']
# Average True Range (ATR)
if 'high' in df.columns and 'low' in df.columns:
atr = self._calculate_atr(df)
else:
atr = close_prices.rolling(window=14).std()
# Volatility ratio
short_vol = close_prices.rolling(window=10).std()
long_vol = close_prices.rolling(window=30).std()
volatility_ratio = short_vol.iloc[-1] / long_vol.iloc[-1]
return {
'atr': atr.iloc[-1] if hasattr(atr, 'iloc') else atr,
'volatility_ratio': volatility_ratio,
'volatility_high': volatility_ratio > 1.2,
'volatility_low': volatility_ratio < 0.8
}
except Exception as e:
self.logger.error(f"❌ Error calculating volatility indicators: {e}")
return {}
def _calculate_atr(self, df: pd.DataFrame, period: int = 14) -> pd.Series:
"""Calculate Average True Range"""
high = df['high']
low = df['low']
close = df['close']
tr1 = high - low
tr2 = abs(high - close.shift())
tr3 = abs(low - close.shift())
true_range = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1)
atr = true_range.rolling(window=period).mean()
return atr
def _calculate_support_resistance(self, df: pd.DataFrame) -> Dict[str, Any]:
"""Calculate support and resistance levels"""
try:
close_prices = df['close'] if 'close' in df.columns else df['price']
# Simple support/resistance based on recent highs/lows
if 'high' in df.columns and 'low' in df.columns:
high_prices = df['high']
low_prices = df['low']
# Recent resistance (highest high in last 20 periods)
resistance = high_prices.rolling(window=20).max().iloc[-1]
# Recent support (lowest low in last 20 periods)
support = low_prices.rolling(window=20).min().iloc[-1]
else:
# Use close prices
resistance = close_prices.rolling(window=20).max().iloc[-1]
support = close_prices.rolling(window=20).min().iloc[-1]
current_price = close_prices.iloc[-1]
# Calculate distance to support/resistance
resistance_distance = (resistance - current_price) / current_price * 100
support_distance = (current_price - support) / current_price * 100
return {
'resistance': resistance,
'support': support,
'resistance_distance': resistance_distance,
'support_distance': support_distance,
'near_resistance': resistance_distance < 2, # Within 2%
'near_support': support_distance < 2 # Within 2%
}
except Exception as e:
self.logger.error(f"❌ Error calculating support/resistance: {e}")
return {}
def get_combined_signal(self, indicators: Dict[str, Any]) -> Dict[str, Any]:
"""
Generate combined trading signal from all indicators
Args:
indicators: Dictionary of indicator values
Returns:
Combined signal analysis
"""
try:
signals = {
'buy_signals': 0,
'sell_signals': 0,
'neutral_signals': 0,
'overall_signal': 'NEUTRAL',
'confidence': 0.0,
'reasons': []
}
# Bollinger Bands signals
bb_signal = indicators.get('bb_signal', 'NEUTRAL')
if bb_signal == 'BUY_OVERSOLD':
signals['buy_signals'] += 1
signals['reasons'].append('BB: Oversold')
elif bb_signal == 'SELL_OVERBOUGHT':
signals['sell_signals'] += 1
signals['reasons'].append('BB: Overbought')
# Moving Average signals
ma_signal = indicators.get('ma_signal', 'NEUTRAL')
if 'BUY' in ma_signal:
signals['buy_signals'] += 1
signals['reasons'].append('MA: Bullish')
elif 'SELL' in ma_signal:
signals['sell_signals'] += 1
signals['reasons'].append('MA: Bearish')
# RSI signals
rsi_signal = indicators.get('rsi_signal', 'NEUTRAL')
if rsi_signal == 'OVERSOLD':
signals['buy_signals'] += 1
signals['reasons'].append('RSI: Oversold')
elif rsi_signal == 'OVERBOUGHT':
signals['sell_signals'] += 1
signals['reasons'].append('RSI: Overbought')
# MACD signals
macd_cross = indicators.get('macd_cross', 'NO_CROSS')
if macd_cross == 'BULLISH_CROSS':
signals['buy_signals'] += 1
signals['reasons'].append('MACD: Bullish cross')
elif macd_cross == 'BEARISH_CROSS':
signals['sell_signals'] += 1
signals['reasons'].append('MACD: Bearish cross')
# Volume confirmation
volume_high = indicators.get('volume_high', False)
if volume_high:
signals['reasons'].append('Volume: High')
# Determine overall signal
total_signals = signals['buy_signals'] + signals['sell_signals']
if total_signals > 0:
if signals['buy_signals'] > signals['sell_signals']:
signals['overall_signal'] = 'BUY'
signals['confidence'] = signals['buy_signals'] / total_signals
elif signals['sell_signals'] > signals['buy_signals']:
signals['overall_signal'] = 'SELL'
signals['confidence'] = signals['sell_signals'] / total_signals
else:
signals['overall_signal'] = 'NEUTRAL'
signals['confidence'] = 0.5
return signals
except Exception as e:
self.logger.error(f"❌ Error generating combined signal: {e}")
return {'overall_signal': 'NEUTRAL', 'confidence': 0.0, 'reasons': []}
def get_indicator_summary(self) -> Dict[str, Any]:
"""Get summary of current indicators"""
if not self.indicator_cache:
return {'status': 'No indicators calculated yet'}
summary = {
'last_update': self.last_calculation.isoformat() if self.last_calculation else None,
'bollinger_bands': {
'upper': self.indicator_cache.get('bb_upper'),
'lower': self.indicator_cache.get('bb_lower'),
'signal': self.indicator_cache.get('bb_signal')
},
'moving_average': {
'value': self.indicator_cache.get('ma_value'),
'trend': self.indicator_cache.get('ma_trend'),
'signal': self.indicator_cache.get('ma_signal')
},
'rsi': {
'value': self.indicator_cache.get('rsi'),
'signal': self.indicator_cache.get('rsi_signal')
},
'combined_signal': self.get_combined_signal(self.indicator_cache)
}
return summary

View file

@ -0,0 +1,263 @@
"""
Advanced Logging System for Trading Bot
Supports file rotation, structured logging, and multiple log levels
"""
import logging
import logging.handlers
import os
import sys
from datetime import datetime
from typing import Optional
class ColoredFormatter(logging.Formatter):
"""Custom formatter with color support for console output"""
COLORS = {
'DEBUG': '\033[36m', # Cyan
'INFO': '\033[32m', # Green
'WARNING': '\033[33m', # Yellow
'ERROR': '\033[31m', # Red
'CRITICAL': '\033[35m', # Magenta
}
RESET = '\033[0m'
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.use_color = sys.stdout.isatty() # Only use color if outputting to terminal
def format(self, record):
if self.use_color and record.levelname in self.COLORS:
record.levelname = f"{self.COLORS[record.levelname]}{record.levelname}{self.RESET}"
return super().format(record)
def setup_logger(
name: str = "TradingBot",
level: str = "INFO",
log_file: Optional[str] = None,
max_size: int = 10485760, # 10MB
backup_count: int = 5,
console_output: bool = True
) -> logging.Logger:
"""
Setup comprehensive logging with file rotation and console output
Args:
name: Logger name
level: Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
log_file: Path to log file (optional)
max_size: Maximum log file size before rotation
backup_count: Number of backup files to keep
console_output: Whether to output to console
Returns:
Configured logger instance
"""
# Create logger
logger = logging.getLogger(name)
logger.setLevel(getattr(logging, level.upper()))
# Clear existing handlers
logger.handlers.clear()
# Create formatters
console_formatter = ColoredFormatter(
fmt='%(asctime)s | %(levelname)-8s | %(name)s | %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
file_formatter = logging.Formatter(
fmt='%(asctime)s | %(levelname)-8s | %(name)s | %(funcName)s:%(lineno)d | %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
# Console handler
if console_output:
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(console_formatter)
console_handler.setLevel(getattr(logging, level.upper()))
logger.addHandler(console_handler)
# File handler with rotation
if log_file:
# Create directory if it doesn't exist
os.makedirs(os.path.dirname(log_file), exist_ok=True)
file_handler = logging.handlers.RotatingFileHandler(
log_file,
maxBytes=max_size,
backupCount=backup_count,
encoding='utf-8'
)
file_handler.setFormatter(file_formatter)
file_handler.setLevel(logging.DEBUG) # File gets all levels
logger.addHandler(file_handler)
# Error handler for critical issues
error_handler = logging.StreamHandler(sys.stderr)
error_handler.setFormatter(file_formatter)
error_handler.setLevel(logging.ERROR)
logger.addHandler(error_handler)
return logger
class TradingLogger:
"""
Enhanced logger wrapper for trading bot with specialized methods
"""
def __init__(self, logger: logging.Logger):
self.logger = logger
self.trade_count = 0
self.error_count = 0
self.session_start = datetime.now()
def info(self, message: str, **kwargs):
"""Log info message with context"""
self.logger.info(self._format_message(message, **kwargs))
def warning(self, message: str, **kwargs):
"""Log warning message with context"""
self.logger.warning(self._format_message(message, **kwargs))
def error(self, message: str, **kwargs):
"""Log error message with context"""
self.error_count += 1
self.logger.error(self._format_message(message, **kwargs))
def critical(self, message: str, **kwargs):
"""Log critical message with context"""
self.error_count += 1
self.logger.critical(self._format_message(message, **kwargs))
def debug(self, message: str, **kwargs):
"""Log debug message with context"""
self.logger.debug(self._format_message(message, **kwargs))
def trade(self, action: str, symbol: str, quantity: float, price: float, **kwargs):
"""Log trade execution"""
self.trade_count += 1
message = f"TRADE #{self.trade_count}: {action} {quantity} {symbol} @ {price}"
if kwargs:
message += f" | {kwargs}"
self.logger.info(message)
def signal(self, signal_type: str, symbol: str, details: dict = None):
"""Log trading signal"""
message = f"SIGNAL: {signal_type} for {symbol}"
if details:
message += f" | {details}"
self.logger.info(message)
def position(self, action: str, symbol: str, details: dict = None):
"""Log position changes"""
message = f"POSITION: {action} {symbol}"
if details:
message += f" | {details}"
self.logger.info(message)
def websocket(self, event: str, details: dict = None):
"""Log WebSocket events"""
message = f"WS: {event}"
if details:
message += f" | {details}"
self.logger.debug(message)
def indicator(self, name: str, symbol: str, values: dict):
"""Log indicator calculations"""
message = f"INDICATOR: {name} for {symbol} | {values}"
self.logger.debug(message)
def risk(self, check: str, result: str, details: dict = None):
"""Log risk management events"""
message = f"RISK: {check} -> {result}"
if details:
message += f" | {details}"
self.logger.info(message)
def performance(self, metrics: dict):
"""Log performance metrics"""
message = f"PERFORMANCE: {metrics}"
self.logger.info(message)
def session_summary(self):
"""Log session summary"""
uptime = datetime.now() - self.session_start
self.logger.info(f"SESSION SUMMARY: Uptime: {uptime}, Trades: {self.trade_count}, Errors: {self.error_count}")
def _format_message(self, message: str, **kwargs) -> str:
"""Format message with optional context"""
if kwargs:
context = " | ".join([f"{k}={v}" for k, v in kwargs.items()])
return f"{message} | {context}"
return message
class TradeLogger:
"""
Specialized logger for trade events with structured output
"""
def __init__(self, logger: logging.Logger):
self.logger = logger
def log_order_created(self, order_id: str, symbol: str, side: str, quantity: float, price: float = None):
"""Log order creation"""
price_str = f" @ {price}" if price else " (MARKET)"
self.logger.info(f"ORDER_CREATED: {order_id} | {side} {quantity} {symbol}{price_str}")
def log_order_filled(self, order_id: str, symbol: str, side: str, quantity: float, price: float, fee: float = 0):
"""Log order fill"""
self.logger.info(f"ORDER_FILLED: {order_id} | {side} {quantity} {symbol} @ {price} | Fee: {fee}")
def log_order_cancelled(self, order_id: str, reason: str = ""):
"""Log order cancellation"""
reason_str = f" | Reason: {reason}" if reason else ""
self.logger.info(f"ORDER_CANCELLED: {order_id}{reason_str}")
def log_position_opened(self, symbol: str, side: str, quantity: float, entry_price: float):
"""Log position opening"""
self.logger.info(f"POSITION_OPENED: {side} {quantity} {symbol} @ {entry_price}")
def log_position_closed(self, symbol: str, side: str, quantity: float, exit_price: float, pnl: float):
"""Log position closing"""
pnl_str = f"+{pnl:.2f}" if pnl > 0 else f"{pnl:.2f}"
self.logger.info(f"POSITION_CLOSED: {side} {quantity} {symbol} @ {exit_price} | PnL: {pnl_str}")
def log_stop_loss_triggered(self, symbol: str, price: float, loss: float):
"""Log stop loss trigger"""
self.logger.warning(f"STOP_LOSS_TRIGGERED: {symbol} @ {price} | Loss: {loss:.2f}")
def log_take_profit_triggered(self, symbol: str, price: float, profit: float):
"""Log take profit trigger"""
self.logger.info(f"TAKE_PROFIT_TRIGGERED: {symbol} @ {price} | Profit: {profit:.2f}")
def log_risk_check_failed(self, check: str, details: str):
"""Log failed risk check"""
self.logger.warning(f"RISK_CHECK_FAILED: {check} | {details}")
# Convenience function for getting enhanced logger
def get_trading_logger(name: str = "TradingBot", **kwargs) -> TradingLogger:
"""Get enhanced trading logger with specialized methods"""
base_logger = setup_logger(name, **kwargs)
return TradingLogger(base_logger)
# Error handling decorator
def log_exceptions(logger: logging.Logger):
"""Decorator to log exceptions in functions"""
def decorator(func):
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as e:
logger.error(f"Exception in {func.__name__}: {str(e)}", exc_info=True)
raise
return wrapper
return decorator

View file

@ -0,0 +1,544 @@
"""
Order Management System
Handles order creation, execution, and tracking
"""
import asyncio
import time
import uuid
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
from enum import Enum
import ccxt
from .config import Config
from .logger import TradingLogger
class OrderStatus(Enum):
"""Order status enumeration"""
PENDING = "pending"
SUBMITTED = "submitted"
OPEN = "open"
PARTIALLY_FILLED = "partially_filled"
FILLED = "filled"
CANCELLED = "cancelled"
REJECTED = "rejected"
EXPIRED = "expired"
class OrderType(Enum):
"""Order type enumeration"""
MARKET = "market"
LIMIT = "limit"
STOP = "stop"
STOP_LIMIT = "stop_limit"
class OrderSide(Enum):
"""Order side enumeration"""
BUY = "buy"
SELL = "sell"
@dataclass
class Order:
"""Order data structure"""
id: str
symbol: str
side: OrderSide
type: OrderType
quantity: float
price: Optional[float] = None
stop_price: Optional[float] = None
status: OrderStatus = OrderStatus.PENDING
filled_quantity: float = 0.0
average_price: float = 0.0
fees: float = 0.0
created_at: datetime = None
updated_at: datetime = None
exchange_order_id: Optional[str] = None
client_order_id: Optional[str] = None
def __post_init__(self):
if self.created_at is None:
self.created_at = datetime.now()
if self.updated_at is None:
self.updated_at = datetime.now()
if self.client_order_id is None:
self.client_order_id = f"bot_{int(time.time() * 1000)}"
class OrderManager:
"""
Manages order creation, execution, and tracking
"""
def __init__(self, config: Config, logger: TradingLogger, exchange: ccxt.Exchange):
self.config = config
self.logger = logger
self.exchange = exchange
# Order tracking
self.orders: Dict[str, Order] = {}
self.order_history: List[Order] = []
# Statistics
self.orders_created = 0
self.orders_filled = 0
self.orders_cancelled = 0
self.orders_rejected = 0
# Rate limiting
self.last_order_time = 0
self.min_order_interval = 1.0 # Minimum seconds between orders
# Market data
self.current_price = 0.0
self.bid_price = 0.0
self.ask_price = 0.0
self.spread = 0.0
self.logger.info("💼 Order Manager initialized")
async def place_buy_order(self, symbol: str, quantity: float, price: float = None) -> Optional[Order]:
"""
Place a buy order
Args:
symbol: Trading symbol
quantity: Order quantity
price: Order price (None for market order)
Returns:
Order object if successful, None otherwise
"""
try:
# Validate order parameters
if not self._validate_order_params(symbol, quantity, price):
return None
# Check rate limiting
if not await self._check_rate_limit():
return None
# Create order object
order = Order(
id=str(uuid.uuid4()),
symbol=symbol,
side=OrderSide.BUY,
type=OrderType.LIMIT if price else OrderType.MARKET,
quantity=quantity,
price=price
)
# Place order based on trading mode
if self.config.is_paper_trading():
await self._place_paper_order(order)
else:
await self._place_live_order(order)
# Track order
self.orders[order.id] = order
self.orders_created += 1
self.logger.trade("BUY", symbol, quantity, price or self.current_price)
return order
except Exception as e:
self.logger.error(f"❌ Error placing buy order: {e}")
return None
async def place_sell_order(self, symbol: str, quantity: float, price: float = None) -> Optional[Order]:
"""
Place a sell order
Args:
symbol: Trading symbol
quantity: Order quantity
price: Order price (None for market order)
Returns:
Order object if successful, None otherwise
"""
try:
# Validate order parameters
if not self._validate_order_params(symbol, quantity, price):
return None
# Check rate limiting
if not await self._check_rate_limit():
return None
# Create order object
order = Order(
id=str(uuid.uuid4()),
symbol=symbol,
side=OrderSide.SELL,
type=OrderType.LIMIT if price else OrderType.MARKET,
quantity=quantity,
price=price
)
# Place order based on trading mode
if self.config.is_paper_trading():
await self._place_paper_order(order)
else:
await self._place_live_order(order)
# Track order
self.orders[order.id] = order
self.orders_created += 1
self.logger.trade("SELL", symbol, quantity, price or self.current_price)
return order
except Exception as e:
self.logger.error(f"❌ Error placing sell order: {e}")
return None
async def cancel_order(self, order_id: str) -> bool:
"""
Cancel an order
Args:
order_id: Order ID to cancel
Returns:
True if successful, False otherwise
"""
try:
if order_id not in self.orders:
self.logger.warning(f"⚠️ Order {order_id} not found")
return False
order = self.orders[order_id]
# Can only cancel open orders
if order.status not in [OrderStatus.OPEN, OrderStatus.PARTIALLY_FILLED]:
self.logger.warning(f"⚠️ Cannot cancel order {order_id} with status {order.status}")
return False
# Cancel order based on trading mode
if self.config.is_paper_trading():
await self._cancel_paper_order(order)
else:
await self._cancel_live_order(order)
order.status = OrderStatus.CANCELLED
order.updated_at = datetime.now()
# Move to history
self.order_history.append(order)
del self.orders[order_id]
self.orders_cancelled += 1
self.logger.info(f"✅ Order {order_id} cancelled")
return True
except Exception as e:
self.logger.error(f"❌ Error cancelling order {order_id}: {e}")
return False
async def cancel_all_orders(self) -> int:
"""
Cancel all open orders
Returns:
Number of orders cancelled
"""
try:
cancelled_count = 0
order_ids = list(self.orders.keys())
for order_id in order_ids:
if await self.cancel_order(order_id):
cancelled_count += 1
self.logger.info(f"✅ Cancelled {cancelled_count} orders")
return cancelled_count
except Exception as e:
self.logger.error(f"❌ Error cancelling all orders: {e}")
return 0
async def get_open_orders(self) -> List[Order]:
"""Get all open orders"""
open_orders = []
for order in self.orders.values():
if order.status in [OrderStatus.OPEN, OrderStatus.PARTIALLY_FILLED]:
open_orders.append(order)
return open_orders
async def get_order_status(self, order_id: str) -> Optional[OrderStatus]:
"""Get order status"""
if order_id in self.orders:
return self.orders[order_id].status
# Check order history
for order in self.order_history:
if order.id == order_id:
return order.status
return None
async def update_market_data(self, market_data: Dict[str, Any]):
"""Update market data for order processing"""
try:
self.current_price = market_data.get('price', self.current_price)
# Update bid/ask from depth data
if market_data.get('type') == 'depth':
bids = market_data.get('bids', [])
asks = market_data.get('asks', [])
if bids:
self.bid_price = bids[0][0]
if asks:
self.ask_price = asks[0][0]
if self.bid_price and self.ask_price:
self.spread = self.ask_price - self.bid_price
# Check for order fills in paper trading
if self.config.is_paper_trading():
await self._check_paper_order_fills()
except Exception as e:
self.logger.error(f"❌ Error updating market data: {e}")
async def check_stuck_orders(self):
"""Check for stuck orders and handle them"""
try:
current_time = datetime.now()
stuck_threshold = timedelta(minutes=10) # 10 minutes
for order_id, order in list(self.orders.items()):
if current_time - order.created_at > stuck_threshold:
self.logger.warning(f"⚠️ Order {order_id} appears stuck, cancelling...")
await self.cancel_order(order_id)
except Exception as e:
self.logger.error(f"❌ Error checking stuck orders: {e}")
async def _place_live_order(self, order: Order):
"""Place order on live exchange"""
try:
# Prepare order parameters
order_params = {
'symbol': order.symbol,
'type': order.type.value,
'side': order.side.value,
'amount': order.quantity,
'params': {
'timeInForce': 'GTC',
'newClientOrderId': order.client_order_id
}
}
# Add price for limit orders
if order.type == OrderType.LIMIT:
order_params['price'] = order.price
# Place order
result = await self.exchange.create_order(**order_params)
# Update order with exchange response
order.exchange_order_id = result.get('id')
order.status = OrderStatus.SUBMITTED
order.updated_at = datetime.now()
self.logger.info(f"✅ Live order placed: {order.exchange_order_id}")
except Exception as e:
order.status = OrderStatus.REJECTED
order.updated_at = datetime.now()
self.orders_rejected += 1
self.logger.error(f"❌ Live order placement failed: {e}")
raise
async def _place_paper_order(self, order: Order):
"""Place paper trading order"""
try:
# Simulate order placement
await asyncio.sleep(0.1) # Simulate network delay
order.status = OrderStatus.OPEN
order.updated_at = datetime.now()
self.logger.info(f"📄 Paper order placed: {order.id}")
except Exception as e:
order.status = OrderStatus.REJECTED
order.updated_at = datetime.now()
self.orders_rejected += 1
self.logger.error(f"❌ Paper order placement failed: {e}")
raise
async def _cancel_live_order(self, order: Order):
"""Cancel live order"""
try:
if order.exchange_order_id:
await self.exchange.cancel_order(order.exchange_order_id, order.symbol)
self.logger.info(f"✅ Live order cancelled: {order.exchange_order_id}")
except Exception as e:
self.logger.error(f"❌ Live order cancellation failed: {e}")
raise
async def _cancel_paper_order(self, order: Order):
"""Cancel paper trading order"""
try:
# Simulate cancellation
await asyncio.sleep(0.1)
self.logger.info(f"📄 Paper order cancelled: {order.id}")
except Exception as e:
self.logger.error(f"❌ Paper order cancellation failed: {e}")
raise
async def _check_paper_order_fills(self):
"""Check for paper order fills based on current market price"""
try:
for order_id, order in list(self.orders.items()):
if order.status != OrderStatus.OPEN:
continue
# Check if order should be filled
should_fill = False
if order.type == OrderType.MARKET:
should_fill = True
elif order.type == OrderType.LIMIT:
if order.side == OrderSide.BUY and self.current_price <= order.price:
should_fill = True
elif order.side == OrderSide.SELL and self.current_price >= order.price:
should_fill = True
if should_fill:
await self._fill_paper_order(order)
except Exception as e:
self.logger.error(f"❌ Error checking paper order fills: {e}")
async def _fill_paper_order(self, order: Order):
"""Fill a paper trading order"""
try:
# Simulate order fill
fill_price = order.price if order.type == OrderType.LIMIT else self.current_price
order.status = OrderStatus.FILLED
order.filled_quantity = order.quantity
order.average_price = fill_price
order.fees = order.quantity * fill_price * 0.001 # 0.1% fee
order.updated_at = datetime.now()
# Move to history
self.order_history.append(order)
del self.orders[order.id]
self.orders_filled += 1
self.logger.info(f"📄 Paper order filled: {order.id} at {fill_price}")
except Exception as e:
self.logger.error(f"❌ Error filling paper order: {e}")
def _validate_order_params(self, symbol: str, quantity: float, price: float = None) -> bool:
"""Validate order parameters"""
try:
# Basic validation
if not symbol:
self.logger.error("❌ Symbol is required")
return False
if quantity <= 0:
self.logger.error("❌ Quantity must be positive")
return False
if price is not None and price <= 0:
self.logger.error("❌ Price must be positive")
return False
# Check minimum order size
min_quantity = 0.001 # Minimum for most crypto pairs
if quantity < min_quantity:
self.logger.error(f"❌ Quantity below minimum: {min_quantity}")
return False
# Check price deviation (for limit orders)
if price is not None and self.current_price > 0:
price_deviation = abs(price - self.current_price) / self.current_price
if price_deviation > 0.1: # 10% deviation
self.logger.warning(f"⚠️ Large price deviation: {price_deviation:.2%}")
return True
except Exception as e:
self.logger.error(f"❌ Error validating order parameters: {e}")
return False
async def _check_rate_limit(self) -> bool:
"""Check rate limiting for orders"""
try:
current_time = time.time()
if current_time - self.last_order_time < self.min_order_interval:
wait_time = self.min_order_interval - (current_time - self.last_order_time)
self.logger.warning(f"⏱️ Rate limit: waiting {wait_time:.2f}s")
await asyncio.sleep(wait_time)
self.last_order_time = time.time()
return True
except Exception as e:
self.logger.error(f"❌ Error checking rate limit: {e}")
return False
def get_statistics(self) -> Dict[str, Any]:
"""Get order management statistics"""
return {
'orders_created': self.orders_created,
'orders_filled': self.orders_filled,
'orders_cancelled': self.orders_cancelled,
'orders_rejected': self.orders_rejected,
'open_orders': len(self.orders),
'order_history': len(self.order_history),
'fill_rate': self.orders_filled / max(1, self.orders_created) * 100,
'current_price': self.current_price,
'spread': self.spread
}
def get_order_book_summary(self) -> Dict[str, Any]:
"""Get order book summary"""
return {
'bid_price': self.bid_price,
'ask_price': self.ask_price,
'spread': self.spread,
'spread_percent': (self.spread / self.current_price * 100) if self.current_price > 0 else 0,
'mid_price': (self.bid_price + self.ask_price) / 2 if self.bid_price and self.ask_price else 0
}

View file

@ -0,0 +1,469 @@
"""
Position Manager
Tracks positions and calculates PnL
"""
from datetime import datetime
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
import json
from .config import Config
from .logger import TradingLogger
@dataclass
class Position:
"""Position data structure"""
id: str
symbol: str
side: str # 'long' or 'short'
quantity: float
entry_price: float
current_price: float = 0.0
unrealized_pnl: float = 0.0
realized_pnl: float = 0.0
fees: float = 0.0
created_at: datetime = None
updated_at: datetime = None
def __post_init__(self):
if self.created_at is None:
self.created_at = datetime.now()
if self.updated_at is None:
self.updated_at = datetime.now()
class PositionManager:
"""Manages trading positions and PnL calculation"""
def __init__(self, config: Config, logger: TradingLogger, exchange):
self.config = config
self.logger = logger
self.exchange = exchange
# Position tracking
self.positions: Dict[str, Position] = {}
self.position_history: List[Position] = []
# PnL tracking
self.total_realized_pnl = 0.0
self.total_unrealized_pnl = 0.0
self.total_fees = 0.0
# Statistics
self.total_positions = 0
self.winning_positions = 0
self.losing_positions = 0
self.logger.info("📊 Position Manager initialized")
async def open_position(self, symbol: str, side: str, quantity: float, entry_price: float) -> Position:
"""Open a new position"""
try:
position = Position(
id=f"{symbol}_{side}_{int(datetime.now().timestamp())}",
symbol=symbol,
side=side,
quantity=quantity,
entry_price=entry_price,
current_price=entry_price
)
self.positions[position.id] = position
self.total_positions += 1
self.logger.position("OPENED", symbol, {
"side": side,
"quantity": quantity,
"entry_price": entry_price,
"position_id": position.id
})
return position
except Exception as e:
self.logger.error(f"❌ Error opening position: {e}")
raise
async def close_position(self, position: Position, reason: str = "") -> float:
"""Close a position and calculate realized PnL"""
try:
# Calculate realized PnL
if position.side == 'long':
realized_pnl = (position.current_price - position.entry_price) * position.quantity
else: # short
realized_pnl = (position.entry_price - position.current_price) * position.quantity
# Account for fees
realized_pnl -= position.fees
# Update position
position.realized_pnl = realized_pnl
position.updated_at = datetime.now()
# Update statistics
self.total_realized_pnl += realized_pnl
self.total_fees += position.fees
if realized_pnl > 0:
self.winning_positions += 1
else:
self.losing_positions += 1
# Move to history
self.position_history.append(position)
# Remove from active positions
if position.id in self.positions:
del self.positions[position.id]
self.logger.position("CLOSED", position.symbol, {
"side": position.side,
"quantity": position.quantity,
"entry_price": position.entry_price,
"exit_price": position.current_price,
"realized_pnl": realized_pnl,
"reason": reason
})
return realized_pnl
except Exception as e:
self.logger.error(f"❌ Error closing position: {e}")
return 0.0
async def update_positions(self, current_prices: Dict[str, float] = None):
"""Update all positions with current prices"""
try:
for position in self.positions.values():
if current_prices and position.symbol in current_prices:
position.current_price = current_prices[position.symbol]
# Calculate unrealized PnL
if position.side == 'long':
position.unrealized_pnl = (position.current_price - position.entry_price) * position.quantity
else: # short
position.unrealized_pnl = (position.entry_price - position.current_price) * position.quantity
# Account for fees
position.unrealized_pnl -= position.fees
position.updated_at = datetime.now()
# Update total unrealized PnL
self.total_unrealized_pnl = sum(pos.unrealized_pnl for pos in self.positions.values())
except Exception as e:
self.logger.error(f"❌ Error updating positions: {e}")
async def get_positions(self) -> List[Position]:
"""Get all active positions"""
return list(self.positions.values())
async def get_position_by_symbol(self, symbol: str) -> Optional[Position]:
"""Get position by symbol"""
for position in self.positions.values():
if position.symbol == symbol:
return position
return None
async def close_all_positions(self, reason: str = "Emergency close"):
"""Close all positions"""
try:
positions_to_close = list(self.positions.values())
for position in positions_to_close:
await self.close_position(position, reason)
self.logger.info(f"✅ Closed {len(positions_to_close)} positions")
except Exception as e:
self.logger.error(f"❌ Error closing all positions: {e}")
def get_portfolio_value(self) -> float:
"""Get current portfolio value"""
return self.total_realized_pnl + self.total_unrealized_pnl
def get_position_summary(self) -> Dict[str, Any]:
"""Get position summary"""
total_positions = self.total_positions
win_rate = (self.winning_positions / max(1, total_positions)) * 100
return {
'active_positions': len(self.positions),
'total_positions': total_positions,
'winning_positions': self.winning_positions,
'losing_positions': self.losing_positions,
'win_rate': win_rate,
'total_realized_pnl': self.total_realized_pnl,
'total_unrealized_pnl': self.total_unrealized_pnl,
'total_fees': self.total_fees,
'portfolio_value': self.get_portfolio_value()
}
def get_position_details(self) -> List[Dict[str, Any]]:
"""Get detailed position information"""
details = []
for position in self.positions.values():
details.append({
'id': position.id,
'symbol': position.symbol,
'side': position.side,
'quantity': position.quantity,
'entry_price': position.entry_price,
'current_price': position.current_price,
'unrealized_pnl': position.unrealized_pnl,
'pnl_percent': (position.unrealized_pnl / (position.entry_price * position.quantity)) * 100,
'created_at': position.created_at.isoformat(),
'age': str(datetime.now() - position.created_at)
})
return details
def save_positions_to_file(self, filename: str = "positions.json"):
"""Save positions to file"""
try:
data = {
'active_positions': self.get_position_details(),
'summary': self.get_position_summary(),
'timestamp': datetime.now().isoformat()
}
with open(filename, 'w') as f:
json.dump(data, f, indent=2)
self.logger.info(f"✅ Positions saved to {filename}")
except Exception as e:
self.logger.error(f"❌ Error saving positions: {e}")
class PerformanceTracker:
"""Tracks trading performance metrics"""
def __init__(self, config: Config, logger: TradingLogger):
self.config = config
self.logger = logger
# Performance metrics
self.total_trades = 0
self.winning_trades = 0
self.losing_trades = 0
self.total_pnl = 0.0
self.total_volume = 0.0
self.total_fees = 0.0
# Daily metrics
self.daily_pnl = 0.0
self.daily_trades = 0
self.daily_volume = 0.0
# Risk metrics
self.max_drawdown = 0.0
self.current_drawdown = 0.0
self.peak_portfolio_value = 0.0
# Time tracking
self.session_start = datetime.now()
self.last_trade_time = None
self.logger.info("📈 Performance Tracker initialized")
async def record_trade(self, symbol: str, side: str, quantity: float, entry_price: float,
exit_price: float, fees: float = 0.0):
"""Record a completed trade"""
try:
# Calculate trade PnL
if side == 'long':
trade_pnl = (exit_price - entry_price) * quantity
else: # short
trade_pnl = (entry_price - exit_price) * quantity
# Account for fees
trade_pnl -= fees
# Update metrics
self.total_trades += 1
self.total_pnl += trade_pnl
self.total_volume += quantity * entry_price
self.total_fees += fees
# Update daily metrics
self.daily_trades += 1
self.daily_pnl += trade_pnl
self.daily_volume += quantity * entry_price
# Update win/loss counts
if trade_pnl > 0:
self.winning_trades += 1
else:
self.losing_trades += 1
self.last_trade_time = datetime.now()
# Log trade
self.logger.performance({
'trade_pnl': trade_pnl,
'total_pnl': self.total_pnl,
'win_rate': self.get_win_rate(),
'total_trades': self.total_trades
})
except Exception as e:
self.logger.error(f"❌ Error recording trade: {e}")
async def update_metrics(self):
"""Update performance metrics"""
try:
# Calculate current metrics
metrics = self.get_performance_metrics()
# Update drawdown
if self.total_pnl > self.peak_portfolio_value:
self.peak_portfolio_value = self.total_pnl
if self.peak_portfolio_value > 0:
self.current_drawdown = ((self.peak_portfolio_value - self.total_pnl) / self.peak_portfolio_value) * 100
self.max_drawdown = max(self.max_drawdown, self.current_drawdown)
# Log metrics periodically
if self.total_trades > 0 and self.total_trades % 10 == 0:
self.logger.performance(metrics)
except Exception as e:
self.logger.error(f"❌ Error updating metrics: {e}")
def get_performance_metrics(self) -> Dict[str, Any]:
"""Get comprehensive performance metrics"""
uptime = datetime.now() - self.session_start
return {
'total_trades': self.total_trades,
'winning_trades': self.winning_trades,
'losing_trades': self.losing_trades,
'win_rate': self.get_win_rate(),
'total_pnl': self.total_pnl,
'average_pnl_per_trade': self.total_pnl / max(1, self.total_trades),
'total_volume': self.total_volume,
'total_fees': self.total_fees,
'profit_factor': self.get_profit_factor(),
'sharpe_ratio': self.get_sharpe_ratio(),
'max_drawdown': self.max_drawdown,
'current_drawdown': self.current_drawdown,
'daily_pnl': self.daily_pnl,
'daily_trades': self.daily_trades,
'session_uptime': str(uptime),
'last_trade_time': self.last_trade_time.isoformat() if self.last_trade_time else None
}
def get_win_rate(self) -> float:
"""Calculate win rate percentage"""
if self.total_trades == 0:
return 0.0
return (self.winning_trades / self.total_trades) * 100
def get_profit_factor(self) -> float:
"""Calculate profit factor"""
if self.losing_trades == 0:
return float('inf') if self.winning_trades > 0 else 0.0
# This is a simplified profit factor calculation
# In practice, you'd track gross profit and gross loss separately
if self.total_pnl > 0:
return abs(self.total_pnl) / max(1, self.total_fees)
else:
return 0.0
def get_sharpe_ratio(self) -> float:
"""Calculate Sharpe ratio (simplified)"""
# This is a simplified calculation
# In practice, you'd need returns over time to calculate proper Sharpe ratio
if self.total_trades < 2:
return 0.0
avg_return = self.total_pnl / self.total_trades
# Simplified risk-free rate assumption
risk_free_rate = 0.02 # 2% annually
# Simplified standard deviation calculation
if self.total_trades > 0:
return (avg_return - risk_free_rate) / max(0.01, abs(avg_return))
return 0.0
async def get_daily_pnl(self) -> float:
"""Get daily PnL"""
return self.daily_pnl
async def reset_daily_metrics(self):
"""Reset daily metrics (called at start of new day)"""
self.daily_pnl = 0.0
self.daily_trades = 0
self.daily_volume = 0.0
self.logger.info("📅 Daily metrics reset")
async def save_session_data(self):
"""Save session data to file"""
try:
data = {
'performance_metrics': self.get_performance_metrics(),
'session_start': self.session_start.isoformat(),
'session_end': datetime.now().isoformat()
}
filename = f"session_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
with open(filename, 'w') as f:
json.dump(data, f, indent=2)
self.logger.info(f"✅ Session data saved to {filename}")
except Exception as e:
self.logger.error(f"❌ Error saving session data: {e}")
def get_summary_report(self) -> str:
"""Generate a summary report"""
metrics = self.get_performance_metrics()
report = f"""
📊 TRADING SESSION SUMMARY
========================
🔢 Trade Statistics:
Total Trades: {metrics['total_trades']}
Winning Trades: {metrics['winning_trades']}
Losing Trades: {metrics['losing_trades']}
Win Rate: {metrics['win_rate']:.2f}%
💰 Financial Performance:
Total PnL: ${metrics['total_pnl']:.2f}
Average PnL per Trade: ${metrics['average_pnl_per_trade']:.2f}
Total Volume: ${metrics['total_volume']:.2f}
Total Fees: ${metrics['total_fees']:.2f}
📈 Risk Metrics:
Max Drawdown: {metrics['max_drawdown']:.2f}%
Current Drawdown: {metrics['current_drawdown']:.2f}%
Profit Factor: {metrics['profit_factor']:.2f}
Sharpe Ratio: {metrics['sharpe_ratio']:.2f}
Session Info:
Uptime: {metrics['session_uptime']}
Last Trade: {metrics['last_trade_time'] or 'None'}
"""
return report

View file

@ -0,0 +1,405 @@
"""
Risk Management System
Handles position sizing, stop-loss, take-profit, and risk limits
"""
import asyncio
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple, Any
from dataclasses import dataclass
from .config import Config
from .logger import TradingLogger
@dataclass
class RiskLimits:
"""Risk limits configuration"""
max_position_size: float
max_daily_loss: float
max_portfolio_risk: float
stop_loss_percent: float
take_profit_percent: float
max_drawdown: float = 20.0
class RiskManager:
"""
Manages risk limits and position sizing
"""
def __init__(self, config: Config, logger: TradingLogger):
self.config = config
self.logger = logger
# Risk limits
self.risk_limits = RiskLimits(
max_position_size=config.POSITION_SIZE,
max_daily_loss=config.MAX_DAILY_LOSS,
max_portfolio_risk=config.MAX_PORTFOLIO_RISK,
stop_loss_percent=config.STOP_LOSS_PERCENT,
take_profit_percent=config.TAKE_PROFIT_PERCENT
)
# Daily tracking
self.daily_pnl = 0.0
self.daily_trades = 0
self.daily_losses = 0
self.last_reset = datetime.now().date()
# Portfolio tracking
self.total_exposure = 0.0
self.max_drawdown = 0.0
self.peak_portfolio_value = 0.0
# Emergency states
self.trading_halted = False
self.halt_reason = ""
self.logger.info(f"🛡️ Risk Manager initialized with SL: {self.risk_limits.stop_loss_percent}%, TP: {self.risk_limits.take_profit_percent}%")
async def check_signal_risk(self, signal) -> bool:
"""
Check if signal passes risk checks
Args:
signal: Trading signal to check
Returns:
True if signal passes risk checks, False otherwise
"""
try:
# Check if trading is halted
if self.trading_halted:
self.logger.risk("SIGNAL_CHECK", "REJECTED", {"reason": self.halt_reason})
return False
# Check daily loss limit
if self.daily_pnl <= -self.risk_limits.max_daily_loss:
self.logger.risk("DAILY_LOSS_CHECK", "REJECTED", {"daily_pnl": self.daily_pnl})
return False
# Check portfolio risk
if self.total_exposure >= self.risk_limits.max_portfolio_risk:
self.logger.risk("PORTFOLIO_RISK_CHECK", "REJECTED", {"exposure": self.total_exposure})
return False
# Check maximum drawdown
if self.max_drawdown >= self.risk_limits.max_drawdown:
self.logger.risk("DRAWDOWN_CHECK", "REJECTED", {"drawdown": self.max_drawdown})
return False
# Check signal strength
if signal.strength < 0.5:
self.logger.risk("SIGNAL_STRENGTH_CHECK", "REJECTED", {"strength": signal.strength})
return False
self.logger.risk("SIGNAL_CHECK", "PASSED", {"signal_type": signal.signal_type})
return True
except Exception as e:
self.logger.error(f"❌ Error in signal risk check: {e}")
return False
async def calculate_position_size(self, signal) -> float:
"""
Calculate position size based on risk parameters
Args:
signal: Trading signal
Returns:
Position size
"""
try:
# Base position size
base_size = self.risk_limits.max_position_size
# Adjust based on signal strength
strength_multiplier = max(0.5, min(1.0, signal.strength))
# Adjust based on volatility (if available)
volatility_multiplier = 1.0
if hasattr(signal, 'indicators') and signal.indicators:
volatility_ratio = signal.indicators.get('volatility_ratio', 1.0)
if volatility_ratio > 1.5: # High volatility
volatility_multiplier = 0.7
elif volatility_ratio < 0.8: # Low volatility
volatility_multiplier = 1.2
# Calculate final position size
position_size = base_size * strength_multiplier * volatility_multiplier
# Ensure minimum viable size
min_size = 0.001
position_size = max(min_size, position_size)
# Ensure doesn't exceed maximum
position_size = min(position_size, self.risk_limits.max_position_size)
self.logger.risk("POSITION_SIZE_CALC", "CALCULATED", {
"base_size": base_size,
"strength_multiplier": strength_multiplier,
"volatility_multiplier": volatility_multiplier,
"final_size": position_size
})
return position_size
except Exception as e:
self.logger.error(f"❌ Error calculating position size: {e}")
return self.risk_limits.max_position_size * 0.5 # Conservative fallback
async def check_position_exit(self, position, current_price: float) -> Tuple[bool, str]:
"""
Check if position should be closed
Args:
position: Current position
current_price: Current market price
Returns:
Tuple of (should_close, reason)
"""
try:
entry_price = position.entry_price
side = position.side
# Calculate PnL percentage
if side == 'long':
pnl_percent = ((current_price - entry_price) / entry_price) * 100
else: # short
pnl_percent = ((entry_price - current_price) / entry_price) * 100
# Check stop loss
if pnl_percent <= -self.risk_limits.stop_loss_percent:
return True, f"STOP_LOSS_TRIGGERED: {pnl_percent:.2f}%"
# Check take profit
if pnl_percent >= self.risk_limits.take_profit_percent:
return True, f"TAKE_PROFIT_TRIGGERED: {pnl_percent:.2f}%"
# Check time-based exit (optional)
if hasattr(position, 'created_at'):
position_age = datetime.now() - position.created_at
if position_age > timedelta(hours=24): # 24 hour max hold
return True, f"TIME_EXIT: Position held for {position_age}"
return False, ""
except Exception as e:
self.logger.error(f"❌ Error checking position exit: {e}")
return False, ""
async def update_daily_metrics(self, trade_pnl: float, trade_result: str):
"""
Update daily risk metrics
Args:
trade_pnl: Trade PnL
trade_result: Trade result ('win' or 'loss')
"""
try:
# Reset daily metrics if new day
current_date = datetime.now().date()
if current_date != self.last_reset:
self.daily_pnl = 0.0
self.daily_trades = 0
self.daily_losses = 0
self.last_reset = current_date
self.logger.info("📅 Daily metrics reset")
# Update metrics
self.daily_pnl += trade_pnl
self.daily_trades += 1
if trade_result == 'loss':
self.daily_losses += 1
# Check if daily loss limit reached
if self.daily_pnl <= -self.risk_limits.max_daily_loss:
await self.halt_trading("Daily loss limit exceeded")
# Log daily metrics
self.logger.risk("DAILY_METRICS", "UPDATED", {
"daily_pnl": self.daily_pnl,
"daily_trades": self.daily_trades,
"daily_losses": self.daily_losses,
"win_rate": ((self.daily_trades - self.daily_losses) / max(1, self.daily_trades)) * 100
})
except Exception as e:
self.logger.error(f"❌ Error updating daily metrics: {e}")
async def update_portfolio_metrics(self, portfolio_value: float):
"""
Update portfolio-level risk metrics
Args:
portfolio_value: Current portfolio value
"""
try:
# Update peak portfolio value
if portfolio_value > self.peak_portfolio_value:
self.peak_portfolio_value = portfolio_value
# Calculate current drawdown
if self.peak_portfolio_value > 0:
current_drawdown = ((self.peak_portfolio_value - portfolio_value) / self.peak_portfolio_value) * 100
self.max_drawdown = max(self.max_drawdown, current_drawdown)
# Check maximum drawdown limit
if self.max_drawdown >= self.risk_limits.max_drawdown:
await self.halt_trading(f"Maximum drawdown exceeded: {self.max_drawdown:.2f}%")
self.logger.risk("PORTFOLIO_METRICS", "UPDATED", {
"portfolio_value": portfolio_value,
"peak_value": self.peak_portfolio_value,
"current_drawdown": current_drawdown if self.peak_portfolio_value > 0 else 0,
"max_drawdown": self.max_drawdown
})
except Exception as e:
self.logger.error(f"❌ Error updating portfolio metrics: {e}")
async def halt_trading(self, reason: str):
"""
Halt trading due to risk limits
Args:
reason: Reason for halting trading
"""
try:
self.trading_halted = True
self.halt_reason = reason
self.logger.critical(f"🚨 TRADING HALTED: {reason}")
# Could add additional emergency actions here:
# - Send notifications
# - Close all positions
# - Cancel all orders
except Exception as e:
self.logger.error(f"❌ Error halting trading: {e}")
async def resume_trading(self):
"""Resume trading after manual review"""
try:
self.trading_halted = False
self.halt_reason = ""
self.logger.info("✅ Trading resumed")
except Exception as e:
self.logger.error(f"❌ Error resuming trading: {e}")
def get_stop_loss_price(self, entry_price: float, side: str) -> float:
"""
Calculate stop loss price
Args:
entry_price: Position entry price
side: Position side ('long' or 'short')
Returns:
Stop loss price
"""
try:
if side == 'long':
return entry_price * (1 - self.risk_limits.stop_loss_percent / 100)
else: # short
return entry_price * (1 + self.risk_limits.stop_loss_percent / 100)
except Exception as e:
self.logger.error(f"❌ Error calculating stop loss price: {e}")
return entry_price
def get_take_profit_price(self, entry_price: float, side: str) -> float:
"""
Calculate take profit price
Args:
entry_price: Position entry price
side: Position side ('long' or 'short')
Returns:
Take profit price
"""
try:
if side == 'long':
return entry_price * (1 + self.risk_limits.take_profit_percent / 100)
else: # short
return entry_price * (1 - self.risk_limits.take_profit_percent / 100)
except Exception as e:
self.logger.error(f"❌ Error calculating take profit price: {e}")
return entry_price
def calculate_risk_reward_ratio(self, entry_price: float, stop_loss: float, take_profit: float) -> float:
"""
Calculate risk-reward ratio
Args:
entry_price: Entry price
stop_loss: Stop loss price
take_profit: Take profit price
Returns:
Risk-reward ratio
"""
try:
risk = abs(entry_price - stop_loss)
reward = abs(take_profit - entry_price)
if risk > 0:
return reward / risk
else:
return 0.0
except Exception as e:
self.logger.error(f"❌ Error calculating risk-reward ratio: {e}")
return 0.0
def get_risk_metrics(self) -> Dict[str, Any]:
"""Get current risk metrics"""
return {
'daily_pnl': self.daily_pnl,
'daily_trades': self.daily_trades,
'daily_losses': self.daily_losses,
'max_drawdown': self.max_drawdown,
'total_exposure': self.total_exposure,
'trading_halted': self.trading_halted,
'halt_reason': self.halt_reason,
'risk_limits': {
'max_position_size': self.risk_limits.max_position_size,
'max_daily_loss': self.risk_limits.max_daily_loss,
'stop_loss_percent': self.risk_limits.stop_loss_percent,
'take_profit_percent': self.risk_limits.take_profit_percent
}
}
def is_trading_allowed(self) -> bool:
"""Check if trading is allowed"""
return not self.trading_halted and self.daily_pnl > -self.risk_limits.max_daily_loss
def get_position_limits(self) -> Dict[str, float]:
"""Get position size limits"""
return {
'max_position_size': self.risk_limits.max_position_size,
'current_exposure': self.total_exposure,
'remaining_capacity': max(0, self.risk_limits.max_portfolio_risk - self.total_exposure)
}

View file

@ -0,0 +1,455 @@
"""
WebSocket Manager for Binance Real-Time Data
Handles WebSocket connections, reconnection logic, and data streaming
"""
import asyncio
import json
import time
import websockets
from datetime import datetime
from typing import Dict, List, Optional, Callable, Any
from dataclasses import dataclass
from .config import Config
from .logger import TradingLogger
@dataclass
class WebSocketConfig:
"""WebSocket configuration"""
url: str
streams: List[str]
reconnect_delay: int = 5
ping_interval: int = 20
timeout: int = 30
class WebSocketManager:
"""
Manages WebSocket connections to Binance for real-time data
"""
def __init__(self, config: Config, logger: TradingLogger, message_handler: Callable[[Dict], None]):
self.config = config
self.logger = logger
self.message_handler = message_handler
# WebSocket connection
self.websocket: Optional[websockets.WebSocketServerProtocol] = None
self.connection_url: str = ""
self.streams: List[str] = []
# Connection state
self.running = False
self.connected = False
self.reconnect_count = 0
self.last_ping = time.time()
self.last_message = time.time()
# Statistics
self.messages_received = 0
self.connection_start = None
self.total_downtime = 0
# Setup streams
self._setup_streams()
# Tasks
self.connection_task: Optional[asyncio.Task] = None
self.ping_task: Optional[asyncio.Task] = None
self.heartbeat_task: Optional[asyncio.Task] = None
def _setup_streams(self):
"""Setup WebSocket streams based on configuration"""
symbol = self.config.SYMBOL.lower()
# Price streams
self.streams.extend([
f"{symbol}@ticker", # 24hr ticker statistics
f"{symbol}@trade", # Trade stream
f"{symbol}@kline_{self.config.TIMEFRAME}", # Kline/candlestick
f"{symbol}@depth", # Order book depth
])
# Additional streams based on config
if self.config.ENABLE_VOLUME_ANALYSIS:
self.streams.append(f"{symbol}@depth20@100ms") # Faster depth updates
# Build connection URL
base_url = "wss://stream.binance.com:9443"
if self.config.BINANCE_TESTNET:
base_url = "wss://testnet.binance.vision"
stream_names = "/".join(self.streams)
self.connection_url = f"{base_url}/stream?streams={stream_names}"
self.logger.info(f"📡 WebSocket streams configured: {len(self.streams)} streams")
self.logger.debug(f"🔗 Connection URL: {self.connection_url}")
async def start(self):
"""Start WebSocket connection"""
if self.running:
self.logger.warning("⚠️ WebSocket already running")
return
self.running = True
self.logger.info("🚀 Starting WebSocket connection...")
# Start connection task
self.connection_task = asyncio.create_task(self._connection_loop())
# Start heartbeat monitoring
self.heartbeat_task = asyncio.create_task(self._heartbeat_loop())
# Wait for initial connection
await self._wait_for_connection()
self.logger.info("✅ WebSocket connection established")
async def stop(self):
"""Stop WebSocket connection"""
if not self.running:
return
self.logger.info("🛑 Stopping WebSocket connection...")
self.running = False
# Cancel tasks
if self.connection_task:
self.connection_task.cancel()
if self.ping_task:
self.ping_task.cancel()
if self.heartbeat_task:
self.heartbeat_task.cancel()
# Close WebSocket
if self.websocket:
await self.websocket.close()
self.websocket = None
self.connected = False
self.logger.info("✅ WebSocket connection stopped")
async def _connection_loop(self):
"""Main connection loop with reconnection logic"""
while self.running:
try:
await self._connect()
await self._message_loop()
except websockets.exceptions.ConnectionClosed:
self.logger.warning("🔌 WebSocket connection closed")
self.connected = False
await self._handle_reconnection()
except Exception as e:
self.logger.error(f"❌ WebSocket error: {e}")
self.connected = False
await self._handle_reconnection()
async def _connect(self):
"""Establish WebSocket connection"""
try:
self.logger.info("🔌 Connecting to Binance WebSocket...")
# Create connection
self.websocket = await websockets.connect(
self.connection_url,
ping_interval=self.config.WS_PING_INTERVAL,
ping_timeout=self.config.WS_TIMEOUT,
close_timeout=10
)
self.connected = True
self.connection_start = time.time()
self.last_message = time.time()
self.logger.websocket("CONNECTED", {"url": self.connection_url})
# Start ping task
self.ping_task = asyncio.create_task(self._ping_loop())
except Exception as e:
self.logger.error(f"❌ Connection failed: {e}")
raise
async def _message_loop(self):
"""Main message processing loop"""
while self.running and self.connected:
try:
# Receive message with timeout
message = await asyncio.wait_for(
self.websocket.recv(),
timeout=self.config.WS_TIMEOUT
)
# Process message
await self._process_message(message)
# Update statistics
self.messages_received += 1
self.last_message = time.time()
except asyncio.TimeoutError:
self.logger.warning("⏰ WebSocket message timeout")
break
except websockets.exceptions.ConnectionClosed:
self.logger.warning("🔌 WebSocket connection closed during message loop")
break
except Exception as e:
self.logger.error(f"❌ Error in message loop: {e}")
break
async def _process_message(self, raw_message: str):
"""Process incoming WebSocket message"""
try:
# Parse JSON
data = json.loads(raw_message)
# Handle different message types
if 'stream' in data and 'data' in data:
stream_name = data['stream']
stream_data = data['data']
# Process based on stream type
if '@ticker' in stream_name:
await self._handle_ticker_data(stream_data)
elif '@trade' in stream_name:
await self._handle_trade_data(stream_data)
elif '@kline' in stream_name:
await self._handle_kline_data(stream_data)
elif '@depth' in stream_name:
await self._handle_depth_data(stream_data)
else:
self.logger.debug(f"📦 Unhandled stream: {stream_name}")
else:
self.logger.debug(f"📦 Unknown message format: {data}")
except json.JSONDecodeError as e:
self.logger.error(f"❌ JSON decode error: {e}")
self.logger.debug(f"Raw message: {raw_message}")
except Exception as e:
self.logger.error(f"❌ Error processing message: {e}")
async def _handle_ticker_data(self, data: Dict[str, Any]):
"""Handle 24hr ticker data"""
try:
processed_data = {
'type': 'ticker',
'symbol': data['s'],
'price': float(data['c']),
'volume': float(data['v']),
'price_change': float(data['P']),
'high': float(data['h']),
'low': float(data['l']),
'timestamp': int(data['E']) / 1000, # Convert to seconds
}
# Send to message handler
await self.message_handler(processed_data)
self.logger.debug(f"📊 Ticker: {processed_data['symbol']} @ {processed_data['price']}")
except Exception as e:
self.logger.error(f"❌ Error handling ticker data: {e}")
async def _handle_trade_data(self, data: Dict[str, Any]):
"""Handle individual trade data"""
try:
processed_data = {
'type': 'trade',
'symbol': data['s'],
'price': float(data['p']),
'quantity': float(data['q']),
'trade_time': int(data['T']) / 1000,
'is_buyer_maker': data['m'],
'timestamp': int(data['E']) / 1000,
}
# Send to message handler
await self.message_handler(processed_data)
self.logger.debug(f"💰 Trade: {processed_data['quantity']} {processed_data['symbol']} @ {processed_data['price']}")
except Exception as e:
self.logger.error(f"❌ Error handling trade data: {e}")
async def _handle_kline_data(self, data: Dict[str, Any]):
"""Handle candlestick/kline data"""
try:
kline = data['k']
# Only process closed candles
if not kline['x']: # x = is_closed
return
processed_data = {
'type': 'kline',
'symbol': kline['s'],
'open': float(kline['o']),
'high': float(kline['h']),
'low': float(kline['l']),
'close': float(kline['c']),
'volume': float(kline['v']),
'trades': int(kline['n']),
'interval': kline['i'],
'open_time': int(kline['t']) / 1000,
'close_time': int(kline['T']) / 1000,
'timestamp': int(data['E']) / 1000,
}
# Send to message handler
await self.message_handler(processed_data)
self.logger.debug(f"📈 Kline: {processed_data['symbol']} OHLC: {processed_data['open']:.2f}/{processed_data['high']:.2f}/{processed_data['low']:.2f}/{processed_data['close']:.2f}")
except Exception as e:
self.logger.error(f"❌ Error handling kline data: {e}")
async def _handle_depth_data(self, data: Dict[str, Any]):
"""Handle order book depth data"""
try:
processed_data = {
'type': 'depth',
'symbol': data['s'],
'bids': [[float(bid[0]), float(bid[1])] for bid in data['b']],
'asks': [[float(ask[0]), float(ask[1])] for ask in data['a']],
'timestamp': int(data['E']) / 1000,
}
# Send to message handler
await self.message_handler(processed_data)
if processed_data['bids'] and processed_data['asks']:
best_bid = processed_data['bids'][0][0]
best_ask = processed_data['asks'][0][0]
spread = best_ask - best_bid
self.logger.debug(f"📊 Depth: {processed_data['symbol']} Bid: {best_bid}, Ask: {best_ask}, Spread: {spread:.4f}")
except Exception as e:
self.logger.error(f"❌ Error handling depth data: {e}")
async def _ping_loop(self):
"""Send periodic ping to keep connection alive"""
while self.running and self.connected:
try:
await asyncio.sleep(self.config.WS_PING_INTERVAL)
if self.websocket:
await self.websocket.ping()
self.last_ping = time.time()
self.logger.debug("🏓 Ping sent")
except Exception as e:
self.logger.warning(f"⚠️ Ping failed: {e}")
break
async def _heartbeat_loop(self):
"""Monitor connection health"""
while self.running:
try:
current_time = time.time()
# Check for stale connection
if self.connected and current_time - self.last_message > 60: # 1 minute
self.logger.warning("💔 Connection appears stale - no messages received")
await self.reconnect()
# Check ping timeout
if self.connected and current_time - self.last_ping > self.config.WS_PING_INTERVAL * 2:
self.logger.warning("💔 Ping timeout - connection may be dead")
await self.reconnect()
await asyncio.sleep(30) # Check every 30 seconds
except Exception as e:
self.logger.error(f"❌ Error in heartbeat loop: {e}")
await asyncio.sleep(10)
async def _handle_reconnection(self):
"""Handle reconnection with exponential backoff"""
if not self.running:
return
self.reconnect_count += 1
delay = min(self.config.WS_RECONNECT_DELAY * (2 ** (self.reconnect_count - 1)), 60)
self.logger.info(f"🔄 Reconnecting in {delay}s (attempt #{self.reconnect_count})")
await asyncio.sleep(delay)
if self.running:
self.logger.info("🔄 Attempting to reconnect...")
async def reconnect(self):
"""Force reconnection"""
self.logger.info("🔄 Forcing WebSocket reconnection...")
# Close current connection
if self.websocket:
await self.websocket.close()
self.websocket = None
self.connected = False
# Cancel ping task
if self.ping_task:
self.ping_task.cancel()
self.ping_task = None
async def _wait_for_connection(self, timeout: int = 30):
"""Wait for WebSocket connection to be established"""
start_time = time.time()
while time.time() - start_time < timeout:
if self.connected:
return
await asyncio.sleep(0.1)
raise TimeoutError("WebSocket connection timeout")
def get_statistics(self) -> Dict[str, Any]:
"""Get connection statistics"""
uptime = time.time() - self.connection_start if self.connection_start else 0
return {
'connected': self.connected,
'running': self.running,
'messages_received': self.messages_received,
'reconnect_count': self.reconnect_count,
'uptime': uptime,
'streams': len(self.streams),
'last_message_age': time.time() - self.last_message if self.last_message else 0
}
def is_healthy(self) -> bool:
"""Check if connection is healthy"""
if not self.connected:
return False
# Check if we're receiving messages
if time.time() - self.last_message > 60: # No messages for 1 minute
return False
return True

View file

@ -0,0 +1,224 @@
version: '3.8'
services:
trading-bot:
build:
context: .
dockerfile: Dockerfile
target: production
container_name: binance-trading-bot
restart: unless-stopped
environment:
# Binance API Configuration
BINANCE_API_KEY: ${BINANCE_API_KEY:-}
BINANCE_SECRET_KEY: ${BINANCE_SECRET_KEY:-}
BINANCE_TESTNET: ${BINANCE_TESTNET:-true}
# Trading Configuration
TRADING_SYMBOL: ${TRADING_SYMBOL:-BTCUSDT}
POSITION_SIZE: ${POSITION_SIZE:-0.001}
LEVERAGE: ${LEVERAGE:-1}
MAX_POSITIONS: ${MAX_POSITIONS:-1}
# Risk Management
STOP_LOSS_PERCENT: ${STOP_LOSS_PERCENT:-2.0}
TAKE_PROFIT_PERCENT: ${TAKE_PROFIT_PERCENT:-3.0}
MAX_DAILY_LOSS: ${MAX_DAILY_LOSS:-5.0}
MAX_PORTFOLIO_RISK: ${MAX_PORTFOLIO_RISK:-10.0}
# Technical Indicators
BOLLINGER_PERIOD: ${BOLLINGER_PERIOD:-20}
BOLLINGER_STD: ${BOLLINGER_STD:-2.0}
MA_PERIOD: ${MA_PERIOD:-50}
MA_TYPE: ${MA_TYPE:-SMA}
# Trading Strategy
STRATEGY: ${STRATEGY:-BOLLINGER_MA}
TIMEFRAME: ${TIMEFRAME:-1m}
LOOKBACK_PERIODS: ${LOOKBACK_PERIODS:-100}
# WebSocket Configuration
WS_RECONNECT_DELAY: ${WS_RECONNECT_DELAY:-5}
WS_PING_INTERVAL: ${WS_PING_INTERVAL:-20}
WS_TIMEOUT: ${WS_TIMEOUT:-30}
# Logging Configuration
LOG_LEVEL: ${LOG_LEVEL:-INFO}
LOG_FILE: ${LOG_FILE:-logs/trading_bot.log}
LOG_MAX_SIZE: ${LOG_MAX_SIZE:-10485760}
LOG_BACKUP_COUNT: ${LOG_BACKUP_COUNT:-5}
# Database Configuration
DATABASE_URL: ${DATABASE_URL:-sqlite:///data/trading_bot.db}
TRACK_PERFORMANCE: ${TRACK_PERFORMANCE:-true}
# Notification Configuration
TELEGRAM_BOT_TOKEN: ${TELEGRAM_BOT_TOKEN:-}
TELEGRAM_CHAT_ID: ${TELEGRAM_CHAT_ID:-}
ENABLE_NOTIFICATIONS: ${ENABLE_NOTIFICATIONS:-false}
# Safety Features
PAPER_TRADING: ${PAPER_TRADING:-true}
ENABLE_TRADING: ${ENABLE_TRADING:-false}
TRADING_HOURS_START: ${TRADING_HOURS_START:-00:00}
TRADING_HOURS_END: ${TRADING_HOURS_END:-23:59}
# Performance Monitoring
PERFORMANCE_UPDATE_INTERVAL: ${PERFORMANCE_UPDATE_INTERVAL:-60}
SAVE_TRADES_TO_FILE: ${SAVE_TRADES_TO_FILE:-true}
TRADES_FILE_PATH: ${TRADES_FILE_PATH:-data/trades.json}
# Advanced Features
ENABLE_VOLUME_ANALYSIS: ${ENABLE_VOLUME_ANALYSIS:-false}
VOLUME_THRESHOLD: ${VOLUME_THRESHOLD:-1.5}
ENABLE_SENTIMENT_ANALYSIS: ${ENABLE_SENTIMENT_ANALYSIS:-false}
# Timezone
TZ: ${TZ:-UTC}
volumes:
# Persistent data storage
- ./data:/app/data
- ./logs:/app/logs
# Configuration files (optional)
- ./.env:/app/.env:ro
# Backup location (optional)
- ./backups:/app/backups
ports:
# Monitoring port (optional)
- "${MONITORING_PORT:-8000}:8000"
networks:
- trading-network
# Resource limits
deploy:
resources:
limits:
cpus: '1.0'
memory: 512M
reservations:
cpus: '0.5'
memory: 256M
# Health check
healthcheck:
test: ["CMD", "python", "-c", "import os; exit(0 if os.path.exists('/app/logs/trading_bot.log') else 1)"]
interval: 30s
timeout: 10s
retries: 3
start_period: 30s
# Logging configuration
logging:
driver: "json-file"
options:
max-size: "10m"
max-file: "3"
# Optional: Database service for advanced analytics
database:
image: postgres:15-alpine
container_name: trading-bot-db
restart: unless-stopped
environment:
POSTGRES_DB: ${POSTGRES_DB:-trading_bot}
POSTGRES_USER: ${POSTGRES_USER:-trading_user}
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-trading_password}
POSTGRES_HOST_AUTH_METHOD: ${POSTGRES_HOST_AUTH_METHOD:-scram-sha-256}
volumes:
- postgres_data:/var/lib/postgresql/data
- ./sql:/docker-entrypoint-initdb.d:ro
ports:
- "${POSTGRES_PORT:-5432}:5432"
networks:
- trading-network
# Resource limits
deploy:
resources:
limits:
cpus: '0.5'
memory: 256M
reservations:
cpus: '0.25'
memory: 128M
# Health check
healthcheck:
test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER:-trading_user} -d ${POSTGRES_DB:-trading_bot}"]
interval: 30s
timeout: 5s
retries: 3
start_period: 30s
# Optional: Monitoring service
monitoring:
image: prom/prometheus:latest
container_name: trading-bot-monitoring
restart: unless-stopped
ports:
- "${PROMETHEUS_PORT:-9090}:9090"
volumes:
- ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml:ro
- prometheus_data:/prometheus
networks:
- trading-network
depends_on:
- trading-bot
profiles:
- monitoring
# Optional: Grafana for visualization
grafana:
image: grafana/grafana:latest
container_name: trading-bot-grafana
restart: unless-stopped
environment:
- GF_SECURITY_ADMIN_PASSWORD=${GRAFANA_PASSWORD:-admin}
ports:
- "${GRAFANA_PORT:-3000}:3000"
volumes:
- grafana_data:/var/lib/grafana
- ./monitoring/grafana/dashboards:/etc/grafana/provisioning/dashboards:ro
- ./monitoring/grafana/datasources:/etc/grafana/provisioning/datasources:ro
networks:
- trading-network
depends_on:
- monitoring
profiles:
- monitoring
networks:
trading-network:
driver: bridge
name: trading-network
volumes:
postgres_data:
driver: local
prometheus_data:
driver: local
grafana_data:
driver: local

183
trading_bot/env_example Normal file
View file

@ -0,0 +1,183 @@
# Binance Trading Bot Configuration
# Copy this file to .env and update with your actual values
# ================================
# BINANCE API CONFIGURATION
# ================================
# Get your API keys from: https://www.binance.com/en/my/settings/api-management
BINANCE_API_KEY=your_binance_api_key_here
BINANCE_SECRET_KEY=your_binance_secret_key_here
# Use testnet for safe testing (highly recommended for beginners)
BINANCE_TESTNET=true
# ================================
# TRADING CONFIGURATION
# ================================
# Trading symbol (e.g., BTCUSDT, ETHUSDT, ADAUSDT)
TRADING_SYMBOL=BTCUSDT
# Position size in base currency (e.g., 0.001 BTC for BTCUSDT)
POSITION_SIZE=0.001
# Leverage (1 = spot trading, >1 = futures trading)
LEVERAGE=1
# Maximum concurrent positions
MAX_POSITIONS=1
# ================================
# RISK MANAGEMENT
# ================================
# Stop loss percentage (e.g., 2.0 = 2% loss)
STOP_LOSS_PERCENT=2.0
# Take profit percentage (e.g., 3.0 = 3% profit)
TAKE_PROFIT_PERCENT=3.0
# Maximum daily loss percentage
MAX_DAILY_LOSS=5.0
# Maximum portfolio risk percentage
MAX_PORTFOLIO_RISK=10.0
# ================================
# TECHNICAL INDICATORS
# ================================
# Bollinger Bands period (typically 20)
BOLLINGER_PERIOD=20
# Bollinger Bands standard deviation (typically 2.0)
BOLLINGER_STD=2.0
# Moving Average period (typically 50)
MA_PERIOD=50
# Moving Average type (SMA or EMA)
MA_TYPE=SMA
# ================================
# TRADING STRATEGY
# ================================
# Strategy type: BOLLINGER_MA, BOLLINGER_ONLY, MA_CROSSOVER, MEAN_REVERSION
STRATEGY=BOLLINGER_MA
# Timeframe for analysis (1m, 3m, 5m, 15m, 30m, 1h, 4h, 1d)
TIMEFRAME=1m
# Number of historical periods to analyze
LOOKBACK_PERIODS=100
# ================================
# WEBSOCKET CONFIGURATION
# ================================
# WebSocket reconnection delay in seconds
WS_RECONNECT_DELAY=5
# WebSocket ping interval in seconds
WS_PING_INTERVAL=20
# WebSocket timeout in seconds
WS_TIMEOUT=30
# ================================
# LOGGING CONFIGURATION
# ================================
# Log level: DEBUG, INFO, WARNING, ERROR, CRITICAL
LOG_LEVEL=INFO
# Log file path
LOG_FILE=logs/trading_bot.log
# Maximum log file size in bytes (10MB)
LOG_MAX_SIZE=10485760
# Number of backup log files to keep
LOG_BACKUP_COUNT=5
# ================================
# DATABASE CONFIGURATION
# ================================
# Database URL for performance tracking
DATABASE_URL=sqlite:///data/trading_bot.db
# Enable performance tracking
TRACK_PERFORMANCE=true
# ================================
# NOTIFICATION CONFIGURATION
# ================================
# Telegram bot token for notifications (optional)
TELEGRAM_BOT_TOKEN=
# Telegram chat ID for notifications (optional)
TELEGRAM_CHAT_ID=
# Enable notifications
ENABLE_NOTIFICATIONS=false
# ================================
# SAFETY FEATURES
# ================================
# Paper trading mode (highly recommended for testing)
PAPER_TRADING=true
# Enable actual trading (set to false for safety)
ENABLE_TRADING=false
# Trading hours (24-hour format, e.g., 09:00-17:00)
TRADING_HOURS_START=00:00
TRADING_HOURS_END=23:59
# ================================
# PERFORMANCE MONITORING
# ================================
# Performance update interval in seconds
PERFORMANCE_UPDATE_INTERVAL=60
# Save trades to file
SAVE_TRADES_TO_FILE=true
# Trades file path
TRADES_FILE_PATH=data/trades.json
# ================================
# ADVANCED FEATURES
# ================================
# Enable volume analysis
ENABLE_VOLUME_ANALYSIS=false
# Volume threshold for analysis
VOLUME_THRESHOLD=1.5
# Enable sentiment analysis (experimental)
ENABLE_SENTIMENT_ANALYSIS=false
# ================================
# DOCKER CONFIGURATION
# ================================
# Monitoring port for health checks
MONITORING_PORT=8000
# Timezone for the container
TZ=UTC
# ================================
# DATABASE CONFIGURATION (Optional)
# ================================
# PostgreSQL configuration for advanced analytics
POSTGRES_DB=trading_bot
POSTGRES_USER=trading_user
POSTGRES_PASSWORD=secure_password_here
POSTGRES_PORT=5432
POSTGRES_HOST_AUTH_METHOD=scram-sha-256
# ================================
# MONITORING CONFIGURATION (Optional)
# ================================
# Prometheus port
PROMETHEUS_PORT=9090
# Grafana port and password
GRAFANA_PORT=3000
GRAFANA_PASSWORD=admin_password_here

145
trading_bot/main.py Normal file
View file

@ -0,0 +1,145 @@
#!/usr/bin/env python3
"""
Binance Trading Bot - Main Entry Point
Advanced crypto trading bot with WebSocket connection, technical indicators,
and automated trading capabilities.
Features:
- Real-time Binance WebSocket connection
- Bollinger Bands & Moving Average indicators
- Automated buy/sell execution
- Risk management with stop-loss/take-profit
- Performance tracking system
- Docker deployment ready
- Environment variable configuration
"""
import asyncio
import logging
import signal
import sys
from datetime import datetime
from typing import Dict, Any
from bot_core.bot import TradingBot
from bot_core.config import Config
from bot_core.logger import setup_logger
class BotManager:
"""Main bot manager that handles lifecycle and signals"""
def __init__(self):
self.bot: TradingBot = None
self.config: Config = None
self.logger = None
self.running = False
async def initialize(self):
"""Initialize bot configuration and components"""
try:
# Load configuration
self.config = Config()
# Setup logging
self.logger = setup_logger(
name="TradingBot",
level=self.config.LOG_LEVEL,
log_file=self.config.LOG_FILE
)
self.logger.info("=" * 60)
self.logger.info("🚀 BINANCE TRADING BOT STARTING")
self.logger.info("=" * 60)
# Log configuration
self.logger.info(f"Trading Symbol: {self.config.SYMBOL}")
self.logger.info(f"Position Size: {self.config.POSITION_SIZE}")
self.logger.info(f"Risk Management: SL={self.config.STOP_LOSS_PERCENT}%, TP={self.config.TAKE_PROFIT_PERCENT}%")
self.logger.info(f"Bollinger Bands: Period={self.config.BOLLINGER_PERIOD}, STD={self.config.BOLLINGER_STD}")
self.logger.info(f"Moving Average: Period={self.config.MA_PERIOD}")
# Initialize trading bot
self.bot = TradingBot(self.config, self.logger)
await self.bot.initialize()
self.logger.info("✅ Bot initialization completed successfully")
except Exception as e:
if self.logger:
self.logger.error(f"❌ Failed to initialize bot: {e}")
else:
print(f"❌ Failed to initialize bot: {e}")
raise
async def start(self):
"""Start the trading bot"""
try:
self.running = True
self.logger.info("🎯 Starting trading bot...")
# Start bot
await self.bot.start()
# Keep running until stopped
while self.running:
await asyncio.sleep(1)
except Exception as e:
self.logger.error(f"❌ Bot encountered error: {e}")
await self.stop()
raise
async def stop(self):
"""Stop the trading bot gracefully"""
if not self.running:
return
self.logger.info("🛑 Stopping trading bot...")
self.running = False
if self.bot:
await self.bot.stop()
self.logger.info("✅ Bot stopped successfully")
def setup_signal_handlers(self):
"""Setup signal handlers for graceful shutdown"""
def signal_handler(signum, frame):
self.logger.info(f"📡 Received signal {signum}, initiating graceful shutdown...")
asyncio.create_task(self.stop())
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
async def main():
"""Main function"""
bot_manager = BotManager()
try:
# Setup signal handlers
bot_manager.setup_signal_handlers()
# Initialize and start bot
await bot_manager.initialize()
await bot_manager.start()
except KeyboardInterrupt:
print("\n🔴 KeyboardInterrupt received")
await bot_manager.stop()
except Exception as e:
print(f"❌ Critical error: {e}")
await bot_manager.stop()
sys.exit(1)
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\n👋 Bot shutdown complete")
except Exception as e:
print(f"❌ Fatal error: {e}")
sys.exit(1)

View file

@ -0,0 +1,70 @@
# Trading Bot Requirements
# Core dependencies for automated crypto trading bot
# Exchange connectivity
ccxt>=4.0.0
websockets>=11.0.0
# Data processing and analysis
pandas>=2.0.0
numpy>=1.24.0
pandas-ta>=0.3.14b
# Async support
aiohttp>=3.8.0
asyncio-throttle>=1.0.0
# Configuration management
python-dotenv>=1.0.0
# Database support
sqlalchemy>=2.0.0
aiosqlite>=0.19.0
# Logging and monitoring
colorlog>=6.7.0
# Mathematical computations
scipy>=1.10.0
scikit-learn>=1.3.0
# Time and date handling
pytz>=2023.3
# Data validation
pydantic>=2.0.0
# HTTP requests
requests>=2.31.0
# JSON handling
ujson>=5.8.0
# Testing (optional)
pytest>=7.4.0
pytest-asyncio>=0.21.0
# Performance monitoring
psutil>=5.9.0
# Notifications (optional)
python-telegram-bot>=20.0.0
# Cryptographic functions
cryptography>=41.0.0
# Environment and system
python-environ>=0.4.0
# Development tools (optional)
black>=23.0.0
flake8>=6.0.0
mypy>=1.5.0
# Jupyter notebook support (optional)
jupyter>=1.0.0
ipython>=8.0.0
# Additional utilities
tqdm>=4.65.0
rich>=13.0.0