#!/usr/bin/env python3
"""
Reddit Market Intel — Scan Reddit for crypto/market sentiment and alpha.

Scans configurable subreddits for hot topics, sentiment shifts, and
emerging narratives. No API key required (uses public JSON endpoints).

Usage:
    python3 reddit-market-intel.py                    # Full scan
    python3 reddit-market-intel.py --quick            # Quick scan (top posts only)
    python3 reddit-market-intel.py --topic "solana"   # Scan for specific topic
"""

import json
import re
import sys
import time
from collections import Counter
from datetime import datetime, timezone
from pathlib import Path
from urllib.error import HTTPError, URLError
from urllib.parse import quote_plus
from urllib.request import Request, urlopen

DATA_DIR = Path(__file__).parent.parent / "data" / "reddit-intel"
DATA_DIR.mkdir(parents=True, exist_ok=True)

USER_AGENT = "reddit-market-intel/1.0"
RATE_LIMIT = 2  # minimum seconds between requests (anonymous clients are throttled)
last_request = 0

DEFAULT_SUBREDDITS = [
    "cryptocurrency", "Bitcoin", "ethtrader", "CryptoMarkets",
    "wallstreetbets", "stocks", "investing", "economy",
    "solana", "defi", "polymarket",
]

# Keyword *stems*: a post word counts as a hit if it starts with the stem,
# so "liquidat" catches "liquidated"/"liquidation" (with exact-word set
# intersection, a stem like "liquidat" could never match anything).
SENTIMENT_POSITIVE = {"bullish", "moon", "pump", "gains", "breakout", "ath",
                      "buy", "long", "šŸš€", "šŸ’Ž", "accumulate", "undervalued",
                      "rally"}
SENTIMENT_NEGATIVE = {"bearish", "dump", "crash", "sell", "short", "scam",
                      "rug", "overvalued", "bubble", "liquidat", "rekt",
                      "fear"}


def fetch_json(url, retries=2):
    """GET *url* and return the parsed JSON body, or None on failure.

    Enforces a simple global rate limit (RATE_LIMIT seconds between
    requests). An HTTP 429 sleeps 10s and retries at most *retries*
    times — the retry is bounded so a persistent rate limit cannot
    recurse forever.
    """
    global last_request
    elapsed = time.time() - last_request
    if elapsed < RATE_LIMIT:
        time.sleep(RATE_LIMIT - elapsed)
    req = Request(url, headers={'User-Agent': USER_AGENT})
    try:
        with urlopen(req, timeout=15) as resp:
            last_request = time.time()
            return json.loads(resp.read().decode('utf-8'))
    except (HTTPError, URLError) as e:
        # Only HTTPError carries .code; URLError does not.
        if getattr(e, 'code', None) == 429 and retries > 0:
            time.sleep(10)
            return fetch_json(url, retries - 1)
        return None


def get_hot_posts(subreddit, limit=10):
    """Fetch the hot posts of *subreddit* as a list of flat dicts.

    Returns [] when the request fails. selftext is truncated to 500
    chars; `or ""` guards against an explicit null selftext in the API
    response.
    """
    data = fetch_json(f"https://www.reddit.com/r/{subreddit}/hot.json?limit={limit}")
    if not data:
        return []
    posts = []
    for child in data.get("data", {}).get("children", []):
        d = child.get("data", {})
        posts.append({
            "title": d.get("title", ""),
            "score": d.get("score", 0),
            "comments": d.get("num_comments", 0),
            "url": f"https://reddit.com{d.get('permalink', '')}",
            "created": datetime.fromtimestamp(
                d.get("created_utc", 0), tz=timezone.utc).isoformat(),
            "selftext": (d.get("selftext", "") or "")[:500],
            "flair": d.get("link_flair_text", ""),
            "subreddit": subreddit,
        })
    return posts


def analyze_sentiment(text):
    """Classify *text* as ("bullish"|"bearish"|"neutral", strength).

    Each keyword stem counts at most once, when any whitespace-delimited
    word of the lowercased text starts with it. Strength is the margin
    between positive and negative keyword counts (0 for neutral).
    """
    words = text.lower().split()
    pos = sum(1 for kw in SENTIMENT_POSITIVE if any(w.startswith(kw) for w in words))
    neg = sum(1 for kw in SENTIMENT_NEGATIVE if any(w.startswith(kw) for w in words))
    if pos > neg:
        return "bullish", pos - neg
    if neg > pos:
        return "bearish", neg - pos
    return "neutral", 0


def extract_tickers(text):
    """Extract potential crypto/stock tickers ($BTC, $ETH, etc.)

    Returns deduplicated symbols (2–6 uppercase letters after '$');
    order is unspecified because of the set round-trip.
    """
    return list(set(re.findall(r'\$([A-Z]{2,6})', text)))


def scan_subreddit(subreddit, limit=10):
    """Scan one subreddit: per-post sentiment, tickers, engagement.

    Returns a summary dict, or None when the fetch fails.
    """
    print(f"  šŸ“” r/{subreddit}...", end=" ", flush=True)
    posts = get_hot_posts(subreddit, limit)
    if not posts:
        print("āŒ failed")
        return None

    sentiments = []
    all_tickers = []
    high_engagement = []

    for p in posts:
        full_text = f"{p['title']} {p['selftext']}"
        sent, strength = analyze_sentiment(full_text)
        p["sentiment"] = sent
        p["sentiment_strength"] = strength
        sentiments.append(sent)

        tickers = extract_tickers(full_text)
        all_tickers.extend(tickers)
        p["tickers"] = tickers

        # High engagement = lots of comments relative to score
        if p["comments"] > 50 or p["score"] > 500:
            high_engagement.append(p)

    sentiment_counts = Counter(sentiments)
    ticker_counts = Counter(all_tickers)

    overall = "neutral"
    if sentiment_counts.get("bullish", 0) > sentiment_counts.get("bearish", 0):
        overall = "bullish"
    elif sentiment_counts.get("bearish", 0) > sentiment_counts.get("bullish", 0):
        overall = "bearish"

    print(f"āœ… {len(posts)} posts | sentiment: {overall}")
    return {
        "subreddit": subreddit,
        "posts": posts,
        "overall_sentiment": overall,
        "sentiment_breakdown": dict(sentiment_counts),
        "trending_tickers": dict(ticker_counts.most_common(10)),
        "high_engagement": high_engagement,
    }


def search_topic(topic, limit=25):
    """Search Reddit for a specific topic across all subreddits."""
    print(f"\nšŸ” Searching Reddit for: '{topic}'")
    # quote_plus: a raw topic with spaces/special chars would corrupt the URL.
    data = fetch_json(
        f"https://www.reddit.com/search.json?q={quote_plus(topic)}&sort=hot&limit={limit}")
    if not data:
        return []
    posts = []
    for child in data.get("data", {}).get("children", []):
        d = child.get("data", {})
        # `or ''` mirrors get_hot_posts: selftext can be present but null.
        full_text = f"{d.get('title', '')} {(d.get('selftext') or '')[:300]}"
        sent, _strength = analyze_sentiment(full_text)
        posts.append({
            "title": d.get("title", ""),
            "subreddit": d.get("subreddit", ""),
            "score": d.get("score", 0),
            "comments": d.get("num_comments", 0),
            "sentiment": sent,
            "tickers": extract_tickers(full_text),
            "url": f"https://reddit.com{d.get('permalink', '')}",
        })
    return posts


def run_full_scan(quick=False):
    """Scan all DEFAULT_SUBREDDITS, build and persist an aggregate report.

    Saves a timestamped JSON report plus latest.json under DATA_DIR and
    prints a human-readable summary. Returns the report dict.
    """
    print(f"\nšŸ”Ž Reddit Market Intel — {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print("=" * 60)

    limit = 5 if quick else 15
    results = []
    all_tickers = Counter()
    sentiment_summary = Counter()

    print("\nšŸ“Š Scanning subreddits:")
    for sub in DEFAULT_SUBREDDITS:
        result = scan_subreddit(sub, limit)
        if result:
            results.append(result)
            all_tickers.update(result["trending_tickers"])
            sentiment_summary[result["overall_sentiment"]] += 1

    # Build report
    report = {
        "timestamp": datetime.now().isoformat(),
        "subreddits_scanned": len(results),
        "overall_market_sentiment": (
            max(sentiment_summary, key=sentiment_summary.get)
            if sentiment_summary else "neutral"),
        "sentiment_by_sub": {r["subreddit"]: r["overall_sentiment"] for r in results},
        "top_tickers": dict(all_tickers.most_common(20)),
        "high_engagement_posts": [],
    }
    for r in results:
        report["high_engagement_posts"].extend(r["high_engagement"][:3])
    # Sort high engagement by comments
    report["high_engagement_posts"].sort(key=lambda x: x["comments"], reverse=True)
    report["high_engagement_posts"] = report["high_engagement_posts"][:15]

    # Save
    report_file = DATA_DIR / f"scan-{datetime.now().strftime('%Y%m%d-%H%M')}.json"
    report_file.write_text(json.dumps(report, indent=2))
    # Also save latest
    (DATA_DIR / "latest.json").write_text(json.dumps(report, indent=2))

    # Print summary
    print(f"\n{'=' * 60}")
    print(f"šŸ“ˆ Overall Market Sentiment: {report['overall_market_sentiment'].upper()}")
    if all_tickers:
        print(f"\nšŸ·ļø Trending Tickers: "
              f"{', '.join(f'${t}({c})' for t, c in all_tickers.most_common(10))}")
    else:
        print("")
    if report["high_engagement_posts"]:
        print(f"\nšŸ”„ Top Engagement Posts:")
        for p in report["high_engagement_posts"][:5]:
            print(f"  [{p['sentiment']}] r/{p['subreddit']} | ⬆{p['score']} šŸ’¬{p['comments']}")
            print(f"    {p['title'][:80]}")
    print(f"\nšŸ’¾ Report saved: {report_file}")
    return report


if __name__ == "__main__":
    if "--topic" in sys.argv:
        idx = sys.argv.index("--topic")
        topic = sys.argv[idx + 1] if idx + 1 < len(sys.argv) else "crypto"
        posts = search_topic(topic)
        for p in posts[:10]:
            print(f"  [{p['sentiment']}] r/{p['subreddit']} ⬆{p['score']} šŸ’¬{p['comments']}")
            print(f"    {p['title'][:80]}")
    elif "--quick" in sys.argv:
        run_full_scan(quick=True)
    else:
        run_full_scan()