From b579251e90c2fe5c443a46a5a4b82c836636a9cd Mon Sep 17 00:00:00 2001 From: Case Date: Fri, 30 Jan 2026 23:19:19 -0600 Subject: [PATCH] Add advanced tools - dashboard.py: Web dashboard on localhost:8080 - briefing.py: Morning briefing generator - scaffold.py: Project scaffolding (python, node, script, docs, experiment) - watcher.py: File change monitor - focus.py: Pomodoro-style focus timer --- tools/briefing.py | 132 ++++++++++++++++++++++++++++++++ tools/dashboard.py | 187 +++++++++++++++++++++++++++++++++++++++++++++ tools/focus.py | 137 +++++++++++++++++++++++++++++++++ tools/scaffold.py | 127 ++++++++++++++++++++++++++++++ tools/watcher.py | 174 +++++++++++++++++++++++++++++++++++++++++ 5 files changed, 757 insertions(+) create mode 100755 tools/briefing.py create mode 100755 tools/dashboard.py create mode 100755 tools/focus.py create mode 100755 tools/scaffold.py create mode 100755 tools/watcher.py diff --git a/tools/briefing.py b/tools/briefing.py new file mode 100755 index 0000000..b344772 --- /dev/null +++ b/tools/briefing.py @@ -0,0 +1,132 @@ +#!/usr/bin/env python3 +""" +Morning briefing generator - creates a daily summary for the human. +""" + +import os +import json +from datetime import datetime, timedelta +from pathlib import Path + +WORKSPACE = Path("/home/wdjones/.openclaw/workspace") + +def get_weather_stub(): + """Placeholder for weather - would need API.""" + return "β˜€οΈ Weather check not configured" + +def get_tasks_summary(): + """Get task summary.""" + tasks_file = WORKSPACE / "TASKS.md" + if not tasks_file.exists(): + return None + + summary = {'in_progress': [], 'waiting': [], 'inbox': []} + section = None + + with open(tasks_file) as f: + for line in f: + if "## Inbox" in line: section = 'inbox' + elif "## In Progress" in line: section = 'in_progress' + elif "## Waiting" in line: section = 'waiting' + elif "## Done" in line: section = None + elif section and line.strip().startswith("- ["): + task = line.strip()[6:].strip() # Remove "- [ ] " + if task: + summary[section].append(task) + + return summary + +def get_yesterday_notes(): + """Get yesterday's notes summary.""" + yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d") + yesterday_file = WORKSPACE / "memory" / f"{yesterday}.md" + + if not yesterday_file.exists(): + return None + + content = yesterday_file.read_text() + lines = [l.strip() for l in content.split('\n') if l.strip() and not l.startswith('#')] + return lines[:5] # First 5 non-header lines + +def get_inbox_items(): + """Check inbox for pending items.""" + inbox_dir = WORKSPACE / "inbox" + if not inbox_dir.exists(): + return [] + + items = [] + for f in inbox_dir.iterdir(): + if f.is_file() and not f.name.startswith('.'): + items.append(f.name) + return items + +def get_clips_recent(): + """Get recent clips.""" + clips_file = WORKSPACE / "docs" / "clips.json" + if not clips_file.exists(): + return [] + + try: + with open(clips_file) as f: + clips = json.load(f) + return clips[:3] + except: + return [] + +def generate_briefing(): + """Generate the morning briefing.""" + now = datetime.now() + + lines = [] + lines.append(f"# πŸ“‹ Briefing - {now.strftime('%A, %B %d, %Y')}") + lines.append(f"Generated at {now.strftime('%H:%M')}") + lines.append("") + + # Tasks + tasks = get_tasks_summary() + if tasks: + lines.append("## 🎯 Tasks") + if tasks['in_progress']: + lines.append(f"**In Progress ({len(tasks['in_progress'])}):**") + for t in tasks['in_progress'][:3]: + lines.append(f" β€’ {t[:60]}...") + if tasks['waiting']: + lines.append(f"**Waiting ({len(tasks['waiting'])}):** {len(tasks['waiting'])} items") + if tasks['inbox']: + lines.append(f"**Inbox ({len(tasks['inbox'])}):** {len(tasks['inbox'])} items to triage") + lines.append("") + + # Yesterday + yesterday = get_yesterday_notes() + if yesterday: + lines.append("## πŸ“ Yesterday") + for note in yesterday[:3]: + lines.append(f" β€’ {note[:60]}") + lines.append("") + + # Inbox + inbox = get_inbox_items() + if inbox: + lines.append("## πŸ“¬ Inbox") + lines.append(f"{len(inbox)} items waiting: {', '.join(inbox[:3])}") + lines.append("") + + # Clips + clips = get_clips_recent() + if clips: + lines.append("## πŸ“Ž Recent Clips") + for clip in clips: + lines.append(f" β€’ {clip.get('title', 'Untitled')}") + lines.append("") + + lines.append("---") + lines.append("*What's the focus today?*") + + return "\n".join(lines) + +def main(): + briefing = generate_briefing() + print(briefing) + +if __name__ == "__main__": + main() diff --git a/tools/dashboard.py b/tools/dashboard.py new file mode 100755 index 0000000..59e3130 --- /dev/null +++ b/tools/dashboard.py @@ -0,0 +1,187 @@ +#!/usr/bin/env python3 +""" +Simple workspace dashboard - serves a status page on localhost. +""" + +import http.server +import socketserver +import json +import os +from datetime import datetime, timedelta +from pathlib import Path +from urllib.parse import parse_qs, urlparse + +WORKSPACE = Path("/home/wdjones/.openclaw/workspace") +PORT = 8080 + +def get_workspace_stats(): + """Gather workspace statistics.""" + stats = { + 'generated': datetime.now().isoformat(), + 'files': {'total': 0, 'by_type': {}}, + 'recent': [], + 'tasks': {'inbox': 0, 'in_progress': 0, 'waiting': 0, 'done_today': 0}, + 'memory': {'today': None, 'days_logged': 0} + } + + # Count files + for root, dirs, files in os.walk(WORKSPACE): + dirs[:] = [d for d in dirs if not d.startswith('.')] + for fname in files: + if fname.startswith('.'): + continue + stats['files']['total'] += 1 + ext = Path(fname).suffix or 'no-ext' + stats['files']['by_type'][ext] = stats['files']['by_type'].get(ext, 0) + 1 + + # Track recent files + fpath = Path(root) / fname + try: + mtime = datetime.fromtimestamp(fpath.stat().st_mtime) + if datetime.now() - mtime < timedelta(hours=24): + rel = str(fpath.relative_to(WORKSPACE)) + stats['recent'].append({'path': rel, 'modified': mtime.isoformat()}) + except: + pass + + stats['recent'] = sorted(stats['recent'], key=lambda x: x['modified'], reverse=True)[:10] + + # Parse tasks + tasks_file = WORKSPACE / "TASKS.md" + if tasks_file.exists(): + section = None + today = datetime.now().strftime("%Y-%m-%d") + with open(tasks_file) as f: + for line in f: + if "## Inbox" in line: section = 'inbox' + elif "## In Progress" in line: section = 'in_progress' + elif "## Waiting" in line: section = 'waiting' + elif "## Done" in line: section = 'done' + elif line.strip().startswith("- ["): + if section == 'done' and today in line: + stats['tasks']['done_today'] += 1 + elif section and section != 'done': + stats['tasks'][section] += 1 + + # Memory stats + memory_dir = WORKSPACE / "memory" + if memory_dir.exists(): + days = [f for f in memory_dir.iterdir() if f.suffix == '.md'] + stats['memory']['days_logged'] = len(days) + today_file = memory_dir / f"{datetime.now().strftime('%Y-%m-%d')}.md" + if today_file.exists(): + stats['memory']['today'] = len(today_file.read_text().split('\n')) + + return stats + +def generate_html(stats): + """Generate dashboard HTML.""" + return f""" + + + Workspace Dashboard + + + + +

πŸ–€ Case Dashboard

+

Generated {stats['generated'][:19]}

+ +
+
+

πŸ“ Files

+
{stats['files']['total']}
+
total files in workspace
+
+ {''.join(f'{ext}: {count}' for ext, count in sorted(stats['files']['by_type'].items(), key=lambda x: -x[1])[:5])} +
+
+ +
+

βœ… Tasks

+
+
{stats['tasks']['in_progress']}
in progress
+
{stats['tasks']['waiting']}
waiting
+
{stats['tasks']['inbox']}
inbox
+
{stats['tasks']['done_today']}
done today
+
+
+ +
+

πŸ“ Memory

+
{stats['memory']['days_logged']}
+
days logged
+
+ Today: {stats['memory']['today'] or 0} lines +
+
+ +
+

πŸ• Recent Activity

+
    + {''.join(f"
  • {f['path']}
  • " for f in stats['recent'][:8]) or '
  • No recent activity
  • '} +
+
+
+ +""" + +class DashboardHandler(http.server.SimpleHTTPRequestHandler): + def do_GET(self): + if self.path == '/' or self.path == '/dashboard': + stats = get_workspace_stats() + html = generate_html(stats) + self.send_response(200) + self.send_header('Content-type', 'text/html') + self.end_headers() + self.wfile.write(html.encode()) + elif self.path == '/api/stats': + stats = get_workspace_stats() + self.send_response(200) + self.send_header('Content-type', 'application/json') + self.end_headers() + self.wfile.write(json.dumps(stats, indent=2).encode()) + else: + self.send_error(404) + + def log_message(self, format, *args): + pass # Suppress logs + +def main(): + with socketserver.TCPServer(("", PORT), DashboardHandler) as httpd: + print(f"Dashboard running at http://localhost:{PORT}") + print("Press Ctrl+C to stop") + try: + httpd.serve_forever() + except KeyboardInterrupt: + print("\nShutting down...") + +if __name__ == "__main__": + main() diff --git a/tools/focus.py b/tools/focus.py new file mode 100755 index 0000000..f5bce1a --- /dev/null +++ b/tools/focus.py @@ -0,0 +1,137 @@ +#!/usr/bin/env python3 +""" +Focus timer - simple pomodoro-style timer with logging. +""" + +import sys +import time +import json +from datetime import datetime +from pathlib import Path + +WORKSPACE = Path("/home/wdjones/.openclaw/workspace") +LOG_FILE = WORKSPACE / "memory" / "focus-log.json" + +def load_log() -> list: + """Load focus session log.""" + if LOG_FILE.exists(): + try: + with open(LOG_FILE) as f: + return json.load(f) + except: + return [] + return [] + +def save_log(log: list): + """Save focus session log.""" + LOG_FILE.parent.mkdir(parents=True, exist_ok=True) + with open(LOG_FILE, 'w') as f: + json.dump(log, f, indent=2) + +def format_time(seconds: int) -> str: + """Format seconds as MM:SS.""" + mins, secs = divmod(seconds, 60) + return f"{mins:02d}:{secs:02d}" + +def timer(minutes: int, task: str = None): + """Run a focus timer.""" + total_seconds = minutes * 60 + remaining = total_seconds + + start_time = datetime.now() + print(f"\n🎯 Focus session started: {minutes} minutes") + if task: + print(f" Task: {task}") + print() + + try: + while remaining > 0: + print(f"\r ⏱️ {format_time(remaining)} remaining ", end='', flush=True) + time.sleep(1) + remaining -= 1 + + print(f"\r βœ… Session complete! ") + print("\nπŸ”” Time's up! Take a break.\n") + + # Log the session + log = load_log() + log.append({ + 'start': start_time.isoformat(), + 'end': datetime.now().isoformat(), + 'duration_minutes': minutes, + 'task': task, + 'completed': True + }) + save_log(log) + + except KeyboardInterrupt: + elapsed = total_seconds - remaining + print(f"\r ⏸️ Stopped after {format_time(elapsed)} ") + + if elapsed > 60: # Log if > 1 minute + log = load_log() + log.append({ + 'start': start_time.isoformat(), + 'end': datetime.now().isoformat(), + 'duration_minutes': round(elapsed / 60, 1), + 'task': task, + 'completed': False + }) + save_log(log) + print(" Session logged (partial)") + +def stats(): + """Show focus statistics.""" + log = load_log() + + if not log: + print("No focus sessions logged yet.") + return + + today = datetime.now().strftime("%Y-%m-%d") + today_sessions = [s for s in log if s['start'].startswith(today)] + + total_minutes = sum(s['duration_minutes'] for s in log) + today_minutes = sum(s['duration_minutes'] for s in today_sessions) + completed = len([s for s in log if s.get('completed')]) + + print("\nπŸ“Š Focus Stats") + print(f" Total sessions: {len(log)}") + print(f" Completed: {completed}") + print(f" Total time: {total_minutes:.0f} minutes ({total_minutes/60:.1f} hours)") + print(f" Today: {len(today_sessions)} sessions, {today_minutes:.0f} minutes") + + if today_sessions: + print("\n Today's sessions:") + for s in today_sessions[-5:]: + start = s['start'][11:16] + status = "βœ…" if s.get('completed') else "⏸️" + task = s.get('task', '')[:30] or 'No task' + print(f" {status} {start} - {s['duration_minutes']}min - {task}") + print() + +def main(): + if len(sys.argv) < 2: + print("Usage:") + print(" focus.py [task] - Start a focus session") + print(" focus.py stats - Show statistics") + print("\nExamples:") + print(" focus.py 25 'Write documentation'") + print(" focus.py 15") + sys.exit(0) + + if sys.argv[1] == 'stats': + stats() + return + + try: + minutes = int(sys.argv[1]) + except ValueError: + print("Minutes must be a number") + sys.exit(1) + + task = ' '.join(sys.argv[2:]) if len(sys.argv) > 2 else None + timer(minutes, task) + +if __name__ == "__main__": + main() diff --git a/tools/scaffold.py b/tools/scaffold.py new file mode 100755 index 0000000..e74f44e --- /dev/null +++ b/tools/scaffold.py @@ -0,0 +1,127 @@ +#!/usr/bin/env python3 +""" +Project scaffolding tool - creates project structures for different types. +""" + +import os +import sys +from pathlib import Path +from datetime import datetime + +WORKSPACE = Path("/home/wdjones/.openclaw/workspace") +PROJECTS = WORKSPACE / "projects" + +TEMPLATES = { + 'python': { + 'files': { + 'README.md': '# {name}\n\n{description}\n\n## Setup\n\n```bash\npython -m venv venv\nsource venv/bin/activate\npip install -r requirements.txt\n```\n\n## Usage\n\n```bash\npython main.py\n```\n', + 'main.py': '#!/usr/bin/env python3\n"""\n{name} - {description}\n"""\n\ndef main():\n print("Hello from {name}")\n\nif __name__ == "__main__":\n main()\n', + 'requirements.txt': '# Add dependencies here\n', + '.gitignore': 'venv/\n__pycache__/\n*.pyc\n.env\n*.egg-info/\ndist/\nbuild/\n', + }, + 'dirs': ['src', 'tests'], + }, + 'node': { + 'files': { + 'README.md': '# {name}\n\n{description}\n\n## Setup\n\n```bash\npnpm install\n```\n\n## Usage\n\n```bash\npnpm start\n```\n', + 'package.json': '{{\n "name": "{name}",\n "version": "0.1.0",\n "description": "{description}",\n "main": "index.js",\n "scripts": {{\n "start": "node index.js"\n }}\n}}\n', + 'index.js': '// {name}\n// {description}\n\nconsole.log("Hello from {name}");\n', + '.gitignore': 'node_modules/\n.env\ndist/\n', + }, + 'dirs': ['src', 'lib'], + }, + 'script': { + 'files': { + 'README.md': '# {name}\n\n{description}\n\n## Usage\n\n```bash\n./script.sh\n```\n', + 'script.sh': '#!/bin/bash\n# {name} - {description}\n\nset -euo pipefail\n\necho "Running {name}"\n', + }, + 'dirs': [], + }, + 'docs': { + 'files': { + 'README.md': '# {name}\n\n{description}\n\n## Contents\n\n- [Overview](overview.md)\n', + 'overview.md': '# Overview\n\nAdd documentation here.\n', + }, + 'dirs': ['images', 'guides'], + }, + 'experiment': { + 'files': { + 'README.md': '# {name}\n\n**Experiment started:** {date}\n\n## Hypothesis\n\n{description}\n\n## Method\n\n\n## Results\n\n\n## Conclusion\n\n', + 'notes.md': '# Experiment Notes\n\n## {date}\n\n- Started experiment\n', + }, + 'dirs': ['data', 'output'], + }, +} + +def scaffold(name: str, template: str = 'python', description: str = ''): + """Create a new project from template.""" + if template not in TEMPLATES: + print(f"Unknown template: {template}") + print(f"Available: {', '.join(TEMPLATES.keys())}") + return False + + project_dir = PROJECTS / name + if project_dir.exists(): + print(f"Project already exists: {name}") + return False + + tmpl = TEMPLATES[template] + date = datetime.now().strftime("%Y-%m-%d") + + # Create directories + project_dir.mkdir(parents=True) + for d in tmpl['dirs']: + (project_dir / d).mkdir() + + # Create files + for fname, content in tmpl['files'].items(): + fpath = project_dir / fname + formatted = content.format(name=name, description=description or 'No description', date=date) + fpath.write_text(formatted) + + # Make scripts executable + for f in project_dir.glob('*.sh'): + os.chmod(f, 0o755) + for f in project_dir.glob('*.py'): + if f.read_text().startswith('#!/'): + os.chmod(f, 0o755) + + print(f"βœ“ Created {template} project: {name}") + print(f" Location: {project_dir}") + print(f" Files: {len(tmpl['files'])}") + if tmpl['dirs']: + print(f" Dirs: {', '.join(tmpl['dirs'])}") + + return True + +def list_templates(): + """List available templates.""" + print("Available templates:\n") + for name, tmpl in TEMPLATES.items(): + files = ', '.join(tmpl['files'].keys()) + print(f" {name}:") + print(f" Files: {files}") + if tmpl['dirs']: + print(f" Dirs: {', '.join(tmpl['dirs'])}") + print() + +def main(): + if len(sys.argv) < 2: + print("Usage:") + print(" scaffold.py [template] [description]") + print(" scaffold.py --list") + print("\nTemplates: python, node, script, docs, experiment") + sys.exit(1) + + if sys.argv[1] == '--list': + list_templates() + return + + name = sys.argv[1] + template = sys.argv[2] if len(sys.argv) > 2 else 'python' + description = ' '.join(sys.argv[3:]) if len(sys.argv) > 3 else '' + + scaffold(name, template, description) + +if __name__ == "__main__": + main() diff --git a/tools/watcher.py b/tools/watcher.py new file mode 100755 index 0000000..6a5fea3 --- /dev/null +++ b/tools/watcher.py @@ -0,0 +1,174 @@ +#!/usr/bin/env python3 +""" +Simple file watcher - monitors workspace for changes and logs them. +Uses polling (no external dependencies). +""" + +import os +import sys +import time +import json +from datetime import datetime +from pathlib import Path +from collections import defaultdict + +WORKSPACE = Path("/home/wdjones/.openclaw/workspace") +STATE_FILE = WORKSPACE / ".watcher-state.json" +LOG_FILE = WORKSPACE / "memory" / "file-changes.log" + +IGNORE_PATTERNS = {'.git', '__pycache__', 'node_modules', '.venv', 'venv', '.watcher-state.json'} +IGNORE_EXTENSIONS = {'.pyc', '.pyo', '.log'} + +def should_ignore(path: Path) -> bool: + """Check if path should be ignored.""" + parts = path.parts + for pattern in IGNORE_PATTERNS: + if pattern in parts: + return True + if path.suffix in IGNORE_EXTENSIONS: + return True + return False + +def get_file_state() -> dict: + """Get current state of all files.""" + state = {} + for root, dirs, files in os.walk(WORKSPACE): + # Filter directories + dirs[:] = [d for d in dirs if d not in IGNORE_PATTERNS and not d.startswith('.')] + + for fname in files: + fpath = Path(root) / fname + if should_ignore(fpath): + continue + + try: + stat = fpath.stat() + rel_path = str(fpath.relative_to(WORKSPACE)) + state[rel_path] = { + 'mtime': stat.st_mtime, + 'size': stat.st_size, + } + except: + pass + + return state + +def load_state() -> dict: + """Load previous state from file.""" + if STATE_FILE.exists(): + try: + with open(STATE_FILE) as f: + return json.load(f) + except: + pass + return {} + +def save_state(state: dict): + """Save current state to file.""" + with open(STATE_FILE, 'w') as f: + json.dump(state, f) + +def log_change(change_type: str, path: str): + """Log a change to the log file.""" + timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + line = f"[{timestamp}] {change_type}: {path}\n" + + LOG_FILE.parent.mkdir(parents=True, exist_ok=True) + with open(LOG_FILE, 'a') as f: + f.write(line) + + print(line.strip()) + +def diff_states(old: dict, new: dict) -> dict: + """Compare states and return changes.""" + changes = {'added': [], 'modified': [], 'deleted': []} + + old_keys = set(old.keys()) + new_keys = set(new.keys()) + + # Added files + for path in new_keys - old_keys: + changes['added'].append(path) + + # Deleted files + for path in old_keys - new_keys: + changes['deleted'].append(path) + + # Modified files + for path in old_keys & new_keys: + if old[path]['mtime'] != new[path]['mtime'] or old[path]['size'] != new[path]['size']: + changes['modified'].append(path) + + return changes + +def watch(interval: int = 5): + """Watch for changes continuously.""" + print(f"Watching {WORKSPACE}") + print(f"Interval: {interval}s") + print(f"Log: {LOG_FILE}") + print("Press Ctrl+C to stop\n") + + state = get_file_state() + save_state(state) + + try: + while True: + time.sleep(interval) + new_state = get_file_state() + changes = diff_states(state, new_state) + + for path in changes['added']: + log_change("ADDED", path) + for path in changes['modified']: + log_change("MODIFIED", path) + for path in changes['deleted']: + log_change("DELETED", path) + + if any(changes.values()): + save_state(new_state) + state = new_state + except KeyboardInterrupt: + print("\nStopped watching") + +def check_once(): + """Check for changes once and exit.""" + old_state = load_state() + new_state = get_file_state() + + if not old_state: + print(f"First run - indexed {len(new_state)} files") + save_state(new_state) + return + + changes = diff_states(old_state, new_state) + + total = sum(len(v) for v in changes.values()) + if total == 0: + print("No changes detected") + else: + print(f"Changes since last check:") + for path in changes['added']: + print(f" + {path}") + for path in changes['modified']: + print(f" ~ {path}") + for path in changes['deleted']: + print(f" - {path}") + + save_state(new_state) + +def main(): + if len(sys.argv) > 1: + if sys.argv[1] == 'watch': + interval = int(sys.argv[2]) if len(sys.argv) > 2 else 5 + watch(interval) + elif sys.argv[1] == 'check': + check_once() + else: + print("Usage:") + print(" watcher.py check - Check for changes once") + print(" watcher.py watch [interval] - Watch continuously") + else: + check_once() + +if __name__ == "__main__": + main()