Skip to content

Commit

Permalink
Merge branch 'opensearch-project:main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
srikanthpadakanti authored Apr 18, 2024
2 parents 664b51e + f5c3ef9 commit 51ed18f
Show file tree
Hide file tree
Showing 14 changed files with 91 additions and 166 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/detect-breaking-change.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
name: "Detect Breaking Changes"
on:
pull_request
pull_request:
branches-ignore:
- main # This branch represents a to-be-released version of OpenSearch where breaking changes are allowed

jobs:
detect-breaking-change:
Expand Down
4 changes: 4 additions & 0 deletions release-notes/opensearch.release-notes-1.3.16.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
## 2024-04-18 Version 1.3.16 Release Notes

### Upgrades
- Bump `netty` from 4.1.107.Final to 4.1.109.Final ([#12924](https://github.com/opensearch-project/OpenSearch/pull/12924), [#13233](https://github.com/opensearch-project/OpenSearch/pull/13233))
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
---
"Test primary_only parameter":
- skip:
version: " - 2.99.99"
reason: "primary_only is available in 3.0+"
version: " - 2.12.99"
reason: "primary_only is available in 2.13.0+"

- do:
indices.create:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.bytes.CompositeBytesReference;
import org.opensearch.transport.nativeprotocol.NativeInboundMessage;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -113,7 +114,7 @@ public void aggregate(ReleasableBytesReference content) {
}
}

public InboundMessage finishAggregation() throws IOException {
public NativeInboundMessage finishAggregation() throws IOException {
ensureOpen();
final ReleasableBytesReference releasableContent;
if (isFirstContent()) {
Expand All @@ -127,7 +128,7 @@ public InboundMessage finishAggregation() throws IOException {
}

final BreakerControl breakerControl = new BreakerControl(circuitBreaker);
final InboundMessage aggregated = new InboundMessage(currentHeader, releasableContent, breakerControl);
final NativeInboundMessage aggregated = new NativeInboundMessage(currentHeader, releasableContent, breakerControl);
boolean success = false;
try {
if (aggregated.getHeader().needsToReadVariableHeader()) {
Expand All @@ -142,7 +143,7 @@ public InboundMessage finishAggregation() throws IOException {
if (isShortCircuited()) {
aggregated.close();
success = true;
return new InboundMessage(aggregated.getHeader(), aggregationException);
return new NativeInboundMessage(aggregated.getHeader(), aggregationException);
} else {
success = true;
return aggregated;
Expand Down
108 changes: 0 additions & 108 deletions server/src/main/java/org/opensearch/transport/InboundMessage.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.telemetry.tracing.channels.TraceableTcpTransportChannel;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.nativeprotocol.NativeInboundMessage;

import java.io.EOFException;
import java.io.IOException;
Expand Down Expand Up @@ -111,7 +112,7 @@ public void messageReceived(
long slowLogThresholdMs,
TransportMessageListener messageListener
) throws IOException {
InboundMessage inboundMessage = (InboundMessage) message;
NativeInboundMessage inboundMessage = (NativeInboundMessage) message;
TransportLogger.logInboundMessage(channel, inboundMessage);
if (inboundMessage.isPing()) {
keepAlive.receiveKeepAlive(channel);
Expand All @@ -122,7 +123,7 @@ public void messageReceived(

private void handleMessage(
TcpChannel channel,
InboundMessage message,
NativeInboundMessage message,
long startTime,
long slowLogThresholdMs,
TransportMessageListener messageListener
Expand Down Expand Up @@ -194,7 +195,7 @@ private Map<String, Collection<String>> extractHeaders(Map<String, String> heade
private <T extends TransportRequest> void handleRequest(
TcpChannel channel,
Header header,
InboundMessage message,
NativeInboundMessage message,
TransportMessageListener messageListener
) throws IOException {
final String action = header.getActionName();
Expand Down
12 changes: 0 additions & 12 deletions server/src/main/java/org/opensearch/transport/TcpTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -761,18 +761,6 @@ protected void serverAcceptedChannel(TcpChannel channel) {
*/
protected abstract void stopInternal();

/**
* @deprecated use {@link #inboundMessage(TcpChannel, ProtocolInboundMessage)}
* Handles inbound message that has been decoded.
*
* @param channel the channel the message is from
* @param message the message
*/
@Deprecated(since = "2.14.0", forRemoval = true)
public void inboundMessage(TcpChannel channel, InboundMessage message) {
inboundMessage(channel, (ProtocolInboundMessage) message);
}

/**
* Handles inbound message that has been decoded.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.core.common.io.stream.InputStreamStreamInput;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.compress.CompressorRegistry;
import org.opensearch.transport.nativeprotocol.NativeInboundMessage;

import java.io.IOException;

Expand All @@ -64,7 +65,7 @@ static void logInboundMessage(TcpChannel channel, BytesReference message) {
}
}

static void logInboundMessage(TcpChannel channel, InboundMessage message) {
static void logInboundMessage(TcpChannel channel, NativeInboundMessage message) {
if (logger.isTraceEnabled()) {
try {
String logMessage = format(channel, message, "READ");
Expand Down Expand Up @@ -136,7 +137,7 @@ private static String format(TcpChannel channel, BytesReference message, String
return sb.toString();
}

private static String format(TcpChannel channel, InboundMessage message, String event) throws IOException {
private static String format(TcpChannel channel, NativeInboundMessage message, String event) throws IOException {
final StringBuilder sb = new StringBuilder();
sb.append(channel);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.opensearch.transport.InboundAggregator;
import org.opensearch.transport.InboundBytesHandler;
import org.opensearch.transport.InboundDecoder;
import org.opensearch.transport.InboundMessage;
import org.opensearch.transport.ProtocolInboundMessage;
import org.opensearch.transport.StatsTracker;
import org.opensearch.transport.TcpChannel;
Expand All @@ -32,7 +31,7 @@
public class NativeInboundBytesHandler implements InboundBytesHandler {

private static final ThreadLocal<ArrayList<Object>> fragmentList = ThreadLocal.withInitial(ArrayList::new);
private static final InboundMessage PING_MESSAGE = new InboundMessage(null, true);
private static final NativeInboundMessage PING_MESSAGE = new NativeInboundMessage(null, true);

private final ArrayDeque<ReleasableBytesReference> pending;
private final InboundDecoder decoder;
Expand Down Expand Up @@ -152,7 +151,7 @@ private void forwardFragments(
messageHandler.accept(channel, PING_MESSAGE);
} else if (fragment == InboundDecoder.END_CONTENT) {
assert aggregator.isAggregating();
try (InboundMessage aggregated = aggregator.finishAggregation()) {
try (NativeInboundMessage aggregated = aggregator.finishAggregation()) {
statsTracker.markMessageReceived();
messageHandler.accept(channel, aggregated);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,6 @@
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
import static org.opensearch.test.ClusterServiceUtils.createClusterService;
import static org.opensearch.test.ClusterServiceUtils.setState;
import static org.opensearch.test.VersionUtils.randomCompatibleVersion;
import static org.opensearch.test.VersionUtils.randomOpenSearchVersion;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
Expand Down Expand Up @@ -826,10 +824,9 @@ public void testDontAllowSwitchingCompatibilityModeForClusterWithMultipleVersion
request.persistentSettings(intendedCompatibilityModeSettings);

// two different but compatible open search versions for the discovery nodes
final Version version1 = randomOpenSearchVersion(random());
final Version version2 = randomCompatibleVersion(random(), version1);
final Version version1 = Version.V_2_13_0;
final Version version2 = Version.V_2_13_1;

assert version1.equals(version2) == false : "current nodes in the cluster must be of different versions";
DiscoveryNode discoveryNode1 = new DiscoveryNode(
UUIDs.base64UUID(),
buildNewFakeTransportAddress(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.opensearch.core.common.breaker.CircuitBreakingException;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.transport.nativeprotocol.NativeInboundMessage;
import org.junit.Before;

import java.io.IOException;
Expand Down Expand Up @@ -107,7 +108,7 @@ public void testInboundAggregation() throws IOException {
}

// Signal EOS
InboundMessage aggregated = aggregator.finishAggregation();
NativeInboundMessage aggregated = aggregator.finishAggregation();

assertThat(aggregated, notNullValue());
assertFalse(aggregated.isPing());
Expand Down Expand Up @@ -138,7 +139,7 @@ public void testInboundUnknownAction() throws IOException {
assertEquals(0, content.refCount());

// Signal EOS
InboundMessage aggregated = aggregator.finishAggregation();
NativeInboundMessage aggregated = aggregator.finishAggregation();

assertThat(aggregated, notNullValue());
assertTrue(aggregated.isShortCircuit());
Expand All @@ -161,7 +162,7 @@ public void testCircuitBreak() throws IOException {
content1.close();

// Signal EOS
InboundMessage aggregated1 = aggregator.finishAggregation();
NativeInboundMessage aggregated1 = aggregator.finishAggregation();

assertEquals(0, content1.refCount());
assertThat(aggregated1, notNullValue());
Expand All @@ -180,7 +181,7 @@ public void testCircuitBreak() throws IOException {
content2.close();

// Signal EOS
InboundMessage aggregated2 = aggregator.finishAggregation();
NativeInboundMessage aggregated2 = aggregator.finishAggregation();

assertEquals(1, content2.refCount());
assertThat(aggregated2, notNullValue());
Expand All @@ -199,7 +200,7 @@ public void testCircuitBreak() throws IOException {
content3.close();

// Signal EOS
InboundMessage aggregated3 = aggregator.finishAggregation();
NativeInboundMessage aggregated3 = aggregator.finishAggregation();

assertEquals(1, content3.refCount());
assertThat(aggregated3, notNullValue());
Expand Down Expand Up @@ -263,7 +264,7 @@ public void testFinishAggregationWillFinishHeader() throws IOException {
content.close();

// Signal EOS
InboundMessage aggregated = aggregator.finishAggregation();
NativeInboundMessage aggregated = aggregator.finishAggregation();

assertThat(aggregated, notNullValue());
assertFalse(header.needsToReadVariableHeader());
Expand Down
Loading

0 comments on commit 51ed18f

Please sign in to comment.