# engine.py
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np
import pandas as pd
import os
import cv2
from moviepy.editor import VideoFileClip
import tempfile
from PyPDF2 import PdfReader
from utils.audio.audio_engine import AudioEngine
from utils.video.photo_engine import PhotoEngine
from utils.embeddings.embeddings_engine import EmbeddingsEngine
from utils.clustering.clustering_engine import ClusteringEngine
from utils.summarization.summary_engine import SummaryEngine
from utils.search.search_engine import SearchEngine
from PIL import Image
class VideoSearchEngine:
    """Index videos, images, text files and PDFs into a searchable collection.

    Each supported file is turned into one or more text descriptions which are
    handed to ``self.search_engine.add(text, position, path)``.  The meaning of
    ``position`` depends on the file type: seconds for video, ``0`` for images,
    the starting word offset for text files and the page index for PDFs.
    """

    # Extension groups handled by ``process_all_files``; compared against the
    # lower-cased file name so "CLIP.MP4" is treated the same as "clip.mp4".
    _VIDEO_SUFFIXES = (".mp4",)
    _IMAGE_SUFFIXES = (".jpg", ".jpeg", ".png")
    _TEXT_SUFFIXES = (".md", ".txt")
    _PDF_SUFFIXES = (".pdf",)

    def __init__(self):
        """Create the helper engines and load the list of known video ids."""
        print("Initializing Audio Engine")
        self.audio_engine = AudioEngine()
        print("Initializing Photo Engine")
        self.photo_engine = PhotoEngine("default")
        print("Initializing Embeddings Engine")
        self.embeddings_engine = EmbeddingsEngine("default")
        print("Initializing Summary Engine")
        self.summary_engine = SummaryEngine()
        print("Initializing Chroma Search Engine")
        self.search_engine = SearchEngine()
        self.video_fragments_dir = "store"
        os.makedirs(self.video_fragments_dir, exist_ok=True)
        # Sampling step, in seconds, between indexed video frames; also the
        # length of the audio fragment summarized for each frame.
        self.interval = 60
        self.csv_filename = "extracted.csv"
        self.load_existing_videos()

    def load_existing_videos(self):
        """Populate ``self.existing_videos`` from ``self.csv_filename``.

        The result maps every value of the CSV's ``'Video ID'`` column to
        ``1``.  When the CSV file does not exist the mapping is empty.
        """
        if not os.path.exists(self.csv_filename):
            self.existing_videos = {}
            return
        df = pd.read_csv(self.csv_filename)
        self.existing_videos = dict.fromkeys(df['Video ID'].tolist(), 1)

    def extract_frames(self, video_path, interval, fps):
        """Return ``(seconds, frame)`` pairs sampled every ``interval`` seconds.

        The video is read frame by frame; a frame is kept when its index
        equals ``int(seconds * fps)`` for the next pending timestamp, starting
        at second 0.

        Args:
            video_path: Path of the video to open with ``cv2.VideoCapture``.
            interval: Number of seconds between two kept frames.
            fps: Frames per second used to convert timestamps to frame indices.

        Returns:
            A list of ``(seconds, frame)`` tuples, where ``frame`` is the
            object returned by ``cv2.VideoCapture.read``.
        """
        print("Extracting Frames")
        cap = cv2.VideoCapture(video_path)
        frames = []
        frame_count = 0
        seconds_count = 0
        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                if frame_count == int(seconds_count * fps):
                    frames.append((seconds_count, frame))
                    seconds_count += interval
                frame_count += 1
        finally:
            # Release the capture even if decoding raises part-way through.
            cap.release()
        return frames

    def _summarize_audio(self, video_clip, start_time, end_time):
        """Return a summary of the clip's audio in ``[start_time, end_time)``.

        Returns ``None`` when the range is empty or the fragment has no audio
        track, so the caller can index the visual description alone.
        """
        if end_time <= start_time:
            return None
        video_fragment = video_clip.subclip(start_time, end_time)
        if video_fragment.audio is None:
            return None
        with tempfile.NamedTemporaryFile(delete=True, suffix='.wav') as temp_audio_file:
            video_fragment.audio.write_audiofile(temp_audio_file.name)
            # Transcribe while the temporary file still exists; it is removed
            # as soon as the ``with`` block exits.
            transcription = self.audio_engine.transcribe(temp_audio_file.name)
        return self.summary_engine.summarize(transcription)

    def process_video(self, video_path):
        """Index one video: a described frame plus an audio summary per interval.

        For every sampled frame that is not already in the collection, the
        frame description and (when available) the summary of the following
        ``self.interval`` seconds of audio are joined with a space and added
        to the search engine together with the frame's timestamp.
        """
        video_clip = VideoFileClip(video_path)
        try:
            frames = self.extract_frames(video_path, self.interval, video_clip.fps)
            for seconds, frame in frames:
                video_id = f"{video_path}::{seconds}"
                if self.search_engine.exists_in_collection(video_id):
                    print(f"Skipping already indexed frame: {video_id}")
                    continue
                # NOTE(review): ``frame`` comes straight from OpenCV here, while
                # process_image passes a PIL image to the same method - confirm
                # that PhotoEngine.describe_image accepts both.
                description = self.photo_engine.describe_image(frame)
                end_time = min(seconds + self.interval, video_clip.duration)
                summary = self._summarize_audio(video_clip, seconds, end_time)
                if summary is None:
                    concatenated_description = f"{description}"
                else:
                    concatenated_description = f"{description} {summary}"
                self.search_engine.add(concatenated_description, seconds, video_path)
        finally:
            # Close the clip so its reader resources are not leaked.
            video_clip.close()

    def process_image(self, image_path):
        """Describe one image file and add the description at position 0."""
        with Image.open(image_path) as img:
            description = self.photo_engine.describe_image(img)
        self.search_engine.add(description, 0, image_path)

    def process_text(self, text_path):
        """Index a text file in chunks of 100 whitespace-separated words.

        Each chunk is added with the offset (in words) of its first word.
        The file is decoded as UTF-8 and undecodable bytes are replaced, so
        the result does not depend on the platform's default encoding.
        """
        with open(text_path, "r", encoding="utf-8", errors="replace") as f:
            text_words = f.read().split()
        for start in range(0, len(text_words), 100):
            group_text = ' '.join(text_words[start:start + 100])
            print(group_text)
            if group_text:
                self.search_engine.add(group_text, start, text_path)

    def process_pdf(self, pdf_path):
        """Index a PDF page by page, using the page index as position.

        Pages with no extractable text are skipped instead of being indexed
        as empty documents.
        """
        pdf = PdfReader(pdf_path)
        for idx, page in enumerate(pdf.pages):
            text = page.extract_text()
            if not text or not text.strip():
                continue
            self.search_engine.add(text, idx, pdf_path)

    def process_all_files(self, directory_path):
        """Walk ``directory_path`` recursively and index every supported file.

        Files are dispatched by extension (case-insensitively) to
        ``process_video``, ``process_image``, ``process_text`` or
        ``process_pdf``; anything else is reported and skipped.
        """
        for root, dirs, files in os.walk(directory_path):
            for file in files:
                file_path = os.path.join(root, file)
                lowered = file.lower()
                if lowered.endswith(self._VIDEO_SUFFIXES):
                    self.process_video(file_path)
                elif lowered.endswith(self._IMAGE_SUFFIXES):
                    self.process_image(file_path)
                elif lowered.endswith(self._TEXT_SUFFIXES):
                    self.process_text(file_path)
                elif lowered.endswith(self._PDF_SUFFIXES):
                    self.process_pdf(file_path)
                else:
                    print(f"Skipping unsupported file type: {file_path}")
# Script entry: build the engine, then index every supported file found
# under the "input" directory.
# NOTE(review): this runs at import time; consider guarding it with
# `if __name__ == "__main__":` if the module is ever imported elsewhere.
engine = VideoSearchEngine()
engine.process_all_files(directory_path="input")