#!/usr/bin/env python3
"""
Analyze scraped X/Twitter posts for money-making signals.
Reads posts.json, classifies and scores each post.

Usage: python3 analyze-posts.py posts.json [--output analysis.json]
"""

import argparse
import json
import os
import re
from datetime import datetime, timezone

# Category keywords/patterns
CATEGORIES = {
    "crypto": {
        "keywords": ["bitcoin", "btc", "ethereum", "eth", "solana", "sol", "crypto",
                     "token", "defi", "dex", "nft", "airdrop", "memecoin", "altcoin",
                     "bullish", "bearish", "pump", "dump", "moon", "hodl", "whale",
                     "binance", "coinbase", "degen", "rug", "mint", "chain", "staking",
                     "yield", "liquidity", "swap", "bridge"],
        "weight": 1.0
    },
    "polymarket": {
        "keywords": ["polymarket", "prediction market", "kalshi", "manifold",
                     "betting market", "odds", "probability", "yes/no", "shares",
                     "contract"],
        "weight": 1.0
    },
    "arbitrage": {
        "keywords": ["arbitrage", "arb", "spread", "price difference", "cross-exchange",
                     "risk-free", "guaranteed profit", "mismatch", "exploit"],
        "weight": 1.0
    },
    "trading": {
        "keywords": ["long", "short", "leverage", "margin", "futures", "options",
                     "calls", "puts", "entry", "exit", "target", "stop loss",
                     "take profit", "chart", "technical analysis", "support",
                     "resistance", "breakout", "reversal"],
        "weight": 0.8
    },
    "money_opportunity": {
        "keywords": ["free money", "easy money", "passive income", "side hustle",
                     "make money", "earn", "profit", "roi", "returns", "alpha",
                     "signal", "opportunity", "undervalued"],
        "weight": 0.7
    }
}

# Spam/scam signals
SPAM_SIGNALS = {
    "patterns": [
        r"dm me", r"link in bio", r"join my", r"guaranteed \d+%", r"100x", r"1000x",
        r"send .* to receive", r"whitelist", r"presale", r"limited spots", r"act now",
        r"don't miss", r"last chance", r"🚀{3,}", r"💰{3,}", r"🔥{3,}",
        r"follow.*retweet.*like", r"giveaway", r"drop.*wallet", r"reply.*address"
    ],
    "weight": -1.0
}

# Time sensitivity signals
TIME_SENSITIVE = [
    r"ending (soon|today|tonight|in \d+)",
    r"last \d+ (hour|minute|day)",
    r"expires? (today|tonight|soon|in)",
    r"deadline",
    r"closing (soon|in)",
    r"only \d+ (left|remaining|spots)",
    r"window closing",
    r"before .* (ends|closes|expires)"
]


def classify_post(post):
    """Classify a single post and return analysis."""
    text = ((post.get("text") or "") + " "
            + ((post.get("card") or {}).get("title") or "") + " "
            + ((post.get("card") or {}).get("description") or "")).lower()

    # Category detection
    categories = {}
    for cat_name, cat_info in CATEGORIES.items():
        matches = [kw for kw in cat_info["keywords"] if kw in text]
        if matches:
            categories[cat_name] = {
                "matched": matches,
                "score": min(len(matches) * cat_info["weight"] * 0.2, 1.0)
            }

    # Spam detection
    spam_matches = []
    for pattern in SPAM_SIGNALS["patterns"]:
        if re.search(pattern, text, re.IGNORECASE):
            spam_matches.append(pattern)
    spam_score = min(len(spam_matches) * 0.25, 1.0)

    # Time sensitivity
    time_sensitive = False
    time_matches = []
    for pattern in TIME_SENSITIVE:
        m = re.search(pattern, text, re.IGNORECASE)
        if m:
            time_sensitive = True
            time_matches.append(m.group(0))

    # Engagement quality (high engagement = more likely legit)
    metrics = post.get("metrics", {})
    engagement_score = 0
    try:
        likes = int(str(metrics.get("likes", "0")).replace(",", ""))
        reposts = int(str(metrics.get("reposts", "0")).replace(",", ""))
        views = int(str(metrics.get("views", "0")).replace(",", ""))
        if views > 0:
            engagement_rate = (likes + reposts) / views
            engagement_score = min(engagement_rate * 100, 1.0)
    except (ValueError, ZeroDivisionError):
        pass

    # Has external links (higher value for analysis)
    external_links = [link for link in post.get("links", [])
                      if link.get("url", "").startswith("http")
                      and "x.com" not in link.get("url", "")]

    # Overall signal score
    category_score = max((c["score"] for c in categories.values()), default=0)
    signal_score = max(0, min(1.0,
        category_score * 0.4
        + engagement_score * 0.2
        + (0.1 if external_links else 0)
        + (0.1 if time_sensitive else 0)
        - spam_score * 0.3
    ))

    # Verdict
    if spam_score > 0.5:
        verdict = "likely_spam"
    elif signal_score > 0.5 and categories:
        verdict = "high_signal"
    elif signal_score > 0.25 and categories:
        verdict = "medium_signal"
    elif categories:
        verdict = "low_signal"
    else:
        verdict = "noise"

    return {
        "author": post.get("author", {}),
        "text_preview": post.get("text", "")[:200],
        "url": post.get("url", ""),
        "categories": categories,
        "spam_score": round(spam_score, 2),
        "spam_matches": spam_matches,
        "time_sensitive": time_sensitive,
        "time_matches": time_matches,
        "engagement_score": round(engagement_score, 2),
        "external_links": external_links,
        "signal_score": round(signal_score, 2),
        "verdict": verdict,
        "timestamp": post.get("timestamp"),
        "metrics": metrics
    }


def main():
    parser = argparse.ArgumentParser(description="Analyze X feed posts")
    parser.add_argument("input", help="Path to posts.json")
    parser.add_argument("--output", help="Output file (default: analysis.json in same dir)")
    parser.add_argument("--min-signal", type=float, default=0.0,
                        help="Min signal score to include")
    args = parser.parse_args()

    with open(args.input) as f:
        data = json.load(f)

    posts = data.get("posts", [])
    print(f"Analyzing {len(posts)} posts...")

    analyses = []
    for post in posts:
        analysis = classify_post(post)
        if analysis["signal_score"] >= args.min_signal:
            analyses.append(analysis)

    # Sort by signal score descending
    analyses.sort(key=lambda x: x["signal_score"], reverse=True)

    # Stats
    verdicts = {}
    for a in analyses:
        v = a["verdict"]
        verdicts[v] = verdicts.get(v, 0) + 1

    result = {
        "analyzed_at": datetime.now(timezone.utc).isoformat(),
        "total_posts": len(posts),
        "analyzed_posts": len(analyses),
        "verdicts": verdicts,
        "posts": analyses
    }

    # Output
    output_path = args.output
    if not output_path:
        output_path = os.path.join(os.path.dirname(args.input), "analysis.json")
    with open(output_path, "w") as f:
        json.dump(result, f, indent=2)

    # Print summary
    print("\n=== Analysis Summary ===")
    print(f"Total posts: {len(posts)}")
    for verdict, count in sorted(verdicts.items()):
        emoji = {"high_signal": "🟢", "medium_signal": "🟡", "low_signal": "⚪",
                 "likely_spam": "🔴", "noise": "⚫"}.get(verdict, "❓")
        print(f"  {emoji} {verdict}: {count}")

    # Show top signals
    high = [a for a in analyses if a["verdict"] in ("high_signal", "medium_signal")]
    if high:
        print("\n=== Top Signals ===")
        for a in high[:10]:
            cats = ", ".join(a["categories"].keys())
            ts = "⏰" if a["time_sensitive"] else ""
            print(f"  [{a['signal_score']:.2f}] {a['author'].get('handle', '?')} — {cats} {ts}")
            print(f"    {a['text_preview'][:100]}...")

    print(f"\nSaved to {output_path}")


if __name__ == "__main__":
    main()