Skip to content

Commit

Permalink
Make ByteBufferDataChunk.isReleased and ByteBufDataChunk.isReleased t…
Browse files Browse the repository at this point in the history
…hread-safe (#6899)

Fixes #6894
  • Loading branch information
romain-grecourt authored May 24, 2023
1 parent 3d4157e commit ed4eaab
Show file tree
Hide file tree
Showing 6 changed files with 299 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021 Oracle and/or its affiliates.
* Copyright (c) 2021, 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -15,6 +15,8 @@
*/
package io.helidon.common.http;

import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.Optional;
Expand All @@ -25,11 +27,21 @@
*/
final class ByteBufferDataChunk implements DataChunk {

private static final VarHandle IS_RELEASED;

static {
try {
IS_RELEASED = MethodHandles.lookup().findVarHandle(ByteBufferDataChunk.class, "isReleased", int.class);
} catch (NoSuchFieldException | IllegalAccessException e) {
throw (Error) new ExceptionInInitializerError(e.getMessage()).initCause(e);
}
}

private final ByteBuffer[] byteBuffers;
private final boolean flush;
private final boolean readOnly;
private final Runnable releaseCallback;
private boolean isReleased = false;
private volatile int isReleased;
private CompletableFuture<DataChunk> writeFuture;

/**
Expand All @@ -47,6 +59,7 @@ final class ByteBufferDataChunk implements DataChunk {

/**
* Create a new data chunk.
*
* @param flush a signal that this chunk should be written and flushed from any cache if possible
* @param readOnly indicates underlying buffers are not reused
* @param releaseCallback a callback which is called when this chunk is completely processed and instance is free for reuse
Expand All @@ -71,7 +84,7 @@ public ByteBuffer[] data() {

@Override
public boolean isReleased() {
return isReleased;
return isReleased != 0;
}

@Override
Expand All @@ -81,11 +94,10 @@ public boolean isReadOnly() {

@Override
public void release() {
if (!isReleased) {
if (IS_RELEASED.compareAndSet(this, 0, 1)) {
if (releaseCallback != null) {
releaseCallback.run();
}
isReleased = true;
}
}

Expand Down
1 change: 1 addition & 0 deletions tests/integration/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
<module>restclient</module>
<module>oidc</module>
<module>gh-5792</module>
<module>se-gh-6845</module>
</modules>

<profiles>
Expand Down
70 changes: 70 additions & 0 deletions tests/integration/se-gh-6845/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright (c) 2023 Oracle and/or its affiliates.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.helidon.applications</groupId>
<artifactId>helidon-se</artifactId>
<version>3.2.1-SNAPSHOT</version>
<relativePath>../../../applications/se/pom.xml</relativePath>
</parent>
<groupId>io.helidon.tests.integration</groupId>
<artifactId>helidon-tests-integration-se-gh-6845</artifactId>
<name>Helidon Tests Integration SE GH 6845</name>
<description>Reproducer for GitHub issue #6845 - MultiPart IllegalReferenceCountException</description>

<properties>
<mainClass>io.helidon.tests.integration.gh6845.Main</mainClass>
</properties>

<dependencies>
<dependency>
<groupId>io.helidon.webserver</groupId>
<artifactId>helidon-webserver</artifactId>
</dependency>
<dependency>
<groupId>io.helidon.media</groupId>
<artifactId>helidon-media-multipart</artifactId>
</dependency>
<dependency>
<groupId>io.helidon.webclient</groupId>
<artifactId>helidon-webclient</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy-libs</id>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Copyright (c) 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.helidon.tests.integration.gh6845;

import java.util.Arrays;

import io.helidon.common.LogConfig;
import io.helidon.common.http.DataChunk;
import io.helidon.common.reactive.Single;
import io.helidon.media.multipart.ContentDisposition;
import io.helidon.media.multipart.MultiPartSupport;
import io.helidon.media.multipart.WriteableBodyPart;
import io.helidon.media.multipart.WriteableBodyPartHeaders;
import io.helidon.media.multipart.WriteableMultiPart;
import io.helidon.webserver.Routing;
import io.helidon.webserver.WebServer;

import static io.helidon.common.http.MediaType.TEXT_PLAIN;

/**
* Main class of this integration test.
*/
public final class Main {

/**
* Cannot be instantiated.
*/
private Main() {
}

/**
* Application main entry point.
*
* @param args command line arguments.
*/
public static void main(String[] args) {
startServer();
}

/**
* Start the server.
*
* @return the created {@link io.helidon.webserver.WebServer} instance
*/
static WebServer startServer() {
// load logging configuration
LogConfig.configureRuntime();

WebServer server = WebServer.builder()
.routing(createRouting())
.addMediaSupport(MultiPartSupport.create())
.printFeatureDetails(true)
.build();

// Try to start the server. If successful, print some info and arrange to
// print a message at shutdown. If unsuccessful, print the exception.
server.start()
.thenAccept(ws -> {
System.out.println(
"WEB server is up! http://localhost:" + ws.port() + "/greet");
ws.whenShutdown().thenRun(()
-> System.out.println("WEB server is DOWN. Good bye!"));
})
.exceptionally(t -> {
System.err.println("Startup failed: " + t.getMessage());
t.printStackTrace(System.err);
return null;
});

// Server threads are not daemon. No need to block. Just react.
return server;
}

/**
* Creates new {@link io.helidon.webserver.Routing}.
*
* @return routing configured
*/
private static Routing createRouting() {
return Routing.builder()
.get("/", (req, res) -> {
byte[] contents = new byte[21 * 1024 * 1024]; // 21 MiB
Arrays.fill(contents, (byte) 0x20); // fill with spaces
res.send(WriteableMultiPart
.builder()
.bodyPart(WriteableBodyPart
.builder()
.publisher(Single.just(DataChunk.create(contents)))
.headers(WriteableBodyPartHeaders
.builder()
.contentType(TEXT_PLAIN)
.contentDisposition(ContentDisposition
.builder()
.name("too-big")
.build())
.build())
.build())
.build());
})
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright (c) 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.helidon.tests.integration.gh6845;

import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import io.helidon.common.http.MediaType;
import io.helidon.common.reactive.Single;
import io.helidon.media.multipart.MultiPartSupport;
import io.helidon.media.multipart.ReadableBodyPart;
import io.helidon.webclient.WebClient;
import io.helidon.webserver.WebServer;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

/**
* See the related <a href="https://github.com/helidon-io/helidon/issues/6845">issue</a>.
*/
class ChunkReleaseTest {

private static WebServer webServer;
private static WebClient webClient;

@BeforeAll
public static void startTheServer() {
webServer = Main.startServer().start().await(10, TimeUnit.SECONDS);
webClient = WebClient.builder()
.baseUri("http://localhost:" + webServer.port())
.addMediaSupport(MultiPartSupport.create())
.build();
}

@AfterAll
public static void stopServer() {
webServer.shutdown().await(10, TimeUnit.SECONDS);
}

@Test
void testReleasing() {
ExecutorService executor = Executors.newSingleThreadExecutor();
webClient.get()
.accept(MediaType.MULTIPART_FORM_DATA)
.submit()
.flatMap(response -> response.content().asStream(ReadableBodyPart.class))
.flatMap(part -> part.content().as(InputStream.class).observeOn(executor))
.forEach(is -> {
try {
is.readAllBytes();
} catch (IOException e) {
throw new RuntimeException(e);
}
})
.onErrorResumeWithSingle(th -> {
if (th.getMessage().equals("Invalid state: END_MESSAGE")) {
// work-around for https://github.com/helidon-io/helidon/issues/6828
return Single.empty();
}
return Single.error(th);
})
.await();
}
}
Loading

0 comments on commit ed4eaab

Please sign in to comment.