#!/usr/bin/env python3
"""GARP stock scanner - scans S&P 500 + S&P 400 MidCap for growth-at-reasonable-price candidates."""
import json
import os
import re
import sys
import time
from datetime import date, datetime

import numpy as np
import requests
import yfinance as yf

DATA_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "data")
SCANS_DIR = os.path.join(DATA_DIR, "scans")
TICKERS_CACHE = os.path.join(DATA_DIR, "tickers.json")
HEADERS = {"User-Agent": "MarketWatch/1.0 (paper trading bot; contact: case-lgn@protonmail.com)"}


def _scrape_tickers(url):
    """Scrape tickers from a Wikipedia S&P constituents page.

    Parses the first HTML table on the page, takes the "Symbol" column
    (falling back to the first column), and normalizes class-share dots
    to Yahoo-style dashes (BRK.B -> BRK-B).

    Returns a list of ticker strings; empty list if no table was found.
    """
    import io

    import pandas as pd

    resp = requests.get(url, timeout=30, headers=HEADERS)
    tables = pd.read_html(io.StringIO(resp.text))
    if not tables:
        return []
    df = tables[0]
    col = "Symbol" if "Symbol" in df.columns else df.columns[0]
    tickers = df[col].astype(str).str.strip().tolist()
    # Validate against the dotted (exchange-style) form, but keep the
    # dashed (Yahoo-style) form: up to 5 capitals plus optional share class.
    tickers = [
        t.replace(".", "-")
        for t in tickers
        if re.match(r'^[A-Z]{1,5}(\.[A-Z])?$', t.replace("-", "."))
    ]
    return tickers


def get_sp500_tickers():
    """Return the current S&P 500 constituent tickers from Wikipedia."""
    return _scrape_tickers("https://en.wikipedia.org/wiki/List_of_S%26P_500_companies")


def get_sp400_tickers():
    """Return the current S&P 400 MidCap constituent tickers from Wikipedia."""
    return _scrape_tickers("https://en.wikipedia.org/wiki/List_of_S%26P_400_companies")


def get_all_tickers(use_cache=True):
    """Get combined ticker list, with caching.

    Args:
        use_cache: if True, reuse the on-disk cache when it is less than
            7 days old; otherwise always re-scrape.

    Returns:
        Sorted list of unique tickers from the S&P 500 and S&P 400.
    """
    if use_cache and os.path.exists(TICKERS_CACHE):
        with open(TICKERS_CACHE) as f:
            cache = json.load(f)
        # Use cache if less than 7 days old
        cached_date = cache.get("date", "")
        if cached_date and (date.today() - date.fromisoformat(cached_date)).days < 7:
            return cache["tickers"]
    print("Fetching ticker lists from Wikipedia...")
    sp500 = get_sp500_tickers()
    print(f" S&P 500: {len(sp500)} tickers")
    sp400 = get_sp400_tickers()
    print(f" S&P 400: {len(sp400)} tickers")
    all_tickers = sorted(set(sp500 + sp400))
    os.makedirs(DATA_DIR, exist_ok=True)
    with open(TICKERS_CACHE, "w") as f:
        json.dump(
            {
                "date": date.today().isoformat(),
                "tickers": all_tickers,
                "sp500": len(sp500),
                "sp400": len(sp400),
            },
            f,
        )
    print(f" Combined: {len(all_tickers)} unique tickers")
    return all_tickers


def compute_rsi(prices, period=14):
    """Compute RSI from a price series.

    NOTE: this uses a simple average of the last `period` gains/losses,
    not Wilder's exponential smoothing, so values will differ slightly
    from most charting platforms.

    Args:
        prices: sequence of closing prices, oldest first.
        period: lookback window (default 14).

    Returns:
        RSI rounded to 2 decimals, or None if there is not enough data.
    """
    if len(prices) < period + 1:
        return None
    deltas = np.diff(prices)
    gains = np.where(deltas > 0, deltas, 0)
    losses = np.where(deltas < 0, -deltas, 0)
    avg_gain = np.mean(gains[-period:])
    avg_loss = np.mean(losses[-period:])
    if avg_loss == 0:
        # No down moves in the window -> RSI saturates at 100.
        return 100.0
    rs = avg_gain / avg_loss
    return round(100 - (100 / (1 + rs)), 2)


def scan_ticker(ticker):
    """Evaluate a single ticker against GARP criteria. Returns dict or None.

    Hard filters (all required): market cap >= $5B, 0 < trailing P/E < 25,
    0 < forward P/E < 15, revenue growth >= 10%, earnings growth >= 15%,
    ROE >= 5%. Optional filters (only applied when the field is available):
    PEG <= 1.2, quick ratio >= 1.5, debt/equity <= 35.

    Returns None on any filter failure, missing data, or fetch error.
    """
    try:
        stock = yf.Ticker(ticker)
        info = stock.info
        if not info or info.get("regularMarketPrice") is None:
            return None
        # Market cap filter
        market_cap = info.get("marketCap", 0)
        if not market_cap or market_cap < 5e9:
            return None
        # P/E filters
        trailing_pe = info.get("trailingPE")
        forward_pe = info.get("forwardPE")
        if trailing_pe is None or trailing_pe <= 0 or trailing_pe >= 25:
            return None
        if forward_pe is None or forward_pe <= 0 or forward_pe >= 15:
            return None
        # Revenue growth
        revenue_growth = info.get("revenueGrowth")
        if revenue_growth is None or revenue_growth < 0.10:
            return None
        # EPS growth (earnings growth)
        earnings_growth = info.get("earningsGrowth")
        if earnings_growth is None or earnings_growth < 0.15:
            return None
        # ROE
        roe = info.get("returnOnEquity")
        if roe is None or roe < 0.05:
            return None
        # Optional filters (don't disqualify if unavailable)
        peg = info.get("pegRatio")
        if peg is not None and peg > 1.2:
            return None
        quick_ratio = info.get("quickRatio")
        if quick_ratio is not None and quick_ratio < 1.5:
            return None
        de_ratio = info.get("debtToEquity")
        if de_ratio is not None and de_ratio > 35:
            return None
        # Get price history for RSI and 52-week high
        hist = stock.history(period="3mo")
        if hist.empty or len(hist) < 20:
            return None
        closes = hist["Close"].values
        current_price = closes[-1]
        rsi = compute_rsi(closes)
        # 52-week high
        week52_high = info.get("fiftyTwoWeekHigh", current_price)
        pct_from_high = ((week52_high - current_price) / week52_high) * 100 if week52_high else 0
        return {
            "ticker": ticker,
            "price": round(current_price, 2),
            "market_cap": market_cap,
            "market_cap_b": round(market_cap / 1e9, 1),
            "trailing_pe": round(trailing_pe, 2),
            "forward_pe": round(forward_pe, 2),
            # `is not None` (not truthiness): a legitimate value of 0 for
            # PEG, quick ratio, or debt/equity must not be reported as None.
            "peg_ratio": round(peg, 2) if peg is not None else None,
            "revenue_growth": round(revenue_growth * 100, 1),
            "earnings_growth": round(earnings_growth * 100, 1),
            "roe": round(roe * 100, 1),
            "quick_ratio": round(quick_ratio, 2) if quick_ratio is not None else None,
            "debt_to_equity": round(de_ratio, 1) if de_ratio is not None else None,
            "rsi": rsi,
            "week52_high": round(week52_high, 2) if week52_high else None,
            "pct_from_52wk_high": round(pct_from_high, 1),
        }
    except Exception:
        # Deliberate best-effort: a single bad ticker (network hiccup,
        # missing yfinance fields) must not abort the whole scan.
        return None


def run_scan(batch_size=5, delay=1.0):
    """Run full GARP scan. Returns list of candidates sorted by score.

    Args:
        batch_size: tickers per batch before pausing (rate-limit courtesy).
        delay: seconds to sleep between batches.

    Side effects: prints progress and writes the scan result JSON to
    SCANS_DIR/<today>.json.
    """
    tickers = get_all_tickers()
    candidates = []
    total = len(tickers)
    print(f"\nScanning {total} tickers...")
    for start in range(0, total, batch_size):
        batch = tickers[start:start + batch_size]
        # enumerate instead of batch.index(ticker): index() is an O(n)
        # scan per iteration and returns the wrong position on duplicates.
        for offset, ticker in enumerate(batch):
            idx = start + offset + 1
            sys.stdout.write(f"\r [{idx}/{total}] Scanning {ticker}...   ")
            sys.stdout.flush()
            result = scan_ticker(ticker)
            if result:
                candidates.append(result)
                print(f"\n ✓ {ticker} passed GARP filter (PE={result['trailing_pe']}, FwdPE={result['forward_pe']}, RevGr={result['revenue_growth']}%)")
        if start + batch_size < total:
            time.sleep(delay)
    print(f"\n\nScan complete: {len(candidates)} candidates from {total} tickers")
    # Sort by a composite score: lower forward PE + higher earnings growth
    for c in candidates:
        # Simple ranking score: lower is better
        c["score"] = c["forward_pe"] - (c["earnings_growth"] / 10) - (c["revenue_growth"] / 10)
    candidates.sort(key=lambda x: x["score"])
    # Save results
    os.makedirs(SCANS_DIR, exist_ok=True)
    scan_file = os.path.join(SCANS_DIR, f"{date.today().isoformat()}.json")
    scan_data = {
        "date": date.today().isoformat(),
        "timestamp": datetime.now().isoformat(),
        "total_scanned": total,
        "candidates_found": len(candidates),
        "candidates": candidates,
    }
    with open(scan_file, "w") as f:
        json.dump(scan_data, f, indent=2)
    print(f"Results saved to {scan_file}")
    return candidates


def load_latest_scan():
    """Load the most recent scan results.

    Returns the parsed scan dict, or None if no scans exist. Relies on
    the ISO-date filenames sorting chronologically.
    """
    if not os.path.exists(SCANS_DIR):
        return None
    files = sorted(f for f in os.listdir(SCANS_DIR) if f.endswith(".json"))
    if not files:
        return None
    with open(os.path.join(SCANS_DIR, files[-1])) as f:
        return json.load(f)


if __name__ == "__main__":
    candidates = run_scan()
    if candidates:
        print("\nTop candidates:")
        for c in candidates[:10]:
            print(f" {c['ticker']:6s} Price=${c['price']:8.2f} PE={c['trailing_pe']:5.1f} FwdPE={c['forward_pe']:5.1f} "
                  f"RevGr={c['revenue_growth']:5.1f}% EPSGr={c['earnings_growth']:5.1f}% RSI={c['rsi']}")
    else:
        print("No candidates found matching GARP criteria.")