Files
workspace/skills/deep-scraper/scripts/scrape-x-feed.py
Case 8638500190 Feed Hunter: deep scraper skill, pipeline, simulator, first investigation
- Built deep-scraper skill (CDP-based X feed extraction)
- Three-stage pipeline: scrape → triage → investigate
- Paper trading simulator with position tracking
- First live investigation: verified kch123 Polymarket profile ($9.3M P&L)
- Opened first paper position: Seahawks Super Bowl @ 68c
- Telegram alerts with inline action buttons
- Portal build in progress (night shift)
2026-02-07 23:58:40 -06:00

345 lines
13 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Deep scraper for X/Twitter feed — extracts structured post data via CDP.
Connects to an existing Chrome instance with remote debugging enabled.
Usage:
python3 scrape-x-feed.py [--port 9222] [--scroll-pages 5] [--output DIR]
Requires: Chrome running with --remote-debugging-port=9222
"""
import argparse
import itertools
import json
import os
import subprocess
import sys
import time
import urllib.request
from datetime import datetime

import websocket
# Monotonic message-id source shared by every cdp_send call in this process.
# (Replaces the old mutable-default-argument counter hack.)
_CDP_MSG_IDS = itertools.count(1)

def cdp_send(ws, method, params=None, msg_id=None):
    """Send a CDP command over an open WebSocket and wait for its reply.

    Parameters:
        ws: an open websocket connection to a Chrome DevTools target.
        method: CDP method name, e.g. ``"Runtime.evaluate"``.
        params: optional dict of CDP parameters (defaults to ``{}``).
        msg_id: legacy hook — a single-element list used as an id counter.
            When ``None`` (the normal case) the module-level counter is used;
            passing a list preserves the previous call convention.

    Returns the response dict whose ``"id"`` matches the request, or an
    ``{"error": ...}`` dict on receive failure / excessive event traffic.
    """
    if msg_id is None:
        mid = next(_CDP_MSG_IDS)
    else:
        # Backward-compatible path: caller supplies its own [counter].
        msg_id[0] += 1
        mid = msg_id[0]
    ws.settimeout(15)
    ws.send(json.dumps({"id": mid, "method": method, "params": params or {}}))
    # CDP multiplexes events and command replies on one socket: skip events
    # until our reply arrives, but cap the scan so a chatty page can't hang us.
    for _ in range(100):
        try:
            resp = json.loads(ws.recv())
        except Exception:
            # recv timeout, closed socket, or malformed frame — best effort.
            return {"error": "timeout"}
        if resp.get("id") == mid:
            return resp
    return {"error": "too many events"}
def get_ws_url(port):
    """Return the WebSocket debugger URL for the most relevant Chrome tab.

    Queries the DevTools HTTP endpoint on ``port`` and prefers a tab whose
    URL contains ``x.com``; otherwise falls back to the first listed tab.

    Raises:
        RuntimeError: when Chrome reports no tabs at all.
        urllib.error.URLError: when the DevTools endpoint is unreachable.
    """
    endpoint = f"http://127.0.0.1:{port}/json"
    # Close the HTTP response deterministically instead of leaking the socket.
    with urllib.request.urlopen(endpoint, timeout=5) as resp:
        tabs = json.loads(resp.read())
    for tab in tabs:
        if "x.com" in tab.get("url", ""):
            return tab["webSocketDebuggerUrl"]
    # Fallback to the first tab when no x.com tab is open.
    if tabs:
        return tabs[0]["webSocketDebuggerUrl"]
    raise RuntimeError("No Chrome tabs found")
# JavaScript to extract tweets from the DOM.
#
# Evaluated in the page via Runtime.evaluate (see scrape_via_cdp); must stay a
# single self-invoking expression that returns a JSON string. It walks every
# article[data-testid="tweet"], collecting author, text, timestamp, permalink,
# engagement metrics (parsed out of button aria-labels), embedded links, media
# indicators, link-preview cards, and repost attribution. Selectors are tied
# to X's current data-testid markup and will need updating when that changes.
EXTRACT_JS = r"""
(() => {
const posts = [];
const seen = new Set();
// X/Twitter uses article elements for tweets
const articles = document.querySelectorAll('article[data-testid="tweet"]');
for (const article of articles) {
try {
// Author info
const userLinks = article.querySelectorAll('a[role="link"]');
let displayName = '';
let handle = '';
for (const link of userLinks) {
const href = link.getAttribute('href') || '';
if (href.match(/^\/[a-zA-Z0-9_]+$/) && !href.includes('/status/')) {
if (!handle) handle = href.replace('/', '@');
const nameEl = link.querySelector('span');
if (nameEl && !displayName) displayName = nameEl.textContent.trim();
}
}
// Tweet text
const textEl = article.querySelector('[data-testid="tweetText"]');
const text = textEl ? textEl.textContent.trim() : '';
// Skip if we've seen this exact text+author combo
const key = `${handle}:${text.slice(0, 50)}`;
if (seen.has(key)) continue;
seen.add(key);
// Timestamp
const timeEl = article.querySelector('time');
const timestamp = timeEl ? timeEl.getAttribute('datetime') : null;
const timeText = timeEl ? timeEl.textContent.trim() : '';
// Link to tweet
let tweetUrl = '';
const statusLinks = article.querySelectorAll('a[href*="/status/"]');
for (const sl of statusLinks) {
const href = sl.getAttribute('href') || '';
if (href.match(/\/status\/\d+$/)) {
tweetUrl = 'https://x.com' + href;
break;
}
}
// Engagement metrics
const metrics = {};
const groups = article.querySelectorAll('[role="group"]');
for (const group of groups) {
const buttons = group.querySelectorAll('button');
for (const btn of buttons) {
const label = btn.getAttribute('aria-label') || '';
if (label.includes('repl')) {
const m = label.match(/(\d[\d,.]*)/);
if (m) metrics.replies = m[1];
} else if (label.includes('repost') || label.includes('Repost')) {
const m = label.match(/(\d[\d,.]*)/);
if (m) metrics.reposts = m[1];
} else if (label.includes('like') || label.includes('Like')) {
const m = label.match(/(\d[\d,.]*)/);
if (m) metrics.likes = m[1];
} else if (label.includes('view') || label.includes('View')) {
const m = label.match(/(\d[\d,.]*)/);
if (m) metrics.views = m[1];
} else if (label.includes('bookmark') || label.includes('Bookmark')) {
const m = label.match(/(\d[\d,.]*)/);
if (m) metrics.bookmarks = m[1];
}
}
}
// Embedded links
const links = [];
if (textEl) {
const anchors = textEl.querySelectorAll('a');
for (const a of anchors) {
const href = a.getAttribute('href') || '';
const linkText = a.textContent.trim();
if (href && !href.startsWith('/')) {
links.push({ url: href, text: linkText });
} else if (href.startsWith('/')) {
links.push({ url: 'https://x.com' + href, text: linkText });
}
}
}
// Media (images/video indicators)
const media = [];
const imgs = article.querySelectorAll('[data-testid="tweetPhoto"] img');
for (const img of imgs) {
media.push({ type: 'image', src: img.src });
}
const videoEl = article.querySelector('[data-testid="videoPlayer"]');
if (videoEl) media.push({ type: 'video' });
// Card/preview
const card = article.querySelector('[data-testid="card.wrapper"]');
let cardData = null;
if (card) {
const cardTitle = card.querySelector('[data-testid="card.layoutLarge.title"], [data-testid="card.layoutSmall.title"]');
const cardDesc = card.querySelector('[data-testid="card.layoutLarge.description"], [data-testid="card.layoutSmall.description"]');
const cardLink = card.querySelector('a');
cardData = {
title: cardTitle ? cardTitle.textContent.trim() : null,
description: cardDesc ? cardDesc.textContent.trim() : null,
url: cardLink ? cardLink.getAttribute('href') : null
};
}
// Is it a repost?
const socialContext = article.querySelector('[data-testid="socialContext"]');
let repostBy = null;
if (socialContext && socialContext.textContent.includes('reposted')) {
repostBy = socialContext.textContent.replace(' reposted', '').trim();
}
posts.push({
author: { displayName, handle },
text,
timestamp,
timeText,
url: tweetUrl,
metrics,
links,
media,
card: cardData,
repostBy
});
} catch (e) {
// Skip malformed tweets
}
}
return JSON.stringify(posts);
})()
"""
def scrape_via_cdp(port, scroll_pages, output_dir):
    """Scrape the X feed using the Chrome DevTools Protocol.

    Connects to the tab found by get_ws_url, evaluates EXTRACT_JS once per
    "page", scrolling between passes, deduplicates posts across passes, and
    writes posts.json + summary.md into a timestamped run directory.

    Parameters:
        port: Chrome remote-debugging port.
        scroll_pages: number of extract/scroll passes to perform.
        output_dir: parent directory for the timestamped run directory.

    Returns the run directory path.
    """
    ws_url = get_ws_url(port)
    print(f"Connecting to: {ws_url}")
    ws = websocket.create_connection(ws_url, timeout=30)
    print("Connected!")
    all_posts = []
    seen_keys = set()
    try:
        for page in range(scroll_pages):
            print(f"Scraping page {page + 1}/{scroll_pages}...")
            # Execute the extraction JS in the page context.
            result = cdp_send(ws, "Runtime.evaluate", {
                "expression": EXTRACT_JS,
                "returnByValue": True
            })
            # Runtime.evaluate nests its payload: response -> result -> result -> value.
            value = result.get("result", {}).get("result", {}).get("value", "[]")
            posts = json.loads(value) if isinstance(value, str) else []
            new = 0
            for post in posts:
                # Handle + text prefix is stable enough to dedupe across scrolls.
                key = f"{post['author']['handle']}:{post['text'][:80]}"
                if key not in seen_keys:
                    seen_keys.add(key)
                    all_posts.append(post)
                    new += 1
            print(f" Found {len(posts)} posts ({new} new)")
            if page < scroll_pages - 1:
                # Scroll down two viewport-heights and let lazy content render.
                cdp_send(ws, "Runtime.evaluate", {
                    "expression": "window.scrollBy(0, window.innerHeight * 2)"
                })
                time.sleep(3)
    finally:
        # Close the socket even if extraction fails mid-run.
        ws.close()
    run_dir = _write_run_output(all_posts, output_dir)
    print(f"\nDone! {len(all_posts)} posts saved to {run_dir}/")
    print(f" posts.json — structured data")
    print(f" summary.md — human-readable")
    return run_dir


def _write_run_output(all_posts, output_dir):
    """Write posts.json and summary.md into a fresh timestamped run directory.

    Returns the run directory path.
    """
    timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    run_dir = os.path.join(output_dir, timestamp)
    os.makedirs(run_dir, exist_ok=True)
    # encoding="utf-8" explicitly: post text and the summary's emoji markers
    # are not representable in every platform-default encoding.
    with open(os.path.join(run_dir, "posts.json"), "w", encoding="utf-8") as f:
        json.dump({
            "timestamp": timestamp,
            "total_posts": len(all_posts),
            "posts": all_posts
        }, f, indent=2)
    # Also save a human-readable summary.
    with open(os.path.join(run_dir, "summary.md"), "w", encoding="utf-8") as f:
        f.write(f"# X Feed Scrape — {timestamp}\n\n")
        f.write(f"**Total posts:** {len(all_posts)}\n\n")
        for i, post in enumerate(all_posts, 1):
            author = post['author']
            f.write(f"## {i}. {author['displayName']} ({author['handle']})\n")
            if post.get('repostBy'):
                f.write(f"*Reposted by {post['repostBy']}*\n")
            f.write(f"\n{post['text']}\n\n")
            if post.get('metrics'):
                parts = [f"{k}: {v}" for k, v in post['metrics'].items()]
                f.write(f"📊 {' | '.join(parts)}\n")
            if post.get('links'):
                f.write(f"\n🔗 Links:\n")
                for link in post['links']:
                    f.write(f" - [{link['text']}]({link['url']})\n")
            if post.get('card'):
                c = post['card']
                f.write(f"\n📎 Card: {c.get('title', 'N/A')}\n")
                if c.get('description'):
                    f.write(f" {c['description']}\n")
            if post.get('url'):
                f.write(f"\n🔗 {post['url']}\n")
            f.write(f"\n---\n\n")
    return run_dir
def _devtools_alive(port=9222, timeout=2):
    """Return True when Chrome's DevTools HTTP endpoint answers on *port*."""
    try:
        urllib.request.urlopen(
            f"http://127.0.0.1:{port}/json", timeout=timeout).close()
        return True
    except Exception:
        # Connection refused / timeout: Chrome debugging is not up (yet).
        return False


def scrape_via_xdotool(scroll_pages, output_dir):
    """Fallback entry point: ensure Chrome is running with remote debugging
    on port 9222 (launching it if needed), then scrape via CDP.

    Bug fix vs. the original: the liveness probe is now separate from the
    scrape call, so a scrape failure is no longer silently swallowed by the
    probe's exception handler (which previously caused a second Chrome launch).
    """
    if _devtools_alive():
        return scrape_via_cdp(9222, scroll_pages, output_dir)
    # Launch Chrome with remote debugging.
    print("Launching Chrome with remote debugging...")
    subprocess.Popen([
        "/usr/bin/google-chrome-stable",
        "--no-sandbox",
        # Reuse the normal profile so the existing X login session applies.
        "--user-data-dir=" + os.path.expanduser("~/.config/google-chrome"),
        "--remote-debugging-port=9222",
        "https://x.com/home"
    ], env={**os.environ, "DISPLAY": ":0"},
        stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    # Poll until the debugging endpoint comes up (max ~20s), else bail out.
    for _ in range(20):
        if _devtools_alive():
            break
        time.sleep(1)
    else:
        print("ERROR: Chrome didn't start with debugging port", file=sys.stderr)
        sys.exit(1)
    time.sleep(5)  # Let the x.com page load before scraping.
    return scrape_via_cdp(9222, scroll_pages, output_dir)
def main():
    """CLI entry point: parse arguments and run one scrape."""
    parser = argparse.ArgumentParser(description="Deep scrape X/Twitter feed")
    parser.add_argument("--port", type=int, default=9222,
                        help="Chrome debugging port")
    parser.add_argument("--scroll-pages", type=int, default=5,
                        help="Number of scroll pages")
    # expanduser instead of a hard-coded /home/<user> path so the script
    # works for any account, not just the original author's.
    parser.add_argument("--output",
                        default=os.path.expanduser(
                            "~/.openclaw/workspace/data/x-feed"),
                        help="Output directory")
    parser.add_argument("--launch", action="store_true",
                        help="Launch Chrome if not running")
    args = parser.parse_args()
    os.makedirs(args.output, exist_ok=True)
    if args.launch:
        # NOTE(review): the launch path always targets port 9222; --port is
        # honored only for an already-running Chrome — confirm intended.
        scrape_via_xdotool(args.scroll_pages, args.output)
    else:
        scrape_via_cdp(args.port, args.scroll_pages, args.output)


if __name__ == "__main__":
    main()