Files
workspace/projects/feed-hunter/portal_monitor.py

274 lines
9.2 KiB
Python
Executable File
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Feed Hunter Portal Monitor - Enhanced monitoring with time-based scheduling
Monitors portal health, feed pipeline status, and sends alerts via Telegram
"""
import html
import json
import os
import sys
import time
import urllib.error
import urllib.request
from datetime import datetime, timezone, timedelta
from pathlib import Path
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

# Directory holding this script; the portal status endpoint is polled locally.
PROJECT_DIR = Path(__file__).parent
PORTAL_URL = "http://localhost:8888"
STATUS_API = PORTAL_URL + "/api/data?type=status"

# Telegram credentials come from the environment. An empty token makes
# send_telegram() fall back to plain logging.
TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN", "")
TELEGRAM_CHAT_ID = os.getenv("TELEGRAM_CHAT_ID", "6443752046")

# Business hours window (9 AM inclusive to 6 PM exclusive), evaluated at a
# fixed UTC-6 (CST) offset.
BUSINESS_HOURS_START = 9
BUSINESS_HOURS_END = 18
TIMEZONE_OFFSET = -6  # hours relative to UTC

# JSON file that persists counters and alert timestamps between runs.
# The containing directory is created at import time.
STATUS_FILE = PROJECT_DIR.joinpath("data", "monitor_status.json")
STATUS_FILE.parent.mkdir(parents=True, exist_ok=True)

# Alerting thresholds
MAX_ALERTS_PER_HOUR = 3          # non-critical alerts allowed per rolling hour
PORTAL_TIMEOUT = 10              # seconds to wait for the status API
CHROME_DOWN_ALERT_THRESHOLD = 2  # consecutive Chrome failures before alerting
STALE_DATA_THRESHOLD = 3600      # seconds without a scrape before alerting
def log(message: str, level: str = "INFO"):
    """Print ``message`` to stdout prefixed with a UTC timestamp and ``level``.

    Output format: ``[YYYY-MM-DD HH:MM:SS UTC] [LEVEL] message``.
    """
    stamp = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S')
    print(f"[{stamp} UTC] [{level}] {message}")
def send_telegram(message: str, priority: str = "normal"):
    """Send an alert to the configured Telegram chat, with hourly rate limiting.

    Args:
        message: Plain-text alert body. It is HTML-escaped before sending
            because the request uses ``parse_mode: HTML``; unescaped ``<``,
            ``>`` or ``&`` (common in exception text) would otherwise make
            Telegram reject the message.
        priority: ``"critical"``, ``"warning"`` or anything else (treated as
            normal). Critical alerts bypass the rate limit.

    When ``TELEGRAM_BOT_TOKEN`` is empty the alert is only logged. Delivery
    failures are logged and never raised, so alerting stays best-effort.
    """
    if not TELEGRAM_BOT_TOKEN:
        # No bot configured: surface the alert in the log instead.
        log(f"[ALERT-{priority.upper()}] {message}")
        return

    # Load persisted state to evaluate the rate limit.
    status = load_monitor_status()
    now_ts = int(time.time())

    # Keep only timestamps from the last hour; tolerate a status file that
    # has no (or a null) "alert_timestamps" entry.
    recent_alerts = [
        ts for ts in (status.get("alert_timestamps") or [])
        if now_ts - ts < 3600
    ]
    status["alert_timestamps"] = recent_alerts

    # Critical alerts are always delivered; everything else is capped.
    if priority != "critical" and len(recent_alerts) >= MAX_ALERTS_PER_HOUR:
        log(f"Rate limit reached, skipping alert: {message[:100]}...")
        return

    url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"

    # Prefix the message with a priority emoji.
    emoji = "🔴" if priority == "critical" else "⚠️" if priority == "warning" else ""
    # Escape only the dynamic part; the <b> header is intentional markup.
    safe_message = html.escape(message, quote=False)
    formatted_message = f"{emoji} <b>Feed Hunter Monitor</b>\n\n{safe_message}"

    data = json.dumps({
        "chat_id": TELEGRAM_CHAT_ID,
        "text": formatted_message,
        "parse_mode": "HTML",
        "disable_web_page_preview": True,
    }).encode()
    req = urllib.request.Request(url, data=data, headers={"Content-Type": "application/json"})

    try:
        # Context manager closes the HTTP response instead of leaking it.
        with urllib.request.urlopen(req, timeout=10):
            pass
        # Record the alert only after a successful send.
        recent_alerts.append(now_ts)
        save_monitor_status(status)
        log(f"Telegram alert sent: {message[:100]}...")
    except Exception as e:
        log(f"Telegram error: {e}", "ERROR")
def load_monitor_status() -> dict:
    """Load the persisted monitor state from ``STATUS_FILE``.

    Returns:
        A dict that always contains every default key. Values stored on disk
        override the defaults. If the file is missing, unreadable, not valid
        JSON, or does not hold a JSON object, the defaults are returned.
    """
    defaults = {
        "alert_timestamps": [],
        "last_portal_up": None,
        "last_chrome_up": None,
        "consecutive_chrome_failures": 0,
        "consecutive_portal_failures": 0,
        "last_successful_scrape": None,
    }
    try:
        stored = json.loads(STATUS_FILE.read_text())
    except (OSError, ValueError):
        # OSError: missing/unreadable file. ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError for corrupt content.
        return defaults
    if not isinstance(stored, dict):
        # e.g. the file contains a JSON list or scalar; treat as corrupt.
        return defaults
    # Fill in keys that an older or partially written file may lack, so
    # callers can index the result without KeyError.
    return {**defaults, **stored}
def save_monitor_status(status: dict):
    """Write ``status`` to ``STATUS_FILE`` as JSON indented by two spaces."""
    serialized = json.dumps(status, indent=2)
    STATUS_FILE.write_text(serialized)
def is_business_hours() -> bool:
    """Return True when the current hour falls within business hours.

    The current UTC time is shifted by the fixed ``TIMEZONE_OFFSET`` (CST) and
    its hour is tested against the half-open window
    ``[BUSINESS_HOURS_START, BUSINESS_HOURS_END)``.
    """
    # Shift the UTC clock by the fixed offset to get the CST wall-clock time.
    shifted = datetime.now(timezone.utc) + timedelta(hours=TIMEZONE_OFFSET)
    return shifted.hour in range(BUSINESS_HOURS_START, BUSINESS_HOURS_END)
def check_portal_health() -> dict:
    """Query the portal status API and report whether the portal responded.

    Returns:
        A dict with three keys:
        ``portal_up`` - True when the API answered with HTTP 200;
        ``status_data`` - the decoded JSON body, or None;
        ``error`` - a description of what went wrong, or None.
    """
    outcome = {
        "portal_up": False,
        "status_data": None,
        "error": None
    }
    try:
        request = urllib.request.Request(
            STATUS_API,
            headers={"User-Agent": "FeedHunterMonitor/1.0"},
        )
        with urllib.request.urlopen(request, timeout=PORTAL_TIMEOUT) as reply:
            if reply.status != 200:
                outcome["error"] = f"HTTP {reply.status}"
            else:
                # Mark the portal as up before parsing: a malformed body
                # still means the portal itself answered.
                outcome["portal_up"] = True
                body = reply.read().decode()
                outcome["status_data"] = json.loads(body)
    except urllib.error.URLError as exc:
        # Connection-level failures (and HTTP errors raised by urlopen).
        outcome["error"] = f"URL Error: {exc.reason}"
    except Exception as exc:
        # Anything else, e.g. timeouts or an unparseable response body.
        outcome["error"] = f"Error: {str(exc)}"
    return outcome
def analyze_status_data(status_data: dict) -> list:
    """Inspect the portal status payload and return a list of issue strings.

    Two checks are made:
      * Chrome debug port: reported as down unless ``chrome.status`` equals
        ``"Running"``.
      * Stale data: reported when ``last_run.time`` is older than
        ``STALE_DATA_THRESHOLD`` seconds.

    Args:
        status_data: Decoded JSON from the status API. A missing, empty or
            non-dict payload yields a single "No status data available" issue.

    Returns:
        Human-readable issue descriptions; empty when everything looks healthy.
    """
    if not status_data or not isinstance(status_data, dict):
        return ["No status data available"]

    issues = []

    # Check Chrome debug port. A null or malformed "chrome" section is treated
    # like a missing one (i.e. not running) instead of raising AttributeError.
    chrome_status = status_data.get("chrome")
    if not isinstance(chrome_status, dict):
        chrome_status = {}
    if chrome_status.get("status") != "Running":
        issues.append(f"Chrome debug port down: {chrome_status.get('detail', 'Unknown')}")

    # Check for stale data.
    last_run = status_data.get("last_run")
    last_run_time = last_run.get("time") if isinstance(last_run, dict) else None
    if last_run_time and last_run_time != "Unknown":
        try:
            last_time = datetime.strptime(last_run_time, '%Y-%m-%d %H:%M:%S')
        except (TypeError, ValueError):
            # Non-string or unparseable timestamp: skip the staleness check
            # rather than crash the whole monitoring run.
            last_time = None
        if last_time is not None:
            # NOTE(review): compares against the host's naive local time, so
            # this assumes the portal reports local timestamps - confirm.
            time_diff = (datetime.now() - last_time).total_seconds()
            if time_diff > STALE_DATA_THRESHOLD:
                hours = int(time_diff / 3600)
                issues.append(f"No new scrape data for {hours} hours (last: {last_run_time})")

    return issues
def run_monitor_check():
    """Run a single monitoring check and persist the updated counters.

    Steps:
      1. Load the persisted state and query the portal status API.
      2. Portal up: reset the portal failure counter, analyze the payload,
         track consecutive Chrome failures and alert once the threshold is
         reached; alert on any non-Chrome issue.
      3. Portal down: count the failure and alert from the second
         consecutive failure onward.
      4. Save the state and log a summary.

    The business-hours flag is computed and logged only; it does not change
    alerting behavior.
    """
    log("Starting monitoring check...")
    status = load_monitor_status()
    now_ts = int(time.time())
    business_hours = is_business_hours()
    log(f"Business hours: {business_hours}")

    # Check portal health
    health = check_portal_health()
    if health["portal_up"]:
        log("Portal is UP")
        status["consecutive_portal_failures"] = 0
        status["last_portal_up"] = now_ts

        # Analyze the status data
        issues = analyze_status_data(health["status_data"])
        if not issues:
            log("All systems healthy")
            # Reset Chrome failure counter if everything is working
            status["consecutive_chrome_failures"] = 0
            status["last_chrome_up"] = now_ts
        else:
            log(f"Found {len(issues)} issues: {', '.join(issues)}")

            # Chrome issues are identified by the "Chrome" marker in the text.
            chrome_down = any("Chrome" in issue for issue in issues)
            if chrome_down:
                # .get() keeps older status files without this key working.
                status["consecutive_chrome_failures"] = status.get("consecutive_chrome_failures", 0) + 1
                # Alert only after consecutive failures to avoid flapping.
                if status["consecutive_chrome_failures"] >= CHROME_DOWN_ALERT_THRESHOLD:
                    priority = "critical" if status["consecutive_chrome_failures"] > 3 else "warning"
                    send_telegram(
                        f"Chrome debug port has been down for {status['consecutive_chrome_failures']} consecutive checks.\n\n"
                        f"This affects the X/Twitter scraping pipeline.\n\n"
                        f"Status: {', '.join(issues)}",
                        priority
                    )
            else:
                status["consecutive_chrome_failures"] = 0
                status["last_chrome_up"] = now_ts

            # Alert on other issues
            non_chrome_issues = [issue for issue in issues if "Chrome" not in issue]
            if non_chrome_issues:
                priority = "warning" if len(non_chrome_issues) == 1 else "critical"
                send_telegram(
                    f"Feed Hunter Pipeline Issues Detected:\n\n" +
                    "\n".join(f"{issue}" for issue in non_chrome_issues),
                    priority
                )
    else:
        log(f"Portal is DOWN: {health['error']}")
        status["consecutive_portal_failures"] = status.get("consecutive_portal_failures", 0) + 1
        # Alert on portal being down
        if status["consecutive_portal_failures"] >= 2:
            priority = "critical" if status["consecutive_portal_failures"] > 3 else "warning"
            send_telegram(
                f"Feed Hunter Portal is DOWN ({status['consecutive_portal_failures']} consecutive failures)\n\n"
                f"Error: {health['error']}\n\n"
                f"Portal URL: {PORTAL_URL}",
                priority
            )

    # send_telegram() loads and saves its own copy of the state to record
    # alert timestamps. Re-read them here so that saving the snapshot loaded
    # at the start of this run does not erase them, which would defeat the
    # hourly rate limit.
    status["alert_timestamps"] = load_monitor_status().get("alert_timestamps") or []

    # Save updated status
    save_monitor_status(status)

    # Log summary
    log(f"Monitor check complete. Portal failures: {status.get('consecutive_portal_failures', 0)}, "
        f"Chrome failures: {status.get('consecutive_chrome_failures', 0)}")
def main():
    """Script entry point: run one monitoring check and set the exit code.

    Exits with status 0 on Ctrl-C. Any other exception is logged, reported
    as a critical Telegram alert, and causes exit status 1.
    """
    try:
        run_monitor_check()
    except KeyboardInterrupt:
        log("Monitor interrupted by user")
        sys.exit(0)
    except Exception as e:
        detail = str(e)
        log(f"Monitor error: {detail}", "ERROR")
        # A failing monitor is itself an incident, so always alert as critical.
        alert_text = (
            f"Feed Hunter Monitor Script Error:\n\n{detail}\n\n"
            f"Monitor may need manual intervention."
        )
        send_telegram(alert_text, "critical")
        sys.exit(1)


if __name__ == "__main__":
    main()