Files
workspace/projects/market-watch/scanner.py
Case be43231c3f Market Watch: multiplayer GARP paper trading simulator
- Game engine with multiplayer support (create games, join, leaderboard)
- GARP stock screener (S&P 500 + 400 MidCap, 900+ tickers)
- Automated trading logic for AI player (Case)
- Web portal at marketwatch.local:8889 with dark theme
- Systemd timer for Mon-Fri market hours
- Telegram alerts on trades and daily summary
- Stock analysis deep dive data (BAC, CFG, FITB, INCY)
- Expanded scan results (22 GARP candidates)
- Craigslist account setup + credentials
2026-02-08 15:18:41 -06:00

234 lines
8.0 KiB
Python
Executable File

#!/usr/bin/env python3
"""GARP stock scanner - scans S&P 500 + S&P 400 MidCap for growth-at-reasonable-price candidates."""
import json
import os
import re
import sys
import time
from datetime import date, datetime
import numpy as np
import requests
import yfinance as yf
DATA_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "data")
SCANS_DIR = os.path.join(DATA_DIR, "scans")
TICKERS_CACHE = os.path.join(DATA_DIR, "tickers.json")
HEADERS = {"User-Agent": "MarketWatch/1.0 (paper trading bot; contact: case-lgn@protonmail.com)"}
def _scrape_tickers(url):
"""Scrape tickers from a Wikipedia S&P constituents page."""
import io
import pandas as pd
resp = requests.get(url, timeout=30, headers=HEADERS)
tables = pd.read_html(io.StringIO(resp.text))
if tables:
df = tables[0]
col = "Symbol" if "Symbol" in df.columns else df.columns[0]
tickers = df[col].astype(str).str.strip().tolist()
tickers = [t.replace(".", "-") for t in tickers if re.match(r'^[A-Z]{1,5}(\.[A-Z])?$', t.replace("-", "."))]
return tickers
return []
def get_sp500_tickers():
return _scrape_tickers("https://en.wikipedia.org/wiki/List_of_S%26P_500_companies")
def get_sp400_tickers():
return _scrape_tickers("https://en.wikipedia.org/wiki/List_of_S%26P_400_companies")
def get_all_tickers(use_cache=True):
"""Get combined ticker list, with caching."""
if use_cache and os.path.exists(TICKERS_CACHE):
cache = json.loads(open(TICKERS_CACHE).read())
# Use cache if less than 7 days old
cached_date = cache.get("date", "")
if cached_date and (date.today() - date.fromisoformat(cached_date)).days < 7:
return cache["tickers"]
print("Fetching ticker lists from Wikipedia...")
sp500 = get_sp500_tickers()
print(f" S&P 500: {len(sp500)} tickers")
sp400 = get_sp400_tickers()
print(f" S&P 400: {len(sp400)} tickers")
all_tickers = sorted(set(sp500 + sp400))
os.makedirs(DATA_DIR, exist_ok=True)
with open(TICKERS_CACHE, "w") as f:
json.dump({"date": date.today().isoformat(), "tickers": all_tickers, "sp500": len(sp500), "sp400": len(sp400)}, f)
print(f" Combined: {len(all_tickers)} unique tickers")
return all_tickers
def compute_rsi(prices, period=14):
"""Compute RSI from a price series."""
if len(prices) < period + 1:
return None
deltas = np.diff(prices)
gains = np.where(deltas > 0, deltas, 0)
losses = np.where(deltas < 0, -deltas, 0)
avg_gain = np.mean(gains[-period:])
avg_loss = np.mean(losses[-period:])
if avg_loss == 0:
return 100.0
rs = avg_gain / avg_loss
return round(100 - (100 / (1 + rs)), 2)
def scan_ticker(ticker):
"""Evaluate a single ticker against GARP criteria. Returns dict or None."""
try:
stock = yf.Ticker(ticker)
info = stock.info
if not info or info.get("regularMarketPrice") is None:
return None
# Market cap filter
market_cap = info.get("marketCap", 0)
if not market_cap or market_cap < 5e9:
return None
# P/E filters
trailing_pe = info.get("trailingPE")
forward_pe = info.get("forwardPE")
if trailing_pe is None or trailing_pe <= 0 or trailing_pe >= 25:
return None
if forward_pe is None or forward_pe <= 0 or forward_pe >= 15:
return None
# Revenue growth
revenue_growth = info.get("revenueGrowth")
if revenue_growth is None or revenue_growth < 0.10:
return None
# EPS growth (earnings growth)
earnings_growth = info.get("earningsGrowth")
if earnings_growth is None or earnings_growth < 0.15:
return None
# ROE
roe = info.get("returnOnEquity")
if roe is None or roe < 0.05:
return None
# Optional filters (don't disqualify if unavailable)
peg = info.get("pegRatio")
if peg is not None and peg > 1.2:
return None
quick_ratio = info.get("quickRatio")
if quick_ratio is not None and quick_ratio < 1.5:
return None
de_ratio = info.get("debtToEquity")
if de_ratio is not None and de_ratio > 35:
return None
# Get price history for RSI and 52-week high
hist = stock.history(period="3mo")
if hist.empty or len(hist) < 20:
return None
closes = hist["Close"].values
current_price = closes[-1]
rsi = compute_rsi(closes)
# 52-week high
week52_high = info.get("fiftyTwoWeekHigh", current_price)
pct_from_high = ((week52_high - current_price) / week52_high) * 100 if week52_high else 0
return {
"ticker": ticker,
"price": round(current_price, 2),
"market_cap": market_cap,
"market_cap_b": round(market_cap / 1e9, 1),
"trailing_pe": round(trailing_pe, 2),
"forward_pe": round(forward_pe, 2),
"peg_ratio": round(peg, 2) if peg else None,
"revenue_growth": round(revenue_growth * 100, 1),
"earnings_growth": round(earnings_growth * 100, 1),
"roe": round(roe * 100, 1),
"quick_ratio": round(quick_ratio, 2) if quick_ratio else None,
"debt_to_equity": round(de_ratio, 1) if de_ratio else None,
"rsi": rsi,
"week52_high": round(week52_high, 2) if week52_high else None,
"pct_from_52wk_high": round(pct_from_high, 1),
}
except Exception as e:
return None
def run_scan(batch_size=5, delay=1.0):
"""Run full GARP scan. Returns list of candidates sorted by score."""
tickers = get_all_tickers()
candidates = []
total = len(tickers)
print(f"\nScanning {total} tickers...")
for i in range(0, total, batch_size):
batch = tickers[i:i + batch_size]
for ticker in batch:
idx = i + batch.index(ticker) + 1
sys.stdout.write(f"\r [{idx}/{total}] Scanning {ticker}... ")
sys.stdout.flush()
result = scan_ticker(ticker)
if result:
candidates.append(result)
print(f"\n{ticker} passed GARP filter (PE={result['trailing_pe']}, FwdPE={result['forward_pe']}, RevGr={result['revenue_growth']}%)")
if i + batch_size < total:
time.sleep(delay)
print(f"\n\nScan complete: {len(candidates)} candidates from {total} tickers")
# Sort by a composite score: lower forward PE + higher earnings growth
for c in candidates:
# Simple ranking score: lower is better
c["score"] = c["forward_pe"] - (c["earnings_growth"] / 10) - (c["revenue_growth"] / 10)
candidates.sort(key=lambda x: x["score"])
# Save results
os.makedirs(SCANS_DIR, exist_ok=True)
scan_file = os.path.join(SCANS_DIR, f"{date.today().isoformat()}.json")
scan_data = {
"date": date.today().isoformat(),
"timestamp": datetime.now().isoformat(),
"total_scanned": total,
"candidates_found": len(candidates),
"candidates": candidates,
}
with open(scan_file, "w") as f:
json.dump(scan_data, f, indent=2)
print(f"Results saved to {scan_file}")
return candidates
def load_latest_scan():
"""Load the most recent scan results."""
if not os.path.exists(SCANS_DIR):
return None
files = sorted(f for f in os.listdir(SCANS_DIR) if f.endswith(".json"))
if not files:
return None
with open(os.path.join(SCANS_DIR, files[-1])) as f:
return json.load(f)
if __name__ == "__main__":
candidates = run_scan()
if candidates:
print(f"\nTop candidates:")
for c in candidates[:10]:
print(f" {c['ticker']:6s} Price=${c['price']:8.2f} PE={c['trailing_pe']:5.1f} FwdPE={c['forward_pe']:5.1f} "
f"RevGr={c['revenue_growth']:5.1f}% EPSGr={c['earnings_growth']:5.1f}% RSI={c['rsi']}")
else:
print("No candidates found matching GARP criteria.")