#!/usr/bin/env python3 """Multiplayer game engine for Market Watch paper trading.""" import json import os import uuid from datetime import datetime, date DATA_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "data") GAMES_DIR = os.path.join(DATA_DIR, "games") def _load_json(path, default=None): if os.path.exists(path): with open(path) as f: return json.load(f) return default if default is not None else {} def _save_json(path, data): os.makedirs(os.path.dirname(path), exist_ok=True) with open(path, "w") as f: json.dump(data, f, indent=2, default=str) def _game_dir(game_id): return os.path.join(GAMES_DIR, game_id) def _player_dir(game_id, username): return os.path.join(_game_dir(game_id), "players", username) def _portfolio_path(game_id, username): return os.path.join(_player_dir(game_id, username), "portfolio.json") def _trades_path(game_id, username): return os.path.join(_player_dir(game_id, username), "trades.json") def _snapshots_path(game_id, username): return os.path.join(_player_dir(game_id, username), "snapshots.json") def _game_config_path(game_id): return os.path.join(_game_dir(game_id), "game.json") # ── Game Management ── def create_game(name, starting_cash=100_000.0, end_date=None, creator="system"): """Create a new game. Returns game_id.""" game_id = str(uuid.uuid4())[:8] config = { "game_id": game_id, "name": name, "starting_cash": starting_cash, "start_date": date.today().isoformat(), "end_date": end_date, "creator": creator, "created_at": datetime.now().isoformat(), "players": [], "status": "active", } os.makedirs(_game_dir(game_id), exist_ok=True) _save_json(_game_config_path(game_id), config) return game_id def list_games(active_only=True): """List all games.""" games = [] if not os.path.exists(GAMES_DIR): return games for gid in os.listdir(GAMES_DIR): config_path = _game_config_path(gid) if os.path.exists(config_path): config = _load_json(config_path) if active_only and config.get("status") != "active": continue games.append(config) return sorted(games, key=lambda g: g.get("created_at", ""), reverse=True) def get_game(game_id): """Get game config.""" return _load_json(_game_config_path(game_id)) def join_game(game_id, username): """Add a player to a game. Initializes their portfolio.""" config = get_game(game_id) if not config: return {"success": False, "error": "Game not found"} if username in config["players"]: return {"success": False, "error": f"{username} already in game"} config["players"].append(username) _save_json(_game_config_path(game_id), config) # Initialize player portfolio player_dir = _player_dir(game_id, username) os.makedirs(player_dir, exist_ok=True) _save_json(_portfolio_path(game_id, username), { "cash": config["starting_cash"], "positions": {}, }) _save_json(_trades_path(game_id, username), []) _save_json(_snapshots_path(game_id, username), []) return {"success": True, "game_id": game_id, "username": username, "starting_cash": config["starting_cash"]} # ── Trading ── def buy(game_id, username, ticker, shares, price, reason="Manual"): """Buy shares for a player in a game.""" pf = _load_json(_portfolio_path(game_id, username)) if not pf: return {"success": False, "error": "Player portfolio not found"} cost = shares * price if cost > pf["cash"]: return {"success": False, "error": f"Insufficient cash. Need ${cost:,.2f}, have ${pf['cash']:,.2f}"} pf["cash"] -= cost if ticker in pf["positions"]: pos = pf["positions"][ticker] total_shares = pos["shares"] + shares pos["avg_cost"] = ((pos["avg_cost"] * pos["shares"]) + cost) / total_shares pos["shares"] = total_shares pos["current_price"] = price # Update trailing stop new_stop = price * 0.90 if new_stop > pos.get("trailing_stop", 0): pos["trailing_stop"] = new_stop else: pf["positions"][ticker] = { "shares": shares, "avg_cost": price, "current_price": price, "entry_date": datetime.now().isoformat(), "entry_reason": reason, "trailing_stop": price * 0.90, } _save_json(_portfolio_path(game_id, username), pf) # Log trade trades = _load_json(_trades_path(game_id, username), []) trades.append({ "action": "BUY", "ticker": ticker, "shares": shares, "price": price, "cost": round(cost, 2), "reason": reason, "timestamp": datetime.now().isoformat(), }) _save_json(_trades_path(game_id, username), trades) return {"success": True, "ticker": ticker, "shares": shares, "price": price, "cost": round(cost, 2), "cash_remaining": round(pf["cash"], 2)} def sell(game_id, username, ticker, shares=None, price=None, reason="Manual"): """Sell shares for a player.""" pf = _load_json(_portfolio_path(game_id, username)) if not pf: return {"success": False, "error": "Player portfolio not found"} if ticker not in pf["positions"]: return {"success": False, "error": f"No position in {ticker}"} pos = pf["positions"][ticker] if shares is None: shares = pos["shares"] if shares > pos["shares"]: return {"success": False, "error": f"Only have {pos['shares']} shares of {ticker}"} if price is None: price = pos["current_price"] proceeds = shares * price pf["cash"] += proceeds realized_pnl = (price - pos["avg_cost"]) * shares if shares >= pos["shares"]: del pf["positions"][ticker] else: pos["shares"] -= shares _save_json(_portfolio_path(game_id, username), pf) # Log trade trades = _load_json(_trades_path(game_id, username), []) trades.append({ "action": "SELL", "ticker": ticker, "shares": shares, "price": price, "proceeds": round(proceeds, 2), "realized_pnl": round(realized_pnl, 2), "entry_price": pos["avg_cost"], "reason": reason, "timestamp": datetime.now().isoformat(), }) _save_json(_trades_path(game_id, username), trades) return {"success": True, "ticker": ticker, "shares": shares, "price": price, "proceeds": round(proceeds, 2), "realized_pnl": round(realized_pnl, 2)} def update_price(game_id, username, ticker, price): """Update current price for a position.""" pf = _load_json(_portfolio_path(game_id, username)) if pf and ticker in pf["positions"]: pos = pf["positions"][ticker] pos["current_price"] = price new_stop = price * 0.90 if new_stop > pos.get("trailing_stop", 0): pos["trailing_stop"] = new_stop _save_json(_portfolio_path(game_id, username), pf) def get_portfolio(game_id, username): """Get player's portfolio with unrealized P&L.""" pf = _load_json(_portfolio_path(game_id, username)) if not pf: return None game = get_game(game_id) starting_cash = game.get("starting_cash", 100_000) if game else 100_000 total_value = pf["cash"] positions_out = {} for ticker, pos in pf["positions"].items(): unrealized_pnl = (pos["current_price"] - pos["avg_cost"]) * pos["shares"] market_value = pos["current_price"] * pos["shares"] total_value += market_value positions_out[ticker] = { **pos, "unrealized_pnl": round(unrealized_pnl, 2), "market_value": round(market_value, 2), } total_pnl = total_value - starting_cash return { "username": username, "game_id": game_id, "cash": round(pf["cash"], 2), "positions": positions_out, "total_value": round(total_value, 2), "total_pnl": round(total_pnl, 2), "pnl_pct": round(total_pnl / starting_cash * 100, 2), "num_positions": len(positions_out), } def get_trades(game_id, username): """Get player's trade history.""" return _load_json(_trades_path(game_id, username), []) def daily_snapshot(game_id, username): """Take daily snapshot for a player.""" p = get_portfolio(game_id, username) if not p: return None snapshots = _load_json(_snapshots_path(game_id, username), []) today = date.today().isoformat() snapshots = [s for s in snapshots if s["date"] != today] snapshots.append({ "date": today, "total_value": p["total_value"], "total_pnl": p["total_pnl"], "pnl_pct": p["pnl_pct"], "cash": p["cash"], "num_positions": p["num_positions"], }) _save_json(_snapshots_path(game_id, username), snapshots) return snapshots[-1] def get_snapshots(game_id, username): """Get player's daily snapshots.""" return _load_json(_snapshots_path(game_id, username), []) def get_leaderboard(game_id): """Get game leaderboard sorted by % return.""" game = get_game(game_id) if not game: return [] board = [] for username in game["players"]: p = get_portfolio(game_id, username) if p: trades = get_trades(game_id, username) num_trades = len([t for t in trades if t.get("action") == "SELL"]) board.append({ "username": username, "total_value": p["total_value"], "total_pnl": p["total_pnl"], "pnl_pct": p["pnl_pct"], "num_positions": p["num_positions"], "num_trades": num_trades, "cash": p["cash"], }) return sorted(board, key=lambda x: x["pnl_pct"], reverse=True) # ── Convenience: find default game ── def get_default_game_id(): """Get the first active game (usually 'GARP Challenge').""" games = list_games(active_only=True) if games: return games[0]["game_id"] return None # ── Initialize default game ── def ensure_default_game(): """Create default GARP Challenge game with 'case' player if it doesn't exist.""" games = list_games(active_only=True) for g in games: if g["name"] == "GARP Challenge": return g["game_id"] game_id = create_game("GARP Challenge", starting_cash=100_000.0, creator="case") join_game(game_id, "case") return game_id if __name__ == "__main__": game_id = ensure_default_game() game = get_game(game_id) print(f"Game: {game['name']} ({game_id})") print(f"Players: {game['players']}") board = get_leaderboard(game_id) for entry in board: print(f" {entry['username']}: ${entry['total_value']:,.2f} ({entry['pnl_pct']:+.2f}%)")