772 lines
28 KiB
Python
772 lines
28 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Leverage Strategy Backtester
|
|
Replays the exact same signal logic (RSI, VWAP, MACD, Bollinger, volume)
|
|
against historical candle data to validate the live leverage trader.
|
|
|
|
Usage:
|
|
python3 leverage_backtester.py # Default: 6 months, all coins
|
|
python3 leverage_backtester.py --months 12 # 12 months
|
|
python3 leverage_backtester.py --coins BTC,ETH # Specific coins
|
|
python3 leverage_backtester.py --optimize # Grid search params
|
|
"""
|
|
|
|
import json
|
|
import math
|
|
import time
|
|
import argparse
|
|
import urllib.request
|
|
from datetime import datetime, timezone, timedelta
|
|
from pathlib import Path
|
|
from dataclasses import dataclass, field
|
|
|
|
BINANCE_KLINES = "https://api.binance.us/api/v3/klines"
|
|
|
|
COINS = [
|
|
"BTCUSDT", "ETHUSDT", "SOLUSDT", "XRPUSDT", "DOGEUSDT",
|
|
"ADAUSDT", "AVAXUSDT", "LINKUSDT", "DOTUSDT", "MATICUSDT",
|
|
"NEARUSDT", "ATOMUSDT", "LTCUSDT", "UNIUSDT", "AAVEUSDT",
|
|
"FILUSDT", "ALGOUSDT", "XLMUSDT", "VETUSDT", "ICPUSDT",
|
|
]
|
|
|
|
RESULTS_DIR = Path(__file__).parent.parent / "data" / "backtest-results"
|
|
CACHE_DIR = Path(__file__).parent.parent / "data" / "kline-cache"
|
|
|
|
|
|
# ── Indicator Calculations (identical to short_scanner.py) ──
|
|
|
|
def calc_rsi(closes, period=14):
|
|
if len(closes) < period + 1:
|
|
return 50
|
|
deltas = [closes[i] - closes[i-1] for i in range(1, len(closes))]
|
|
gains = [d if d > 0 else 0 for d in deltas]
|
|
losses = [-d if d < 0 else 0 for d in deltas]
|
|
avg_gain = sum(gains[:period]) / period
|
|
avg_loss = sum(losses[:period]) / period
|
|
for i in range(period, len(deltas)):
|
|
avg_gain = (avg_gain * (period - 1) + gains[i]) / period
|
|
avg_loss = (avg_loss * (period - 1) + losses[i]) / period
|
|
if avg_loss == 0:
|
|
return 100
|
|
return round(100 - (100 / (1 + avg_gain / avg_loss)), 1)
|
|
|
|
|
|
def calc_vwap(klines_window):
|
|
cum_vol = cum_tp_vol = 0
|
|
for k in klines_window:
|
|
tp = (k['high'] + k['low'] + k['close']) / 3
|
|
cum_vol += k['volume']
|
|
cum_tp_vol += tp * k['volume']
|
|
return cum_tp_vol / cum_vol if cum_vol else 0
|
|
|
|
|
|
def calc_ema(values, period):
|
|
if not values:
|
|
return 0
|
|
mult = 2 / (period + 1)
|
|
ema = values[0]
|
|
for v in values[1:]:
|
|
ema = (v - ema) * mult + ema
|
|
return ema
|
|
|
|
|
|
def calc_macd(closes):
|
|
if len(closes) < 26:
|
|
return 0, 0, 0
|
|
ema12 = calc_ema(closes, 12)
|
|
ema26 = calc_ema(closes, 26)
|
|
macd_line = ema12 - ema26
|
|
signal = calc_ema(closes[-9:], 9) if len(closes) >= 9 else macd_line
|
|
return macd_line, signal, macd_line - signal
|
|
|
|
|
|
def calc_bollinger_position(closes, period=20):
|
|
if len(closes) < period:
|
|
return 0.5
|
|
recent = closes[-period:]
|
|
sma = sum(recent) / period
|
|
std = (sum((x - sma)**2 for x in recent) / period) ** 0.5
|
|
if std == 0:
|
|
return 0.5
|
|
upper = sma + 2 * std
|
|
lower = sma - 2 * std
|
|
bw = upper - lower
|
|
return (closes[-1] - lower) / bw if bw else 0.5
|
|
|
|
|
|
def calc_atr(klines, period=14):
|
|
"""Average True Range — measures volatility."""
|
|
if len(klines) < period + 1:
|
|
return 0
|
|
trs = []
|
|
for i in range(1, len(klines)):
|
|
h, l, pc = klines[i]['high'], klines[i]['low'], klines[i-1]['close']
|
|
tr = max(h - l, abs(h - pc), abs(l - pc))
|
|
trs.append(tr)
|
|
if len(trs) < period:
|
|
return sum(trs) / len(trs) if trs else 0
|
|
atr = sum(trs[:period]) / period
|
|
for i in range(period, len(trs)):
|
|
atr = (atr * (period - 1) + trs[i]) / period
|
|
return atr
|
|
|
|
|
|
def calc_volatility_regime(klines, period=14):
|
|
"""Returns ATR as % of price. Higher = more volatile = better for trading."""
|
|
atr = calc_atr(klines, period)
|
|
price = klines[-1]['close'] if klines else 1
|
|
return (atr / price * 100) if price else 0
|
|
|
|
|
|
def volume_trend(klines, lookback=10):
|
|
if len(klines) < lookback * 2:
|
|
return 1.0
|
|
recent = sum(k['volume'] for k in klines[-lookback:]) / lookback
|
|
older = sum(k['volume'] for k in klines[-lookback*2:-lookback]) / lookback
|
|
return recent / older if older else 1.0
|
|
|
|
|
|
# ── Signal Scoring (identical to leverage_trader.py) ──
|
|
|
|
def score_short(closes, klines):
|
|
price = closes[-1]
|
|
rsi = calc_rsi(closes)
|
|
vwap = calc_vwap(klines[-24:])
|
|
vwap_pct = ((price - vwap) / vwap * 100) if vwap else 0
|
|
_, _, histogram = calc_macd(closes)
|
|
bb_pos = calc_bollinger_position(closes)
|
|
vol = volume_trend(klines)
|
|
p24 = closes[-24] if len(closes) >= 24 else closes[0]
|
|
chg24 = ((price - p24) / p24 * 100) if p24 else 0
|
|
|
|
score = 0
|
|
if rsi >= 80: score += 30
|
|
elif rsi >= 70: score += 25
|
|
elif rsi >= 65: score += 15
|
|
elif rsi >= 60: score += 5
|
|
|
|
if vwap_pct > 5: score += 20
|
|
elif vwap_pct > 3: score += 15
|
|
elif vwap_pct > 1: score += 8
|
|
|
|
if histogram < -0.001 * price: score += 15
|
|
elif histogram < 0: score += 10
|
|
|
|
if bb_pos > 1.0: score += 15
|
|
elif bb_pos > 0.85: score += 10
|
|
|
|
if chg24 > 15: score += 15
|
|
elif chg24 > 8: score += 10
|
|
elif chg24 > 4: score += 5
|
|
|
|
if chg24 > 2 and vol < 0.7: score += 5
|
|
|
|
return score, "short", rsi
|
|
|
|
|
|
def score_long(closes, klines):
|
|
price = closes[-1]
|
|
rsi = calc_rsi(closes)
|
|
vwap = calc_vwap(klines[-24:])
|
|
vwap_pct = ((price - vwap) / vwap * 100) if vwap else 0
|
|
bb_pos = calc_bollinger_position(closes)
|
|
p24 = closes[-24] if len(closes) >= 24 else closes[0]
|
|
chg24 = ((price - p24) / p24 * 100) if p24 else 0
|
|
|
|
score = 0
|
|
if rsi <= 25: score += 30
|
|
elif rsi <= 30: score += 25
|
|
elif rsi <= 35: score += 15
|
|
elif rsi <= 40: score += 5
|
|
|
|
if vwap_pct < -5: score += 20
|
|
elif vwap_pct < -3: score += 15
|
|
elif vwap_pct < -1: score += 8
|
|
|
|
if chg24 < -15: score += 15
|
|
elif chg24 < -8: score += 10
|
|
elif chg24 < -4: score += 5
|
|
|
|
if bb_pos < 0: score += 15
|
|
elif bb_pos < 0.15: score += 10
|
|
|
|
return score, "long", rsi
|
|
|
|
|
|
# ── Data Fetching with Cache ──
|
|
|
|
def fetch_klines_cached(symbol, interval, start_ms, end_ms):
|
|
"""Fetch klines with local file cache to avoid re-downloading."""
|
|
CACHE_DIR.mkdir(parents=True, exist_ok=True)
|
|
cache_file = CACHE_DIR / f"{symbol}_{interval}_{start_ms}_{end_ms}.json"
|
|
|
|
if cache_file.exists():
|
|
return json.loads(cache_file.read_text())
|
|
|
|
print(f" Downloading {symbol} {interval} candles...")
|
|
all_klines = []
|
|
current = start_ms
|
|
|
|
while current < end_ms:
|
|
url = f"{BINANCE_KLINES}?symbol={symbol}&interval={interval}&startTime={current}&endTime={end_ms}&limit=1000"
|
|
req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'})
|
|
try:
|
|
resp = urllib.request.urlopen(req, timeout=15)
|
|
raw = json.loads(resp.read())
|
|
if not raw:
|
|
break
|
|
for k in raw:
|
|
all_klines.append({
|
|
'open_time': k[0],
|
|
'open': float(k[1]),
|
|
'high': float(k[2]),
|
|
'low': float(k[3]),
|
|
'close': float(k[4]),
|
|
'volume': float(k[5]),
|
|
'close_time': k[6],
|
|
})
|
|
current = raw[-1][6] + 1
|
|
time.sleep(0.12) # Rate limit
|
|
except Exception as e:
|
|
print(f" Error: {e}")
|
|
break
|
|
|
|
if all_klines:
|
|
cache_file.write_text(json.dumps(all_klines))
|
|
print(f" Got {len(all_klines)} candles")
|
|
return all_klines
|
|
|
|
|
|
# ── Position Simulation ──
|
|
|
|
@dataclass
|
|
class Position:
|
|
symbol: str
|
|
direction: str # "long" or "short"
|
|
entry_price: float
|
|
margin: float
|
|
leverage: int
|
|
entry_time: int # ms timestamp
|
|
peak_pnl_pct: float = 0.0
|
|
|
|
@property
|
|
def notional(self):
|
|
return self.margin * self.leverage
|
|
|
|
def pnl_at(self, price):
|
|
shares = self.notional / self.entry_price
|
|
if self.direction == "long":
|
|
return (price - self.entry_price) * shares
|
|
else:
|
|
return (self.entry_price - price) * shares
|
|
|
|
def pnl_pct_at(self, price):
|
|
pnl = self.pnl_at(price)
|
|
return (pnl / self.margin) * 100
|
|
|
|
def is_liquidated(self, price):
|
|
if self.direction == "long":
|
|
liq = self.entry_price * (1 - 1 / self.leverage)
|
|
return price <= liq
|
|
else:
|
|
liq = self.entry_price * (1 + 1 / self.leverage)
|
|
return price >= liq
|
|
|
|
|
|
@dataclass
|
|
class BacktestResult:
|
|
total_trades: int = 0
|
|
wins: int = 0
|
|
losses: int = 0
|
|
liquidations: int = 0
|
|
total_pnl: float = 0.0
|
|
max_drawdown_pct: float = 0.0
|
|
peak_equity: float = 0.0
|
|
fees_paid: float = 0.0
|
|
trades: list = field(default_factory=list)
|
|
equity_curve: list = field(default_factory=list)
|
|
|
|
|
|
def run_backtest(
|
|
coins=None,
|
|
months=6,
|
|
starting_cash=10_000.0,
|
|
margin_per_trade=200.0,
|
|
max_positions=10,
|
|
short_threshold=50,
|
|
long_threshold=45,
|
|
tp_pct=5.0,
|
|
sl_pct=3.0,
|
|
trailing_stop_pct=2.0,
|
|
taker_fee_pct=0.05,
|
|
maker_fee_pct=0.02,
|
|
fee_mode="taker", # "taker", "maker", "hybrid"
|
|
maker_fill_rate=0.70, # hybrid: 70% of entries fill as maker
|
|
scan_interval=15, # candles between scans (15 = every 15h on 1h candles)
|
|
vol_filter=0.0, # Min ATR% on BTC to allow entries (0=disabled)
|
|
vol_filter_coin="BTCUSDT",
|
|
):
|
|
"""
|
|
Run backtest over historical data.
|
|
Uses 1h candles, scans every `scan_interval` candles for signals.
|
|
"""
|
|
import random
|
|
|
|
def calc_fee(notional_usd):
|
|
"""Calculate fee based on fee mode."""
|
|
if fee_mode == "maker":
|
|
return notional_usd * maker_fee_pct / 100
|
|
elif fee_mode == "hybrid":
|
|
# Each trade randomly fills as maker or taker based on fill rate
|
|
if random.random() < maker_fill_rate:
|
|
return notional_usd * maker_fee_pct / 100
|
|
else:
|
|
return notional_usd * taker_fee_pct / 100
|
|
else: # taker
|
|
return notional_usd * taker_fee_pct / 100
|
|
|
|
if coins is None:
|
|
coins = COINS
|
|
|
|
now = datetime.now(timezone.utc)
|
|
start = now - timedelta(days=months * 30)
|
|
start_ms = int(start.timestamp() * 1000)
|
|
end_ms = int(now.timestamp() * 1000)
|
|
|
|
# Fetch all historical data
|
|
print(f"=== Leverage Strategy Backtester ===")
|
|
print(f"Period: {start.strftime('%Y-%m-%d')} → {now.strftime('%Y-%m-%d')} ({months} months)")
|
|
print(f"Coins: {len(coins)}")
|
|
vf_str = f" | vol_filter={vol_filter}% on {vol_filter_coin}" if vol_filter > 0 else ""
|
|
fee_str = f" | fees={fee_mode}" + (f"({maker_fill_rate:.0%} maker)" if fee_mode == "hybrid" else "")
|
|
print(f"Params: margin=${margin_per_trade} | TP={tp_pct}% | SL={sl_pct}% | trailing={trailing_stop_pct}%{vf_str}{fee_str}")
|
|
print(f"Thresholds: short≥{short_threshold} | long≥{long_threshold}")
|
|
print()
|
|
|
|
coin_data = {}
|
|
for sym in coins:
|
|
klines = fetch_klines_cached(sym, '1h', start_ms, end_ms)
|
|
if len(klines) >= 100:
|
|
coin_data[sym] = klines
|
|
else:
|
|
print(f" Skipping {sym}: only {len(klines)} candles")
|
|
|
|
if not coin_data:
|
|
print("No data available!")
|
|
return None
|
|
|
|
# Find common time range
|
|
min_len = min(len(v) for v in coin_data.values())
|
|
print(f"\nSimulating on {len(coin_data)} coins, {min_len} candles each\n")
|
|
|
|
# Backtest state
|
|
cash = starting_cash
|
|
positions: list[Position] = []
|
|
result = BacktestResult()
|
|
result.peak_equity = starting_cash
|
|
|
|
# Walk forward through time
|
|
lookback = 100 # Need 100 candles for indicators
|
|
|
|
for i in range(lookback, min_len):
|
|
# Current time
|
|
sample_sym = list(coin_data.keys())[0]
|
|
current_time = coin_data[sample_sym][i]['close_time']
|
|
|
|
# ── Check exits on every candle ──
|
|
to_close = []
|
|
for idx, pos in enumerate(positions):
|
|
sym_data = coin_data.get(pos.symbol + "USDT" if not pos.symbol.endswith("USDT") else pos.symbol)
|
|
if not sym_data or i >= len(sym_data):
|
|
continue
|
|
|
|
candle = sym_data[i]
|
|
# Check against high AND low for more realistic simulation
|
|
# For longs: SL could hit on low, TP on high
|
|
# For shorts: SL could hit on high, TP on low
|
|
|
|
if pos.direction == "long":
|
|
worst_price = candle['low']
|
|
best_price = candle['high']
|
|
else:
|
|
worst_price = candle['high']
|
|
best_price = candle['low']
|
|
|
|
current_price = candle['close']
|
|
|
|
# Check liquidation on worst price
|
|
if pos.is_liquidated(worst_price):
|
|
to_close.append((idx, "liquidated", current_price))
|
|
continue
|
|
|
|
# Check stop loss on worst price
|
|
worst_pnl_pct = pos.pnl_pct_at(worst_price)
|
|
if worst_pnl_pct <= -sl_pct:
|
|
# Stopped out — approximate exit at SL level
|
|
to_close.append((idx, "sl", current_price))
|
|
continue
|
|
|
|
# Check take profit on best price
|
|
best_pnl_pct = pos.pnl_pct_at(best_price)
|
|
if best_pnl_pct >= tp_pct:
|
|
to_close.append((idx, "tp", current_price))
|
|
continue
|
|
|
|
# Update peak and check trailing stop
|
|
close_pnl_pct = pos.pnl_pct_at(current_price)
|
|
if close_pnl_pct > pos.peak_pnl_pct:
|
|
pos.peak_pnl_pct = close_pnl_pct
|
|
|
|
if pos.peak_pnl_pct >= 2.0 and (pos.peak_pnl_pct - close_pnl_pct) >= trailing_stop_pct:
|
|
to_close.append((idx, "trailing", current_price))
|
|
|
|
# Process closes (reverse order to preserve indices)
|
|
for idx, reason, price in sorted(to_close, key=lambda x: x[0], reverse=True):
|
|
pos = positions[idx]
|
|
pnl = pos.pnl_at(price)
|
|
fee = calc_fee(pos.notional)
|
|
net_pnl = pnl - fee
|
|
|
|
if reason == "liquidated":
|
|
net_pnl = -pos.margin # Lose entire margin
|
|
result.liquidations += 1
|
|
|
|
cash += pos.margin + net_pnl
|
|
result.total_pnl += net_pnl
|
|
result.fees_paid += fee
|
|
result.total_trades += 1
|
|
|
|
if net_pnl > 0:
|
|
result.wins += 1
|
|
else:
|
|
result.losses += 1
|
|
|
|
result.trades.append({
|
|
"symbol": pos.symbol,
|
|
"direction": pos.direction,
|
|
"entry_price": pos.entry_price,
|
|
"exit_price": price,
|
|
"leverage": pos.leverage,
|
|
"margin": pos.margin,
|
|
"pnl": round(net_pnl, 2),
|
|
"pnl_pct": round(pos.pnl_pct_at(price), 2),
|
|
"reason": reason,
|
|
"entry_time": pos.entry_time,
|
|
"exit_time": current_time,
|
|
})
|
|
|
|
positions.pop(idx)
|
|
|
|
# ── Scan for entries every N candles ──
|
|
if (i - lookback) % scan_interval == 0 and len(positions) < max_positions:
|
|
# Volatility regime filter: skip entries in low-vol environments
|
|
if vol_filter > 0 and vol_filter_coin in coin_data:
|
|
vf_klines = coin_data[vol_filter_coin][max(0,i-23):i+1]
|
|
regime_vol = calc_volatility_regime(vf_klines)
|
|
if regime_vol < vol_filter:
|
|
# Record equity curve point even when skipping
|
|
if (i - lookback) % 24 == 0:
|
|
unrealized = 0
|
|
for pos in positions:
|
|
sym_key = pos.symbol if pos.symbol.endswith("USDT") else pos.symbol + "USDT"
|
|
if sym_key in coin_data and i < len(coin_data[sym_key]):
|
|
unrealized += pos.pnl_at(coin_data[sym_key][i]['close'])
|
|
margin_locked = sum(p.margin for p in positions)
|
|
equity = cash + margin_locked + unrealized
|
|
if equity > result.peak_equity:
|
|
result.peak_equity = equity
|
|
dd = ((result.peak_equity - equity) / result.peak_equity * 100) if result.peak_equity > 0 else 0
|
|
if dd > result.max_drawdown_pct:
|
|
result.max_drawdown_pct = dd
|
|
result.equity_curve.append({"time": current_time, "equity": round(equity, 2), "cash": round(cash, 2), "positions": len(positions), "drawdown_pct": round(dd, 2)})
|
|
continue # Skip entries in low-vol regime
|
|
|
|
open_symbols = {p.symbol for p in positions}
|
|
candidates = []
|
|
|
|
for sym, klines in coin_data.items():
|
|
if sym.replace("USDT", "") in open_symbols or sym in open_symbols:
|
|
continue
|
|
if i >= len(klines):
|
|
continue
|
|
|
|
window = klines[i-99:i+1]
|
|
closes = [k['close'] for k in window]
|
|
|
|
# Score both directions
|
|
short_score, _, _ = score_short(closes, window)
|
|
long_score, _, _ = score_long(closes, window)
|
|
|
|
if short_score >= short_threshold:
|
|
candidates.append((sym, "short", short_score))
|
|
if long_score >= long_threshold:
|
|
candidates.append((sym, "long", long_score))
|
|
|
|
# Sort by score, take best
|
|
candidates.sort(key=lambda x: x[2], reverse=True)
|
|
|
|
for sym, direction, score in candidates:
|
|
if len(positions) >= max_positions:
|
|
break
|
|
clean_sym = sym.replace("USDT", "")
|
|
if clean_sym in {p.symbol for p in positions}:
|
|
continue
|
|
if cash < margin_per_trade:
|
|
break
|
|
|
|
entry_price = coin_data[sym][i]['close']
|
|
lev = 15 if score >= 70 else 10 if score >= 60 else 7
|
|
fee = calc_fee(margin_per_trade * lev)
|
|
|
|
cash -= margin_per_trade + fee
|
|
result.fees_paid += fee
|
|
|
|
positions.append(Position(
|
|
symbol=clean_sym,
|
|
direction=direction,
|
|
entry_price=entry_price,
|
|
margin=margin_per_trade,
|
|
leverage=lev,
|
|
entry_time=current_time,
|
|
))
|
|
|
|
# ── Track equity curve ──
|
|
unrealized = 0
|
|
for pos in positions:
|
|
sym_key = pos.symbol if pos.symbol.endswith("USDT") else pos.symbol + "USDT"
|
|
if sym_key in coin_data and i < len(coin_data[sym_key]):
|
|
unrealized += pos.pnl_at(coin_data[sym_key][i]['close'])
|
|
|
|
margin_locked = sum(p.margin for p in positions)
|
|
equity = cash + margin_locked + unrealized
|
|
|
|
if equity > result.peak_equity:
|
|
result.peak_equity = equity
|
|
|
|
dd = ((result.peak_equity - equity) / result.peak_equity * 100) if result.peak_equity > 0 else 0
|
|
if dd > result.max_drawdown_pct:
|
|
result.max_drawdown_pct = dd
|
|
|
|
# Sample equity curve every 24 candles (daily)
|
|
if (i - lookback) % 24 == 0:
|
|
result.equity_curve.append({
|
|
"time": current_time,
|
|
"equity": round(equity, 2),
|
|
"cash": round(cash, 2),
|
|
"positions": len(positions),
|
|
"drawdown_pct": round(dd, 2),
|
|
})
|
|
|
|
# Close remaining positions at last price
|
|
for pos in positions:
|
|
sym_key = pos.symbol if pos.symbol.endswith("USDT") else pos.symbol + "USDT"
|
|
if sym_key in coin_data:
|
|
price = coin_data[sym_key][-1]['close']
|
|
pnl = pos.pnl_at(price)
|
|
fee = calc_fee(pos.notional)
|
|
net_pnl = pnl - fee
|
|
cash += pos.margin + net_pnl
|
|
result.total_pnl += net_pnl
|
|
result.total_trades += 1
|
|
if net_pnl > 0:
|
|
result.wins += 1
|
|
else:
|
|
result.losses += 1
|
|
|
|
return result, cash
|
|
|
|
|
|
def print_results(result, final_equity, starting_cash, params=None):
|
|
"""Pretty print backtest results."""
|
|
total_return = ((final_equity - starting_cash) / starting_cash) * 100
|
|
win_rate = (result.wins / result.total_trades * 100) if result.total_trades else 0
|
|
|
|
# Calculate Sharpe-like ratio from equity curve
|
|
if len(result.equity_curve) >= 2:
|
|
returns = []
|
|
for i in range(1, len(result.equity_curve)):
|
|
prev = result.equity_curve[i-1]['equity']
|
|
curr = result.equity_curve[i]['equity']
|
|
if prev > 0:
|
|
returns.append((curr - prev) / prev)
|
|
if returns:
|
|
avg_ret = sum(returns) / len(returns)
|
|
std_ret = (sum((r - avg_ret)**2 for r in returns) / len(returns)) ** 0.5
|
|
sharpe = (avg_ret / std_ret * (365**0.5)) if std_ret > 0 else 0
|
|
else:
|
|
sharpe = 0
|
|
else:
|
|
sharpe = 0
|
|
|
|
# Profit factor
|
|
gross_wins = sum(t['pnl'] for t in result.trades if t['pnl'] > 0)
|
|
gross_losses = abs(sum(t['pnl'] for t in result.trades if t['pnl'] < 0))
|
|
profit_factor = gross_wins / gross_losses if gross_losses > 0 else float('inf')
|
|
|
|
# Average win/loss
|
|
wins_list = [t['pnl'] for t in result.trades if t['pnl'] > 0]
|
|
losses_list = [t['pnl'] for t in result.trades if t['pnl'] < 0]
|
|
avg_win = sum(wins_list) / len(wins_list) if wins_list else 0
|
|
avg_loss = sum(losses_list) / len(losses_list) if losses_list else 0
|
|
|
|
# Exit reason breakdown
|
|
reasons = {}
|
|
for t in result.trades:
|
|
r = t.get('reason', 'unknown')
|
|
reasons[r] = reasons.get(r, 0) + 1
|
|
|
|
print("\n" + "="*60)
|
|
print(" BACKTEST RESULTS")
|
|
print("="*60)
|
|
if params:
|
|
print(f" Params: TP={params.get('tp')}% SL={params.get('sl')}% Trail={params.get('trail')}%")
|
|
print(f" Starting Capital: ${starting_cash:>12,.2f}")
|
|
print(f" Final Equity: ${final_equity:>12,.2f}")
|
|
print(f" Total Return: {total_return:>12.2f}%")
|
|
print(f" Max Drawdown: {result.max_drawdown_pct:>12.2f}%")
|
|
print(f" Sharpe Ratio: {sharpe:>12.2f}")
|
|
print(f" Profit Factor: {profit_factor:>12.2f}")
|
|
print(f" Fees Paid: ${result.fees_paid:>12,.2f}")
|
|
print("-"*60)
|
|
print(f" Total Trades: {result.total_trades:>12}")
|
|
print(f" Wins: {result.wins:>12}")
|
|
print(f" Losses: {result.losses:>12}")
|
|
print(f" Win Rate: {win_rate:>11.1f}%")
|
|
print(f" Liquidations: {result.liquidations:>12}")
|
|
print(f" Avg Win: ${avg_win:>12,.2f}")
|
|
print(f" Avg Loss: ${avg_loss:>12,.2f}")
|
|
print(f" Win/Loss Ratio: {abs(avg_win/avg_loss) if avg_loss else 0:>12.2f}")
|
|
print("-"*60)
|
|
print(f" Exit Reasons:")
|
|
for reason, count in sorted(reasons.items(), key=lambda x: -x[1]):
|
|
print(f" {reason:20s} {count:>6}")
|
|
print("="*60)
|
|
|
|
return {
|
|
"total_return_pct": round(total_return, 2),
|
|
"max_drawdown_pct": round(result.max_drawdown_pct, 2),
|
|
"sharpe": round(sharpe, 2),
|
|
"profit_factor": round(profit_factor, 2),
|
|
"total_trades": result.total_trades,
|
|
"win_rate": round(win_rate, 1),
|
|
"liquidations": result.liquidations,
|
|
"avg_win": round(avg_win, 2),
|
|
"avg_loss": round(avg_loss, 2),
|
|
"fees": round(result.fees_paid, 2),
|
|
}
|
|
|
|
|
|
def optimize(months=6):
|
|
"""Grid search over key parameters."""
|
|
print("\n🔧 PARAMETER OPTIMIZATION")
|
|
print("="*60)
|
|
|
|
param_grid = [
|
|
{"tp": 3, "sl": 2, "trail": 1.5},
|
|
{"tp": 4, "sl": 2, "trail": 1.5},
|
|
{"tp": 5, "sl": 3, "trail": 2}, # Current live params
|
|
{"tp": 6, "sl": 3, "trail": 2},
|
|
{"tp": 7, "sl": 3, "trail": 2.5},
|
|
{"tp": 5, "sl": 2, "trail": 1.5},
|
|
{"tp": 8, "sl": 4, "trail": 3},
|
|
{"tp": 10, "sl": 5, "trail": 3},
|
|
{"tp": 5, "sl": 3, "trail": 1},
|
|
{"tp": 4, "sl": 3, "trail": 2},
|
|
]
|
|
|
|
all_results = []
|
|
|
|
for params in param_grid:
|
|
print(f"\n--- TP={params['tp']}% SL={params['sl']}% Trail={params['trail']}% ---")
|
|
result, final_eq = run_backtest(
|
|
months=months,
|
|
tp_pct=params['tp'],
|
|
sl_pct=params['sl'],
|
|
trailing_stop_pct=params['trail'],
|
|
)
|
|
stats = print_results(result, final_eq, 10_000, params)
|
|
stats['params'] = params
|
|
all_results.append(stats)
|
|
|
|
# Rank by return
|
|
print("\n\n" + "="*60)
|
|
print(" OPTIMIZATION RANKINGS")
|
|
print("="*60)
|
|
print(f"{'Rank':>4} {'TP%':>4} {'SL%':>4} {'Trail%':>6} {'Return%':>9} {'MaxDD%':>7} {'WinRate':>8} {'Sharpe':>7} {'Trades':>7}")
|
|
print("-"*60)
|
|
|
|
for rank, r in enumerate(sorted(all_results, key=lambda x: x['total_return_pct'], reverse=True), 1):
|
|
p = r['params']
|
|
marker = " ◄ LIVE" if p['tp'] == 5 and p['sl'] == 3 and p['trail'] == 2 else ""
|
|
print(f"{rank:>4} {p['tp']:>4} {p['sl']:>4} {p['trail']:>6} {r['total_return_pct']:>8.1f}% {r['max_drawdown_pct']:>6.1f}% {r['win_rate']:>7.1f}% {r['sharpe']:>7.2f} {r['total_trades']:>7}{marker}")
|
|
|
|
return all_results
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description="Leverage Strategy Backtester")
|
|
parser.add_argument("--months", type=int, default=6, help="Months of history (default: 6)")
|
|
parser.add_argument("--coins", type=str, help="Comma-separated coin list (e.g. BTC,ETH)")
|
|
parser.add_argument("--optimize", action="store_true", help="Run parameter optimization")
|
|
parser.add_argument("--tp", type=float, default=5.0, help="Take profit %% (default: 5)")
|
|
parser.add_argument("--sl", type=float, default=3.0, help="Stop loss %% (default: 3)")
|
|
parser.add_argument("--trail", type=float, default=2.0, help="Trailing stop %% (default: 2)")
|
|
parser.add_argument("--margin", type=float, default=200.0, help="Margin per trade (default: 200)")
|
|
parser.add_argument("--cash", type=float, default=10000.0, help="Starting cash (default: 10000)")
|
|
parser.add_argument("--vol-filter", type=float, default=0.0, help="Min ATR%% to trade (0=disabled, try 1.0-2.0)")
|
|
parser.add_argument("--vol-filter-coin", type=str, default="BTCUSDT", help="Coin to measure regime volatility on")
|
|
parser.add_argument("--fee-mode", type=str, default="taker", choices=["taker", "maker", "hybrid"], help="Fee mode: taker (0.05%%), maker (0.02%%), hybrid (70%% maker)")
|
|
parser.add_argument("--maker-fill-rate", type=float, default=0.70, help="Hybrid: probability limit order fills (default: 0.70)")
|
|
args = parser.parse_args()
|
|
|
|
if args.optimize:
|
|
results = optimize(args.months)
|
|
# Save results
|
|
RESULTS_DIR.mkdir(parents=True, exist_ok=True)
|
|
out = RESULTS_DIR / f"optimize_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
|
|
out.write_text(json.dumps(results, indent=2))
|
|
print(f"\nSaved to {out}")
|
|
return
|
|
|
|
coins = None
|
|
if args.coins:
|
|
coins = [c.strip().upper() + "USDT" for c in args.coins.split(",")]
|
|
|
|
result, final_equity = run_backtest(
|
|
coins=coins,
|
|
months=args.months,
|
|
starting_cash=args.cash,
|
|
margin_per_trade=args.margin,
|
|
tp_pct=args.tp,
|
|
sl_pct=args.sl,
|
|
trailing_stop_pct=args.trail,
|
|
vol_filter=args.vol_filter,
|
|
vol_filter_coin=args.vol_filter_coin,
|
|
fee_mode=args.fee_mode,
|
|
maker_fill_rate=args.maker_fill_rate,
|
|
)
|
|
|
|
stats = print_results(result, final_equity, args.cash)
|
|
|
|
# Save results
|
|
RESULTS_DIR.mkdir(parents=True, exist_ok=True)
|
|
out = RESULTS_DIR / f"backtest_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
|
|
out.write_text(json.dumps({
|
|
"stats": stats,
|
|
"equity_curve": result.equity_curve,
|
|
"trades": result.trades[-100:], # Last 100 trades for review
|
|
"params": {
|
|
"months": args.months,
|
|
"tp": args.tp,
|
|
"sl": args.sl,
|
|
"trail": args.trail,
|
|
"margin": args.margin,
|
|
"starting_cash": args.cash,
|
|
}
|
|
}, indent=2))
|
|
print(f"\nResults saved to {out}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|