#!/usr/bin/env python3
"""
Video-to-Knowledge Pipeline
===========================

Extracts audio from video files, transcribes via Faster Whisper, generates
a clean markdown document, and indexes into ChromaDB for RAG.

Usage:
    # Single video
    python3 video-to-knowledge.py /path/to/video.mp4

    # Directory (recursive)
    python3 video-to-knowledge.py /path/to/course/

    # Custom output dir
    python3 video-to-knowledge.py /path/to/video.mp4 --output /path/to/output/

    # Custom collection name
    python3 video-to-knowledge.py /path/to/video.mp4 --collection "real-estate-course"

    # Skip RAG indexing (just transcribe + markdown)
    python3 video-to-knowledge.py /path/to/video.mp4 --no-rag

    # Use a specific Whisper model size
    python3 video-to-knowledge.py /path/to/video.mp4 --model large-v3
"""

import argparse
import hashlib
import json
import subprocess
import sys
from pathlib import Path

# Defaults
CHROMADB_HOST = "192.168.86.25"
CHROMADB_PORT = 8000
OLLAMA_HOST = "192.168.86.40"
OLLAMA_PORT = 11434
EMBED_MODEL = "nomic-embed-text"
DEFAULT_COLLECTION = "video-knowledge"
WHISPER_MODEL = "base.en"
CHUNK_SIZE = 1000  # chars per RAG chunk
CHUNK_OVERLAP = 200
VIDEO_EXTENSIONS = {'.mp4', '.mkv', '.avi', '.mov', '.webm', '.flv', '.wmv', '.m4v', '.ts'}


def log(msg):
    print(f"  → {msg}")


def extract_audio(video_path: Path, output_dir: Path) -> Path:
    """Extract audio from video as WAV (16kHz mono for Whisper)."""
    audio_path = output_dir / f"{video_path.stem}.wav"
    if audio_path.exists():
        log(f"Audio already extracted: {audio_path.name}")
        return audio_path
    log(f"Extracting audio from {video_path.name}...")
    subprocess.run([
        "ffmpeg", "-y", "-i", str(video_path),
        "-vn", "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1",
        str(audio_path)
    ], capture_output=True, check=True)
    return audio_path
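
# extract_audio() shells out to ffmpeg, so the ffmpeg binary must be on PATH;
# faster-whisper, chromadb, and requests are imported lazily inside the
# functions that use them. The subprocess call above is equivalent to running,
# for example (lecture.mp4 is an illustrative name):
#
#   ffmpeg -y -i lecture.mp4 -vn -acodec pcm_s16le -ar 16000 -ac 1 lecture.wav
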

def transcribe(audio_path: Path, model_size: str = WHISPER_MODEL) -> list[dict]:
    """Transcribe audio using faster-whisper. Returns list of segments."""
    log(f"Transcribing with faster-whisper ({model_size})...")
    from faster_whisper import WhisperModel

    model = WhisperModel(model_size, device="cpu", compute_type="int8")
    segments_raw, info = model.transcribe(str(audio_path), beam_size=5)
    segments = []
    for seg in segments_raw:
        segments.append({
            "start": seg.start,
            "end": seg.end,
            "text": seg.text.strip()
        })
    log(f"Transcribed {len(segments)} segments, language: {info.language} ({info.language_probability:.0%})")
    return segments


def format_timestamp(seconds: float) -> str:
    """Format seconds as HH:MM:SS (or MM:SS under an hour)."""
    hours, remainder = divmod(int(seconds), 3600)
    minutes, secs = divmod(remainder, 60)
    if hours:
        return f"{hours:02d}:{minutes:02d}:{secs:02d}"
    return f"{minutes:02d}:{secs:02d}"


def segments_to_markdown(segments: list[dict], video_name: str, video_path: str) -> str:
    """Convert transcript segments into a clean, readable markdown document."""
    lines = [
        f"# {video_name}",
        "",
        f"**Source:** `{video_path}`  ",
        f"**Segments:** {len(segments)}  ",
        f"**Duration:** {format_timestamp(segments[-1]['end']) if segments else 'N/A'}",
        "",
        "---",
        "",
    ]

    # Group segments into ~2 minute chapters for readability
    chapter_duration = 120  # seconds
    current_chapter_start = 0
    chapter_num = 1
    chapter_text = []

    for seg in segments:
        if seg["start"] >= current_chapter_start + chapter_duration and chapter_text:
            # Write out the finished chapter
            ts = format_timestamp(current_chapter_start)
            lines.append(f"## [{ts}] Section {chapter_num}")
            lines.append("")
            lines.append(" ".join(chapter_text))
            lines.append("")
            chapter_num += 1
            current_chapter_start = seg["start"]
            chapter_text = []
        chapter_text.append(seg["text"])

    # Final chapter
    if chapter_text:
        ts = format_timestamp(current_chapter_start)
        lines.append(f"## [{ts}] Section {chapter_num}")
        lines.append("")
        lines.append(" ".join(chapter_text))
        lines.append("")

    return "\n".join(lines)


def chunk_text(text: str, chunk_size: int = CHUNK_SIZE, overlap: int = CHUNK_OVERLAP) -> list[str]:
    """Split text into overlapping chunks for RAG indexing."""
    chunks = []
    start = 0
    while start < len(text):
        end = start + chunk_size
        chunk = text[start:end]
        if chunk.strip():
            chunks.append(chunk.strip())
        start = end - overlap
    return chunks
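
# With the defaults (chunk_size=1000, overlap=200) the window above advances
# by 800 chars per step, so a 2,000-char transcript yields chunks covering
# [0:1000], [800:1800] and [1600:2000]. Neighbouring chunks share ~200 chars,
# so a sentence cut at one boundary still appears whole in the next chunk.
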

def get_embedding(text: str) -> list[float]:
    """Get embedding from Ollama."""
    import requests

    resp = requests.post(
        f"http://{OLLAMA_HOST}:{OLLAMA_PORT}/api/embed",
        json={"model": EMBED_MODEL, "input": text},
        timeout=30
    )
    resp.raise_for_status()
    data = resp.json()
    # Handle both response shapes: /api/embed returns a batch list under
    # "embeddings"; the legacy endpoint returns a single "embedding"
    if "embeddings" in data:
        return data["embeddings"][0]
    return data["embedding"]


def index_to_chromadb(chunks: list[str], video_name: str, video_path: str, collection_name: str):
    """Index text chunks into ChromaDB."""
    import chromadb

    log(f"Connecting to ChromaDB at {CHROMADB_HOST}:{CHROMADB_PORT}...")
    client = chromadb.HttpClient(host=CHROMADB_HOST, port=CHROMADB_PORT)
    collection = client.get_or_create_collection(
        name=collection_name,
        metadata={"hnsw:space": "cosine"}
    )

    # Generate a stable ID prefix from the video path so re-runs upsert
    # in place instead of duplicating documents
    video_hash = hashlib.md5(video_path.encode()).hexdigest()[:8]

    log(f"Indexing {len(chunks)} chunks into collection '{collection_name}'...")
    batch_size = 20
    for i in range(0, len(chunks), batch_size):
        batch = chunks[i:i + batch_size]
        ids = [f"{video_hash}-{i + j}" for j in range(len(batch))]
        embeddings = [get_embedding(chunk) for chunk in batch]
        metadatas = [{
            "source": video_path,
            "video": video_name,
            "chunk_index": i + j,
            "total_chunks": len(chunks)
        } for j in range(len(batch))]
        collection.upsert(
            ids=ids,
            embeddings=embeddings,
            documents=batch,
            metadatas=metadatas
        )
        log(f"  Indexed {min(i + batch_size, len(chunks))}/{len(chunks)} chunks")

    log(f"āœ… Indexed into '{collection_name}' (total docs: {collection.count()})")
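
# A minimal retrieval sketch for the query side (not used by this script; the
# question text is illustrative, the client calls are standard chromadb API):
#
#   client = chromadb.HttpClient(host=CHROMADB_HOST, port=CHROMADB_PORT)
#   collection = client.get_collection(DEFAULT_COLLECTION)
#   hits = collection.query(
#       query_embeddings=[get_embedding("how do cap rates work?")],
#       n_results=5,
#   )
#   for doc, meta in zip(hits["documents"][0], hits["metadatas"][0]):
#       print(meta["video"], "→", doc[:80])
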

def process_video(video_path: Path, output_dir: Path, collection: str,
                  model_size: str, skip_rag: bool, chunk_size: int = CHUNK_SIZE) -> dict:
    """Full pipeline for a single video."""
    video_name = video_path.stem
    print(f"\n{'='*60}")
    print(f"šŸ“¹ Processing: {video_path.name}")
    print(f"{'='*60}")

    # Create output subdir mirroring source structure
    vid_output = output_dir / video_name
    vid_output.mkdir(parents=True, exist_ok=True)

    # 1. Extract audio
    audio_path = extract_audio(video_path, vid_output)

    # 2. Transcribe
    segments = transcribe(audio_path, model_size)

    # 3. Save raw transcript JSON
    transcript_path = vid_output / f"{video_name}_transcript.json"
    with open(transcript_path, "w") as f:
        json.dump(segments, f, indent=2)
    log(f"Saved transcript: {transcript_path.name}")

    # 4. Generate markdown
    markdown = segments_to_markdown(segments, video_name, str(video_path))
    md_path = vid_output / f"{video_name}.md"
    with open(md_path, "w") as f:
        f.write(markdown)
    log(f"Saved markdown: {md_path.name}")

    # 5. Index to RAG
    if not skip_rag:
        full_text = " ".join(seg["text"] for seg in segments)
        chunks = chunk_text(full_text, chunk_size)
        try:
            index_to_chromadb(chunks, video_name, str(video_path), collection)
        except Exception as e:
            log(f"āš ļø RAG indexing failed: {e}")
            log("Transcript and markdown were still saved successfully.")
    else:
        log("Skipping RAG indexing (--no-rag)")

    # Clean up audio (large file)
    if audio_path.exists():
        audio_path.unlink()
        log("Cleaned up extracted audio")

    return {
        "video": str(video_path),
        "segments": len(segments),
        "markdown": str(md_path),
        "transcript": str(transcript_path)
    }


def find_videos(path: Path) -> list[Path]:
    """Find all video files in path (recursive if directory)."""
    if path.is_file():
        if path.suffix.lower() in VIDEO_EXTENSIONS:
            return [path]
        else:
            print(f"āŒ Not a recognized video file: {path}")
            return []

    videos = []
    for ext in VIDEO_EXTENSIONS:
        videos.extend(path.rglob(f"*{ext}"))
        videos.extend(path.rglob(f"*{ext.upper()}"))
    videos = sorted(set(videos))
    return videos


def main():
    parser = argparse.ArgumentParser(
        description="Video-to-Knowledge Pipeline: Transcribe videos → Markdown + RAG",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__
    )
    parser.add_argument("input", help="Video file or directory to process")
    parser.add_argument("--output", "-o",
                        help="Output directory (default: ./video-knowledge-output/)")
    parser.add_argument("--collection", "-c", default=DEFAULT_COLLECTION,
                        help=f"ChromaDB collection name (default: {DEFAULT_COLLECTION})")
    parser.add_argument("--model", "-m", default=WHISPER_MODEL,
                        help=f"Whisper model size (default: {WHISPER_MODEL})")
    parser.add_argument("--no-rag", action="store_true",
                        help="Skip ChromaDB indexing (just transcribe + markdown)")
    parser.add_argument("--chunk-size", type=int, default=CHUNK_SIZE,
                        help=f"RAG chunk size in chars (default: {CHUNK_SIZE})")
    args = parser.parse_args()

    input_path = Path(args.input).resolve()
    if not input_path.exists():
        print(f"āŒ Path not found: {input_path}")
        sys.exit(1)

    output_dir = Path(args.output) if args.output else Path("./video-knowledge-output")
    output_dir.mkdir(parents=True, exist_ok=True)

    videos = find_videos(input_path)
    if not videos:
        print(f"āŒ No video files found in: {input_path}")
        sys.exit(1)

    print(f"šŸŽ¬ Found {len(videos)} video(s) to process")
    print(f"šŸ“‚ Output: {output_dir}")
    print(f"🧠 Collection: {args.collection}")
    print(f"šŸŽ™ļø Whisper model: {args.model}")

    results = []
    for video in videos:
        try:
            result = process_video(video, output_dir, args.collection,
                                   args.model, args.no_rag, args.chunk_size)
            results.append(result)
        except Exception as e:
            print(f"āŒ Failed on {video.name}: {e}")
            results.append({"video": str(video), "error": str(e)})

    # Summary
    print(f"\n{'='*60}")
    print(f"āœ… COMPLETE — {len([r for r in results if 'error' not in r])}/{len(results)} videos processed")
    print(f"{'='*60}")
    for r in results:
        if "error" in r:
            print(f"  āŒ {Path(r['video']).name}: {r['error']}")
        else:
            print(f"  āœ… {Path(r['video']).name}: {r['segments']} segments → {r['markdown']}")

    # Save manifest
    manifest_path = output_dir / "manifest.json"
    with open(manifest_path, "w") as f:
        json.dump(results, f, indent=2)
    print(f"\nšŸ“‹ Manifest: {manifest_path}")


if __name__ == "__main__":
    main()
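
# For reference, manifest.json ends up shaped like this (paths illustrative):
#
#   [
#     {"video": "/course/lecture1.mp4", "segments": 214,
#      "markdown": ".../lecture1/lecture1.md",
#      "transcript": ".../lecture1/lecture1_transcript.json"},
#     {"video": "/course/lecture2.mp4", "error": "<exception message>"}
#   ]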