#!/usr/bin/env python3
|
|
"""Tweet Analysis Tool - Scrapes and analyzes tweets via Chrome CDP."""
|
|
|
|
import argparse
|
|
import asyncio
|
|
import json
|
|
import re
|
|
import sys
|
|
from datetime import datetime
|
|
|
|
try:
|
|
from playwright.async_api import async_playwright
|
|
except ImportError:
|
|
print("ERROR: playwright not installed. Run: pip install playwright", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
try:
|
|
import yfinance as yf
|
|
except ImportError:
|
|
yf = None
|
|
|
|
|
|
def extract_tickers(text: str) -> list[str]:
    """Extract unique cashtag symbols ($TICKER) from *text*.

    Matches ``$AAPL``-style tags: 1-5 uppercase letters with an optional
    share-class suffix like ``.B`` (so ``$BRK.B`` works). Input is upper-cased
    first, so lowercase cashtags are also caught.

    Returns each ticker once, in first-occurrence order. (The previous
    ``list(set(...))`` round-trip made the order depend on the per-run
    string hash seed, producing nondeterministic output across runs.)
    """
    matches = re.findall(r'\$([A-Z]{1,5}(?:\.[A-Z]{1,2})?)', text.upper())
    # dict.fromkeys de-duplicates while preserving insertion order.
    return list(dict.fromkeys(matches))
def lookup_tickers(tickers: list[str]) -> dict:
    """Fetch basic quote data for the given symbols via yfinance.

    Returns a mapping of symbol -> quote fields. Returns {} when yfinance
    is not installed (module-level ``yf`` is None) or when no symbols were
    given. A symbol whose lookup raises maps to {"error": "lookup failed"}.
    """
    if not yf or not tickers:
        return {}
    # Output field name -> yfinance info key, for the secondary fields.
    field_map = {
        "market_cap": "marketCap",
        "name": "shortName",
        "volume": "volume",
        "day_change_pct": "regularMarketChangePercent",
        "52w_high": "fiftyTwoWeekHigh",
        "52w_low": "fiftyTwoWeekLow",
    }
    quotes = {}
    # Only the first five symbols are queried, to bound network time.
    for symbol in tickers[:5]:
        try:
            info = yf.Ticker(symbol).info
            # "price" is inserted first to keep the established key order;
            # it falls back to the regular-market price when needed.
            entry = {"price": info.get("currentPrice") or info.get("regularMarketPrice")}
            entry.update((out_key, info.get(src_key)) for out_key, src_key in field_map.items())
            quotes[symbol] = entry
        except Exception:
            quotes[symbol] = {"error": "lookup failed"}
    return quotes
async def scrape_tweet(url: str) -> dict:
    """Connect to a locally running Chrome over CDP and scrape one tweet page.

    Expects Chrome to be listening on http://localhost:9222 (started with
    --remote-debugging-port=9222). Never raises: on failure the returned
    dict's fields stay None/empty and "scrape_error" carries a message.
    """
    # Normalize URL: twitter.com is served as x.com; ensure a scheme is present.
    url = url.replace("twitter.com", "x.com")
    if not url.startswith("http"):
        url = "https://" + url

    # Result skeleton — every key exists even when scraping fails partway through.
    data = {
        "url": url,
        "author": None,        # display name of the tweet author
        "handle": None,        # "@username", "@" included
        "text": None,
        "timestamp": None,     # ISO datetime from the focal tweet's <time> element
        "metrics": {},         # replies/retweets/likes/bookmarks and possibly views
        "images": [],
        "bio": None,           # NOTE(review): never populated below — reserved field
        "followers": None,
        "following": None,
        "reply_to": None,      # NOTE(review): never populated below — reserved field
        "replies_sample": [],
        "scrape_error": None,
    }

    async with async_playwright() as p:
        try:
            # Attach to the user's existing Chrome so we reuse its login session.
            browser = await p.chromium.connect_over_cdp("http://localhost:9222")
        except Exception as e:
            data["scrape_error"] = f"CDP connection failed: {e}"
            return data

        try:
            # Prefer the first existing context (logged-in profile) over a fresh one.
            ctx = browser.contexts[0] if browser.contexts else await browser.new_context()
            page = await ctx.new_page()
            await page.goto(url, wait_until="domcontentloaded", timeout=30000)
            # Fixed wait for client-side rendering; X hydrates tweets after DOM load.
            await page.wait_for_timeout(4000)

            # Get the main tweet article
            # Try to find the focal tweet
            tweet_sel = 'article[data-testid="tweet"]'
            articles = await page.query_selector_all(tweet_sel)

            if not articles:
                data["scrape_error"] = "No tweet articles found on page"
                await page.close()
                return data

            # The focal tweet is typically the one with the largest text or specific structure
            # On a tweet permalink, it's usually the first or second article
            focal = None
            for art in articles:
                # The focal tweet has a different time display (absolute vs relative).
                # We take the first article whose <time> carries a datetime attribute.
                time_el = await art.query_selector('time')
                if time_el:
                    dt = await time_el.get_attribute('datetime')
                    if dt:
                        focal = art
                        data["timestamp"] = dt
                        break
            if not focal:
                # Fall back to the first article when no datetime was found.
                focal = articles[0]

            # Author info: profile links look like "/username" (exactly one slash).
            user_links = await focal.query_selector_all('a[role="link"]')
            for link in user_links:
                href = await link.get_attribute("href") or ""
                if href.startswith("/") and href.count("/") == 1 and len(href) > 1:
                    spans = await link.query_selector_all("span")
                    for span in spans:
                        txt = (await span.inner_text()).strip()
                        if txt.startswith("@"):
                            data["handle"] = txt
                        elif txt and not data["author"] and not txt.startswith("@"):
                            # First non-@ span is taken as the display name.
                            data["author"] = txt
                    # Stop after the first profile-shaped link.
                    break

            # Tweet text
            text_el = await focal.query_selector('div[data-testid="tweetText"]')
            if text_el:
                data["text"] = await text_el.inner_text()

            # Metrics (replies, retweets, likes, views)
            group = await focal.query_selector('div[role="group"]')
            if group:
                buttons = await group.query_selector_all('button')
                # assumes the action buttons appear in this fixed order — TODO confirm
                metric_names = ["replies", "retweets", "likes", "bookmarks"]
                for i, btn in enumerate(buttons):
                    aria = await btn.get_attribute("aria-label") or ""
                    # Parse numbers from aria labels like "123 replies"
                    nums = re.findall(r'[\d,]+', aria)
                    if nums and i < len(metric_names):
                        data["metrics"][metric_names[i]] = nums[0].replace(",", "")

            # Views - often in a separate span; kept as the raw "1.2M"-style token.
            view_spans = await focal.query_selector_all('a[role="link"] span')
            for vs in view_spans:
                txt = (await vs.inner_text()).strip()
                if "views" in txt.lower() or "Views" in txt:
                    nums = re.findall(r'[\d,.KkMm]+', txt)
                    if nums:
                        data["metrics"]["views"] = nums[0]

            # Images attached to the focal tweet (X labels them alt="Image").
            imgs = await focal.query_selector_all('img[alt="Image"]')
            for img in imgs:
                src = await img.get_attribute("src")
                if src:
                    data["images"].append(src)

            # Check if it's a reply
            # NOTE(review): collected but never used — reply detection appears
            # unfinished ("reply_to" is never filled in).
            reply_indicators = await page.query_selector_all('div[data-testid="tweet"] a[role="link"]')

            # Try to get author profile info by hovering or checking
            # We'll grab it from the page if visible
            if data["handle"]:
                # NOTE(review): unused — presumably intended for a profile lookup.
                handle_clean = data["handle"].lstrip("@")
                # Check for bio/follower info in any hover cards or visible elements.
                # This scans the whole body text, so it may pick up counts belonging
                # to another user shown on the page — best-effort only.
                all_text = await page.inner_text("body")
                # Look for follower patterns like "1,234 Followers" / "1.2K Followers".
                follower_match = re.search(r'([\d,.]+[KkMm]?)\s+Followers', all_text)
                following_match = re.search(r'([\d,.]+[KkMm]?)\s+Following', all_text)
                if follower_match:
                    data["followers"] = follower_match.group(1)
                if following_match:
                    data["following"] = following_match.group(1)

            # Sample some replies (articles after the focal tweet), truncated to
            # 200 characters each, at most three.
            if len(articles) > 1:
                for art in articles[1:4]:
                    reply_text_el = await art.query_selector('div[data-testid="tweetText"]')
                    if reply_text_el:
                        rt = await reply_text_el.inner_text()
                        if rt:
                            data["replies_sample"].append(rt[:200])

            await page.close()

        except Exception as e:
            data["scrape_error"] = str(e)
            try:
                # NOTE(review): `page` may be unbound if new_page() itself failed;
                # the bare except below swallows the resulting NameError too.
                await page.close()
            except:
                pass

    return data
def analyze(data: dict) -> dict:
    """Produce a structured analysis from scraped tweet data.

    Extracts cashtags, looks up their market data, scans text/replies for
    pump-style red flags, and condenses everything into a verdict string.
    """
    text = data.get("text") or ""
    lowered = text.lower()
    tickers = extract_tickers(text)
    ticker_data = lookup_tickers(tickers)

    # Phrases that commonly appear in promoted / pump-style posts.
    promo_words = [
        "100x", "1000x", "moon", "gem", "rocket", "guaranteed", "easy money",
        "don't miss", "last chance", "about to explode", "next big", "sleeping giant",
        "never stops printing", "true freedom", "beat the institutions", "revolution",
        "empire", "vault", "get rich", "financial freedom", "life changing",
        "without a degree", "from a bedroom", "join this",
    ]
    red_flags = [f"Promotional language: '{w}'" for w in promo_words if w in lowered]

    if len(tickers) > 3:
        red_flags.append(f"Multiple tickers mentioned ({len(tickers)})")
    if len(text) > 2000:
        red_flags.append("Extremely long promotional thread")
    if "github" in lowered and ("star" in lowered or "repo" in lowered):
        red_flags.append("Pushing GitHub repo (potential funnel to paid product)")
    if any(w in lowered for w in ["course", "discord", "premium", "paid group", "subscribe"]):
        red_flags.append("Funneling to paid product/community")

    # Two or more hype-emoji replies suggests coordinated promotion.
    replies = data.get("replies_sample", [])
    hype_count = sum(
        1 for reply in replies if any(mark in reply for mark in ["🚀", "💎", "🔥", "LFG"])
    )
    if hype_count >= 2:
        red_flags.append("Replies show coordinated hype patterns")

    # Penny-stock / micro-cap characteristics are classic pump targets.
    for symbol, info in ticker_data.items():
        if not isinstance(info, dict) or info.get("error"):
            continue
        price = info.get("price")
        mcap = info.get("market_cap")
        if price and price < 1:
            red_flags.append(f"${symbol} is a penny stock (${price})")
        if mcap and mcap < 50_000_000:
            red_flags.append(f"${symbol} micro-cap (<$50M market cap)")

    # Verdict tiers: 3+ flags -> high risk, 1-2 -> caution, tickers only ->
    # neutral research prompt, otherwise informational.
    if len(red_flags) >= 3:
        verdict = "High risk - multiple red flags detected, exercise extreme caution"
    elif red_flags:
        verdict = "Some concerns - verify claims independently before acting"
    elif tickers:
        verdict = "Worth investigating - do your own due diligence"
    else:
        verdict = "Informational tweet - no immediate financial claims detected"

    return {
        "tweet_data": data,
        "tickers_found": tickers,
        "ticker_data": ticker_data,
        "red_flags": red_flags,
        "verdict": verdict,
    }
def format_markdown(analysis: dict) -> str:
    """Render the analysis dict as a human-readable markdown report.

    Sections: WHO (author/profile), WHAT (text + metrics), VERIFY (ticker
    quotes), RED FLAGS, MONEY, VERDICT, plus a trailing scrape warning when
    the scrape only partially succeeded.
    """
    d = analysis["tweet_data"]
    out = ["# Tweet Analysis", ""]
    add = out.append

    add(f"**URL:** {d['url']}")
    add(f"**Analyzed:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    add("")

    # WHO — author identity and any profile stats we managed to scrape.
    add("## 👤 WHO")
    add(f"- **Author:** {d.get('author') or 'Unknown'}")
    add(f"- **Handle:** {d.get('handle') or 'Unknown'}")
    for label, key in (("Followers", "followers"), ("Following", "following"), ("Bio", "bio")):
        if d.get(key):
            add(f"- **{label}:** {d[key]}")
    add("")

    # WHAT — the tweet itself plus engagement numbers.
    add("## 📝 WHAT")
    add(f"> {d.get('text') or 'Could not extract tweet text'}")
    add("")
    if d.get("timestamp"):
        add(f"**Posted:** {d['timestamp']}")
    metrics = d.get("metrics", {})
    if metrics:
        add("**Metrics:** " + " | ".join(f"{v} {k}" for k, v in metrics.items()))
    if d.get("images"):
        add(f"**Images:** {len(d['images'])} attached")
    add("")

    # VERIFY — market data for each mentioned ticker.
    add("## ✅ VERIFY")
    tickers = analysis.get("tickers_found", [])
    td = analysis.get("ticker_data", {})
    if not tickers:
        add("No tickers mentioned in tweet.")
    else:
        add(f"**Tickers mentioned:** {', '.join('$' + t for t in tickers)}")
        add("")
        for t, info in td.items():
            if not isinstance(info, dict):
                continue
            if info.get("error"):
                add(f"- ${t}: lookup failed")
                continue
            header = f"### ${t}"
            if info.get("name"):
                header += f" - {info.get('name', '')}"
            add(header)
            if info.get("price"):
                add(f"- **Price:** ${info['price']}")
            if info.get("market_cap"):
                mc = info["market_cap"]
                # Billions above $1B, millions otherwise.
                add(f"- **Market Cap:** ${mc/1e9:.2f}B" if mc > 1e9 else f"- **Market Cap:** ${mc/1e6:.1f}M")
            if info.get("volume"):
                add(f"- **Volume:** {info['volume']:,}")
            if info.get("day_change_pct"):
                add(f"- **Day Change:** {info['day_change_pct']:.2f}%")
            if info.get("52w_high") and info.get("52w_low"):
                add(f"- **52W Range:** ${info['52w_low']} - ${info['52w_high']}")
            add("")
    add("")

    # RED FLAGS — one bullet per detected concern.
    add("## 🚩 RED FLAGS")
    flags = analysis.get("red_flags", [])
    if flags:
        out.extend(f"- ⚠️ {f}" for f in flags)
    else:
        add("- None detected")
    add("")

    # MONEY — a one-line risk framing based on tickers vs. flags.
    add("## 💰 MONEY")
    if tickers and not flags:
        add("Potential opportunity identified. Research further before any position.")
    elif tickers and flags:
        add("Tickers mentioned but red flags present. High risk of promoted/manipulated asset.")
    else:
        add("No direct financial opportunity identified in this tweet.")
    add("")

    # VERDICT — the summary line computed by analyze().
    add("## 🎯 VERDICT")
    add(f"**{analysis['verdict']}**")
    add("")

    # Surface partial-scrape problems at the bottom of the report.
    if d.get("scrape_error"):
        add(f"---\n⚠️ *Scrape warning: {d['scrape_error']}*")

    return "\n".join(out)
async def main():
    """CLI entry point: parse args, scrape the tweet, analyze, emit the report."""
    parser = argparse.ArgumentParser(description="Analyze a tweet")
    parser.add_argument("url", help="Tweet URL (x.com or twitter.com)")
    parser.add_argument("--json", action="store_true", dest="json_output", help="Output JSON")
    parser.add_argument("-o", "--output", help="Write output to file")
    args = parser.parse_args()

    # Bail out early on anything that isn't a tweet permalink.
    if re.search(r'(x\.com|twitter\.com)/.+/status/\d+', args.url) is None:
        print("ERROR: Invalid tweet URL", file=sys.stderr)
        sys.exit(1)

    # Progress messages go to stderr so stdout stays clean for the report.
    print("Scraping tweet...", file=sys.stderr)
    data = await scrape_tweet(args.url)

    print("Analyzing...", file=sys.stderr)
    analysis = analyze(data)

    output = (
        json.dumps(analysis, indent=2, default=str)
        if args.json_output
        else format_markdown(analysis)
    )

    if not args.output:
        print(output)
        return
    with open(args.output, "w") as f:
        f.write(output)
    print(f"Written to {args.output}", file=sys.stderr)
if __name__ == "__main__":
    # Run the async CLI when executed as a script (not on import).
    asyncio.run(main())