Files
workspace/projects/reddit-scanner/main.py

287 lines
9.7 KiB
Python
Executable File

#!/usr/bin/env python3
"""
reddit-scanner - Scan Reddit for sentiment and discussions on topics
Uses Reddit's public JSON API (no auth required, rate limited).
"""
import json
import time
import re
import sys
from datetime import datetime
from pathlib import Path
from urllib.request import urlopen, Request
from urllib.error import URLError, HTTPError
from urllib.parse import quote_plus
from collections import Counter
# Directory layout: data/ and data/cache/ live alongside this script.
PROJECT_DIR = Path(__file__).parent
DATA_DIR = PROJECT_DIR / "data"
CACHE_DIR = DATA_DIR / "cache"
# User-Agent header sent with every request (see fetch_json).
USER_AGENT = "reddit-scanner/1.0 (personal use)"
RATE_LIMIT = 2 # seconds between requests
# Epoch timestamp of the last completed request; mutated by fetch_json
# to enforce the RATE_LIMIT gap between consecutive calls.
last_request = 0
def fetch_json(url: str) -> dict:
    """Fetch and decode a JSON document from a Reddit API endpoint.

    Enforces a minimum gap of RATE_LIMIT seconds between requests
    (tracked via the module-level ``last_request`` timestamp).

    Returns the parsed JSON object, or ``None`` on an unrecoverable
    HTTP/network error.  HTTP 429 (rate limited) is retried a bounded
    number of times; the original implementation recursed unboundedly
    here, which could overflow the stack under persistent throttling.
    """
    global last_request
    max_attempts = 5  # cap 429 retries instead of recursing forever
    for _ in range(max_attempts):
        # Sleep off any remainder of the rate-limit window.
        elapsed = time.time() - last_request
        if elapsed < RATE_LIMIT:
            time.sleep(RATE_LIMIT - elapsed)
        req = Request(url, headers={'User-Agent': USER_AGENT})
        try:
            with urlopen(req, timeout=15) as resp:
                last_request = time.time()
                return json.loads(resp.read().decode('utf-8'))
        except HTTPError as e:
            if e.code == 429:
                print(" Rate limited, waiting...")
                time.sleep(10)
                continue  # retry within the bounded loop
            print(f" HTTP Error: {e.code}")
            return None
        except URLError as e:
            print(f" Error: {e}")
            return None
    # Exhausted all retries while rate limited.
    return None
def get_subreddit_posts(subreddit: str, sort: str = "hot", limit: int = 25) -> list:
    """Return summary dicts for posts in a subreddit listing (hot/new/top)."""
    listing = fetch_json(
        f"https://www.reddit.com/r/{subreddit}/{sort}.json?limit={limit}"
    )
    if not listing or 'data' not in listing:
        return []

    def summarize(raw: dict) -> dict:
        # Project the raw API payload down to just the fields we consume.
        return {
            'id': raw['id'],
            'title': raw['title'],
            'author': raw['author'],
            'score': raw['score'],
            'upvote_ratio': raw.get('upvote_ratio', 0),
            'num_comments': raw['num_comments'],
            'url': raw['url'],
            'selftext': raw.get('selftext', '')[:500],  # truncate long bodies
            'created': datetime.fromtimestamp(raw['created_utc']).isoformat(),
            'permalink': f"https://reddit.com{raw['permalink']}",
            'subreddit': subreddit,
        }

    return [summarize(child['data'])
            for child in listing['data'].get('children', [])]
def get_post_comments(subreddit: str, post_id: str, limit: int = 50) -> list:
    """Return a flattened list of comment dicts (with nesting depth) for a post."""
    url = f"https://www.reddit.com/r/{subreddit}/comments/{post_id}.json?limit={limit}"
    payload = fetch_json(url)
    # The comments endpoint returns [post_listing, comment_listing].
    if not payload or len(payload) < 2:
        return []

    collected = []

    def walk(children, depth=0):
        # kind 't1' marks a comment; other kinds (e.g. 'more') are skipped.
        for child in children:
            if child['kind'] != 't1':
                continue
            c = child['data']
            collected.append({
                'id': c['id'],
                'author': c['author'],
                'body': c['body'][:1000],  # truncate very long comments
                'score': c['score'],
                'depth': depth,
                'created': datetime.fromtimestamp(c['created_utc']).isoformat(),
            })
            # Replies, when present, are a nested listing dict (empty string
            # when there are none) — recurse one level deeper.
            replies = c.get('replies')
            if isinstance(replies, dict):
                walk(replies['data'].get('children', []), depth + 1)

    walk(payload[1]['data'].get('children', []))
    return collected
def search_reddit(query: str, subreddit: str = None, sort: str = "relevance", limit: int = 25) -> list:
    """Search Reddit for *query*, optionally restricted to one subreddit."""
    q = quote_plus(query)
    url = (
        f"https://www.reddit.com/r/{subreddit}/search.json?q={q}&restrict_sr=1&sort={sort}&limit={limit}"
        if subreddit
        else f"https://www.reddit.com/search.json?q={q}&sort={sort}&limit={limit}"
    )
    payload = fetch_json(url)
    if not payload or 'data' not in payload:
        return []

    results = []
    for child in payload['data'].get('children', []):
        p = child['data']
        results.append({
            'id': p['id'],
            'title': p['title'],
            'author': p['author'],
            'score': p['score'],
            'num_comments': p['num_comments'],
            # Site-wide search spans subreddits, so take it from the post.
            'subreddit': p['subreddit'],
            'permalink': f"https://reddit.com{p['permalink']}",
            'created': datetime.fromtimestamp(p['created_utc']).isoformat(),
        })
    return results
def analyze_sentiment(texts: list) -> dict:
    """Keyword-based sentiment tally over a list of text strings.

    Each text is classified by comparing how many *distinct* positive
    vs. negative keywords it contains; ties (including zero hits on
    both sides) count as neutral.  Also returns the 20 most common
    words, counting each word at most once per text.
    """
    positive_words = {'good', 'great', 'awesome', 'love', 'best', 'amazing', 'excellent',
                      'fantastic', 'wonderful', 'perfect', 'nice', 'thanks', 'helpful',
                      'brilliant', 'impressive', 'excited', 'happy', 'beautiful'}
    negative_words = {'bad', 'terrible', 'awful', 'hate', 'worst', 'horrible', 'sucks',
                      'disappointing', 'frustrated', 'annoying', 'broken', 'useless',
                      'stupid', 'boring', 'ugly', 'failed', 'waste', 'problem', 'issue'}

    tally = {'positive': 0, 'negative': 0, 'neutral': 0}
    word_freq = Counter()
    for text in texts:
        # Distinct words only, so repetition within one text doesn't skew counts.
        words = set(re.findall(r'\b\w+\b', text.lower()))
        word_freq.update(words)
        pos_hits = len(words & positive_words)
        neg_hits = len(words & negative_words)
        if pos_hits > neg_hits:
            tally['positive'] += 1
        elif neg_hits > pos_hits:
            tally['negative'] += 1
        else:
            tally['neutral'] += 1

    total = sum(tally.values())

    def pct(n: int) -> float:
        # Guard against division by zero on an empty input list.
        return round(n / total * 100, 1) if total else 0

    return {
        'positive': tally['positive'],
        'negative': tally['negative'],
        'neutral': tally['neutral'],
        'total': total,
        'positive_pct': pct(tally['positive']),
        'negative_pct': pct(tally['negative']),
        'top_words': word_freq.most_common(20),
    }
def scan_topic(topic: str, subreddits: list = None):
    """Search each subreddit for *topic* and print an engagement/sentiment report."""
    print(f"\n🔍 Scanning Reddit for: {topic}")
    print("=" * 50)

    subs = subreddits or ['all']
    all_posts = []
    comment_bodies = []
    for sub in subs:
        print(f"\n📌 r/{sub}")
        # 'all' means a site-wide search (no subreddit restriction).
        posts = search_reddit(topic, None if sub == 'all' else sub, limit=10)
        print(f" Found {len(posts)} posts")
        # Pull comment text from the first three results for sentiment.
        for post in posts[:3]:
            print(f" Fetching comments from: {post['title'][:40]}...")
            fetched = get_post_comments(post['subreddit'], post['id'], limit=30)
            comment_bodies.extend(c['body'] for c in fetched)
        all_posts.extend(posts)

    print("\n📊 Analysis")
    print("-" * 30)
    if all_posts:
        n = len(all_posts)
        print(f"Posts analyzed: {n}")
        print(f"Avg score: {sum(p['score'] for p in all_posts) / n:.0f}")
        print(f"Avg comments: {sum(p['num_comments'] for p in all_posts) / n:.0f}")
    if comment_bodies:
        sentiment = analyze_sentiment(comment_bodies)
        print(f"\nComment sentiment ({sentiment['total']} comments):")
        print(f" 👍 Positive: {sentiment['positive_pct']}%")
        print(f" 👎 Negative: {sentiment['negative_pct']}%")
        # Neutral is derived from the two rounded percentages.
        print(f" 😐 Neutral: {100 - sentiment['positive_pct'] - sentiment['negative_pct']:.1f}%")
        print("\nTop words:")
        for word, count in sentiment['top_words'][:10]:
            if len(word) > 3:  # skip short filler words
                print(f" {word}: {count}")

    print("\n🔥 Top Posts")
    print("-" * 30)
    for post in sorted(all_posts, key=lambda p: p['score'], reverse=True)[:5]:
        print(f"[{post['score']:4}] {post['title'][:50]}...")
        print(f" r/{post['subreddit']} | {post['num_comments']} comments")
        print()
def show_subreddit(subreddit: str, sort: str = "hot"):
    """Print a short score/title/author summary of a subreddit's listing."""
    print(f"\n📌 r/{subreddit} ({sort})")
    print("=" * 50)
    listing = get_subreddit_posts(subreddit, sort, limit=15)
    if not listing:
        # Covers both an empty listing and a nonexistent subreddit.
        print("No posts found or subreddit doesn't exist")
        return
    for entry in listing:
        print(f"\n[{entry['score']:4}] {entry['title'][:60]}")
        print(f" {entry['num_comments']} comments | u/{entry['author']}")
def main():
    """CLI entry point: dispatch on the first positional argument."""
    args = sys.argv[1:]
    if not args:
        # No command given — show usage and exit.
        print("Usage:")
        print(" reddit-scanner topic <query> [subreddits...]")
        print(" reddit-scanner sub <subreddit> [hot|new|top]")
        print(" reddit-scanner search <query>")
        print("")
        print("Examples:")
        print(" reddit-scanner topic 'artificial intelligence' technology futurology")
        print(" reddit-scanner sub python hot")
        print(" reddit-scanner search 'rust programming'")
        return

    cmd, rest = args[0], args[1:]
    if cmd == 'topic' and rest:
        scan_topic(rest[0], rest[1:] if len(rest) > 1 else None)
    elif cmd == 'sub' and rest:
        show_subreddit(rest[0], rest[1] if len(rest) > 1 else 'hot')
    elif cmd == 'search' and rest:
        query = ' '.join(rest)
        results = search_reddit(query, limit=15)
        print(f"\n🔍 Search: {query}")
        print("=" * 50)
        for post in results:
            print(f"\n[{post['score']:4}] {post['title'][:60]}")
            print(f" r/{post['subreddit']} | {post['num_comments']} comments")
    else:
        print("Unknown command. Run without args for help.")


if __name__ == "__main__":
    main()