Files
workspace/projects/crypto-signals/scripts/leverage_backtester.py

772 lines
28 KiB
Python

#!/usr/bin/env python3
"""
Leverage Strategy Backtester
Replays the exact same signal logic (RSI, VWAP, MACD, Bollinger, volume)
against historical candle data to validate the live leverage trader.
Usage:
    python3 leverage_backtester.py                   # Default: 6 months, all coins
    python3 leverage_backtester.py --months 12       # 12 months of history
    python3 leverage_backtester.py --coins BTC,ETH   # Specific coins only
    python3 leverage_backtester.py --optimize        # Grid search over TP/SL/trailing params
"""
import json
import math
import time
import argparse
import urllib.request
from datetime import datetime, timezone, timedelta
from pathlib import Path
from dataclasses import dataclass, field
# REST endpoint queried by fetch_klines_cached() for candlestick ("kline") data.
BINANCE_KLINES = "https://api.binance.us/api/v3/klines"
# Default trading universe: run_backtest() falls back to this list when no
# `coins` argument is given. Every entry is a USDT-quoted pair symbol.
COINS = [
"BTCUSDT", "ETHUSDT", "SOLUSDT", "XRPUSDT", "DOGEUSDT",
"ADAUSDT", "AVAXUSDT", "LINKUSDT", "DOTUSDT", "MATICUSDT",
"NEARUSDT", "ATOMUSDT", "LTCUSDT", "UNIUSDT", "AAVEUSDT",
"FILUSDT", "ALGOUSDT", "XLMUSDT", "VETUSDT", "ICPUSDT",
]
# Where main() writes backtest / optimization JSON reports
# (<script dir>/../data/backtest-results).
RESULTS_DIR = Path(__file__).parent.parent / "data" / "backtest-results"
# Where fetch_klines_cached() stores downloaded candles
# (<script dir>/../data/kline-cache).
CACHE_DIR = Path(__file__).parent.parent / "data" / "kline-cache"
# ── Indicator Calculations (identical to short_scanner.py) ──
def calc_rsi(closes, period=14):
    """Relative Strength Index of ``closes`` using Wilder smoothing.

    Returns the neutral value 50 when there are fewer than ``period + 1``
    closes, 100 when the smoothed average loss is zero, and otherwise the
    RSI rounded to one decimal place.
    """
    if len(closes) < period + 1:
        return 50

    # Split each close-to-close move into its up and down component.
    ups, downs = [], []
    for prev, cur in zip(closes, closes[1:]):
        step = cur - prev
        ups.append(step if step > 0 else 0)
        downs.append(-step if step < 0 else 0)

    # Seed with a simple average of the first `period` moves ...
    mean_up = sum(ups[:period]) / period
    mean_down = sum(downs[:period]) / period
    # ... then apply Wilder's recursive smoothing to the remainder.
    for up, down in zip(ups[period:], downs[period:]):
        mean_up = (mean_up * (period - 1) + up) / period
        mean_down = (mean_down * (period - 1) + down) / period

    if mean_down == 0:
        return 100
    return round(100 - (100 / (1 + mean_up / mean_down)), 1)
def calc_vwap(klines_window):
    """Volume-weighted average of each candle's typical price (H+L+C)/3.

    Returns 0 when the window is empty or carries no volume.
    """
    volume_total = 0
    weighted_total = 0
    for candle in klines_window:
        typical = (candle['high'] + candle['low'] + candle['close']) / 3
        volume_total += candle['volume']
        weighted_total += typical * candle['volume']
    if not volume_total:
        return 0
    return weighted_total / volume_total
def calc_ema(values, period):
    """Final value of an exponential moving average over ``values``.

    The average is seeded with the first value and uses the smoothing
    factor 2 / (period + 1). An empty input yields 0.
    """
    if not values:
        return 0
    alpha = 2 / (period + 1)
    stream = iter(values)
    running = next(stream)
    for sample in stream:
        running = (sample - running) * alpha + running
    return running
def _ema_series(values, period):
    """Return the running EMA of ``values`` (one output per input), seeded with the first value."""
    if not values:
        return []
    mult = 2 / (period + 1)
    ema = values[0]
    series = [ema]
    for v in values[1:]:
        ema = (v - ema) * mult + ema
        series.append(ema)
    return series


def calc_macd(closes):
    """MACD(12, 26, 9) of ``closes``.

    Returns ``(macd_line, signal, histogram)``:
      * macd_line — EMA12(closes) - EMA26(closes) at the last close,
      * signal    — 9-period EMA of the MACD line,
      * histogram — macd_line - signal.
    With fewer than 26 closes the indicator is undefined and ``(0, 0, 0)``
    is returned.

    The signal line was previously computed as an EMA of the last nine raw
    *prices*, which put it on the price scale: the histogram was always
    about ``-price`` and any "histogram < 0" test downstream was always
    true. It is now the EMA of the MACD series itself.

    NOTE(review): the surrounding file states these indicators mirror
    short_scanner.py; if that module carries the same signal-line formula
    it needs the same fix to keep backtest and live logic in step.
    """
    slow_period = 26
    if len(closes) < slow_period:
        return 0, 0, 0
    fast = _ema_series(closes, 12)
    slow = _ema_series(closes, slow_period)
    # Only use MACD values from the point where the slow EMA has seen a full
    # 26 closes; earlier values are dominated by the seed.
    macd_series = [f - s for f, s in zip(fast[slow_period - 1:], slow[slow_period - 1:])]
    macd_line = macd_series[-1]
    signal = _ema_series(macd_series, 9)[-1]
    return macd_line, signal, macd_line - signal
def calc_bollinger_position(closes, period=20):
    """Locate the latest close inside its Bollinger Bands (mean ± 2 sigma).

    0.0 is the lower band, 1.0 the upper band; values outside that range
    mean the close is beyond a band. Returns the midpoint 0.5 when there
    are fewer than ``period`` closes or the window has no dispersion.
    """
    if len(closes) < period:
        return 0.5

    window = closes[-period:]
    mean = sum(window) / period
    variance = sum((x - mean)**2 for x in window) / period
    sigma = variance ** 0.5
    if sigma == 0:
        return 0.5

    upper_band = mean + 2 * sigma
    lower_band = mean - 2 * sigma
    width = upper_band - lower_band
    if not width:
        return 0.5
    return (closes[-1] - lower_band) / width
def calc_atr(klines, period=14):
    """Average True Range (Wilder-smoothed) over ``klines``.

    Each candle's true range is the largest of high-low, |high - previous
    close| and |low - previous close|. Returns 0 when fewer than
    ``period + 1`` candles are supplied.
    """
    if len(klines) < period + 1:
        return 0

    true_ranges = []
    for prev, cur in zip(klines, klines[1:]):
        high, low, prev_close = cur['high'], cur['low'], prev['close']
        true_ranges.append(max(high - low, abs(high - prev_close), abs(low - prev_close)))

    # The length guard above guarantees at least `period` true ranges, so the
    # seed average below always has a full window.
    smoothed = sum(true_ranges[:period]) / period
    for tr in true_ranges[period:]:
        smoothed = (smoothed * (period - 1) + tr) / period
    return smoothed
def calc_volatility_regime(klines, period=14):
    """ATR expressed as a percentage of the latest close.

    A higher value means a more volatile market. The reference price
    defaults to 1 for an empty input, and a zero price yields 0.
    """
    average_range = calc_atr(klines, period)
    last_close = klines[-1]['close'] if klines else 1
    if not last_close:
        return 0
    return average_range / last_close * 100
def volume_trend(klines, lookback=10):
    """Ratio of mean volume in the latest ``lookback`` candles to the ``lookback`` before them.

    Returns 1.0 (no trend) when fewer than ``2 * lookback`` candles are
    available or when the earlier window traded no volume.
    """
    if len(klines) < lookback * 2:
        return 1.0

    latest_mean = sum(c['volume'] for c in klines[-lookback:]) / lookback
    earlier_mean = sum(c['volume'] for c in klines[-lookback*2:-lookback]) / lookback
    if not earlier_mean:
        return 1.0
    return latest_mean / earlier_mean
# ── Signal Scoring (identical to leverage_trader.py) ──
def score_short(closes, klines):
    """Score how attractive a short entry is on the latest candle.

    Points are awarded for an overbought RSI, price stretched above the VWAP
    of the last 24 candles, a negative MACD histogram, a close near or above
    the upper Bollinger band, a strong recent rally, and a rally on fading
    volume. Returns ``(score, "short", rsi)``.
    """
    def first_hit(*rules):
        # Points of the first rule whose condition holds, else 0 (elif-chain semantics).
        return next((points for hit, points in rules if hit), 0)

    last = closes[-1]
    rsi = calc_rsi(closes)
    anchor = calc_vwap(klines[-24:])
    above_vwap_pct = ((last - anchor) / anchor * 100) if anchor else 0
    histogram = calc_macd(closes)[2]
    band_pos = calc_bollinger_position(closes)
    vol_ratio = volume_trend(klines)
    # closes[-24] sits 23 candles before the latest close.
    reference = closes[-24] if len(closes) >= 24 else closes[0]
    rally_pct = ((last - reference) / reference * 100) if reference else 0

    score = first_hit((rsi >= 80, 30), (rsi >= 70, 25), (rsi >= 65, 15), (rsi >= 60, 5))
    score += first_hit((above_vwap_pct > 5, 20), (above_vwap_pct > 3, 15), (above_vwap_pct > 1, 8))
    score += first_hit((histogram < -0.001 * last, 15), (histogram < 0, 10))
    score += first_hit((band_pos > 1.0, 15), (band_pos > 0.85, 10))
    score += first_hit((rally_pct > 15, 15), (rally_pct > 8, 10), (rally_pct > 4, 5))
    # Price up while volume dries up: small bonus.
    if rally_pct > 2 and vol_ratio < 0.7:
        score += 5
    return score, "short", rsi
def score_long(closes, klines):
    """Score how attractive a long entry is on the latest candle.

    Points are awarded for an oversold RSI, price stretched below the VWAP
    of the last 24 candles, a sharp recent sell-off, and a close near or
    below the lower Bollinger band. Returns ``(score, "long", rsi)``.
    """
    def first_hit(*rules):
        # Points of the first rule whose condition holds, else 0 (elif-chain semantics).
        return next((points for hit, points in rules if hit), 0)

    last = closes[-1]
    rsi = calc_rsi(closes)
    anchor = calc_vwap(klines[-24:])
    vs_vwap_pct = ((last - anchor) / anchor * 100) if anchor else 0
    band_pos = calc_bollinger_position(closes)
    # closes[-24] sits 23 candles before the latest close.
    reference = closes[-24] if len(closes) >= 24 else closes[0]
    move_pct = ((last - reference) / reference * 100) if reference else 0

    score = first_hit((rsi <= 25, 30), (rsi <= 30, 25), (rsi <= 35, 15), (rsi <= 40, 5))
    score += first_hit((vs_vwap_pct < -5, 20), (vs_vwap_pct < -3, 15), (vs_vwap_pct < -1, 8))
    score += first_hit((move_pct < -15, 15), (move_pct < -8, 10), (move_pct < -4, 5))
    score += first_hit((band_pos < 0, 15), (band_pos < 0.15, 10))
    return score, "long", rsi
# ── Data Fetching with Cache ──
def fetch_klines_cached(symbol, interval, start_ms, end_ms):
    """Fetch klines with a local file cache to avoid re-downloading.

    Args:
        symbol: Exchange pair symbol, e.g. ``"BTCUSDT"``.
        interval: Kline interval string passed to the API, e.g. ``"1h"``.
        start_ms: Start of the range, epoch milliseconds.
        end_ms: End of the range, epoch milliseconds.

    Returns:
        A list of dicts with keys ``open_time``, ``open``, ``high``, ``low``,
        ``close``, ``volume`` and ``close_time``. If a request fails part-way,
        the candles downloaded so far are still returned (best effort) but are
        NOT written to the cache, so a later run retries the download instead
        of silently reusing a truncated history.
    """
    CACHE_DIR.mkdir(parents=True, exist_ok=True)
    cache_file = CACHE_DIR / f"{symbol}_{interval}_{start_ms}_{end_ms}.json"
    if cache_file.exists():
        try:
            return json.loads(cache_file.read_text())
        except (OSError, ValueError) as e:
            # Corrupt or unreadable cache entry: treat it as a miss and re-download.
            print(f" Ignoring unreadable cache file {cache_file.name}: {e}")

    print(f" Downloading {symbol} {interval} candles...")
    all_klines = []
    current = start_ms
    download_complete = True
    while current < end_ms:
        url = f"{BINANCE_KLINES}?symbol={symbol}&interval={interval}&startTime={current}&endTime={end_ms}&limit=1000"
        req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'})
        try:
            # Context manager closes the HTTP response even if parsing fails.
            with urllib.request.urlopen(req, timeout=15) as resp:
                raw = json.loads(resp.read())
            if not raw:
                break
            for k in raw:
                all_klines.append({
                    'open_time': k[0],
                    'open': float(k[1]),
                    'high': float(k[2]),
                    'low': float(k[3]),
                    'close': float(k[4]),
                    'volume': float(k[5]),
                    'close_time': k[6],
                })
            # Resume just after the last candle's close time.
            current = raw[-1][6] + 1
            time.sleep(0.12)  # Rate limit
        except Exception as e:
            # Deliberately broad: any network/parse failure ends this download.
            print(f" Error: {e}")
            download_complete = False
            break

    # Only a full, uninterrupted download may be cached under this key.
    if all_klines and download_complete:
        cache_file.write_text(json.dumps(all_klines))
    print(f" Got {len(all_klines)} candles")
    return all_klines
# ── Position Simulation ──
@dataclass
class Position:
    """One open leveraged position in the simulation."""

    symbol: str
    direction: str  # "long" or "short"
    entry_price: float
    margin: float  # collateral committed to the trade
    leverage: int
    entry_time: int  # ms timestamp
    peak_pnl_pct: float = 0.0  # best P&L % seen so far (maintained by the caller)

    @property
    def notional(self):
        """Position size: margin multiplied by leverage."""
        return self.margin * self.leverage

    def pnl_at(self, price):
        """Profit or loss (before fees) if the position were closed at ``price``."""
        units = self.notional / self.entry_price
        if self.direction == "long":
            move = price - self.entry_price
        else:
            move = self.entry_price - price
        return move * units

    def pnl_pct_at(self, price):
        """P&L at ``price`` as a percentage of the posted margin."""
        return (self.pnl_at(price) / self.margin) * 100

    def is_liquidated(self, price):
        """True once ``price`` has moved 1/leverage against the entry price."""
        if self.direction == "long":
            return price <= self.entry_price * (1 - 1 / self.leverage)
        return price >= self.entry_price * (1 + 1 / self.leverage)
@dataclass
class BacktestResult:
    """Running totals and per-trade history collected during one backtest."""

    # Trade counters
    total_trades: int = 0
    wins: int = 0
    losses: int = 0
    liquidations: int = 0

    # Money metrics
    total_pnl: float = 0.0
    max_drawdown_pct: float = 0.0
    peak_equity: float = 0.0
    fees_paid: float = 0.0

    # Histories; each instance gets its own fresh list
    trades: list = field(default_factory=list)
    equity_curve: list = field(default_factory=list)
def _position_key(symbol):
    """Return the ``coin_data`` key for a position symbol (re-appends the ``USDT`` suffix)."""
    return symbol if symbol.endswith("USDT") else symbol + "USDT"


def _find_exit(pos, candle, tp_pct, sl_pct, trailing_stop_pct):
    """Decide whether ``pos`` exits on ``candle``; return ``(reason, fill_price)`` or ``None``.

    Checks run in a conservative order — liquidation, stop loss, take profit,
    trailing stop — so a candle that touches both SL and TP counts as a loss.
    Liquidation / SL / TP are tested against the candle's extreme and filled
    at the corresponding trigger level; the trailing stop is evaluated and
    filled on the close. Also updates ``pos.peak_pnl_pct`` from the close.
    """
    is_long = pos.direction == "long"
    # Longs are hurt by the low and helped by the high; shorts the reverse.
    worst_price = candle['low'] if is_long else candle['high']
    best_price = candle['high'] if is_long else candle['low']
    close_price = candle['close']
    adverse = -1 if is_long else 1  # sign of a price move that loses money

    if pos.is_liquidated(worst_price):
        return "liquidated", pos.entry_price * (1 + adverse / pos.leverage)
    if pos.pnl_pct_at(worst_price) <= -sl_pct:
        # Price at which P&L on margin equals -sl_pct.
        return "sl", pos.entry_price * (1 + adverse * sl_pct / (100 * pos.leverage))
    if pos.pnl_pct_at(best_price) >= tp_pct:
        # Price at which P&L on margin equals +tp_pct.
        return "tp", pos.entry_price * (1 - adverse * tp_pct / (100 * pos.leverage))

    close_pnl_pct = pos.pnl_pct_at(close_price)
    if close_pnl_pct > pos.peak_pnl_pct:
        pos.peak_pnl_pct = close_pnl_pct
    # The trailing stop only arms once the position has been at least 2% up.
    if pos.peak_pnl_pct >= 2.0 and (pos.peak_pnl_pct - close_pnl_pct) >= trailing_stop_pct:
        return "trailing", close_price
    return None


def run_backtest(
    coins=None,
    months=6,
    starting_cash=10_000.0,
    margin_per_trade=200.0,
    max_positions=10,
    short_threshold=50,
    long_threshold=45,
    tp_pct=5.0,
    sl_pct=3.0,
    trailing_stop_pct=2.0,
    taker_fee_pct=0.05,
    maker_fee_pct=0.02,
    fee_mode="taker",  # "taker", "maker", "hybrid"
    maker_fill_rate=0.70,  # hybrid: 70% of entries fill as maker
    scan_interval=15,  # candles between scans (15 = every 15h on 1h candles)
    vol_filter=0.0,  # Min ATR% on BTC to allow entries (0=disabled)
    vol_filter_coin="BTCUSDT",
):
    """
    Run backtest over historical data.
    Uses 1h candles, scans every `scan_interval` candles for signals.

    Args:
        coins: Pair symbols to trade (e.g. ``["BTCUSDT"]``); defaults to COINS.
        months: History length; the window is ``months * 30`` days ending at
            the top of the current UTC hour.
        starting_cash: Initial account balance.
        margin_per_trade: Margin committed to each new position.
        max_positions: Maximum number of simultaneously open positions.
        short_threshold / long_threshold: Minimum score to open a short / long.
        tp_pct / sl_pct: Take-profit / stop-loss, in % of margin.
        trailing_stop_pct: Give-back from peak P&L % that closes a position
            once it has been at least 2% in profit.
        taker_fee_pct / maker_fee_pct: Fee rates in % of notional.
        fee_mode: "taker", "maker", or "hybrid" (each fill is randomly maker
            with probability ``maker_fill_rate``; results are then not
            reproducible run-to-run because the RNG is unseeded).
        scan_interval: Candles between entry scans.
        vol_filter: Minimum ATR% of ``vol_filter_coin`` (last 24 candles)
            required to open new positions; 0 disables the filter.

    Returns:
        ``(BacktestResult, final_cash)``, or ``None`` when no coin has at
        least 100 candles of data.

    Notes:
        * SL / TP / liquidation exits are filled at their trigger price, not
          at the candle close; intra-candle gaps are not modelled.
        * Entry fees are deducted from cash and counted in ``fees_paid`` but
          are not part of each trade's ``pnl`` or of ``total_pnl``.
        * NOTE(review): candles are matched across coins by list index, not
          by timestamp, so a coin with missing candles is time-shifted
          relative to the others.
    """
    import random

    def calc_fee(notional_usd):
        """Calculate fee based on fee mode."""
        if fee_mode == "maker":
            rate = maker_fee_pct
        elif fee_mode == "hybrid":
            # Each fill is randomly maker or taker based on the fill rate.
            rate = maker_fee_pct if random.random() < maker_fill_rate else taker_fee_pct
        else:  # taker
            rate = taker_fee_pct
        return notional_usd * rate / 100

    if coins is None:
        coins = COINS

    # Truncate to the hour: the kline cache is keyed on these timestamps, so a
    # millisecond-precise "now" would make every call miss the cache.
    now = datetime.now(timezone.utc).replace(minute=0, second=0, microsecond=0)
    start = now - timedelta(days=months * 30)
    start_ms = int(start.timestamp() * 1000)
    end_ms = int(now.timestamp() * 1000)

    # Fetch all historical data
    print("=== Leverage Strategy Backtester ===")
    print(f"Period: {start.strftime('%Y-%m-%d')} → {now.strftime('%Y-%m-%d')} ({months} months)")
    print(f"Coins: {len(coins)}")
    vf_str = f" | vol_filter={vol_filter}% on {vol_filter_coin}" if vol_filter > 0 else ""
    fee_str = f" | fees={fee_mode}" + (f"({maker_fill_rate:.0%} maker)" if fee_mode == "hybrid" else "")
    print(f"Params: margin=${margin_per_trade} | TP={tp_pct}% | SL={sl_pct}% | trailing={trailing_stop_pct}%{vf_str}{fee_str}")
    print(f"Thresholds: short≥{short_threshold} | long≥{long_threshold}")
    print()

    coin_data = {}
    for sym in coins:
        klines = fetch_klines_cached(sym, '1h', start_ms, end_ms)
        if len(klines) >= 100:
            coin_data[sym] = klines
        else:
            print(f" Skipping {sym}: only {len(klines)} candles")
    if not coin_data:
        print("No data available!")
        return None

    # Find common time range
    min_len = min(len(v) for v in coin_data.values())
    print(f"\nSimulating on {len(coin_data)} coins, {min_len} candles each\n")

    # Backtest state
    cash = starting_cash
    positions = []
    result = BacktestResult()
    result.peak_equity = starting_cash
    # The first coin's candles supply the timestamps for every step.
    clock = next(iter(coin_data.values()))

    def close_position(pos, price, reason, exit_time):
        """Settle ``pos`` at ``price`` and record the trade."""
        nonlocal cash
        if reason == "liquidated":
            # The whole margin is lost; no exit fee is charged, so none is counted.
            fee = 0.0
            net_pnl = -pos.margin
            result.liquidations += 1
        else:
            fee = calc_fee(pos.notional)
            net_pnl = pos.pnl_at(price) - fee
        cash += pos.margin + net_pnl
        result.total_pnl += net_pnl
        result.fees_paid += fee
        result.total_trades += 1
        if net_pnl > 0:
            result.wins += 1
        else:
            result.losses += 1
        result.trades.append({
            "symbol": pos.symbol,
            "direction": pos.direction,
            "entry_price": pos.entry_price,
            "exit_price": price,
            "leverage": pos.leverage,
            "margin": pos.margin,
            "pnl": round(net_pnl, 2),
            "pnl_pct": round(pos.pnl_pct_at(price), 2),
            "reason": reason,
            "entry_time": pos.entry_time,
            "exit_time": exit_time,
        })

    # Walk forward through time
    lookback = 100  # Need 100 candles for indicators
    for i in range(lookback, min_len):
        current_time = clock[i]['close_time']

        # ── Check exits on every candle ──
        still_open = []
        for pos in positions:
            candles = coin_data.get(_position_key(pos.symbol))
            exit_signal = None
            if candles and i < len(candles):
                exit_signal = _find_exit(pos, candles[i], tp_pct, sl_pct, trailing_stop_pct)
            if exit_signal is None:
                still_open.append(pos)
            else:
                reason, fill_price = exit_signal
                close_position(pos, fill_price, reason, current_time)
        positions = still_open

        # ── Scan for entries every N candles ──
        scan_now = (i - lookback) % scan_interval == 0 and len(positions) < max_positions
        if scan_now and vol_filter > 0 and vol_filter_coin in coin_data:
            # Volatility regime filter: skip entries in low-vol environments.
            vf_klines = coin_data[vol_filter_coin][max(0, i - 23):i + 1]
            if calc_volatility_regime(vf_klines) < vol_filter:
                scan_now = False

        if scan_now:
            held = {p.symbol for p in positions}
            candidates = []
            for sym, klines in coin_data.items():
                if sym.replace("USDT", "") in held or sym in held:
                    continue
                if i >= len(klines):
                    continue
                window = klines[i - lookback + 1:i + 1]
                closes = [k['close'] for k in window]
                # Score both directions
                short_score, _, _ = score_short(closes, window)
                long_score, _, _ = score_long(closes, window)
                if short_score >= short_threshold:
                    candidates.append((sym, "short", short_score))
                if long_score >= long_threshold:
                    candidates.append((sym, "long", long_score))

            # Sort by score, take best
            candidates.sort(key=lambda c: c[2], reverse=True)
            for sym, direction, score in candidates:
                if len(positions) >= max_positions:
                    break
                clean_sym = sym.replace("USDT", "")
                if clean_sym in held:
                    # Already opened in the other direction during this scan.
                    continue
                if cash < margin_per_trade:
                    break
                entry_price = coin_data[sym][i]['close']
                # Stronger signals get more leverage.
                lev = 15 if score >= 70 else 10 if score >= 60 else 7
                fee = calc_fee(margin_per_trade * lev)
                cash -= margin_per_trade + fee
                result.fees_paid += fee
                positions.append(Position(
                    symbol=clean_sym,
                    direction=direction,
                    entry_price=entry_price,
                    margin=margin_per_trade,
                    leverage=lev,
                    entry_time=current_time,
                ))
                held.add(clean_sym)

        # ── Track equity curve (every candle, whether or not entries were scanned) ──
        unrealized = 0
        for pos in positions:
            candles = coin_data.get(_position_key(pos.symbol))
            if candles and i < len(candles):
                unrealized += pos.pnl_at(candles[i]['close'])
        margin_locked = sum(p.margin for p in positions)
        equity = cash + margin_locked + unrealized
        if equity > result.peak_equity:
            result.peak_equity = equity
        dd = ((result.peak_equity - equity) / result.peak_equity * 100) if result.peak_equity > 0 else 0
        if dd > result.max_drawdown_pct:
            result.max_drawdown_pct = dd
        # Sample equity curve every 24 candles (daily)
        if (i - lookback) % 24 == 0:
            result.equity_curve.append({
                "time": current_time,
                "equity": round(equity, 2),
                "cash": round(cash, 2),
                "positions": len(positions),
                "drawdown_pct": round(dd, 2),
            })

    # Close remaining positions at the last simulated candle so they appear in
    # the trade list and fee totals like every other exit.
    last_idx = min_len - 1
    final_time = clock[last_idx]['close_time']
    for pos in positions:
        candles = coin_data.get(_position_key(pos.symbol))
        if candles:
            close_position(pos, candles[last_idx]['close'], "end_of_data", final_time)
    return result, cash
def print_results(result, final_equity, starting_cash, params=None):
    """Print a formatted backtest report and return the headline statistics.

    ``result`` supplies the counters, trade list and equity curve;
    ``params`` (optional dict with 'tp', 'sl', 'trail') is echoed in the
    header. The returned dict holds the rounded summary metrics.
    """
    trade_count = result.total_trades
    total_return = ((final_equity - starting_cash) / starting_cash) * 100
    win_rate = (result.wins / trade_count * 100) if trade_count else 0

    # Sharpe-like ratio from consecutive equity-curve samples
    sharpe = 0
    curve = [point['equity'] for point in result.equity_curve]
    step_returns = [(curr - prev) / prev for prev, curr in zip(curve, curve[1:]) if prev > 0]
    if step_returns:
        mean_ret = sum(step_returns) / len(step_returns)
        spread = (sum((r - mean_ret)**2 for r in step_returns) / len(step_returns)) ** 0.5
        if spread > 0:
            sharpe = mean_ret / spread * (365**0.5)

    # Profit factor and average win / loss
    pnls = [t['pnl'] for t in result.trades]
    winners = [p for p in pnls if p > 0]
    losers = [p for p in pnls if p < 0]
    gross_wins = sum(winners)
    gross_losses = abs(sum(losers))
    profit_factor = gross_wins / gross_losses if gross_losses > 0 else float('inf')
    avg_win = gross_wins / len(winners) if winners else 0
    avg_loss = sum(losers) / len(losers) if losers else 0

    # Exit reason breakdown
    reasons = {}
    for trade in result.trades:
        label = trade.get('reason', 'unknown')
        if label in reasons:
            reasons[label] += 1
        else:
            reasons[label] = 1

    rule = "-"*60
    banner = "="*60
    report = ["\n" + banner, " BACKTEST RESULTS", banner]
    if params:
        report.append(f" Params: TP={params.get('tp')}% SL={params.get('sl')}% Trail={params.get('trail')}%")
    report += [
        f" Starting Capital: ${starting_cash:>12,.2f}",
        f" Final Equity: ${final_equity:>12,.2f}",
        f" Total Return: {total_return:>12.2f}%",
        f" Max Drawdown: {result.max_drawdown_pct:>12.2f}%",
        f" Sharpe Ratio: {sharpe:>12.2f}",
        f" Profit Factor: {profit_factor:>12.2f}",
        f" Fees Paid: ${result.fees_paid:>12,.2f}",
        rule,
        f" Total Trades: {result.total_trades:>12}",
        f" Wins: {result.wins:>12}",
        f" Losses: {result.losses:>12}",
        f" Win Rate: {win_rate:>11.1f}%",
        f" Liquidations: {result.liquidations:>12}",
        f" Avg Win: ${avg_win:>12,.2f}",
        f" Avg Loss: ${avg_loss:>12,.2f}",
        f" Win/Loss Ratio: {abs(avg_win/avg_loss) if avg_loss else 0:>12.2f}",
        rule,
        f" Exit Reasons:",
    ]
    # Most frequent exit reason first; ties keep first-seen order.
    for reason, count in sorted(reasons.items(), key=lambda item: -item[1]):
        report.append(f" {reason:20s} {count:>6}")
    report.append(banner)
    for line in report:
        print(line)

    summary = {}
    summary["total_return_pct"] = round(total_return, 2)
    summary["max_drawdown_pct"] = round(result.max_drawdown_pct, 2)
    summary["sharpe"] = round(sharpe, 2)
    summary["profit_factor"] = round(profit_factor, 2)
    summary["total_trades"] = result.total_trades
    summary["win_rate"] = round(win_rate, 1)
    summary["liquidations"] = result.liquidations
    summary["avg_win"] = round(avg_win, 2)
    summary["avg_loss"] = round(avg_loss, 2)
    summary["fees"] = round(result.fees_paid, 2)
    return summary
def optimize(months=6, coins=None, starting_cash=10_000.0):
    """Grid search over key parameters.

    Runs one backtest per (TP, SL, trailing) combination in a fixed grid,
    prints each report, then prints a table ranked by total return.

    Args:
        months: Months of history passed to each backtest.
        coins: Optional list of pair symbols; ``None`` uses the backtester's
            default universe.
        starting_cash: Starting balance used for every run and for the
            return calculation in the report.

    Returns:
        A list with one stats dict per completed run (each with an added
        ``'params'`` key). The list is empty when no market data was
        available.
    """
    print("\n🔧 PARAMETER OPTIMIZATION")
    print("="*60)
    param_grid = [
        {"tp": 3, "sl": 2, "trail": 1.5},
        {"tp": 4, "sl": 2, "trail": 1.5},
        {"tp": 5, "sl": 3, "trail": 2},  # Current live params
        {"tp": 6, "sl": 3, "trail": 2},
        {"tp": 7, "sl": 3, "trail": 2.5},
        {"tp": 5, "sl": 2, "trail": 1.5},
        {"tp": 8, "sl": 4, "trail": 3},
        {"tp": 10, "sl": 5, "trail": 3},
        {"tp": 5, "sl": 3, "trail": 1},
        {"tp": 4, "sl": 3, "trail": 2},
    ]
    all_results = []
    for params in param_grid:
        print(f"\n--- TP={params['tp']}% SL={params['sl']}% Trail={params['trail']}% ---")
        outcome = run_backtest(
            coins=coins,
            months=months,
            starting_cash=starting_cash,
            tp_pct=params['tp'],
            sl_pct=params['sl'],
            trailing_stop_pct=params['trail'],
        )
        if outcome is None:
            # run_backtest() found no usable data; the remaining grid points
            # would fail the same way, so stop instead of crashing on unpack.
            print("No market data available; optimization aborted.")
            break
        result, final_eq = outcome
        stats = print_results(result, final_eq, starting_cash, params)
        stats['params'] = params
        all_results.append(stats)

    # Rank by return
    print("\n\n" + "="*60)
    print(" OPTIMIZATION RANKINGS")
    print("="*60)
    print(f"{'Rank':>4} {'TP%':>4} {'SL%':>4} {'Trail%':>6} {'Return%':>9} {'MaxDD%':>7} {'WinRate':>8} {'Sharpe':>7} {'Trades':>7}")
    print("-"*60)
    ranked = sorted(all_results, key=lambda x: x['total_return_pct'], reverse=True)
    for rank, r in enumerate(ranked, 1):
        p = r['params']
        # Flag the row matching the parameter set marked "Current live params" above.
        marker = " ◄ LIVE" if p['tp'] == 5 and p['sl'] == 3 and p['trail'] == 2 else ""
        print(f"{rank:>4} {p['tp']:>4} {p['sl']:>4} {p['trail']:>6} {r['total_return_pct']:>8.1f}% {r['max_drawdown_pct']:>6.1f}% {r['win_rate']:>7.1f}% {r['sharpe']:>7.2f} {r['total_trades']:>7}{marker}")
    return all_results
def main():
    """Command-line entry point: parse arguments, run a backtest or grid search, save JSON.

    With ``--optimize`` the parameter grid is run and its stats are saved to
    ``optimize_<timestamp>.json``; otherwise a single backtest is run and its
    stats, equity curve, last 100 trades and parameters are saved to
    ``backtest_<timestamp>.json``. Both files go to RESULTS_DIR. Nothing is
    saved when no market data could be loaded.
    """
    parser = argparse.ArgumentParser(description="Leverage Strategy Backtester")
    parser.add_argument("--months", type=int, default=6, help="Months of history (default: 6)")
    parser.add_argument("--coins", type=str, help="Comma-separated coin list (e.g. BTC,ETH)")
    parser.add_argument("--optimize", action="store_true", help="Run parameter optimization")
    parser.add_argument("--tp", type=float, default=5.0, help="Take profit %% (default: 5)")
    parser.add_argument("--sl", type=float, default=3.0, help="Stop loss %% (default: 3)")
    parser.add_argument("--trail", type=float, default=2.0, help="Trailing stop %% (default: 2)")
    parser.add_argument("--margin", type=float, default=200.0, help="Margin per trade (default: 200)")
    parser.add_argument("--cash", type=float, default=10000.0, help="Starting cash (default: 10000)")
    parser.add_argument("--vol-filter", type=float, default=0.0, help="Min ATR%% to trade (0=disabled, try 1.0-2.0)")
    parser.add_argument("--vol-filter-coin", type=str, default="BTCUSDT", help="Coin to measure regime volatility on")
    parser.add_argument("--fee-mode", type=str, default="taker", choices=["taker", "maker", "hybrid"], help="Fee mode: taker (0.05%%), maker (0.02%%), hybrid (70%% maker)")
    parser.add_argument("--maker-fill-rate", type=float, default=0.70, help="Hybrid: probability limit order fills (default: 0.70)")
    args = parser.parse_args()

    if args.optimize:
        results = optimize(args.months)
        if not results:
            print("\nNo optimization results to save.")
            return
        # Save results
        RESULTS_DIR.mkdir(parents=True, exist_ok=True)
        out = RESULTS_DIR / f"optimize_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        out.write_text(json.dumps(results, indent=2))
        print(f"\nSaved to {out}")
        return

    coins = None
    if args.coins:
        coins = []
        for entry in args.coins.split(","):
            base = entry.strip().upper()
            if not base:
                continue  # tolerate stray commas such as "BTC,,ETH"
            # Accept both "BTC" and "BTCUSDT" without doubling the suffix.
            coins.append(base if base.endswith("USDT") else base + "USDT")
        coins = coins or None

    outcome = run_backtest(
        coins=coins,
        months=args.months,
        starting_cash=args.cash,
        margin_per_trade=args.margin,
        tp_pct=args.tp,
        sl_pct=args.sl,
        trailing_stop_pct=args.trail,
        vol_filter=args.vol_filter,
        vol_filter_coin=args.vol_filter_coin,
        fee_mode=args.fee_mode,
        maker_fill_rate=args.maker_fill_rate,
    )
    if outcome is None:
        # run_backtest() returns None (after printing why) when no coin had
        # enough data; there is nothing to report or save.
        return
    result, final_equity = outcome
    stats = print_results(result, final_equity, args.cash)

    # Save results
    RESULTS_DIR.mkdir(parents=True, exist_ok=True)
    out = RESULTS_DIR / f"backtest_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
    out.write_text(json.dumps({
        "stats": stats,
        "equity_curve": result.equity_curve,
        "trades": result.trades[-100:],  # Last 100 trades for review
        "params": {
            "months": args.months,
            "tp": args.tp,
            "sl": args.sl,
            "trail": args.trail,
            "margin": args.margin,
            "starting_cash": args.cash,
            # Recorded so a saved run can be reproduced from its own file.
            "coins": coins,
            "vol_filter": args.vol_filter,
            "vol_filter_coin": args.vol_filter_coin,
            "fee_mode": args.fee_mode,
            "maker_fill_rate": args.maker_fill_rate,
        }
    }, indent=2))
    print(f"\nResults saved to {out}")


if __name__ == "__main__":
    main()