Skip to content

Commit

Permalink
Ensure support of the transport-nio by security plugin
Browse files Browse the repository at this point in the history
Signed-off-by: Andriy Redko <andriy.redko@aiven.io>
  • Loading branch information
reta committed Oct 29, 2024
1 parent 08dc3bb commit 8aafff0
Show file tree
Hide file tree
Showing 19 changed files with 933 additions and 42 deletions.
4 changes: 1 addition & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Added
- Latency and Memory allocation improvements to Multi Term Aggregation queries ([#14993](https://github.com/opensearch-project/OpenSearch/pull/14993))
- Add support for restoring from snapshot with search replicas ([#16111](https://github.com/opensearch-project/OpenSearch/pull/16111))
- Add logic in master service to optimize performance and retain detailed logging for critical cluster operations. ([#14795](https://github.com/opensearch-project/OpenSearch/pull/14795))
- Add Setting to adjust the primary constraint weights ([#16471](https://github.com/opensearch-project/OpenSearch/pull/16471))
- Switch from `buildSrc/version.properties` to Gradle version catalog (`gradle/libs.versions.toml`) to enable dependabot to perform automated upgrades on common libs ([#16284](https://github.com/opensearch-project/OpenSearch/pull/16284))
- Ensure support of the transport-nio by security plugin ([#16474](https://github.com/opensearch-project/OpenSearch/pull/16474))

### Dependencies

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,13 +315,11 @@ private static class CountDownLatchHandlerHttp2 extends AwaitableChannelInitiali

private final CountDownLatch latch;
private final Collection<FullHttpResponse> content;
private final boolean secure;
private Http2SettingsHandler settingsHandler;

CountDownLatchHandlerHttp2(final CountDownLatch latch, final Collection<FullHttpResponse> content, final boolean secure) {
this.latch = latch;
this.content = content;
this.secure = secure;
}

@Override
Expand Down
5 changes: 1 addition & 4 deletions plugins/transport-nio/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ dependencies {
api "io.netty:netty-handler:${versions.netty}"
api "io.netty:netty-resolver:${versions.netty}"
api "io.netty:netty-transport:${versions.netty}"
api "io.netty:netty-transport-native-unix-common:${versions.netty}"
}

tasks.named("dependencyLicenses").configure {
Expand Down Expand Up @@ -151,10 +152,6 @@ thirdPartyAudit {
'io.netty.internal.tcnative.SessionTicketKey',
'io.netty.internal.tcnative.SniHostNameMatcher',

// from io.netty.channel.unix (netty)
'io.netty.channel.unix.FileDescriptor',
'io.netty.channel.unix.UnixChannel',

'reactor.blockhound.BlockHound$Builder',
'reactor.blockhound.integration.BlockHoundIntegration'
)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
d1171bb99411f282068f49d780cedf8c9adeabfd
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void testThatNioHttpServerSupportsPipelining() throws Exception {
TransportAddress[] boundAddresses = httpServerTransport.boundAddress().boundAddresses();
TransportAddress transportAddress = randomFrom(boundAddresses);

try (NioHttpClient nettyHttpClient = new NioHttpClient()) {
try (NioHttpClient nettyHttpClient = NioHttpClient.http()) {
Collection<FullHttpResponse> responses = nettyHttpClient.get(transportAddress.address(), requests);
assertThat(responses, hasSize(5));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.http.nio;

import org.opensearch.common.Nullable;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.http.HttpHandlingSettings;
import org.opensearch.http.HttpPipelinedRequest;
Expand All @@ -44,6 +45,8 @@
import org.opensearch.nio.TaskScheduler;
import org.opensearch.nio.WriteOperation;

import javax.net.ssl.SSLEngine;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -58,6 +61,9 @@
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslHandler;

public class HttpReadWriteHandler implements NioChannelHandler {

Expand All @@ -77,6 +83,17 @@ public HttpReadWriteHandler(
HttpHandlingSettings settings,
TaskScheduler taskScheduler,
LongSupplier nanoClock
) {
this(nioHttpChannel, transport, settings, taskScheduler, nanoClock, null /* no SSL/TLS */);
}

HttpReadWriteHandler(
NioHttpChannel nioHttpChannel,
NioHttpServerTransport transport,
HttpHandlingSettings settings,
TaskScheduler taskScheduler,
LongSupplier nanoClock,
@Nullable SSLEngine sslEngine
) {
this.nioHttpChannel = nioHttpChannel;
this.transport = transport;
Expand All @@ -85,6 +102,12 @@ public HttpReadWriteHandler(
this.readTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(settings.getReadTimeoutMillis());

List<ChannelHandler> handlers = new ArrayList<>(8);

SslHandler sslHandler = null;
if (sslEngine != null) {
sslHandler = new SslHandler(sslEngine);
}

HttpRequestDecoder decoder = new HttpRequestDecoder(
settings.getMaxInitialLineLength(),
settings.getMaxHeaderSize(),
Expand All @@ -101,8 +124,9 @@ public HttpReadWriteHandler(
handlers.add(new NioHttpRequestCreator());
handlers.add(new NioHttpResponseCreator());
handlers.add(new NioHttpPipeliningHandler(transport.getLogger(), settings.getPipeliningMaxEvents()));
handlers.add(new LoggingHandler(LogLevel.DEBUG));

adaptor = new NettyAdaptor(handlers.toArray(new ChannelHandler[0]));
adaptor = new NettyAdaptor(sslHandler, handlers.toArray(new ChannelHandler[0]));
adaptor.addCloseListener((v, e) -> nioHttpChannel.close());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,21 @@
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.ssl.SslHandler;

class NettyAdaptor {

private final EmbeddedChannel nettyChannel;
private final LinkedList<FlushOperation> flushOperations = new LinkedList<>();

NettyAdaptor(ChannelHandler... handlers) {
nettyChannel = new EmbeddedChannel();
nettyChannel.pipeline().addLast("write_captor", new ChannelOutboundHandlerAdapter() {
this(null, handlers);
}

NettyAdaptor(SslHandler sslHandler, ChannelHandler... handlers) {
this.nettyChannel = new EmbeddedChannel();

nettyChannel.pipeline().addLast("write_captor", new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
// This is a little tricky. The embedded channel will complete the promise once it writes the message
Expand All @@ -75,19 +80,32 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
}
}
});
if (sslHandler != null) {
nettyChannel.pipeline().addAfter("write_captor", "ssl_handler", sslHandler);
}
nettyChannel.pipeline().addLast(handlers);
}

public void close() throws Exception {
assert flushOperations.isEmpty() : "Should close outbound operations before calling close";

ChannelFuture closeFuture = nettyChannel.close();
// This should be safe as we are not a real network channel
closeFuture.await();
if (closeFuture.isSuccess() == false) {
Throwable cause = closeFuture.cause();
ExceptionsHelper.maybeDieOnAnotherThread(cause);
throw (Exception) cause;
if (nettyChannel.pipeline().get("ssl_handler") != null) {
closeFuture.addListener(f -> {
if (f.isSuccess() == false) {
Throwable cause = closeFuture.cause();
ExceptionsHelper.maybeDieOnAnotherThread(cause);
throw (Exception) cause;
}
});
} else {
// This should be safe as we are not a real network channel
closeFuture.await();
if (closeFuture.isSuccess() == false) {
Throwable cause = closeFuture.cause();
ExceptionsHelper.maybeDieOnAnotherThread(cause);
throw (Exception) cause;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchException;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.common.Nullable;
import org.opensearch.common.network.NetworkService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
Expand All @@ -47,6 +48,7 @@
import org.opensearch.http.AbstractHttpServerTransport;
import org.opensearch.http.HttpChannel;
import org.opensearch.http.HttpServerChannel;
import org.opensearch.http.nio.ssl.SslUtils;
import org.opensearch.nio.BytesChannelContext;
import org.opensearch.nio.ChannelFactory;
import org.opensearch.nio.Config;
Expand All @@ -56,11 +58,15 @@
import org.opensearch.nio.NioSocketChannel;
import org.opensearch.nio.ServerChannelContext;
import org.opensearch.nio.SocketChannelContext;
import org.opensearch.plugins.SecureHttpTransportSettingsProvider;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.nio.NioGroupFactory;
import org.opensearch.transport.nio.PageAllocator;

import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLException;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
Expand Down Expand Up @@ -97,6 +103,7 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport {

private volatile NioGroup nioGroup;
private ChannelFactory<NioHttpServerChannel, NioHttpChannel> channelFactory;
private final SecureHttpTransportSettingsProvider secureHttpTransportSettingsProvider;

public NioHttpServerTransport(
Settings settings,
Expand All @@ -109,6 +116,34 @@ public NioHttpServerTransport(
NioGroupFactory nioGroupFactory,
ClusterSettings clusterSettings,
Tracer tracer
) {
this(
settings,
networkService,
bigArrays,
pageCacheRecycler,
threadPool,
xContentRegistry,
dispatcher,
nioGroupFactory,
clusterSettings,
null,
tracer
);
}

public NioHttpServerTransport(
Settings settings,
NetworkService networkService,
BigArrays bigArrays,
PageCacheRecycler pageCacheRecycler,
ThreadPool threadPool,
NamedXContentRegistry xContentRegistry,
Dispatcher dispatcher,
NioGroupFactory nioGroupFactory,
ClusterSettings clusterSettings,
@Nullable SecureHttpTransportSettingsProvider secureHttpTransportSettingsProvider,
Tracer tracer
) {
super(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher, clusterSettings, tracer);
this.pageAllocator = new PageAllocator(pageCacheRecycler);
Expand All @@ -127,6 +162,7 @@ public NioHttpServerTransport(
this.reuseAddress = SETTING_HTTP_TCP_REUSE_ADDRESS.get(settings);
this.tcpSendBufferSize = Math.toIntExact(SETTING_HTTP_TCP_SEND_BUFFER_SIZE.get(settings).getBytes());
this.tcpReceiveBufferSize = Math.toIntExact(SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE.get(settings).getBytes());
this.secureHttpTransportSettingsProvider = secureHttpTransportSettingsProvider;

logger.debug(
"using max_chunk_size[{}], max_header_size[{}], max_initial_line_length[{}], max_content_length[{}],"
Expand Down Expand Up @@ -178,17 +214,18 @@ protected HttpServerChannel bind(InetSocketAddress socketAddress) throws IOExcep
return httpServerChannel;
}

protected ChannelFactory<NioHttpServerChannel, NioHttpChannel> channelFactory() {
return new HttpChannelFactory();
protected ChannelFactory<NioHttpServerChannel, NioHttpChannel> channelFactory() throws SSLException {
return new HttpChannelFactory(secureHttpTransportSettingsProvider);
}

protected void acceptChannel(NioSocketChannel socketChannel) {
super.serverAcceptedChannel((HttpChannel) socketChannel);
}

private class HttpChannelFactory extends ChannelFactory<NioHttpServerChannel, NioHttpChannel> {
private final SecureHttpTransportSettingsProvider secureHttpTransportSettingsProvider;

private HttpChannelFactory() {
private HttpChannelFactory(@Nullable SecureHttpTransportSettingsProvider secureHttpTransportSettingsProvider) {
super(
tcpNoDelay,
tcpKeepAlive,
Expand All @@ -199,17 +236,25 @@ private HttpChannelFactory() {
tcpSendBufferSize,
tcpReceiveBufferSize
);
this.secureHttpTransportSettingsProvider = secureHttpTransportSettingsProvider;
}

@Override
public NioHttpChannel createChannel(NioSelector selector, SocketChannel channel, Config.Socket socketConfig) {
public NioHttpChannel createChannel(NioSelector selector, SocketChannel channel, Config.Socket socketConfig) throws IOException {
SSLEngine engine = null;
if (secureHttpTransportSettingsProvider != null) {
engine = secureHttpTransportSettingsProvider.buildSecureHttpServerEngine(settings, NioHttpServerTransport.this)
.orElseGet(SslUtils::createDefaultServerSSLEngine);
}

NioHttpChannel httpChannel = new NioHttpChannel(channel);
HttpReadWriteHandler handler = new HttpReadWriteHandler(
httpChannel,
NioHttpServerTransport.this,
handlingSettings,
selector.getTaskScheduler(),
threadPool::relativeTimeInMillis
threadPool::relativeTimeInMillis,
engine
);
Consumer<Exception> exceptionHandler = (e) -> onException(httpChannel, e);
SocketChannelContext context = new BytesChannelContext(
Expand Down Expand Up @@ -244,6 +289,5 @@ public NioHttpServerChannel createServerChannel(
httpServerChannel.setContext(context);
return httpServerChannel;
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/
package org.opensearch.http.nio.ssl;

import org.opensearch.OpenSearchSecurityException;

import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;

import java.security.NoSuchAlgorithmException;

public class SslUtils {
private static final String[] DEFAULT_SSL_PROTOCOLS = { "TLSv1.3", "TLSv1.2", "TLSv1.1" };

private SslUtils() {

}

public static SSLEngine createDefaultServerSSLEngine() {
try {
final SSLEngine engine = SSLContext.getDefault().createSSLEngine();
engine.setEnabledProtocols(DEFAULT_SSL_PROTOCOLS);
engine.setUseClientMode(false);
return engine;
} catch (final NoSuchAlgorithmException ex) {
throw new OpenSearchSecurityException("Unable to initialize default server SSL engine", ex);
}
}

public static SSLEngine createDefaultClientSSLEngine() {
try {
final SSLEngine engine = SSLContext.getDefault().createSSLEngine();
engine.setEnabledProtocols(DEFAULT_SSL_PROTOCOLS);
engine.setUseClientMode(true);
return engine;
} catch (final NoSuchAlgorithmException ex) {
throw new OpenSearchSecurityException("Unable to initialize default client SSL engine", ex);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/**
* SSL supporting utility classes
*/
package org.opensearch.http.nio.ssl;
Loading

0 comments on commit 8aafff0

Please sign in to comment.