Add advanced tools - dashboard.py: Web dashboard on localhost:8080 - briefing.py: Morning briefing generator - scaffold.py: Project scaffolding (python, node, script, docs, experiment) - watcher.py: File change monitor - focus.py: Pomodoro-style focus timer

This commit is contained in:
2026-01-30 23:19:19 -06:00
parent 7123c4ccd2
commit b579251e90
5 changed files with 757 additions and 0 deletions

174
tools/watcher.py Executable file
View File

@ -0,0 +1,174 @@
#!/usr/bin/env python3
"""
Simple file watcher - monitors workspace for changes and logs them.
Uses polling (no external dependencies).
"""
import os
import sys
import time
import json
from datetime import datetime
from pathlib import Path
from collections import defaultdict
WORKSPACE = Path("/home/wdjones/.openclaw/workspace")
STATE_FILE = WORKSPACE / ".watcher-state.json"
LOG_FILE = WORKSPACE / "memory" / "file-changes.log"
IGNORE_PATTERNS = {'.git', '__pycache__', 'node_modules', '.venv', 'venv', '.watcher-state.json'}
IGNORE_EXTENSIONS = {'.pyc', '.pyo', '.log'}
def should_ignore(path: Path) -> bool:
"""Check if path should be ignored."""
parts = path.parts
for pattern in IGNORE_PATTERNS:
if pattern in parts:
return True
if path.suffix in IGNORE_EXTENSIONS:
return True
return False
def get_file_state() -> dict:
"""Get current state of all files."""
state = {}
for root, dirs, files in os.walk(WORKSPACE):
# Filter directories
dirs[:] = [d for d in dirs if d not in IGNORE_PATTERNS and not d.startswith('.')]
for fname in files:
fpath = Path(root) / fname
if should_ignore(fpath):
continue
try:
stat = fpath.stat()
rel_path = str(fpath.relative_to(WORKSPACE))
state[rel_path] = {
'mtime': stat.st_mtime,
'size': stat.st_size,
}
except:
pass
return state
def load_state() -> dict:
"""Load previous state from file."""
if STATE_FILE.exists():
try:
with open(STATE_FILE) as f:
return json.load(f)
except:
pass
return {}
def save_state(state: dict):
"""Save current state to file."""
with open(STATE_FILE, 'w') as f:
json.dump(state, f)
def log_change(change_type: str, path: str):
"""Log a change to the log file."""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
line = f"[{timestamp}] {change_type}: {path}\n"
LOG_FILE.parent.mkdir(parents=True, exist_ok=True)
with open(LOG_FILE, 'a') as f:
f.write(line)
print(line.strip())
def diff_states(old: dict, new: dict) -> dict:
"""Compare states and return changes."""
changes = {'added': [], 'modified': [], 'deleted': []}
old_keys = set(old.keys())
new_keys = set(new.keys())
# Added files
for path in new_keys - old_keys:
changes['added'].append(path)
# Deleted files
for path in old_keys - new_keys:
changes['deleted'].append(path)
# Modified files
for path in old_keys & new_keys:
if old[path]['mtime'] != new[path]['mtime'] or old[path]['size'] != new[path]['size']:
changes['modified'].append(path)
return changes
def watch(interval: int = 5):
"""Watch for changes continuously."""
print(f"Watching {WORKSPACE}")
print(f"Interval: {interval}s")
print(f"Log: {LOG_FILE}")
print("Press Ctrl+C to stop\n")
state = get_file_state()
save_state(state)
try:
while True:
time.sleep(interval)
new_state = get_file_state()
changes = diff_states(state, new_state)
for path in changes['added']:
log_change("ADDED", path)
for path in changes['modified']:
log_change("MODIFIED", path)
for path in changes['deleted']:
log_change("DELETED", path)
if any(changes.values()):
save_state(new_state)
state = new_state
except KeyboardInterrupt:
print("\nStopped watching")
def check_once():
"""Check for changes once and exit."""
old_state = load_state()
new_state = get_file_state()
if not old_state:
print(f"First run - indexed {len(new_state)} files")
save_state(new_state)
return
changes = diff_states(old_state, new_state)
total = sum(len(v) for v in changes.values())
if total == 0:
print("No changes detected")
else:
print(f"Changes since last check:")
for path in changes['added']:
print(f" + {path}")
for path in changes['modified']:
print(f" ~ {path}")
for path in changes['deleted']:
print(f" - {path}")
save_state(new_state)
def main():
if len(sys.argv) > 1:
if sys.argv[1] == 'watch':
interval = int(sys.argv[2]) if len(sys.argv) > 2 else 5
watch(interval)
elif sys.argv[1] == 'check':
check_once()
else:
print("Usage:")
print(" watcher.py check - Check for changes once")
print(" watcher.py watch [interval] - Watch continuously")
else:
check_once()
if __name__ == "__main__":
main()