Avoid spinning in between intermediate texture processors.
This change adds a new method onReadyToAcceptInputFrame to
GlTextureProcessor.InputListener and changes maybeQueueInputFrame
to queueInputFrame, removing the boolean return value.
This avoids the re-trying in ChainingGlTextureProcessorListener
by allowing it to feed frames from the producing to the consuming
GlTextureProcessor only when there is capacity.

MediaPipeProcessor still needs re-trying when processing isn't 1:1.

PiperOrigin-RevId: 466626369
Googler authored and marcbaechinger committed Oct 19, 2022
1 parent 2d2926b commit 9c366b3
Showing 10 changed files with 261 additions and 106 deletions.
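To make the protocol change concrete before the diff: the sketch below is a minimal, self-contained rendering of the new push-based handoff, using simplified stand-in types rather than the real androidx.media3.effect classes. The consumerCapacity counter mirrors the nextGlTextureProcessorInputCapacity bookkeeping that ChainingGlTextureProcessorListener gains in this commit.

// Illustrative sketch only; stand-in types, not the real media3 interfaces.
import java.util.ArrayDeque;
import java.util.Queue;

final class ChainingSketch {

  /** Stand-in for the consuming GlTextureProcessor; note there is no boolean return value. */
  interface Consumer {
    void queueInputFrame(int texId, long presentationTimeUs);
  }

  private final Queue<long[]> availableFrames = new ArrayDeque<>();
  private final Consumer consumer;
  private int consumerCapacity; // Frames the consumer can accept right now.

  ChainingSketch(Consumer consumer) {
    this.consumer = consumer;
  }

  // Mirrors InputListener.onReadyToAcceptInputFrame: the consumer can take one more frame.
  synchronized void onReadyToAcceptInputFrame() {
    long[] frame = availableFrames.poll();
    if (frame == null) {
      consumerCapacity++; // Bank the capacity for the next produced frame.
    } else {
      consumer.queueInputFrame((int) frame[0], frame[1]); // Feed a deferred frame.
    }
  }

  // Mirrors OutputListener.onOutputFrameAvailable: the producer has output a frame.
  synchronized void onOutputFrameAvailable(int texId, long presentationTimeUs) {
    if (consumerCapacity > 0) {
      consumerCapacity--;
      consumer.queueInputFrame(texId, presentationTimeUs); // Queue immediately; no retrying.
    } else {
      availableFrames.add(new long[] {texId, presentationTimeUs}); // Defer until ready.
    }
  }
}

Compared with the previous pull model, no thread ever loops on a rejected frame: each frame is handed over exactly once, at a moment when capacity is known to exist.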
@@ -280,7 +280,12 @@ private Transformer createTransformer(@Nullable Bundle bundle, String filePath)
Class.forName("com.google.android.exoplayer2.transformerdemo.MediaPipeProcessor");
Constructor<?> constructor =
clazz.getConstructor(
- Context.class, boolean.class, String.class, String.class, String.class);
+ Context.class,
+ boolean.class,
+ String.class,
+ boolean.class,
+ String.class,
+ String.class);
effects.add(
(GlEffect)
(Context context, boolean useHdr) -> {
@@ -290,6 +295,7 @@ private Transformer createTransformer(@Nullable Bundle bundle, String filePath)
context,
useHdr,
/* graphName= */ "edge_detector_mediapipe_graph.binarypb",
/* isSingleFrameGraph= */ true,
/* inputStreamName= */ "input_video",
/* outputStreamName= */ "output_video");
} catch (Exception e) {
@@ -18,23 +18,35 @@
import static com.google.android.exoplayer2.util.Assertions.checkArgument;
import static com.google.android.exoplayer2.util.Assertions.checkState;
import static com.google.android.exoplayer2.util.Assertions.checkStateNotNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

import android.content.Context;
import android.opengl.EGL14;
import androidx.annotation.Nullable;
import androidx.media3.common.FrameProcessingException;
import androidx.media3.effect.GlTextureProcessor;
import androidx.media3.effect.TextureInfo;
import com.google.android.exoplayer2.C;
import com.google.android.exoplayer2.util.LibraryLoader;
import com.google.android.exoplayer2.util.Util;
import com.google.mediapipe.components.FrameProcessor;
import com.google.mediapipe.framework.AppTextureFrame;
import com.google.mediapipe.framework.TextureFrame;
import com.google.mediapipe.glutil.EglManager;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

/** Runs a MediaPipe graph on input frames. */
/* package */ final class MediaPipeProcessor implements GlTextureProcessor {

private static final String THREAD_NAME = "Demo:MediaPipeProcessor";
private static final long RELEASE_WAIT_TIME_MS = 100;
private static final long RETRY_WAIT_TIME_MS = 1;

private static final LibraryLoader LOADER =
new LibraryLoader("mediapipe_jni") {
@Override
@@ -55,6 +67,9 @@ protected void loadLibrary(String name) {

private final FrameProcessor frameProcessor;
private final ConcurrentHashMap<TextureInfo, TextureFrame> outputFrames;
private final boolean isSingleFrameGraph;
@Nullable private final ExecutorService singleThreadExecutorService;
private final Queue<Future<?>> futures;

private InputListener inputListener;
private OutputListener outputListener;
@@ -64,22 +79,34 @@ protected void loadLibrary(String name) {
/**
* Creates a new texture processor that wraps a MediaPipe graph.
*
* <p>If {@code isSingleFrameGraph} is {@code false}, the {@code MediaPipeProcessor} may waste CPU
* time by continuously attempting to queue input frames to MediaPipe until they are accepted or
* waste memory if MediaPipe accepts and stores many frames internally.
*
* @param context The {@link Context}.
* @param useHdr Whether input textures come from an HDR source. If {@code true}, colors will be
* in linear RGB BT.2020. If {@code false}, colors will be in gamma RGB BT.709.
* @param graphName Name of a MediaPipe graph asset to load.
* @param isSingleFrameGraph Whether the MediaPipe graph will eventually produce one output frame
* each time an input frame (and no other input) has been queued.
* @param inputStreamName Name of the input video stream in the graph.
@param outputStreamName Name of the output video stream in the graph.
*/
public MediaPipeProcessor(
Context context,
boolean useHdr,
String graphName,
boolean isSingleFrameGraph,
String inputStreamName,
String outputStreamName) {
checkState(LOADER.isAvailable());
// TODO(b/227624622): Confirm whether MediaPipeProcessor could support HDR colors.
checkArgument(!useHdr, "MediaPipeProcessor does not support HDR colors.");

this.isSingleFrameGraph = isSingleFrameGraph;
singleThreadExecutorService =
isSingleFrameGraph ? null : Util.newSingleThreadExecutor(THREAD_NAME);
futures = new ArrayDeque<>();
inputListener = new InputListener() {};
outputListener = new OutputListener() {};
errorListener = (frameProcessingException) -> {};
@@ -96,6 +123,9 @@ public MediaPipeProcessor(
@Override
public void setInputListener(InputListener inputListener) {
this.inputListener = inputListener;
if (!isSingleFrameGraph || outputFrames.isEmpty()) {
inputListener.onReadyToAcceptInputFrame();
}
}

@Override
@@ -122,37 +152,131 @@ public void setErrorListener(ErrorListener errorListener) {
}

@Override
- public boolean maybeQueueInputFrame(TextureInfo inputTexture, long presentationTimeUs) {
- acceptedFrame = false;
+ public void queueInputFrame(TextureInfo inputTexture, long presentationTimeUs) {
AppTextureFrame appTextureFrame =
new AppTextureFrame(inputTexture.texId, inputTexture.width, inputTexture.height);
// TODO(b/238302213): Handle timestamps restarting from 0 when applying effects to a playlist.
// MediaPipe will fail if the timestamps are not monotonically increasing.
// Also make sure that a MediaPipe graph producing additional frames only starts producing
// frames for the next MediaItem after receiving the first frame of that MediaItem as input
// to avoid MediaPipe producing extra frames after the last MediaItem has ended.
appTextureFrame.setTimestamp(presentationTimeUs);
if (isSingleFrameGraph) {
boolean acceptedFrame = maybeQueueInputFrameSynchronous(appTextureFrame, inputTexture);
checkState(
acceptedFrame,
"queueInputFrame must only be called when a new input frame can be accepted");
return;
}

// TODO(b/241782273): Avoid retrying continuously until the frame is accepted by using a
// currently non-existent MediaPipe API to be notified when MediaPipe has capacity to accept a
// new frame.
queueInputFrameAsynchronous(appTextureFrame, inputTexture);
}

private boolean maybeQueueInputFrameSynchronous(
AppTextureFrame appTextureFrame, TextureInfo inputTexture) {
acceptedFrame = false;
frameProcessor.onNewFrame(appTextureFrame);
try {
appTextureFrame.waitUntilReleasedWithGpuSync();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
errorListener.onFrameProcessingError(new FrameProcessingException(e));
}
- inputListener.onInputFrameProcessed(inputTexture);
+ if (acceptedFrame) {
+ inputListener.onInputFrameProcessed(inputTexture);
+ }
return acceptedFrame;
}

private void queueInputFrameAsynchronous(
AppTextureFrame appTextureFrame, TextureInfo inputTexture) {
removeFinishedFutures();
futures.add(
checkStateNotNull(singleThreadExecutorService)
.submit(
() -> {
while (!maybeQueueInputFrameSynchronous(appTextureFrame, inputTexture)) {
try {
Thread.sleep(RETRY_WAIT_TIME_MS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
if (errorListener != null) {
errorListener.onFrameProcessingError(new FrameProcessingException(e));
}
}
}
inputListener.onReadyToAcceptInputFrame();
}));
}

@Override
public void releaseOutputFrame(TextureInfo outputTexture) {
checkStateNotNull(outputFrames.get(outputTexture)).release();
if (isSingleFrameGraph) {
inputListener.onReadyToAcceptInputFrame();
}
}

@Override
public void release() {
if (isSingleFrameGraph) {
frameProcessor.close();
return;
}

Queue<Future<?>> futures = checkStateNotNull(this.futures);
while (!futures.isEmpty()) {
futures.remove().cancel(/* mayInterruptIfRunning= */ false);
}
ExecutorService singleThreadExecutorService =
checkStateNotNull(this.singleThreadExecutorService);
singleThreadExecutorService.shutdown();
try {
if (!singleThreadExecutorService.awaitTermination(RELEASE_WAIT_TIME_MS, MILLISECONDS)) {
errorListener.onFrameProcessingError(new FrameProcessingException("Release timed out"));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
errorListener.onFrameProcessingError(new FrameProcessingException(e));
}

frameProcessor.close();
}

@Override
public final void signalEndOfCurrentInputStream() {
- frameProcessor.waitUntilIdle();
- outputListener.onCurrentOutputStreamEnded();
+ if (isSingleFrameGraph) {
+ frameProcessor.waitUntilIdle();
+ outputListener.onCurrentOutputStreamEnded();
+ return;
+ }

removeFinishedFutures();
futures.add(
checkStateNotNull(singleThreadExecutorService)
.submit(
() -> {
frameProcessor.waitUntilIdle();
outputListener.onCurrentOutputStreamEnded();
}));
}

private void removeFinishedFutures() {
while (!futures.isEmpty()) {
if (!futures.element().isDone()) {
return;
}
try {
futures.remove().get();
} catch (ExecutionException e) {
errorListener.onFrameProcessingError(new FrameProcessingException(e));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
errorListener.onFrameProcessingError(new FrameProcessingException(e));
}
}
}
}
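For orientation, the reflective call in the demo above resolves to this six-argument constructor. A direct-call sketch (the class is package-private, so code outside its package really does construct it through reflection, as the first file shows; context here is an android.content.Context):

// Sketch: direct equivalent of the demo's reflective construction.
GlTextureProcessor processor =
    new MediaPipeProcessor(
        context,
        /* useHdr= */ false, // The constructor rejects HDR input.
        /* graphName= */ "edge_detector_mediapipe_graph.binarypb",
        /* isSingleFrameGraph= */ true, // 1:1 graphs take the synchronous, non-spinning path.
        /* inputStreamName= */ "input_video",
        /* outputStreamName= */ "output_video");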
@@ -16,8 +16,11 @@
package androidx.media3.effect;

import android.util.Pair;
import androidx.annotation.GuardedBy;
import androidx.annotation.Nullable;
import androidx.media3.effect.GlTextureProcessor.InputListener;
import androidx.media3.effect.GlTextureProcessor.OutputListener;
import com.google.android.exoplayer2.C;
import java.util.ArrayDeque;
import java.util.Queue;

@@ -33,8 +36,13 @@
private final GlTextureProcessor producingGlTextureProcessor;
private final GlTextureProcessor consumingGlTextureProcessor;
private final FrameProcessingTaskExecutor frameProcessingTaskExecutor;

@GuardedBy("this")
private final Queue<Pair<TextureInfo, Long>> availableFrames;

@GuardedBy("this")
private int nextGlTextureProcessorInputCapacity;

/**
* Creates a new instance.
*
@@ -58,33 +66,52 @@ public ChainingGlTextureProcessorListener(
}

@Override
- public void onInputFrameProcessed(TextureInfo inputTexture) {
- frameProcessingTaskExecutor.submit(
- () -> producingGlTextureProcessor.releaseOutputFrame(inputTexture));
+ public synchronized void onReadyToAcceptInputFrame() {
@Nullable Pair<TextureInfo, Long> pendingFrame = availableFrames.poll();
if (pendingFrame == null) {
nextGlTextureProcessorInputCapacity++;
return;
}

long presentationTimeUs = pendingFrame.second;
if (presentationTimeUs == C.TIME_END_OF_SOURCE) {
frameProcessingTaskExecutor.submit(
consumingGlTextureProcessor::signalEndOfCurrentInputStream);
} else {
frameProcessingTaskExecutor.submit(
() ->
consumingGlTextureProcessor.queueInputFrame(
/* inputTexture= */ pendingFrame.first, presentationTimeUs));
}
}

@Override
- public void onOutputFrameAvailable(TextureInfo outputTexture, long presentationTimeUs) {
+ public void onInputFrameProcessed(TextureInfo inputTexture) {
frameProcessingTaskExecutor.submit(
- () -> {
- availableFrames.add(new Pair<>(outputTexture, presentationTimeUs));
- processFrameNowOrLater();
- });
+ () -> producingGlTextureProcessor.releaseOutputFrame(inputTexture));
}

- private void processFrameNowOrLater() {
- Pair<TextureInfo, Long> pendingFrame = availableFrames.element();
- TextureInfo outputTexture = pendingFrame.first;
- long presentationTimeUs = pendingFrame.second;
- if (consumingGlTextureProcessor.maybeQueueInputFrame(outputTexture, presentationTimeUs)) {
- availableFrames.remove();
+ @Override
+ public synchronized void onOutputFrameAvailable(
+ TextureInfo outputTexture, long presentationTimeUs) {
+ if (nextGlTextureProcessorInputCapacity > 0) {
+ frameProcessingTaskExecutor.submit(
+ () ->
+ consumingGlTextureProcessor.queueInputFrame(
+ /* inputTexture= */ outputTexture, presentationTimeUs));
+ nextGlTextureProcessorInputCapacity--;
} else {
- frameProcessingTaskExecutor.submit(this::processFrameNowOrLater);
+ availableFrames.add(new Pair<>(outputTexture, presentationTimeUs));
}
}

@Override
- public void onCurrentOutputStreamEnded() {
- frameProcessingTaskExecutor.submit(consumingGlTextureProcessor::signalEndOfCurrentInputStream);
+ public synchronized void onCurrentOutputStreamEnded() {
if (!availableFrames.isEmpty()) {
availableFrames.add(new Pair<>(TextureInfo.UNSET, C.TIME_END_OF_SOURCE));
} else {
frameProcessingTaskExecutor.submit(
consumingGlTextureProcessor::signalEndOfCurrentInputStream);
}
}
}
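For context on how this listener is used: one instance sits between each adjacent pair of processors in a chain, acting as the producer's OutputListener and the consumer's InputListener at the same time. A wiring sketch, assuming producer and consumer variables of type GlTextureProcessor and the chain owner's frameProcessingTaskExecutor:

// Sketch: wiring one link of the processor chain with a single listener instance.
ChainingGlTextureProcessorListener listener =
    new ChainingGlTextureProcessorListener(
        /* producingGlTextureProcessor= */ producer,
        /* consumingGlTextureProcessor= */ consumer,
        frameProcessingTaskExecutor);
producer.setOutputListener(listener); // Receives onOutputFrameAvailable / onCurrentOutputStreamEnded.
consumer.setInputListener(listener); // Receives onReadyToAcceptInputFrame / onInputFrameProcessed.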
@@ -31,4 +31,11 @@
* android.graphics.SurfaceTexture#getTransformMatrix(float[]) transform matrix}.
*/
void setTextureTransformMatrix(float[] textureTransformMatrix);

/**
* Returns whether another input frame can be {@linkplain #queueInputFrame(TextureInfo, long)
* queued}.
*/
// TODO(b/227625423): Remove this method and use the input listener instead.
boolean acceptsInputFrame();
}
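Until that TODO lands, the intended caller-side pattern is to poll before queueing. A sketch with illustrative variable names:

// Sketch: interim poll-then-queue contract; processor, inputTexture, and
// presentationTimeUs are illustrative names.
if (processor.acceptsInputFrame()) {
  processor.queueInputFrame(inputTexture, presentationTimeUs);
}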
@@ -119,6 +119,7 @@ public FinalMatrixTransformationProcessorWrapper(
@Override
public void setInputListener(InputListener inputListener) {
this.inputListener = inputListener;
inputListener.onReadyToAcceptInputFrame();
}

@Override
@@ -134,13 +135,19 @@ public void setErrorListener(ErrorListener errorListener) {
}

@Override
- public boolean maybeQueueInputFrame(TextureInfo inputTexture, long presentationTimeUs) {
+ public boolean acceptsInputFrame() {
return true;
}

@Override
public void queueInputFrame(TextureInfo inputTexture, long presentationTimeUs) {
checkState(!streamOffsetUsQueue.isEmpty(), "No input stream specified.");

try {
synchronized (this) {
if (!ensureConfigured(inputTexture.width, inputTexture.height)) {
- return false;
+ inputListener.onInputFrameProcessed(inputTexture);
+ return; // Drop frames when there is no output surface.
}

EGLSurface outputEglSurface = this.outputEglSurface;
@@ -181,7 +188,7 @@ public boolean maybeQueueInputFrame(TextureInfo inputTexture, long presentationTimeUs)
}
}
inputListener.onInputFrameProcessed(inputTexture);
- return true;
+ inputListener.onReadyToAcceptInputFrame();
}

@EnsuresNonNullIf(