#!/usr/bin/env python3 import json import sys import subprocess from pathlib import Path def extract_text_content(content): """Extract text from message content, handling various content types.""" if isinstance(content, str): return content text_parts = [] if isinstance(content, list): for item in content: if isinstance(item, dict): if item.get('type') == 'text' and 'text' in item: text_parts.append(item['text']) elif item.get('type') == 'toolCall': # Include tool call descriptions for context tool_name = item.get('name', 'unknown') text_parts.append(f"[Used tool: {tool_name}]") elif isinstance(item, str): text_parts.append(item) return ' '.join(text_parts).strip() def process_session_file(filepath): """Process session file and extract user-assistant pairs.""" messages = [] try: with open(filepath, 'r') as f: for line in f: try: data = json.loads(line.strip()) if data.get('type') == 'message' and 'message' in data: msg = data['message'] role = msg.get('role') content = msg.get('content', '') if role in ['user', 'assistant']: text_content = extract_text_content(content) if text_content: # Only include non-empty messages messages.append({ 'role': role, 'content': text_content, 'timestamp': data.get('timestamp', '') }) except json.JSONDecodeError: continue except Exception as e: print(f"Error reading file: {e}", file=sys.stderr) return [] # Find user-assistant pairs and extract last 10 assistant turns pairs = [] current_user_msg = None for msg in messages: if msg['role'] == 'user': current_user_msg = msg['content'] elif msg['role'] == 'assistant' and current_user_msg: pairs.append({ 'user': current_user_msg, 'assistant': msg['content'] }) current_user_msg = None # Return last 10 pairs return pairs[-10:] if len(pairs) >= 10 else pairs def main(): if len(sys.argv) != 2: print("Usage: auto-memory-processor.py ", file=sys.stderr) sys.exit(1) session_file = sys.argv[1] pairs = process_session_file(session_file) print(f"Processing {len(pairs)} assistant turns from {session_file}", file=sys.stderr) # Pipe each pair to the auto-memory hook for pair in pairs: entry = { "user": pair['user'], "assistant": pair['assistant'], "agent_id": "case", "session": "main" } try: proc = subprocess.run([ 'python3', '/home/wdjones/.openclaw/workspace/tools/auto-memory-hook.py' ], input=json.dumps(entry), text=True, capture_output=True) if proc.returncode != 0: print(f"Warning: auto-memory-hook failed for one entry: {proc.stderr}", file=sys.stderr) except Exception as e: print(f"Error calling auto-memory-hook: {e}", file=sys.stderr) if __name__ == '__main__': main()