Add reddit-scanner project - Reddit search and topic scanning - Sentiment analysis from comments - No auth required (uses public JSON API) - Rate limiting built in
This commit is contained in:
7
projects/reddit-scanner/.gitignore
vendored
Normal file
7
projects/reddit-scanner/.gitignore
vendored
Normal file
@@ -0,0 +1,7 @@
|
||||
venv/
|
||||
__pycache__/
|
||||
*.pyc
|
||||
.env
|
||||
*.egg-info/
|
||||
dist/
|
||||
build/
|
||||
17
projects/reddit-scanner/README.md
Normal file
17
projects/reddit-scanner/README.md
Normal file
@@ -0,0 +1,17 @@
|
||||
# reddit-scanner
|
||||
|
||||
Scan Reddit for sentiment and discussions on topics
|
||||
|
||||
## Setup
|
||||
|
||||
```bash
|
||||
python -m venv venv
|
||||
source venv/bin/activate
|
||||
pip install -r requirements.txt
|
||||
```
|
||||
|
||||
## Usage
|
||||
|
||||
```bash
|
||||
python main.py
|
||||
```
|
||||
286
projects/reddit-scanner/main.py
Executable file
286
projects/reddit-scanner/main.py
Executable file
@@ -0,0 +1,286 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
reddit-scanner - Scan Reddit for sentiment and discussions on topics
|
||||
|
||||
Uses Reddit's public JSON API (no auth required, rate limited).
|
||||
"""
|
||||
|
||||
import json
|
||||
import time
|
||||
import re
|
||||
import sys
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from urllib.request import urlopen, Request
|
||||
from urllib.error import URLError, HTTPError
|
||||
from urllib.parse import quote_plus
|
||||
from collections import Counter
|
||||
|
||||
PROJECT_DIR = Path(__file__).parent
|
||||
DATA_DIR = PROJECT_DIR / "data"
|
||||
CACHE_DIR = DATA_DIR / "cache"
|
||||
|
||||
USER_AGENT = "reddit-scanner/1.0 (personal use)"
|
||||
RATE_LIMIT = 2 # seconds between requests
|
||||
|
||||
last_request = 0
|
||||
|
||||
def fetch_json(url: str) -> dict | None:
    """Fetch and parse JSON from the Reddit public API, with rate limiting.

    Args:
        url: Full Reddit JSON endpoint URL.

    Returns:
        The parsed JSON payload, or None on any HTTP/network error.
    """
    global last_request

    # Enforce a minimum gap between consecutive requests (RATE_LIMIT seconds).
    elapsed = time.time() - last_request
    if elapsed < RATE_LIMIT:
        time.sleep(RATE_LIMIT - elapsed)

    req = Request(url, headers={'User-Agent': USER_AGENT})

    # FIX: the original retried HTTP 429 by recursing with no depth limit,
    # so a persistently rate-limiting server could exhaust the stack.
    # Use a bounded retry loop instead; give up (return None) after 3 tries.
    max_attempts = 3
    for attempt in range(max_attempts):
        try:
            with urlopen(req, timeout=15) as resp:
                last_request = time.time()
                return json.loads(resp.read().decode('utf-8'))
        except HTTPError as e:
            if e.code == 429 and attempt < max_attempts - 1:
                print(" Rate limited, waiting...")
                time.sleep(10)
                continue
            print(f" HTTP Error: {e.code}")
            return None
        except URLError as e:
            print(f" Error: {e}")
            return None
    return None
|
||||
|
||||
def get_subreddit_posts(subreddit: str, sort: str = "hot", limit: int = 25) -> list:
    """Fetch posts from a subreddit listing and flatten them to plain dicts.

    Args:
        subreddit: Subreddit name without the "r/" prefix.
        sort: Listing to read ("hot", "new", "top", ...).
        limit: Maximum number of posts to request.

    Returns:
        A list of post summary dicts (empty on fetch failure).
    """
    listing = fetch_json(
        f"https://www.reddit.com/r/{subreddit}/{sort}.json?limit={limit}"
    )
    if not listing or 'data' not in listing:
        return []

    def summarize(raw: dict) -> dict:
        # Keep only the fields the scanner cares about; truncate selftext.
        return {
            'id': raw['id'],
            'title': raw['title'],
            'author': raw['author'],
            'score': raw['score'],
            'upvote_ratio': raw.get('upvote_ratio', 0),
            'num_comments': raw['num_comments'],
            'url': raw['url'],
            'selftext': raw.get('selftext', '')[:500],
            'created': datetime.fromtimestamp(raw['created_utc']).isoformat(),
            'permalink': f"https://reddit.com{raw['permalink']}",
            'subreddit': subreddit,
        }

    return [
        summarize(child['data'])
        for child in listing['data'].get('children', [])
    ]
|
||||
|
||||
def get_post_comments(subreddit: str, post_id: str, limit: int = 50) -> list:
    """Fetch a post's comments, including nested replies, depth-first.

    Args:
        subreddit: Subreddit name without the "r/" prefix.
        post_id: Reddit post id (base36).
        limit: Maximum number of top-level comments to request.

    Returns:
        A flat list of comment dicts with a 'depth' field (empty on failure).
    """
    payload = fetch_json(
        f"https://www.reddit.com/r/{subreddit}/comments/{post_id}.json?limit={limit}"
    )

    # The comments endpoint returns [post_listing, comment_listing].
    if not payload or len(payload) < 2:
        return []

    collected = []

    def walk(nodes, depth=0):
        # Pre-order walk: record each comment, then descend into its replies.
        for node in nodes:
            if node['kind'] != 't1':  # skip "more" stubs and non-comments
                continue
            data = node['data']
            collected.append({
                'id': data['id'],
                'author': data['author'],
                'body': data['body'][:1000],
                'score': data['score'],
                'depth': depth,
                'created': datetime.fromtimestamp(data['created_utc']).isoformat(),
            })
            replies = data.get('replies')
            # 'replies' is a nested listing dict when present, "" when absent.
            if replies and isinstance(replies, dict):
                walk(replies['data'].get('children', []), depth + 1)

    walk(payload[1]['data'].get('children', []))
    return collected
|
||||
|
||||
def search_reddit(query: str, subreddit: str = None, sort: str = "relevance", limit: int = 25) -> list:
    """Search Reddit, optionally restricted to a single subreddit.

    Args:
        query: Free-text search query.
        subreddit: Restrict to this subreddit when given; site-wide otherwise.
        sort: Result ordering ("relevance", "new", "top", ...).
        limit: Maximum number of results to request.

    Returns:
        A list of result dicts (empty on fetch failure).
    """
    encoded_query = quote_plus(query)
    if subreddit:
        url = f"https://www.reddit.com/r/{subreddit}/search.json?q={encoded_query}&restrict_sr=1&sort={sort}&limit={limit}"
    else:
        url = f"https://www.reddit.com/search.json?q={encoded_query}&sort={sort}&limit={limit}"

    data = fetch_json(url)
    if not data or 'data' not in data:
        return []

    return [
        {
            'id': hit['id'],
            'title': hit['title'],
            'author': hit['author'],
            'score': hit['score'],
            'num_comments': hit['num_comments'],
            'subreddit': hit['subreddit'],
            'permalink': f"https://reddit.com{hit['permalink']}",
            'created': datetime.fromtimestamp(hit['created_utc']).isoformat(),
        }
        for hit in (child['data'] for child in data['data'].get('children', []))
    ]
|
||||
|
||||
def analyze_sentiment(texts: list) -> dict:
    """Classify each text as positive/negative/neutral via keyword counts.

    A text is positive when it contains more distinct positive keywords than
    negative ones (and vice versa); ties are neutral. Also tallies how many
    texts each word appears in (one count per text, since words are de-duped).

    Args:
        texts: List of strings to analyze.

    Returns:
        Dict with per-class counts, total, rounded percentages, and the 20
        most common words.
    """
    positive_vocab = frozenset({
        'good', 'great', 'awesome', 'love', 'best', 'amazing', 'excellent',
        'fantastic', 'wonderful', 'perfect', 'nice', 'thanks', 'helpful',
        'brilliant', 'impressive', 'excited', 'happy', 'beautiful',
    })
    negative_vocab = frozenset({
        'bad', 'terrible', 'awful', 'hate', 'worst', 'horrible', 'sucks',
        'disappointing', 'frustrated', 'annoying', 'broken', 'useless',
        'stupid', 'boring', 'ugly', 'failed', 'waste', 'problem', 'issue',
    })

    tallies = {'positive': 0, 'negative': 0, 'neutral': 0}
    word_freq = Counter()

    for text in texts:
        # De-duplicate within a text so each word counts once per text.
        words = set(re.findall(r'\b\w+\b', text.lower()))
        word_freq.update(words)

        pos_hits = len(words & positive_vocab)
        neg_hits = len(words & negative_vocab)

        if pos_hits > neg_hits:
            tallies['positive'] += 1
        elif neg_hits > pos_hits:
            tallies['negative'] += 1
        else:
            tallies['neutral'] += 1

    total = sum(tallies.values())
    return {
        'positive': tallies['positive'],
        'negative': tallies['negative'],
        'neutral': tallies['neutral'],
        'total': total,
        'positive_pct': round(tallies['positive'] / total * 100, 1) if total else 0,
        'negative_pct': round(tallies['negative'] / total * 100, 1) if total else 0,
        'top_words': word_freq.most_common(20),
    }
|
||||
|
||||
def scan_topic(topic: str, subreddits: list = None):
    """Scan for a topic across subreddits and print a summary report.

    Searches each subreddit (or site-wide when none given), samples comments
    from the top posts, and prints engagement stats, sentiment, and top posts.

    Args:
        topic: Search query.
        subreddits: Subreddit names to scan; defaults to site-wide ('all').
    """
    print(f"\n🔍 Scanning Reddit for: {topic}")
    print("=" * 50)

    # No subreddits given -> search site-wide.
    targets = subreddits or ['all']

    collected_posts = []
    comment_bodies = []

    for name in targets:
        print(f"\n📌 r/{name}")
        found = search_reddit(topic, None if name == 'all' else name, limit=10)
        print(f" Found {len(found)} posts")

        # Sample comment text from the top 3 posts for sentiment analysis.
        for entry in found[:3]:
            print(f" Fetching comments from: {entry['title'][:40]}...")
            thread = get_post_comments(entry['subreddit'], entry['id'], limit=30)
            comment_bodies.extend(c['body'] for c in thread)

        collected_posts.extend(found)

    print(f"\n📊 Analysis")
    print("-" * 30)

    if collected_posts:
        n_posts = len(collected_posts)
        avg_score = sum(p['score'] for p in collected_posts) / n_posts
        avg_comments = sum(p['num_comments'] for p in collected_posts) / n_posts
        print(f"Posts analyzed: {n_posts}")
        print(f"Avg score: {avg_score:.0f}")
        print(f"Avg comments: {avg_comments:.0f}")

    if comment_bodies:
        sentiment = analyze_sentiment(comment_bodies)
        print(f"\nComment sentiment ({sentiment['total']} comments):")
        print(f" 👍 Positive: {sentiment['positive_pct']}%")
        print(f" 👎 Negative: {sentiment['negative_pct']}%")
        print(f" 😐 Neutral: {100 - sentiment['positive_pct'] - sentiment['negative_pct']:.1f}%")

        print(f"\nTop words:")
        for word, count in sentiment['top_words'][:10]:
            if len(word) > 3:  # skip short filler words
                print(f" {word}: {count}")

    print(f"\n🔥 Top Posts")
    print("-" * 30)
    for entry in sorted(collected_posts, key=lambda p: p['score'], reverse=True)[:5]:
        print(f"[{entry['score']:4}] {entry['title'][:50]}...")
        print(f" r/{entry['subreddit']} | {entry['num_comments']} comments")
        print()
|
||||
|
||||
def show_subreddit(subreddit: str, sort: str = "hot"):
    """Print a compact listing of a subreddit's posts.

    Args:
        subreddit: Subreddit name without the "r/" prefix.
        sort: Listing to read ("hot", "new", "top", ...).
    """
    print(f"\n📌 r/{subreddit} ({sort})")
    print("=" * 50)

    listing = get_subreddit_posts(subreddit, sort, limit=15)
    if not listing:
        print("No posts found or subreddit doesn't exist")
        return

    for entry in listing:
        print(f"\n[{entry['score']:4}] {entry['title'][:60]}")
        print(f" {entry['num_comments']} comments | u/{entry['author']}")
|
||||
|
||||
def main():
    """CLI entry point: dispatch on the first argv token.

    Commands: topic <query> [subreddits...], sub <subreddit> [sort],
    search <query>. Prints usage when called without arguments.
    """
    args = sys.argv[1:]
    if not args:
        print("Usage:")
        print(" reddit-scanner topic <query> [subreddits...]")
        print(" reddit-scanner sub <subreddit> [hot|new|top]")
        print(" reddit-scanner search <query>")
        print("")
        print("Examples:")
        print(" reddit-scanner topic 'artificial intelligence' technology futurology")
        print(" reddit-scanner sub python hot")
        print(" reddit-scanner search 'rust programming'")
        return

    cmd, rest = args[0], args[1:]

    if cmd == 'topic' and rest:
        # Optional trailing args name the subreddits to scan.
        scan_topic(rest[0], rest[1:] or None)

    elif cmd == 'sub' and rest:
        sort_order = rest[1] if len(rest) > 1 else 'hot'
        show_subreddit(rest[0], sort_order)

    elif cmd == 'search' and rest:
        query = ' '.join(rest)
        results = search_reddit(query, limit=15)
        print(f"\n🔍 Search: {query}")
        print("=" * 50)
        for hit in results:
            print(f"\n[{hit['score']:4}] {hit['title'][:60]}")
            print(f" r/{hit['subreddit']} | {hit['num_comments']} comments")

    else:
        print("Unknown command. Run without args for help.")


if __name__ == "__main__":
    main()
|
||||
1
projects/reddit-scanner/requirements.txt
Normal file
1
projects/reddit-scanner/requirements.txt
Normal file
@@ -0,0 +1 @@
|
||||
# Add dependencies here
|
||||
Reference in New Issue
Block a user