230 lines
8.4 KiB
Python
230 lines
8.4 KiB
Python
#!/usr/bin/env python3
|
|
"""Backtest kch123 copy-trading from full trade history"""
|
|
import json
|
|
from collections import defaultdict
|
|
from datetime import datetime
|
|
|
|
# Load the full trade history. The file is expected to hold a JSON list of
# dict records (each record is queried with .get() below).
# JSON text is UTF-8, so decode it explicitly instead of depending on the
# platform's default locale encoding.
with open("kch123-full-trades.json", encoding="utf-8") as f:
    trades = json.load(f)

print(f"Total trade records: {len(trades)}")

# Separate by type
# A record counts as a buy/sell only when it is a TRADE with the matching
# side; REDEEM records are collected regardless of side.
buys = [t for t in trades if t.get("type") == "TRADE" and t.get("side") == "BUY"]
sells = [t for t in trades if t.get("type") == "TRADE" and t.get("side") == "SELL"]
redeems = [t for t in trades if t.get("type") == "REDEEM"]

print(f"BUYs: {len(buys)}, SELLs: {len(sells)}, REDEEMs: {len(redeems)}")
|
|
|
|
# Group by market (conditionId)
# Each bucket collects the market's BUY trades, SELL trades and REDEEM
# records, plus a title used for display.
markets = defaultdict(lambda: {"buys": [], "sells": [], "redeems": [], "title": ""})

for t in trades:
    cid = t.get("conditionId", "")
    if not cid:
        # A record without a market id cannot be attributed to any market.
        continue

    # Touch the bucket first so every conditionId is counted as a market,
    # even when the record's type is not one of those bucketed below.
    market = markets[cid]

    # Only overwrite the title with a non-empty value, so a later record
    # with a blank or missing title cannot erase one already captured.
    title = t.get("title", "")
    if title:
        market["title"] = title

    # .get() keeps a record lacking "type" or "side" from raising KeyError;
    # such a record simply lands in none of the three lists.
    record_type = t.get("type")
    side = t.get("side")
    if record_type == "TRADE" and side == "BUY":
        market["buys"].append(t)
    elif record_type == "TRADE" and side == "SELL":
        market["sells"].append(t)
    elif record_type == "REDEEM":
        market["redeems"].append(t)

print(f"Unique markets: {len(markets)}")
|
|
|
|
# Reconstruct P&L per market
# One summary dict is appended to `results` for every market bucket.
results = []

for cid, data in markets.items():
    buy_records = data["buys"]
    sell_records = data["sells"]
    redeem_records = data["redeems"]

    # Cash ("usdcSize") and share ("size") totals for each leg; a record
    # missing a field contributes 0.
    total_bought_usdc = sum(rec.get("usdcSize", 0) for rec in buy_records)
    total_bought_shares = sum(rec.get("size", 0) for rec in buy_records)
    total_sold_usdc = sum(rec.get("usdcSize", 0) for rec in sell_records)
    total_redeemed_usdc = sum(rec.get("usdcSize", 0) for rec in redeem_records)
    total_redeemed_shares = sum(rec.get("size", 0) for rec in redeem_records)

    # Net cost is what was paid for buys minus what sells brought back;
    # the return is whatever was redeemed.
    net_cost = total_bought_usdc - total_sold_usdc
    returns = total_redeemed_usdc

    # With no redeems the return is 0, so pnl is simply minus the net cost
    # (the market may be lost or still open).
    pnl = returns - net_cost

    # Status: a redeem that paid out is a WIN, any other redeem is a LOSS,
    # and a market with no redeem records at all is treated as OPEN.
    if total_redeemed_shares > 0 and total_redeemed_usdc > 0:
        status = "WIN"
    elif redeem_records:
        status = "LOSS"
    else:
        status = "OPEN"

    # Earliest and latest timestamp across all three legs (0 when the market
    # has no bucketed records; a record without a timestamp counts as 0).
    all_times = [
        rec.get("timestamp", 0)
        for leg in (buy_records, sell_records, redeem_records)
        for rec in leg
    ]
    first_trade = min(all_times, default=0)
    last_trade = max(all_times, default=0)

    # Average price paid per share; 0 when no shares were bought.
    if total_bought_shares > 0:
        avg_price = total_bought_usdc / total_bought_shares
    else:
        avg_price = 0

    results.append({
        "conditionId": cid,
        "title": data["title"],
        "status": status,
        "net_cost": round(net_cost, 2),
        "returns": round(returns, 2),
        "pnl": round(pnl, 2),
        "shares_bought": round(total_bought_shares, 2),
        "avg_price": round(avg_price, 4),
        "first_trade": first_trade,
        "last_trade": last_trade,
        "num_buys": len(buy_records),
        "num_sells": len(sell_records),
        "num_redeems": len(redeem_records),
    })
|
|
|
|
# Sort by first trade time
results.sort(key=lambda row: row["first_trade"])

# Stats
# Split the per-market summaries by status in one pass; order within each
# list follows the sorted `results`.
by_status = {"WIN": [], "LOSS": [], "OPEN": []}
for row in results:
    by_status[row["status"]].append(row)

wins = by_status["WIN"]
losses = by_status["LOSS"]
opens = by_status["OPEN"]
resolved = wins + losses

# Totals run over every market, open ones included.
total_cost = sum(row["net_cost"] for row in results)
total_returns = sum(row["returns"] for row in results)
total_pnl = sum(row["pnl"] for row in results)

print("\n=== MARKET RESULTS ===")
print(f"Wins: {len(wins)}, Losses: {len(losses)}, Open: {len(opens)}")
if resolved:
    print(f"Win rate (resolved): {len(wins) / len(resolved) * 100:.1f}%")
else:
    # No resolved markets, so no win rate can be computed.
    print("N/A")
print(f"Total cost: ${total_cost:,.2f}")
print(f"Total returns: ${total_returns:,.2f}")
print(f"Total P&L: ${total_pnl:,.2f}")
|
|
|
|
# Top wins and losses
# Wins are ranked from largest pnl down, losses from most negative pnl up.
wins_sorted = sorted(wins, key=lambda x: x["pnl"], reverse=True)
losses_sorted = sorted(losses, key=lambda x: x["pnl"])

for heading, ranked in (("TOP 10 WINS", wins_sorted), ("TOP 10 LOSSES", losses_sorted)):
    print(f"\n=== {heading} ===")
    for r in ranked[:10]:
        # A first_trade of 0 means no usable timestamp was recorded.
        dt = datetime.fromtimestamp(r["first_trade"]).strftime("%Y-%m-%d") if r["first_trade"] else "?"
        # Status is derived from redeem records, not from pnl, so a WIN can
        # carry a negative pnl and a LOSS a positive one. Take the sign from
        # the pnl itself so the printed amount is never mislabelled.
        sign = "-" if r["pnl"] < 0 else "+"
        print(f" {sign}${abs(r['pnl']):>12,.2f} | {dt} | {r['title'][:60]}")
|
|
|
|
# === COPY TRADE SIMULATION ===
# Replays every resolved market with a fixed-fraction stake, once per
# slippage scenario, and prints bankroll, win rate, drawdown and streak stats.
print("\n=== COPY-TRADE SIMULATION ($10,000 bankroll) ===")

# Process all resolved markets chronologically
resolved_chrono = sorted(resolved, key=lambda x: x["first_trade"])

# Each scenario's slippage is a fractional markup applied to the entry price.
for scenario_name, slippage in [("Instant", 0), ("30min delay", 0.05), ("1hr delay", 0.10)]:
    bankroll = 10000
    peak = bankroll
    max_dd_pct = 0
    streak = 0
    max_losing_streak = 0
    trade_results = []

    for r in resolved_chrono:
        # A market with no recorded buy price has nothing to copy, and a
        # zero entry price would divide by zero in the WIN branch below.
        if r["avg_price"] <= 0:
            continue

        # Fixed-fraction sizing: stake 2% of the current bankroll per bet.
        position_size = bankroll * 0.02
        if position_size <= 0:
            continue

        # Adjust entry price for slippage, capped at 0.99
        entry_price = min(r["avg_price"] * (1 + slippage), 0.99)

        if r["status"] == "WIN":
            # Payout is $1 per share, cost was entry_price per share
            shares = position_size / entry_price
            payout = shares * 1.0
            trade_pnl = payout - position_size
            streak = 0
        else:
            # A losing market forfeits the whole stake.
            trade_pnl = -position_size
            streak += 1
            max_losing_streak = max(max_losing_streak, streak)

        bankroll += trade_pnl
        peak = max(peak, bankroll)
        # Drawdown is measured as a percentage below the running peak.
        dd = (peak - bankroll) / peak * 100
        max_dd_pct = max(max_dd_pct, dd)
        trade_results.append(trade_pnl)

    total_trades = len(trade_results)
    wins_count = sum(1 for t in trade_results if t > 0)
    losses_count = total_trades - wins_count
    avg_win = sum(t for t in trade_results if t > 0) / wins_count if wins_count else 0
    avg_loss = sum(t for t in trade_results if t <= 0) / losses_count if losses_count > 0 else 0
    # Guard the percentage so an empty simulation reports 0.0% instead of
    # raising ZeroDivisionError.
    win_rate = wins_count / total_trades * 100 if total_trades else 0.0

    print(f"\n {scenario_name}:")
    print(f" Final bankroll: ${bankroll:,.2f} ({(bankroll/10000-1)*100:+.1f}%)")
    print(f" Trades: {total_trades}, Wins: {wins_count} ({win_rate:.1f}%)")
    print(f" Avg win: ${avg_win:,.2f}, Avg loss: ${avg_loss:,.2f}")
    print(f" Max drawdown: {max_dd_pct:.1f}%")
    print(f" Max losing streak: {max_losing_streak}")
|
|
|
|
# Also do proportional sizing (mirror his allocation %)
# Each market's stake follows its share of the total positive net cost across
# resolved markets, scaled up and capped, once per slippage scenario.
print("\n=== PROPORTIONAL COPY (mirror his sizing) ===")
his_total_capital = sum(r["net_cost"] for r in resolved_chrono if r["net_cost"] > 0)

for scenario_name, slippage in [("Instant", 0), ("30min delay", 0.05), ("1hr delay", 0.10)]:
    bankroll = 10000
    peak = bankroll
    max_dd_pct = 0
    streak = 0
    max_losing_streak = 0

    for r in resolved_chrono:
        # Markets with non-positive net cost carry no weight. Skipping them
        # also means his_total_capital is positive whenever it is used as a
        # divisor below.
        if r["net_cost"] <= 0:
            continue
        # Without a positive recorded buy price there is no entry to mirror,
        # and the WIN branch would divide by zero.
        if r["avg_price"] <= 0:
            continue

        # Mirror his position weight
        weight = r["net_cost"] / his_total_capital
        position_size = bankroll * weight * 10  # scale up since weights are tiny with 400+ markets
        position_size = min(position_size, bankroll * 0.25)  # cap at 25% of bankroll
        if position_size <= 0:
            continue

        # Apply the scenario's slippage to the entry price, capped at 0.99.
        entry_price = min(r["avg_price"] * (1 + slippage), 0.99)

        if r["status"] == "WIN":
            # Payout is $1 per share bought at entry_price.
            shares = position_size / entry_price
            payout = shares * 1.0
            trade_pnl = payout - position_size
            streak = 0
        else:
            # A losing market forfeits the whole stake.
            trade_pnl = -position_size
            streak += 1
            max_losing_streak = max(max_losing_streak, streak)

        bankroll += trade_pnl
        peak = max(peak, bankroll)
        # Drawdown is measured as a percentage below the running peak.
        dd = (peak - bankroll) / peak * 100
        max_dd_pct = max(max_dd_pct, dd)

    print(f"\n {scenario_name}:")
    print(f" Final bankroll: ${bankroll:,.2f} ({(bankroll/10000-1)*100:+.1f}%)")
    print(f" Max drawdown: {max_dd_pct:.1f}%")
    print(f" Max losing streak: {max_losing_streak}")
|
|
|
|
# Monthly breakdown
# Sums each market's pnl into the calendar month of its first trade and
# prints one row per month with a crude text bar.
print("\n=== MONTHLY P&L (his actual) ===")
monthly = defaultdict(float)
for r in results:
    opened_at = r["first_trade"]
    if not opened_at:
        # Markets without a usable first-trade timestamp are left out.
        continue
    month = datetime.fromtimestamp(opened_at).strftime("%Y-%m")
    monthly[month] += r["pnl"]

for month in sorted(monthly):
    amount = monthly[month]
    # One bar character per full $50,000 of profit ("+") or loss ("-").
    symbol = "+" if amount > 0 else "-"
    bar = symbol * int(abs(amount) / 50000)
    print(f" {month}: ${amount:>12,.2f} {bar}")
|
|
|