Add journal and backup tools - journal.py: Structured journaling with prompts (morning/evening) - backup.py: Workspace backup, restore, and data export - Updated ws CLI with new commands - 18 tools total now
This commit is contained in:
197
tools/backup.py
Executable file
197
tools/backup.py
Executable file
@ -0,0 +1,197 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
backup - Workspace backup and export
|
||||
|
||||
Create snapshots of the workspace for safekeeping.
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import tarfile
|
||||
import json
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
|
||||
WORKSPACE = Path("/home/wdjones/.openclaw/workspace")
|
||||
BACKUP_DIR = Path.home() / ".openclaw" / "backups"
|
||||
|
||||
EXCLUDE_PATTERNS = {
|
||||
'.git',
|
||||
'__pycache__',
|
||||
'*.pyc',
|
||||
'node_modules',
|
||||
'.venv',
|
||||
'venv',
|
||||
}
|
||||
|
||||
def should_exclude(path: str) -> bool:
|
||||
"""Check if path should be excluded."""
|
||||
for pattern in EXCLUDE_PATTERNS:
|
||||
if pattern.startswith('*'):
|
||||
if path.endswith(pattern[1:]):
|
||||
return True
|
||||
elif pattern in path.split(os.sep):
|
||||
return True
|
||||
return False
|
||||
|
||||
def create_backup(output_dir: Path = None):
|
||||
"""Create a backup of the workspace."""
|
||||
if output_dir is None:
|
||||
output_dir = BACKUP_DIR
|
||||
|
||||
output_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||
backup_name = f"workspace_{timestamp}"
|
||||
backup_path = output_dir / f"{backup_name}.tar.gz"
|
||||
|
||||
print(f"📦 Creating backup...")
|
||||
print(f" Source: {WORKSPACE}")
|
||||
print(f" Output: {backup_path}")
|
||||
|
||||
file_count = 0
|
||||
total_size = 0
|
||||
|
||||
with tarfile.open(backup_path, "w:gz") as tar:
|
||||
for root, dirs, files in os.walk(WORKSPACE):
|
||||
# Filter directories
|
||||
dirs[:] = [d for d in dirs if not should_exclude(d)]
|
||||
|
||||
for file in files:
|
||||
if should_exclude(file):
|
||||
continue
|
||||
|
||||
file_path = Path(root) / file
|
||||
arc_name = file_path.relative_to(WORKSPACE)
|
||||
|
||||
if not should_exclude(str(arc_name)):
|
||||
tar.add(file_path, arcname=arc_name)
|
||||
file_count += 1
|
||||
total_size += file_path.stat().st_size
|
||||
|
||||
backup_size = backup_path.stat().st_size / 1024 # KB
|
||||
|
||||
print(f"\n✓ Backup complete!")
|
||||
print(f" Files: {file_count}")
|
||||
print(f" Original: {total_size / 1024:.1f} KB")
|
||||
print(f" Compressed: {backup_size:.1f} KB")
|
||||
print(f" Ratio: {backup_size / (total_size / 1024) * 100:.0f}%")
|
||||
|
||||
return backup_path
|
||||
|
||||
def list_backups():
|
||||
"""List available backups."""
|
||||
if not BACKUP_DIR.exists():
|
||||
print("No backups found")
|
||||
return
|
||||
|
||||
backups = sorted(BACKUP_DIR.glob("workspace_*.tar.gz"), reverse=True)
|
||||
|
||||
if not backups:
|
||||
print("No backups found")
|
||||
return
|
||||
|
||||
print(f"\n📦 Backups ({len(backups)} total)")
|
||||
print("=" * 50)
|
||||
|
||||
for backup in backups[:10]:
|
||||
size = backup.stat().st_size / 1024
|
||||
mtime = datetime.fromtimestamp(backup.stat().st_mtime)
|
||||
print(f" {backup.name}")
|
||||
print(f" {mtime.strftime('%Y-%m-%d %H:%M')} | {size:.0f} KB")
|
||||
|
||||
def restore_backup(backup_path: str, target_dir: Path = None):
|
||||
"""Restore from a backup."""
|
||||
backup_file = Path(backup_path)
|
||||
|
||||
if not backup_file.exists():
|
||||
# Check in backup dir
|
||||
backup_file = BACKUP_DIR / backup_path
|
||||
if not backup_file.exists():
|
||||
print(f"Backup not found: {backup_path}")
|
||||
return
|
||||
|
||||
if target_dir is None:
|
||||
target_dir = WORKSPACE.parent / "workspace_restored"
|
||||
|
||||
target_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
print(f"📦 Restoring backup...")
|
||||
print(f" From: {backup_file}")
|
||||
print(f" To: {target_dir}")
|
||||
|
||||
with tarfile.open(backup_file, "r:gz") as tar:
|
||||
tar.extractall(target_dir)
|
||||
|
||||
print(f"\n✓ Restore complete!")
|
||||
print(f" Location: {target_dir}")
|
||||
|
||||
def export_data():
|
||||
"""Export key data files as JSON bundle."""
|
||||
BACKUP_DIR.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||
export_path = BACKUP_DIR / f"data_export_{timestamp}.json"
|
||||
|
||||
data = {
|
||||
'exported': datetime.now().isoformat(),
|
||||
'workspace': str(WORKSPACE),
|
||||
'files': {}
|
||||
}
|
||||
|
||||
# Collect JSON data files
|
||||
data_files = [
|
||||
'data/habits.json',
|
||||
'data/timetrack.json',
|
||||
'data/wisdom.json',
|
||||
'inbox/captures.json',
|
||||
'docs/clips.json',
|
||||
]
|
||||
|
||||
for file_path in data_files:
|
||||
full_path = WORKSPACE / file_path
|
||||
if full_path.exists():
|
||||
try:
|
||||
with open(full_path) as f:
|
||||
data['files'][file_path] = json.load(f)
|
||||
except:
|
||||
pass
|
||||
|
||||
# Also include memory files
|
||||
memory_dir = WORKSPACE / "memory"
|
||||
if memory_dir.exists():
|
||||
data['memory'] = {}
|
||||
for f in sorted(memory_dir.glob("*.md"))[-30:]: # Last 30 days
|
||||
data['memory'][f.name] = f.read_text()
|
||||
|
||||
with open(export_path, 'w') as f:
|
||||
json.dump(data, f, indent=2)
|
||||
|
||||
print(f"✓ Data exported to: {export_path}")
|
||||
|
||||
def main():
|
||||
if len(sys.argv) < 2:
|
||||
print("Usage:")
|
||||
print(" backup create [dir] - Create workspace backup")
|
||||
print(" backup list - List backups")
|
||||
print(" backup restore <file> - Restore from backup")
|
||||
print(" backup export - Export data as JSON")
|
||||
list_backups()
|
||||
return
|
||||
|
||||
cmd = sys.argv[1]
|
||||
|
||||
if cmd == 'create':
|
||||
output = Path(sys.argv[2]) if len(sys.argv) > 2 else None
|
||||
create_backup(output)
|
||||
elif cmd == 'list':
|
||||
list_backups()
|
||||
elif cmd == 'restore' and len(sys.argv) > 2:
|
||||
restore_backup(sys.argv[2])
|
||||
elif cmd == 'export':
|
||||
export_data()
|
||||
else:
|
||||
print("Unknown command. Run 'backup' for help.")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user