61 lines
2.0 KiB
Python
61 lines
2.0 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Wrapper to run crypto scan and send Telegram alerts.
|
|
Designed for systemd timer execution.
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import urllib.request
|
|
from game_engine import run_scan, get_portfolio_value, get_trades
|
|
|
|
TELEGRAM_BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN", "")
|
|
TELEGRAM_CHAT_ID = os.environ.get("TELEGRAM_CHAT_ID", "6443752046")
|
|
|
|
|
|
def send_telegram(message):
|
|
if not TELEGRAM_BOT_TOKEN:
|
|
return
|
|
url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
|
|
data = json.dumps({
|
|
"chat_id": TELEGRAM_CHAT_ID,
|
|
"text": message,
|
|
"parse_mode": "HTML",
|
|
}).encode()
|
|
req = urllib.request.Request(url, data=data, headers={"Content-Type": "application/json"})
|
|
try:
|
|
urllib.request.urlopen(req, timeout=10)
|
|
except Exception as e:
|
|
print(f"Telegram error: {e}")
|
|
|
|
|
|
def main():
|
|
result = run_scan(max_positions=10, position_size=5000)
|
|
|
|
buys = result.get("buys", [])
|
|
sells = result.get("sells", [])
|
|
snapshot = result.get("snapshot", {})
|
|
|
|
# Only alert if there were trades
|
|
if buys or sells:
|
|
lines = ["🎮 <b>Crypto Watch Update</b>\n"]
|
|
|
|
for t in buys:
|
|
lines.append(f"📥 BUY {t['symbol']} @ ${t['price']:,.4f}")
|
|
lines.append(f" {t.get('reason', '')}\n")
|
|
|
|
for t in sells:
|
|
emoji = "🟢" if t.get("pnl", 0) >= 0 else "🔴"
|
|
lines.append(f"📤 SELL {t['symbol']} @ ${t['price']:,.4f}")
|
|
lines.append(f" {emoji} PnL: ${t.get('pnl', 0):+,.2f} ({t.get('pnl_pct', 0):+.1f}%)")
|
|
lines.append(f" {t.get('reason', '')}\n")
|
|
|
|
lines.append(f"💰 Portfolio: ${snapshot.get('total_value', 0):,.2f} ({snapshot.get('total_pnl_pct', 0):+.2f}%)")
|
|
lines.append(f"💵 Cash: ${snapshot.get('cash', 0):,.2f} | Positions: {snapshot.get('num_positions', 0)}")
|
|
|
|
send_telegram("\n".join(lines))
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|