1
Fork 0
crypto_bot_training/Session_05/bots/Stochastic_RSI_Nadarya.py
2025-06-26 15:46:49 +02:00

72 lines
No EOL
2.3 KiB
Python

'''
Stochastic RSI + Nadarya Strategy Overview:
Entry Signals:
- LONG: Nadarya buy signal OR Stochastic RSI shows oversold (below 10)
- SHORT: Nadarya sell signal OR Stochastic RSI shows overbought (above 90)
Exit Signals:
- Close LONG: Nadarya sell signal OR Stochastic RSI overbought 2+ times
- Close SHORT: Nadarya buy signal OR Stochastic RSI oversold 2+ times
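Either indicator alone is enough to trigger an entry or exit (the conditions
are OR-ed, not a dual confirmation). The strategy combines a trend-following
signal (Nadarya) with a momentum oscillator (Stochastic RSI) for improved
entry/exit timing.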
WARNING: Do not run without thorough backtesting and understanding!
'''
import ccxt, schedule
from Functions.functions import *
import time
import os
from dotenv import load_dotenv
load_dotenv()
# --- Strategy settings -------------------------------------------------------
symbol = 'ETHUSDT'          # market traded by the bot
timeframe = '1h'            # candle interval passed to get_candle_df
# Order amount passed to create_limit_*_order.
# NOTE(review): presumably in base-asset units (ETH) -- confirm the intended size.
size = 1000
rsi_window = 14             # window handed to is_oversold / is_overbought
rsi_targets = [10, 90]      # [oversold threshold, overbought threshold]
params = {'timeInForce': 'GTC'}  # extra order params sent with every limit order

# --- Exchange connection -----------------------------------------------------
# Credentials come from the environment (populated by load_dotenv above);
# either value is None when the variable is not set.
api_key = os.environ.get('BINANCE_API_KEY')
api_secret = os.environ.get('BINANCE_SECRET_KEY')

exchange = ccxt.binance(
    {
        'apiKey': api_key,
        'secret': api_secret,
        'enableRateLimit': True,
        'options': {
            'defaultType': 'future',
        },
    }
)
def bot():
    """Run one pass of the Stochastic RSI + Nadarya strategy.

    Reads the current position for ``symbol``, recomputes both indicators on
    freshly fetched candles, then either closes the open position or places
    a new entry order.

    Exit (only when a position is open):
      - long: closed on a Nadarya sell signal or an overbought reading
        checked with ``times=2`` against ``rsi_targets[1]``;
      - short: closed on a Nadarya buy signal or an oversold reading
        checked with ``times=2`` against ``rsi_targets[0]``.

    Entry (only when flat):
      - limit BUY on a Nadarya buy signal or an oversold reading
        (``times`` of 1, threshold ``rsi_targets[0]``);
      - otherwise limit SELL on a Nadarya sell signal or an overbought
        reading (``times`` of 1, threshold ``rsi_targets[1]``).
      Both order sides are priced at the current bid.

    Returns:
        None. Exceptions raised by the exchange or the helper functions are
        not caught here and propagate to the caller.
    """
    # The first element returned by get_position is not used by the strategy.
    _, in_position, is_long = get_position(exchange, symbol)

    candles = get_candle_df(exchange, symbol, timeframe)
    nadarya_buy_signal, nadarya_sell_signal = calc_nadarya(candles)
    # Called for its effect on `candles`: the 'stoch_rsi' column is read
    # below, so this call is expected to add it in place.
    calc_stoch_rsi(candles)

    if in_position:
        # Exit handling needs no price, so no ticker request is made here and
        # a ticker failure can no longer block closing a position.
        if is_long:
            should_close = nadarya_sell_signal or is_overbought(
                candles['stoch_rsi'], rsi_window, times=2, target=rsi_targets[1]
            )
        else:
            should_close = nadarya_buy_signal or is_oversold(
                candles['stoch_rsi'], rsi_window, times=2, target=rsi_targets[0]
            )
        if should_close:
            close_position(exchange, symbol)
        return

    # Flat: decide the order side first; the buy condition takes precedence.
    if nadarya_buy_signal or is_oversold(
        candles['stoch_rsi'], rsi_window, 1, rsi_targets[0]
    ):
        place_order = exchange.create_limit_buy_order
    elif nadarya_sell_signal or is_overbought(
        candles['stoch_rsi'], rsi_window, 1, rsi_targets[1]
    ):
        place_order = exchange.create_limit_sell_order
    else:
        return

    # Fetch the price only when an order is actually going to be placed, so
    # idle passes do not spend a rate-limited request.
    # NOTE(review): sells are also priced at the bid -- confirm this is
    # intended rather than the ask.
    # NOTE(review): open (unfilled) orders are not checked; while an entry
    # order is resting, in_position presumably stays False and the next pass
    # may place another order -- verify against get_position/close_position.
    bid = exchange.fetch_ticker(symbol)['bid']
    place_order(symbol, size, bid, params)
# Register the strategy pass to run once every second.
schedule.every(1).seconds.do(bot)

# Main loop: keep running due jobs forever. Any exception raised by a job is
# reported and followed by a 30-second back-off before polling resumes.
while True:
    try:
        schedule.run_pending()
    except Exception as e:
        print(f'ERROR RUNNING BOT: {e}. Sleeping 30 seconds...')
        time.sleep(30)
    else:
        # Pause between polls; without this the loop spins continuously and
        # pins a CPU core even though jobs are due at most once per second.
        time.sleep(1)