workspace/skills/deep-scraper/scripts/analyze-posts.py
Case 8638500190 Feed Hunter: deep scraper skill, pipeline, simulator, first investigation
- Built deep-scraper skill (CDP-based X feed extraction)
- Three-stage pipeline: scrape → triage → investigate
- Paper trading simulator with position tracking
- First live investigation: verified kch123 Polymarket profile ($9.3M P&L)
- Opened first paper position: Seahawks Super Bowl @ 68c
- Telegram alerts with inline action buttons
- Portal build in progress (night shift)
2026-02-07 23:58:40 -06:00

#!/usr/bin/env python3
"""
Analyze scraped X/Twitter posts for money-making signals.

Reads posts.json, classifies and scores each post.

Usage:
    python3 analyze-posts.py <path-to-posts.json> [--output analysis.json] [--min-signal N]
"""
import argparse
import json
import os
import re
from datetime import datetime, timezone
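
# Illustrative posts.json shape, inferred from the fields this script reads.
# This is an assumption, not a formal schema; real scraper output may carry
# additional fields.
# {
#   "posts": [{
#     "author": {"handle": "@example"},
#     "text": "...",
#     "url": "https://x.com/...",
#     "timestamp": "2026-02-07T12:00:00Z",
#     "card": {"title": "...", "description": "..."},
#     "metrics": {"likes": "1,234", "reposts": "56", "views": "10,000"},
#     "links": [{"url": "https://example.com/..."}]
#   }]
# }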

# Category keywords/patterns
CATEGORIES = {
    "crypto": {
        "keywords": ["bitcoin", "btc", "ethereum", "eth", "solana", "sol", "crypto",
                     "token", "defi", "dex", "nft", "airdrop", "memecoin", "altcoin",
                     "bullish", "bearish", "pump", "dump", "moon", "hodl", "whale",
                     "binance", "coinbase", "degen", "rug", "mint", "chain",
                     "staking", "yield", "liquidity", "swap", "bridge"],
        "weight": 1.0
    },
    "polymarket": {
        "keywords": ["polymarket", "prediction market", "kalshi", "manifold",
                     "betting market", "odds", "probability", "yes/no",
                     "shares", "contract"],
        "weight": 1.0
    },
    "arbitrage": {
        "keywords": ["arbitrage", "arb", "spread", "price difference",
                     "cross-exchange", "risk-free", "guaranteed profit",
                     "mismatch", "exploit"],
        "weight": 1.0
    },
    "trading": {
        "keywords": ["long", "short", "leverage", "margin", "futures",
                     "options", "calls", "puts", "entry", "exit", "target",
                     "stop loss", "take profit", "chart", "technical analysis",
                     "support", "resistance", "breakout", "reversal"],
        "weight": 0.8
    },
    "money_opportunity": {
        "keywords": ["free money", "easy money", "passive income", "side hustle",
                     "make money", "earn", "profit", "roi", "returns",
                     "alpha", "signal", "opportunity", "undervalued"],
        "weight": 0.7
    }
}

# Spam/scam signals
SPAM_SIGNALS = {
    "patterns": [
        r"dm me", r"link in bio", r"join my", r"guaranteed \d+%",
        r"100x", r"1000x", r"send .* to receive",
        r"whitelist", r"presale", r"limited spots",
        r"act now", r"don't miss", r"last chance",
        r"🚀{3,}", r"💰{3,}", r"🔥{3,}",
        r"follow.*retweet.*like", r"giveaway",
        r"drop.*wallet", r"reply.*address"
    ],
    "weight": -1.0
}

# Time sensitivity signals
TIME_SENSITIVE = [
    r"ending (soon|today|tonight|in \d+)",
    r"last \d+ (hour|minute|day)",
    r"expires? (today|tonight|soon|in)",
    r"deadline",
    r"closing (soon|in)",
    r"only \d+ (left|remaining|spots)",
    r"window closing",
    r"before .* (ends|closes|expires)"
]


def classify_post(post):
    """Classify a single post and return analysis."""
    # Combine post body and link-card text into one lowercase haystack
    text = ((post.get("text") or "") + " " +
            ((post.get("card") or {}).get("title") or "") + " " +
            ((post.get("card") or {}).get("description") or "")).lower()

    # Category detection: each keyword hit adds weight * 0.2, capped at 1.0
    categories = {}
    for cat_name, cat_info in CATEGORIES.items():
        matches = [kw for kw in cat_info["keywords"] if kw in text]
        if matches:
            categories[cat_name] = {
                "matched": matches,
                "score": min(len(matches) * cat_info["weight"] * 0.2, 1.0)
            }

    # Spam detection: 0.25 per matched pattern, capped at 1.0
    spam_matches = []
    for pattern in SPAM_SIGNALS["patterns"]:
        if re.search(pattern, text, re.IGNORECASE):
            spam_matches.append(pattern)
    spam_score = min(len(spam_matches) * 0.25, 1.0)

    # Time sensitivity
    time_sensitive = False
    time_matches = []
    for pattern in TIME_SENSITIVE:
        m = re.search(pattern, text, re.IGNORECASE)
        if m:
            time_sensitive = True
            time_matches.append(m.group(0))

    # Engagement quality (high engagement = more likely legit)
    metrics = post.get("metrics", {})
    engagement_score = 0
    try:
        likes = int(str(metrics.get("likes", "0")).replace(",", ""))
        reposts = int(str(metrics.get("reposts", "0")).replace(",", ""))
        views = int(str(metrics.get("views", "0")).replace(",", ""))
        if views > 0:
            engagement_rate = (likes + reposts) / views
            engagement_score = min(engagement_rate * 100, 1.0)
    except (ValueError, ZeroDivisionError):
        pass

    # External links (off-platform links are higher value for analysis)
    external_links = [link for link in post.get("links", [])
                      if link.get("url", "").startswith("http")
                      and "x.com" not in link.get("url", "")]

    # Overall signal score: weighted blend penalized by spam, clamped to [0, 1]
    category_score = max((c["score"] for c in categories.values()), default=0)
    signal_score = max(0, min(1.0,
        category_score * 0.4 +
        engagement_score * 0.2 +
        (0.1 if external_links else 0) +
        (0.1 if time_sensitive else 0) -
        spam_score * 0.3
    ))

    # Verdict
    if spam_score > 0.5:
        verdict = "likely_spam"
    elif signal_score > 0.5 and categories:
        verdict = "high_signal"
    elif signal_score > 0.25 and categories:
        verdict = "medium_signal"
    elif categories:
        verdict = "low_signal"
    else:
        verdict = "noise"

    return {
        "author": post.get("author", {}),
        "text_preview": post.get("text", "")[:200],
        "url": post.get("url", ""),
        "categories": categories,
        "spam_score": round(spam_score, 2),
        "spam_matches": spam_matches,
        "time_sensitive": time_sensitive,
        "time_matches": time_matches,
        "engagement_score": round(engagement_score, 2),
        "external_links": external_links,
        "signal_score": round(signal_score, 2),
        "verdict": verdict,
        "timestamp": post.get("timestamp"),
        "metrics": metrics
    }
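
# Worked example of the scoring above (illustrative numbers, not real data):
# a post matching three "crypto" keywords gives
#   category_score = min(3 * 1.0 * 0.2, 1.0) = 0.6;
# with engagement_rate 0.005 (engagement_score = 0.5), one external link,
# no urgency, and no spam hits, that yields
#   signal_score = 0.6 * 0.4 + 0.5 * 0.2 + 0.1 + 0 - 0 = 0.44  ->  "medium_signal"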


def main():
    parser = argparse.ArgumentParser(description="Analyze X feed posts")
    parser.add_argument("input", help="Path to posts.json")
    parser.add_argument("--output", help="Output file (default: analysis.json in same dir)")
    parser.add_argument("--min-signal", type=float, default=0.0,
                        help="Min signal score to include")
    args = parser.parse_args()

    with open(args.input) as f:
        data = json.load(f)
    posts = data.get("posts", [])
    print(f"Analyzing {len(posts)} posts...")

    analyses = []
    for post in posts:
        analysis = classify_post(post)
        if analysis["signal_score"] >= args.min_signal:
            analyses.append(analysis)

    # Sort by signal score descending
    analyses.sort(key=lambda x: x["signal_score"], reverse=True)

    # Stats
    verdicts = {}
    for a in analyses:
        v = a["verdict"]
        verdicts[v] = verdicts.get(v, 0) + 1

    result = {
        "analyzed_at": datetime.now(timezone.utc).isoformat(),
        "total_posts": len(posts),
        "analyzed_posts": len(analyses),
        "verdicts": verdicts,
        "posts": analyses
    }

    # Output
    output_path = args.output
    if not output_path:
        output_path = os.path.join(os.path.dirname(args.input), "analysis.json")
    with open(output_path, "w") as f:
        json.dump(result, f, indent=2)

    # Print summary
    print("\n=== Analysis Summary ===")
    print(f"Total posts: {len(posts)}")
    for verdict, count in sorted(verdicts.items()):
        emoji = {"high_signal": "🟢", "medium_signal": "🟡", "low_signal": "",
                 "likely_spam": "🔴", "noise": ""}.get(verdict, "")
        print(f"  {emoji} {verdict}: {count}")

    # Show top signals
    high = [a for a in analyses if a["verdict"] in ("high_signal", "medium_signal")]
    if high:
        print("\n=== Top Signals ===")
        for a in high[:10]:
            cats = ", ".join(a["categories"].keys())
            ts = "⏰" if a["time_sensitive"] else ""
            print(f"  [{a['signal_score']:.2f}] {a['author'].get('handle', '?')} | {cats} {ts}")
            print(f"    {a['text_preview'][:100]}...")

    print(f"\nSaved to {output_path}")


if __name__ == "__main__":
    main()
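
# Example run (the path is illustrative):
#   python3 analyze-posts.py runs/2026-02-07/posts.json --min-signal 0.25
# writes analysis.json next to the input and prints the verdict summary.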