Feed monitor Telegram alerts, Market Watch portal, data source connectors
This commit is contained in:
771
projects/crypto-signals/scripts/leverage_backtester.py
Normal file
771
projects/crypto-signals/scripts/leverage_backtester.py
Normal file
@ -0,0 +1,771 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Leverage Strategy Backtester
|
||||
Replays the exact same signal logic (RSI, VWAP, MACD, Bollinger, volume)
|
||||
against historical candle data to validate the live leverage trader.
|
||||
|
||||
Usage:
|
||||
python3 leverage_backtester.py # Default: 6 months, all coins
|
||||
python3 leverage_backtester.py --months 12 # 12 months
|
||||
python3 leverage_backtester.py --coins BTC,ETH # Specific coins
|
||||
python3 leverage_backtester.py --optimize # Grid search params
|
||||
"""
|
||||
|
||||
import json
|
||||
import math
|
||||
import time
|
||||
import argparse
|
||||
import urllib.request
|
||||
from datetime import datetime, timezone, timedelta
|
||||
from pathlib import Path
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
BINANCE_KLINES = "https://api.binance.us/api/v3/klines"
|
||||
|
||||
COINS = [
|
||||
"BTCUSDT", "ETHUSDT", "SOLUSDT", "XRPUSDT", "DOGEUSDT",
|
||||
"ADAUSDT", "AVAXUSDT", "LINKUSDT", "DOTUSDT", "MATICUSDT",
|
||||
"NEARUSDT", "ATOMUSDT", "LTCUSDT", "UNIUSDT", "AAVEUSDT",
|
||||
"FILUSDT", "ALGOUSDT", "XLMUSDT", "VETUSDT", "ICPUSDT",
|
||||
]
|
||||
|
||||
RESULTS_DIR = Path(__file__).parent.parent / "data" / "backtest-results"
|
||||
CACHE_DIR = Path(__file__).parent.parent / "data" / "kline-cache"
|
||||
|
||||
|
||||
# ── Indicator Calculations (identical to short_scanner.py) ──
|
||||
|
||||
def calc_rsi(closes, period=14):
|
||||
if len(closes) < period + 1:
|
||||
return 50
|
||||
deltas = [closes[i] - closes[i-1] for i in range(1, len(closes))]
|
||||
gains = [d if d > 0 else 0 for d in deltas]
|
||||
losses = [-d if d < 0 else 0 for d in deltas]
|
||||
avg_gain = sum(gains[:period]) / period
|
||||
avg_loss = sum(losses[:period]) / period
|
||||
for i in range(period, len(deltas)):
|
||||
avg_gain = (avg_gain * (period - 1) + gains[i]) / period
|
||||
avg_loss = (avg_loss * (period - 1) + losses[i]) / period
|
||||
if avg_loss == 0:
|
||||
return 100
|
||||
return round(100 - (100 / (1 + avg_gain / avg_loss)), 1)
|
||||
|
||||
|
||||
def calc_vwap(klines_window):
|
||||
cum_vol = cum_tp_vol = 0
|
||||
for k in klines_window:
|
||||
tp = (k['high'] + k['low'] + k['close']) / 3
|
||||
cum_vol += k['volume']
|
||||
cum_tp_vol += tp * k['volume']
|
||||
return cum_tp_vol / cum_vol if cum_vol else 0
|
||||
|
||||
|
||||
def calc_ema(values, period):
|
||||
if not values:
|
||||
return 0
|
||||
mult = 2 / (period + 1)
|
||||
ema = values[0]
|
||||
for v in values[1:]:
|
||||
ema = (v - ema) * mult + ema
|
||||
return ema
|
||||
|
||||
|
||||
def calc_macd(closes):
|
||||
if len(closes) < 26:
|
||||
return 0, 0, 0
|
||||
ema12 = calc_ema(closes, 12)
|
||||
ema26 = calc_ema(closes, 26)
|
||||
macd_line = ema12 - ema26
|
||||
signal = calc_ema(closes[-9:], 9) if len(closes) >= 9 else macd_line
|
||||
return macd_line, signal, macd_line - signal
|
||||
|
||||
|
||||
def calc_bollinger_position(closes, period=20):
|
||||
if len(closes) < period:
|
||||
return 0.5
|
||||
recent = closes[-period:]
|
||||
sma = sum(recent) / period
|
||||
std = (sum((x - sma)**2 for x in recent) / period) ** 0.5
|
||||
if std == 0:
|
||||
return 0.5
|
||||
upper = sma + 2 * std
|
||||
lower = sma - 2 * std
|
||||
bw = upper - lower
|
||||
return (closes[-1] - lower) / bw if bw else 0.5
|
||||
|
||||
|
||||
def calc_atr(klines, period=14):
|
||||
"""Average True Range — measures volatility."""
|
||||
if len(klines) < period + 1:
|
||||
return 0
|
||||
trs = []
|
||||
for i in range(1, len(klines)):
|
||||
h, l, pc = klines[i]['high'], klines[i]['low'], klines[i-1]['close']
|
||||
tr = max(h - l, abs(h - pc), abs(l - pc))
|
||||
trs.append(tr)
|
||||
if len(trs) < period:
|
||||
return sum(trs) / len(trs) if trs else 0
|
||||
atr = sum(trs[:period]) / period
|
||||
for i in range(period, len(trs)):
|
||||
atr = (atr * (period - 1) + trs[i]) / period
|
||||
return atr
|
||||
|
||||
|
||||
def calc_volatility_regime(klines, period=14):
|
||||
"""Returns ATR as % of price. Higher = more volatile = better for trading."""
|
||||
atr = calc_atr(klines, period)
|
||||
price = klines[-1]['close'] if klines else 1
|
||||
return (atr / price * 100) if price else 0
|
||||
|
||||
|
||||
def volume_trend(klines, lookback=10):
|
||||
if len(klines) < lookback * 2:
|
||||
return 1.0
|
||||
recent = sum(k['volume'] for k in klines[-lookback:]) / lookback
|
||||
older = sum(k['volume'] for k in klines[-lookback*2:-lookback]) / lookback
|
||||
return recent / older if older else 1.0
|
||||
|
||||
|
||||
# ── Signal Scoring (identical to leverage_trader.py) ──
|
||||
|
||||
def score_short(closes, klines):
|
||||
price = closes[-1]
|
||||
rsi = calc_rsi(closes)
|
||||
vwap = calc_vwap(klines[-24:])
|
||||
vwap_pct = ((price - vwap) / vwap * 100) if vwap else 0
|
||||
_, _, histogram = calc_macd(closes)
|
||||
bb_pos = calc_bollinger_position(closes)
|
||||
vol = volume_trend(klines)
|
||||
p24 = closes[-24] if len(closes) >= 24 else closes[0]
|
||||
chg24 = ((price - p24) / p24 * 100) if p24 else 0
|
||||
|
||||
score = 0
|
||||
if rsi >= 80: score += 30
|
||||
elif rsi >= 70: score += 25
|
||||
elif rsi >= 65: score += 15
|
||||
elif rsi >= 60: score += 5
|
||||
|
||||
if vwap_pct > 5: score += 20
|
||||
elif vwap_pct > 3: score += 15
|
||||
elif vwap_pct > 1: score += 8
|
||||
|
||||
if histogram < -0.001 * price: score += 15
|
||||
elif histogram < 0: score += 10
|
||||
|
||||
if bb_pos > 1.0: score += 15
|
||||
elif bb_pos > 0.85: score += 10
|
||||
|
||||
if chg24 > 15: score += 15
|
||||
elif chg24 > 8: score += 10
|
||||
elif chg24 > 4: score += 5
|
||||
|
||||
if chg24 > 2 and vol < 0.7: score += 5
|
||||
|
||||
return score, "short", rsi
|
||||
|
||||
|
||||
def score_long(closes, klines):
|
||||
price = closes[-1]
|
||||
rsi = calc_rsi(closes)
|
||||
vwap = calc_vwap(klines[-24:])
|
||||
vwap_pct = ((price - vwap) / vwap * 100) if vwap else 0
|
||||
bb_pos = calc_bollinger_position(closes)
|
||||
p24 = closes[-24] if len(closes) >= 24 else closes[0]
|
||||
chg24 = ((price - p24) / p24 * 100) if p24 else 0
|
||||
|
||||
score = 0
|
||||
if rsi <= 25: score += 30
|
||||
elif rsi <= 30: score += 25
|
||||
elif rsi <= 35: score += 15
|
||||
elif rsi <= 40: score += 5
|
||||
|
||||
if vwap_pct < -5: score += 20
|
||||
elif vwap_pct < -3: score += 15
|
||||
elif vwap_pct < -1: score += 8
|
||||
|
||||
if chg24 < -15: score += 15
|
||||
elif chg24 < -8: score += 10
|
||||
elif chg24 < -4: score += 5
|
||||
|
||||
if bb_pos < 0: score += 15
|
||||
elif bb_pos < 0.15: score += 10
|
||||
|
||||
return score, "long", rsi
|
||||
|
||||
|
||||
# ── Data Fetching with Cache ──
|
||||
|
||||
def fetch_klines_cached(symbol, interval, start_ms, end_ms):
|
||||
"""Fetch klines with local file cache to avoid re-downloading."""
|
||||
CACHE_DIR.mkdir(parents=True, exist_ok=True)
|
||||
cache_file = CACHE_DIR / f"{symbol}_{interval}_{start_ms}_{end_ms}.json"
|
||||
|
||||
if cache_file.exists():
|
||||
return json.loads(cache_file.read_text())
|
||||
|
||||
print(f" Downloading {symbol} {interval} candles...")
|
||||
all_klines = []
|
||||
current = start_ms
|
||||
|
||||
while current < end_ms:
|
||||
url = f"{BINANCE_KLINES}?symbol={symbol}&interval={interval}&startTime={current}&endTime={end_ms}&limit=1000"
|
||||
req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'})
|
||||
try:
|
||||
resp = urllib.request.urlopen(req, timeout=15)
|
||||
raw = json.loads(resp.read())
|
||||
if not raw:
|
||||
break
|
||||
for k in raw:
|
||||
all_klines.append({
|
||||
'open_time': k[0],
|
||||
'open': float(k[1]),
|
||||
'high': float(k[2]),
|
||||
'low': float(k[3]),
|
||||
'close': float(k[4]),
|
||||
'volume': float(k[5]),
|
||||
'close_time': k[6],
|
||||
})
|
||||
current = raw[-1][6] + 1
|
||||
time.sleep(0.12) # Rate limit
|
||||
except Exception as e:
|
||||
print(f" Error: {e}")
|
||||
break
|
||||
|
||||
if all_klines:
|
||||
cache_file.write_text(json.dumps(all_klines))
|
||||
print(f" Got {len(all_klines)} candles")
|
||||
return all_klines
|
||||
|
||||
|
||||
# ── Position Simulation ──
|
||||
|
||||
@dataclass
|
||||
class Position:
|
||||
symbol: str
|
||||
direction: str # "long" or "short"
|
||||
entry_price: float
|
||||
margin: float
|
||||
leverage: int
|
||||
entry_time: int # ms timestamp
|
||||
peak_pnl_pct: float = 0.0
|
||||
|
||||
@property
|
||||
def notional(self):
|
||||
return self.margin * self.leverage
|
||||
|
||||
def pnl_at(self, price):
|
||||
shares = self.notional / self.entry_price
|
||||
if self.direction == "long":
|
||||
return (price - self.entry_price) * shares
|
||||
else:
|
||||
return (self.entry_price - price) * shares
|
||||
|
||||
def pnl_pct_at(self, price):
|
||||
pnl = self.pnl_at(price)
|
||||
return (pnl / self.margin) * 100
|
||||
|
||||
def is_liquidated(self, price):
|
||||
if self.direction == "long":
|
||||
liq = self.entry_price * (1 - 1 / self.leverage)
|
||||
return price <= liq
|
||||
else:
|
||||
liq = self.entry_price * (1 + 1 / self.leverage)
|
||||
return price >= liq
|
||||
|
||||
|
||||
@dataclass
|
||||
class BacktestResult:
|
||||
total_trades: int = 0
|
||||
wins: int = 0
|
||||
losses: int = 0
|
||||
liquidations: int = 0
|
||||
total_pnl: float = 0.0
|
||||
max_drawdown_pct: float = 0.0
|
||||
peak_equity: float = 0.0
|
||||
fees_paid: float = 0.0
|
||||
trades: list = field(default_factory=list)
|
||||
equity_curve: list = field(default_factory=list)
|
||||
|
||||
|
||||
def run_backtest(
|
||||
coins=None,
|
||||
months=6,
|
||||
starting_cash=10_000.0,
|
||||
margin_per_trade=200.0,
|
||||
max_positions=10,
|
||||
short_threshold=50,
|
||||
long_threshold=45,
|
||||
tp_pct=5.0,
|
||||
sl_pct=3.0,
|
||||
trailing_stop_pct=2.0,
|
||||
taker_fee_pct=0.05,
|
||||
maker_fee_pct=0.02,
|
||||
fee_mode="taker", # "taker", "maker", "hybrid"
|
||||
maker_fill_rate=0.70, # hybrid: 70% of entries fill as maker
|
||||
scan_interval=15, # candles between scans (15 = every 15h on 1h candles)
|
||||
vol_filter=0.0, # Min ATR% on BTC to allow entries (0=disabled)
|
||||
vol_filter_coin="BTCUSDT",
|
||||
):
|
||||
"""
|
||||
Run backtest over historical data.
|
||||
Uses 1h candles, scans every `scan_interval` candles for signals.
|
||||
"""
|
||||
import random
|
||||
|
||||
def calc_fee(notional_usd):
|
||||
"""Calculate fee based on fee mode."""
|
||||
if fee_mode == "maker":
|
||||
return notional_usd * maker_fee_pct / 100
|
||||
elif fee_mode == "hybrid":
|
||||
# Each trade randomly fills as maker or taker based on fill rate
|
||||
if random.random() < maker_fill_rate:
|
||||
return notional_usd * maker_fee_pct / 100
|
||||
else:
|
||||
return notional_usd * taker_fee_pct / 100
|
||||
else: # taker
|
||||
return notional_usd * taker_fee_pct / 100
|
||||
|
||||
if coins is None:
|
||||
coins = COINS
|
||||
|
||||
now = datetime.now(timezone.utc)
|
||||
start = now - timedelta(days=months * 30)
|
||||
start_ms = int(start.timestamp() * 1000)
|
||||
end_ms = int(now.timestamp() * 1000)
|
||||
|
||||
# Fetch all historical data
|
||||
print(f"=== Leverage Strategy Backtester ===")
|
||||
print(f"Period: {start.strftime('%Y-%m-%d')} → {now.strftime('%Y-%m-%d')} ({months} months)")
|
||||
print(f"Coins: {len(coins)}")
|
||||
vf_str = f" | vol_filter={vol_filter}% on {vol_filter_coin}" if vol_filter > 0 else ""
|
||||
fee_str = f" | fees={fee_mode}" + (f"({maker_fill_rate:.0%} maker)" if fee_mode == "hybrid" else "")
|
||||
print(f"Params: margin=${margin_per_trade} | TP={tp_pct}% | SL={sl_pct}% | trailing={trailing_stop_pct}%{vf_str}{fee_str}")
|
||||
print(f"Thresholds: short≥{short_threshold} | long≥{long_threshold}")
|
||||
print()
|
||||
|
||||
coin_data = {}
|
||||
for sym in coins:
|
||||
klines = fetch_klines_cached(sym, '1h', start_ms, end_ms)
|
||||
if len(klines) >= 100:
|
||||
coin_data[sym] = klines
|
||||
else:
|
||||
print(f" Skipping {sym}: only {len(klines)} candles")
|
||||
|
||||
if not coin_data:
|
||||
print("No data available!")
|
||||
return None
|
||||
|
||||
# Find common time range
|
||||
min_len = min(len(v) for v in coin_data.values())
|
||||
print(f"\nSimulating on {len(coin_data)} coins, {min_len} candles each\n")
|
||||
|
||||
# Backtest state
|
||||
cash = starting_cash
|
||||
positions: list[Position] = []
|
||||
result = BacktestResult()
|
||||
result.peak_equity = starting_cash
|
||||
|
||||
# Walk forward through time
|
||||
lookback = 100 # Need 100 candles for indicators
|
||||
|
||||
for i in range(lookback, min_len):
|
||||
# Current time
|
||||
sample_sym = list(coin_data.keys())[0]
|
||||
current_time = coin_data[sample_sym][i]['close_time']
|
||||
|
||||
# ── Check exits on every candle ──
|
||||
to_close = []
|
||||
for idx, pos in enumerate(positions):
|
||||
sym_data = coin_data.get(pos.symbol + "USDT" if not pos.symbol.endswith("USDT") else pos.symbol)
|
||||
if not sym_data or i >= len(sym_data):
|
||||
continue
|
||||
|
||||
candle = sym_data[i]
|
||||
# Check against high AND low for more realistic simulation
|
||||
# For longs: SL could hit on low, TP on high
|
||||
# For shorts: SL could hit on high, TP on low
|
||||
|
||||
if pos.direction == "long":
|
||||
worst_price = candle['low']
|
||||
best_price = candle['high']
|
||||
else:
|
||||
worst_price = candle['high']
|
||||
best_price = candle['low']
|
||||
|
||||
current_price = candle['close']
|
||||
|
||||
# Check liquidation on worst price
|
||||
if pos.is_liquidated(worst_price):
|
||||
to_close.append((idx, "liquidated", current_price))
|
||||
continue
|
||||
|
||||
# Check stop loss on worst price
|
||||
worst_pnl_pct = pos.pnl_pct_at(worst_price)
|
||||
if worst_pnl_pct <= -sl_pct:
|
||||
# Stopped out — approximate exit at SL level
|
||||
to_close.append((idx, "sl", current_price))
|
||||
continue
|
||||
|
||||
# Check take profit on best price
|
||||
best_pnl_pct = pos.pnl_pct_at(best_price)
|
||||
if best_pnl_pct >= tp_pct:
|
||||
to_close.append((idx, "tp", current_price))
|
||||
continue
|
||||
|
||||
# Update peak and check trailing stop
|
||||
close_pnl_pct = pos.pnl_pct_at(current_price)
|
||||
if close_pnl_pct > pos.peak_pnl_pct:
|
||||
pos.peak_pnl_pct = close_pnl_pct
|
||||
|
||||
if pos.peak_pnl_pct >= 2.0 and (pos.peak_pnl_pct - close_pnl_pct) >= trailing_stop_pct:
|
||||
to_close.append((idx, "trailing", current_price))
|
||||
|
||||
# Process closes (reverse order to preserve indices)
|
||||
for idx, reason, price in sorted(to_close, key=lambda x: x[0], reverse=True):
|
||||
pos = positions[idx]
|
||||
pnl = pos.pnl_at(price)
|
||||
fee = calc_fee(pos.notional)
|
||||
net_pnl = pnl - fee
|
||||
|
||||
if reason == "liquidated":
|
||||
net_pnl = -pos.margin # Lose entire margin
|
||||
result.liquidations += 1
|
||||
|
||||
cash += pos.margin + net_pnl
|
||||
result.total_pnl += net_pnl
|
||||
result.fees_paid += fee
|
||||
result.total_trades += 1
|
||||
|
||||
if net_pnl > 0:
|
||||
result.wins += 1
|
||||
else:
|
||||
result.losses += 1
|
||||
|
||||
result.trades.append({
|
||||
"symbol": pos.symbol,
|
||||
"direction": pos.direction,
|
||||
"entry_price": pos.entry_price,
|
||||
"exit_price": price,
|
||||
"leverage": pos.leverage,
|
||||
"margin": pos.margin,
|
||||
"pnl": round(net_pnl, 2),
|
||||
"pnl_pct": round(pos.pnl_pct_at(price), 2),
|
||||
"reason": reason,
|
||||
"entry_time": pos.entry_time,
|
||||
"exit_time": current_time,
|
||||
})
|
||||
|
||||
positions.pop(idx)
|
||||
|
||||
# ── Scan for entries every N candles ──
|
||||
if (i - lookback) % scan_interval == 0 and len(positions) < max_positions:
|
||||
# Volatility regime filter: skip entries in low-vol environments
|
||||
if vol_filter > 0 and vol_filter_coin in coin_data:
|
||||
vf_klines = coin_data[vol_filter_coin][max(0,i-23):i+1]
|
||||
regime_vol = calc_volatility_regime(vf_klines)
|
||||
if regime_vol < vol_filter:
|
||||
# Record equity curve point even when skipping
|
||||
if (i - lookback) % 24 == 0:
|
||||
unrealized = 0
|
||||
for pos in positions:
|
||||
sym_key = pos.symbol if pos.symbol.endswith("USDT") else pos.symbol + "USDT"
|
||||
if sym_key in coin_data and i < len(coin_data[sym_key]):
|
||||
unrealized += pos.pnl_at(coin_data[sym_key][i]['close'])
|
||||
margin_locked = sum(p.margin for p in positions)
|
||||
equity = cash + margin_locked + unrealized
|
||||
if equity > result.peak_equity:
|
||||
result.peak_equity = equity
|
||||
dd = ((result.peak_equity - equity) / result.peak_equity * 100) if result.peak_equity > 0 else 0
|
||||
if dd > result.max_drawdown_pct:
|
||||
result.max_drawdown_pct = dd
|
||||
result.equity_curve.append({"time": current_time, "equity": round(equity, 2), "cash": round(cash, 2), "positions": len(positions), "drawdown_pct": round(dd, 2)})
|
||||
continue # Skip entries in low-vol regime
|
||||
|
||||
open_symbols = {p.symbol for p in positions}
|
||||
candidates = []
|
||||
|
||||
for sym, klines in coin_data.items():
|
||||
if sym.replace("USDT", "") in open_symbols or sym in open_symbols:
|
||||
continue
|
||||
if i >= len(klines):
|
||||
continue
|
||||
|
||||
window = klines[i-99:i+1]
|
||||
closes = [k['close'] for k in window]
|
||||
|
||||
# Score both directions
|
||||
short_score, _, _ = score_short(closes, window)
|
||||
long_score, _, _ = score_long(closes, window)
|
||||
|
||||
if short_score >= short_threshold:
|
||||
candidates.append((sym, "short", short_score))
|
||||
if long_score >= long_threshold:
|
||||
candidates.append((sym, "long", long_score))
|
||||
|
||||
# Sort by score, take best
|
||||
candidates.sort(key=lambda x: x[2], reverse=True)
|
||||
|
||||
for sym, direction, score in candidates:
|
||||
if len(positions) >= max_positions:
|
||||
break
|
||||
clean_sym = sym.replace("USDT", "")
|
||||
if clean_sym in {p.symbol for p in positions}:
|
||||
continue
|
||||
if cash < margin_per_trade:
|
||||
break
|
||||
|
||||
entry_price = coin_data[sym][i]['close']
|
||||
lev = 15 if score >= 70 else 10 if score >= 60 else 7
|
||||
fee = calc_fee(margin_per_trade * lev)
|
||||
|
||||
cash -= margin_per_trade + fee
|
||||
result.fees_paid += fee
|
||||
|
||||
positions.append(Position(
|
||||
symbol=clean_sym,
|
||||
direction=direction,
|
||||
entry_price=entry_price,
|
||||
margin=margin_per_trade,
|
||||
leverage=lev,
|
||||
entry_time=current_time,
|
||||
))
|
||||
|
||||
# ── Track equity curve ──
|
||||
unrealized = 0
|
||||
for pos in positions:
|
||||
sym_key = pos.symbol if pos.symbol.endswith("USDT") else pos.symbol + "USDT"
|
||||
if sym_key in coin_data and i < len(coin_data[sym_key]):
|
||||
unrealized += pos.pnl_at(coin_data[sym_key][i]['close'])
|
||||
|
||||
margin_locked = sum(p.margin for p in positions)
|
||||
equity = cash + margin_locked + unrealized
|
||||
|
||||
if equity > result.peak_equity:
|
||||
result.peak_equity = equity
|
||||
|
||||
dd = ((result.peak_equity - equity) / result.peak_equity * 100) if result.peak_equity > 0 else 0
|
||||
if dd > result.max_drawdown_pct:
|
||||
result.max_drawdown_pct = dd
|
||||
|
||||
# Sample equity curve every 24 candles (daily)
|
||||
if (i - lookback) % 24 == 0:
|
||||
result.equity_curve.append({
|
||||
"time": current_time,
|
||||
"equity": round(equity, 2),
|
||||
"cash": round(cash, 2),
|
||||
"positions": len(positions),
|
||||
"drawdown_pct": round(dd, 2),
|
||||
})
|
||||
|
||||
# Close remaining positions at last price
|
||||
for pos in positions:
|
||||
sym_key = pos.symbol if pos.symbol.endswith("USDT") else pos.symbol + "USDT"
|
||||
if sym_key in coin_data:
|
||||
price = coin_data[sym_key][-1]['close']
|
||||
pnl = pos.pnl_at(price)
|
||||
fee = calc_fee(pos.notional)
|
||||
net_pnl = pnl - fee
|
||||
cash += pos.margin + net_pnl
|
||||
result.total_pnl += net_pnl
|
||||
result.total_trades += 1
|
||||
if net_pnl > 0:
|
||||
result.wins += 1
|
||||
else:
|
||||
result.losses += 1
|
||||
|
||||
return result, cash
|
||||
|
||||
|
||||
def print_results(result, final_equity, starting_cash, params=None):
|
||||
"""Pretty print backtest results."""
|
||||
total_return = ((final_equity - starting_cash) / starting_cash) * 100
|
||||
win_rate = (result.wins / result.total_trades * 100) if result.total_trades else 0
|
||||
|
||||
# Calculate Sharpe-like ratio from equity curve
|
||||
if len(result.equity_curve) >= 2:
|
||||
returns = []
|
||||
for i in range(1, len(result.equity_curve)):
|
||||
prev = result.equity_curve[i-1]['equity']
|
||||
curr = result.equity_curve[i]['equity']
|
||||
if prev > 0:
|
||||
returns.append((curr - prev) / prev)
|
||||
if returns:
|
||||
avg_ret = sum(returns) / len(returns)
|
||||
std_ret = (sum((r - avg_ret)**2 for r in returns) / len(returns)) ** 0.5
|
||||
sharpe = (avg_ret / std_ret * (365**0.5)) if std_ret > 0 else 0
|
||||
else:
|
||||
sharpe = 0
|
||||
else:
|
||||
sharpe = 0
|
||||
|
||||
# Profit factor
|
||||
gross_wins = sum(t['pnl'] for t in result.trades if t['pnl'] > 0)
|
||||
gross_losses = abs(sum(t['pnl'] for t in result.trades if t['pnl'] < 0))
|
||||
profit_factor = gross_wins / gross_losses if gross_losses > 0 else float('inf')
|
||||
|
||||
# Average win/loss
|
||||
wins_list = [t['pnl'] for t in result.trades if t['pnl'] > 0]
|
||||
losses_list = [t['pnl'] for t in result.trades if t['pnl'] < 0]
|
||||
avg_win = sum(wins_list) / len(wins_list) if wins_list else 0
|
||||
avg_loss = sum(losses_list) / len(losses_list) if losses_list else 0
|
||||
|
||||
# Exit reason breakdown
|
||||
reasons = {}
|
||||
for t in result.trades:
|
||||
r = t.get('reason', 'unknown')
|
||||
reasons[r] = reasons.get(r, 0) + 1
|
||||
|
||||
print("\n" + "="*60)
|
||||
print(" BACKTEST RESULTS")
|
||||
print("="*60)
|
||||
if params:
|
||||
print(f" Params: TP={params.get('tp')}% SL={params.get('sl')}% Trail={params.get('trail')}%")
|
||||
print(f" Starting Capital: ${starting_cash:>12,.2f}")
|
||||
print(f" Final Equity: ${final_equity:>12,.2f}")
|
||||
print(f" Total Return: {total_return:>12.2f}%")
|
||||
print(f" Max Drawdown: {result.max_drawdown_pct:>12.2f}%")
|
||||
print(f" Sharpe Ratio: {sharpe:>12.2f}")
|
||||
print(f" Profit Factor: {profit_factor:>12.2f}")
|
||||
print(f" Fees Paid: ${result.fees_paid:>12,.2f}")
|
||||
print("-"*60)
|
||||
print(f" Total Trades: {result.total_trades:>12}")
|
||||
print(f" Wins: {result.wins:>12}")
|
||||
print(f" Losses: {result.losses:>12}")
|
||||
print(f" Win Rate: {win_rate:>11.1f}%")
|
||||
print(f" Liquidations: {result.liquidations:>12}")
|
||||
print(f" Avg Win: ${avg_win:>12,.2f}")
|
||||
print(f" Avg Loss: ${avg_loss:>12,.2f}")
|
||||
print(f" Win/Loss Ratio: {abs(avg_win/avg_loss) if avg_loss else 0:>12.2f}")
|
||||
print("-"*60)
|
||||
print(f" Exit Reasons:")
|
||||
for reason, count in sorted(reasons.items(), key=lambda x: -x[1]):
|
||||
print(f" {reason:20s} {count:>6}")
|
||||
print("="*60)
|
||||
|
||||
return {
|
||||
"total_return_pct": round(total_return, 2),
|
||||
"max_drawdown_pct": round(result.max_drawdown_pct, 2),
|
||||
"sharpe": round(sharpe, 2),
|
||||
"profit_factor": round(profit_factor, 2),
|
||||
"total_trades": result.total_trades,
|
||||
"win_rate": round(win_rate, 1),
|
||||
"liquidations": result.liquidations,
|
||||
"avg_win": round(avg_win, 2),
|
||||
"avg_loss": round(avg_loss, 2),
|
||||
"fees": round(result.fees_paid, 2),
|
||||
}
|
||||
|
||||
|
||||
def optimize(months=6):
|
||||
"""Grid search over key parameters."""
|
||||
print("\n🔧 PARAMETER OPTIMIZATION")
|
||||
print("="*60)
|
||||
|
||||
param_grid = [
|
||||
{"tp": 3, "sl": 2, "trail": 1.5},
|
||||
{"tp": 4, "sl": 2, "trail": 1.5},
|
||||
{"tp": 5, "sl": 3, "trail": 2}, # Current live params
|
||||
{"tp": 6, "sl": 3, "trail": 2},
|
||||
{"tp": 7, "sl": 3, "trail": 2.5},
|
||||
{"tp": 5, "sl": 2, "trail": 1.5},
|
||||
{"tp": 8, "sl": 4, "trail": 3},
|
||||
{"tp": 10, "sl": 5, "trail": 3},
|
||||
{"tp": 5, "sl": 3, "trail": 1},
|
||||
{"tp": 4, "sl": 3, "trail": 2},
|
||||
]
|
||||
|
||||
all_results = []
|
||||
|
||||
for params in param_grid:
|
||||
print(f"\n--- TP={params['tp']}% SL={params['sl']}% Trail={params['trail']}% ---")
|
||||
result, final_eq = run_backtest(
|
||||
months=months,
|
||||
tp_pct=params['tp'],
|
||||
sl_pct=params['sl'],
|
||||
trailing_stop_pct=params['trail'],
|
||||
)
|
||||
stats = print_results(result, final_eq, 10_000, params)
|
||||
stats['params'] = params
|
||||
all_results.append(stats)
|
||||
|
||||
# Rank by return
|
||||
print("\n\n" + "="*60)
|
||||
print(" OPTIMIZATION RANKINGS")
|
||||
print("="*60)
|
||||
print(f"{'Rank':>4} {'TP%':>4} {'SL%':>4} {'Trail%':>6} {'Return%':>9} {'MaxDD%':>7} {'WinRate':>8} {'Sharpe':>7} {'Trades':>7}")
|
||||
print("-"*60)
|
||||
|
||||
for rank, r in enumerate(sorted(all_results, key=lambda x: x['total_return_pct'], reverse=True), 1):
|
||||
p = r['params']
|
||||
marker = " ◄ LIVE" if p['tp'] == 5 and p['sl'] == 3 and p['trail'] == 2 else ""
|
||||
print(f"{rank:>4} {p['tp']:>4} {p['sl']:>4} {p['trail']:>6} {r['total_return_pct']:>8.1f}% {r['max_drawdown_pct']:>6.1f}% {r['win_rate']:>7.1f}% {r['sharpe']:>7.2f} {r['total_trades']:>7}{marker}")
|
||||
|
||||
return all_results
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Leverage Strategy Backtester")
|
||||
parser.add_argument("--months", type=int, default=6, help="Months of history (default: 6)")
|
||||
parser.add_argument("--coins", type=str, help="Comma-separated coin list (e.g. BTC,ETH)")
|
||||
parser.add_argument("--optimize", action="store_true", help="Run parameter optimization")
|
||||
parser.add_argument("--tp", type=float, default=5.0, help="Take profit %% (default: 5)")
|
||||
parser.add_argument("--sl", type=float, default=3.0, help="Stop loss %% (default: 3)")
|
||||
parser.add_argument("--trail", type=float, default=2.0, help="Trailing stop %% (default: 2)")
|
||||
parser.add_argument("--margin", type=float, default=200.0, help="Margin per trade (default: 200)")
|
||||
parser.add_argument("--cash", type=float, default=10000.0, help="Starting cash (default: 10000)")
|
||||
parser.add_argument("--vol-filter", type=float, default=0.0, help="Min ATR%% to trade (0=disabled, try 1.0-2.0)")
|
||||
parser.add_argument("--vol-filter-coin", type=str, default="BTCUSDT", help="Coin to measure regime volatility on")
|
||||
parser.add_argument("--fee-mode", type=str, default="taker", choices=["taker", "maker", "hybrid"], help="Fee mode: taker (0.05%%), maker (0.02%%), hybrid (70%% maker)")
|
||||
parser.add_argument("--maker-fill-rate", type=float, default=0.70, help="Hybrid: probability limit order fills (default: 0.70)")
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.optimize:
|
||||
results = optimize(args.months)
|
||||
# Save results
|
||||
RESULTS_DIR.mkdir(parents=True, exist_ok=True)
|
||||
out = RESULTS_DIR / f"optimize_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
|
||||
out.write_text(json.dumps(results, indent=2))
|
||||
print(f"\nSaved to {out}")
|
||||
return
|
||||
|
||||
coins = None
|
||||
if args.coins:
|
||||
coins = [c.strip().upper() + "USDT" for c in args.coins.split(",")]
|
||||
|
||||
result, final_equity = run_backtest(
|
||||
coins=coins,
|
||||
months=args.months,
|
||||
starting_cash=args.cash,
|
||||
margin_per_trade=args.margin,
|
||||
tp_pct=args.tp,
|
||||
sl_pct=args.sl,
|
||||
trailing_stop_pct=args.trail,
|
||||
vol_filter=args.vol_filter,
|
||||
vol_filter_coin=args.vol_filter_coin,
|
||||
fee_mode=args.fee_mode,
|
||||
maker_fill_rate=args.maker_fill_rate,
|
||||
)
|
||||
|
||||
stats = print_results(result, final_equity, args.cash)
|
||||
|
||||
# Save results
|
||||
RESULTS_DIR.mkdir(parents=True, exist_ok=True)
|
||||
out = RESULTS_DIR / f"backtest_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
|
||||
out.write_text(json.dumps({
|
||||
"stats": stats,
|
||||
"equity_curve": result.equity_curve,
|
||||
"trades": result.trades[-100:], # Last 100 trades for review
|
||||
"params": {
|
||||
"months": args.months,
|
||||
"tp": args.tp,
|
||||
"sl": args.sl,
|
||||
"trail": args.trail,
|
||||
"margin": args.margin,
|
||||
"starting_cash": args.cash,
|
||||
}
|
||||
}, indent=2))
|
||||
print(f"\nResults saved to {out}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user