Files

230 lines
8.4 KiB
Python

#!/usr/bin/env python3
"""Backtest kch123 copy-trading from full trade history"""
import json
from collections import defaultdict
from datetime import datetime
with open("kch123-full-trades.json") as f:
trades = json.load(f)
print(f"Total trade records: {len(trades)}")
# Separate by type
buys = [t for t in trades if t.get("type") == "TRADE" and t.get("side") == "BUY"]
sells = [t for t in trades if t.get("type") == "TRADE" and t.get("side") == "SELL"]
redeems = [t for t in trades if t.get("type") == "REDEEM"]
print(f"BUYs: {len(buys)}, SELLs: {len(sells)}, REDEEMs: {len(redeems)}")
# Group by market (conditionId)
markets = defaultdict(lambda: {"buys": [], "sells": [], "redeems": [], "title": ""})
for t in trades:
cid = t.get("conditionId", "")
if not cid:
continue
markets[cid]["title"] = t.get("title", "")
if t["type"] == "TRADE" and t.get("side") == "BUY":
markets[cid]["buys"].append(t)
elif t["type"] == "TRADE" and t.get("side") == "SELL":
markets[cid]["sells"].append(t)
elif t["type"] == "REDEEM":
markets[cid]["redeems"].append(t)
print(f"Unique markets: {len(markets)}")
# Reconstruct P&L per market
results = []
for cid, data in markets.items():
total_bought_usdc = sum(t.get("usdcSize", 0) for t in data["buys"])
total_bought_shares = sum(t.get("size", 0) for t in data["buys"])
total_sold_usdc = sum(t.get("usdcSize", 0) for t in data["sells"])
total_redeemed_usdc = sum(t.get("usdcSize", 0) for t in data["redeems"])
total_redeemed_shares = sum(t.get("size", 0) for t in data["redeems"])
# Net cost = bought - sold
net_cost = total_bought_usdc - total_sold_usdc
# Returns = redeemed amount
returns = total_redeemed_usdc
# If redeemed shares > 0 and usdc > 0, it was a win
# If no redeems or redeem usdc=0, could be loss or still open
pnl = returns - net_cost
# Determine status
if total_redeemed_shares > 0 and total_redeemed_usdc > 0:
status = "WIN"
elif total_redeemed_shares > 0 and total_redeemed_usdc == 0:
status = "LOSS" # redeemed at 0
elif len(data["redeems"]) > 0:
status = "LOSS"
else:
status = "OPEN"
# Get timestamps
all_times = [t.get("timestamp", 0) for t in data["buys"] + data["sells"] + data["redeems"]]
first_trade = min(all_times) if all_times else 0
last_trade = max(all_times) if all_times else 0
avg_price = total_bought_usdc / total_bought_shares if total_bought_shares > 0 else 0
results.append({
"conditionId": cid,
"title": data["title"],
"status": status,
"net_cost": round(net_cost, 2),
"returns": round(returns, 2),
"pnl": round(pnl, 2),
"shares_bought": round(total_bought_shares, 2),
"avg_price": round(avg_price, 4),
"first_trade": first_trade,
"last_trade": last_trade,
"num_buys": len(data["buys"]),
"num_sells": len(data["sells"]),
"num_redeems": len(data["redeems"]),
})
# Sort by first trade time
results.sort(key=lambda x: x["first_trade"])
# Stats
wins = [r for r in results if r["status"] == "WIN"]
losses = [r for r in results if r["status"] == "LOSS"]
opens = [r for r in results if r["status"] == "OPEN"]
resolved = wins + losses
total_cost = sum(r["net_cost"] for r in results)
total_returns = sum(r["returns"] for r in results)
total_pnl = sum(r["pnl"] for r in results)
print(f"\n=== MARKET RESULTS ===")
print(f"Wins: {len(wins)}, Losses: {len(losses)}, Open: {len(opens)}")
print(f"Win rate (resolved): {len(wins)/len(resolved)*100:.1f}%" if resolved else "N/A")
print(f"Total cost: ${total_cost:,.2f}")
print(f"Total returns: ${total_returns:,.2f}")
print(f"Total P&L: ${total_pnl:,.2f}")
# Top wins and losses
wins_sorted = sorted(wins, key=lambda x: x["pnl"], reverse=True)
losses_sorted = sorted(losses, key=lambda x: x["pnl"])
print(f"\n=== TOP 10 WINS ===")
for r in wins_sorted[:10]:
dt = datetime.fromtimestamp(r["first_trade"]).strftime("%Y-%m-%d") if r["first_trade"] else "?"
print(f" +${r['pnl']:>12,.2f} | {dt} | {r['title'][:60]}")
print(f"\n=== TOP 10 LOSSES ===")
for r in losses_sorted[:10]:
dt = datetime.fromtimestamp(r["first_trade"]).strftime("%Y-%m-%d") if r["first_trade"] else "?"
print(f" -${abs(r['pnl']):>12,.2f} | {dt} | {r['title'][:60]}")
# === COPY TRADE SIMULATION ===
print(f"\n=== COPY-TRADE SIMULATION ($10,000 bankroll) ===")
# Process all resolved markets chronologically
resolved_chrono = sorted(resolved, key=lambda x: x["first_trade"])
for scenario_name, slippage in [("Instant", 0), ("30min delay", 0.05), ("1hr delay", 0.10)]:
bankroll = 10000
peak = bankroll
max_dd = 0
max_dd_pct = 0
streak = 0
max_losing_streak = 0
trade_results = []
for r in resolved_chrono:
# Proportional sizing: his cost / his total capital * our bankroll
# Use 1% of bankroll per bet as conservative sizing
position_size = min(bankroll * 0.02, bankroll) # 2% per bet
if position_size <= 0:
continue
# Adjust entry price for slippage
entry_price = min(r["avg_price"] * (1 + slippage), 0.99)
if r["status"] == "WIN":
# Payout is $1 per share, cost was entry_price per share
shares = position_size / entry_price
payout = shares * 1.0
trade_pnl = payout - position_size
streak = 0
else:
trade_pnl = -position_size
streak += 1
max_losing_streak = max(max_losing_streak, streak)
bankroll += trade_pnl
peak = max(peak, bankroll)
dd = (peak - bankroll) / peak * 100
max_dd_pct = max(max_dd_pct, dd)
trade_results.append(trade_pnl)
total_trades = len(trade_results)
wins_count = sum(1 for t in trade_results if t > 0)
avg_win = sum(t for t in trade_results if t > 0) / wins_count if wins_count else 0
avg_loss = sum(t for t in trade_results if t <= 0) / (total_trades - wins_count) if (total_trades - wins_count) > 0 else 0
print(f"\n {scenario_name}:")
print(f" Final bankroll: ${bankroll:,.2f} ({(bankroll/10000-1)*100:+.1f}%)")
print(f" Trades: {total_trades}, Wins: {wins_count} ({wins_count/total_trades*100:.1f}%)")
print(f" Avg win: ${avg_win:,.2f}, Avg loss: ${avg_loss:,.2f}")
print(f" Max drawdown: {max_dd_pct:.1f}%")
print(f" Max losing streak: {max_losing_streak}")
# Also do proportional sizing (mirror his allocation %)
print(f"\n=== PROPORTIONAL COPY (mirror his sizing) ===")
his_total_capital = sum(r["net_cost"] for r in resolved_chrono if r["net_cost"] > 0)
for scenario_name, slippage in [("Instant", 0), ("30min delay", 0.05), ("1hr delay", 0.10)]:
bankroll = 10000
peak = bankroll
max_dd_pct = 0
streak = 0
max_losing_streak = 0
for r in resolved_chrono:
if r["net_cost"] <= 0:
continue
# Mirror his position weight
weight = r["net_cost"] / his_total_capital
position_size = bankroll * weight * 10 # scale up since weights are tiny with 400+ markets
position_size = min(position_size, bankroll * 0.25) # cap at 25% of bankroll
if position_size <= 0:
continue
entry_price = min(r["avg_price"] * (1 + slippage), 0.99)
if r["status"] == "WIN":
shares = position_size / entry_price
payout = shares * 1.0
trade_pnl = payout - position_size
streak = 0
else:
trade_pnl = -position_size
streak += 1
max_losing_streak = max(max_losing_streak, streak)
bankroll += trade_pnl
peak = max(peak, bankroll)
dd = (peak - bankroll) / peak * 100
max_dd_pct = max(max_dd_pct, dd)
print(f"\n {scenario_name}:")
print(f" Final bankroll: ${bankroll:,.2f} ({(bankroll/10000-1)*100:+.1f}%)")
print(f" Max drawdown: {max_dd_pct:.1f}%")
print(f" Max losing streak: {max_losing_streak}")
# Monthly breakdown
print(f"\n=== MONTHLY P&L (his actual) ===")
monthly = defaultdict(float)
for r in results:
if r["first_trade"]:
month = datetime.fromtimestamp(r["first_trade"]).strftime("%Y-%m")
monthly[month] += r["pnl"]
for month in sorted(monthly.keys()):
bar = "+" * int(monthly[month] / 50000) if monthly[month] > 0 else "-" * int(abs(monthly[month]) / 50000)
print(f" {month}: ${monthly[month]:>12,.2f} {bar}")