#!/usr/bin/env python3
"""
reddit-scanner - Scan Reddit for sentiment and discussions on topics

Uses Reddit's public JSON API (no auth required, rate limited).
"""

import json
import re
import sys
import time
from collections import Counter
from datetime import datetime
from pathlib import Path
from urllib.error import HTTPError, URLError
from urllib.parse import quote, quote_plus
from urllib.request import Request, urlopen

PROJECT_DIR = Path(__file__).parent
DATA_DIR = PROJECT_DIR / "data"
CACHE_DIR = DATA_DIR / "cache"

USER_AGENT = "reddit-scanner/1.0 (personal use)"
RATE_LIMIT = 2  # seconds between requests
MAX_RETRIES = 3  # extra attempts after an HTTP 429 before giving up
RETRY_WAIT = 10  # seconds to back off after an HTTP 429

# Timestamp (time.time()) of the most recent request that reached the server.
last_request = 0

# Keyword lists used by analyze_sentiment(); built once at import time.
POSITIVE_WORDS = frozenset({
    'good', 'great', 'awesome', 'love', 'best', 'amazing', 'excellent',
    'fantastic', 'wonderful', 'perfect', 'nice', 'thanks', 'helpful',
    'brilliant', 'impressive', 'excited', 'happy', 'beautiful',
})
NEGATIVE_WORDS = frozenset({
    'bad', 'terrible', 'awful', 'hate', 'worst', 'horrible', 'sucks',
    'disappointing', 'frustrated', 'annoying', 'broken', 'useless',
    'stupid', 'boring', 'ugly', 'failed', 'waste', 'problem', 'issue',
})
_WORD_RE = re.compile(r'\b\w+\b')


def fetch_json(url: str) -> "dict | list | None":
    """Fetch and decode JSON from a URL, honouring the request rate limit.

    Waits until at least RATE_LIMIT seconds have passed since the previous
    request, then issues a GET with the scanner's User-Agent. An HTTP 429
    response is retried up to MAX_RETRIES times with a RETRY_WAIT pause.

    Args:
        url: Fully formed URL to request.

    Returns:
        The decoded JSON value (a dict or a list, depending on the
        endpoint), or None when the request or the decoding fails. Failures
        are reported on stdout rather than raised.
    """
    global last_request

    for attempt in range(MAX_RETRIES + 1):
        # Rate limit
        elapsed = time.time() - last_request
        if elapsed < RATE_LIMIT:
            time.sleep(RATE_LIMIT - elapsed)

        req = Request(url, headers={'User-Agent': USER_AGENT})
        try:
            with urlopen(req, timeout=15) as resp:
                last_request = time.time()
                return json.loads(resp.read().decode('utf-8'))
        except HTTPError as e:
            # The server answered, so this request counts for rate limiting.
            last_request = time.time()
            if e.code == 429 and attempt < MAX_RETRIES:
                print(" Rate limited, waiting...")
                time.sleep(RETRY_WAIT)
                continue
            print(f" HTTP Error: {e.code}")
            return None
        except ValueError as e:
            # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
            print(f" Error: invalid JSON response ({e})")
            return None
        except (URLError, OSError) as e:
            # URLError covers connection failures; a plain OSError covers
            # socket timeouts or resets raised while reading the body.
            print(f" Error: {e}")
            return None

    return None


def get_subreddit_posts(subreddit: str, sort: str = "hot", limit: int = 25) -> list:
    """Fetch posts from a subreddit listing.

    Args:
        subreddit: Subreddit name without the "r/" prefix.
        sort: Listing name placed in the URL path (e.g. "hot", "new", "top").
        limit: Value sent as the "limit" query parameter.

    Returns:
        A list of dicts with keys id, title, author, score, upvote_ratio,
        num_comments, url, selftext (first 500 characters), created
        (ISO string, naive local time), permalink and subreddit. Empty when
        the request fails or the response has no "data" member.
    """
    # Quote path segments so odd input (spaces, slashes) cannot produce an
    # invalid URL that urlopen would reject with an uncaught exception.
    url = (
        f"https://www.reddit.com/r/{quote(subreddit, safe='')}"
        f"/{quote(sort, safe='')}.json?limit={limit}"
    )
    data = fetch_json(url)

    if not isinstance(data, dict) or 'data' not in data:
        return []

    posts = []
    for child in data['data'].get('children', []):
        post = child['data']
        posts.append({
            'id': post['id'],
            'title': post['title'],
            'author': post['author'],
            'score': post['score'],
            'upvote_ratio': post.get('upvote_ratio', 0),
            'num_comments': post['num_comments'],
            'url': post['url'],
            # "selftext" may be present but null; treat that as empty.
            'selftext': (post.get('selftext') or '')[:500],
            'created': datetime.fromtimestamp(post['created_utc']).isoformat(),
            'permalink': f"https://reddit.com{post['permalink']}",
            'subreddit': subreddit,
        })

    return posts


def _collect_comments(children: list, depth: int, out: list) -> None:
    """Append every "t1" (comment) item in children, and its replies, to out."""
    for child in children:
        if child['kind'] != 't1':
            continue

        comment = child['data']
        out.append({
            'id': comment['id'],
            'author': comment['author'],
            'body': comment['body'][:1000],
            'score': comment['score'],
            'depth': depth,
            'created': datetime.fromtimestamp(comment['created_utc']).isoformat(),
        })

        # "replies" is an empty string when there are none, a dict otherwise.
        replies = comment.get('replies')
        if replies and isinstance(replies, dict):
            _collect_comments(replies['data'].get('children', []), depth + 1, out)


def get_post_comments(subreddit: str, post_id: str, limit: int = 50) -> list:
    """Fetch the comments of a post, flattened in depth-first order.

    Args:
        subreddit: Subreddit the post belongs to.
        post_id: Post id as used in the comments URL.
        limit: Value sent as the "limit" query parameter.

    Returns:
        A list of dicts with keys id, author, body (first 1000 characters),
        score, depth (0 for top-level comments) and created (ISO string,
        naive local time). Empty when the request fails or the response is
        not a list with at least two elements.
    """
    url = (
        f"https://www.reddit.com/r/{quote(subreddit, safe='')}"
        f"/comments/{quote(post_id, safe='')}.json?limit={limit}"
    )
    data = fetch_json(url)

    # The code reads the comment listing from data[1]; anything that is not
    # a list (for example an error object decoded as a dict) is rejected
    # here instead of failing on the index below.
    if not isinstance(data, list) or len(data) < 2:
        return []

    comments = []
    _collect_comments(data[1]['data'].get('children', []), 0, comments)
    return comments


def search_reddit(query: str, subreddit: str = None, sort: str = "relevance", limit: int = 25) -> list:
    """Search Reddit for a query.

    Args:
        query: Search text; URL-encoded before being sent.
        subreddit: When given, the search is restricted to this subreddit;
            otherwise the site-wide search endpoint is used.
        sort: Value sent as the "sort" query parameter.
        limit: Value sent as the "limit" query parameter.

    Returns:
        A list of dicts with keys id, title, author, score, num_comments,
        subreddit, permalink and created (ISO string, naive local time).
        Empty when the request fails or the response has no "data" member.
    """
    encoded_query = quote_plus(query)
    encoded_sort = quote_plus(sort)

    if subreddit:
        url = (
            f"https://www.reddit.com/r/{quote(subreddit, safe='')}/search.json"
            f"?q={encoded_query}&restrict_sr=1&sort={encoded_sort}&limit={limit}"
        )
    else:
        url = (
            f"https://www.reddit.com/search.json"
            f"?q={encoded_query}&sort={encoded_sort}&limit={limit}"
        )

    data = fetch_json(url)

    if not isinstance(data, dict) or 'data' not in data:
        return []

    posts = []
    for child in data['data'].get('children', []):
        post = child['data']
        posts.append({
            'id': post['id'],
            'title': post['title'],
            'author': post['author'],
            'score': post['score'],
            'num_comments': post['num_comments'],
            'subreddit': post['subreddit'],
            'permalink': f"https://reddit.com{post['permalink']}",
            'created': datetime.fromtimestamp(post['created_utc']).isoformat(),
        })

    return posts


def analyze_sentiment(texts: list) -> dict:
    """Classify texts with a simple keyword-based sentiment count.

    Each text is lower-cased and reduced to its set of distinct words. A
    text is positive when it contains more distinct positive keywords than
    negative ones, negative in the opposite case, and neutral on a tie
    (including when it contains neither).

    Args:
        texts: Strings to classify.

    Returns:
        A dict with the counts "positive", "negative", "neutral" and
        "total", the percentages "positive_pct" and "negative_pct" (rounded
        to one decimal, 0 when there are no texts), and "top_words": up to
        20 (word, count) pairs where count is the number of texts that
        contain the word.
    """
    positive = 0
    negative = 0
    neutral = 0
    word_freq = Counter()

    for text in texts:
        # A set, so each word counts once per text in both the frequency
        # table and the keyword tallies.
        words = set(_WORD_RE.findall(text.lower()))
        word_freq.update(words)

        pos_count = len(words & POSITIVE_WORDS)
        neg_count = len(words & NEGATIVE_WORDS)

        if pos_count > neg_count:
            positive += 1
        elif neg_count > pos_count:
            negative += 1
        else:
            neutral += 1

    total = positive + negative + neutral
    return {
        'positive': positive,
        'negative': negative,
        'neutral': neutral,
        'total': total,
        'positive_pct': round(positive / total * 100, 1) if total else 0,
        'negative_pct': round(negative / total * 100, 1) if total else 0,
        'top_words': word_freq.most_common(20),
    }


def scan_topic(topic: str, subreddits: list = None):
    """Search for a topic across subreddits and print a summary report.

    For every subreddit the topic is searched (10 results) and the comments
    of the first three results are fetched. The report printed to stdout
    contains post averages, comment sentiment, frequent words and the five
    highest-scoring posts.

    Args:
        topic: Search text.
        subreddits: Subreddit names to search. When empty or None, or for
            the entry "all", the site-wide search is used.
    """
    print(f"\n🔍 Scanning Reddit for: {topic}")
    print("=" * 50)

    if not subreddits:
        subreddits = ['all']

    all_posts = []
    all_comments = []

    for sub in subreddits:
        print(f"\n📌 r/{sub}")
        posts = search_reddit(topic, sub if sub != 'all' else None, limit=10)
        print(f" Found {len(posts)} posts")

        for post in posts[:3]:  # Get comments from top 3
            print(f" Fetching comments from: {post['title'][:40]}...")
            comments = get_post_comments(post['subreddit'], post['id'], limit=30)
            all_comments.extend(c['body'] for c in comments)

        all_posts.extend(posts)

    # Analyze
    print("\n📊 Analysis")
    print("-" * 30)

    if all_posts:
        avg_score = sum(p['score'] for p in all_posts) / len(all_posts)
        avg_comments = sum(p['num_comments'] for p in all_posts) / len(all_posts)
        print(f"Posts analyzed: {len(all_posts)}")
        print(f"Avg score: {avg_score:.0f}")
        print(f"Avg comments: {avg_comments:.0f}")

    if all_comments:
        sentiment = analyze_sentiment(all_comments)
        print(f"\nComment sentiment ({sentiment['total']} comments):")
        print(f" 👍 Positive: {sentiment['positive_pct']}%")
        print(f" 👎 Negative: {sentiment['negative_pct']}%")
        print(f" 😐 Neutral: {100 - sentiment['positive_pct'] - sentiment['negative_pct']:.1f}%")

        print("\nTop words:")
        # Drop short words first, then cap at ten, so short filler words do
        # not use up the ten display slots.
        long_words = [(w, c) for w, c in sentiment['top_words'] if len(w) > 3]
        for word, count in long_words[:10]:
            print(f" {word}: {count}")

    # Top posts
    print("\n🔥 Top Posts")
    print("-" * 30)
    for post in sorted(all_posts, key=lambda x: x['score'], reverse=True)[:5]:
        print(f"[{post['score']:4}] {post['title'][:50]}...")
        print(f" r/{post['subreddit']} | {post['num_comments']} comments")
        print()


def show_subreddit(subreddit: str, sort: str = "hot"):
    """Print up to 15 posts from a subreddit listing.

    Args:
        subreddit: Subreddit name without the "r/" prefix.
        sort: Listing to show (e.g. "hot", "new", "top").
    """
    print(f"\n📌 r/{subreddit} ({sort})")
    print("=" * 50)

    posts = get_subreddit_posts(subreddit, sort, limit=15)

    if not posts:
        print("No posts found or subreddit doesn't exist")
        return

    for post in posts:
        print(f"\n[{post['score']:4}] {post['title'][:60]}")
        print(f" {post['num_comments']} comments | u/{post['author']}")


def main():
    """Command-line entry point: dispatch on the first argument.

    Supported commands are "topic", "sub" and "search"; each needs at least
    one further argument. Without arguments the usage text is printed.
    """
    if len(sys.argv) < 2:
        print("Usage:")
        print(" reddit-scanner topic TOPIC [subreddits...]")
        print(" reddit-scanner sub SUBREDDIT [hot|new|top]")
        print(" reddit-scanner search QUERY")
        print("")
        print("Examples:")
        print(" reddit-scanner topic 'artificial intelligence' technology futurology")
        print(" reddit-scanner sub python hot")
        print(" reddit-scanner search 'rust programming'")
        return

    cmd = sys.argv[1]

    if cmd == 'topic' and len(sys.argv) > 2:
        topic = sys.argv[2]
        subreddits = sys.argv[3:] if len(sys.argv) > 3 else None
        scan_topic(topic, subreddits)

    elif cmd == 'sub' and len(sys.argv) > 2:
        subreddit = sys.argv[2]
        sort = sys.argv[3] if len(sys.argv) > 3 else 'hot'
        show_subreddit(subreddit, sort)

    elif cmd == 'search' and len(sys.argv) > 2:
        query = ' '.join(sys.argv[2:])
        posts = search_reddit(query, limit=15)
        print(f"\n🔍 Search: {query}")
        print("=" * 50)
        for post in posts:
            print(f"\n[{post['score']:4}] {post['title'][:60]}")
            print(f" r/{post['subreddit']} | {post['num_comments']} comments")

    else:
        print("Unknown command. Run without args for help.")


if __name__ == "__main__":
    main()