From 2a559fb4aadb3dc45e46403aefaa72325c86e92b Mon Sep 17 00:00:00 2001 From: minwoox Date: Mon, 16 Dec 2024 12:16:15 +0900 Subject: [PATCH 1/2] Fix Conflict for `NonBlocking` Interface in Reactor Integration Motivation: The `reactor.core.scheduler.NonBlocking` interface was introduced to `armeria-core` in #1665 to make `Schedulers.isInNonBlockingThread()` return `true` for an Armeria `EventLoop`. However, a conflict arises when building Java modules because the `NonBlocking` interface clashes with Reactor's own definition. Modifications: - Moved `NonBlocking` to `com.linecorp.armeria.common` to resolve the module conflict while retaining its utility for identifying non-blocking threads. - Updated to call `Schedulers.registerNonBlockingThreadPredicate` if Reactor is available in the classpath. - Adjusted `CoreBlockHoundIntegration` to update the non-blocking thread predicate. Result: - Resolved the Java module conflict with the `NonBlocking` interface. --- core/build.gradle | 5 ++- .../armeria/client/ClientFactory.java | 2 +- .../armeria/client/HttpChannelPool.java | 2 +- .../armeria/client/HttpClientFactory.java | 2 +- .../linecorp/armeria/common/CommonPools.java | 8 ++++ .../common/CoreBlockHoundIntegration.java | 2 + .../linecorp/armeria/common}/NonBlocking.java | 4 +- .../common/ReactorNonBlockingUtil.java} | 18 +++++---- .../common/util/EventLoopCheckingFuture.java | 3 +- .../internal/common/util/EventLoopThread.java | 2 +- .../reactor3/EventLoopNonBlockingTest.java | 37 +++++++++++++++++++ 11 files changed, 68 insertions(+), 17 deletions(-) rename core/src/main/java/{reactor/core/scheduler => com/linecorp/armeria/common}/NonBlocking.java (83%) rename core/src/main/java/{reactor/core/scheduler/package-info.java => com/linecorp/armeria/common/ReactorNonBlockingUtil.java} (64%) create mode 100644 reactor3/src/test/java/com/linecorp/armeria/common/reactor3/EventLoopNonBlockingTest.java diff --git a/core/build.gradle b/core/build.gradle index c01d1a9c3d2..054381732c2 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -139,6 +139,9 @@ dependencies { // JUnit Pioneer testImplementation libs.junit.pioneer + // Reactor for registering EventLoop as a non blocking thread. + optionalImplementation libs.reactor.core + // Reactive Streams api libs.reactivestreams testImplementation libs.reactivestreams.tck @@ -217,8 +220,6 @@ if (tasks.findByName('trimShadedJar')) { tasks.trimShadedJar.configure { // Keep all classes under com.linecorp.armeria, except the internal ones. keep "class !com.linecorp.armeria.internal.shaded.**,com.linecorp.armeria.** { *; }" - // Keep the 'NonBlocking' tag interface. - keep "class reactor.core.scheduler.NonBlocking { *; }" // Do not optimize the dependencies that access some fields via sun.misc.Unsafe or reflection only. keep "class com.linecorp.armeria.internal.shaded.caffeine.** { *; }" keep "class com.linecorp.armeria.internal.shaded.jctools.** { *; }" diff --git a/core/src/main/java/com/linecorp/armeria/client/ClientFactory.java b/core/src/main/java/com/linecorp/armeria/client/ClientFactory.java index dc0b203afd2..d018a921d3d 100644 --- a/core/src/main/java/com/linecorp/armeria/client/ClientFactory.java +++ b/core/src/main/java/com/linecorp/armeria/client/ClientFactory.java @@ -36,6 +36,7 @@ import com.linecorp.armeria.client.endpoint.EndpointGroup; import com.linecorp.armeria.common.FlagsProvider; +import com.linecorp.armeria.common.NonBlocking; import com.linecorp.armeria.common.Scheme; import com.linecorp.armeria.common.SerializationFormat; import com.linecorp.armeria.common.SessionProtocol; @@ -49,7 +50,6 @@ import io.micrometer.core.instrument.MeterRegistry; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; -import reactor.core.scheduler.NonBlocking; /** * Creates and manages clients. diff --git a/core/src/main/java/com/linecorp/armeria/client/HttpChannelPool.java b/core/src/main/java/com/linecorp/armeria/client/HttpChannelPool.java index 2bd867e93f5..a05ed95cc99 100644 --- a/core/src/main/java/com/linecorp/armeria/client/HttpChannelPool.java +++ b/core/src/main/java/com/linecorp/armeria/client/HttpChannelPool.java @@ -47,6 +47,7 @@ import com.linecorp.armeria.client.proxy.Socks4ProxyConfig; import com.linecorp.armeria.client.proxy.Socks5ProxyConfig; import com.linecorp.armeria.common.ClosedSessionException; +import com.linecorp.armeria.common.NonBlocking; import com.linecorp.armeria.common.SerializationFormat; import com.linecorp.armeria.common.SessionProtocol; import com.linecorp.armeria.common.annotation.Nullable; @@ -76,7 +77,6 @@ import io.netty.util.AttributeKey; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; -import reactor.core.scheduler.NonBlocking; final class HttpChannelPool implements AsyncCloseable { diff --git a/core/src/main/java/com/linecorp/armeria/client/HttpClientFactory.java b/core/src/main/java/com/linecorp/armeria/client/HttpClientFactory.java index 35ad761e790..1332cd3baff 100644 --- a/core/src/main/java/com/linecorp/armeria/client/HttpClientFactory.java +++ b/core/src/main/java/com/linecorp/armeria/client/HttpClientFactory.java @@ -42,6 +42,7 @@ import com.linecorp.armeria.client.proxy.ProxyConfigSelector; import com.linecorp.armeria.client.redirect.RedirectConfig; import com.linecorp.armeria.common.Http1HeaderNaming; +import com.linecorp.armeria.common.NonBlocking; import com.linecorp.armeria.common.RequestContext; import com.linecorp.armeria.common.Scheme; import com.linecorp.armeria.common.SerializationFormat; @@ -71,7 +72,6 @@ import io.netty.handler.ssl.SslContextBuilder; import io.netty.resolver.AddressResolverGroup; import io.netty.util.concurrent.FutureListener; -import reactor.core.scheduler.NonBlocking; /** * A {@link ClientFactory} that creates an HTTP client. diff --git a/core/src/main/java/com/linecorp/armeria/common/CommonPools.java b/core/src/main/java/com/linecorp/armeria/common/CommonPools.java index 7690ccd09c0..3c48e094321 100644 --- a/core/src/main/java/com/linecorp/armeria/common/CommonPools.java +++ b/core/src/main/java/com/linecorp/armeria/common/CommonPools.java @@ -41,6 +41,14 @@ public final class CommonPools { MoreMeterBinders .eventLoopMetrics(WORKER_GROUP, new MeterIdPrefix("armeria.netty.common")) .bindTo(Flags.meterRegistry()); + + try { + Class.forName("reactor.core.scheduler.Schedulers", + true, CommonPools.class.getClassLoader()); + ReactorNonBlockingUtil.registerEventLoopAsNonBlocking(); + } catch (ClassNotFoundException e) { + // Do nothing. + } } /** diff --git a/core/src/main/java/com/linecorp/armeria/common/CoreBlockHoundIntegration.java b/core/src/main/java/com/linecorp/armeria/common/CoreBlockHoundIntegration.java index 3faf112ebac..7642c55662b 100644 --- a/core/src/main/java/com/linecorp/armeria/common/CoreBlockHoundIntegration.java +++ b/core/src/main/java/com/linecorp/armeria/common/CoreBlockHoundIntegration.java @@ -30,6 +30,8 @@ public final class CoreBlockHoundIntegration implements BlockHoundIntegration { @Override public void applyTo(Builder builder) { + builder.nonBlockingThreadPredicate(predicate -> predicate.or(NonBlocking.class::isInstance)); + // short locks builder.allowBlockingCallsInside("com.linecorp.armeria.client.HttpClientFactory", "pool"); diff --git a/core/src/main/java/reactor/core/scheduler/NonBlocking.java b/core/src/main/java/com/linecorp/armeria/common/NonBlocking.java similarity index 83% rename from core/src/main/java/reactor/core/scheduler/NonBlocking.java rename to core/src/main/java/com/linecorp/armeria/common/NonBlocking.java index 8dcdbce748f..dc33cd3a19c 100644 --- a/core/src/main/java/reactor/core/scheduler/NonBlocking.java +++ b/core/src/main/java/com/linecorp/armeria/common/NonBlocking.java @@ -13,9 +13,9 @@ * License for the specific language governing permissions and limitations * under the License. */ -package reactor.core.scheduler; +package com.linecorp.armeria.common; /** - * A dummy interface that makes Project Reactor recognize Armeria's event loop threads as non-blocking. + * A dummy interface that indicates non-blocking thread. */ public interface NonBlocking {} diff --git a/core/src/main/java/reactor/core/scheduler/package-info.java b/core/src/main/java/com/linecorp/armeria/common/ReactorNonBlockingUtil.java similarity index 64% rename from core/src/main/java/reactor/core/scheduler/package-info.java rename to core/src/main/java/com/linecorp/armeria/common/ReactorNonBlockingUtil.java index 0cafe36bc09..1b93980fb5a 100644 --- a/core/src/main/java/reactor/core/scheduler/package-info.java +++ b/core/src/main/java/com/linecorp/armeria/common/ReactorNonBlockingUtil.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 LINE Corporation + * Copyright 2024 LINE Corporation * * LINE Corporation licenses this file to you under the Apache License, * version 2.0 (the "License"); you may not use this file except in compliance @@ -13,11 +13,15 @@ * License for the specific language governing permissions and limitations * under the License. */ +package com.linecorp.armeria.common; -/** - * Provides a dummy interface that makes Project Reactor recognize Armeria's event loop threads as non-blocking. - */ -@NonNullByDefault -package reactor.core.scheduler; +import reactor.core.scheduler.Schedulers; + +final class ReactorNonBlockingUtil { + + static void registerEventLoopAsNonBlocking() { + Schedulers.registerNonBlockingThreadPredicate(NonBlocking.class::isInstance); + } -import com.linecorp.armeria.common.annotation.NonNullByDefault; + private ReactorNonBlockingUtil() {} +} diff --git a/core/src/main/java/com/linecorp/armeria/common/util/EventLoopCheckingFuture.java b/core/src/main/java/com/linecorp/armeria/common/util/EventLoopCheckingFuture.java index 4c067d05af0..7f0de4de3c3 100644 --- a/core/src/main/java/com/linecorp/armeria/common/util/EventLoopCheckingFuture.java +++ b/core/src/main/java/com/linecorp/armeria/common/util/EventLoopCheckingFuture.java @@ -31,10 +31,9 @@ import com.google.common.collect.MapMaker; import com.linecorp.armeria.common.Flags; +import com.linecorp.armeria.common.NonBlocking; import com.linecorp.armeria.common.annotation.Nullable; -import reactor.core.scheduler.NonBlocking; - /** * A {@link CompletableFuture} that warns the user if they call a method that blocks the event loop. */ diff --git a/core/src/main/java/com/linecorp/armeria/internal/common/util/EventLoopThread.java b/core/src/main/java/com/linecorp/armeria/internal/common/util/EventLoopThread.java index f1059717154..c8d36aa43f3 100644 --- a/core/src/main/java/com/linecorp/armeria/internal/common/util/EventLoopThread.java +++ b/core/src/main/java/com/linecorp/armeria/internal/common/util/EventLoopThread.java @@ -15,11 +15,11 @@ */ package com.linecorp.armeria.internal.common.util; +import com.linecorp.armeria.common.NonBlocking; import com.linecorp.armeria.common.annotation.Nullable; import io.netty.util.concurrent.FastThreadLocal; import io.netty.util.concurrent.FastThreadLocalThread; -import reactor.core.scheduler.NonBlocking; /** * An event loop thread with support for {@link TemporaryThreadLocals}, Netty {@link FastThreadLocal} and diff --git a/reactor3/src/test/java/com/linecorp/armeria/common/reactor3/EventLoopNonBlockingTest.java b/reactor3/src/test/java/com/linecorp/armeria/common/reactor3/EventLoopNonBlockingTest.java new file mode 100644 index 00000000000..bb3b7011060 --- /dev/null +++ b/reactor3/src/test/java/com/linecorp/armeria/common/reactor3/EventLoopNonBlockingTest.java @@ -0,0 +1,37 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package com.linecorp.armeria.common.reactor3; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.junit.jupiter.api.Test; + +import com.linecorp.armeria.common.CommonPools; + +import io.netty.util.concurrent.Future; +import reactor.core.scheduler.Schedulers; + +final class EventLoopNonBlockingTest { + + /** + * Verifies that the current thread is registered a non-blocking thread via {@code ReactorNonBlockingUtil}. + */ + @Test + void checkEventLoopNonBlocking() throws Exception { + final Future submit = CommonPools.workerGroup().submit(Schedulers::isInNonBlockingThread); + assertThat(submit.get()).isTrue(); + } +} From 0f40a787f4b404bfc1d76c88856c0bd0f59152a6 Mon Sep 17 00:00:00 2001 From: minwoox Date: Mon, 16 Dec 2024 12:44:55 +0900 Subject: [PATCH 2/2] Update Javadoc of NonBlocking --- .../java/com/linecorp/armeria/common/NonBlocking.java | 11 ++++++++++- .../armeria/internal/common/util/EventLoopThread.java | 2 +- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/com/linecorp/armeria/common/NonBlocking.java b/core/src/main/java/com/linecorp/armeria/common/NonBlocking.java index dc33cd3a19c..cceeef41ff2 100644 --- a/core/src/main/java/com/linecorp/armeria/common/NonBlocking.java +++ b/core/src/main/java/com/linecorp/armeria/common/NonBlocking.java @@ -16,6 +16,15 @@ package com.linecorp.armeria.common; /** - * A dummy interface that indicates non-blocking thread. + * An interface that indicates a non-blocking thread. You can use this interface to check if the current + * thread is a non-blocking thread. For example: + *
{@code
+ * if (Thread.currentThread() instanceof NonBlocking) {
+ *     // Avoid blocking operations.
+ *     closeable.closeAsync();
+ * } else {
+ *     closeable.close();
+ * }
+ * }
*/ public interface NonBlocking {} diff --git a/core/src/main/java/com/linecorp/armeria/internal/common/util/EventLoopThread.java b/core/src/main/java/com/linecorp/armeria/internal/common/util/EventLoopThread.java index c8d36aa43f3..ac082c940e3 100644 --- a/core/src/main/java/com/linecorp/armeria/internal/common/util/EventLoopThread.java +++ b/core/src/main/java/com/linecorp/armeria/internal/common/util/EventLoopThread.java @@ -23,7 +23,7 @@ /** * An event loop thread with support for {@link TemporaryThreadLocals}, Netty {@link FastThreadLocal} and - * Project Reactor {@link NonBlocking}. + * {@link NonBlocking} interface. */ public final class EventLoopThread extends FastThreadLocalThread implements NonBlocking {