#!/usr/bin/env python3
"""
Self-Healing Server — Always-on infrastructure monitoring & auto-repair.

Monitors services, disk, memory, CPU. Auto-restarts failed services.
Logs all actions. Run via cron every 5 minutes or as a daemon.

Usage:
    python3 self-healing-server.py              # Run once (check & heal)
    python3 self-healing-server.py --daemon     # Run continuously (every 5 min)
    python3 self-healing-server.py --status     # Show current health
"""

import copy
import json
import subprocess
import os
import sys
import time
from datetime import datetime
from pathlib import Path

import psutil

DATA_DIR = Path(__file__).parent.parent / "data" / "self-healing"
DATA_DIR.mkdir(parents=True, exist_ok=True)
LOG_FILE = DATA_DIR / "heal-log.jsonl"
CONFIG_FILE = DATA_DIR / "config.json"

DEFAULT_CONFIG = {
    "monitored_services": [
        "nexus",
        "nginx",
        "ssh"
    ],
    "monitored_ports": {
        "8000": "control-panel",
        "8888": "feed-hunter",
        "8889": "market-watch",
        "8890": "ticker",
        "80": "nginx",
        "3000": "nexus"
    },
    "thresholds": {
        "disk_percent": 90,
        "memory_percent": 90,
        "cpu_percent": 95,
        "swap_percent": 80
    },
    "auto_restart": True,
    "check_interval_seconds": 300
}

# More zombies than this is reported as an issue.
ZOMBIE_THRESHOLD = 5


def load_config():
    """Return the monitoring configuration as a dict.

    If CONFIG_FILE does not exist it is created from DEFAULT_CONFIG.
    If it exists but cannot be read or is not a JSON object, the problem
    is logged and the defaults are used so the monitor keeps running.
    A copy of the defaults is returned so callers can never mutate
    DEFAULT_CONFIG by accident.
    """
    if not CONFIG_FILE.exists():
        CONFIG_FILE.write_text(json.dumps(DEFAULT_CONFIG, indent=2))
        return copy.deepcopy(DEFAULT_CONFIG)

    try:
        config = json.loads(CONFIG_FILE.read_text())
    except (OSError, json.JSONDecodeError) as exc:
        log_event("config_error", f"Could not load {CONFIG_FILE}: {exc}; using defaults")
        return copy.deepcopy(DEFAULT_CONFIG)

    if not isinstance(config, dict):
        log_event("config_error", f"{CONFIG_FILE} is not a JSON object; using defaults")
        return copy.deepcopy(DEFAULT_CONFIG)
    return config


def log_event(event_type, message, action=None, success=None):
    """Append one event to LOG_FILE (JSON lines) and echo it to stdout.

    Args:
        event_type: Short machine-readable tag, e.g. "service_down".
        message: Human-readable description.
        action: Optional name of the repair action that was attempted.
        success: True/False for an outcome, None for a plain warning.
    """
    entry = {
        "ts": datetime.now().isoformat(),
        "type": event_type,
        "message": message,
    }
    if action:
        entry["action"] = action
    if success is not None:
        entry["success"] = success

    # Logging is best-effort: a full disk must not stop the checks
    # (and the clean-up they trigger) from running.
    try:
        with open(LOG_FILE, "a", encoding="utf-8") as f:
            f.write(json.dumps(entry) + "\n")
    except OSError as exc:
        print(f" ⚠️ [log_error] Could not write to {LOG_FILE}: {exc}")

    icon = "✅" if success else ("⚠️" if success is None else "❌")
    print(f" {icon} [{event_type}] {message}")


def run_cmd(cmd, timeout=10):
    """Run a command and return ``(returncode, stdout, stderr)``.

    Args:
        cmd: A shell command string (run through the shell), or a list of
            arguments (run directly, without a shell).
        timeout: Seconds to wait before giving up.

    Returns:
        The exit code with stripped stdout/stderr. A timeout yields
        ``(-1, "", "timeout")``; a command that cannot be started yields
        ``(127, "", <error text>)``.
    """
    try:
        r = subprocess.run(
            cmd,
            shell=isinstance(cmd, str),
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        return -1, "", "timeout"
    except OSError as exc:
        # Only reachable for list commands, e.g. the executable is missing.
        return 127, "", str(exc)
    return r.returncode, r.stdout.strip(), r.stderr.strip()


def _service_status(svc):
    """Return ``(is_active, status_text)`` for a systemd unit.

    The user unit is tried first, then the system unit. The exit code of
    ``systemctl is-active`` decides the result; a substring test on the
    output would wrongly accept "inactive" because it contains "active".
    """
    statuses = []
    for scope_args in (["--user"], []):
        code, out, _ = run_cmd(["systemctl", *scope_args, "is-active", svc])
        if code == 0:
            return True, out
        statuses.append(out or "unknown")
    return False, ", ".join(statuses)


def _restart_service(svc):
    """Restart a unit (user unit first, then system via sudo); return success."""
    code, _, _ = run_cmd(["systemctl", "--user", "restart", svc])
    if code == 0:
        return True
    code, _, _ = run_cmd(["sudo", "systemctl", "restart", svc])
    return code == 0


def check_services(config):
    """Check every monitored systemd service and restart the failed ones.

    Restarts are attempted only when ``auto_restart`` is truthy in config.

    Returns:
        A list of ``("service_down", service_name)`` tuples.
    """
    issues = []
    for svc in config.get("monitored_services", []):
        active, status = _service_status(svc)
        if active:
            log_event("service_ok", f"Service '{svc}' is active", success=True)
            continue

        issues.append(("service_down", svc))
        log_event("service_down", f"Service '{svc}' is not active (status: {status})")
        if config.get("auto_restart"):
            restarted = _restart_service(svc)
            log_event("auto_restart", f"Attempted restart of '{svc}'",
                      action="restart", success=restarted)
    return issues


def check_ports(config):
    """Check that something is listening on every monitored port.

    Returns:
        A list of ``("port_down", "<name> (:<port>)")`` tuples, or a single
        ``("port_check_failed", <reason>)`` tuple when the connection table
        cannot be read.
    """
    try:
        connections = psutil.net_connections(kind='inet')
    except psutil.Error as exc:
        # Listing sockets needs elevated privileges on some platforms.
        log_event("port_check_failed", f"Could not list network connections: {exc}")
        return [("port_check_failed", str(exc))]

    listening_ports = {
        str(c.laddr.port)
        for c in connections
        if c.status == psutil.CONN_LISTEN and c.laddr
    }

    issues = []
    for port, name in config.get("monitored_ports", {}).items():
        if str(port) in listening_ports:
            log_event("port_ok", f"Port {port} ({name}) is listening", success=True)
        else:
            issues.append(("port_down", f"{name} (:{port})"))
            log_event("port_down", f"Nothing listening on port {port} ({name})")
    return issues


def check_resources(config):
    """Check disk, memory, CPU load and swap against the configured thresholds.

    High disk usage triggers a journal vacuum and ``apt-get autoremove``;
    high memory usage triggers a page-cache drop. The logged outcome of
    each clean-up reflects the real exit status of the commands.

    Returns:
        A list of ``(issue_type, "<value>%")`` tuples.
    """
    issues = []
    thresholds = config.get("thresholds", {})

    # Disk
    disk_limit = thresholds.get("disk_percent", 90)
    disk = psutil.disk_usage('/')
    if disk.percent >= disk_limit:
        issues.append(("disk_high", f"{disk.percent}%"))
        log_event("disk_high", f"Disk usage at {disk.percent}% (threshold: {disk_limit}%)")
        # Auto-clean: journal logs, apt cache
        vacuum_code, _, _ = run_cmd(["sudo", "journalctl", "--vacuum-time=3d"])
        apt_code, _, _ = run_cmd(["sudo", "apt-get", "autoremove", "-y"])
        log_event("auto_clean", "Ran journal vacuum and apt autoremove",
                  action="clean", success=(vacuum_code == 0 and apt_code == 0))
    else:
        log_event("disk_ok", f"Disk usage at {disk.percent}%", success=True)

    # Memory
    mem = psutil.virtual_memory()
    if mem.percent >= thresholds.get("memory_percent", 90):
        issues.append(("memory_high", f"{mem.percent}%"))
        log_event("memory_high", f"Memory usage at {mem.percent}%")
        # Clear caches (needs a shell for the pipe into tee)
        drop_code, _, _ = run_cmd("sync && echo 3 | sudo tee /proc/sys/vm/drop_caches 2>/dev/null")
        log_event("auto_clean", "Dropped filesystem caches",
                  action="drop_caches", success=(drop_code == 0))
    else:
        log_event("memory_ok", f"Memory usage at {mem.percent}%", success=True)

    # CPU (1-min load avg)
    load_1, _, _ = psutil.getloadavg()
    # cpu_count() may return None when the count cannot be determined.
    cpu_count = psutil.cpu_count() or 1
    cpu_pct = (load_1 / cpu_count) * 100
    if cpu_pct >= thresholds.get("cpu_percent", 95):
        issues.append(("cpu_high", f"{cpu_pct:.0f}%"))
        log_event("cpu_high", f"CPU load at {cpu_pct:.0f}% (load avg: {load_1})")
    else:
        log_event("cpu_ok", f"CPU load at {cpu_pct:.0f}%", success=True)

    # Swap
    swap = psutil.swap_memory()
    if swap.percent >= thresholds.get("swap_percent", 80):
        issues.append(("swap_high", f"{swap.percent}%"))
        log_event("swap_high", f"Swap usage at {swap.percent}%")
    else:
        log_event("swap_ok", f"Swap usage at {swap.percent}%", success=True)

    return issues


def check_zombie_processes():
    """Report an issue when more than ZOMBIE_THRESHOLD zombie processes exist.

    Returns:
        An empty list, or ``[("zombies", "<n> zombie processes")]``.
    """
    zombie_count = sum(
        1
        for p in psutil.process_iter(['pid', 'name', 'status'])
        if p.info['status'] == psutil.STATUS_ZOMBIE
    )
    if zombie_count <= ZOMBIE_THRESHOLD:
        return []

    log_event("zombies", f"Found {zombie_count} zombie processes")
    return [("zombies", f"{zombie_count} zombie processes")]


def run_health_check():
    """Run every check once, print a summary and write ``status.json``.

    Returns:
        The combined list of ``(issue_type, detail)`` tuples.
    """
    config = load_config()
    print(f"\n🏥 Self-Healing Server Check — {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print("=" * 60)

    all_issues = []

    print("\n📡 Services:")
    all_issues.extend(check_services(config))

    print("\n🔌 Ports:")
    all_issues.extend(check_ports(config))

    print("\n💾 Resources:")
    all_issues.extend(check_resources(config))

    print("\n👻 Processes:")
    all_issues.extend(check_zombie_processes())
    if not any(i[0] == "zombies" for i in all_issues):
        print(" ✅ No zombie process issues")

    # Summary
    print(f"\n{'=' * 60}")
    if all_issues:
        print(f"⚠️ {len(all_issues)} issue(s) found:")
        for itype, detail in all_issues:
            print(f" - {itype}: {detail}")
    else:
        print("✅ All systems healthy!")

    # Write status file
    status = {
        "last_check": datetime.now().isoformat(),
        "healthy": len(all_issues) == 0,
        "issues": [{"type": t, "detail": d} for t, d in all_issues],
        "disk_pct": psutil.disk_usage('/').percent,
        "mem_pct": psutil.virtual_memory().percent,
        "cpu_count": psutil.cpu_count(),
    }
    (DATA_DIR / "status.json").write_text(json.dumps(status, indent=2))

    return all_issues


def show_status():
    """Print the status written by the most recent health check, if any."""
    status_file = DATA_DIR / "status.json"
    if not status_file.exists():
        print("No status yet. Run a health check first.")
        return
    status = json.loads(status_file.read_text())
    print(json.dumps(status, indent=2))


def main():
    """Dispatch on the command-line flags: --status, --daemon, or a single run."""
    if "--status" in sys.argv:
        show_status()
    elif "--daemon" in sys.argv:
        config = load_config()
        interval = config.get("check_interval_seconds", 300)
        print(f"🔄 Running in daemon mode (every {interval}s). Ctrl+C to stop.")
        try:
            while True:
                try:
                    run_health_check()
                except Exception as exc:
                    # Top-level boundary: one failed check must not kill
                    # a monitor that is meant to stay up.
                    log_event("check_failed", f"Health check crashed: {exc!r}", success=False)
                time.sleep(interval)
        except KeyboardInterrupt:
            print("\nStopped.")
    else:
        run_health_check()


if __name__ == "__main__":
    main()