Skip to content

Commit

Permalink
Merge branch 'main' into st0
Browse files Browse the repository at this point in the history
Signed-off-by: Sandesh Kumar <sandeshkr419@gmail.com>
  • Loading branch information
sandeshkr419 authored Oct 19, 2024
2 parents 2d034c8 + 0f7d572 commit bdabc51
Show file tree
Hide file tree
Showing 62 changed files with 1,606 additions and 264 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add _list/indices API as paginated alternate to _cat/indices ([#14718](https://github.com/opensearch-project/OpenSearch/pull/14718))
- Add success and failure metrics for async shard fetch ([#15976](https://github.com/opensearch-project/OpenSearch/pull/15976))
- Add new metric REMOTE_STORE to NodeStats API response ([#15611](https://github.com/opensearch-project/OpenSearch/pull/15611))
- Add support to dynamically resize threadpools size. ([#16236](https://github.com/opensearch-project/OpenSearch/pull/16236))
- [S3 Repository] Change default retry mechanism of s3 clients to Standard Mode ([#15978](https://github.com/opensearch-project/OpenSearch/pull/15978))
- Add changes to block calls in cat shards, indices and segments based on dynamic limit settings ([#15986](https://github.com/opensearch-project/OpenSearch/pull/15986))
- New `phone` & `phone-search` analyzer + tokenizer ([#15915](https://github.com/opensearch-project/OpenSearch/pull/15915))
- Add _list/shards API as paginated alternate to _cat/shards ([#14641](https://github.com/opensearch-project/OpenSearch/pull/14641))
- Latency and Memory allocation improvements to Multi Term Aggregation queries ([#14993](https://github.com/opensearch-project/OpenSearch/pull/14993))
- Flat object field use IndexOrDocValuesQuery to optimize query ([#14383](https://github.com/opensearch-project/OpenSearch/issues/14383))
- Add method to return dynamic SecureTransportParameters from SecureTransportSettingsProvider interface ([#16387](https://github.com/opensearch-project/OpenSearch/pull/16387)
- [Star Tree - Search] Add support for metric aggregations with/without term query ([15289](https://github.com/opensearch-project/OpenSearch/pull/15289))

### Dependencies
Expand Down Expand Up @@ -82,6 +84,11 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix warnings from SLF4J on startup when repository-s3 is installed ([#16194](https://github.com/opensearch-project/OpenSearch/pull/16194))
- Fix protobuf-java leak through client library dependencies ([#16254](https://github.com/opensearch-project/OpenSearch/pull/16254))
- Fix multi-search with template doesn't return status code ([#16265](https://github.com/opensearch-project/OpenSearch/pull/16265))
- [Streaming Indexing] Fix intermittent 'The bulk request must be terminated by a newline [\n]' failures [#16337](https://github.com/opensearch-project/OpenSearch/pull/16337))
- Fix wrong default value when setting `index.number_of_routing_shards` to null on index creation ([#16331](https://github.com/opensearch-project/OpenSearch/pull/16331))
- [Workload Management] Make query groups persistent across process restarts [#16370](https://github.com/opensearch-project/OpenSearch/pull/16370)
- Fix inefficient Stream API call chains ending with count() ([#15386](https://github.com/opensearch-project/OpenSearch/pull/15386))
- Fix array hashCode calculation in ResyncReplicationRequest ([#16378](https://github.com/opensearch-project/OpenSearch/pull/16378))

### Security

Expand Down
2 changes: 1 addition & 1 deletion buildSrc/version.properties
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ netty = 4.1.114.Final
joda = 2.12.7

# project reactor
reactor_netty = 1.1.22
reactor_netty = 1.1.23
reactor = 3.5.20

# client dependencies
Expand Down
4 changes: 4 additions & 0 deletions distribution/src/config/jvm.options
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,7 @@ ${error.file}

# HDFS ForkJoinPool.common() support by SecurityManager
-Djava.util.concurrent.ForkJoinPool.common.threadFactory=org.opensearch.secure_sm.SecuredForkJoinWorkerThreadFactory

# See please https://bugs.openjdk.org/browse/JDK-8341127 (openjdk/jdk#21283)
23:-XX:CompileCommand=dontinline,java/lang/invoke/MethodHandle.setAsTypeCache
23:-XX:CompileCommand=dontinline,java/lang/invoke/MethodHandle.asTypeUncached
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public static <R extends Reader<?>> void registerReader(final byte ordinal, fina

public static void registerClassAlias(final Class<?> classInstance, final Class<?> classGeneric) {
if (WRITER_CUSTOM_CLASS_MAP.putIfAbsent(classInstance, classGeneric) != null) {
throw new IllegalArgumentException("Streamable custom class already registered [" + classInstance.getClass() + "]");
throw new IllegalArgumentException("Streamable custom class already registered [" + classInstance.getName() + "]");
}
}

Expand All @@ -96,7 +96,7 @@ public static <W extends Writer<?>> W getWriter(final Class<?> clazz) {
}

/**
* Returns the ristered reader keyed by the unique ordinal
* Returns the registered reader keyed by the unique ordinal
*/
@SuppressWarnings("unchecked")
public static <R extends Reader<?>> R getReader(final byte b) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
public final class MediaTypeRegistry {
private static Map<String, MediaType> formatToMediaType = Map.of();
private static Map<String, MediaType> typeWithSubtypeToMediaType = Map.of();
private static Map<String, MediaType> knownStringsToMediaType = Map.of();

// Default mediaType singleton
private static MediaType DEFAULT_MEDIA_TYPE;
Expand Down Expand Up @@ -84,6 +85,8 @@ private static void register(MediaType[] acceptedMediaTypes, Map<String, MediaTy
// ensures the map is not overwritten:
Map<String, MediaType> typeMap = new HashMap<>(typeWithSubtypeToMediaType);
Map<String, MediaType> formatMap = new HashMap<>(formatToMediaType);
Map<String, MediaType> knownStringMap = new HashMap<>(knownStringsToMediaType);

for (MediaType mediaType : acceptedMediaTypes) {
if (formatMap.containsKey(mediaType.format())) {
throw new IllegalArgumentException("unable to register mediaType: [" + mediaType.format() + "]. Type already exists.");
Expand All @@ -107,13 +110,24 @@ private static void register(MediaType[] acceptedMediaTypes, Map<String, MediaTy
MediaType mediaType = entry.getValue();
typeMap.put(typeWithSubtype, mediaType);
formatMap.putIfAbsent(mediaType.format(), mediaType); // ignore if the additional type mapping already exists
knownStringMap.put(mediaType.mediaType(), mediaType);
knownStringMap.put(mediaType.mediaTypeWithoutParameters(), mediaType);
}

formatToMediaType = Map.copyOf(formatMap);
typeWithSubtypeToMediaType = Map.copyOf(typeMap);
knownStringsToMediaType = Map.copyOf(knownStringMap);
}

public static MediaType fromMediaType(String mediaType) {
if (mediaType == null) {
return null;
}
// Skip parsing if the string is an exact match for any known string value
final MediaType knownMediaType = knownStringsToMediaType.get(mediaType);
if (knownMediaType != null) {
return knownMediaType;
}
ParsedMediaType parsedMediaType = parseMediaType(mediaType);
return parsedMediaType != null ? parsedMediaType.getMediaType() : null;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.core.common.io.stream;

import org.opensearch.test.OpenSearchTestCase;
import org.junit.Assert;

import java.util.concurrent.atomic.AtomicInteger;

public class WriteableTests extends OpenSearchTestCase {

public void testRegisterClassAlias() {
Writeable.WriteableRegistry.registerClassAlias(StringBuilder.class, AtomicInteger.class);
try {
Writeable.WriteableRegistry.registerClassAlias(StringBuilder.class, AtomicInteger.class);
Assert.fail("expected exception not thrown");
} catch (IllegalArgumentException illegalArgumentException) {
Assert.assertEquals(
"Streamable custom class already registered [java.lang.StringBuilder]",
illegalArgumentException.getMessage()
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -510,19 +510,25 @@ static Result selectBestResult(Result result1, Result result2) {
}

private static int minTermLength(Set<QueryExtraction> extractions) {
// In case there are only range extractions, then we return Integer.MIN_VALUE,
// so that selectBestExtraction(...) we are likely to prefer the extractions that contains at least a single extraction
if (extractions.stream().filter(queryExtraction -> queryExtraction.term != null).count() == 0
&& extractions.stream().filter(queryExtraction -> queryExtraction.range != null).count() > 0) {
return Integer.MIN_VALUE;
}

boolean hasTerm = false;
boolean hasRange = false;
int min = Integer.MAX_VALUE;

for (QueryExtraction qt : extractions) {
if (qt.term != null) {
hasTerm = true;
min = Math.min(min, qt.bytes().length);
}
if (qt.range != null) {
hasRange = true;
}
}

// If there are no terms but there are ranges, return Integer.MIN_VALUE
if (!hasTerm && hasRange) {
return Integer.MIN_VALUE;
}

return min;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,10 @@ public SSLServerChannelInitializer(String name) {
protected void initChannel(Channel ch) throws Exception {
super.initChannel(ch);

final boolean dualModeEnabled = NetworkModule.TRANSPORT_SSL_DUAL_MODE_ENABLED.get(settings);
final boolean dualModeEnabled = secureTransportSettingsProvider.parameters(settings)
.map(SecureTransportSettingsProvider.SecureTransportParameters::dualModeEnabled)
.orElse(false);
if (dualModeEnabled) {
logger.info("SSL Dual mode enabled, using port unification handler");
final ChannelHandler portUnificationHandler = new DualModeSslHandler(
settings,
secureTransportSettingsProvider,
Expand Down Expand Up @@ -258,7 +259,9 @@ protected class SSLClientChannelInitializer extends Netty4Transport.ClientChanne
public SSLClientChannelInitializer(DiscoveryNode node) {
this.node = node;

final boolean dualModeEnabled = NetworkModule.TRANSPORT_SSL_DUAL_MODE_ENABLED.get(settings);
final boolean dualModeEnabled = secureTransportSettingsProvider.parameters(settings)
.map(SecureTransportSettingsProvider.SecureTransportParameters::dualModeEnabled)
.orElse(false);
hostnameVerificationEnabled = NetworkModule.TRANSPORT_SSL_ENFORCE_HOSTNAME_VERIFICATION.get(settings);
hostnameVerificationResolveHostName = NetworkModule.TRANSPORT_SSL_ENFORCE_HOSTNAME_VERIFICATION_RESOLVE_HOST_NAME.get(settings);

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
a7059b0c18ab7aa0fa9e08b48cb6a20b15c11478

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
94b294fa90aee2e88ad4337251e278aaac21362c

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
a7059b0c18ab7aa0fa9e08b48cb6a20b15c11478

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
94b294fa90aee2e88ad4337251e278aaac21362c
Original file line number Diff line number Diff line change
Expand Up @@ -304,11 +304,36 @@ public void testStreamingLargeDocument() throws IOException {
String.format(
Locale.getDefault(),
"{ \"index\": { \"_index\": \"test-streaming\", \"_id\": \"1\" } }\n{ \"name\": \"%s\" }\n",
randomAlphaOfLength(5000)
randomAlphaOfLength(7000)
)
);

final StreamingRequest<ByteBuffer> streamingRequest = new StreamingRequest<>(
"POST",
"/_bulk/stream",
Flux.fromStream(stream).map(s -> ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8)))
);

final StreamingResponse<ByteBuffer> streamingResponse = client().streamRequest(streamingRequest);

StepVerifier.create(Flux.from(streamingResponse.getBody()).map(b -> new String(b.array(), StandardCharsets.UTF_8)))
.expectNextMatches(s -> s.contains("\"result\":\"created\"") && s.contains("\"_id\":\"1\""))
.expectComplete()
.verify();

assertThat(streamingResponse.getStatusLine().getStatusCode(), equalTo(200));
assertThat(streamingResponse.getWarnings(), empty());
}

public void testStreamingLargeDocumentThatExceedsChunkSize() throws IOException {
final Stream<String> stream = Stream.of(
String.format(
Locale.getDefault(),
"{ \"index\": { \"_index\": \"test-streaming\", \"_id\": \"1\" } }\n{ \"name\": \"%s\" }\n",
randomAlphaOfLength(9000) /* the default chunk size limit is set 8k */
)
);

final Duration delay = Duration.ofMillis(1);
final StreamingRequest<ByteBuffer> streamingRequest = new StreamingRequest<>(
"POST",
"/_bulk/stream",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ protected HttpServerChannel bind(InetSocketAddress socketAddress) throws Excepti
spec -> spec.maxChunkSize(maxChunkSize.bytesAsInt())
.maxHeaderSize(maxHeaderSize.bytesAsInt())
.maxInitialLineLength(maxInitialLineLength.bytesAsInt())
.allowPartialChunks(false)
)
.handle((req, res) -> incomingRequest(req, res))
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,33 @@
properties:
"":
type: keyword

---
"Create index with setting index.number_of_routing_shards to null":
- skip:
version: " - 2.17.99"
reason: "fixed in 2.18.0"
- do:
indices.create:
index: test_index
body:
settings:
number_of_routing_shards: null
- do:
cluster.state:
metric: [ metadata ]
index: test_index
- match : { metadata.indices.test_index.routing_num_shards: 1024 }

- do:
indices.create:
index: test_index1
body:
settings:
number_of_routing_shards: null
number_of_shards: 3
- do:
cluster.state:
metric: [ metadata ]
index: test_index1
- match : { metadata.indices.test_index1.routing_num_shards: 768 }
Loading

0 comments on commit bdabc51

Please sign in to comment.