-
Notifications
You must be signed in to change notification settings - Fork 0
/
utils.py
134 lines (110 loc) · 4.74 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import os
import ffmpeg
import whisper
from tqdm import tqdm
import yt_dlp
from slugify import slugify
import json
def download_youtube_video(url, download_path="downloads", username=None, password=None):
    """Download a video with yt-dlp and rename it to ``<slugified-title>.mp4``.

    Args:
        url: Address of the video to download.
        download_path: Directory that receives the downloaded file.
        username: Optional account name forwarded to yt-dlp.
        password: Optional account password forwarded to yt-dlp. Credentials
            are only set when both ``username`` and ``password`` are given.

    Returns:
        A ``(new_file, video_title)`` tuple: the path of the renamed file and
        the slugified title used as its file stem.
    """
    print("Downloading video from YouTube...")
    ydl_opts = {
        'format': 'bestvideo+bestaudio/best',
        'outtmpl': os.path.join(download_path, '%(id)s.%(ext)s'),
    }
    if username and password:
        ydl_opts['username'] = username
        ydl_opts['password'] = password

    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info_dict = ydl.extract_info(url, download=True)

    video_id = info_dict.get('id')
    ext = info_dict.get('ext', 'webm')  # Default to webm if not found
    downloaded_file = os.path.join(download_path, f"{video_id}.{ext}")

    # Fall back to a fixed stem when the title is missing/None or slugifies
    # to an empty string; otherwise the target would be a bare ".mp4".
    video_title = slugify(info_dict.get('title') or 'video') or 'video'
    new_file = os.path.join(download_path, f"{video_title}.mp4")

    # NOTE: the file is only renamed, never transcoded, so the container may
    # not really be MP4 despite the extension. os.replace (unlike os.rename)
    # also overwrites an existing target on Windows, so downloading the same
    # video twice no longer fails.
    os.replace(downloaded_file, new_file)
    print("Download and conversion completed.")
    return new_file, video_title
def extract_audio_with_ffmpeg(video_path, audio_path):
    """Write the audio track of *video_path* to *audio_path* as a WAV file.

    The output is requested as mono (``ac=1``) 16-bit PCM (``pcm_s16le``)
    at a 16000 sample rate, and an existing output file is overwritten.
    """
    print("Extracting audio from video...")
    wav_settings = {
        'format': 'wav',
        'acodec': 'pcm_s16le',
        'ac': 1,
        'ar': '16000',
    }
    (
        ffmpeg
        .input(video_path)
        .output(audio_path, **wav_settings)
        .run(overwrite_output=True)
    )
    print("Audio extraction completed.")
def split_audio(audio, segment_length=30, sample_rate=16000):
    """Cut *audio* into consecutive windows of at most *segment_length* seconds.

    Args:
        audio: Sliceable sequence of samples (anything supporting ``len`` and
            slicing).
        segment_length: Window size in seconds.
        sample_rate: Number of samples per second of audio.

    Returns:
        A list of ``(segment, start, end)`` tuples where ``segment`` is the
        slice of samples and ``start``/``end`` are offsets in seconds.
        ``end`` is an ``int`` when it falls on a whole second and a ``float``
        otherwise (only possible for the final, partial window).
    """
    total_samples = len(audio)
    samples_per_segment = segment_length * sample_rate

    segments = []
    # Step through sample offsets rather than whole seconds so the trailing
    # fraction of a second (and clips shorter than one second) is kept
    # instead of being silently dropped.
    for first in range(0, total_samples, samples_per_segment):
        last = min(first + samples_per_segment, total_samples)
        # `first` is always a multiple of sample_rate, so this is exact.
        start = first // sample_rate
        whole_seconds, leftover = divmod(last, sample_rate)
        end = whole_seconds if leftover == 0 else last / sample_rate
        segments.append((audio[first:last], start, end))
    return segments
def convert_audio_to_text_whisper(model, audio_path, output_format):
    """Transcribe an audio file with Whisper, one 30-second window at a time.

    Args:
        model: Whisper model object; its ``device`` attribute and
            ``detect_language`` method are used here.
        audio_path: Path of the audio file to load with ``whisper.load_audio``.
        output_format: ``'txt'`` to get a single string; any other value
            yields a list of timed entries.

    Returns:
        For ``'txt'``: every segment's text, each prefixed by one space (so
        the result starts with a space when non-empty). Otherwise a list of
        dicts with keys ``"start"``, ``"end"`` and ``"text"``, where
        ``"start"``/``"end"`` are already formatted by ``format_timestamp``.
    """
    print("Transcribing audio...")
    audio = whisper.load_audio(audio_path)
    # split audio into 30-second segments
    segments = split_audio(audio, segment_length=30)

    # Decoding options do not depend on the segment, so build them once.
    options = whisper.DecodingOptions()
    as_text = output_format == 'txt'
    texts = []
    entries = []

    for segment, start, end in tqdm(segments, desc="Transcribing segments"):
        # Short (final) windows are padded to Whisper's fixed input length;
        # a full-length window is expected to pass through unchanged.
        segment = whisper.pad_or_trim(segment)
        # make log-Mel spectrogram and move to the same device as the model
        mel = whisper.log_mel_spectrogram(segment).to(model.device)

        # detect the spoken language (reported per segment, not used further)
        _, probs = model.detect_language(mel)
        detected_language = max(probs, key=probs.get)
        print(f"Detected language: {detected_language}")

        # decode the audio
        result = whisper.decode(model, mel, options)
        text = result.text.strip()
        if as_text:
            texts.append(text)
        else:
            entries.append({
                "start": format_timestamp(start),
                "end": format_timestamp(end),
                "text": text,
            })

    print("Transcription completed.")
    if as_text:
        # Each piece keeps its leading space so the returned string is
        # unchanged for existing callers.
        return ''.join(f" {piece}" for piece in texts)
    return entries
def format_timestamp(seconds):
    """Format a duration in seconds as an ``HH:MM:SS,mmm`` timestamp.

    Args:
        seconds: Non-negative offset in seconds (``int`` or ``float``).

    Returns:
        The zero-padded timestamp string, e.g. ``"01:01:01,250"``.
    """
    # Work in integer milliseconds: rounding once up front avoids float
    # artefacts such as 1.001 s rendering as ",000", and lets a value like
    # 59.9996 s carry cleanly into the next minute.
    total_ms = int(round(seconds * 1000))
    hours, remainder = divmod(total_ms, 3_600_000)
    minutes, remainder = divmod(remainder, 60_000)
    secs, milliseconds = divmod(remainder, 1000)
    return f"{hours:02}:{minutes:02}:{secs:02},{milliseconds:03}"
def save_srt_to_file(data, file_path):
    """Write subtitle entries to *file_path* in SRT layout (UTF-8).

    Args:
        data: Iterable of dicts with keys ``"start"``, ``"end"`` and
            ``"text"``. ``"start"``/``"end"`` may be numeric seconds or
            timestamps that are already formatted strings.
        file_path: Destination file; overwritten if it exists.
    """
    def _as_timestamp(value):
        # Entries built by convert_audio_to_text_whisper already carry
        # formatted strings; formatting those again would raise TypeError.
        return value if isinstance(value, str) else format_timestamp(value)

    with open(file_path, 'w', encoding='utf-8') as file:
        # SRT cue numbers are 1-based.
        for i, entry in enumerate(data, 1):
            start_time = _as_timestamp(entry["start"])
            end_time = _as_timestamp(entry["end"])
            text = entry["text"]
            file.write(f"{i}\n{start_time} --> {end_time}\n{text}\n\n")
    print(f"SRT saved to file: {file_path}")
def save_json_to_file(data, file_path):
    """Dump *data* to *file_path* as UTF-8 JSON with a 4-space indent.

    Non-ASCII characters are written as-is rather than escaped.
    """
    dump_options = {"ensure_ascii": False, "indent": 4}
    with open(file_path, mode='w', encoding='utf-8') as handle:
        json.dump(data, handle, **dump_options)
    print(f"JSON saved to file: {file_path}")
def save_txt_to_file(data, file_path):
    """Write the string *data* to *file_path* as UTF-8 text.

    Args:
        data: Text to write.
        file_path: Destination file; overwritten if it exists.
    """
    # Explicit UTF-8 keeps non-ASCII transcripts from depending on the
    # platform's default encoding, matching the SRT and JSON writers.
    with open(file_path, "w", encoding="utf-8") as outfile:
        outfile.write(data)
    print(f"TXT saved to file: {file_path}")
def convert_webm_to_mp4(input_path, output_path):
    """Convert *input_path* to *output_path* with ffmpeg, overwriting the target.

    Failures are reported on stdout rather than raised; the function returns
    ``None`` in both the success and the failure case.
    """
    try:
        ffmpeg.input(input_path).output(output_path).run(overwrite_output=True)
    except ffmpeg.Error as e:
        # stderr is not captured by this run() call, so e.stderr may be None;
        # fall back to the exception text instead of crashing in the handler.
        stderr = e.stderr
        detail = stderr.decode(errors='replace') if stderr else str(e)
        print(f"Error occurred: {detail}")
    else:
        print(f"Conversion completed: {output_path}")