#!/usr/bin/env python3
"""
Feed Monitor — scrapes the X home timeline via Chrome CDP (localhost:9222).

Deduplicates posts across runs, filters them for money/trading topics,
saves JSON captures to disk, and sends Telegram alerts for matches.
"""
import hashlib
import http.client
import json
import os
import sys
import time
import urllib.request
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

PROJECT_DIR = Path(__file__).parent
DATA_DIR = PROJECT_DIR / "data"
SEEN_FILE = DATA_DIR / "seen_posts.json"          # persisted dedup hashes
CAPTURES_DIR = DATA_DIR / "feed_captures"         # per-run JSON snapshots
CAPTURES_DIR.mkdir(parents=True, exist_ok=True)

CDP_HOST = "localhost"
CDP_PORT = 9222

TELEGRAM_BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN", "")
TELEGRAM_CHAT_ID = os.environ.get("TELEGRAM_CHAT_ID", "6443752046")

# NOTE: matching is plain case-insensitive substring search, so short
# keywords ("sol", "long", "bag") can fire inside unrelated words.
MONEY_KEYWORDS = [
    "polymarket", "trade", "trading", "profit", "arbitrage", "crypto",
    "bitcoin", "btc", "ethereum", "eth", "solana", "sol", "stock", "stocks",
    "market", "portfolio", "defi", "token", "whale", "bullish", "bearish",
    "short", "long", "pnl", "alpha", "degen", "usdc", "usdt", "wallet",
    "airdrop", "memecoin", "nft", "yield", "staking", "leverage", "futures",
    "options", "hedge", "pump", "dump", "rug", "moon", "bag", "position",
    "signal",
]


def send_telegram(message: str) -> None:
    """Send *message* via the Telegram Bot API; print locally if no token.

    Best-effort: network errors are reported to stdout, never raised.
    """
    if not TELEGRAM_BOT_TOKEN:
        print(f"[ALERT] {message}")
        return
    url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
    data = json.dumps({
        "chat_id": TELEGRAM_CHAT_ID,
        "text": message,
        "parse_mode": "HTML",
        "disable_web_page_preview": True,
    }).encode()
    req = urllib.request.Request(
        url, data=data, headers={"Content-Type": "application/json"}
    )
    try:
        # Context manager closes the HTTP response (original leaked it).
        with urllib.request.urlopen(req, timeout=10):
            pass
    except Exception as e:
        print(f" Telegram error: {e}")


def cdp_send(ws, method: str, params: Optional[dict] = None, msg_id: int = 1):
    """Send one CDP command over *ws* and block until its reply arrives.

    Messages with non-matching ids (CDP events, stale replies) are
    discarded.  The websocket's own timeout bounds the wait — recv()
    raises rather than spinning forever.

    Returns the "result" payload (empty dict if absent).
    """
    payload = {"id": msg_id, "method": method}
    if params:
        payload["params"] = params
    ws.send(json.dumps(payload))
    while True:
        resp = json.loads(ws.recv())
        if resp.get("id") == msg_id:
            return resp.get("result", {})


def get_x_tab_ws():
    """Find an X/Twitter tab in Chrome's /json tab list.

    Returns (webSocketDebuggerUrl, tab_url), or (None, None) if no
    matching debuggable tab exists.
    """
    conn = http.client.HTTPConnection(CDP_HOST, CDP_PORT, timeout=5)
    try:
        conn.request("GET", "/json")
        tabs = json.loads(conn.getresponse().read())
    finally:
        # Original leaked the connection if request/getresponse raised.
        conn.close()
    for t in tabs:
        url = t.get("url", "")
        if "x.com" in url or "twitter.com" in url:
            ws_url = t.get("webSocketDebuggerUrl")
            if ws_url:
                return ws_url, t.get("url")
    return None, None


def scrape_feed_via_cdp():
    """Navigate the X tab to /home, scroll 6 times, and harvest posts.

    Posts are extracted by evaluating JS against the timeline DOM; near
    duplicates are dropped by comparing the first 120 chars of text.
    Exits the process if no X tab is available.

    Returns a list of dicts: {text, userName, timestamp, link}.
    """
    import websocket  # third-party; imported lazily so helpers stay testable

    ws_url, current_url = get_x_tab_ws()
    if not ws_url:
        print("ERROR: No X.com tab found in Chrome at localhost:9222")
        sys.exit(1)
    print(f"Connected to tab: {current_url}")
    ws = websocket.create_connection(ws_url, timeout=30)

    all_posts = []
    seen_texts = set()
    try:
        # Navigate to home timeline and let it render.
        cdp_send(ws, "Page.navigate", {"url": "https://x.com/home"}, 1)
        time.sleep(5)

        for scroll_i in range(6):
            # Extract posts currently in the DOM.  Keep this JS verbatim —
            # it is runtime behavior, not Python.
            js = """ (() => { const posts = []; document.querySelectorAll('article[data-testid="tweet"]').forEach(article => { try { const textEl = article.querySelector('[data-testid="tweetText"]'); const text = textEl ? textEl.innerText : ''; const userEl = article.querySelector('[data-testid="User-Name"]'); const userName = userEl ? userEl.innerText : ''; const timeEl = article.querySelector('time'); const timestamp = timeEl ? timeEl.getAttribute('datetime') : ''; const linkEl = article.querySelector('a[href*="/status/"]'); const link = linkEl ? linkEl.getAttribute('href') : ''; posts.push({ text, userName, timestamp, link }); } catch(e) {} }); return JSON.stringify(posts); })() """
            result = cdp_send(
                ws,
                "Runtime.evaluate",
                {"expression": js, "returnByValue": True},
                10 + scroll_i,
            )
            raw = result.get("result", {}).get("value", "[]")
            posts = json.loads(raw) if isinstance(raw, str) else []
            for p in posts:
                # Dedup within this scrape by a text-prefix signature.
                sig = p.get("text", "")[:120]
                if sig and sig not in seen_texts:
                    seen_texts.add(sig)
                    all_posts.append(p)
            # Scroll down and give the feed time to load more.
            cdp_send(
                ws,
                "Runtime.evaluate",
                {"expression": "window.scrollBy(0, 2000)"},
                100 + scroll_i,
            )
            time.sleep(2)
    finally:
        # Original leaked the websocket on any mid-scrape exception.
        ws.close()
    return all_posts


def post_hash(post: dict) -> str:
    """Return a 16-hex-char stable fingerprint of a post (text + author)."""
    text = post.get("text", "") + post.get("userName", "")
    return hashlib.sha256(text.encode()).hexdigest()[:16]


def is_money_related(text: str) -> bool:
    """True if *text* contains any money keyword (case-insensitive)."""
    lower = text.lower()
    return any(kw in lower for kw in MONEY_KEYWORDS)


def load_seen() -> set:
    """Load previously-seen post hashes; empty set if missing or corrupt."""
    if SEEN_FILE.exists():
        try:
            return set(json.loads(SEEN_FILE.read_text()))
        except (OSError, ValueError):
            # Narrowed from bare except: only I/O or bad-JSON failures
            # should fall through to a fresh set.
            pass
    return set()


def save_seen(seen: set) -> None:
    """Persist seen hashes, capped at 10k entries.

    Sets are unordered, so truncation is made deterministic by sorting
    (the original kept an arbitrary 10k while claiming "last 10k").
    """
    items = sorted(seen)[-10000:]
    SEEN_FILE.write_text(json.dumps(items))


def main() -> int:
    """Run one scrape cycle; returns the number of money-related posts."""
    now = datetime.now(timezone.utc)
    print(f"=== Feed Monitor === {now.strftime('%Y-%m-%d %H:%M UTC')}")

    posts = scrape_feed_via_cdp()
    print(f"Scraped {len(posts)} posts from timeline")

    seen = load_seen()
    new_posts = []
    money_posts = []
    for p in posts:
        h = post_hash(p)
        if h in seen:
            continue
        seen.add(h)
        new_posts.append(p)
        if is_money_related(p.get("text", "")):
            money_posts.append(p)
    save_seen(seen)

    print(f"New posts: {len(new_posts)}")
    print(f"Money-related: {len(money_posts)}")

    # Save capture snapshot for this run.
    ts = now.strftime("%Y%m%d-%H%M")
    capture = {
        "timestamp": now.isoformat(),
        "total_scraped": len(posts),
        "new_posts": len(new_posts),
        "money_posts": len(money_posts),
        "posts": money_posts,
    }
    capture_file = CAPTURES_DIR / f"feed-{ts}.json"
    capture_file.write_text(json.dumps(capture, indent=2))
    print(f"Saved capture: {capture_file}")

    # Alert on money posts (cap at 8 messages per run).
    if money_posts:
        print(f"\nšŸ”” {len(money_posts)} money-related posts found!")
        for p in money_posts[:8]:
            user = p.get("userName", "").split("\n")[0]
            snippet = p.get("text", "")[:250].replace("\n", " ")
            link = p.get("link", "")
            # DOM links are relative (/user/status/...); absolutize them.
            full_link = (
                f"https://x.com{link}"
                if link and not link.startswith("http")
                else link
            )
            print(f" • {user}: {snippet[:100]}...")
            msg = f"šŸ” {user}\n\n{snippet}"
            if full_link:
                msg += f"\n\n{full_link}"
            send_telegram(msg)
    else:
        print("No new money-related posts.")
    return len(money_posts)


if __name__ == "__main__":
    main()
    sys.exit(0)