#!/usr/bin/env python3
"""
CoinEx Live Futures Trader - ENHANCED SAFETY VERSION
Real money trading based on the paper trading strategy.
Extremely careful implementation with kill switches and safety measures.
FUTURES ONLY — NEVER TOUCH SPOT
"""
import json
import os
import sys
import time
import hashlib
import hmac
import requests
import argparse
from datetime import datetime, timezone
from pathlib import Path
# Add parent to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent))
from scripts.short_scanner import scan_coin, COINS as SHORT_COINS
# Load CoinEx credentials
CREDENTIALS_FILE = Path.home() / ".openclaw/workspace/.credentials/coinex.env"


def load_coinex_credentials():
    """Load CoinEx API credentials from the ``KEY=VALUE`` file at CREDENTIALS_FILE.

    Blank lines and lines starting with ``#`` are ignored.  A leading
    ``export`` keyword, whitespace around the key or the value, and one pair
    of matching quotes around the value are tolerated, so a shell-style env
    file yields the bare secret instead of a value that still carries quotes
    or padding.

    Returns:
        Tuple ``(access_id, secret_key)``.

    Raises:
        RuntimeError: if the file does not exist, or if ``COINEX_ACCESS_ID``
            or ``COINEX_SECRET_KEY`` is absent or empty.
    """
    if not CREDENTIALS_FILE.exists():
        raise RuntimeError(f"Credentials file not found: {CREDENTIALS_FILE}")

    credentials = {}
    with open(CREDENTIALS_FILE, encoding="utf-8") as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line or line.startswith('#') or '=' not in line:
                continue
            if line.startswith("export "):
                line = line[len("export "):].lstrip()
            key, value = line.split('=', 1)
            value = value.strip()
            # Drop one pair of matching surrounding quotes: KEY="abc" -> abc
            if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
                value = value[1:-1]
            credentials[key.strip()] = value

    # An empty value is as unusable as a missing key, so reject both here
    # rather than failing later with an opaque authentication error.
    if not credentials.get('COINEX_ACCESS_ID') or not credentials.get('COINEX_SECRET_KEY'):
        raise RuntimeError("Missing COINEX_ACCESS_ID or COINEX_SECRET_KEY in credentials file")
    return credentials['COINEX_ACCESS_ID'], credentials['COINEX_SECRET_KEY']
# Configuration
# Telegram settings come from the environment; BOT_TOKEN / CHAT_ID are accepted
# as fallback variable names.  With no token, send_telegram() prints the alert
# to stdout instead of sending it.
TELEGRAM_BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN", os.environ.get("BOT_TOKEN", ""))
# NOTE(review): the final fallback chat id is hard-coded in source - confirm
# that is intended.
TELEGRAM_CHAT_ID = os.environ.get("TELEGRAM_CHAT_ID", os.environ.get("CHAT_ID", "6443752046"))
# CoinEx API settings
COINEX_BASE_URL = "https://api.coinex.com"
# Data directory (<parent of this file's directory>/data/coinex-live) holding
# the state file, both logs, the halt lockfile and the trader config.
DATA_DIR = Path(__file__).parent.parent / "data" / "coinex-live"
STATE_FILE = DATA_DIR / "trader_state.json"
TRADES_LOG = DATA_DIR / "trades.log"
ERROR_LOG = DATA_DIR / "errors.log"
LOCKFILE = DATA_DIR / "live-trader-lock.json"
CONFIG_FILE = DATA_DIR / "trader_config.json"
# KILL SWITCH SETTINGS
# Key in the state dict under which the first observed equity is stored; the
# drawdown check compares current equity against that value.
STARTING_BALANCE_KEY = "starting_balance"
# Attempts, and seconds to sleep between attempts, used when closing a position.
RETRY_ATTEMPTS = 3
RETRY_DELAY = 2.0
# Global state for circuit breaker
# Consecutive failed API requests; reset to 0 by any successful request.
consecutive_failures = 0
def setup_logging():
    """Create the data directory and make sure both log files exist.

    Existing log files are left untouched; only missing ones are created
    (empty).
    """
    DATA_DIR.mkdir(parents=True, exist_ok=True)
    missing_logs = (path for path in (TRADES_LOG, ERROR_LOG) if not path.exists())
    for path in missing_logs:
        path.write_text("")
def log_trade(message, is_dry_run=False):
    """Print a trade message and append it, UTC-timestamped, to TRADES_LOG.

    Args:
        message: Text to record.
        is_dry_run: Selects the ``[DRY-RUN]`` prefix instead of ``[LIVE]``.
    """
    prefix = "[DRY-RUN]" if is_dry_run else "[LIVE]"
    timestamp = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')
    log_entry = f"[{timestamp}] {prefix} {message}\n"
    print(f"[TRADE] {prefix} {message}")
    # Messages in this file contain non-ASCII characters (check marks, arrows,
    # emoji); pin UTF-8 so the write does not depend on the process locale.
    with open(TRADES_LOG, 'a', encoding='utf-8') as f:
        f.write(log_entry)
def log_error(message, is_dry_run=False):
    """Print an error message and append it, UTC-timestamped, to ERROR_LOG.

    Args:
        message: Text to record.
        is_dry_run: Selects the ``[DRY-RUN]`` prefix instead of ``[LIVE]``.
    """
    prefix = "[DRY-RUN]" if is_dry_run else "[LIVE]"
    timestamp = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')
    log_entry = f"[{timestamp}] {prefix} ERROR: {message}\n"
    print(f"[ERROR] {prefix} {message}")
    # Error text can carry non-ASCII characters (emoji, dashes); pin UTF-8 so
    # logging from inside an error handler cannot fail on the locale encoding.
    with open(ERROR_LOG, 'a', encoding='utf-8') as f:
        f.write(log_entry)
def load_config():
    """Return the trader configuration, filling gaps from built-in defaults.

    If CONFIG_FILE exists it is parsed as JSON and every default key that is
    absent is added; on any failure the defaults are returned and the error is
    logged.  If the file does not exist, a default file is written
    (best-effort) and the defaults are returned.  The default mode is
    ``"paused"``.
    """
    defaults = {
        "mode": "paused",
        "position_size_pct": 5.0,
        "max_positions": 3,
        "max_leverage": 10,
        "kill_switch_drawdown_pct": 50.0,
        "tp_pct": 5.0,
        "sl_pct": -3.0,
        "trailing_stop_pct": 2.0,
        "circuit_breaker_threshold": 3,
        "scan_interval_minutes": 5,
        "long_threshold": 45,
        "short_threshold": 50,
        "coin_blacklist": [],
        "coin_whitelist": []
    }

    if not CONFIG_FILE.exists():
        # First run: persist the defaults so the operator has a file to edit.
        try:
            CONFIG_FILE.parent.mkdir(parents=True, exist_ok=True)
            CONFIG_FILE.write_text(json.dumps(defaults, indent=2))
            log_trade("Created default trader_config.json")
        except Exception as exc:
            log_error(f"Failed to create default config: {exc}")
        return defaults

    try:
        loaded = json.loads(CONFIG_FILE.read_text())
        # Back-fill keys the file does not define.
        absent = [name for name in defaults if name not in loaded]
        for name in absent:
            loaded[name] = defaults[name]
        return loaded
    except Exception as exc:
        log_error(f"Failed to load config, using defaults: {exc}")
        return defaults
def load_state():
    """Return the persisted trader state, or a fresh state if none is usable.

    A fresh state is ``{"peak_pnl": {}, "last_alert": None}``; it is returned
    when STATE_FILE is missing or cannot be read/parsed (the latter is logged).
    """
    fresh_state = {"peak_pnl": {}, "last_alert": None}
    if not STATE_FILE.exists():
        return fresh_state
    try:
        return json.loads(STATE_FILE.read_text())
    except Exception as exc:
        log_error(f"Failed to load state: {exc}")
    return fresh_state
def save_state(state):
    """Persist the trader state to STATE_FILE atomically (best-effort).

    The JSON is written to a sibling temporary file and then moved over
    STATE_FILE with ``os.replace``, so an interrupted write cannot leave a
    truncated state file behind.  That matters because load_state() falls back
    to a fresh state on a parse error, which would discard the recorded
    starting balance used by the drawdown check.

    Failures are logged, not raised.
    """
    tmp_path = STATE_FILE.with_name(STATE_FILE.name + ".tmp")
    try:
        STATE_FILE.parent.mkdir(parents=True, exist_ok=True)
        tmp_path.write_text(json.dumps(state, indent=2))
        os.replace(tmp_path, STATE_FILE)
    except Exception as e:
        log_error(f"Failed to save state: {e}")
def create_lockfile(reason, last_equity=None, positions_at_halt=None):
    """Write the halt lockfile that check_lockfile() looks for at startup.

    Args:
        reason: Why the bot was halted.
        last_equity: Equity at halt time, if known.
        positions_at_halt: Position snapshots to record; stored as ``[]`` when
            falsy.

    A failure to write the file is logged, not raised.
    """
    halted_at = datetime.now(timezone.utc).isoformat()
    payload = dict(
        reason=reason,
        timestamp=halted_at,
        last_equity=last_equity,
        positions_at_halt=positions_at_halt or [],
    )
    try:
        LOCKFILE.parent.mkdir(parents=True, exist_ok=True)
        LOCKFILE.write_text(json.dumps(payload, indent=2))
        log_error(f"LOCKFILE CREATED: {reason}")
    except Exception as exc:
        log_error(f"Failed to create lockfile: {exc}")
def check_lockfile():
    """Report whether a halt lockfile is present.

    Returns:
        ``False`` when LOCKFILE does not exist.  ``True`` when it exists, after
        printing the recorded reason and timestamp, or after printing the read
        error if the file cannot be parsed.
    """
    if not LOCKFILE.exists():
        return False
    try:
        details = json.loads(LOCKFILE.read_text())
        reason = details.get("reason", "Unknown")
        timestamp = details.get("timestamp", "Unknown")
        print(f"🚨 LOCKFILE EXISTS - Bot was halted due to: {reason}")
        print(f"Halt time: {timestamp}")
        print(f"To restart, manually delete: {LOCKFILE}")
        print("⚠️ Only do this if you've verified the issue is resolved!")
    except Exception as exc:
        # An unreadable lockfile still counts as a lock.
        print(f"Error reading lockfile: {exc}")
    return True
def send_telegram(message):
    """Send a Telegram alert (best-effort); print it when no bot token is set.

    The message is first sent with ``parse_mode="HTML"``.  If Telegram answers
    HTTP 400 the send is retried once as plain text: alerts built from raw
    exception or ``type(...)`` text can contain ``<``/``&`` that is not valid
    HTML, and an alert must not be lost for that reason.

    Failures are logged, never raised.  The bot token is removed from the
    logged error text because request exceptions can include the request URL.
    """
    if not TELEGRAM_BOT_TOKEN:
        print(f"[TG] {message}")
        return
    url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
    payload = {
        "chat_id": TELEGRAM_CHAT_ID,
        "text": message,
        "parse_mode": "HTML"
    }
    try:
        response = requests.post(url, json=payload, timeout=10)
        if response.status_code == 400:
            # Most likely unparseable HTML in the text; fall back to plain text.
            payload.pop("parse_mode")
            response = requests.post(url, json=payload, timeout=10)
        response.raise_for_status()
    except Exception as e:
        safe_error = str(e).replace(TELEGRAM_BOT_TOKEN, "<redacted>")
        log_error(f"Telegram failed: {safe_error}")
def _configured_failure_threshold(default=3):
    """Return ``circuit_breaker_threshold`` from the trader config, or ``default`` if unusable."""
    try:
        threshold = int(load_config().get("circuit_breaker_threshold", default))
    except Exception:
        return default
    return threshold if threshold >= 1 else default


def increment_failure_count(max_failures=None):
    """Record one failed API request and trip the circuit breaker when due.

    Args:
        max_failures: Number of consecutive failures that halts the bot.  When
            omitted, the value is read from the trader config
            (``circuit_breaker_threshold``, default 3), so callers that pass
            no argument still honour the configured threshold.

    When the threshold is reached a lockfile is written, a Telegram alert is
    sent, the reason is logged and the process exits with status 1
    (``SystemExit``).
    """
    global consecutive_failures
    if max_failures is None:
        max_failures = _configured_failure_threshold()
    consecutive_failures += 1
    if consecutive_failures >= max_failures:
        reason = f"Circuit breaker triggered - {consecutive_failures} consecutive API failures"
        create_lockfile(reason)
        send_telegram(f"🚨 CIRCUIT BREAKER TRIGGERED\n{reason}\nBot halted - manual intervention required")
        log_error(reason)
        sys.exit(1)
def reset_failure_count():
    """Clear the consecutive-failure counter (called after a successful API request)."""
    global consecutive_failures
    consecutive_failures = 0
class CoinExAPI:
    """Authenticated client for the CoinEx v2 futures endpoints used by this bot.

    Every call goes through ``_request``, which signs the request, raises on
    HTTP or API-level errors, and feeds the module-level circuit breaker
    (``reset_failure_count`` on success, ``increment_failure_count`` on
    failure).
    """

    def __init__(self, access_id, secret_key):
        self.access_id = access_id
        self.secret_key = secret_key
        self.base_url = COINEX_BASE_URL

    @staticmethod
    def _build_query(params):
        """Return the query string for ``params`` (keys sorted, values URL-encoded).

        The same string is used for signing and for the request URL, so the
        two can never disagree.  Returns ``""`` when there are no params.
        """
        from urllib.parse import urlencode
        if not params:
            return ""
        return urlencode(sorted(params.items()))

    @staticmethod
    def _encode_body(json_data):
        """Return the compact JSON body for ``json_data``, or ``""`` when it is falsy."""
        if not json_data:
            return ""
        return json.dumps(json_data, separators=(',', ':'))

    def _sign_request(self, method, path, params=None, json_data=None):
        """Build the authentication headers for one request.

        The signed string is ``METHOD + path`` followed by ``?query`` (GET with
        params) or the compact JSON body (POST with a body), then a
        millisecond timestamp; it is signed with HMAC-SHA256 using the secret
        key.

        Returns:
            Dict of ``X-COINEX-KEY``, ``X-COINEX-SIGN``, ``X-COINEX-TIMESTAMP``
            and ``Content-Type`` headers.
        """
        timestamp = str(int(time.time() * 1000))
        if method == "GET" and params:
            prepared = f"{method}{path}?{self._build_query(params)}{timestamp}"
        elif method == "POST" and json_data:
            prepared = f"{method}{path}{self._encode_body(json_data)}{timestamp}"
        else:
            prepared = f"{method}{path}{timestamp}"
        sign = hmac.new(
            self.secret_key.encode('latin-1'),
            prepared.encode('latin-1'),
            hashlib.sha256
        ).hexdigest().lower()
        return {
            "X-COINEX-KEY": self.access_id,
            "X-COINEX-SIGN": sign,
            "X-COINEX-TIMESTAMP": timestamp,
            "Content-Type": "application/json"
        }

    def _request(self, method, path, params=None, json_data=None):
        """Send a signed request and return the ``data`` member of the reply.

        Raises:
            ValueError: for a method other than GET or POST.
            RuntimeError: when the API reply carries a non-zero ``code``.
            Exception: HTTP/transport errors are re-raised after logging.

        Every failure is counted by ``increment_failure_count``, which exits
        the process once the circuit-breaker threshold is reached.
        """
        try:
            headers = self._sign_request(method, path, params, json_data)
            url = f"{self.base_url}{path}"
            if method == "GET":
                # Put the exact signed query string on the URL instead of
                # letting the HTTP library re-encode ``params``.
                query = self._build_query(params)
                if query:
                    url = f"{url}?{query}"
                response = requests.get(url, headers=headers, timeout=30)
            elif method == "POST":
                # Send exactly the bytes that were signed; send no body at all
                # when none was signed.
                body = self._encode_body(json_data)
                response = requests.post(url, headers=headers, data=body or None, timeout=30)
            else:
                raise ValueError(f"Unsupported method: {method}")
            response.raise_for_status()
            result = response.json()
            if result.get("code") != 0:
                raise RuntimeError(f"CoinEx API error: {result.get('message', 'Unknown error')}")
            # Success - reset failure count
            reset_failure_count()
            return result.get("data", {})
        except Exception as e:
            log_error(f"CoinEx API request failed: {method} {path} - {e}")
            increment_failure_count()
            raise

    def get_futures_balance(self):
        """Get futures account balance."""
        return self._request("GET", "/v2/assets/futures/balance")

    def get_market_price(self, market):
        """Query ``/v2/futures/market`` for ``market`` and return its data.

        NOTE(review): despite the name, get_token_amount() in this file reads
        the price from ``/v2/futures/ticker`` and only ``min_amount`` from this
        endpoint - confirm this method returns what callers expect.
        """
        return self._request("GET", "/v2/futures/market", {"market": market})

    def get_positions(self):
        """Get all open futures positions.

        Returns:
            The API's position data, ``[]`` if the API data is ``None``, or
            ``None`` when the request failed.  Callers must treat ``None`` as
            "unknown", never as "no positions".
        """
        try:
            result = self._request("GET", "/v2/futures/pending-position", params={"market_type": "futures"})
            return result if result is not None else []
        except Exception as e:
            log_error(f"CRITICAL: Positions endpoint failed: {e}")
            # SAFETY FIX #2: Return None on failure, never assume empty positions
            return None

    def place_order(self, market, side, type_order, amount, price=None, leverage=None):
        """Place a futures order and verify that an order id came back.

        Args:
            market: Market symbol, e.g. ``"BTCUSDT"``.
            side: ``"buy"`` or ``"sell"``.
            type_order: ``"market"`` or ``"limit"``.
            amount: Order amount; sent as a string.
            price: Optional price; sent as a string when truthy.
            leverage: Accepted for interface compatibility but not used here;
                leverage is applied separately via set_leverage().

        Raises:
            RuntimeError: when the reply has no ``order_id``.
        """
        data = {
            "market": market,
            "market_type": "FUTURES",
            "side": side,
            "type": type_order,
            "amount": str(amount)
        }
        if price:
            data["price"] = str(price)
        result = self._request("POST", "/v2/futures/order", json_data=data)
        # SAFETY FIX #6: Verify order was accepted
        if not result or "order_id" not in result:
            raise RuntimeError(f"Order placement failed - no order ID returned: {result}")
        order_id = result["order_id"]
        log_trade(f"Order placed successfully - ID: {order_id}")
        return result

    def set_leverage(self, market, leverage):
        """Set cross-margin leverage for ``market`` and check the echoed value.

        Raises:
            RuntimeError: when the reply reports a leverage different from the
                requested one.  A reply without a leverage value is not
                treated as a mismatch.
        """
        data = {
            "market": market,
            "market_type": "FUTURES",
            "margin_mode": "cross",
            "leverage": leverage
        }
        result = self._request("POST", "/v2/futures/adjust-position-leverage", json_data=data)
        # SAFETY FIX #7: Verify leverage was set correctly
        if result:
            actual_leverage = result.get("leverage", 0)
            if actual_leverage and int(actual_leverage) != leverage:
                raise RuntimeError(f"Leverage verification failed - requested {leverage}x, got {actual_leverage}x")
            log_trade(f"Leverage verified: {market} set to {leverage}x")
        return result

    def cancel_order(self, market, order_id):
        """Cancel a futures order."""
        data = {
            "market": market,
            "market_type": "FUTURES",
            "order_id": order_id
        }
        return self._request("POST", "/v2/futures/cancel-order", json_data=data)
def close_position_with_retry(api, market, side, amount, is_dry_run=False):
    """Close a position with a market order, retrying on failure.

    Args:
        api: Client exposing ``place_order(market, side, type, amount)``.
        market: Market symbol of the position.
        side: Order side that reduces the position (``"buy"``/``"sell"``).
        amount: Amount to close.
        is_dry_run: When true, only log what would be done and return True.

    Returns:
        True if the close order was accepted (or dry run), False once all
        RETRY_ATTEMPTS failed; in that case an emergency alert has been sent.

    Raises:
        SystemExit: if the API circuit breaker halts the process during an
            attempt.  An emergency alert naming the position is sent first so
            the operator knows it may still be open.
    """
    if is_dry_run:
        log_trade(f"Would close position {market} {side} amount {amount}", is_dry_run=True)
        return True

    # SAFETY FIX #3: Retry logic for position closes
    for attempt in range(RETRY_ATTEMPTS):
        try:
            # Only the order itself is retried.  Anything that fails after the
            # order was accepted must not trigger a second close order.
            api.place_order(market, side, "market", amount)
        except SystemExit:
            # The API client's failure counter exits the process once its
            # threshold is reached; make sure this position is reported first.
            halt_msg = (f"EMERGENCY: Bot halted while closing position {market} "
                        f"(attempt {attempt + 1}) — position may still be open, MANUAL INTERVENTION REQUIRED")
            send_telegram(f"🚨 {halt_msg}")
            log_error(halt_msg)
            raise
        except Exception as e:
            log_error(f"Failed to close position {market} attempt {attempt + 1}: {e}")
            if attempt < RETRY_ATTEMPTS - 1:
                time.sleep(RETRY_DELAY)
            else:
                # All retries exhausted - send emergency alert
                emergency_msg = f"EMERGENCY: Failed to close position {market} after {RETRY_ATTEMPTS} retries — MANUAL INTERVENTION REQUIRED"
                send_telegram(f"🚨 {emergency_msg}")
                log_error(emergency_msg)
                return False
        else:
            log_trade(f"Position closed successfully: {market} (attempt {attempt + 1})")
            return True
    return False
def _kill_switch_halt(reason, alert_title, equity=None):
    """Log, lock and alert for a kill-switch stop; always returns True."""
    log_error(reason)
    create_lockfile(reason, equity)
    send_telegram(f"🚨 KILL SWITCH - {alert_title}\n{reason}")
    return True


def _kill_switch_closing_side(position_side):
    """Return the order side that reduces a position, or None if the side is unknown.

    Both vocabularies are accepted: ``sell``/``short`` positions are closed
    with a buy, ``buy``/``long`` positions with a sell.
    NOTE(review): confirm which values the positions endpoint actually returns.
    """
    normalized = str(position_side).lower()
    if normalized in ("sell", "short"):
        return "buy"
    if normalized in ("buy", "long"):
        return "sell"
    return None


def _kill_switch_close_all(api, is_dry_run):
    """Try to close every open position; return the snapshot for the lockfile."""
    snapshot = []
    try:
        positions = api.get_positions()
        if positions is None:
            # get_positions() returns None when the query failed: nothing was
            # closed, so say so loudly instead of silently recording "none".
            msg = ("Kill switch could not fetch positions — open positions were NOT closed, "
                   "MANUAL INTERVENTION REQUIRED")
            log_error(msg)
            send_telegram(f"🚨 {msg}")
            return snapshot
        for pos in positions or []:
            market = pos.get("market")
            if not market:
                continue
            # Record position for lockfile
            snapshot.append({
                "market": market,
                "side": pos.get("side"),
                "amount": pos.get("amount"),
                "pnl": pos.get("unrealized_pnl", 0)
            })
            side = _kill_switch_closing_side(pos.get("side"))
            if side is None:
                # Never guess the direction: a wrong guess would enlarge the position.
                msg = (f"Kill switch cannot determine how to close {market} "
                       f"(side={pos.get('side')!r}) — MANUAL INTERVENTION REQUIRED")
                log_error(msg)
                send_telegram(f"🚨 {msg}")
                continue
            amount = abs(float(pos.get("close_avbl", pos.get("open_interest", pos.get("amount", 0)))))
            if amount > 0:
                success = close_position_with_retry(api, market, side, amount, is_dry_run)
                if success:
                    log_trade(f"Kill switch: closed position {market}", is_dry_run)
    except Exception as e:
        log_error(f"Failed to close positions during kill switch: {e}")
    return snapshot


def check_kill_switch(api, state, config, is_dry_run=False):
    """Decide whether trading must stop because of drawdown or unusable balance data.

    Equity is taken as ``available + frozen`` from the balance reply (the
    first element when the reply is a list).
    NOTE(review): margin held by open positions and unrealized PnL are not
    part of this figure - confirm that is the intended equity definition.

    On the first call the equity is stored in ``state`` as the starting
    balance.  Later calls compare against it and trigger when the drawdown
    reaches ``config["kill_switch_drawdown_pct"]`` (default 50).  On trigger
    an alert is sent, every open position is closed (best-effort) and a
    lockfile is written.

    Any failure to obtain or interpret the balance also stops the bot and
    writes a lockfile.

    Returns:
        True when the bot must stop, False when trading may continue.
    """
    try:
        balance_response = api.get_futures_balance()
        # SAFETY FIX #1: Kill switch bypass protection (also covers an empty list)
        if not balance_response:
            return _kill_switch_halt(
                "Balance API returned empty response - STOPPING for safety", "API FAILURE")

        # Handle different response formats
        if isinstance(balance_response, list):
            account = balance_response[0]
        elif isinstance(balance_response, dict):
            account = balance_response
        else:
            return _kill_switch_halt(
                f"Balance API returned unexpected format: {type(balance_response)} - STOPPING for safety",
                "INVALID BALANCE FORMAT")
        current_equity = float(account.get("available", 0)) + float(account.get("frozen", 0))

        # SAFETY FIX #5: Zero equity is a kill-switch event.
        # "not (x > 0)" is also true for NaN, which "x <= 0" would let through.
        if not current_equity > 0:
            return _kill_switch_halt(
                f"Equity is zero or negative (${current_equity}) - KILL SWITCH TRIGGERED",
                "ZERO EQUITY", current_equity)

        # Initialize starting balance if not set
        if STARTING_BALANCE_KEY not in state:
            state[STARTING_BALANCE_KEY] = current_equity
            save_state(state)
            log_trade(f"Starting balance recorded: ${current_equity:.2f}", is_dry_run)
            return False

        starting_balance = float(state[STARTING_BALANCE_KEY])
        if not starting_balance > 0:
            # A corrupt baseline makes the drawdown meaningless; stop rather than divide by it.
            return _kill_switch_halt(
                f"Stored starting balance is invalid (${starting_balance}) - STOPPING for safety",
                "INVALID STARTING BALANCE", current_equity)

        drawdown = (starting_balance - current_equity) / starting_balance
        log_trade(f"Balance check - Current: ${current_equity:.2f}, Starting: ${starting_balance:.2f}, Drawdown: {drawdown*100:.2f}%", is_dry_run)
        kill_switch_threshold = config.get("kill_switch_drawdown_pct", 50.0) / 100.0
        if drawdown < kill_switch_threshold:
            return False

        # KILL SWITCH TRIGGERED
        msg = (f"🚨 KILL SWITCH TRIGGERED 🚨\n"
               f"Account hit {drawdown*100:.1f}% drawdown\n"
               f"Starting: ${starting_balance:.2f} → Current: ${current_equity:.2f}\n"
               f"Bot stopped, attempting to close all positions")
        send_telegram(msg)
        log_error(f"KILL SWITCH: {drawdown*100:.1f}% drawdown")
        positions_at_halt = _kill_switch_close_all(api, is_dry_run)
        create_lockfile(f"Kill switch - {drawdown*100:.1f}% drawdown", current_equity, positions_at_halt)
        return True
    except Exception as e:
        # SAFETY FIX #1: API failure = STOP, don't continue
        return _kill_switch_halt(
            f"Kill switch check failed: {e} - STOPPING for safety", "API ERROR")
def get_leverage_for_score(config, score):
    """Pick the leverage for a signal score from the configured tiers.

    Tiers come from ``config["leverage_tiers"]`` (default: 45 -> 5x,
    60 -> 7x).  The tier with the highest ``min_score`` not exceeding
    ``score`` wins.  If no tier qualifies, the tier with the lowest
    ``min_score`` is used, or 5x when there are no tiers at all.  The result
    never exceeds ``config["max_leverage"]`` (default 10).

    Example with the default tiers: score 55 -> 5x, score 65 -> 7x.
    """
    cap = config.get("max_leverage", 10)
    default_tiers = [
        {"min_score": 45, "leverage": 5},
        {"min_score": 60, "leverage": 7}
    ]
    # Highest min_score first, so the first qualifying tier is the best one.
    ordered = sorted(
        config.get("leverage_tiers", default_tiers),
        key=lambda tier: tier["min_score"],
        reverse=True,
    )
    matched = next((tier for tier in ordered if score >= tier["min_score"]), None)
    if matched is not None:
        chosen = matched["leverage"]
    elif ordered:
        chosen = ordered[-1]["leverage"]
    else:
        chosen = 5
    return min(cap, chosen)
def get_position_size(api, config, is_dry_run=False):
    """Return the USD margin for a new position, or None if it cannot be determined.

    The size is ``config["position_size_pct"]`` percent (default 5) of the
    equity, where equity is ``available + frozen`` from the balance reply (the
    first element when the reply is a list).

    None is returned, after logging, for an empty or unrecognised balance
    reply, for non-positive equity or size, and for any exception.
    """
    try:
        balance_response = api.get_futures_balance()

        # Handle different response formats
        if isinstance(balance_response, list):
            if not balance_response:
                log_error("Empty balance list returned from API")
                return None
            account = balance_response[0]
        elif isinstance(balance_response, dict):
            account = balance_response
        else:
            log_error(f"Unexpected balance response format: {type(balance_response)}")
            return None
        equity = float(account.get("available", 0)) + float(account.get("frozen", 0))

        # SAFETY FIX #5: Zero equity check
        if equity <= 0:
            log_error(f"Equity is zero or negative: ${equity}")
            return None

        position_size_pct = config.get("position_size_pct", 5.0)
        position_size = equity * (position_size_pct / 100)

        # SAFETY FIX #5: Validate calculated position size
        if position_size <= 0:
            log_error(f"Calculated position size is invalid: ${position_size}")
            return None

        log_trade(f"Position size: ${position_size:.2f} ({position_size_pct}% of ${equity:.2f})", is_dry_run)
        return position_size
    except Exception as e:
        log_error(f"Failed to calculate position size: {e}")
        return None
def get_token_amount(api, market, usd_margin, leverage):
    """Convert a USD margin into an order amount in base-currency tokens.

    ``notional = usd_margin * leverage`` and ``amount = int(notional / price)``,
    with the price taken from the public ticker endpoint and the minimum
    amount from the public market endpoint (1 if that lookup yields nothing).

    NOTE(review): the amount is truncated to a whole number of tokens, so a
    market whose price exceeds the notional always yields 0 and is skipped -
    confirm that is intended.

    Args:
        api: Unused here; both lookups hit public endpoints directly.
        market: Market symbol.
        usd_margin: Margin in USD to commit.
        leverage: Leverage multiplier.

    Returns:
        The integer token amount, or None (after logging) when the ticker or
        price is unusable, the amount is below the market minimum, or any
        exception occurs.
    """
    def first_entry(payload):
        # The endpoints may answer with a list or a single object.
        return payload[0] if isinstance(payload, list) else payload

    try:
        import requests as req

        # Get current price from ticker (public endpoint, no auth needed)
        ticker_resp = req.get(
            "https://api.coinex.com/v2/futures/ticker",
            params={"market": market},
            timeout=10,
        ).json()
        if ticker_resp.get("code") != 0 or not ticker_resp.get("data"):
            log_error(f"Cannot get ticker for {market}")
            return None
        price = float(first_entry(ticker_resp["data"]).get("last", 0))
        if price <= 0:
            log_error(f"Invalid price for {market}: {price}")
            return None

        # Get min_amount from market info (public endpoint)
        market_resp = req.get(
            "https://api.coinex.com/v2/futures/market",
            params={"market": market},
            timeout=10,
        ).json()
        min_amount = 1
        if market_resp.get("code") == 0 and market_resp.get("data"):
            min_amount = float(first_entry(market_resp["data"]).get("min_amount", 1))

        # Calculate: notional = margin * leverage, amount = notional / price
        notional = usd_margin * leverage
        amount = int(notional / price)
        if amount < min_amount:
            log_error(f"{market}: calculated amount {amount} below minimum {int(min_amount)} (need ${min_amount * price / leverage:.2f} margin at {leverage}x)")
            return None

        log_trade(f"{market}: ${usd_margin:.2f} margin × {leverage}x = ${notional:.2f} notional = {amount} tokens @ ${price:.4f}")
        return amount
    except Exception as e:
        log_error(f"Failed to calculate token amount for {market}: {e}")
        return None
def format_symbol_for_coinex(symbol):
    """Return ``symbol`` as a CoinEx futures market name (``"BTC"`` -> ``"BTCUSDT"``).

    A symbol that already ends in ``USDT`` is returned unchanged.
    """
    return symbol if symbol.endswith("USDT") else f"{symbol}USDT"
def run_short_scan(is_dry_run=False):
    """Scan every coin in SHORT_COINS for short setups.

    Each truthy ``scan_coin`` result is tagged with ``direction="short"`` and
    its CoinEx market name.  A failure on one coin is logged and does not stop
    the scan.

    Returns:
        The results sorted by ``score``, highest first.
    """
    candidates = []
    for coin in SHORT_COINS:
        try:
            hit = scan_coin(coin)
            if hit:
                hit["direction"] = "short"
                hit["coinex_symbol"] = format_symbol_for_coinex(coin)
                candidates.append(hit)
            # Brief pause between coins.
            time.sleep(0.15)
        except Exception as exc:
            log_error(f"Short scan failed for {coin}: {exc}", is_dry_run)
    candidates.sort(key=lambda item: item['score'], reverse=True)
    return candidates
def _first_matching_tier(value, tiers, inclusive=False):
    """Return the first ``(limit, points, label)`` tier that ``value`` falls under, else None.

    A tier matches when ``value <= limit`` (inclusive) or ``value < limit``.
    """
    for tier in tiers:
        limit = tier[0]
        if (value <= limit) if inclusive else (value < limit):
            return tier
    return None


def _score_long_candidate(r):
    """Score one scanner result as a long setup; returns ``(score, reasons)``."""
    score = 0
    reasons = []

    rsi = r['rsi']
    tier = _first_matching_tier(rsi, (
        (25, 30, "RSI extremely oversold"),
        (30, 25, "RSI oversold"),
        (35, 15, "RSI low"),
        (40, 5, "RSI mildly low"),
    ), inclusive=True)
    if tier:
        score += tier[1]
        reasons.append(f"{tier[2]} ({rsi})")

    vwap_pct = r['vwap_pct']
    tier = _first_matching_tier(vwap_pct, (
        (-5, 20, "Well below VWAP"),
        (-3, 15, "Below VWAP"),
        (-1, 8, "Slightly below VWAP"),
    ))
    if tier:
        score += tier[1]
        reasons.append(f"{tier[2]} ({vwap_pct:+.1f}%)")

    change_24h = r['change_24h']
    tier = _first_matching_tier(change_24h, (
        (-15, 15, "Dumped"),
        (-8, 10, "Down"),
        (-4, 5, "Down"),
    ))
    if tier:
        score += tier[1]
        reasons.append(f"{tier[2]} {change_24h:.1f}% 24h")

    bb_position = r['bb_position']
    tier = _first_matching_tier(bb_position, (
        (0, 15, "Below lower Bollinger"),
        (0.15, 10, "Near lower Bollinger"),
    ))
    if tier:
        score += tier[1]
        reasons.append(f"{tier[2]} ({bb_position:.2f})")

    return score, reasons


def run_long_scan(is_dry_run=False):
    """Scan every coin in SHORT_COINS for long setups.

    The short scanner's indicators are re-scored with inverted criteria: low
    RSI, price below VWAP, a 24h drop and a low Bollinger position each add
    points.  A failure on one coin is logged and does not stop the scan.

    Returns:
        One dict per scanned coin (symbol, CoinEx market name, indicators,
        ``score``, ``reasons``, ``direction="long"``), sorted by score,
        highest first.
    """
    candidates = []
    for coin in SHORT_COINS:
        try:
            r = scan_coin(coin)
            if r:
                long_score, reasons = _score_long_candidate(r)
                candidates.append({
                    "symbol": r["symbol"],
                    "coinex_symbol": format_symbol_for_coinex(r["symbol"]),
                    "price": r["price"],
                    "rsi": r["rsi"],
                    "vwap_pct": r["vwap_pct"],
                    "change_24h": r["change_24h"],
                    "bb_position": r["bb_position"],
                    "score": long_score,
                    "reasons": reasons,
                    "direction": "long",
                })
            # Brief pause between coins.
            time.sleep(0.15)
        except Exception as exc:
            log_error(f"Long scan failed for {coin}: {exc}", is_dry_run)
    candidates.sort(key=lambda item: item['score'], reverse=True)
    return candidates
def _exit_order_side(position_side):
    """Return the order side that reduces a position, or None if the side is unknown.

    Both vocabularies are accepted: ``sell``/``short`` positions are closed
    with a buy, ``buy``/``long`` positions with a sell.
    NOTE(review): confirm which values the positions endpoint actually returns.
    """
    normalized = str(position_side).lower()
    if normalized in ("sell", "short"):
        return "buy"
    if normalized in ("buy", "long"):
        return "sell"
    return None


def _position_margin(pos):
    """Return the margin backing ``pos``, or None when it cannot be determined.

    Uses the ``margin`` field when present.  Otherwise it falls back to
    ``cml_position_value / leverage``, the same estimate the cycle summary in
    main() uses.
    """
    raw_margin = pos.get("margin")
    if raw_margin is not None:
        return float(raw_margin)
    leverage = float(pos.get("leverage", 0) or 0)
    position_value = float(pos.get("cml_position_value", 0) or 0)
    if leverage > 0 and position_value > 0:
        return position_value / leverage
    return None


def manage_exits(api, state, config, dry_run=False):
    """Close positions that hit take-profit, stop-loss or the trailing stop.

    PnL is measured as a percentage of the position's margin.  The best PnL
    seen per position is tracked in ``state["peak_pnl"]``.  The trailing stop
    fires once the peak reached 2% and the PnL has fallen back by
    ``trailing_stop_pct`` from it.  Peaks of positions that are no longer open
    are dropped, so a later position in the same market and direction starts
    with a clean peak.

    Args:
        api: Client exposing ``get_positions()`` and ``place_order``.
        state: Mutable trader state; ``peak_pnl`` is created if missing.
        config: Trader config (``tp_pct``, ``sl_pct``, ``trailing_stop_pct``).
        dry_run: When true, nothing is closed; intended exits are only logged.

    Returns:
        List of exit records (market, side, reason, pnl, pnl_pct) for the
        positions actually closed, or None when the position query or the
        processing failed and the trading cycle should halt.
    """
    exits = []
    try:
        positions = api.get_positions()
        # SAFETY FIX #2: Handle position query failure
        if positions is None:
            log_error("Position query failed - halting trading cycle for safety")
            return None

        # A state file without a usable "peak_pnl" must not crash exit management.
        peaks = state.get("peak_pnl")
        if not isinstance(peaks, dict):
            peaks = state["peak_pnl"] = {}

        if not positions or not isinstance(positions, list):
            if isinstance(positions, list):
                # Nothing is open, so every tracked peak is stale.
                peaks.clear()
            log_trade("No positions to manage (empty or invalid response)", dry_run)
            return exits

        # Forget peaks of positions that were closed outside this function.
        live_ids = {f"{p.get('market', '')}_{p.get('side', '')}" for p in positions}
        for stale_id in [pid for pid in peaks if pid not in live_ids]:
            del peaks[stale_id]

        tp_pct = config.get("tp_pct", 5.0)
        sl_pct = config.get("sl_pct", -3.0)
        trailing_stop_pct = config.get("trailing_stop_pct", 2.0)

        for pos in positions:
            market = pos.get("market", "")
            position_id = f"{market}_{pos.get('side', '')}"
            unrealized_pnl = float(pos.get("unrealized_pnl", 0))
            margin_used = _position_margin(pos)
            if margin_used is None:
                # Without a margin figure a percentage cannot be computed;
                # dividing by a made-up 1 would treat dollars as percent.
                log_error(f"Cannot determine margin for {market} - skipping exit checks", dry_run)
                continue
            if margin_used == 0:
                continue
            pnl_pct = (unrealized_pnl / margin_used) * 100

            # Track peak PnL for trailing stop
            peak = max(peaks.get(position_id, pnl_pct), pnl_pct)
            peaks[position_id] = peak

            reason = None
            if tp_pct and pnl_pct >= tp_pct:
                reason = f"TP hit ({pnl_pct:+.1f}%)"
            elif sl_pct and pnl_pct <= sl_pct:
                reason = f"SL hit ({pnl_pct:+.1f}%)"
            elif peak >= 2.0 and (peak - pnl_pct) >= trailing_stop_pct:
                reason = f"Trailing stop (peak {peak:+.1f}%, now {pnl_pct:+.1f}%)"
            if not reason:
                continue

            try:
                if dry_run:
                    log_trade(f"Would close {market} {pos.get('side')} - {reason}", dry_run)
                    continue

                side = _exit_order_side(pos.get("side"))
                if side is None:
                    # Never guess the direction: a wrong guess would enlarge the position.
                    log_error(f"Cannot close {market}: unknown position side {pos.get('side')!r} ({reason})")
                    continue
                amount = abs(float(pos.get("close_avbl", pos.get("open_interest", pos.get("amount", 0)))))
                if amount <= 0:
                    log_error(f"Cannot close {market}: no closable amount reported ({reason})")
                    continue

                # Use retry logic for position closes
                success = close_position_with_retry(api, market, side, amount, dry_run)
                if success:
                    log_trade(f"CLOSED {market} {pos.get('side')} - {reason} - PnL: ${unrealized_pnl:+.2f}", dry_run)
                    exits.append({
                        "market": market,
                        "side": pos.get("side"),
                        "reason": reason,
                        "pnl": unrealized_pnl,
                        "pnl_pct": pnl_pct
                    })
                    # Clean up peak tracking
                    peaks.pop(position_id, None)
            except Exception as e:
                log_error(f"Failed to close position {market}: {e}", dry_run)
    except Exception as e:
        log_error(f"Failed to manage exits: {e}", dry_run)
        return None  # Signal failure
    return exits
def _coin_allowed(config, opportunity):
    """Apply ``coin_blacklist`` / ``coin_whitelist`` from the config to one opportunity.

    Entries are compared case-insensitively against the scanner symbol, the
    CoinEx market name, and either of them without a trailing ``USDT``.  A
    blacklisted coin is never allowed; a non-empty whitelist allows only the
    listed coins.  With both lists empty (the default) everything is allowed.
    """
    names = set()
    for key in ("symbol", "coinex_symbol"):
        name = str(opportunity.get(key) or "").upper()
        if name:
            names.add(name)
            if name.endswith("USDT") and len(name) > 4:
                names.add(name[:-4])
    blacklist = {str(entry).upper() for entry in (config.get("coin_blacklist") or [])}
    whitelist = {str(entry).upper() for entry in (config.get("coin_whitelist") or [])}
    if names & blacklist:
        return False
    if whitelist and not names & whitelist:
        return False
    return True


def _open_candidates(api, config, candidates, direction, threshold, position_size,
                     existing_markets, slots_available, opened, dry_run):
    """Open positions for score-sorted ``candidates`` of one direction.

    Appends a record to ``opened`` and adds the market to ``existing_markets``
    for every order placed.  Returns the number of slots still available.
    """
    label = direction.upper()
    order_side = "sell" if direction == "short" else "buy"
    for opportunity in candidates:
        if slots_available <= 0:
            break
        # Candidates are sorted by score, so nothing further can qualify.
        if opportunity["score"] < threshold:
            break
        market = opportunity["coinex_symbol"]
        if market in existing_markets:
            continue
        if not _coin_allowed(config, opportunity):
            log_trade(f"SKIP {label} {market} - excluded by coin whitelist/blacklist", dry_run)
            continue
        # Determine leverage (capped by config)
        leverage = get_leverage_for_score(config, opportunity["score"])
        try:
            if not dry_run:
                # Convert USD margin to token amount
                token_amount = get_token_amount(api, market, position_size, leverage)
                if token_amount is None:
                    log_trade(f"SKIP {label} {market} - amount below minimum", dry_run)
                    continue
                # SAFETY FIX #7: Set and verify leverage
                api.set_leverage(market, leverage)
                # Place the order with verification
                result = api.place_order(market, order_side, "market", token_amount)
                log_trade(f"OPENED {label} {market} {leverage}x - Score: {opportunity['score']} - {token_amount} tokens (${position_size:.2f} margin)", dry_run)
                opened.append({
                    "market": market,
                    "direction": direction,
                    "leverage": leverage,
                    "score": opportunity["score"],
                    "size": position_size,
                    "order_id": result.get("order_id")
                })
                existing_markets.add(market)
                slots_available -= 1
            else:
                log_trade(f"Would open {label} {market} {leverage}x - Score: {opportunity['score']}", dry_run)
            time.sleep(0.5)  # Rate limiting
        except Exception as e:
            log_error(f"Failed to open {direction} position {market}: {e}", dry_run)
    return slots_available


def open_new_positions(api, state, config, dry_run=False):
    """Scan for opportunities and open new positions in the free slots.

    Shorts are processed before longs.  A market that already has a position,
    or that is excluded by the configured ``coin_blacklist`` /
    ``coin_whitelist``, is skipped.  Candidates below ``short_threshold`` /
    ``long_threshold`` are ignored.

    Args:
        api: Client exposing ``get_positions``, ``get_futures_balance``,
            ``set_leverage`` and ``place_order``.
        state: Trader state (not used here).
        config: Trader config.
        dry_run: When true, no orders are placed; intended opens are only logged.

    Returns:
        List of records for the positions opened (empty if none), or None
        when positions or position size could not be determined or an
        unexpected error occurred, signalling that the cycle should halt.
    """
    opened = []
    try:
        # Get current positions to avoid doubling up
        positions = api.get_positions()
        # SAFETY FIX #2: Handle position query failure
        if positions is None:
            log_error("Position query failed - cannot safely open new positions")
            return None
        if not positions or not isinstance(positions, list):
            positions = []

        existing_markets = {pos.get("market", "") for pos in positions if isinstance(pos, dict)}
        num_open = len([pos for pos in positions if isinstance(pos, dict) and pos.get("market")])
        max_positions = config.get("max_positions", 3)
        slots_available = max_positions - num_open
        log_trade(f"Current positions: {num_open}, Available slots: {slots_available}", dry_run)
        if slots_available <= 0:
            return opened

        # Get position size
        position_size = get_position_size(api, config, dry_run)
        # SAFETY FIX #5: Position size validation
        if position_size is None or position_size <= 0:
            log_error("Position size is invalid - stopping new position opens for safety")
            if position_size is None:
                # This could indicate a kill-switch scenario
                return None
            return opened

        # Run scanners
        shorts = run_short_scan(dry_run)
        longs = run_long_scan(dry_run)

        slots_available = _open_candidates(
            api, config, shorts, "short", config.get("short_threshold", 50),
            position_size, existing_markets, slots_available, opened, dry_run)
        _open_candidates(
            api, config, longs, "long", config.get("long_threshold", 45),
            position_size, existing_markets, slots_available, opened, dry_run)
    except Exception as e:
        log_error(f"Failed to open new positions: {e}", dry_run)
        return None  # Signal failure
    return opened
def startup_safety_checks(api, is_dry_run=False):
    """Verify that it is safe to start a trading cycle; exit the process if not.

    Checks, in order:
      1. No halt lockfile exists.
      2. The balance can be fetched and the equity (``available + frozen``)
         is positive.
      3. The positions endpoint answers; existing positions are logged.

    A failed balance or position check writes a lockfile, sends a Telegram
    alert and exits with status 1.  A pre-existing lockfile exits with
    status 1 without further action.
    """
    log_trade("=== STARTUP SAFETY CHECKS ===", is_dry_run)

    # SAFETY FIX #10: Check lockfile first
    if check_lockfile():
        sys.exit(1)
    log_trade("✓ Lockfile check passed", is_dry_run)

    # Check balance accessibility
    try:
        balance_response = api.get_futures_balance()
        # Also covers an empty list.
        if not balance_response:
            raise RuntimeError("Balance API returned empty response")
        if isinstance(balance_response, list):
            balance_data = balance_response[0]
        elif isinstance(balance_response, dict):
            balance_data = balance_response
        else:
            raise RuntimeError(f"Balance API returned unexpected format: {type(balance_response)}")
        current_equity = float(balance_data.get("available", 0)) + float(balance_data.get("frozen", 0))
        if current_equity <= 0:
            raise RuntimeError(f"Current equity is zero or negative: ${current_equity}")
        # Report success only after the equity has actually been validated.
        log_trade(f"✓ Balance check passed - Current equity: ${current_equity:.2f}", is_dry_run)
    except Exception as e:
        log_error(f"Balance check failed: {e}")
        reason = f"Startup balance check failed: {e}"
        create_lockfile(reason)
        send_telegram(f"🚨 STARTUP FAILURE\n{reason}")
        sys.exit(1)

    # Check positions accessibility
    try:
        positions = api.get_positions()
        if positions is None:
            raise RuntimeError("Positions API failed to return data")
        log_trade(f"✓ Positions check passed - Found {len(positions)} positions", is_dry_run)
        # Log existing positions for awareness
        for pos in positions:
            market = pos.get("market", "Unknown")
            side = pos.get("side", "Unknown")
            pnl = float(pos.get("unrealized_pnl", 0))
            log_trade(f"  Existing position: {market} {side} PnL: ${pnl:+.2f}", is_dry_run)
    except Exception as e:
        log_error(f"Position check failed: {e}")
        reason = f"Startup position check failed: {e}"
        create_lockfile(reason)
        send_telegram(f"🚨 STARTUP FAILURE\n{reason}")
        sys.exit(1)

    log_trade("=== ALL STARTUP CHECKS PASSED ===", is_dry_run)
def _send_cycle_summary(api, exits, opened):
    """Send the end-of-cycle Telegram summary: free balance, positions, actions."""
    bal = api.get_futures_balance()
    # The balance reply may be a list of accounts or a single object.
    if isinstance(bal, list):
        account = bal[0] if bal else {}
    elif isinstance(bal, dict):
        account = bal
    else:
        account = {}
    avail = float(account.get("available", 0))

    positions = api.get_positions() or []
    lines = [f"⚡ Cycle Summary | {len(positions)} positions | ${avail:.2f} free"]
    for pos in positions:
        pnl = float(pos.get("unrealized_pnl", 0))
        leverage = float(pos.get("leverage", 1))
        # Margin estimate: position value divided by leverage (0 if leverage is unusable).
        margin_est = float(pos.get("cml_position_value", 0)) / leverage if leverage > 0 else 0
        pnl_pct = (pnl / margin_est * 100) if margin_est > 0 else 0
        emoji = "🟢" if pnl >= 0 else "🔴"
        lines.append(f"{emoji} {pos['market']} {pos.get('side','')} {pnl_pct:+.0f}% (${pnl:+.2f})")
    if not positions:
        lines.append("No open positions")

    actions = []
    if exits:
        actions.append(f"closed {len(exits)}")
    if opened:
        actions.append(f"opened {len(opened)}")
    lines.append(f"Action: {', '.join(actions) if actions else 'no action'}")
    send_telegram("\n".join(lines))


def main():
    """Run one trading cycle.

    Order of operations: load config (exit if mode is ``paused``), load
    credentials, startup safety checks, kill-switch check, manage exits, open
    new positions, save state, send a cycle summary.  ``--dry-run`` logs
    intended actions without placing orders or sending position alerts.

    Any unexpected exception is logged, reported via Telegram, recorded in a
    lockfile, and the process exits with status 1.
    """
    parser = argparse.ArgumentParser(description="CoinEx Live Futures Trader - Enhanced Safety Version")
    parser.add_argument("--dry-run", action="store_true", help="Dry run mode - no actual trades")
    args = parser.parse_args()
    setup_logging()

    print(f"=== CoinEx Live Futures Trader - ENHANCED SAFETY ===")
    print(f"Time: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC')}")
    print(f"Mode: {'DRY RUN' if args.dry_run else 'LIVE TRADING'}")

    # SAFETY FIX #9: Clear mode prefixes in logs
    if args.dry_run:
        log_trade("Running in DRY RUN mode - no actual trades will be placed", True)
    else:
        log_trade("LIVE TRADING MODE - real money at risk", False)

    try:
        # Load configuration first
        config = load_config()
        log_trade(f"Loaded config - Mode: {config.get('mode', 'unknown')}", args.dry_run)

        # Check if trading is paused
        if config.get("mode") == "paused":
            log_trade("Trading is PAUSED via config - exiting", args.dry_run)
            print("Trading is PAUSED - check trader_config.json")
            return

        # Load credentials and initialize API
        access_id, secret_key = load_coinex_credentials()
        api = CoinExAPI(access_id, secret_key)
        # NOTE(review): config["circuit_breaker_threshold"] is not passed
        # anywhere from here; it only has an effect if
        # increment_failure_count itself consults the config.

        # SAFETY FIX #10: Startup safety checks
        startup_safety_checks(api, args.dry_run)

        # Load state
        state = load_state()

        # CRITICAL: Check kill switch FIRST
        if check_kill_switch(api, state, config, args.dry_run):
            log_error("Kill switch triggered - stopping bot")
            send_telegram("🛑 Bot stopped due to kill switch")
            return

        # Manage exits first
        exits = manage_exits(api, state, config, args.dry_run)
        if exits is None:
            log_error("Exit management failed - stopping trading cycle")
            return
        if exits:
            exit_messages = []
            for ex in exits:
                emoji = "✅" if ex["pnl"] > 0 else "❌"
                msg = f"{emoji} Closed {ex['market']} {ex.get('side', ex.get('direction',''))} - {ex['reason']} - PnL: ${ex['pnl']:+.2f}"
                exit_messages.append(msg)
            if exit_messages and not args.dry_run:
                send_telegram(f"🔄 Position Updates\n" + "\n".join(exit_messages))

        # Open new positions
        opened = open_new_positions(api, state, config, args.dry_run)
        if opened is None:
            log_error("Position opening failed - stopping trading cycle")
            return
        if opened:
            open_messages = []
            for op in opened:
                emoji = "🔴" if op["direction"] == "short" else "🟢"
                msg = f"{emoji} {op['market']} {op['direction']} {op['leverage']}x - Score: {op['score']}"
                open_messages.append(msg)
            if open_messages and not args.dry_run:
                send_telegram(f"📊 New Positions\n" + "\n".join(open_messages))

        # Save state
        save_state(state)

        # Cycle summary alert (best-effort: a failure here must not fail the cycle)
        if not args.dry_run:
            try:
                _send_cycle_summary(api, exits, opened)
            except Exception as e:
                log_error(f"Cycle summary alert failed: {e}")

        log_trade("Trade cycle completed successfully", args.dry_run)
        print("Done.")
    except Exception as e:
        log_error(f"Fatal error in main: {e}", args.dry_run)
        send_telegram(f"🚨 CoinEx Live Trader Error: {e}")
        # Create lockfile for unexpected errors
        create_lockfile(f"Fatal error: {e}")
        sys.exit(1)
# Run a single trading cycle when executed as a script (not on import).
if __name__ == "__main__":
    main()