#!/usr/bin/env python3 """ Crypto Market Watch - Paper Trading Game Engine Scans top 150 cryptos using VWAP + RSI + volume analysis. Makes autonomous paper trades with full tracking. """ import json import time import urllib.request from datetime import datetime, timezone from pathlib import Path from threading import Lock DATA_DIR = Path(__file__).parent / "data" DATA_DIR.mkdir(parents=True, exist_ok=True) PORTFOLIO_FILE = DATA_DIR / "portfolio.json" TRADES_FILE = DATA_DIR / "trades.json" SNAPSHOTS_FILE = DATA_DIR / "snapshots.json" WATCHLIST_FILE = DATA_DIR / "watchlist.json" def _load(path, default=None): if path.exists(): return json.loads(path.read_text()) return default if default is not None else {} def _save(path, data): path.write_text(json.dumps(data, indent=2)) # ── Portfolio Management ── def get_portfolio(): default = { "cash": 100000.0, "positions": {}, "starting_balance": 100000.0, "created_at": datetime.now(timezone.utc).isoformat(), } return _load(PORTFOLIO_FILE, default) def save_portfolio(portfolio): _save(PORTFOLIO_FILE, portfolio) def get_trades(): return _load(TRADES_FILE, []) def save_trades(trades): _save(TRADES_FILE, trades) def buy(symbol, price, amount_usd, reason=""): """Buy a crypto position.""" portfolio = get_portfolio() trades = get_trades() if amount_usd > portfolio["cash"]: return None, "Insufficient cash" qty = amount_usd / price portfolio["cash"] -= amount_usd pos = portfolio["positions"].get(symbol, { "qty": 0, "avg_price": 0, "total_cost": 0 }) new_total_cost = pos["total_cost"] + amount_usd new_qty = pos["qty"] + qty pos["avg_price"] = new_total_cost / new_qty if new_qty > 0 else 0 pos["qty"] = new_qty pos["total_cost"] = new_total_cost pos["last_buy"] = datetime.now(timezone.utc).isoformat() portfolio["positions"][symbol] = pos save_portfolio(portfolio) trade = { "id": len(trades) + 1, "timestamp": datetime.now(timezone.utc).isoformat(), "action": "BUY", "symbol": symbol, "price": price, "qty": qty, "amount_usd": amount_usd, "reason": reason, } trades.append(trade) save_trades(trades) return trade, None def sell(symbol, price, pct=100, reason=""): """Sell a position (partial or full).""" portfolio = get_portfolio() trades = get_trades() pos = portfolio["positions"].get(symbol) if not pos or pos["qty"] <= 0: return None, f"No position in {symbol}" sell_qty = pos["qty"] * (pct / 100) sell_value = sell_qty * price cost_basis = pos["avg_price"] * sell_qty pnl = sell_value - cost_basis pnl_pct = (pnl / cost_basis) * 100 if cost_basis > 0 else 0 portfolio["cash"] += sell_value pos["qty"] -= sell_qty pos["total_cost"] -= cost_basis if pos["qty"] < 0.0000001: del portfolio["positions"][symbol] else: portfolio["positions"][symbol] = pos save_portfolio(portfolio) trade = { "id": len(trades) + 1, "timestamp": datetime.now(timezone.utc).isoformat(), "action": "SELL", "symbol": symbol, "price": price, "qty": sell_qty, "amount_usd": sell_value, "pnl": round(pnl, 2), "pnl_pct": round(pnl_pct, 2), "reason": reason, } trades.append(trade) save_trades(trades) return trade, None def get_portfolio_value(prices): """Calculate total portfolio value.""" portfolio = get_portfolio() positions_value = 0 position_details = [] for symbol, pos in portfolio["positions"].items(): current_price = prices.get(symbol, pos["avg_price"]) value = pos["qty"] * current_price pnl = value - pos["total_cost"] pnl_pct = (pnl / pos["total_cost"]) * 100 if pos["total_cost"] > 0 else 0 positions_value += value position_details.append({ "symbol": symbol, "qty": pos["qty"], "avg_price": pos["avg_price"], "current_price": current_price, "value": round(value, 2), "pnl": round(pnl, 2), "pnl_pct": round(pnl_pct, 2), }) total_value = portfolio["cash"] + positions_value total_pnl = total_value - portfolio["starting_balance"] total_pnl_pct = (total_pnl / portfolio["starting_balance"]) * 100 return { "cash": round(portfolio["cash"], 2), "positions_value": round(positions_value, 2), "total_value": round(total_value, 2), "total_pnl": round(total_pnl, 2), "total_pnl_pct": round(total_pnl_pct, 2), "positions": sorted(position_details, key=lambda x: -x["value"]), "num_positions": len(position_details), } def take_snapshot(prices): """Save a point-in-time portfolio snapshot.""" snapshots = _load(SNAPSHOTS_FILE, []) value = get_portfolio_value(prices) value["timestamp"] = datetime.now(timezone.utc).isoformat() snapshots.append(value) # Keep last 1000 _save(SNAPSHOTS_FILE, snapshots[-1000:]) return value # ── Market Data ── def get_top_coins(limit=150): """Fetch top coins by market cap from CoinGecko.""" coins = [] for page in range(1, (limit // 100) + 2): per_page = min(100, limit - len(coins)) if per_page <= 0: break url = ( f"https://api.coingecko.com/api/v3/coins/markets?" f"vs_currency=usd&order=market_cap_desc&per_page={per_page}&page={page}" f"&sparkline=false&price_change_percentage=1h,24h,7d" ) req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"}) try: resp = urllib.request.urlopen(req, timeout=15) batch = json.loads(resp.read()) coins.extend(batch) except Exception as e: print(f"Error fetching page {page}: {e}") break time.sleep(1) # Rate limit return coins def get_ohlcv(coin_id, days=2): """Get OHLCV data from CoinGecko for VWAP calculation.""" url = f"https://api.coingecko.com/api/v3/coins/{coin_id}/ohlc?vs_currency=usd&days={days}" req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"}) try: resp = urllib.request.urlopen(req, timeout=10) raw = json.loads(resp.read()) # CoinGecko OHLC: [timestamp, open, high, low, close] return [{"t": r[0], "o": r[1], "h": r[2], "l": r[3], "c": r[4]} for r in raw] except: return [] def get_binance_klines(symbol, interval="1h", limit=48): """Get klines from Binance US for VWAP + volume.""" url = f"https://api.binance.us/api/v3/klines?symbol={symbol}USDT&interval={interval}&limit={limit}" req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"}) try: resp = urllib.request.urlopen(req, timeout=10) raw = json.loads(resp.read()) return [{"t": k[0], "o": float(k[1]), "h": float(k[2]), "l": float(k[3]), "c": float(k[4]), "v": float(k[5])} for k in raw] except: return [] # ── Technical Analysis ── def calc_vwap(candles): """Calculate VWAP with standard deviation bands.""" if not candles: return None cum_tpv = cum_vol = cum_sq = 0 for c in candles: vol = c.get("v", 1) # Default volume 1 if not available tp = (c["h"] + c["l"] + c["c"]) / 3 cum_tpv += tp * vol cum_vol += vol vwap = cum_tpv / cum_vol if cum_vol > 0 else candles[-1]["c"] # Standard deviation for c in candles: vol = c.get("v", 1) tp = (c["h"] + c["l"] + c["c"]) / 3 cum_sq += vol * (tp - vwap) ** 2 variance = cum_sq / cum_vol if cum_vol > 0 else 0 std = variance ** 0.5 return { "vwap": vwap, "std": std, "upper1": vwap + std, "lower1": vwap - std, "upper2": vwap + 2 * std, "lower2": vwap - 2 * std, } def calc_rsi(closes, period=14): """Calculate RSI.""" if len(closes) < period + 1: return 50 # Default neutral gains = [] losses = [] for i in range(1, len(closes)): diff = closes[i] - closes[i - 1] gains.append(max(diff, 0)) losses.append(max(-diff, 0)) avg_gain = sum(gains[-period:]) / period avg_loss = sum(losses[-period:]) / period if avg_loss == 0: return 100 rs = avg_gain / avg_loss return 100 - (100 / (1 + rs)) def analyze_coin(coin, klines=None): """Full VWAP + RSI + momentum analysis for a single coin.""" symbol = coin["symbol"].upper() price = coin["current_price"] if not klines: klines = get_binance_klines(symbol, "1h", 48) if not klines or len(klines) < 10: return None # VWAP vwap_data = calc_vwap(klines) vwap = vwap_data["vwap"] vwap_diff = (price - vwap) / vwap * 100 # RSI closes = [k["c"] for k in klines] rsi = calc_rsi(closes) # Volume trend recent_vol = sum(k["v"] for k in klines[-6:]) # Last 6h avg_vol = sum(k["v"] for k in klines) / (len(klines) / 6) vol_ratio = recent_vol / avg_vol if avg_vol > 0 else 1 # Momentum (last 4h vs prior 4h) if len(klines) >= 8: recent_mom = (klines[-1]["c"] - klines[-4]["c"]) / klines[-4]["c"] * 100 prior_mom = (klines[-4]["c"] - klines[-8]["c"]) / klines[-8]["c"] * 100 else: recent_mom = prior_mom = 0 # 24h change from CoinGecko change_24h = coin.get("price_change_percentage_24h", 0) or 0 # Score: -100 to +100 score = 0 signals = [] # VWAP position if vwap_diff < -2: score += 30 signals.append(f"Deep below VWAP ({vwap_diff:.1f}%)") elif vwap_diff < -0.5: score += 15 signals.append(f"Below VWAP ({vwap_diff:.1f}%)") elif vwap_diff > 2: score -= 20 signals.append(f"Extended above VWAP ({vwap_diff:.1f}%)") elif vwap_diff > 0.5: score += 5 signals.append(f"Above VWAP ({vwap_diff:.1f}%)") # RSI if rsi < 30: score += 30 signals.append(f"RSI oversold ({rsi:.0f})") elif rsi < 40: score += 15 signals.append(f"RSI low ({rsi:.0f})") elif rsi > 70: score -= 25 signals.append(f"RSI overbought ({rsi:.0f})") elif rsi > 60: score -= 10 signals.append(f"RSI elevated ({rsi:.0f})") # Volume confirmation if vol_ratio > 1.5: score += 10 if vwap_diff < 0 else -10 # High vol at support = bullish signals.append(f"High volume ({vol_ratio:.1f}x)") elif vol_ratio < 0.5: score -= 5 signals.append(f"Low volume ({vol_ratio:.1f}x)") # Momentum reversal if recent_mom > 0 and prior_mom < 0: score += 15 signals.append("Momentum reversal (bullish)") elif recent_mom < 0 and prior_mom > 0: score -= 15 signals.append("Momentum reversal (bearish)") # 24h trend if change_24h < -5: score += 10 # Potential bounce signals.append(f"24h dump ({change_24h:.1f}%)") elif change_24h > 5: score -= 10 # Potential pullback signals.append(f"24h pump ({change_24h:.1f}%)") return { "symbol": symbol, "name": coin["name"], "price": price, "market_cap_rank": coin.get("market_cap_rank", 0), "vwap": round(vwap, 6), "vwap_diff_pct": round(vwap_diff, 2), "rsi": round(rsi, 1), "vol_ratio": round(vol_ratio, 2), "momentum_4h": round(recent_mom, 2), "change_24h": round(change_24h, 2), "score": score, "signals": signals, "vwap_bands": {k: round(v, 6) for k, v in vwap_data.items()}, } # ── Trading Logic ── def should_buy(analysis): """Determine if we should buy based on analysis score.""" if not analysis: return False, "" score = analysis["score"] rsi = analysis["rsi"] vwap_diff = analysis["vwap_diff_pct"] # Strong buy: oversold + below VWAP if score >= 40: return True, f"Strong buy signal (score {score}): {', '.join(analysis['signals'])}" # Moderate buy: decent score + below VWAP if score >= 20 and vwap_diff < -0.5: return True, f"Buy signal (score {score}): {', '.join(analysis['signals'])}" return False, "" def should_sell(analysis, position): """Determine if we should sell a position.""" if not analysis: return False, 0, "" price = analysis["price"] avg_price = position["avg_price"] pnl_pct = (price - avg_price) / avg_price * 100 rsi = analysis["rsi"] vwap_diff = analysis["vwap_diff_pct"] score = analysis["score"] # Take profit: +5% gain with overbought signals if pnl_pct >= 5 and (rsi > 65 or vwap_diff > 1): return True, 100, f"Take profit ({pnl_pct:.1f}%, RSI {rsi:.0f})" # Strong take profit: +10% gain if pnl_pct >= 10: return True, 50, f"Partial take profit ({pnl_pct:.1f}%)" # Stop loss: -8% if pnl_pct <= -8: return True, 100, f"Stop loss ({pnl_pct:.1f}%)" # Bearish signals on losing position if pnl_pct < -3 and score <= -20: return True, 100, f"Cut loss (score {score}, PnL {pnl_pct:.1f}%)" # Extended above VWAP with big gain if pnl_pct >= 3 and vwap_diff > 2: return True, 50, f"Extended above VWAP ({vwap_diff:.1f}%), lock gains" return False, 0, "" # ── Main Scanner ── def run_scan(max_positions=10, position_size=5000): """ Full scan: analyze top 150 cryptos, make buy/sell decisions. max_positions: max simultaneous positions position_size: USD per position """ print(f"=== Crypto Market Watch Scan ===") print(f"Time: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC')}") print() # Get top coins print("Fetching top 150 coins...", flush=True) coins = get_top_coins(150) # Deduplicate by symbol (CoinGecko sometimes returns dupes) seen_symbols = set() unique_coins = [] for c in coins: sym = c["symbol"].upper() if sym not in seen_symbols: seen_symbols.add(sym) unique_coins.append(c) coins = unique_coins print(f"Got {len(coins)} unique coins") # Map symbols to Binance format # Only analyze coins available on Binance US binance_symbols = set() try: url = "https://api.binance.us/api/v3/exchangeInfo" req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"}) resp = urllib.request.urlopen(req, timeout=15) exchange = json.loads(resp.read()) for s in exchange["symbols"]: if s["quoteAsset"] == "USDT" and s["status"] == "TRADING": binance_symbols.add(s["baseAsset"]) except: pass print(f"Binance US has {len(binance_symbols)} USDT pairs") # Analyze each coin analyses = [] prices = {} for coin in coins: symbol = coin["symbol"].upper() prices[symbol] = coin["current_price"] if symbol not in binance_symbols: continue analysis = analyze_coin(coin) if analysis: analyses.append(analysis) time.sleep(0.2) # Rate limit print(f"Analyzed {len(analyses)} coins with Binance data") # Get current portfolio portfolio = get_portfolio() current_positions = set(portfolio["positions"].keys()) # ── SELL DECISIONS ── sells = [] for symbol, pos in list(portfolio["positions"].items()): analysis = next((a for a in analyses if a["symbol"] == symbol), None) if not analysis: # Can't analyze — skip (don't sell blind) continue do_sell, sell_pct, reason = should_sell(analysis, pos) if do_sell: trade, err = sell(symbol, analysis["price"], sell_pct, reason) if trade: sells.append(trade) print(f" 📤 SELL {symbol} @ ${analysis['price']:,.4f} ({sell_pct}%) — {reason}") # ── BUY DECISIONS ── portfolio = get_portfolio() # Refresh after sells num_positions = len(portfolio["positions"]) available_slots = max_positions - num_positions # Sort by score (best opportunities first) buy_candidates = sorted( [a for a in analyses if a["symbol"] not in portfolio["positions"]], key=lambda x: -x["score"] ) buys = [] for analysis in buy_candidates[:available_slots * 2]: # Check 2x candidates if len(buys) >= available_slots: break if portfolio["cash"] < position_size: break do_buy, reason = should_buy(analysis) if do_buy: trade, err = buy(analysis["symbol"], analysis["price"], position_size, reason) if trade: buys.append(trade) portfolio = get_portfolio() # Refresh print(f" 📥 BUY {analysis['symbol']} @ ${analysis['price']:,.4f} — {reason}") # Take snapshot snapshot = take_snapshot(prices) # Summary print(f"\n{'='*50}") print(f"Buys: {len(buys)} | Sells: {len(sells)}") print(f"Portfolio: ${snapshot['total_value']:,.2f} ({snapshot['total_pnl_pct']:+.2f}%)") print(f"Cash: ${snapshot['cash']:,.2f} | Positions: {snapshot['num_positions']}") if snapshot["positions"]: print(f"\nOpen Positions:") for p in snapshot["positions"]: emoji = "🟢" if p["pnl"] >= 0 else "🔴" print(f" {emoji} {p['symbol']}: ${p['value']:,.2f} ({p['pnl_pct']:+.1f}%)") # Top opportunities not taken print(f"\nTop Scoring Coins:") for a in sorted(analyses, key=lambda x: -x["score"])[:10]: held = "📌" if a["symbol"] in portfolio.get("positions", {}) else " " print(f" {held} {a['symbol']:<8} score:{a['score']:>4} | RSI:{a['rsi']:.0f} | VWAP:{a['vwap_diff_pct']:+.1f}% | 24h:{a['change_24h']:+.1f}%") return {"buys": buys, "sells": sells, "snapshot": snapshot, "analyses": analyses} if __name__ == "__main__": result = run_scan()