-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathchopchop.py
114 lines (96 loc) · 5.23 KB
/
chopchop.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
from subprocess import call
import os
import shutil
import wave
import struct
from itertools import groupby
class ChopChop:
def __init__(self):
# Init Parameters
self.WORKING_DIRECTORY = "CACHE"
self.AUDIO_CHUNK_SIZE_MS = 200 # 200 ms
self.MINIMUM_SOUND_CHUNK_THRESHOLD = 3 # 600 ms
self.MINIMUM_VOLUME_THRESHOLD = 0.01
def jumpcut(self, input_file, output_file):
print("Creating Working Directory...")
self.create_working_directory()
print("Exporting Audio...")
self.export_audio(input_file)
print("Creating Audio Timestamp Map for Jumpcuts...")
self.create_jumpcut_timestamps(input_file, self.WORKING_DIRECTORY+"/exported_audio.wav")
print("Concatenating New Video...")
self.concatenate_video(output_file)
print("Removing Working Directory...")
self.remove_working_directory()
print("Done!")
def create_working_directory(self):
if not os.path.exists(self.WORKING_DIRECTORY):
os.makedirs(self.WORKING_DIRECTORY, exist_ok=True)
def export_audio(self, input_file):
command = "ffmpeg -hide_banner -loglevel panic -i {0} -ab 160k -ac 2 -vn {1}/exported_audio.wav".format(input_file, self.WORKING_DIRECTORY)
call(command, shell=True)
def create_jumpcut_timestamps(self, video_file, audio_file):
AUDIO_PARAMS = self.get_audio_params(audio_file)
AUDIO_AMPLITUDE = self.get_amplitude_from_file(audio_file)
# Create x ms chunks to smooth things out
framerate = AUDIO_PARAMS[2]
audio_frames_per_threshold_ms = int(framerate / (1000 / self.AUDIO_CHUNK_SIZE_MS))
AUDIO_AMPLITUDE = self.create_array_chunks(AUDIO_AMPLITUDE, audio_frames_per_threshold_ms)
# Group sound and silence
audio_silence_map = self.map_sound_and_silence(AUDIO_AMPLITUDE, self.MINIMUM_VOLUME_THRESHOLD)
grouped_audio_chunks = [list(j) for i, j in groupby(audio_silence_map)]
# Find and evaluate timestamps
full_audio_index = 0
timestamps = []
for i in range(len(grouped_audio_chunks)):
chunk_length = len(grouped_audio_chunks[i])
if(grouped_audio_chunks[i][0] == 1 and len(grouped_audio_chunks[i]) >= self.MINIMUM_SOUND_CHUNK_THRESHOLD):
timestamps.append((video_file, full_audio_index * self.AUDIO_CHUNK_SIZE_MS, (full_audio_index + chunk_length) * self.AUDIO_CHUNK_SIZE_MS))
full_audio_index += chunk_length
# Write Timestamp File
self.write_timestamp_file(timestamps)
def get_amplitude_from_file(self, audio_file):
wav = wave.open(audio_file, mode="r")
wavframes = wav.readframes(wav.getnframes())
# Convert from raw PCM (binary chunks) to short tuple. Change out "h" if not using 16-bit audio.
AUDIO_CONTENT = struct.unpack("{0}h".format(wav.getnframes() * wav.getnchannels()), wavframes)
# Result will be numbers in the range -32768 to 32767. Convert this to an amplitude (-1 to 1)
AUDIO_AMPLITUDE = [float(value) / pow(2, 15) for value in AUDIO_CONTENT]
if(wav.getnchannels() > 1):
# If stereo then just use the left channel for now (1::2 would be equivalent to the right channel)
AUDIO_AMPLITUDE = AUDIO_AMPLITUDE[0::2]
wav.close()
return (AUDIO_AMPLITUDE)
def map_sound_and_silence(self, AUDIO_AMPLITUDE, amplitude_threshold):
# Determine where there is sound, and where there is silence
has_audio = [0] * len(AUDIO_AMPLITUDE)
for i in range(len(AUDIO_AMPLITUDE)):
if(self.get_max_abs(AUDIO_AMPLITUDE[i]) >= amplitude_threshold):
has_audio[i] = 1
return has_audio
def write_timestamp_file(self, timestamps):
# Write timestamps to file
with open(self.WORKING_DIRECTORY + "/timestamps.txt", "w", encoding="utf-8") as timestamp_file:
for timestamp in timestamps:
video_file, starttime, endtime = timestamp
timestamp_file.write("file '../{0}'\n".format(video_file))
timestamp_file.write("inpoint {0}\n".format(str(starttime / 1000)))
timestamp_file.write("outpoint {0}\n".format(str(endtime / 1000)))
def get_audio_params(self, audio_file):
wav = wave.open(audio_file, mode="r")
params = wav.getparams()
wav.close()
return params
def get_max_abs(self, array):
maxv = max(array)
minv = min(array)
return max(maxv, -minv)
def create_array_chunks(self, array, chunk_length):
return [array[i:i + chunk_length] for i in range(0, len(array), chunk_length)]
def concatenate_video(self, output_file):
# Output packets may not be entirely contained in the selected timestamp interval. Therefore...
# Skip frames generated by the concat demuxer that are not contained in the selected interval by setting the select filter.
command = "ffmpeg -hide_banner -safe 0 -f concat -i ./{0}/timestamps.txt -vf select=concatdec_select -af aselect=concatdec_select,aresample=async=1 -qscale:v 3 {1}".format(self.WORKING_DIRECTORY, output_file)
call(command, shell=True)
def remove_working_directory(self):
shutil.rmtree(self.WORKING_DIRECTORY)