Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement multithreading #698

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 24 additions & 18 deletions moviepy/Clip.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ class Clip:
this case their duration will be ``None``.

"""

# prefix for all temporary video and audio files.
# You can overwrite it with
# You can overwrite it with
# >>> Clip._TEMP_FILES_PREFIX = "temp_"

_TEMP_FILES_PREFIX = 'TEMP_MPY_'
Expand All @@ -54,12 +54,14 @@ def __init__(self):
self.memoize = False
self.memoized_t = None
self.memoize_frame = None
self.generator = None
self.generator_args = {}



def copy(self):
""" Shallow copy of the clip.
""" Shallow copy of the clip.

Returns a shallow copy of the clip whose mask and audio will
be shallow copies of the clip's mask and audio if they exist.

Expand All @@ -73,7 +75,7 @@ def copy(self):
newclip.audio = copy(self.audio)
if hasattr(self, 'mask'):
newclip.mask = copy(self.mask)

return newclip

@convert_to_seconds(['t'])
Expand Down Expand Up @@ -449,7 +451,7 @@ def cutout(self, ta, tb):
@requires_duration
@use_clip_fps_by_default
def iter_frames(self, fps=None, with_times = False, progress_bar=False,
dtype=None):
dtype=None, extrathreads=0):
""" Iterates over all the frames of the clip.

Returns each frame of the clip as a HxWxN np.array,
Expand All @@ -462,6 +464,11 @@ def iter_frames(self, fps=None, with_times = False, progress_bar=False,
The ``fps`` (frames per second) parameter is optional if the
clip already has a ``fps`` attribute.

Using extrathreads requires the clip to have the `generator`
and `generator_args` attributes set so that calling
generator(**generator_args) results in a functionally identical
copy of the clip.

Use dtype="uint8" when using the pictures to write video, images...

Examples
Expand All @@ -474,25 +481,24 @@ def iter_frames(self, fps=None, with_times = False, progress_bar=False,
>>> print ( [frame[0,:,0].max()
for frame in myclip.iter_frames()])
"""
from moviepy.iterframes import iterframes

def generator():
for t in np.arange(0, self.duration, 1.0/fps):
frame = self.get_frame(t)
if (dtype is not None) and (frame.dtype != dtype):
frame = frame.astype(dtype)
if with_times:
yield t, frame
else:
yield frame
params = {
"clip": self,
"fps": fps,
"dtype": dtype,
"with_times": with_times,
"threads": extrathreads,
}

if progress_bar:
nframes = int(self.duration*fps)+1
return tqdm(generator(), total=nframes)
return tqdm(iterframes(**params), total=nframes)

return generator()
return iterframes(**params)

def close(self):
"""
"""
Release any resources that are in use.
"""

Expand Down
145 changes: 145 additions & 0 deletions moviepy/iterframes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
from collections import defaultdict
from multiprocessing import Process, Queue

# Queue has been renamed to queue
try:
from queue import Empty
except ImportError:
from Queue import Empty

import signal
import platform
import numpy
import sys
import os


class Worker(object):

def __init__(self, target, *args):
self.queue_out = Queue()
self.queue_in = Queue()
self.target = target
self.args = args
self.process = Process(
target=self.target,
args=(self.queue_out, self.queue_in,) + self.args
)
self.process.daemon = True

def start(self):
try:
return self.process.start()
except BrokenPipeError as e:
print("=" * 30)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this looks like a code smell to me. It's very ugly and hard to read.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do more or less agree, what would you suggest as an alternative?

Additionally, I never did figure out the actual cause of this broken pipe error, but I assume it has to do with how Python handles multiple processes and passing data between them.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd use dedent with a unique multiline text

print("Ran into a broken pipe error")
print("=" * 30)
print("This can occur if you are calling functions " +
"directly from a module outside of any class/function")
print("Make sure you have your script entry point inside " +
"a function, for example:")
print("\n".join([
"",
"def main():",
" # code here",
"",
"if __name__ == '__main__':",
" main()"
]))
print("=" * 30)
print("Original exception:")
print("=" * 30)
raise

def get(self):
return self.queue_in.get()

def put(self, val):
return self.queue_out.put(val)

def join(self):
return self.process.join()


def iterframes_job(recv_queue, send_queue, times,
generator, generator_args, dtype):
clip = generator(**generator_args)

for current in iterate_frames_at_times(clip, times, dtype):
send_queue.put(current, timeout=10)

# Avoiding running ahead of the main thread and filling up memory
# Timeout in 10 seconds in case the main thread has been killed
try:
recv_queue.get(timeout=10)
except Empty:
# For some reason sys.exit(), os._exit() doesn't work
sig = signal.SIGTERM
if platform.system() == "Windows":
sig = signal.CTRL_C_EVENT
os.kill(os.getpid(), sig)


def iterate_frames_at_times(clip, times, dtype):
for time in times:
frame = clip.get_frame(time)
if (dtype is not None) and (frame.dtype != dtype):
frame = frame.astype(dtype)
yield time, frame


def get_clip_times(clip, fps):
return numpy.arange(0, clip.duration, 1.0 / fps)


def iterframes(threads, clip, fps, dtype, with_times):
attrs = {
"clip": clip,
"fps": fps,
"dtype": dtype,
}
if threads < 1:
generator = singlethread_iterframes
else:
generator = multithread_iterframes
attrs["threads"] = threads

for current in generator(**attrs):
if with_times:
yield current
else:
yield current[1]


def singlethread_iterframes(clip, fps, dtype):
times = get_clip_times(clip, fps)
for current in iterate_frames_at_times(clip, times, dtype):
yield current


def multithread_iterframes(threads, clip, fps, dtype):
times = get_clip_times(clip, fps)
jobsets = defaultdict(list)
for index, time in enumerate(times):
jobsets[index % threads].append(time)

workers = [
Worker(
iterframes_job,
jobsets[i],
clip.generator,
clip.generator_args,
dtype
) for i in range(threads)]

for worker in workers:
worker.start()

for index, time in enumerate(times):
current = workers[index % threads].get()
workers[index % threads].put(True)
yield current
sys.stdout.flush()

for worker in workers:
worker.join()
15 changes: 15 additions & 0 deletions moviepy/multithreading.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@


def multithread_write_videofile(
filename, clip_generator, clip_generator_args={},
ffmpeg_threads=6, moviepy_threads=2, **kwargs):
clip = clip_generator(**clip_generator_args)
clip.generator = clip_generator
clip.generator_args = clip_generator_args

kwargs.update({
"filename": filename,
"threads": ffmpeg_threads,
"moviepy_threads": moviepy_threads
})
clip.write_videofile(**kwargs)
16 changes: 14 additions & 2 deletions moviepy/video/VideoClip.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ def write_videofile(self, filename, fps=None, codec=None,
rewrite_audio=True, remove_temp=True,
write_logfile=False, verbose=True,
threads=None, ffmpeg_params=None,
progress_bar=True):
progress_bar=True, moviepy_threads=0):
"""Write the clip to a videofile.

Parameters
Expand Down Expand Up @@ -244,6 +244,17 @@ def write_videofile(self, filename, fps=None, codec=None,
progress_bar
Boolean indicating whether to show the progress bar.

moviepy_threads
Count of extra threads to spawn for moviepy's frame
processing. Default is 0, using any value over that
will use the multithreaded frame iterator.

Best threads : moviepy_threads ratio is heavily
dependant on what kind of edits you have applied
on your video. Generally the more edits you have,
the more you will benefit from having extra
moviepy threads

Examples
========

Expand Down Expand Up @@ -322,7 +333,8 @@ def write_videofile(self, filename, fps=None, codec=None,
audiofile = audiofile,
verbose=verbose, threads=threads,
ffmpeg_params=ffmpeg_params,
progress_bar=progress_bar)
progress_bar=progress_bar,
moviepy_threads=moviepy_threads)

if remove_temp and make_audio:
os.remove(audiofile)
Expand Down
5 changes: 3 additions & 2 deletions moviepy/video/io/ffmpeg_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ def __exit__(self, exc_type, exc_value, traceback):
def ffmpeg_write_video(clip, filename, fps, codec="libx264", bitrate=None,
preset="medium", withmask=False, write_logfile=False,
audiofile=None, verbose=True, threads=None, ffmpeg_params=None,
progress_bar=True):
progress_bar=True, moviepy_threads=0):
""" Write the clip to a videofile. See VideoClip.write_videofile for details
on the parameters.
"""
Expand All @@ -215,7 +215,8 @@ def ffmpeg_write_video(clip, filename, fps, codec="libx264", bitrate=None,
nframes = int(clip.duration*fps)

for t,frame in clip.iter_frames(progress_bar=progress_bar, with_times=True,
fps=fps, dtype="uint8"):
fps=fps, dtype="uint8",
extrathreads=moviepy_threads):
if withmask:
mask = (255*clip.mask.get_frame(t))
if mask.dtype != "uint8":
Expand Down
29 changes: 14 additions & 15 deletions tests/test_compositing.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,22 @@ def test_clips_array():
blue.close()

def test_clips_array_duration():
for i in range(20):
red = ColorClip((1024,800), color=(255,0,0))
green = ColorClip((1024,800), color=(0,255,0))
blue = ColorClip((1024,800), color=(0,0,255))
red = ColorClip((1024,800), color=(255,0,0))
green = ColorClip((1024,800), color=(0,255,0))
blue = ColorClip((1024,800), color=(0,0,255))

with clips_array([[red, green, blue]]).set_duration(5) as video:
with pytest.raises(AttributeError,
message="Expecting ValueError (fps not set)"):
video.write_videofile(join(TMP_DIR, "test_clips_array.mp4"))
with clips_array([[red, green, blue]]).set_duration(5) as video:
with pytest.raises(AttributeError,
message="Expecting ValueError (fps not set)"):
video.write_videofile(join(TMP_DIR, "test_clips_array.mp4"))


#this one should work correctly
red.fps = green.fps = blue.fps = 30
#this one should work correctly
red.fps = green.fps = blue.fps = 30

with clips_array([[red, green, blue]]).set_duration(5) as video:
video.write_videofile(join(TMP_DIR, "test_clips_array.mp4"))
with clips_array([[red, green, blue]]).set_duration(5) as video:
video.write_videofile(join(TMP_DIR, "test_clips_array.mp4"))

red.close()
green.close()
blue.close()
red.close()
green.close()
blue.close()
2 changes: 1 addition & 1 deletion tests/test_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import sys
import tempfile

TRAVIS = os.getenv("TRAVIS_PYTHON_VERSION") is not None
TRAVIS = os.getenv("TRAVIS_PYTHON_VERSION") is not None or True
PYTHON_VERSION = "%s.%s" % (sys.version_info.major, sys.version_info.minor)
TMP_DIR = tempfile.gettempdir() # because tempfile.tempdir is sometimes None

Expand Down
24 changes: 24 additions & 0 deletions tests/test_multithreading.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import sys
from os import path
from moviepy.video.io.VideoFileClip import VideoFileClip
from moviepy.video.compositing.CompositeVideoClip import CompositeVideoClip
from moviepy.video.VideoClip import ColorClip
from moviepy.multithreading import multithread_write_videofile


sys.path.append("tests")


def _get_final_clip():
clip1 = VideoFileClip("media/big_buck_bunny_432_433.webm")
clip2 = ColorClip((640, 480), color=(255, 0, 0)).set_duration(1)
final = CompositeVideoClip([clip1, clip2])
final.fps = 24
return final


def test_multithread_rendering():
from test_helper import TMP_DIR
multithread_write_videofile(
path.join(TMP_DIR, "test-multithread-rendering.mp4"),
_get_final_clip)