#!/usr/bin/env python3
"""
CoinEx Live Futures Trader - ENHANCED SAFETY VERSION
Real money trading based on the paper trading strategy.
Extremely careful implementation with kill switches and safety measures.
FUTURES ONLY — NEVER TOUCH SPOT
"""

import argparse
import hashlib
import hmac
import html
import json
import os
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from urllib.parse import urlencode

import requests

# Add parent to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent))
from scripts.short_scanner import scan_coin, COINS as SHORT_COINS

# Load CoinEx credentials
CREDENTIALS_FILE = Path.home() / ".openclaw/workspace/.credentials/coinex.env"


def load_coinex_credentials():
    """Load CoinEx API credentials from the environment file.

    The file is parsed as simple ``KEY=value`` lines; blank lines and lines
    starting with ``#`` are ignored. Whitespace around keys/values and one
    pair of matching surrounding quotes are stripped.

    Returns:
        Tuple ``(access_id, secret_key)``.

    Raises:
        RuntimeError: if the file is missing or either credential is
            missing or empty.
    """
    if not CREDENTIALS_FILE.exists():
        raise RuntimeError(f"Credentials file not found: {CREDENTIALS_FILE}")

    credentials = {}
    with open(CREDENTIALS_FILE, encoding="utf-8") as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line or line.startswith('#') or '=' not in line:
                continue
            key, value = line.split('=', 1)
            value = value.strip()
            # Accept KEY="value" / KEY='value' as commonly written in .env files.
            if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
                value = value[1:-1]
            credentials[key.strip()] = value

    if not credentials.get('COINEX_ACCESS_ID') or not credentials.get('COINEX_SECRET_KEY'):
        raise RuntimeError("Missing COINEX_ACCESS_ID or COINEX_SECRET_KEY in credentials file")

    return credentials['COINEX_ACCESS_ID'], credentials['COINEX_SECRET_KEY']


# Configuration
TELEGRAM_BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN", os.environ.get("BOT_TOKEN", ""))
TELEGRAM_CHAT_ID = os.environ.get("TELEGRAM_CHAT_ID", os.environ.get("CHAT_ID", "6443752046"))

# CoinEx API settings
COINEX_BASE_URL = "https://api.coinex.com"

# Data directory
DATA_DIR = Path(__file__).parent.parent / "data" / "coinex-live"
STATE_FILE = DATA_DIR / "trader_state.json"
TRADES_LOG = DATA_DIR / "trades.log"
ERROR_LOG = DATA_DIR / "errors.log"
LOCKFILE = DATA_DIR / "live-trader-lock.json"
CONFIG_FILE = DATA_DIR / "trader_config.json"

# KILL SWITCH SETTINGS
STARTING_BALANCE_KEY = "starting_balance"
RETRY_ATTEMPTS = 3
RETRY_DELAY = 2.0

# Global state for circuit breaker
consecutive_failures = 0


def setup_logging():
    """Ensure log files exist and directories are created."""
    DATA_DIR.mkdir(parents=True, exist_ok=True)
    for log_file in [TRADES_LOG, ERROR_LOG]:
        if not log_file.exists():
            log_file.write_text("")


def _mode_prefix(is_dry_run):
    """Return the log prefix that marks an entry as dry-run or live."""
    return "[DRY-RUN]" if is_dry_run else "[LIVE]"


def _utc_timestamp():
    """Return the current UTC time formatted for log entries."""
    return datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')


def log_trade(message, is_dry_run=False):
    """Print a trade message and append it, timestamped, to trades.log."""
    prefix = _mode_prefix(is_dry_run)
    log_entry = f"[{_utc_timestamp()}] {prefix} {message}\n"

    print(f"[TRADE] {prefix} {message}")
    # Explicit UTF-8: messages contain symbols/emoji that a locale-default
    # encoding may not be able to represent.
    with open(TRADES_LOG, 'a', encoding="utf-8") as f:
        f.write(log_entry)


def log_error(message, is_dry_run=False):
    """Print an error message and append it, timestamped, to errors.log."""
    prefix = _mode_prefix(is_dry_run)
    log_entry = f"[{_utc_timestamp()}] {prefix} ERROR: {message}\n"

    print(f"[ERROR] {prefix} {message}")
    with open(ERROR_LOG, 'a', encoding="utf-8") as f:
        f.write(log_entry)


def load_config():
    """Load trader configuration from file with fallback to defaults.

    Keys missing from the file are filled in from the defaults. If the file
    does not exist, a default config file is written. If the file cannot be
    read or is not a JSON object, the defaults are returned (note that the
    default mode is "paused").
    """
    default_config = {
        "mode": "paused",
        "position_size_pct": 5.0,
        "max_positions": 3,
        "max_leverage": 10,
        "kill_switch_drawdown_pct": 50.0,
        "tp_pct": 5.0,
        "sl_pct": -3.0,
        "trailing_stop_pct": 2.0,
        "circuit_breaker_threshold": 3,
        "scan_interval_minutes": 5,
        "long_threshold": 45,
        "short_threshold": 50,
        "coin_blacklist": [],
        "coin_whitelist": []
    }

    if CONFIG_FILE.exists():
        try:
            loaded = json.loads(CONFIG_FILE.read_text())
            if not isinstance(loaded, dict):
                raise ValueError("config root must be a JSON object")
            # Merge with defaults to handle missing keys
            return {**default_config, **loaded}
        except Exception as e:
            log_error(f"Failed to load config, using defaults: {e}")
            return default_config

    # Create default config file
    try:
        CONFIG_FILE.parent.mkdir(parents=True, exist_ok=True)
        CONFIG_FILE.write_text(json.dumps(default_config, indent=2))
        log_trade("Created default trader_config.json")
    except Exception as e:
        log_error(f"Failed to create default config: {e}")
    return default_config


def load_state():
    """Load trader state from file, falling back to an empty state."""
    if STATE_FILE.exists():
        try:
            loaded = json.loads(STATE_FILE.read_text())
            if not isinstance(loaded, dict):
                raise ValueError("state root must be a JSON object")
            return loaded
        except Exception as e:
            log_error(f"Failed to load state: {e}")
    return {"peak_pnl": {}, "last_alert": None}


def save_state(state):
    """Save trader state to file (best effort; failures are logged)."""
    try:
        STATE_FILE.parent.mkdir(parents=True, exist_ok=True)
        STATE_FILE.write_text(json.dumps(state, indent=2))
    except Exception as e:
        log_error(f"Failed to save state: {e}")


def create_lockfile(reason, last_equity=None, positions_at_halt=None):
    """Create lockfile to prevent restart after kill switch or circuit breaker."""
    lockfile_data = {
        "reason": reason,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "last_equity": last_equity,
        "positions_at_halt": positions_at_halt or []
    }

    try:
        LOCKFILE.parent.mkdir(parents=True, exist_ok=True)
        LOCKFILE.write_text(json.dumps(lockfile_data, indent=2))
        log_error(f"LOCKFILE CREATED: {reason}")
    except Exception as e:
        log_error(f"Failed to create lockfile: {e}")


def check_lockfile():
    """Return True (refuse to run) if the lockfile exists, else False."""
    if not LOCKFILE.exists():
        return False

    try:
        lockfile_data = json.loads(LOCKFILE.read_text())
        reason = lockfile_data.get("reason", "Unknown")
        timestamp = lockfile_data.get("timestamp", "Unknown")

        print(f"🚨 LOCKFILE EXISTS - Bot was halted due to: {reason}")
        print(f"Halt time: {timestamp}")
        print(f"To restart, manually delete: {LOCKFILE}")
        print("⚠️  Only do this if you've verified the issue is resolved!")
    except Exception as e:
        # An unreadable lockfile still blocks the run.
        print(f"Error reading lockfile: {e}")
    return True


def send_telegram(message):
    """Send a Telegram alert (best effort; failures are logged).

    When no bot token is configured the message is printed instead.
    """
    if not TELEGRAM_BOT_TOKEN:
        print(f"[TG] {message}")
        return

    try:
        url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
        response = requests.post(url, json={
            "chat_id": TELEGRAM_CHAT_ID,
            # The message is sent in HTML parse mode, so raw '<', '>' and '&'
            # (e.g. from exception text or "<class 'dict'>") must be escaped
            # or Telegram may reject the message and the alert would be lost.
            "text": html.escape(message, quote=False),
            "parse_mode": "HTML"
        }, timeout=10)
        response.raise_for_status()
    except Exception as e:
        log_error(f"Telegram failed: {e}")


def increment_failure_count(max_failures=3):
    """Increment consecutive API failure count and check circuit breaker.

    When the count reaches ``max_failures`` a lockfile is written, an alert
    is sent and the process exits with status 1.
    """
    global consecutive_failures
    consecutive_failures += 1

    if consecutive_failures >= max_failures:
        reason = f"Circuit breaker triggered - {consecutive_failures} consecutive API failures"
        create_lockfile(reason)
        send_telegram(f"🚨 CIRCUIT BREAKER TRIGGERED\n{reason}\nBot halted - manual intervention required")
        log_error(reason)
        sys.exit(1)


def reset_failure_count():
    """Reset consecutive failure count after successful API call."""
    global consecutive_failures
    consecutive_failures = 0


class CoinExAPI:
    """CoinEx API client with authentication and safety measures."""

    def __init__(self, access_id, secret_key, max_failures=3):
        """Store credentials and the circuit-breaker threshold.

        Args:
            access_id: CoinEx access id sent in the X-COINEX-KEY header.
            secret_key: secret used to HMAC-sign each request.
            max_failures: consecutive failed requests that trip the circuit
                breaker; invalid values fall back to 3.
        """
        self.access_id = access_id
        self.secret_key = secret_key
        self.base_url = COINEX_BASE_URL
        try:
            self.max_failures = max(1, int(max_failures))
        except (TypeError, ValueError):
            self.max_failures = 3

    @staticmethod
    def _query_string(params):
        """Return the query string (keys sorted) used for signing AND sending."""
        return urlencode(sorted(params.items())) if params else ""

    @staticmethod
    def _encode_body(json_data):
        """Return the compact JSON body used for signing AND sending."""
        return json.dumps(json_data, separators=(',', ':')) if json_data else ""

    def _sign_request(self, method, path, params=None, json_data=None):
        """Generate CoinEx API authentication headers.

        The signed string is method + path (+ "?query" for GET with params,
        + body for POST with a body) + millisecond timestamp, HMAC-SHA256
        signed with the secret key.
        """
        timestamp = str(int(time.time() * 1000))

        if method == "GET" and params:
            prepared = f"{method}{path}?{self._query_string(params)}{timestamp}"
        elif method == "POST" and json_data:
            prepared = f"{method}{path}{self._encode_body(json_data)}{timestamp}"
        else:
            prepared = f"{method}{path}{timestamp}"

        sign = hmac.new(
            self.secret_key.encode('latin-1'),
            prepared.encode('latin-1'),
            hashlib.sha256
        ).hexdigest().lower()

        return {
            "X-COINEX-KEY": self.access_id,
            "X-COINEX-SIGN": sign,
            "X-COINEX-TIMESTAMP": timestamp,
            "Content-Type": "application/json"
        }

    def _request(self, method, path, params=None, json_data=None):
        """Make authenticated request to CoinEx API.

        Returns:
            The ``data`` member of the response (``{}`` if absent).

        Raises:
            Exception: any transport, HTTP or API-level error is logged,
                counted toward the circuit breaker and re-raised.
        """
        try:
            headers = self._sign_request(method, path, params, json_data)
            url = f"{self.base_url}{path}"

            if method == "GET":
                # Send exactly the query string that was signed; letting
                # requests build it from the dict could order it differently.
                query = self._query_string(params)
                if query:
                    url = f"{url}?{query}"
                response = requests.get(url, headers=headers, timeout=30)
            elif method == "POST":
                body = self._encode_body(json_data)
                response = requests.post(url, headers=headers, data=body or None, timeout=30)
            else:
                raise ValueError(f"Unsupported method: {method}")

            response.raise_for_status()
            result = response.json()

            if result.get("code") != 0:
                raise RuntimeError(f"CoinEx API error: {result.get('message', 'Unknown error')}")

            # Success - reset failure count
            reset_failure_count()
            return result.get("data", {})

        except Exception as e:
            log_error(f"CoinEx API request failed: {method} {path} - {e}")
            increment_failure_count(self.max_failures)
            raise

    def get_futures_balance(self):
        """Get futures account balance."""
        return self._request("GET", "/v2/assets/futures/balance")

    def get_market_price(self, market):
        """Get market information for a symbol via /v2/futures/market."""
        return self._request("GET", "/v2/futures/market", {"market": market})

    def get_positions(self):
        """Get all open futures positions.

        Returns:
            The API ``data`` payload on success, or ``None`` if the request
            failed. Callers must treat ``None`` as "unknown", never as
            "no positions".
        """
        try:
            result = self._request("GET", "/v2/futures/pending-position", params={"market_type": "futures"})
            return result if result is not None else []
        except Exception as e:
            log_error(f"CRITICAL: Positions endpoint failed: {e}")
            # SAFETY FIX #2: Return None on failure, never assume empty positions
            return None

    def place_order(self, market, side, type_order, amount, price=None, leverage=None):
        """Place a futures order with verification.

        Args:
            market: CoinEx market symbol.
            side: "buy" or "sell".
            type_order: "market" or "limit".
            amount: order amount; sent as a string.
            price: optional price; sent only when truthy.
            leverage: accepted for interface compatibility; not sent.

        Raises:
            RuntimeError: if the response carries no ``order_id``.
        """
        data = {
            "market": market,
            "market_type": "FUTURES",
            "side": side,
            "type": type_order,
            "amount": str(amount)
        }
        if price:
            data["price"] = str(price)

        result = self._request("POST", "/v2/futures/order", json_data=data)

        # SAFETY FIX #6: Verify order was accepted
        if not result or "order_id" not in result:
            raise RuntimeError(f"Order placement failed - no order ID returned: {result}")

        order_id = result["order_id"]
        log_trade(f"Order placed successfully - ID: {order_id}")
        return result

    def set_leverage(self, market, leverage):
        """Set leverage for a market and verify it was set correctly.

        Raises:
            RuntimeError: if the response reports a different leverage.
        """
        data = {
            "market": market,
            "market_type": "FUTURES",
            "margin_mode": "cross",
            "leverage": leverage
        }
        result = self._request("POST", "/v2/futures/adjust-position-leverage", json_data=data)

        # SAFETY FIX #7: Verify leverage was set correctly
        if result:
            actual_leverage = result.get("leverage", 0)
            if actual_leverage and int(actual_leverage) != leverage:
                raise RuntimeError(f"Leverage verification failed - requested {leverage}x, got {actual_leverage}x")
            log_trade(f"Leverage verified: {market} set to {leverage}x")

        return result

    def cancel_order(self, market, order_id):
        """Cancel a futures order."""
        data = {
            "market": market,
            "market_type": "FUTURES",
            "order_id": order_id
        }
        return self._request("POST", "/v2/futures/cancel-order", json_data=data)


def _equity_of(balance_entry):
    """Return available + frozen funds from one balance record."""
    return float(balance_entry.get("available", 0)) + float(balance_entry.get("frozen", 0))


def _closing_order(pos):
    """Return ``(side, amount)`` of the order that would close ``pos``."""
    side = "buy" if pos.get("side") == "sell" else "sell"
    amount = abs(float(pos.get("close_avbl", pos.get("open_interest", pos.get("amount", 0)))))
    return side, amount


def close_position_with_retry(api, market, side, amount, is_dry_run=False):
    """Close position with retry logic for emergency situations.

    Returns:
        True if the close order was placed (or this is a dry run), False if
        all ``RETRY_ATTEMPTS`` attempts failed (an emergency alert is sent).
    """
    if is_dry_run:
        log_trade(f"Would close position {market} {side} amount {amount}", is_dry_run=True)
        return True

    # SAFETY FIX #3: Retry logic for position closes
    for attempt in range(RETRY_ATTEMPTS):
        try:
            api.place_order(market, side, "market", amount)
            log_trade(f"Position closed successfully: {market} (attempt {attempt + 1})")
            return True
        except Exception as e:
            log_error(f"Failed to close position {market} attempt {attempt + 1}: {e}")
            if attempt < RETRY_ATTEMPTS - 1:
                time.sleep(RETRY_DELAY)

    # All retries exhausted - send emergency alert
    emergency_msg = f"EMERGENCY: Failed to close position {market} after {RETRY_ATTEMPTS} retries — MANUAL INTERVENTION REQUIRED"
    send_telegram(f"🚨 {emergency_msg}")
    log_error(emergency_msg)
    return False


def _trip_kill_switch(reason, headline, last_equity=None):
    """Log, lock and alert for a kill-switch halt; always returns True."""
    log_error(reason)
    create_lockfile(reason, last_equity)
    send_telegram(f"🚨 {headline}\n{reason}")
    return True


def _close_all_positions(api, is_dry_run):
    """Try to close every open position; return the positions seen at halt."""
    positions_at_halt = []
    try:
        positions = api.get_positions()
        if positions is None:
            # The query failed, so nothing can be closed automatically.
            warning = "Kill switch: could not fetch positions — MANUAL INTERVENTION REQUIRED"
            log_error(warning)
            send_telegram(f"🚨 {warning}")
            return positions_at_halt

        for pos in positions or []:
            market = pos.get("market")
            if not market:
                continue

            # Record position for lockfile
            positions_at_halt.append({
                "market": market,
                "side": pos.get("side"),
                "amount": pos.get("amount"),
                "pnl": pos.get("unrealized_pnl", 0)
            })

            # Close with retry logic
            side, amount = _closing_order(pos)
            if amount > 0 and close_position_with_retry(api, market, side, amount, is_dry_run):
                log_trade(f"Kill switch: closed position {market}", is_dry_run)
    except Exception as e:
        log_error(f"Failed to close positions during kill switch: {e}")
    return positions_at_halt


def check_kill_switch(api, state, config, is_dry_run=False):
    """Check if kill switch should be triggered due to drawdown.

    Any failure to obtain or interpret the balance is treated as a halt.

    Returns:
        True if the bot must stop (a lockfile has been written), else False.
    """
    try:
        balance_response = api.get_futures_balance()

        # SAFETY FIX #1: Kill switch bypass protection
        if not balance_response:
            return _trip_kill_switch(
                "Balance API returned empty response - STOPPING for safety",
                "KILL SWITCH - API FAILURE")

        # Handle different response formats
        if isinstance(balance_response, list):
            if not balance_response:
                return _trip_kill_switch(
                    "Balance API returned empty list - STOPPING for safety",
                    "KILL SWITCH - EMPTY BALANCE")
            current_equity = _equity_of(balance_response[0])
        elif isinstance(balance_response, dict):
            current_equity = _equity_of(balance_response)
        else:
            return _trip_kill_switch(
                f"Balance API returned unexpected format: {type(balance_response)} - STOPPING for safety",
                "KILL SWITCH - INVALID BALANCE FORMAT")

        # SAFETY FIX #5: Zero equity is a kill-switch event
        if current_equity <= 0:
            return _trip_kill_switch(
                f"Equity is zero or negative (${current_equity}) - KILL SWITCH TRIGGERED",
                "KILL SWITCH - ZERO EQUITY",
                current_equity)

        # Initialize starting balance if not set
        if STARTING_BALANCE_KEY not in state:
            state[STARTING_BALANCE_KEY] = current_equity
            save_state(state)
            log_trade(f"Starting balance recorded: ${current_equity:.2f}", is_dry_run)
            return False

        starting_balance = state[STARTING_BALANCE_KEY]
        drawdown = (starting_balance - current_equity) / starting_balance

        log_trade(f"Balance check - Current: ${current_equity:.2f}, Starting: ${starting_balance:.2f}, Drawdown: {drawdown*100:.2f}%", is_dry_run)

        kill_switch_threshold = config.get("kill_switch_drawdown_pct", 50.0) / 100.0
        if drawdown < kill_switch_threshold:
            return False

        # KILL SWITCH TRIGGERED
        msg = (f"🚨 KILL SWITCH TRIGGERED 🚨\n"
               f"Account hit {drawdown*100:.1f}% drawdown\n"
               f"Starting: ${starting_balance:.2f} → Current: ${current_equity:.2f}\n"
               f"Bot stopped, attempting to close all positions")
        send_telegram(msg)
        log_error(f"KILL SWITCH: {drawdown*100:.1f}% drawdown")

        # Try to close all positions with retry logic
        positions_at_halt = _close_all_positions(api, is_dry_run)

        # Create lockfile
        create_lockfile(f"Kill switch - {drawdown*100:.1f}% drawdown", current_equity, positions_at_halt)
        return True

    except Exception as e:
        # SAFETY FIX #1: API failure = STOP, don't continue
        return _trip_kill_switch(
            f"Kill switch check failed: {e} - STOPPING for safety",
            "KILL SWITCH - API ERROR")


def get_leverage_for_score(config, score):
    """Determine leverage based on signal score using configurable tiers.

    Tiers are sorted by min_score descending, so highest matching tier wins.
    Example tiers: [{"min_score": 45, "leverage": 5}, {"min_score": 60, "leverage": 7}]
    Score 55 → 5x, Score 65 → 7x

    The result is always capped by ``max_leverage`` from the config.
    """
    max_leverage = config.get("max_leverage", 10)
    tiers = config.get("leverage_tiers", [
        {"min_score": 45, "leverage": 5},
        {"min_score": 60, "leverage": 7}
    ])
    # Sort descending by min_score so we match the highest qualifying tier
    tiers_sorted = sorted(tiers, key=lambda t: t["min_score"], reverse=True)
    for tier in tiers_sorted:
        if score >= tier["min_score"]:
            return min(max_leverage, tier["leverage"])
    # Fallback: lowest tier leverage or 5x
    return min(max_leverage, tiers_sorted[-1]["leverage"] if tiers_sorted else 5)


def get_position_size(api, config, is_dry_run=False):
    """Calculate position size (USD margin) based on current equity.

    Returns:
        ``position_size_pct`` percent of equity, or ``None`` when the balance
        cannot be obtained/interpreted or the result is not positive.
    """
    try:
        balance_response = api.get_futures_balance()

        # Handle different response formats
        if isinstance(balance_response, list):
            if not balance_response:
                log_error("Empty balance list returned from API")
                return None
            equity = _equity_of(balance_response[0])
        elif isinstance(balance_response, dict):
            equity = _equity_of(balance_response)
        else:
            log_error(f"Unexpected balance response format: {type(balance_response)}")
            return None

        # SAFETY FIX #5: Zero equity check
        if equity <= 0:
            log_error(f"Equity is zero or negative: ${equity}")
            return None  # This should trigger kill switch upstream

        position_size_pct = config.get("position_size_pct", 5.0)
        position_size = equity * (position_size_pct / 100)

        # SAFETY FIX #5: Validate calculated position size
        if position_size <= 0:
            log_error(f"Calculated position size is invalid: ${position_size}")
            return None

        log_trade(f"Position size: ${position_size:.2f} ({position_size_pct}% of ${equity:.2f})", is_dry_run)
        return position_size

    except Exception as e:
        log_error(f"Failed to calculate position size: {e}")
        return None  # Return None instead of 0


def get_token_amount(api, market, usd_margin, leverage):
    """Convert USD margin to token amount for CoinEx order.

    CoinEx expects amount in base currency (tokens), not USD.
    notional = margin * leverage, then amount = notional / price.
    Also checks against market minimum amount.

    Returns:
        Whole-token amount, or ``None`` if the price is unavailable, the
        amount is below the market minimum, or any request fails.
    """
    try:
        # Get current price from ticker (public endpoint, no auth needed)
        ticker_resp = requests.get("https://api.coinex.com/v2/futures/ticker",
                                   params={"market": market}, timeout=10).json()
        if ticker_resp.get("code") != 0 or not ticker_resp.get("data"):
            log_error(f"Cannot get ticker for {market}")
            return None

        ticker = ticker_resp["data"][0] if isinstance(ticker_resp["data"], list) else ticker_resp["data"]
        price = float(ticker.get("last", 0))
        if price <= 0:
            log_error(f"Invalid price for {market}: {price}")
            return None

        # Get min_amount from market info (public endpoint)
        market_resp = requests.get("https://api.coinex.com/v2/futures/market",
                                   params={"market": market}, timeout=10).json()
        min_amount = 1
        if market_resp.get("code") == 0 and market_resp.get("data"):
            mkt = market_resp["data"][0] if isinstance(market_resp["data"], list) else market_resp["data"]
            min_amount = float(mkt.get("min_amount", 1))

        # Calculate: notional = margin * leverage, amount = notional / price
        notional = usd_margin * leverage
        # NOTE(review): int() truncates to whole tokens, so markets whose
        # order size is fractional round down (possibly to 0 and get skipped).
        # Left unchanged because it affects live order sizing - confirm intent.
        amount = int(notional / price)

        if amount < min_amount:
            log_error(f"{market}: calculated amount {amount} below minimum {int(min_amount)} (need ${min_amount * price / leverage:.2f} margin at {leverage}x)")
            return None

        log_trade(f"{market}: ${usd_margin:.2f} margin × {leverage}x = ${notional:.2f} notional = {amount} tokens @ ${price:.4f}")
        return amount

    except Exception as e:
        log_error(f"Failed to calculate token amount for {market}: {e}")
        return None


def format_symbol_for_coinex(symbol):
    """Convert scanner symbol to CoinEx futures format."""
    # Scanner uses symbols like "BTC", CoinEx uses "BTCUSDT"
    if symbol.endswith("USDT"):
        return symbol
    return f"{symbol}USDT"


def run_short_scan(is_dry_run=False):
    """Run short scanner on all coins - same logic as paper trader.

    Returns:
        Scan results tagged as shorts, sorted by score descending.
    """
    results = []
    for symbol in SHORT_COINS:
        try:
            r = scan_coin(symbol)
            if r:
                r["direction"] = "short"
                r["coinex_symbol"] = format_symbol_for_coinex(symbol)
                results.append(r)
            time.sleep(0.15)
        except Exception as e:
            log_error(f"Short scan failed for {symbol}: {e}", is_dry_run)
    return sorted(results, key=lambda x: x['score'], reverse=True)


def _score_long_setup(r):
    """Score one scan result as a long candidate; return ``(score, reasons)``."""
    long_score = 0
    reasons = []

    if r['rsi'] <= 25:
        long_score += 30
        reasons.append(f"RSI extremely oversold ({r['rsi']})")
    elif r['rsi'] <= 30:
        long_score += 25
        reasons.append(f"RSI oversold ({r['rsi']})")
    elif r['rsi'] <= 35:
        long_score += 15
        reasons.append(f"RSI low ({r['rsi']})")
    elif r['rsi'] <= 40:
        long_score += 5
        reasons.append(f"RSI mildly low ({r['rsi']})")

    if r['vwap_pct'] < -5:
        long_score += 20
        reasons.append(f"Well below VWAP ({r['vwap_pct']:+.1f}%)")
    elif r['vwap_pct'] < -3:
        long_score += 15
        reasons.append(f"Below VWAP ({r['vwap_pct']:+.1f}%)")
    elif r['vwap_pct'] < -1:
        long_score += 8
        reasons.append(f"Slightly below VWAP ({r['vwap_pct']:+.1f}%)")

    if r['change_24h'] < -15:
        long_score += 15
        reasons.append(f"Dumped {r['change_24h']:.1f}% 24h")
    elif r['change_24h'] < -8:
        long_score += 10
        reasons.append(f"Down {r['change_24h']:.1f}% 24h")
    elif r['change_24h'] < -4:
        long_score += 5
        reasons.append(f"Down {r['change_24h']:.1f}% 24h")

    if r['bb_position'] < 0:
        long_score += 15
        reasons.append(f"Below lower Bollinger ({r['bb_position']:.2f})")
    elif r['bb_position'] < 0.15:
        long_score += 10
        reasons.append(f"Near lower Bollinger ({r['bb_position']:.2f})")

    return long_score, reasons


def run_long_scan(is_dry_run=False):
    """Run long scanner - same logic as paper trader.

    Re-scores each scan result with the short criteria inverted.

    Returns:
        Long candidates sorted by score descending.
    """
    results = []
    for symbol in SHORT_COINS:
        try:
            r = scan_coin(symbol)
            if r:
                # Invert short criteria for long opportunities
                long_score, reasons = _score_long_setup(r)
                results.append({
                    "symbol": r["symbol"],
                    "coinex_symbol": format_symbol_for_coinex(r["symbol"]),
                    "price": r["price"],
                    "rsi": r["rsi"],
                    "vwap_pct": r["vwap_pct"],
                    "change_24h": r["change_24h"],
                    "bb_position": r["bb_position"],
                    "score": long_score,
                    "reasons": reasons,
                    "direction": "long",
                })
            time.sleep(0.15)
        except Exception as e:
            log_error(f"Long scan failed for {symbol}: {e}", is_dry_run)
    return sorted(results, key=lambda x: x['score'], reverse=True)


def manage_exits(api, state, config, dry_run=False):
    """Manage position exits for TP/SL/trailing stops.

    Updates ``state["peak_pnl"]`` in place (peak PnL% per position).

    Returns:
        List of exit records for positions actually closed, or ``None`` to
        signal that the trading cycle must halt.
    """
    exits = []

    try:
        positions = api.get_positions()

        # SAFETY FIX #2: Handle position query failure
        if positions is None:
            log_error("Position query failed - halting trading cycle for safety")
            return None  # Return None to signal halt

        if not positions or not isinstance(positions, list):
            log_trade("No positions to manage (empty or invalid response)", dry_run)
            return exits

        # A state file without "peak_pnl" must not break peak tracking.
        peaks = state.setdefault("peak_pnl", {})

        # Exit thresholds are per-cycle constants; read them once.
        tp_pct = config.get("tp_pct", 5.0)
        sl_pct = config.get("sl_pct", -3.0)
        trailing_stop_pct = config.get("trailing_stop_pct", 2.0)

        for pos in positions:
            market = pos.get("market", "")
            position_id = f"{market}_{pos.get('side', '')}"

            unrealized_pnl = float(pos.get("unrealized_pnl", 0))
            # NOTE(review): when "margin" is absent this defaults to 1, making
            # pnl_pct the dollar PnL x100 - confirm the API field name.
            margin_used = float(pos.get("margin", 1))

            if margin_used == 0:
                continue

            pnl_pct = (unrealized_pnl / margin_used) * 100

            # Track peak PnL for trailing stop
            peak = max(peaks.get(position_id, pnl_pct), pnl_pct)
            peaks[position_id] = peak

            # Check exit conditions using config
            reason = None
            if tp_pct and pnl_pct >= tp_pct:
                reason = f"TP hit ({pnl_pct:+.1f}%)"
            elif sl_pct and pnl_pct <= sl_pct:
                reason = f"SL hit ({pnl_pct:+.1f}%)"
            elif peak >= 2.0 and (peak - pnl_pct) >= trailing_stop_pct:
                reason = f"Trailing stop (peak {peak:+.1f}%, now {pnl_pct:+.1f}%)"

            if not reason:
                continue

            try:
                # Calculate exit order
                side, amount = _closing_order(pos)

                if dry_run:
                    log_trade(f"Would close {market} {pos.get('side')} - {reason}", dry_run)
                elif amount <= 0:
                    log_error(f"Cannot close {market} {pos.get('side')} - {reason} - closable amount is zero", dry_run)
                elif close_position_with_retry(api, market, side, amount, dry_run):
                    log_trade(f"CLOSED {market} {pos.get('side')} - {reason} - PnL: ${unrealized_pnl:+.2f}", dry_run)
                    exits.append({
                        "market": market,
                        "side": pos.get("side"),
                        "reason": reason,
                        "pnl": unrealized_pnl,
                        "pnl_pct": pnl_pct
                    })
                    # Clean up peak tracking
                    peaks.pop(position_id, None)

            except Exception as e:
                log_error(f"Failed to close position {market}: {e}", dry_run)

    except Exception as e:
        log_error(f"Failed to manage exits: {e}", dry_run)
        return None  # Signal failure

    return exits


def _open_candidates(api, candidates, threshold, direction, side, config,
                     position_size, existing_markets, slots_available, opened, dry_run):
    """Open positions for scored candidates of one direction.

    ``candidates`` must be sorted by score descending; processing stops at
    the first score below ``threshold`` or when no slots remain. Appends to
    ``opened`` and ``existing_markets`` and returns the remaining slots.
    """
    label = direction.upper()
    for opportunity in candidates:
        if slots_available <= 0:
            break
        if opportunity["score"] < threshold:
            break

        market = opportunity["coinex_symbol"]
        if market in existing_markets:
            continue

        # Determine leverage (capped by config)
        leverage = get_leverage_for_score(config, opportunity["score"])

        try:
            if dry_run:
                log_trade(f"Would open {label} {market} {leverage}x - Score: {opportunity['score']}", dry_run)
            else:
                # Convert USD margin to token amount
                token_amount = get_token_amount(api, market, position_size, leverage)
                if token_amount is None:
                    log_trade(f"SKIP {label} {market} - amount below minimum", dry_run)
                    continue

                # SAFETY FIX #7: Set and verify leverage
                api.set_leverage(market, leverage)

                # Place order with verification
                result = api.place_order(market, side, "market", token_amount)

                log_trade(f"OPENED {label} {market} {leverage}x - Score: {opportunity['score']} - {token_amount} tokens (${position_size:.2f} margin)", dry_run)

                opened.append({
                    "market": market,
                    "direction": direction,
                    "leverage": leverage,
                    "score": opportunity["score"],
                    "size": position_size,
                    "order_id": result.get("order_id")
                })
                existing_markets.add(market)
                slots_available -= 1

            time.sleep(0.5)  # Rate limiting

        except Exception as e:
            log_error(f"Failed to open {direction} position {market}: {e}", dry_run)

    return slots_available


def open_new_positions(api, state, config, dry_run=False):
    """Scan for opportunities and open new positions.

    Shorts are processed before longs; at most ``max_positions`` positions
    are held and markets with an existing position are skipped.

    Returns:
        List of records for positions opened, or ``None`` to signal that the
        trading cycle must halt.
    """
    opened = []

    try:
        # Get current positions to avoid doubling up
        positions = api.get_positions()

        # SAFETY FIX #2: Handle position query failure
        if positions is None:
            log_error("Position query failed - cannot safely open new positions")
            return None

        if not positions or not isinstance(positions, list):
            positions = []

        held = [pos for pos in positions if isinstance(pos, dict)]
        existing_markets = {pos.get("market", "") for pos in held}
        num_open = sum(1 for pos in held if pos.get("market"))

        max_positions = config.get("max_positions", 3)
        slots_available = max_positions - num_open

        log_trade(f"Current positions: {num_open}, Available slots: {slots_available}", dry_run)

        if slots_available <= 0:
            return opened

        # Get position size
        position_size = get_position_size(api, config, dry_run)

        # SAFETY FIX #5: Position size validation
        if position_size is None or position_size <= 0:
            log_error("Position size is invalid - stopping new position opens for safety")
            if position_size is None:
                # This could indicate a kill-switch scenario
                return None
            return opened

        # Run scanners
        shorts = run_short_scan(dry_run)
        longs = run_long_scan(dry_run)

        # Get thresholds from config
        short_threshold = config.get("short_threshold", 50)
        long_threshold = config.get("long_threshold", 45)

        # Process short opportunities, then long opportunities
        slots_available = _open_candidates(
            api, shorts, short_threshold, "short", "sell", config,
            position_size, existing_markets, slots_available, opened, dry_run)
        _open_candidates(
            api, longs, long_threshold, "long", "buy", config,
            position_size, existing_markets, slots_available, opened, dry_run)

    except Exception as e:
        log_error(f"Failed to open new positions: {e}", dry_run)
        return None  # Signal failure

    return opened


def startup_safety_checks(api, is_dry_run=False):
    """Perform comprehensive safety checks before starting trading.

    Exits the process with status 1 if a lockfile exists, or (after writing
    a lockfile and alerting) if the balance or positions cannot be read.
    """
    log_trade("=== STARTUP SAFETY CHECKS ===", is_dry_run)

    # SAFETY FIX #10: Check lockfile first
    if check_lockfile():
        sys.exit(1)

    log_trade("✓ Lockfile check passed", is_dry_run)

    # Check balance accessibility
    try:
        balance_response = api.get_futures_balance()
        if not balance_response:
            raise RuntimeError("Balance API returned empty response")

        # Calculate current equity
        if isinstance(balance_response, list):
            current_equity = _equity_of(balance_response[0])
        elif isinstance(balance_response, dict):
            current_equity = _equity_of(balance_response)
        else:
            raise RuntimeError(f"Balance API returned unexpected format: {type(balance_response)}")

        if current_equity <= 0:
            raise RuntimeError(f"Current equity is zero or negative: ${current_equity}")

        log_trade(f"✓ Balance check passed - Current equity: ${current_equity:.2f}", is_dry_run)

    except Exception as e:
        log_error(f"Balance check failed: {e}")
        reason = f"Startup balance check failed: {e}"
        create_lockfile(reason)
        send_telegram(f"🚨 STARTUP FAILURE\n{reason}")
        sys.exit(1)

    # Check positions accessibility
    try:
        positions = api.get_positions()
        if positions is None:
            raise RuntimeError("Positions API failed to return data")

        log_trade(f"✓ Positions check passed - Found {len(positions) if positions else 0} positions", is_dry_run)

        # Log existing positions for awareness
        for pos in positions or []:
            market = pos.get("market", "Unknown")
            side = pos.get("side", "Unknown")
            pnl = float(pos.get("unrealized_pnl", 0))
            log_trade(f"  Existing position: {market} {side} PnL: ${pnl:+.2f}", is_dry_run)

    except Exception as e:
        log_error(f"Position check failed: {e}")
        reason = f"Startup position check failed: {e}"
        create_lockfile(reason)
        send_telegram(f"🚨 STARTUP FAILURE\n{reason}")
        sys.exit(1)

    log_trade("=== ALL STARTUP CHECKS PASSED ===", is_dry_run)


def _send_cycle_summary(api, exits, opened):
    """Send a Telegram summary of balance, open positions and actions taken."""
    bal = api.get_futures_balance()
    entry = bal[0] if isinstance(bal, list) and bal else bal
    avail = float(entry.get("available", 0)) if isinstance(entry, dict) else 0
    positions = api.get_positions() or []

    lines = [f"⚡ Cycle Summary | {len(positions)} positions | ${avail:.2f} free"]
    for pos in positions:
        pnl = float(pos.get("unrealized_pnl", 0))
        # Guard against a zero leverage value so the summary cannot divide by 0.
        leverage = float(pos.get("leverage", 1)) or 1.0
        margin_est = float(pos.get("cml_position_value", 0)) / leverage
        pnl_pct = (pnl / margin_est * 100) if margin_est > 0 else 0
        emoji = "🟢" if pnl >= 0 else "🔴"
        lines.append(f"{emoji} {pos.get('market', '')} {pos.get('side','')} {pnl_pct:+.0f}% (${pnl:+.2f})")
    if not positions:
        lines.append("No open positions")

    actions = []
    if exits:
        actions.append(f"closed {len(exits)}")
    if opened:
        actions.append(f"opened {len(opened)}")
    lines.append(f"Action: {', '.join(actions) if actions else 'no action'}")

    send_telegram("\n".join(lines))


def main():
    """Run one trading cycle: safety checks, kill switch, exits, entries."""
    parser = argparse.ArgumentParser(description="CoinEx Live Futures Trader - Enhanced Safety Version")
    parser.add_argument("--dry-run", action="store_true", help="Dry run mode - no actual trades")
    args = parser.parse_args()

    setup_logging()

    print(f"=== CoinEx Live Futures Trader - ENHANCED SAFETY ===")
    print(f"Time: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC')}")
    print(f"Mode: {'DRY RUN' if args.dry_run else 'LIVE TRADING'}")

    # SAFETY FIX #9: Clear mode prefixes in logs
    if args.dry_run:
        log_trade("Running in DRY RUN mode - no actual trades will be placed", True)
    else:
        log_trade("LIVE TRADING MODE - real money at risk", False)

    try:
        # Load configuration first
        config = load_config()
        log_trade(f"Loaded config - Mode: {config.get('mode', 'unknown')}", args.dry_run)

        # Check if trading is paused
        if config.get("mode") == "paused":
            log_trade("Trading is PAUSED via config - exiting", args.dry_run)
            print("Trading is PAUSED - check trader_config.json")
            return

        # Load credentials and initialize API; the circuit breaker threshold
        # comes from the config so the setting actually takes effect.
        access_id, secret_key = load_coinex_credentials()
        api = CoinExAPI(access_id, secret_key,
                        max_failures=config.get("circuit_breaker_threshold", 3))

        # SAFETY FIX #10: Startup safety checks
        startup_safety_checks(api, args.dry_run)

        # Load state
        state = load_state()

        # CRITICAL: Check kill switch FIRST
        if check_kill_switch(api, state, config, args.dry_run):
            log_error("Kill switch triggered - stopping bot")
            send_telegram("🛑 Bot stopped due to kill switch")
            return

        # Manage exits first
        exits = manage_exits(api, state, config, args.dry_run)
        if exits is None:
            log_error("Exit management failed - stopping trading cycle")
            return

        if exits:
            exit_messages = []
            for ex in exits:
                emoji = "✅" if ex["pnl"] > 0 else "❌"
                msg = f"{emoji} Closed {ex['market']} {ex.get('side', ex.get('direction',''))} - {ex['reason']} - PnL: ${ex['pnl']:+.2f}"
                exit_messages.append(msg)

            if exit_messages and not args.dry_run:
                send_telegram(f"🔄 Position Updates\n" + "\n".join(exit_messages))

        # Open new positions
        opened = open_new_positions(api, state, config, args.dry_run)
        if opened is None:
            log_error("Position opening failed - stopping trading cycle")
            return

        if opened:
            open_messages = []
            for op in opened:
                emoji = "🔴" if op["direction"] == "short" else "🟢"
                msg = f"{emoji} {op['market']} {op['direction']} {op['leverage']}x - Score: {op['score']}"
                open_messages.append(msg)

            if open_messages and not args.dry_run:
                send_telegram(f"📊 New Positions\n" + "\n".join(open_messages))

        # Save state
        save_state(state)

        # Cycle summary alert
        if not args.dry_run:
            try:
                _send_cycle_summary(api, exits, opened)
            except Exception as e:
                log_error(f"Cycle summary alert failed: {e}")

        log_trade("Trade cycle completed successfully", args.dry_run)
        print("Done.")

    except Exception as e:
        log_error(f"Fatal error in main: {e}", args.dry_run)
        send_telegram(f"🚨 CoinEx Live Trader Error: {e}")

        # Create lockfile for unexpected errors
        create_lockfile(f"Fatal error: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()