Files
workspace/projects/crypto-signals/scripts/backtester.py
Case be0315894e Add crypto signals pipeline + Polymarket arb scanner
- Signal parser for Telegram JSON exports
- Price fetcher using Binance US API
- Backtester with fee-aware simulation
- Polymarket 15-min arb scanner with orderbook checking
- Systemd timer every 2 min for arb alerts
- Paper trade tracking
- Investigation: polymarket-15min-arb.md
2026-02-09 14:31:51 -06:00

260 lines
9.0 KiB
Python

#!/usr/bin/env python3
"""
Crypto Signal Backtester
Simulates each signal against historical price data to determine outcomes.
"""
import json
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from price_fetcher import get_all_klines, get_current_price, normalize_symbol, datetime_to_ms
def simulate_signal(signal, klines):
    """
    Simulate a signal against historical candle data.

    The candles are walked in order and the trade is closed at the first
    candle whose high/low range touches either the stop loss or the primary
    target (``targets[0]``). If nothing is touched, the trade is reported as
    still open and marked against the last candle's close.

    Args:
        signal: Parsed signal dict. ``'direction'`` is required; the value
            ``'short'`` is simulated as a short, anything else as a long.
            ``'entry'``, ``'stop_loss'``, ``'targets'`` and ``'leverage'``
            are optional. When the entry is ``'market'`` or missing, the
            first candle's open is used and written back to the dict as
            ``signal['entry_resolved']``.
        klines: Ordered sequence of candle dicts providing the keys
            ``'open'``, ``'high'``, ``'low'``, ``'close'`` and
            ``'open_time'``.

    Returns:
        A dict whose ``'result'`` key is one of:

        * ``'incomplete'`` - stop loss or targets missing, or entry price 0.
        * ``'no_data'`` - no candles were supplied.
        * ``'target_hit'`` / ``'stop_loss'`` - trade closed; includes
          ``exit_price``, ``candles_to_exit``, ``exit_time``, ``pnl_pct``
          and ``pnl_pct_leveraged``.
        * ``'open'`` - never resolved; includes ``last_price``,
          ``unrealized_pct`` and ``unrealized_pct_leveraged``.

        Except for the early ``'incomplete'`` / market-entry ``'no_data'``
        returns, the dict also carries the trade set-up (entry, stop,
        target, direction, leverage, risk/reward percentages and ratio).
    """
    direction = signal['direction']
    is_short = direction == 'short'
    entry = signal.get('entry')
    stop_loss = signal.get('stop_loss')
    targets = signal.get('targets', [])

    # An explicit ``None`` (e.g. JSON null) is not replaced by the .get()
    # default, and would crash the leveraged P&L multiplication below.
    leverage = signal.get('leverage')
    if leverage is None:
        leverage = 1

    if not targets or not stop_loss:
        return {'result': 'incomplete', 'reason': 'missing SL or targets'}

    target = targets[0]  # Primary target

    # If entry is 'market', use first candle's open
    if entry == 'market' or entry is None:
        if not klines:
            return {'result': 'no_data'}
        entry = klines[0]['open']
        signal['entry_resolved'] = entry

    # Every percentage below divides by the entry price.
    if entry == 0:
        return {'result': 'incomplete', 'reason': 'invalid entry price'}

    # Risk/reward are absolute distances, so they are direction-independent.
    risk = abs(entry - stop_loss)
    reward = abs(target - entry)
    risk_pct = risk / entry * 100
    reward_pct = reward / entry * 100
    rr_ratio = reward / risk if risk > 0 else 0

    result = {
        'entry_price': entry,
        'stop_loss': stop_loss,
        'target': target,
        'direction': direction,
        'leverage': leverage,
        'risk_pct': round(risk_pct, 2),
        'reward_pct': round(reward_pct, 2),
        'rr_ratio': round(rr_ratio, 2),
    }

    # Walk through candles
    for candles_seen, candle in enumerate(klines, start=1):
        high = candle['high']
        low = candle['low']
        if is_short:
            # Short: stopped out when price trades up to the SL,
            # target reached when price trades down to it.
            sl_hit = high >= stop_loss
            tp_hit = low <= target
        else:
            # Long: stopped out when price trades down to the SL,
            # target reached when price trades up to it.
            sl_hit = low <= stop_loss
            tp_hit = high >= target

        if not (sl_hit or tp_hit):
            continue

        # If both levels fall inside the same candle the intra-candle order
        # is unknown; assume the stop was hit first (conservative).
        result['result'] = 'stop_loss' if sl_hit else 'target_hit'
        result['exit_price'] = stop_loss if sl_hit else target
        result['candles_to_exit'] = candles_seen
        result['exit_time'] = candle['open_time']
        break
    else:
        # Loop finished without a break: never resolved (or no candles).
        if klines:
            last_price = klines[-1]['close']
            if is_short:
                unrealized_pct = (entry - last_price) / entry * 100
            else:
                unrealized_pct = (last_price - entry) / entry * 100
            result['result'] = 'open'
            result['last_price'] = last_price
            result['unrealized_pct'] = round(unrealized_pct, 2)
            result['unrealized_pct_leveraged'] = round(unrealized_pct * leverage, 2)
        else:
            result['result'] = 'no_data'

    # Realized P&L only exists for closed trades.
    if result['result'] in ('target_hit', 'stop_loss'):
        exit_price = result['exit_price']
        if is_short:
            pnl_pct = (entry - exit_price) / entry * 100
        else:
            pnl_pct = (exit_price - entry) / entry * 100
        result['pnl_pct'] = round(pnl_pct, 2)
        result['pnl_pct_leveraged'] = round(pnl_pct * leverage, 2)

    return result
def backtest_signals(signals, interval='5m', lookforward_hours=72):
    """
    Backtest a list of parsed signals.

    For each signal, candles are fetched from the signal's timestamp (or
    from "now" when it has none) up to ``lookforward_hours`` later, capped
    at the current time, and the signal is run through ``simulate_signal``.
    Progress is printed to stdout, one line per signal.

    Args:
        signals: Iterable-with-length of signal dicts; each must provide
            ``'ticker'`` and ``'direction'`` and may provide ``'timestamp'``.
        interval: Candle interval passed through to ``get_all_klines``.
        lookforward_hours: Size of the simulation window in hours.

    Returns:
        A list with one dict per input signal: the signal's own fields plus
        a ``'backtest'`` key holding the outcome (a ``'no_data'`` outcome
        when no candles were returned).
    """
    results = []
    total = len(signals)

    for position, signal in enumerate(signals, start=1):
        ticker = signal['ticker']
        symbol = normalize_symbol(ticker)
        timestamp = signal.get('timestamp', '')
        print(f"[{position}/{total}] {ticker} {signal['direction']} ...", end=' ', flush=True)

        # Window: signal time -> signal time + lookforward, never past "now".
        now_ms = int(time.time() * 1000)
        start_ms = datetime_to_ms(timestamp) if timestamp else now_ms
        end_ms = min(start_ms + (lookforward_hours * 60 * 60 * 1000), now_ms)

        # Fetch candles
        klines = get_all_klines(symbol, interval, start_ms, end_ms)

        if klines:
            # Simulate first: simulate_signal may add 'entry_resolved' to the
            # signal, which must be present when the signal is copied below.
            outcome = simulate_signal(signal, klines)
            shown_pnl = outcome.get(
                'pnl_pct_leveraged',
                outcome.get('unrealized_pct_leveraged', '?'),
            )
            print(f"{outcome['result']} | PnL: {shown_pnl}%")
        else:
            outcome = {'result': 'no_data', 'reason': f'no klines for {symbol}'}
            print("NO DATA")

        results.append({**signal, 'backtest': outcome})

        # Rate limit. This runs after every fetch, including the no-data
        # case, which still cost a request against the API.
        time.sleep(0.2)

    return results
def generate_report(results):
    """
    Generate a summary report from backtest results.

    Trades are grouped by their ``backtest['result']`` value: ``target_hit``
    counts as a win, ``stop_loss`` as a loss, ``open`` as open, and
    ``no_data`` / ``incomplete`` as no-data. Win rate, total P&L and average
    risk/reward are computed over wins plus losses only.

    Returns:
        ``{'summary': {...}, 'trades': results}`` where ``summary`` holds the
        counts and the rounded statistics. ``profit_factor`` is
        ``float('inf')`` when the losses sum to zero.
    """
    def with_result(*labels):
        # Rows whose backtest result matches one of the given labels.
        return [row for row in results if row['backtest'].get('result') in labels]

    def leveraged_pnl(row):
        return row['backtest']['pnl_pct_leveraged']

    winners = with_result('target_hit')
    losers = with_result('stop_loss')
    still_open = with_result('open')
    unusable = with_result('no_data', 'incomplete')
    closed = winners + losers

    # Leveraged P&L totals per side; the averages and the profit factor
    # are both derived from these.
    win_sum = sum(leveraged_pnl(row) for row in winners)
    loss_sum = sum(leveraged_pnl(row) for row in losers)
    gross_loss = abs(loss_sum)

    if closed:
        win_rate = len(winners) / len(closed) * 100
        avg_rr = sum(row['backtest'].get('rr_ratio', 0) for row in closed) / len(closed)
    else:
        win_rate = 0
        avg_rr = 0

    avg_win = win_sum / len(winners) if winners else 0
    avg_loss = loss_sum / len(losers) if losers else 0
    total_pnl = sum(row['backtest'].get('pnl_pct_leveraged', 0) for row in closed)

    # Profit factor: gross profit over gross loss, infinite with no losses.
    if gross_loss > 0:
        profit_factor = win_sum / gross_loss
    else:
        profit_factor = float('inf')

    summary = {
        'total_signals': len(results),
        'wins': len(winners),
        'losses': len(losers),
        'open': len(still_open),
        'no_data': len(unusable),
        'win_rate': round(win_rate, 1),
        'avg_win_pct': round(avg_win, 2),
        'avg_loss_pct': round(avg_loss, 2),
        'total_pnl_pct': round(total_pnl, 2),
        'profit_factor': round(profit_factor, 2),
        'avg_risk_reward': round(avg_rr, 2),
    }
    return {'summary': summary, 'trades': results}
def print_report(report):
    """
    Pretty print the report.

    Writes the values from ``report['summary']`` to stdout as a fixed list
    of labelled lines framed by 60-character ``=`` rules.
    """
    stats = report['summary']
    rule = "=" * 60

    # (text before the value, summary key, text after the value)
    layout = (
        ("Total Signals: ", 'total_signals', ""),
        ("Wins: ", 'wins', ""),
        ("Losses: ", 'losses', ""),
        ("Open: ", 'open', ""),
        ("No Data: ", 'no_data', ""),
        ("Win Rate: ", 'win_rate', "%"),
        ("Avg Win: +", 'avg_win_pct', "% (leveraged)"),
        ("Avg Loss: ", 'avg_loss_pct', "% (leveraged)"),
        ("Total P&L: ", 'total_pnl_pct', "% (sum of resolved)"),
        ("Profit Factor: ", 'profit_factor', ""),
        ("Avg R:R: ", 'avg_risk_reward', ""),
    )

    print("\n" + rule)
    print("CRYPTO SIGNAL BACKTEST REPORT")
    print(rule)
    for before, key, after in layout:
        print(f"{before}{stats[key]}{after}")
    print(rule)
if __name__ == '__main__':
    # CLI entry point: load parsed signals, backtest them, print the summary
    # and save the full report next to the input file.
    if len(sys.argv) < 2:
        print("Usage: python3 backtester.py <signals.json> [--interval 5m] [--hours 72]")
        print("\nRun signal_parser.py first to generate signals.json")
        sys.exit(1)

    signals_path = sys.argv[1]
    interval = '5m'
    hours = 72

    # Minimal flag parsing: a flag only takes effect when a value follows it.
    for i, arg in enumerate(sys.argv):
        if arg == '--interval' and i + 1 < len(sys.argv):
            interval = sys.argv[i + 1]
        if arg == '--hours' and i + 1 < len(sys.argv):
            hours = int(sys.argv[i + 1])

    with open(signals_path, encoding='utf-8') as f:
        signals = json.load(f)

    print(f"Backtesting {len(signals)} signals (interval={interval}, lookforward={hours}h)\n")
    results = backtest_signals(signals, interval=interval, lookforward_hours=hours)
    report = generate_report(results)
    print_report(report)

    # Save full report.
    # The output name is built from the file's stem rather than with
    # str.replace('.json', ...): replace() returns the path unchanged when it
    # contains no '.json', which made the report overwrite the input file,
    # and it also rewrote '.json' appearing in directory names.
    source = Path(signals_path)
    out_path = source.with_name(f"{source.stem}_backtest.json")
    with open(out_path, 'w', encoding='utf-8') as f:
        json.dump(report, f, indent=2)
    print(f"\nFull report saved to {out_path}")