Files
workspace/projects/market-watch/game_engine.py

515 lines
17 KiB
Python
Executable File

#!/usr/bin/env python3
"""Multiplayer game engine for Market Watch paper trading."""
import json
import os
import uuid
from datetime import datetime, date
import yfinance as yf
# Root of all persisted state: a "data" directory next to this module.
DATA_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "data")
# Every game lives in its own sub-directory of GAMES_DIR, named by game_id.
GAMES_DIR = os.path.join(DATA_DIR, "games")
# Risk management constants (fractions of total portfolio value, cash included)
MAX_SECTOR_PCT = 0.30 # Max 30% in any single sector
MIN_CASH_PCT = 0.15 # Keep minimum 15% cash
def _load_json(path, default=None):
    """Read and decode the JSON file at ``path``.

    Args:
        path: Filesystem path of the JSON document.
        default: Value returned when the file is missing. ``None`` (the
            default) means "return a new empty dict".

    Returns:
        The decoded JSON value, or ``default`` (``{}`` when ``default`` is
        ``None``) if the file does not exist.

    Raises:
        json.JSONDecodeError: If the file exists but is not valid JSON.
    """
    # EAFP: open directly instead of exists()-then-open(), which races with a
    # concurrent delete. NotADirectoryError covers a path whose parent is a
    # regular file, which os.path.exists() also reported as "missing".
    try:
        with open(path, encoding="utf-8") as f:
            return json.load(f)
    except (FileNotFoundError, NotADirectoryError):
        return default if default is not None else {}
def _save_json(path, data):
    """Serialize ``data`` to ``path`` as indented JSON.

    Missing parent directories are created. Values the JSON encoder cannot
    handle natively are stringified via ``default=str``.

    Args:
        path: Destination file path; may be a bare filename.
        data: JSON-serializable value to store.
    """
    parent = os.path.dirname(path)
    # os.makedirs("") raises, so only create a parent when the path has one.
    if parent:
        os.makedirs(parent, exist_ok=True)
    # Write to a sibling temp file and swap it in, so a crash or serialization
    # error mid-write never leaves a truncated file where valid data was.
    tmp_path = f"{path}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2, default=str)
        os.replace(tmp_path, path)
    finally:
        # After a successful replace the temp file is gone; this only cleans
        # up when the write itself failed.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
def _game_dir(game_id):
    """Return the directory under ``GAMES_DIR`` that belongs to ``game_id``."""
    game_root = os.path.join(GAMES_DIR, game_id)
    return game_root
def _player_dir(game_id, username):
    """Return ``<game dir>/players/<username>`` for the given game and player."""
    game_root = _game_dir(game_id)
    return os.path.join(game_root, "players", username)
def _portfolio_path(game_id, username):
    """Return the path of the player's ``portfolio.json`` file."""
    player_root = _player_dir(game_id, username)
    return os.path.join(player_root, "portfolio.json")
def _trades_path(game_id, username):
    """Return the path of the player's ``trades.json`` trade log."""
    player_root = _player_dir(game_id, username)
    return os.path.join(player_root, "trades.json")
def _snapshots_path(game_id, username):
    """Return the path of the player's ``snapshots.json`` history file."""
    player_root = _player_dir(game_id, username)
    return os.path.join(player_root, "snapshots.json")
def _game_config_path(game_id):
    """Return the path of the ``game.json`` config inside the game's directory."""
    game_root = _game_dir(game_id)
    return os.path.join(game_root, "game.json")
def get_stock_sector(ticker):
    """Look up the sector yfinance reports for ``ticker``.

    Returns the value stored under ``"sector"`` in the ticker's info mapping
    (``None`` when that key is absent). Any exception raised during the
    lookup is printed as a warning and converted to ``None``.
    """
    try:
        sector = yf.Ticker(ticker).info.get("sector")
    except Exception as e:
        print(f" Warning: Could not get sector for {ticker}: {e}")
        sector = None
    return sector
def get_sector_allocation(game_id, username):
    """Return each sector's share of the player's total portfolio value.

    Args:
        game_id: ID of the game.
        username: Player whose holdings are analysed.

    Returns:
        Dict mapping sector name to its percentage (0-100) of total portfolio
        value, cash included. Positions whose sector cannot be determined are
        left out. Empty when the portfolio is missing or holds no positions.
    """
    p = get_portfolio(game_id, username)
    if not p or not p["positions"]:
        return {}

    total_value = p["total_value"]
    sector_values = {}
    for ticker, pos in p["positions"].items():
        # Prefer the sector recorded on the position at purchase time; only
        # fall back to a network lookup when none was stored.
        sector = pos.get("sector") or get_stock_sector(ticker)
        if sector:
            sector_values[sector] = sector_values.get(sector, 0) + pos["market_value"]

    # Convert to percentages of total portfolio
    return {
        sector: (value / total_value) * 100 if total_value > 0 else 0
        for sector, value in sector_values.items()
    }
# ── Game Management ──
def create_game(name, starting_cash=100_000.0, end_date=None, creator="system"):
    """Create a new active game on disk and return its ID.

    The ID is the first eight characters of a random UUID4. The game starts
    with an empty player list and today's date as its start date.

    Args:
        name: Display name of the game.
        starting_cash: Cash each player receives when joining.
        end_date: Optional end date, stored as given.
        creator: Name recorded as the game's creator.

    Returns:
        The new game's ID string.
    """
    game_id = uuid.uuid4().hex[:8]
    os.makedirs(_game_dir(game_id), exist_ok=True)
    config = dict(
        game_id=game_id,
        name=name,
        starting_cash=starting_cash,
        start_date=date.today().isoformat(),
        end_date=end_date,
        creator=creator,
        created_at=datetime.now().isoformat(),
        players=[],
        status="active",
    )
    _save_json(_game_config_path(game_id), config)
    return game_id
def list_games(active_only=True):
    """Return the configs of all stored games, newest ``created_at`` first.

    Args:
        active_only: When true, keep only games whose ``status`` is
            ``"active"``.

    Returns:
        List of game config dicts; empty when ``GAMES_DIR`` does not exist.
    """
    if not os.path.exists(GAMES_DIR):
        return []

    found = []
    for entry in os.listdir(GAMES_DIR):
        cfg_file = _game_config_path(entry)
        # Entries without a game.json are not games; skip them.
        if not os.path.exists(cfg_file):
            continue
        cfg = _load_json(cfg_file)
        if cfg.get("status") == "active" or not active_only:
            found.append(cfg)

    found.sort(key=lambda g: g.get("created_at", ""), reverse=True)
    return found
def get_game(game_id):
    """Load the stored config of ``game_id`` (empty dict when there is none)."""
    config_file = _game_config_path(game_id)
    return _load_json(config_file)
def join_game(game_id, username):
    """Register ``username`` in a game and create their starting files.

    The player gets a portfolio holding the game's starting cash and no
    positions, plus empty trade and snapshot logs.

    Returns:
        ``{"success": True, "game_id", "username", "starting_cash"}`` on
        success, or ``{"success": False, "error": msg}`` when the game does
        not exist or the player has already joined.
    """
    config = get_game(game_id)
    if not config:
        return {"success": False, "error": "Game not found"}

    roster = config["players"]
    if username in roster:
        return {"success": False, "error": f"{username} already in game"}

    roster.append(username)
    _save_json(_game_config_path(game_id), config)

    # Initialize player portfolio
    os.makedirs(_player_dir(game_id, username), exist_ok=True)
    fresh_portfolio = {"cash": config["starting_cash"], "positions": {}}
    _save_json(_portfolio_path(game_id, username), fresh_portfolio)
    for log_path in (_trades_path(game_id, username), _snapshots_path(game_id, username)):
        _save_json(log_path, [])

    return {
        "success": True,
        "game_id": game_id,
        "username": username,
        "starting_cash": config["starting_cash"],
    }
# ── Trading ──
def buy(game_id, username, ticker, shares, price, reason="Manual"):
    """Buy shares for a player, enforcing the cash-reserve and sector-cap rules.

    Args:
        game_id: ID of the game the player belongs to.
        username: Player whose portfolio is charged.
        ticker: Symbol to buy.
        shares: Number of shares; must be greater than zero.
        price: Price per share; must be greater than zero.
        reason: Free-text note stored on a new position and in the trade log.

    Returns:
        On success ``{"success": True, "ticker", "shares", "price", "cost",
        "cash_remaining"}``. On failure ``{"success": False, "error": msg}``;
        nothing is written to disk in that case.
    """
    # Reject non-positive (or NaN) input up front: a negative share count
    # would yield a negative cost, *credit* cash and open a negative position.
    if not shares > 0:
        return {"success": False, "error": "Shares must be a positive number"}
    if not price > 0:
        return {"success": False, "error": "Price must be a positive number"}

    pf = _load_json(_portfolio_path(game_id, username))
    if not pf:
        return {"success": False, "error": "Player portfolio not found"}

    cost = shares * price
    if cost > pf["cash"]:
        return {"success": False, "error": f"Insufficient cash. Need ${cost:,.2f}, have ${pf['cash']:,.2f}"}

    # Get current portfolio to check constraints
    p = get_portfolio(game_id, username)
    if not p:
        return {"success": False, "error": "Could not get current portfolio"}
    total_value = p["total_value"]

    # Check minimum cash reserve (15%)
    min_cash_required = total_value * MIN_CASH_PCT
    cash_after_purchase = pf["cash"] - cost
    if cash_after_purchase < min_cash_required:
        return {"success": False, "error": f"Cash reserve violation. Need ${min_cash_required:,.2f} minimum cash, would have ${cash_after_purchase:,.2f}"}

    # Check sector diversification (30% max per sector). The total_value
    # guard avoids a division by zero when the portfolio value rounds to 0.
    stock_sector = get_stock_sector(ticker)
    if stock_sector and total_value > 0:
        current_sector_pct = get_sector_allocation(game_id, username).get(stock_sector, 0)
        # A purchase moves cash into the sector without changing total value.
        new_sector_pct = current_sector_pct + (cost / total_value) * 100
        if new_sector_pct > MAX_SECTOR_PCT * 100:
            return {"success": False, "error": f"Sector cap violation. {stock_sector} would be {new_sector_pct:.1f}%, max allowed is {MAX_SECTOR_PCT*100:.1f}%"}

    pf["cash"] -= cost
    if ticker in pf["positions"]:
        pos = pf["positions"][ticker]
        total_shares = pos["shares"] + shares
        # Share-weighted average of the old cost basis and this purchase.
        pos["avg_cost"] = ((pos["avg_cost"] * pos["shares"]) + cost) / total_shares
        pos["shares"] = total_shares
        pos["current_price"] = price
        # Update trailing stop (10% below price); it only ever moves up.
        new_stop = price * 0.90
        if new_stop > pos.get("trailing_stop", 0):
            pos["trailing_stop"] = new_stop
    else:
        pf["positions"][ticker] = {
            "shares": shares,
            "avg_cost": price,
            "current_price": price,
            "entry_date": datetime.now().isoformat(),
            "entry_reason": reason,
            "trailing_stop": price * 0.90,
            "sector": stock_sector,  # Store sector info
        }
    _save_json(_portfolio_path(game_id, username), pf)

    # Log trade
    trades = _load_json(_trades_path(game_id, username), [])
    trades.append({
        "action": "BUY",
        "ticker": ticker,
        "shares": shares,
        "price": price,
        "cost": round(cost, 2),
        "reason": reason,
        "timestamp": datetime.now().isoformat(),
    })
    _save_json(_trades_path(game_id, username), trades)

    return {
        "success": True,
        "ticker": ticker,
        "shares": shares,
        "price": price,
        "cost": round(cost, 2),
        "cash_remaining": round(pf["cash"], 2),
    }
def sell(game_id, username, ticker, shares=None, price=None, reason="Manual"):
    """Sell shares of an existing position for a player.

    Args:
        game_id: ID of the game the player belongs to.
        username: Player whose portfolio is credited.
        ticker: Symbol to sell.
        shares: Number of shares to sell; ``None`` sells the whole position.
            Must be greater than zero when given.
        price: Price per share; ``None`` uses the position's stored
            ``current_price``. Must not be negative when given.
        reason: Free-text note stored in the trade log.

    Returns:
        On success ``{"success": True, "ticker", "shares", "price",
        "proceeds", "realized_pnl"}``. On failure
        ``{"success": False, "error": msg}``; nothing is written in that case.
    """
    # Reject bad input before touching the portfolio: a negative share count
    # would otherwise *increase* the position and debit cash.
    if shares is not None and not shares > 0:
        return {"success": False, "error": "Shares must be a positive number"}
    if price is not None and not price >= 0:
        return {"success": False, "error": "Price must not be negative"}

    pf = _load_json(_portfolio_path(game_id, username))
    if not pf:
        return {"success": False, "error": "Player portfolio not found"}
    if ticker not in pf["positions"]:
        return {"success": False, "error": f"No position in {ticker}"}

    pos = pf["positions"][ticker]
    if shares is None:
        shares = pos["shares"]
    if shares > pos["shares"]:
        return {"success": False, "error": f"Only have {pos['shares']} shares of {ticker}"}
    if price is None:
        price = pos["current_price"]

    avg_cost = pos["avg_cost"]
    proceeds = shares * price
    realized_pnl = (price - avg_cost) * shares
    pf["cash"] += proceeds

    # Selling everything closes the position; otherwise just reduce it.
    if shares >= pos["shares"]:
        del pf["positions"][ticker]
    else:
        pos["shares"] -= shares
    _save_json(_portfolio_path(game_id, username), pf)

    # Log trade
    trades = _load_json(_trades_path(game_id, username), [])
    trades.append({
        "action": "SELL",
        "ticker": ticker,
        "shares": shares,
        "price": price,
        "proceeds": round(proceeds, 2),
        "realized_pnl": round(realized_pnl, 2),
        "entry_price": avg_cost,
        "reason": reason,
        "timestamp": datetime.now().isoformat(),
    })
    _save_json(_trades_path(game_id, username), trades)

    return {
        "success": True,
        "ticker": ticker,
        "shares": shares,
        "price": price,
        "proceeds": round(proceeds, 2),
        "realized_pnl": round(realized_pnl, 2),
    }
def update_price(game_id, username, ticker, price):
    """Record a new current price for one of the player's positions.

    Also raises the position's trailing stop to 90% of ``price`` when that is
    higher than the stored stop. Does nothing when the portfolio is missing
    or holds no position in ``ticker``.
    """
    portfolio_file = _portfolio_path(game_id, username)
    pf = _load_json(portfolio_file)
    if not pf or ticker not in pf["positions"]:
        return

    pos = pf["positions"][ticker]
    pos["current_price"] = price
    candidate_stop = price * 0.90
    # The trailing stop is only ever moved upward.
    if pos.get("trailing_stop", 0) < candidate_stop:
        pos["trailing_stop"] = candidate_stop
    _save_json(portfolio_file, pf)
def get_portfolio(game_id, username):
    """Get player's portfolio with unrealized P&L.

    Args:
        game_id: ID of the game.
        username: Player whose portfolio is read.

    Returns:
        ``None`` when the player has no stored portfolio. Otherwise a dict
        with ``username``, ``game_id``, ``cash``, ``positions`` (each stored
        position plus ``unrealized_pnl`` and ``market_value``),
        ``total_value`` (cash plus market value of all positions),
        ``total_pnl`` and ``pnl_pct`` (both relative to the game's starting
        cash) and ``num_positions``. Monetary figures are rounded to 2 places.
    """
    pf = _load_json(_portfolio_path(game_id, username))
    if not pf:
        return None

    game = get_game(game_id)
    # Fall back to 100,000 when the game config is missing or lacks the key.
    starting_cash = game.get("starting_cash", 100_000) if game else 100_000

    total_value = pf["cash"]
    positions_out = {}
    for ticker, pos in pf["positions"].items():
        unrealized_pnl = (pos["current_price"] - pos["avg_cost"]) * pos["shares"]
        market_value = pos["current_price"] * pos["shares"]
        total_value += market_value
        positions_out[ticker] = {
            **pos,
            "unrealized_pnl": round(unrealized_pnl, 2),
            "market_value": round(market_value, 2),
        }

    total_pnl = total_value - starting_cash
    # A game created with starting_cash=0 has no meaningful percentage
    # return; report 0 instead of raising ZeroDivisionError.
    pnl_pct = round(total_pnl / starting_cash * 100, 2) if starting_cash else 0.0

    return {
        "username": username,
        "game_id": game_id,
        "cash": round(pf["cash"], 2),
        "positions": positions_out,
        "total_value": round(total_value, 2),
        "total_pnl": round(total_pnl, 2),
        "pnl_pct": pnl_pct,
        "num_positions": len(positions_out),
    }
def get_trades(game_id, username):
    """Return the player's logged trades (empty list when none are stored)."""
    trade_log = _trades_path(game_id, username)
    return _load_json(trade_log, [])
def daily_snapshot(game_id, username):
    """Store today's portfolio summary for a player and return it.

    Any snapshot already recorded for today's date is replaced, so there is
    at most one entry per day. Returns ``None`` when the player has no
    portfolio.
    """
    p = get_portfolio(game_id, username)
    if not p:
        return None

    snapshot_file = _snapshots_path(game_id, username)
    today = date.today().isoformat()
    # Drop an earlier snapshot from the same day before appending the new one.
    history = [s for s in _load_json(snapshot_file, []) if s["date"] != today]

    entry = {"date": today}
    for field in ("total_value", "total_pnl", "pnl_pct", "cash", "num_positions"):
        entry[field] = p[field]
    history.append(entry)

    _save_json(snapshot_file, history)
    return entry
def get_snapshots(game_id, username):
    """Return the player's stored daily snapshots (empty list when none)."""
    snapshot_file = _snapshots_path(game_id, username)
    return _load_json(snapshot_file, [])
def _plan_rebalance_sell(ticker, pos, available_shares, budget, reason):
    """Build one SELL action worth at most ``budget``, or ``None`` if no whole share fits."""
    price = pos["current_price"]
    # A non-positive price cannot raise cash and would divide by zero below.
    if price <= 0 or available_shares <= 0 or budget <= 0:
        return None
    shares_to_sell = min(available_shares, int(budget / price))
    if shares_to_sell <= 0:
        return None
    return {
        "action": "SELL",
        "ticker": ticker,
        "shares": shares_to_sell,
        "price": price,
        "value": shares_to_sell * price,
        "reason": reason,
    }


def rebalance_portfolio(game_id, username, dry_run=True):
    """Rebalance portfolio to enforce sector caps and cash reserves.

    Sells are planned in two passes, largest positions first:

    1. For every sector above ``MAX_SECTOR_PCT``, sell up to that sector's
       own excess value from positions in that sector (most over-weight
       sector first), so one sector cannot absorb another sector's sells.
    2. If the proceeds of pass 1 do not cover the cash-reserve deficit, sell
       from any remaining holdings until they do. This also handles a cash
       deficit that occurs without any sector violation.

    Share counts are rounded down to whole shares, so a residual smaller
    than one share's value can remain after rebalancing.

    Args:
        game_id: Game ID
        username: Username
        dry_run: If True, return recommendations without executing trades

    Returns:
        ``{"success": False, "error": msg}`` when the portfolio is missing.
        Otherwise a dict with ``success``, ``dry_run``, ``violations``
        (human-readable strings), ``actions`` (planned SELLs, each with a
        ``result`` when executed), ``total_excess`` (cash deficit plus the
        sum of sector excess values), ``sector_allocations`` and
        ``cash_deficit``.
    """
    p = get_portfolio(game_id, username)
    if not p:
        return {"success": False, "error": "Portfolio not found"}

    total_value = p["total_value"]
    cap_pct = MAX_SECTOR_PCT * 100
    actions = []
    violations = []

    # Check cash reserve
    min_cash_required = total_value * MIN_CASH_PCT
    cash_deficit = max(0, min_cash_required - p["cash"])
    if cash_deficit > 0:
        violations.append(f"Cash reserve deficit: ${cash_deficit:,.2f} (need ${min_cash_required:,.2f}, have ${p['cash']:,.2f})")

    # Check sector allocations
    sector_allocations = get_sector_allocation(game_id, username)
    sector_violations = {}
    for sector, pct in sector_allocations.items():
        if pct > cap_pct:
            excess_pct = pct - cap_pct
            excess_value = (excess_pct / 100) * total_value
            sector_violations[sector] = {
                "current_pct": pct,
                "excess_pct": excess_pct,
                "excess_value": excess_value,
            }
            violations.append(f"{sector} sector: {pct:.1f}% (excess: {excess_pct:.1f}%, ${excess_value:,.2f})")

    total_excess = cash_deficit + sum(v["excess_value"] for v in sector_violations.values())

    if total_excess > 0:
        # Largest positions first; resolve each position's sector only once.
        ranked = sorted(
            p["positions"].items(),
            key=lambda item: item[1]["market_value"],
            reverse=True,
        )
        sector_of = {t: pos.get("sector") or get_stock_sector(t) for t, pos in ranked}
        shares_left = {t: pos["shares"] for t, pos in ranked}

        def sell_up_to(ticker, budget, reason):
            """Plan (and, unless dry_run, execute) one sell; return the value sold."""
            action = _plan_rebalance_sell(
                ticker, p["positions"][ticker], shares_left[ticker], budget, reason
            )
            if action is None:
                return 0
            actions.append(action)
            if not dry_run:
                action["result"] = sell(
                    game_id, username, ticker, action["shares"], action["price"], reason
                )
                # A failed sell raised no cash; do not count it.
                if not action["result"].get("success"):
                    return 0
            shares_left[ticker] -= action["shares"]
            return action["value"]

        # Pass 1: bring each violating sector back under its cap.
        cash_raised = 0
        most_overweight_first = sorted(
            sector_violations,
            key=lambda s: sector_violations[s]["excess_pct"],
            reverse=True,
        )
        for sector in most_overweight_first:
            budget = sector_violations[sector]["excess_value"]
            reason = f"Rebalance: {sector} sector cap violation"
            for ticker, _ in ranked:
                if budget <= 0:
                    break
                if sector_of[ticker] != sector:
                    continue
                sold = sell_up_to(ticker, budget, reason)
                budget -= sold
                cash_raised += sold

        # Pass 2: sector sells already raised cash; cover only what is left
        # of the reserve deficit, from any position.
        cash_needed = cash_deficit - cash_raised
        for ticker, _ in ranked:
            if cash_needed <= 0:
                break
            cash_needed -= sell_up_to(ticker, cash_needed, "Rebalance: cash reserve deficit")

    return {
        "success": True,
        "dry_run": dry_run,
        "violations": violations,
        "actions": actions,
        "total_excess": total_excess,
        "sector_allocations": sector_allocations,
        "cash_deficit": cash_deficit,
    }
def get_leaderboard(game_id):
    """Return one summary row per player, best ``pnl_pct`` first.

    Players without a stored portfolio are skipped. ``num_trades`` counts
    only the player's logged SELL trades. Returns an empty list when the
    game does not exist.
    """
    game = get_game(game_id)
    if not game:
        return []

    rows = []
    for username in game["players"]:
        p = get_portfolio(game_id, username)
        if not p:
            continue
        sell_count = sum(
            1 for t in get_trades(game_id, username) if t.get("action") == "SELL"
        )
        rows.append({
            "username": username,
            "total_value": p["total_value"],
            "total_pnl": p["total_pnl"],
            "pnl_pct": p["pnl_pct"],
            "num_positions": p["num_positions"],
            "num_trades": sell_count,
            "cash": p["cash"],
        })

    rows.sort(key=lambda row: row["pnl_pct"], reverse=True)
    return rows
# ── Convenience: find default game ──
def get_default_game_id():
    """Return the ID of the first game listed by ``list_games(active_only=True)``.

    That listing is sorted newest ``created_at`` first. Returns ``None`` when
    there is no active game.
    """
    active = list_games(active_only=True)
    return active[0]["game_id"] if active else None
# ── Initialize default game ──
def ensure_default_game():
    """Return the ID of the active "GARP Challenge" game, creating it if absent.

    A newly created game starts with 100,000 in cash per player, is credited
    to creator ``"case"``, and has ``"case"`` joined as its first player.
    """
    existing = next(
        (g for g in list_games(active_only=True) if g["name"] == "GARP Challenge"),
        None,
    )
    if existing is not None:
        return existing["game_id"]

    new_id = create_game("GARP Challenge", starting_cash=100_000.0, creator="case")
    join_game(new_id, "case")
    return new_id
if __name__ == "__main__":
    # Script entry point: make sure the default game exists, then print its
    # name, player list and current leaderboard.
    default_id = ensure_default_game()
    info = get_game(default_id)
    print(f"Game: {info['name']} ({default_id})")
    print(f"Players: {info['players']}")
    for row in get_leaderboard(default_id):
        print(f" {row['username']}: ${row['total_value']:,.2f} ({row['pnl_pct']:+.2f}%)")