#!/usr/bin/env python3
"""
Feed Hunter Portal Monitor - Enhanced monitoring with time-based scheduling

Monitors portal health, feed pipeline status, and sends alerts via Telegram.
"""

import json
import os
import sys
import time
import urllib.request
import urllib.error
from datetime import datetime, timezone, timedelta
from pathlib import Path

# Configuration
PROJECT_DIR = Path(__file__).parent
PORTAL_URL = "http://localhost:8888"
STATUS_API = f"{PORTAL_URL}/api/data?type=status"

# Telegram config (using existing environment variables)
TELEGRAM_BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN", "")
TELEGRAM_CHAT_ID = os.environ.get("TELEGRAM_CHAT_ID", "6443752046")

# Business hours (9 AM - 6 PM CST)
BUSINESS_HOURS_START = 9
BUSINESS_HOURS_END = 18
TIMEZONE_OFFSET = -6  # CST is UTC-6 (NOTE(review): fixed offset, does not track DST — confirm intended)

# Status file for tracking alerts to prevent spam
STATUS_FILE = PROJECT_DIR / "data" / "monitor_status.json"
STATUS_FILE.parent.mkdir(parents=True, exist_ok=True)

# Alert thresholds
MAX_ALERTS_PER_HOUR = 3
PORTAL_TIMEOUT = 10  # seconds
CHROME_DOWN_ALERT_THRESHOLD = 2  # Alert after 2 consecutive failures
STALE_DATA_THRESHOLD = 3600  # Alert if no new data for 1 hour


def log(message: str, level: str = "INFO"):
    """Print a log line to stdout with a UTC timestamp and level tag."""
    now = datetime.now(timezone.utc)
    print(f"[{now.strftime('%Y-%m-%d %H:%M:%S')} UTC] [{level}] {message}")


def send_telegram(message: str, priority: str = "normal"):
    """Send a Telegram alert, rate-limited to MAX_ALERTS_PER_HOUR.

    Args:
        message: Alert body text.
        priority: "normal", "warning", or "critical". Critical alerts
            bypass the hourly rate limit; the priority also selects the
            emoji prefix on the message.

    Falls back to a local log line when no bot token is configured.
    Successful sends are recorded in the monitor status file so the
    rate limit persists across invocations.
    """
    if not TELEGRAM_BOT_TOKEN:
        # No token configured: degrade to a local log line instead of failing.
        log(f"[ALERT-{priority.upper()}] {message}")
        return

    # Load status to check rate limiting
    status = load_monitor_status()
    now_ts = int(time.time())

    # Drop alert timestamps older than one hour before applying the limit
    status["alert_timestamps"] = [ts for ts in status["alert_timestamps"] if now_ts - ts < 3600]

    # Check rate limit (except for critical alerts)
    if priority != "critical" and len(status["alert_timestamps"]) >= MAX_ALERTS_PER_HOUR:
        log(f"Rate limit reached, skipping alert: {message[:100]}...")
        return

    url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"

    # Add priority emoji
    emoji = "🔴" if priority == "critical" else "⚠️" if priority == "warning" else "ℹ️"
    formatted_message = f"{emoji} Feed Hunter Monitor\n\n{message}"

    data = json.dumps({
        "chat_id": TELEGRAM_CHAT_ID,
        "text": formatted_message,
        "parse_mode": "HTML",
        "disable_web_page_preview": True,
    }).encode()

    req = urllib.request.Request(url, data=data, headers={"Content-Type": "application/json"})
    try:
        # Context manager closes the HTTP response (fix: original leaked it)
        with urllib.request.urlopen(req, timeout=10):
            pass
        status["alert_timestamps"].append(now_ts)
        save_monitor_status(status)
        log(f"Telegram alert sent: {message[:100]}...")
    except Exception as e:
        # Best-effort alerting: a failed send is logged, never raised.
        log(f"Telegram error: {e}", "ERROR")


def load_monitor_status() -> dict:
    """Load persisted monitor state, merging defaults for any missing keys.

    Fixes two defects in the original: a bare ``except:`` (which swallowed
    even KeyboardInterrupt) and returning a stored dict as-is, which raised
    KeyError later when an older status file lacked newer schema keys.
    """
    defaults = {
        "alert_timestamps": [],
        "last_portal_up": None,
        "last_chrome_up": None,
        "consecutive_chrome_failures": 0,
        "consecutive_portal_failures": 0,
        "last_successful_scrape": None,
    }
    if STATUS_FILE.exists():
        try:
            stored = json.loads(STATUS_FILE.read_text())
            if isinstance(stored, dict):
                # Stored values win; defaults fill any keys the file lacks.
                defaults.update(stored)
        except (json.JSONDecodeError, OSError):
            # Corrupt or unreadable state file: start fresh from defaults.
            pass
    return defaults


def save_monitor_status(status: dict):
    """Persist monitor status as pretty-printed JSON."""
    STATUS_FILE.write_text(json.dumps(status, indent=2))


def is_business_hours() -> bool:
    """Return True if the current time falls in 9 AM - 6 PM CST (fixed UTC-6)."""
    now = datetime.now(timezone.utc)
    # Convert to CST via the fixed offset
    cst_time = now + timedelta(hours=TIMEZONE_OFFSET)
    return BUSINESS_HOURS_START <= cst_time.hour < BUSINESS_HOURS_END


def check_portal_health() -> dict:
    """Check Feed Hunter Portal health via its status API.

    Returns:
        dict with keys:
            portal_up (bool): True when the API answered HTTP 200.
            status_data (dict | None): parsed JSON payload on success.
            error (str | None): human-readable failure description.
    """
    result = {"portal_up": False, "status_data": None, "error": None}
    try:
        req = urllib.request.Request(STATUS_API, headers={"User-Agent": "FeedHunterMonitor/1.0"})
        with urllib.request.urlopen(req, timeout=PORTAL_TIMEOUT) as response:
            if response.status == 200:
                result["portal_up"] = True
                result["status_data"] = json.loads(response.read().decode())
            else:
                result["error"] = f"HTTP {response.status}"
    except urllib.error.URLError as e:
        result["error"] = f"URL Error: {e.reason}"
    except Exception as e:
        result["error"] = f"Error: {str(e)}"
    return result


def analyze_status_data(status_data: dict) -> list:
    """Analyze portal status data and return a list of human-readable issues.

    Checks two things: that the Chrome debug port reports "Running", and
    that the last scrape timestamp is newer than STALE_DATA_THRESHOLD.
    An empty list means all monitored systems look healthy.
    """
    issues = []
    if not status_data:
        return ["No status data available"]

    # Check Chrome debug port
    chrome_status = status_data.get("chrome", {})
    if chrome_status.get("status") != "Running":
        issues.append(f"Chrome debug port down: {chrome_status.get('detail', 'Unknown')}")

    # Check for stale data
    last_run = status_data.get("last_run", {})
    if last_run.get("time") and last_run["time"] != "Unknown":
        try:
            # Timestamp is naive local time — compared against naive now()
            last_time = datetime.strptime(last_run["time"], '%Y-%m-%d %H:%M:%S')
            time_diff = (datetime.now() - last_time).total_seconds()
            if time_diff > STALE_DATA_THRESHOLD:
                hours = int(time_diff / 3600)
                issues.append(f"No new scrape data for {hours} hours (last: {last_run['time']})")
        except ValueError:
            pass  # Couldn't parse time, skip check

    return issues


def run_monitor_check():
    """Run a single monitoring pass: probe portal, analyze, alert, persist state."""
    log("Starting monitoring check...")
    status = load_monitor_status()
    now_ts = int(time.time())

    business_hours = is_business_hours()
    log(f"Business hours: {business_hours}")  # informational only; alerts fire regardless

    # Check portal health
    health = check_portal_health()

    if health["portal_up"]:
        log("Portal is UP")
        status["consecutive_portal_failures"] = 0
        status["last_portal_up"] = now_ts

        # Analyze the status data
        issues = analyze_status_data(health["status_data"])

        if not issues:
            log("All systems healthy")
            # Reset Chrome failure counter if everything is working
            status["consecutive_chrome_failures"] = 0
            status["last_chrome_up"] = now_ts
        else:
            log(f"Found {len(issues)} issues: {', '.join(issues)}")

            # Handle Chrome issues specifically
            chrome_down = any("Chrome" in issue for issue in issues)
            if chrome_down:
                status["consecutive_chrome_failures"] += 1
                # Alert only after consecutive failures to avoid flapping noise
                if status["consecutive_chrome_failures"] >= CHROME_DOWN_ALERT_THRESHOLD:
                    priority = "critical" if status["consecutive_chrome_failures"] > 3 else "warning"
                    send_telegram(
                        f"Chrome debug port has been down for {status['consecutive_chrome_failures']} consecutive checks.\n\n"
                        f"This affects the X/Twitter scraping pipeline.\n\n"
                        f"Status: {', '.join(issues)}",
                        priority
                    )
            else:
                status["consecutive_chrome_failures"] = 0
                status["last_chrome_up"] = now_ts

            # Alert on other (non-Chrome) issues
            non_chrome_issues = [issue for issue in issues if "Chrome" not in issue]
            if non_chrome_issues:
                priority = "warning" if len(non_chrome_issues) == 1 else "critical"
                send_telegram(
                    f"Feed Hunter Pipeline Issues Detected:\n\n"
                    + "\n".join(f"• {issue}" for issue in non_chrome_issues),
                    priority
                )
    else:
        log(f"Portal is DOWN: {health['error']}")
        status["consecutive_portal_failures"] += 1

        # Alert after two consecutive portal failures; escalate after three
        if status["consecutive_portal_failures"] >= 2:
            priority = "critical" if status["consecutive_portal_failures"] > 3 else "warning"
            send_telegram(
                f"Feed Hunter Portal is DOWN ({status['consecutive_portal_failures']} consecutive failures)\n\n"
                f"Error: {health['error']}\n\n"
                f"Portal URL: {PORTAL_URL}",
                priority
            )

    # Save updated status
    save_monitor_status(status)

    # Log summary
    log(f"Monitor check complete. Portal failures: {status['consecutive_portal_failures']}, "
        f"Chrome failures: {status['consecutive_chrome_failures']}")


def main():
    """Entry point: run one check; send a critical alert on unexpected failure."""
    try:
        run_monitor_check()
    except KeyboardInterrupt:
        log("Monitor interrupted by user")
        sys.exit(0)
    except Exception as e:
        log(f"Monitor error: {e}", "ERROR")
        # Send critical alert for monitor failures
        send_telegram(
            f"Feed Hunter Monitor Script Error:\n\n{str(e)}\n\n"
            f"Monitor may need manual intervention.",
            "critical"
        )
        sys.exit(1)


if __name__ == "__main__":
    main()