#!/usr/bin/env python3
"""
Deep scraper for X/Twitter feed — extracts structured post data via CDP.

Connects to an existing Chrome instance with remote debugging enabled.

Usage: python3 scrape-x-feed.py [--port 9222] [--scroll-pages 5] [--output DIR]
Requires: Chrome running with --remote-debugging-port=9222
"""
import argparse
import itertools
import json
import os
import subprocess
import sys
import time
import urllib.request
from datetime import datetime

import websocket

# Monotonic CDP message-id source, shared by every cdp_send() call so ids
# never collide on a single WebSocket connection. (Replaces the old
# mutable-default-argument counter hack.)
_MSG_IDS = itertools.count(1)


def cdp_send(ws, method, params=None, msg_id=None):
    """Send a CDP command over an open WebSocket and wait for its reply.

    Args:
        ws: an open ``websocket.WebSocket`` connection to a Chrome tab.
        method: CDP method name, e.g. ``"Runtime.evaluate"``.
        params: optional dict of CDP parameters.
        msg_id: legacy single-element-list counter; kept only for backward
            compatibility with callers that passed their own counter. When
            None (the normal case) a module-level counter is used.

    Returns:
        The matching CDP response dict, or ``{"error": ...}`` on timeout /
        when too many unrelated events arrive before the reply.
    """
    if msg_id is None:
        mid = next(_MSG_IDS)
    else:
        # Legacy behavior: caller-owned counter list.
        msg_id[0] += 1
        mid = msg_id[0]
    msg = {"id": mid, "method": method, "params": params or {}}
    ws.settimeout(15)
    ws.send(json.dumps(msg))
    # CDP multiplexes async events with command replies on the same socket;
    # drain events until we see the reply carrying our id.
    for _ in range(100):  # max 100 events before giving up
        try:
            resp = json.loads(ws.recv())
        except Exception:
            return {"error": "timeout"}
        if resp.get("id") == mid:
            return resp
        # Skip CDP events
    return {"error": "too many events"}


def get_ws_url(port):
    """Get WebSocket debugger URL from Chrome DevTools.

    Prefers a tab whose URL contains ``x.com``; falls back to the first tab.

    Raises:
        RuntimeError: if Chrome reports no open tabs.
    """
    url = f"http://127.0.0.1:{port}/json"
    with urllib.request.urlopen(url, timeout=5) as resp:
        tabs = json.loads(resp.read())
    for tab in tabs:
        if "x.com" in tab.get("url", ""):
            return tab["webSocketDebuggerUrl"]
    # Fallback to first tab
    if tabs:
        return tabs[0]["webSocketDebuggerUrl"]
    raise RuntimeError("No Chrome tabs found")


# JavaScript to extract tweets from the DOM. Runs inside the page via
# Runtime.evaluate and returns a JSON string (one object per visible tweet).
EXTRACT_JS = r"""
(() => {
  const posts = [];
  const seen = new Set();
  // X/Twitter uses article elements for tweets
  const articles = document.querySelectorAll('article[data-testid="tweet"]');
  for (const article of articles) {
    try {
      // Author info
      const userLinks = article.querySelectorAll('a[role="link"]');
      let displayName = '';
      let handle = '';
      for (const link of userLinks) {
        const href = link.getAttribute('href') || '';
        if (href.match(/^\/[a-zA-Z0-9_]+$/) && !href.includes('/status/')) {
          if (!handle) handle = href.replace('/', '@');
          const nameEl = link.querySelector('span');
          if (nameEl && !displayName) displayName = nameEl.textContent.trim();
        }
      }
      // Tweet text
      const textEl = article.querySelector('[data-testid="tweetText"]');
      const text = textEl ? textEl.textContent.trim() : '';
      // Skip if we've seen this exact text+author combo
      const key = `${handle}:${text.slice(0, 50)}`;
      if (seen.has(key)) continue;
      seen.add(key);
      // Timestamp
      const timeEl = article.querySelector('time');
      const timestamp = timeEl ? timeEl.getAttribute('datetime') : null;
      const timeText = timeEl ? timeEl.textContent.trim() : '';
      // Link to tweet
      let tweetUrl = '';
      const statusLinks = article.querySelectorAll('a[href*="/status/"]');
      for (const sl of statusLinks) {
        const href = sl.getAttribute('href') || '';
        if (href.match(/\/status\/\d+$/)) {
          tweetUrl = 'https://x.com' + href;
          break;
        }
      }
      // Engagement metrics
      const metrics = {};
      const groups = article.querySelectorAll('[role="group"]');
      for (const group of groups) {
        const buttons = group.querySelectorAll('button');
        for (const btn of buttons) {
          const label = btn.getAttribute('aria-label') || '';
          if (label.includes('repl')) {
            const m = label.match(/(\d[\d,.]*)/);
            if (m) metrics.replies = m[1];
          } else if (label.includes('repost') || label.includes('Repost')) {
            const m = label.match(/(\d[\d,.]*)/);
            if (m) metrics.reposts = m[1];
          } else if (label.includes('like') || label.includes('Like')) {
            const m = label.match(/(\d[\d,.]*)/);
            if (m) metrics.likes = m[1];
          } else if (label.includes('view') || label.includes('View')) {
            const m = label.match(/(\d[\d,.]*)/);
            if (m) metrics.views = m[1];
          } else if (label.includes('bookmark') || label.includes('Bookmark')) {
            const m = label.match(/(\d[\d,.]*)/);
            if (m) metrics.bookmarks = m[1];
          }
        }
      }
      // Embedded links
      const links = [];
      if (textEl) {
        const anchors = textEl.querySelectorAll('a');
        for (const a of anchors) {
          const href = a.getAttribute('href') || '';
          const linkText = a.textContent.trim();
          if (href && !href.startsWith('/')) {
            links.push({ url: href, text: linkText });
          } else if (href.startsWith('/')) {
            links.push({ url: 'https://x.com' + href, text: linkText });
          }
        }
      }
      // Media (images/video indicators)
      const media = [];
      const imgs = article.querySelectorAll('[data-testid="tweetPhoto"] img');
      for (const img of imgs) {
        media.push({ type: 'image', src: img.src });
      }
      const videoEl = article.querySelector('[data-testid="videoPlayer"]');
      if (videoEl) media.push({ type: 'video' });
      // Card/preview
      const card = article.querySelector('[data-testid="card.wrapper"]');
      let cardData = null;
      if (card) {
        const cardTitle = card.querySelector('[data-testid="card.layoutLarge.title"], [data-testid="card.layoutSmall.title"]');
        const cardDesc = card.querySelector('[data-testid="card.layoutLarge.description"], [data-testid="card.layoutSmall.description"]');
        const cardLink = card.querySelector('a');
        cardData = {
          title: cardTitle ? cardTitle.textContent.trim() : null,
          description: cardDesc ? cardDesc.textContent.trim() : null,
          url: cardLink ? cardLink.getAttribute('href') : null
        };
      }
      // Is it a repost?
      const socialContext = article.querySelector('[data-testid="socialContext"]');
      let repostBy = null;
      if (socialContext && socialContext.textContent.includes('reposted')) {
        repostBy = socialContext.textContent.replace(' reposted', '').trim();
      }
      posts.push({
        author: { displayName, handle },
        text,
        timestamp,
        timeText,
        url: tweetUrl,
        metrics,
        links,
        media,
        card: cardData,
        repostBy
      });
    } catch (e) {
      // Skip malformed tweets
    }
  }
  return JSON.stringify(posts);
})()
"""


def scrape_via_cdp(port, scroll_pages, output_dir):
    """Scrape X feed using Chrome DevTools Protocol.

    Repeatedly evaluates EXTRACT_JS in the page, scrolling between passes,
    deduplicating posts by (handle, text prefix). Writes ``posts.json`` and
    ``summary.md`` into a timestamped subdirectory of *output_dir*.

    Returns:
        Path of the run directory containing the output files.
    """
    ws_url = get_ws_url(port)
    print(f"Connecting to: {ws_url}")
    ws = websocket.create_connection(ws_url, timeout=30)
    print("Connected!")

    all_posts = []
    seen_keys = set()

    for page in range(scroll_pages):
        print(f"Scraping page {page + 1}/{scroll_pages}...")

        # Execute extraction JS
        result = cdp_send(ws, "Runtime.evaluate", {
            "expression": EXTRACT_JS,
            "returnByValue": True
        })
        # CDP nests the script's return value at result.result.value.
        value = result.get("result", {}).get("result", {}).get("value", "[]")
        posts = json.loads(value) if isinstance(value, str) else []

        # Deduplicate
        new = 0
        for post in posts:
            key = f"{post['author']['handle']}:{post['text'][:80]}"
            if key not in seen_keys:
                seen_keys.add(key)
                all_posts.append(post)
                new += 1

        print(f"  Found {len(posts)} posts ({new} new)")

        if page < scroll_pages - 1:
            # Scroll down, then give the feed time to lazy-load new tweets.
            cdp_send(ws, "Runtime.evaluate", {
                "expression": "window.scrollBy(0, window.innerHeight * 2)"
            })
            time.sleep(3)

    ws.close()

    # Save output
    timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    run_dir = os.path.join(output_dir, timestamp)
    os.makedirs(run_dir, exist_ok=True)

    output_file = os.path.join(run_dir, "posts.json")
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump({
            "timestamp": timestamp,
            "total_posts": len(all_posts),
            "posts": all_posts
        }, f, indent=2)

    # Also save a human-readable summary (contains emoji — must be UTF-8).
    summary_file = os.path.join(run_dir, "summary.md")
    with open(summary_file, "w", encoding="utf-8") as f:
        f.write(f"# X Feed Scrape — {timestamp}\n\n")
        f.write(f"**Total posts:** {len(all_posts)}\n\n")
        for i, post in enumerate(all_posts, 1):
            author = post['author']
            f.write(f"## {i}. {author['displayName']} ({author['handle']})\n")
            if post.get('repostBy'):
                f.write(f"*Reposted by {post['repostBy']}*\n")
            f.write(f"\n{post['text']}\n\n")
            if post.get('metrics'):
                m = post['metrics']
                parts = []
                for k, v in m.items():
                    parts.append(f"{k}: {v}")
                f.write(f"šŸ“Š {' | '.join(parts)}\n")
            if post.get('links'):
                f.write("\nšŸ”— Links:\n")
                for link in post['links']:
                    f.write(f"  - [{link['text']}]({link['url']})\n")
            if post.get('card'):
                c = post['card']
                f.write(f"\nšŸ“Ž Card: {c.get('title', 'N/A')}\n")
                if c.get('description'):
                    f.write(f"   {c['description']}\n")
            if post.get('url'):
                f.write(f"\nšŸ”— {post['url']}\n")
            f.write("\n---\n\n")

    print(f"\nDone! {len(all_posts)} posts saved to {run_dir}/")
    print("  posts.json — structured data")
    print("  summary.md — human-readable")
    return run_dir


def scrape_via_xdotool(scroll_pages, output_dir, port=9222):
    """Fallback: use xdotool + JS injection via xdg approach.

    This launches Chrome with remote debugging if not already running.

    Args:
        scroll_pages: number of scroll/extract passes.
        output_dir: base output directory.
        port: Chrome remote-debugging port (new, defaults to the old
            hardcoded 9222 for backward compatibility).
    """
    # Check if Chrome is already running with the debugging port open.
    try:
        urllib.request.urlopen(f"http://127.0.0.1:{port}/json", timeout=2)
        return scrape_via_cdp(port, scroll_pages, output_dir)
    except Exception:
        pass

    # Launch Chrome with remote debugging.
    # NOTE(review): binary path, profile dir, and DISPLAY are hardcoded for
    # one specific machine — parameterize before reusing elsewhere.
    print("Launching Chrome with remote debugging...")
    subprocess.Popen([
        "/usr/bin/google-chrome-stable",
        "--no-sandbox",
        "--user-data-dir=/home/wdjones/.config/google-chrome",
        f"--remote-debugging-port={port}",
        "https://x.com/home"
    ], env={**os.environ, "DISPLAY": ":0"},
        stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

    # Wait for Chrome's debugging endpoint to come up (max ~20s).
    for _ in range(20):
        try:
            urllib.request.urlopen(f"http://127.0.0.1:{port}/json", timeout=2)
            break
        except Exception:
            time.sleep(1)
    else:
        print("ERROR: Chrome didn't start with debugging port", file=sys.stderr)
        sys.exit(1)

    time.sleep(5)  # Let page load
    return scrape_via_cdp(port, scroll_pages, output_dir)


def main():
    """Parse CLI arguments and run the scraper."""
    parser = argparse.ArgumentParser(description="Deep scrape X/Twitter feed")
    parser.add_argument("--port", type=int, default=9222,
                        help="Chrome debugging port")
    parser.add_argument("--scroll-pages", type=int, default=5,
                        help="Number of scroll pages")
    parser.add_argument("--output",
                        default="/home/wdjones/.openclaw/workspace/data/x-feed",
                        help="Output directory")
    parser.add_argument("--launch", action="store_true",
                        help="Launch Chrome if not running")
    args = parser.parse_args()

    os.makedirs(args.output, exist_ok=True)

    if args.launch:
        # Fix: previously the launch path ignored --port and always used 9222.
        scrape_via_xdotool(args.scroll_pages, args.output, port=args.port)
    else:
        scrape_via_cdp(args.port, args.scroll_pages, args.output)


if __name__ == "__main__":
    main()