#!/usr/bin/env python3
"""
reddit-scanner - Scan Reddit for sentiment and discussions on topics

Uses Reddit's public JSON API (no auth required, rate limited).
"""

import json
import re
import sys
import time
from collections import Counter
from datetime import datetime
from pathlib import Path
from typing import Optional
from urllib.error import HTTPError, URLError
from urllib.parse import quote_plus
from urllib.request import Request, urlopen

PROJECT_DIR = Path(__file__).parent
DATA_DIR = PROJECT_DIR / "data"    # NOTE(review): data/cache dirs are declared but
CACHE_DIR = DATA_DIR / "cache"     # not used yet — presumably reserved for caching.

USER_AGENT = "reddit-scanner/1.0 (personal use)"
RATE_LIMIT = 2      # seconds between requests
MAX_RETRIES = 3     # bounded retries after HTTP 429 (was unbounded recursion)

# Timestamp (time.time()) of the last completed request; drives rate limiting.
last_request = 0.0


def fetch_json(url: str) -> Optional[dict]:
    """Fetch and decode JSON from the Reddit API with client-side rate limiting.

    Sleeps so that consecutive requests are at least RATE_LIMIT seconds apart.
    On HTTP 429 waits 10s and retries up to MAX_RETRIES times (iteratively —
    the original recursed without a bound). Returns the parsed JSON dict, or
    None on any HTTP/network error.
    """
    global last_request

    req = Request(url, headers={'User-Agent': USER_AGENT})
    for attempt in range(MAX_RETRIES + 1):
        # Rate limit: space requests out by RATE_LIMIT seconds.
        elapsed = time.time() - last_request
        if elapsed < RATE_LIMIT:
            time.sleep(RATE_LIMIT - elapsed)

        try:
            with urlopen(req, timeout=15) as resp:
                last_request = time.time()
                return json.loads(resp.read().decode('utf-8'))
        except HTTPError as e:
            if e.code == 429 and attempt < MAX_RETRIES:
                print(" Rate limited, waiting...")
                time.sleep(10)
                continue
            print(f" HTTP Error: {e.code}")
            return None
        except URLError as e:
            print(f" Error: {e}")
            return None
    return None


def get_subreddit_posts(subreddit: str, sort: str = "hot", limit: int = 25) -> list:
    """Fetch posts from a subreddit.

    Args:
        subreddit: subreddit name without the "r/" prefix.
        sort: Reddit listing sort ("hot", "new", "top", ...).
        limit: maximum number of posts to request.

    Returns:
        A list of post dicts (id, title, author, score, ...); empty on error.
    """
    url = f"https://www.reddit.com/r/{subreddit}/{sort}.json?limit={limit}"
    data = fetch_json(url)

    if not data or 'data' not in data:
        return []

    posts = []
    for child in data['data'].get('children', []):
        post = child['data']
        posts.append({
            'id': post['id'],
            'title': post['title'],
            'author': post['author'],
            'score': post['score'],
            'upvote_ratio': post.get('upvote_ratio', 0),
            'num_comments': post['num_comments'],
            'url': post['url'],
            # Truncate self-text to keep payloads small.
            'selftext': post.get('selftext', '')[:500],
            'created': datetime.fromtimestamp(post['created_utc']).isoformat(),
            'permalink': f"https://reddit.com{post['permalink']}",
            'subreddit': subreddit,
        })

    return posts


def get_post_comments(subreddit: str, post_id: str, limit: int = 50) -> list:
    """Fetch comments (including nested replies) from a post.

    Returns a flat list of comment dicts with a 'depth' field recording
    nesting level; empty on error. Comment bodies are truncated to 1000 chars.
    """
    url = f"https://www.reddit.com/r/{subreddit}/comments/{post_id}.json?limit={limit}"
    data = fetch_json(url)

    # The comments endpoint returns [post_listing, comment_listing].
    if not data or len(data) < 2:
        return []

    comments = []

    def extract_comments(children, depth=0):
        for child in children:
            # 't1' = comment; skip 'more' stubs and other kinds.
            if child['kind'] != 't1':
                continue
            comment = child['data']
            comments.append({
                'id': comment['id'],
                'author': comment['author'],
                'body': comment['body'][:1000],
                'score': comment['score'],
                'depth': depth,
                'created': datetime.fromtimestamp(comment['created_utc']).isoformat(),
            })
            # Recurse into replies ('replies' is "" when empty, dict otherwise).
            if comment.get('replies') and isinstance(comment['replies'], dict):
                replies = comment['replies']['data'].get('children', [])
                extract_comments(replies, depth + 1)

    extract_comments(data[1]['data'].get('children', []))
    return comments


def search_reddit(query: str, subreddit: Optional[str] = None,
                  sort: str = "relevance", limit: int = 25) -> list:
    """Search Reddit for a query, optionally restricted to one subreddit.

    Returns a list of post dicts (id, title, author, score, ...); empty on error.
    """
    encoded_query = quote_plus(query)
    if subreddit:
        url = (f"https://www.reddit.com/r/{subreddit}/search.json"
               f"?q={encoded_query}&restrict_sr=1&sort={sort}&limit={limit}")
    else:
        url = f"https://www.reddit.com/search.json?q={encoded_query}&sort={sort}&limit={limit}"

    data = fetch_json(url)

    if not data or 'data' not in data:
        return []

    posts = []
    for child in data['data'].get('children', []):
        post = child['data']
        posts.append({
            'id': post['id'],
            'title': post['title'],
            'author': post['author'],
            'score': post['score'],
            'num_comments': post['num_comments'],
            'subreddit': post['subreddit'],
            'permalink': f"https://reddit.com{post['permalink']}",
            'created': datetime.fromtimestamp(post['created_utc']).isoformat(),
        })

    return posts


def analyze_sentiment(texts: list) -> dict:
    """Simple keyword-based sentiment analysis.

    Each text is classified by comparing how many distinct positive vs.
    negative keywords it contains (ties count as neutral). Word frequencies
    count each word at most once per text, since the word set is deduplicated.

    Returns a dict with positive/negative/neutral counts, total, rounded
    percentage fields, and the 20 most common words.
    """
    positive_words = {'good', 'great', 'awesome', 'love', 'best', 'amazing', 'excellent',
                      'fantastic', 'wonderful', 'perfect', 'nice', 'thanks', 'helpful',
                      'brilliant', 'impressive', 'excited', 'happy', 'beautiful'}
    negative_words = {'bad', 'terrible', 'awful', 'hate', 'worst', 'horrible', 'sucks',
                      'disappointing', 'frustrated', 'annoying', 'broken', 'useless',
                      'stupid', 'boring', 'ugly', 'failed', 'waste', 'problem', 'issue'}

    positive = 0
    negative = 0
    neutral = 0
    word_freq = Counter()

    for text in texts:
        words = set(re.findall(r'\b\w+\b', text.lower()))
        word_freq.update(words)

        pos_count = len(words & positive_words)
        neg_count = len(words & negative_words)

        if pos_count > neg_count:
            positive += 1
        elif neg_count > pos_count:
            negative += 1
        else:
            neutral += 1

    total = positive + negative + neutral
    return {
        'positive': positive,
        'negative': negative,
        'neutral': neutral,
        'total': total,
        'positive_pct': round(positive / total * 100, 1) if total else 0,
        'negative_pct': round(negative / total * 100, 1) if total else 0,
        'top_words': word_freq.most_common(20),
    }


def scan_topic(topic: str, subreddits: Optional[list] = None):
    """Scan for a topic across subreddits and print a summary report.

    Searches each subreddit (defaulting to site-wide 'all'), pulls comments
    from the top 3 posts per subreddit, then prints score stats, keyword
    sentiment, top words, and the highest-scoring posts.
    """
    print(f"\nšŸ” Scanning Reddit for: {topic}")
    print("=" * 50)

    if not subreddits:
        subreddits = ['all']

    all_posts = []
    all_comments = []

    for sub in subreddits:
        print(f"\nšŸ“Œ r/{sub}")
        # 'all' means a site-wide search, i.e. no subreddit restriction.
        posts = search_reddit(topic, sub if sub != 'all' else None, limit=10)
        print(f" Found {len(posts)} posts")

        for post in posts[:3]:  # Get comments from top 3
            print(f" Fetching comments from: {post['title'][:40]}...")
            comments = get_post_comments(post['subreddit'], post['id'], limit=30)
            all_comments.extend([c['body'] for c in comments])

        all_posts.extend(posts)

    # Analyze
    print("\nšŸ“Š Analysis")
    print("-" * 30)

    if all_posts:
        avg_score = sum(p['score'] for p in all_posts) / len(all_posts)
        avg_comments = sum(p['num_comments'] for p in all_posts) / len(all_posts)
        print(f"Posts analyzed: {len(all_posts)}")
        print(f"Avg score: {avg_score:.0f}")
        print(f"Avg comments: {avg_comments:.0f}")

    if all_comments:
        sentiment = analyze_sentiment(all_comments)
        print(f"\nComment sentiment ({sentiment['total']} comments):")
        print(f" šŸ‘ Positive: {sentiment['positive_pct']}%")
        print(f" šŸ‘Ž Negative: {sentiment['negative_pct']}%")
        # Neutral is derived from the rounded percentages so the three add to 100.
        print(f" 😐 Neutral: {100 - sentiment['positive_pct'] - sentiment['negative_pct']:.1f}%")

        print("\nTop words:")
        for word, count in sentiment['top_words'][:10]:
            if len(word) > 3:  # skip short filler words ("the", "and", ...)
                print(f" {word}: {count}")

    # Top posts
    print("\nšŸ”„ Top Posts")
    print("-" * 30)
    for post in sorted(all_posts, key=lambda x: x['score'], reverse=True)[:5]:
        print(f"[{post['score']:4}] {post['title'][:50]}...")
        print(f" r/{post['subreddit']} | {post['num_comments']} comments")
        print()


def show_subreddit(subreddit: str, sort: str = "hot"):
    """Print the current posts of a subreddit listing."""
    print(f"\nšŸ“Œ r/{subreddit} ({sort})")
    print("=" * 50)

    posts = get_subreddit_posts(subreddit, sort, limit=15)

    if not posts:
        print("No posts found or subreddit doesn't exist")
        return

    for post in posts:
        print(f"\n[{post['score']:4}] {post['title'][:60]}")
        print(f" {post['num_comments']} comments | u/{post['author']}")


def main():
    """CLI entry point: dispatch the topic / sub / search subcommands."""
    if len(sys.argv) < 2:
        print("Usage:")
        # Placeholders restored — the original help text had the <...> tokens
        # stripped (e.g. "reddit-scanner search " with nothing after it).
        print(" reddit-scanner topic <topic> [subreddits...]")
        print(" reddit-scanner sub <subreddit> [hot|new|top]")
        print(" reddit-scanner search <query>")
        print("")
        print("Examples:")
        print(" reddit-scanner topic 'artificial intelligence' technology futurology")
        print(" reddit-scanner sub python hot")
        print(" reddit-scanner search 'rust programming'")
        return

    cmd = sys.argv[1]

    if cmd == 'topic' and len(sys.argv) > 2:
        topic = sys.argv[2]
        subreddits = sys.argv[3:] if len(sys.argv) > 3 else None
        scan_topic(topic, subreddits)

    elif cmd == 'sub' and len(sys.argv) > 2:
        subreddit = sys.argv[2]
        sort = sys.argv[3] if len(sys.argv) > 3 else 'hot'
        show_subreddit(subreddit, sort)

    elif cmd == 'search' and len(sys.argv) > 2:
        query = ' '.join(sys.argv[2:])
        posts = search_reddit(query, limit=15)
        print(f"\nšŸ” Search: {query}")
        print("=" * 50)
        for post in posts:
            print(f"\n[{post['score']:4}] {post['title'][:60]}")
            print(f" r/{post['subreddit']} | {post['num_comments']} comments")

    else:
        print("Unknown command. Run without args for help.")


if __name__ == "__main__":
    main()