175 lines
4.9 KiB
Python
Executable File
175 lines
4.9 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Simple file watcher - monitors the workspace for changes and logs them.
Uses polling (no external dependencies).
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import time
|
|
import json
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from collections import defaultdict
|
|
|
|
# Root directory that is scanned for changes.
WORKSPACE = Path("/home/wdjones/.openclaw/workspace")
# JSON file holding the last recorded file state.
STATE_FILE = WORKSPACE.joinpath(".watcher-state.json")
# Text log that detected changes are appended to.
LOG_FILE = WORKSPACE.joinpath("memory", "file-changes.log")

# Path components that exclude a file or directory from watching.
IGNORE_PATTERNS = {
    '.git',
    '__pycache__',
    'node_modules',
    '.venv',
    'venv',
    '.watcher-state.json',
}
# File suffixes that are never tracked.
IGNORE_EXTENSIONS = {
    '.pyc',
    '.pyo',
    '.log',
}
|
|
|
|
def should_ignore(path: Path) -> bool:
    """Return True when *path* is excluded from watching.

    A path is excluded if any of its components equals an entry of
    IGNORE_PATTERNS, or if its suffix is listed in IGNORE_EXTENSIONS.
    """
    components = path.parts
    has_ignored_component = any(name in components for name in IGNORE_PATTERNS)
    return has_ignored_component or path.suffix in IGNORE_EXTENSIONS
|
|
|
|
def get_file_state() -> dict:
    """Snapshot every watched file under WORKSPACE.

    Walks WORKSPACE with os.walk, pruning directories that are listed in
    IGNORE_PATTERNS or whose name starts with a dot, and skipping files for
    which should_ignore() is true.

    Returns:
        A dict mapping each file's path relative to WORKSPACE (as a string)
        to ``{'mtime': <st_mtime>, 'size': <st_size>}``. Files that cannot
        be stat'ed are left out.
    """
    state = {}
    for root, dirs, files in os.walk(WORKSPACE):
        # Prune in place so os.walk does not descend into ignored or
        # hidden directories.
        dirs[:] = [d for d in dirs if d not in IGNORE_PATTERNS and not d.startswith('.')]

        for fname in files:
            fpath = Path(root) / fname
            if should_ignore(fpath):
                continue

            try:
                stat = fpath.stat()
            except OSError:
                # The file disappeared or is unreadable (e.g. removed between
                # listing and stat, or a broken symlink): skip it. Only
                # OSError is caught so Ctrl+C and real bugs still propagate.
                continue

            rel_path = str(fpath.relative_to(WORKSPACE))
            state[rel_path] = {
                'mtime': stat.st_mtime,
                'size': stat.st_size,
            }

    return state
|
|
|
|
def load_state() -> dict:
    """Load the previously saved state from STATE_FILE.

    Returns:
        The saved state dict. An empty dict is returned when the file is
        missing or unreadable, does not contain valid JSON, or holds a JSON
        value that is not an object.
    """
    # Open directly instead of checking exists() first: the file can vanish
    # between the check and the open.
    try:
        with open(STATE_FILE) as f:
            loaded = json.load(f)
    except (OSError, ValueError):
        # OSError: missing/unreadable file. ValueError covers
        # json.JSONDecodeError and decoding errors. Both mean "no usable
        # previous state".
        return {}

    # A non-dict payload cannot be diffed; treat it like a missing state.
    return loaded if isinstance(loaded, dict) else {}
|
|
|
|
def save_state(state: dict):
    """Save *state* to STATE_FILE as JSON.

    The state is serialized before the file is opened for writing, so a
    serialization error is raised while the previously saved state file is
    still intact rather than after it has been truncated.
    """
    payload = json.dumps(state)
    with open(STATE_FILE, 'w') as f:
        f.write(payload)
|
|
|
|
def log_change(change_type: str, path: str):
    """Append one timestamped change record to LOG_FILE and print it.

    The record has the form ``[YYYY-MM-DD HH:MM:SS] <change_type>: <path>``.
    The log file's parent directory is created if it does not exist.
    """
    stamp = f"{datetime.now():%Y-%m-%d %H:%M:%S}"
    entry = f"[{stamp}] {change_type}: {path}\n"

    log_dir = LOG_FILE.parent
    log_dir.mkdir(parents=True, exist_ok=True)
    with LOG_FILE.open('a') as log:
        log.write(entry)

    # Echo the same record to stdout without the trailing newline.
    print(entry.strip())
|
|
|
|
def diff_states(old: dict, new: dict) -> dict:
    """Compare two file-state snapshots and return the differences.

    Args:
        old: Earlier snapshot, mapping path -> {'mtime': ..., 'size': ...}.
        new: Later snapshot with the same layout.

    Returns:
        A dict with the keys 'added', 'modified' and 'deleted', each holding
        a sorted list of paths. A path counts as modified when it is present
        in both snapshots and its 'mtime' or 'size' differs.
    """
    old_keys = set(old)
    new_keys = set(new)

    def is_modified(path):
        before, after = old[path], new[path]
        return before['mtime'] != after['mtime'] or before['size'] != after['size']

    # Sort every list: set iteration order is arbitrary, and callers print
    # and log these paths in the order they are returned.
    return {
        'added': sorted(new_keys - old_keys),
        'modified': sorted(p for p in old_keys & new_keys if is_modified(p)),
        'deleted': sorted(old_keys - new_keys),
    }
|
|
|
|
def watch(interval: int = 5):
    """Poll the workspace forever, logging every detected change.

    Takes an initial snapshot and saves it, then every *interval* seconds
    takes a new snapshot, logs added/modified/deleted paths via
    log_change(), and saves the new snapshot whenever something changed.
    Stops cleanly on Ctrl+C.
    """
    banner = (
        f"Watching {WORKSPACE}",
        f"Interval: {interval}s",
        f"Log: {LOG_FILE}",
        "Press Ctrl+C to stop\n",
    )
    for text in banner:
        print(text)

    previous = get_file_state()
    save_state(previous)

    # Log label paired with the matching key of the diff result, in the
    # order changes are reported.
    categories = (
        ("ADDED", 'added'),
        ("MODIFIED", 'modified'),
        ("DELETED", 'deleted'),
    )

    try:
        while True:
            time.sleep(interval)
            current = get_file_state()
            changes = diff_states(previous, current)

            for label, key in categories:
                for path in changes[key]:
                    log_change(label, path)

            if any(changes.values()):
                save_state(current)
                previous = current
    except KeyboardInterrupt:
        print("\nStopped watching")
|
|
|
|
def check_once():
    """Compare the workspace against the saved state once, then exit.

    When no saved state is available, only the number of indexed files is
    reported. Otherwise added (+), modified (~) and deleted (-) paths are
    printed. The current snapshot is saved in every case.
    """
    previous = load_state()
    current = get_file_state()

    if previous:
        changes = diff_states(previous, current)
        if not any(changes.values()):
            print("No changes detected")
        else:
            print("Changes since last check:")
            markers = (('+', 'added'), ('~', 'modified'), ('-', 'deleted'))
            for marker, key in markers:
                for path in changes[key]:
                    print(f" {marker} {path}")
    else:
        print(f"First run - indexed {len(current)} files")

    save_state(current)
|
|
|
|
def main():
    """Dispatch on the command-line arguments.

    * no argument, or ``check``: run a single check via check_once().
    * ``watch [interval]``: watch continuously via watch(); the interval is
      in seconds and defaults to 5.
    * anything else: print the usage text.

    An interval that is not a positive integer is reported on stderr and the
    process exits with status 2. Without this check a non-numeric value
    raised an uncaught ValueError, a negative value failed later inside
    time.sleep(), and zero polled without pausing.
    """
    args = sys.argv[1:]

    if not args or args[0] == 'check':
        check_once()
        return

    if args[0] == 'watch':
        interval = 5
        if len(args) > 1:
            try:
                interval = int(args[1])
            except ValueError:
                interval = 0  # Falls through to the shared rejection below.
            if interval <= 0:
                print(
                    f"Invalid interval {args[1]!r}: expected a positive integer",
                    file=sys.stderr,
                )
                sys.exit(2)
        watch(interval)
        return

    print("Usage:")
    print(" watcher.py check - Check for changes once")
    print(" watcher.py watch [interval] - Watch continuously")


# Run the command-line interface only when executed as a script.
if __name__ == "__main__":
    main()
|