Files
workspace/projects/feed-hunter/anoin123-monitor.py

123 lines
3.5 KiB
Python
Executable File

#!/usr/bin/env python3
"""
anoin123 Trade Monitor
Polls Polymarket for new trades and sends Telegram alerts.
Runs as systemd timer every 5 minutes.
"""
import json
import os
import sys
import urllib.request
import urllib.parse
from datetime import datetime, timezone
from pathlib import Path
WALLET = "0x96489abcb9f583d6835c8ef95ffc923d05a86825"
PROJECT_DIR = Path(__file__).parent
DATA_DIR = PROJECT_DIR / "data" / "anoin123-tracking"
TRADES_FILE = DATA_DIR / "all-trades.json"
STATS_FILE = DATA_DIR / "stats.json"
CRED_FILE = Path("/home/wdjones/.openclaw/workspace/.credentials/telegram-bot.env")
CHAT_ID = "6443752046"
def load_bot_token():
with open(CRED_FILE) as f:
for line in f:
if line.startswith("BOT_TOKEN="):
return line.strip().split("=", 1)[1]
raise RuntimeError("BOT_TOKEN not found in credentials")
def send_telegram(text, token):
url = f"https://api.telegram.org/bot{token}/sendMessage"
data = urllib.parse.urlencode({
"chat_id": CHAT_ID,
"text": text,
"parse_mode": "HTML",
}).encode()
try:
req = urllib.request.Request(url, data=data)
urllib.request.urlopen(req, timeout=10)
except Exception as e:
print(f"Telegram send failed: {e}", file=sys.stderr)
def fetch_trades(limit=100):
url = f"https://data-api.polymarket.com/trades?user={WALLET}&limit={limit}"
req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"})
resp = urllib.request.urlopen(req, timeout=15)
return json.loads(resp.read())
def check_new_trades(token):
DATA_DIR.mkdir(parents=True, exist_ok=True)
known_hashes = set()
all_trades = []
if TRADES_FILE.exists():
with open(TRADES_FILE) as f:
all_trades = json.load(f)
known_hashes = {
t.get("transactionHash", "") + str(t.get("outcomeIndex", ""))
for t in all_trades
}
recent = fetch_trades(100)
new_trades = []
for t in recent:
key = t.get("transactionHash", "") + str(t.get("outcomeIndex", ""))
if key not in known_hashes:
new_trades.append(t)
known_hashes.add(key)
all_trades.append(t)
if new_trades:
with open(TRADES_FILE, "w") as f:
json.dump(all_trades, f)
lines = [f"🎯 <b>anoin123 New Activity</b> ({len(new_trades)} trades)"]
for t in new_trades[:15]:
side = t.get("side", "?")
amt = t.get("size", 0)
price = t.get("price", 0)
title = t.get("title", "")[:60]
outcome = t.get("outcome", "")
icon = "📈" if side == "BUY" else "📉"
lines.append(
f" {icon} {side} ${amt:,.2f} @ {price:.3f}{title} ({outcome})"
)
if len(new_trades) > 15:
lines.append(f" ... and {len(new_trades) - 15} more")
send_telegram("\n".join(lines), token)
print(f"Alerted: {len(new_trades)} new trades")
else:
print("No new trades")
def main():
token = load_bot_token()
try:
check_new_trades(token)
except Exception as e:
print(f"Trade check error: {e}", file=sys.stderr)
# Update stats
DATA_DIR.mkdir(parents=True, exist_ok=True)
stats = {}
if STATS_FILE.exists():
with open(STATS_FILE) as f:
stats = json.load(f)
stats["last_check"] = datetime.now(timezone.utc).isoformat()
with open(STATS_FILE, "w") as f:
json.dump(stats, f, indent=2)
if __name__ == "__main__":
main()