[BUG] Data Prepper process threads stop when processors throw exceptions #4103

Closed
dlvenable opened this issue Feb 10, 2024 · 1 comment · Fixed by #4155
Labels
bug Something isn't working

dlvenable commented Feb 10, 2024

Describe the bug

When a processor throws an exception, the Data Prepper process worker thread that was running it stops processing. Data Prepper neither shuts down the pipeline nor attempts to restart the thread.

To Reproduce

Create a pipeline with a processor that will throw an exception. For example, the date processor throws an exception when the date_when value has an invalid expression.

Run Data Prepper and send data through.

The threads stop.

Expected behavior

I'd expect one of two things:

  • The whole pipeline shuts down
  • Data Prepper attempts to restart the thread

Analysis

The exact cause is this catch block in ProcessWorker:

} catch (final Exception e) {
    LOG.error("Encountered exception during pipeline {} processing", pipeline.getName(), e);
}

The ProcessWorker catches the exception and only logs it. Then the ProcessWorker::run method exits. After this, the thread remains alive in the thread pool, but it sits idle waiting for a new task to run.

You can see that the thread remains with a thread dump:

"my-pipeline-pipeline-processor-worker-1-thread-1" #41 prio=5 os_prio=0 cpu=2889321.30ms elapsed=70643.67s tid=0x0000aaaadebd3310 nid=0xa8 waiting on condition  [0x0000ffff08bdf000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@11.0.22/Native Method)
	- parking to wait for  <0x00000005e1d1c890> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
	at java.util.concurrent.locks.LockSupport.park(java.base@11.0.22/LockSupport.java:194)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(java.base@11.0.22/AbstractQueuedSynchronizer.java:2081)
	at java.util.concurrent.LinkedBlockingQueue.take(java.base@11.0.22/LinkedBlockingQueue.java:433)
	at java.util.concurrent.ThreadPoolExecutor.getTask(java.base@11.0.22/ThreadPoolExecutor.java:1054)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@11.0.22/ThreadPoolExecutor.java:1114)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@11.0.22/ThreadPoolExecutor.java:628)
	at java.lang.Thread.run(java.base@11.0.22/Thread.java:829)
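
The behavior above can be reproduced outside Data Prepper with a plain ThreadPoolExecutor. This is a minimal sketch, not Data Prepper code: SwallowedExceptionDemo and swallowingWorker are hypothetical names, and the worker only imitates a run method that logs an exception and returns.

```java
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class SwallowedExceptionDemo {
    // Hypothetical stand-in for ProcessWorker::run: the failure is logged,
    // the exception is swallowed, and the method returns normally.
    static Runnable swallowingWorker() {
        return () -> {
            try {
                throw new IllegalStateException("processor failed");
            } catch (final Exception e) {
                System.err.println("Encountered exception during processing: " + e.getMessage());
            }
        };
    }

    // Submits the worker, waits for it to finish, and reports how many
    // threads the pool still holds afterwards.
    static int poolSizeAfterSwallowedFailure() {
        final ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        try {
            final Future<?> future = executor.submit(swallowingWorker());
            // get() returns normally because the exception never left the worker.
            future.get(5, TimeUnit.SECONDS);
            return executor.getPoolSize();
        } catch (final Exception e) {
            throw new IllegalStateException("demo did not complete as expected", e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(final String[] args) {
        System.out.println("Threads still in pool: " + poolSizeAfterSwallowedFailure());
    }
}
```

From the executor's point of view the task completed successfully, so the core thread stays in the pool and parks in getTask() waiting for work, which matches the thread dump.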

Environment

Data Prepper 2.6.1

dlvenable commented

I propose a couple of solutions:

  1. Catch exceptions from the processor and then drop that batch.

We can have a try-catch around this code:

List<Event> inputEvents = null;
if (acknowledgementsEnabled) {
    inputEvents = ((ArrayList<Record<Event>>) records).stream().map(Record::getData).collect(Collectors.toList());
}
records = processor.execute(records);
if (inputEvents != null) {
    processAcknowledgements(inputEvents, records);
}
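
A rough sketch of the first option, under simplifying assumptions: records are plain strings, each processor is modeled as a UnaryOperator, and DropBatchSketch and runProcessors are hypothetical names rather than Data Prepper APIs. Acknowledgement handling for the dropped events is left out and would need its own decision.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.function.UnaryOperator;

class DropBatchSketch {
    // Runs each processor in order. If any processor throws, the failure is
    // logged and the whole batch is dropped by returning an empty collection,
    // so the calling loop can continue with the next batch.
    static Collection<String> runProcessors(
            final List<UnaryOperator<Collection<String>>> processors,
            Collection<String> records) {
        for (final UnaryOperator<Collection<String>> processor : processors) {
            try {
                records = processor.apply(records);
            } catch (final Exception e) {
                System.err.println("Processor threw, dropping batch of "
                        + records.size() + " records: " + e);
                return Collections.emptyList();
            }
        }
        return records;
    }

    public static void main(final String[] args) {
        final UnaryOperator<Collection<String>> failing = r -> {
            throw new IllegalArgumentException("invalid expression");
        };
        System.out.println(runProcessors(List.of(failing), List.of("a", "b")));
    }
}
```

The trade-off is data loss for the failing batch in exchange for keeping the worker thread running.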

  2. When the processor thread fails, let it throw the exception instead of swallowing it. The exception will then be handled by the PipelineThreadPoolExecutor.

public void afterExecute(final Runnable runnable, final Throwable throwable) {
    super.afterExecute(runnable, throwable);
    // If submit() method is used instead of execute(), the exceptions are wrapped in Future
    // Processor or Sink failures will enter into this loop
    if (throwable == null && runnable instanceof Future<?>) {
        try {
            ((Future<?>) runnable).get();
        } catch (CancellationException | ExecutionException ex) {
            LOG.error("Pipeline [{}] process worker encountered a fatal exception, " +
                    "cannot proceed further", pipeline.getName(), ex);
            pipeline.shutdown();
        } catch (InterruptedException ex) {
            LOG.error("Pipeline [{}] process worker encountered a fatal exception, terminating", pipeline.getName(), ex);
            pipeline.shutdown();
            Thread.currentThread().interrupt();
        }
    }
    // If we ever use the execute instead of submit
    else if (throwable != null) {
        LOG.error("Pipeline {} encountered a fatal exception, terminating", pipeline.getName(), throwable);
        pipeline.shutdown(); // Stop the source and wait for processors to finish draining the buffer
    }
}
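
To illustrate the second option in isolation, here is a small sketch of an executor whose afterExecute hook unwraps the Future and reacts to a failure. ShutdownOnFailureExecutor and the onFatal callback are hypothetical; the callback only stands in for pipeline.shutdown(), and the pool size of one is arbitrary.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ShutdownOnFailureExecutor extends ThreadPoolExecutor {
    private final Runnable onFatal; // hypothetical stand-in for pipeline.shutdown()

    ShutdownOnFailureExecutor(final Runnable onFatal) {
        super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        this.onFatal = onFatal;
    }

    @Override
    protected void afterExecute(final Runnable runnable, final Throwable throwable) {
        super.afterExecute(runnable, throwable);
        if (throwable == null && runnable instanceof Future<?>) {
            // submit() wraps the task in a Future, so the failure surfaces via get().
            try {
                ((Future<?>) runnable).get();
            } catch (CancellationException | ExecutionException ex) {
                onFatal.run();
            } catch (InterruptedException ex) {
                onFatal.run();
                Thread.currentThread().interrupt();
            }
        } else if (throwable != null) {
            // execute() passes the uncaught exception in directly.
            onFatal.run();
        }
    }

    // Submits a worker that lets its exception escape and reports whether
    // the shutdown callback was invoked.
    static boolean failureTriggersShutdown() {
        final CountDownLatch shutdownRequested = new CountDownLatch(1);
        final ShutdownOnFailureExecutor executor =
                new ShutdownOnFailureExecutor(shutdownRequested::countDown);
        final Runnable failingWorker = () -> {
            throw new IllegalStateException("processor failed");
        };
        try {
            executor.submit(failingWorker);
            return shutdownRequested.await(5, TimeUnit.SECONDS);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(final String[] args) {
        System.out.println("Shutdown requested: " + failureTriggersShutdown());
    }
}
```

The hook only fires for exceptions that escape the task, which is why a worker that catches and logs everything never reaches this path.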
