Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deadlock issue in CompressionUtils #104

Closed
timducheyne opened this issue Mar 2, 2020 · 6 comments · Fixed by #107
Closed

Deadlock issue in CompressionUtils #104

timducheyne opened this issue Mar 2, 2020 · 6 comments · Fixed by #107
Assignees
Labels
bug Something isn't working
Milestone

Comments

@timducheyne
Copy link

A deadlock can occur when sending to multiple receivers of which at least one has an invalid endpoint.

For example 2 receivers:

  • one valid endpoint
  • one relative url eg "/as4" or "null" (there are cases like this)

now send 30 documents to those 2 receivers at random (can be sequentially)
A deadlock occurs and no other documents can be sent anymore.

This is caused by the CompressionUtil code: AS4 4.1.5

final PipedInputStream resultStream = new PipedInputStream();
final PipedOutputStream intermediateStream = new PipedOutputStream(resultStream);
final OutputStream zipperOutStream = new GZIPOutputStream(intermediateStream);

Runnable copyTask = () -> {
     try {
           IOUtils.copy(sourceStream, zipperOutStream);
           zipperOutStream.flush();
     } catch (IOException e) {
           IOUtils.closeQuietly(resultStream);  // close it on error case only
           throw new RuntimeException(e);
     } finally {
           // close source stream and intermediate streams
           IOUtils.closeQuietly(sourceStream);
           IOUtils.closeQuietly(zipperOutStream);
           IOUtils.closeQuietly(intermediateStream);
      }
 };
 pool.submit(copyTask);

This code tries not to take the full payload in memory. But there are several issues with this.
If the other side of the piped stream is not receiving anymore (eg in some cases when the send fails), then this side of the piped stream will block forever.
Note that to have this behavior, there has to be some load, the zip runnable needs to start before the cxf is consuming it.

Also note 2 things about the GZIPOutputStream:

  • the constructor is writing to the stream, so now the piped stream is writing from 2 different threads (the caller and the runnable which can cause issues)
  • the close is also writing to the stream (it calls finish) so even when it crashes with an exception it will go into deadlock since the finally closes it

The piped stream is passed around as an instance var in the cxf attachment. There is currently no code that guarantees that it is always closed properly.

A simple solution here is to just do this compression in memory and avoid the complex threading and piped streams:

final ByteArrayOutputStream byteOut = new ByteArrayOutputStream();
try (final OutputStream zipperOutStream = new GZIPOutputStream(byteOut)) {
      IOUtils.copy(sourceStream, zipperOutStream);
      zipperOutStream.flush();
} catch (IOException e) {
      throw new RuntimeException(e);
} finally {
      IOUtils.closeQuietly(sourceStream);
}
return new ByteArrayInputStream(byteOut.toByteArray());

Stack traces from blocked threads:

"pool-1-thread-1@12640" prio=5 tid=0x3d nid=NA waiting
  java.lang.Thread.State: WAITING
	  at java.lang.Object.wait(Object.java:-1)
	  at java.io.PipedInputStream.awaitSpace(PipedInputStream.java:273)
	  at java.io.PipedInputStream.receive(PipedInputStream.java:231)
	  at java.io.PipedOutputStream.write(PipedOutputStream.java:149)
	  at java.util.zip.GZIPOutputStream.finish(GZIPOutputStream.java:169)
	  at java.util.zip.DeflaterOutputStream.close(DeflaterOutputStream.java:238)
	  at no.difi.oxalis.as4.util.CompressionUtil.lambda$getCompressedStream$0(CompressionUtil.java:45)
	  at no.difi.oxalis.as4.util.CompressionUtil$$Lambda$824.1616677560.run(Unknown Source:-1)
	  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	  at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
	  at java.util.concurrent.FutureTask.run(FutureTask.java:-1)
	  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	  at java.lang.Thread.run(Thread.java:834)


"http-nio-8086-exec-5@10245" daemon prio=5 tid=0x28 nid=NA waiting
  java.lang.Thread.State: WAITING
	  at java.lang.Object.wait(Object.java:-1)
	  at java.io.PipedInputStream.read(PipedInputStream.java:326)
	  at java.io.PipedInputStream.read(PipedInputStream.java:377)
	  at org.apache.cxf.attachment.DelegatingInputStream.read(DelegatingInputStream.java:93)
	  at java.io.BufferedInputStream.fill(BufferedInputStream.java:252)
	  at java.io.BufferedInputStream.read1(BufferedInputStream.java:292)
	  at java.io.BufferedInputStream.read(BufferedInputStream.java:351)
	  - locked <merged>(a java.io.BufferedInputStream)
	  at java.io.FilterInputStream.read(FilterInputStream.java:133)
	  at java.io.FilterInputStream.read(FilterInputStream.java:107)
	  at org.apache.wss4j.dom.transform.AttachmentContentSignatureTransform.processAttachment(AttachmentContentSignatureTransform.java:218)
	  at org.apache.wss4j.dom.transform.AttachmentContentSignatureTransform.transform(AttachmentContentSignatureTransform.java:122)
	  at org.apache.jcp.xml.dsig.internal.dom.DOMTransform.transform(DOMTransform.java:169)
	  at org.apache.jcp.xml.dsig.internal.dom.DOMReference.transform(DOMReference.java:453)
	  at org.apache.jcp.xml.dsig.internal.dom.DOMReference.digest(DOMReference.java:356)
	  at org.apache.jcp.xml.dsig.internal.dom.DOMXMLSignature.digestReference(DOMXMLSignature.java:486)
	  at org.apache.jcp.xml.dsig.internal.dom.DOMXMLSignature.sign(DOMXMLSignature.java:371)
	  at org.apache.wss4j.dom.message.WSSecSignature.computeSignature(WSSecSignature.java:614)
	  at org.apache.cxf.ws.security.wss4j.policyhandlers.AsymmetricBindingHandler.doSignature(AsymmetricBindingHandler.java:743)
	  at org.apache.cxf.ws.security.wss4j.policyhandlers.AsymmetricBindingHandler.doSignBeforeEncrypt(AsymmetricBindingHandler.java:180)
	  at org.apache.cxf.ws.security.wss4j.policyhandlers.AsymmetricBindingHandler.handleBinding(AsymmetricBindingHandler.java:117)
	  at org.apache.cxf.ws.security.wss4j.PolicyBasedWSS4JOutInterceptor$PolicyBasedWSS4JOutInterceptorInternal.handleMessageInternal(PolicyBasedWSS4JOutInterceptor.java:185)
	  at org.apache.cxf.ws.security.wss4j.PolicyBasedWSS4JOutInterceptor$PolicyBasedWSS4JOutInterceptorInternal.handleMessage(PolicyBasedWSS4JOutInterceptor.java:109)
	  at org.apache.cxf.ws.security.wss4j.PolicyBasedWSS4JOutInterceptor$PolicyBasedWSS4JOutInterceptorInternal.handleMessage(PolicyBasedWSS4JOutInterceptor.java:96)
	  at org.apache.cxf.phase.PhaseInterceptorChain.doIntercept(PhaseInterceptorChain.java:308)
	  - locked <merged>(a org.apache.cxf.phase.PhaseInterceptorChain)
	  at org.apache.cxf.endpoint.ClientImpl.doInvoke(ClientImpl.java:530)
	  at org.apache.cxf.endpoint.ClientImpl.invoke(ClientImpl.java:441)
	  at org.apache.cxf.endpoint.ClientImpl.invoke(ClientImpl.java:356)
	  at org.apache.cxf.endpoint.ClientImpl.invoke(ClientImpl.java:314)
	  at org.apache.cxf.endpoint.ClientImpl.invokeWrapped(ClientImpl.java:349)
	  at org.apache.cxf.jaxws.DispatchImpl.invoke(DispatchImpl.java:322)
	  at org.apache.cxf.jaxws.DispatchImpl.invoke(DispatchImpl.java:241)
	  at no.difi.oxalis.as4.outbound.As4MessageSender.invoke(As4MessageSender.java:93)
	  at no.difi.oxalis.as4.outbound.As4MessageSender.send(As4MessageSender.java:85)
	  at no.difi.oxalis.as4.outbound.As4MessageSenderFacade.send(As4MessageSenderFacade.java:20)
	  at no.difi.oxalis.api.outbound.MessageSender.send(MessageSender.java:59)
	  at no.difi.oxalis.outbound.transmission.DefaultTransmitter.perform(DefaultTransmitter.java:149)
	  at no.difi.oxalis.outbound.transmission.DefaultTransmitter.transmit(DefaultTransmitter.java:106)
@FrodeBjerkholt FrodeBjerkholt added the bug Something isn't working label Mar 3, 2020
@FrodeBjerkholt
Copy link
Contributor

Thanks for a thorough bug report - I will be looking into it soon.

@FrodeBjerkholt
Copy link
Contributor

The implementation needs to support huge files, so a plain byte array is too simple - But the idea is good. I think that it is possible to avoid the piped streams by utilizing CXF's CachedOutputStream and do exactly what you are suggesting, and still support huge files.

@FrodeBjerkholt
Copy link
Contributor

I have a fix for this that also support large files, without using piped streams, It will be in the 4.1.8 release.

@timducheyne
Copy link
Author

Ok thanks,
I will have a look at the code

@FrodeBjerkholt
Copy link
Contributor

I just pushed the code now - You can take a look at it in the hotfix-4.1.8 branch.

@FrodeBjerkholt FrodeBjerkholt linked a pull request Mar 5, 2020 that will close this issue
@FrodeBjerkholt FrodeBjerkholt added this to the 4.1.8 milestone Mar 5, 2020
@FrodeBjerkholt FrodeBjerkholt self-assigned this Mar 5, 2020
@timducheyne
Copy link
Author

Just performed a test with the new code.
The issue no longer occurs.
Thanks for the fix

dhayalcbe added a commit to ZiriusAS/Oxalis-AS4 that referenced this issue Sep 30, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants