#!/usr/bin/env python3 """ Parse OpenClaw session transcripts and extract conversation turns """ import json import sys from typing import List, Dict, Any def extract_conversation_turns(session_file: str) -> List[Dict[str, str]]: """Extract conversation turns from a session file""" turns = [] current_user_message = None try: with open(session_file, 'r') as f: for line in f: try: data = json.loads(line.strip()) if data.get('type') == 'message' and 'message' in data: message = data['message'] role = message.get('role') content = message.get('content', []) if role == 'user': # Extract text content from user message text_content = [] for item in content: if item.get('type') == 'text': text_content.append(item.get('text', '')) current_user_message = '\n'.join(text_content) elif role == 'assistant' and current_user_message: # Extract text content from assistant message (skip tool calls) text_content = [] for item in content: if item.get('type') == 'text': text_content.append(item.get('text', '')) if text_content: assistant_text = '\n'.join(text_content) turns.append({ 'user': current_user_message, 'assistant': assistant_text, 'agent_id': 'case', 'session': 'main' }) current_user_message = None except json.JSONDecodeError: continue except Exception as e: print(f"Error reading session file: {e}", file=sys.stderr) return [] return turns def main(): if len(sys.argv) != 2: print("Usage: parse-session.py ") sys.exit(1) session_file = sys.argv[1] turns = extract_conversation_turns(session_file) # Get last 10 assistant turns last_turns = turns[-10:] if len(turns) >= 10 else turns # Output as JSON for the auto-memory hook for turn in last_turns: print(json.dumps(turn)) if __name__ == "__main__": main()