diff --git a/.ci/bwcVersions b/.ci/bwcVersions index 0461af4966e92..378c0f52da3ad 100644 --- a/.ci/bwcVersions +++ b/.ci/bwcVersions @@ -40,6 +40,7 @@ BWC_VERSION: - "1.3.1" - "1.3.2" - "1.3.3" + - "1.3.4" - "2.0.0" - "2.0.1" - "2.1.0" diff --git a/buildSrc/version.properties b/buildSrc/version.properties index fe2cfe6a63ee6..87dbad73229b4 100644 --- a/buildSrc/version.properties +++ b/buildSrc/version.properties @@ -1,5 +1,5 @@ opensearch = 3.0.0 -lucene = 9.3.0-snapshot-823df23 +lucene = 9.3.0-snapshot-b7231bb bundled_jdk_vendor = adoptium bundled_jdk = 17.0.3+7 diff --git a/modules/lang-expression/licenses/lucene-expressions-9.3.0-snapshot-823df23.jar.sha1 b/modules/lang-expression/licenses/lucene-expressions-9.3.0-snapshot-823df23.jar.sha1 deleted file mode 100644 index 540a48bf7415f..0000000000000 --- a/modules/lang-expression/licenses/lucene-expressions-9.3.0-snapshot-823df23.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -610ec9bb8001a2d2ea88e3384eb516017504139e \ No newline at end of file diff --git a/modules/lang-expression/licenses/lucene-expressions-9.3.0-snapshot-b7231bb.jar.sha1 b/modules/lang-expression/licenses/lucene-expressions-9.3.0-snapshot-b7231bb.jar.sha1 new file mode 100644 index 0000000000000..f527a3b68b6a3 --- /dev/null +++ b/modules/lang-expression/licenses/lucene-expressions-9.3.0-snapshot-b7231bb.jar.sha1 @@ -0,0 +1 @@ +57ae445a0050ad492ef494b692b486dfe718b564 \ No newline at end of file diff --git a/plugins/analysis-icu/licenses/lucene-analysis-icu-9.3.0-snapshot-823df23.jar.sha1 b/plugins/analysis-icu/licenses/lucene-analysis-icu-9.3.0-snapshot-823df23.jar.sha1 deleted file mode 100644 index 7bc128d4562fa..0000000000000 --- a/plugins/analysis-icu/licenses/lucene-analysis-icu-9.3.0-snapshot-823df23.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -43f2ea45a2d12b4c75c7ac11b85ec736c73bc07f \ No newline at end of file diff --git a/plugins/analysis-icu/licenses/lucene-analysis-icu-9.3.0-snapshot-b7231bb.jar.sha1 b/plugins/analysis-icu/licenses/lucene-analysis-icu-9.3.0-snapshot-b7231bb.jar.sha1 new file mode 100644 index 0000000000000..51cbf51d90626 --- /dev/null +++ b/plugins/analysis-icu/licenses/lucene-analysis-icu-9.3.0-snapshot-b7231bb.jar.sha1 @@ -0,0 +1 @@ +b10e5bdae6df879b770060e0006bbc1c780c886d \ No newline at end of file diff --git a/plugins/analysis-kuromoji/licenses/lucene-analysis-kuromoji-9.3.0-snapshot-823df23.jar.sha1 b/plugins/analysis-kuromoji/licenses/lucene-analysis-kuromoji-9.3.0-snapshot-823df23.jar.sha1 deleted file mode 100644 index bad2a0bdcfa2a..0000000000000 --- a/plugins/analysis-kuromoji/licenses/lucene-analysis-kuromoji-9.3.0-snapshot-823df23.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -fb46807684a5b0e28a02b2a1ea3d528e4c25aa05 \ No newline at end of file diff --git a/plugins/analysis-kuromoji/licenses/lucene-analysis-kuromoji-9.3.0-snapshot-b7231bb.jar.sha1 b/plugins/analysis-kuromoji/licenses/lucene-analysis-kuromoji-9.3.0-snapshot-b7231bb.jar.sha1 new file mode 100644 index 0000000000000..ff57bbc283385 --- /dev/null +++ b/plugins/analysis-kuromoji/licenses/lucene-analysis-kuromoji-9.3.0-snapshot-b7231bb.jar.sha1 @@ -0,0 +1 @@ +f0ddc3072fd16012dafc74928f87fdfd7669ea4a \ No newline at end of file diff --git a/plugins/analysis-nori/licenses/lucene-analysis-nori-9.3.0-snapshot-823df23.jar.sha1 b/plugins/analysis-nori/licenses/lucene-analysis-nori-9.3.0-snapshot-823df23.jar.sha1 deleted file mode 100644 index b2c62bcbbade1..0000000000000 --- a/plugins/analysis-nori/licenses/lucene-analysis-nori-9.3.0-snapshot-823df23.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -be94b15085b6390ed64a8e8a4f5afbcb2d4d5181 \ No newline at end of file diff --git a/plugins/analysis-nori/licenses/lucene-analysis-nori-9.3.0-snapshot-b7231bb.jar.sha1 b/plugins/analysis-nori/licenses/lucene-analysis-nori-9.3.0-snapshot-b7231bb.jar.sha1 new file mode 100644 index 0000000000000..13dd3c8a8bb24 --- /dev/null +++ b/plugins/analysis-nori/licenses/lucene-analysis-nori-9.3.0-snapshot-b7231bb.jar.sha1 @@ -0,0 +1 @@ +d761fa983d9c21099c433731d5519651737750c1 \ No newline at end of file diff --git a/plugins/analysis-phonetic/licenses/lucene-analysis-phonetic-9.3.0-snapshot-823df23.jar.sha1 b/plugins/analysis-phonetic/licenses/lucene-analysis-phonetic-9.3.0-snapshot-823df23.jar.sha1 deleted file mode 100644 index c7f8fd797c589..0000000000000 --- a/plugins/analysis-phonetic/licenses/lucene-analysis-phonetic-9.3.0-snapshot-823df23.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -3a6f705a7df2007f5583215420da0725f844ac4f \ No newline at end of file diff --git a/plugins/analysis-phonetic/licenses/lucene-analysis-phonetic-9.3.0-snapshot-b7231bb.jar.sha1 b/plugins/analysis-phonetic/licenses/lucene-analysis-phonetic-9.3.0-snapshot-b7231bb.jar.sha1 new file mode 100644 index 0000000000000..5cba6f6700769 --- /dev/null +++ b/plugins/analysis-phonetic/licenses/lucene-analysis-phonetic-9.3.0-snapshot-b7231bb.jar.sha1 @@ -0,0 +1 @@ +43abbbe7c3c789ac448f898981acf54e487407a6 \ No newline at end of file diff --git a/plugins/analysis-smartcn/licenses/lucene-analysis-smartcn-9.3.0-snapshot-823df23.jar.sha1 b/plugins/analysis-smartcn/licenses/lucene-analysis-smartcn-9.3.0-snapshot-823df23.jar.sha1 deleted file mode 100644 index 28424c2dd1c7a..0000000000000 --- a/plugins/analysis-smartcn/licenses/lucene-analysis-smartcn-9.3.0-snapshot-823df23.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -ea9931a34288fa6cbd894e244a101e86926ebfb8 \ No newline at end of file diff --git a/plugins/analysis-smartcn/licenses/lucene-analysis-smartcn-9.3.0-snapshot-b7231bb.jar.sha1 b/plugins/analysis-smartcn/licenses/lucene-analysis-smartcn-9.3.0-snapshot-b7231bb.jar.sha1 new file mode 100644 index 0000000000000..62097dc39ae20 --- /dev/null +++ b/plugins/analysis-smartcn/licenses/lucene-analysis-smartcn-9.3.0-snapshot-b7231bb.jar.sha1 @@ -0,0 +1 @@ +55df9442a35fe09d4f3f98bd2dda4d1a1dbfd996 \ No newline at end of file diff --git a/plugins/analysis-stempel/licenses/lucene-analysis-stempel-9.3.0-snapshot-823df23.jar.sha1 b/plugins/analysis-stempel/licenses/lucene-analysis-stempel-9.3.0-snapshot-823df23.jar.sha1 deleted file mode 100644 index d7c4b20a29db2..0000000000000 --- a/plugins/analysis-stempel/licenses/lucene-analysis-stempel-9.3.0-snapshot-823df23.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -c339ce0a3b02d92a804081f5ff44b99f7a468caf \ No newline at end of file diff --git a/plugins/analysis-stempel/licenses/lucene-analysis-stempel-9.3.0-snapshot-b7231bb.jar.sha1 b/plugins/analysis-stempel/licenses/lucene-analysis-stempel-9.3.0-snapshot-b7231bb.jar.sha1 new file mode 100644 index 0000000000000..1666e4aae21a6 --- /dev/null +++ b/plugins/analysis-stempel/licenses/lucene-analysis-stempel-9.3.0-snapshot-b7231bb.jar.sha1 @@ -0,0 +1 @@ +102cbb1d619b96e1f3e524520658b9327a93aba1 \ No newline at end of file diff --git a/plugins/analysis-ukrainian/licenses/lucene-analysis-morfologik-9.3.0-snapshot-823df23.jar.sha1 b/plugins/analysis-ukrainian/licenses/lucene-analysis-morfologik-9.3.0-snapshot-823df23.jar.sha1 deleted file mode 100644 index b4a9090408165..0000000000000 --- a/plugins/analysis-ukrainian/licenses/lucene-analysis-morfologik-9.3.0-snapshot-823df23.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -a8faa5faa38ab8f545e12cf3dd914e934a2f2bfe \ No newline at end of file diff --git a/plugins/analysis-ukrainian/licenses/lucene-analysis-morfologik-9.3.0-snapshot-b7231bb.jar.sha1 b/plugins/analysis-ukrainian/licenses/lucene-analysis-morfologik-9.3.0-snapshot-b7231bb.jar.sha1 new file mode 100644 index 0000000000000..3a2d3cec6b952 --- /dev/null +++ b/plugins/analysis-ukrainian/licenses/lucene-analysis-morfologik-9.3.0-snapshot-b7231bb.jar.sha1 @@ -0,0 +1 @@ +0d5dc4dfb74d698e51dc9b95268faf6dde4b0815 \ No newline at end of file diff --git a/plugins/discovery-azure-classic/build.gradle b/plugins/discovery-azure-classic/build.gradle index 575b8858b16ba..5755ff55bfff9 100644 --- a/plugins/discovery-azure-classic/build.gradle +++ b/plugins/discovery-azure-classic/build.gradle @@ -59,7 +59,7 @@ dependencies { api "com.sun.jersey:jersey-client:${versions.jersey}" api "com.sun.jersey:jersey-core:${versions.jersey}" api "com.sun.jersey:jersey-json:${versions.jersey}" - api 'org.codehaus.jettison:jettison:1.4.1' + api 'org.codehaus.jettison:jettison:1.5.0' api 'com.sun.xml.bind:jaxb-impl:2.2.3-1' // HACK: javax.xml.bind was removed from default modules in java 9, so we pull the api in here, diff --git a/plugins/discovery-azure-classic/licenses/jettison-1.4.1.jar.sha1 b/plugins/discovery-azure-classic/licenses/jettison-1.4.1.jar.sha1 deleted file mode 100644 index 815d87d917f2e..0000000000000 --- a/plugins/discovery-azure-classic/licenses/jettison-1.4.1.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -8d16bbcbac93446942c9e5da04530159afbe3e65 \ No newline at end of file diff --git a/plugins/discovery-azure-classic/licenses/jettison-1.5.0.jar.sha1 b/plugins/discovery-azure-classic/licenses/jettison-1.5.0.jar.sha1 new file mode 100644 index 0000000000000..ec93f83474541 --- /dev/null +++ b/plugins/discovery-azure-classic/licenses/jettison-1.5.0.jar.sha1 @@ -0,0 +1 @@ +933c7df7a4b78c9a9322f431014ea699b1fc0cc0 \ No newline at end of file diff --git a/plugins/discovery-gce/build.gradle b/plugins/discovery-gce/build.gradle index 983a2907e4e67..c8b52d3afcd45 100644 --- a/plugins/discovery-gce/build.gradle +++ b/plugins/discovery-gce/build.gradle @@ -24,7 +24,7 @@ versions << [ dependencies { api "com.google.apis:google-api-services-compute:v1-rev160-${versions.google}" api "com.google.api-client:google-api-client:${versions.google}" - api "com.google.oauth-client:google-oauth-client:1.34.0" + api "com.google.oauth-client:google-oauth-client:1.34.1" api "com.google.http-client:google-http-client:${versions.google}" api "com.google.http-client:google-http-client-jackson2:${versions.google}" api 'com.google.code.findbugs:jsr305:3.0.2' diff --git a/plugins/discovery-gce/licenses/google-oauth-client-1.34.0.jar.sha1 b/plugins/discovery-gce/licenses/google-oauth-client-1.34.0.jar.sha1 deleted file mode 100644 index 57c5c16b34deb..0000000000000 --- a/plugins/discovery-gce/licenses/google-oauth-client-1.34.0.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -a0dc471bd498c62280120037a42d410c0e36f5d6 \ No newline at end of file diff --git a/plugins/discovery-gce/licenses/google-oauth-client-1.34.1.jar.sha1 b/plugins/discovery-gce/licenses/google-oauth-client-1.34.1.jar.sha1 new file mode 100644 index 0000000000000..a8434bd380761 --- /dev/null +++ b/plugins/discovery-gce/licenses/google-oauth-client-1.34.1.jar.sha1 @@ -0,0 +1 @@ +4a4f88c5e13143f882268c98239fb85c3b2c6cb2 \ No newline at end of file diff --git a/plugins/ingest-attachment/build.gradle b/plugins/ingest-attachment/build.gradle index 456b652ff82a3..86694b9bc9da7 100644 --- a/plugins/ingest-attachment/build.gradle +++ b/plugins/ingest-attachment/build.gradle @@ -79,7 +79,7 @@ dependencies { api "org.apache.poi:poi:${versions.poi}" api "org.apache.poi:poi-ooxml-lite:${versions.poi}" api "commons-codec:commons-codec:${versions.commonscodec}" - api 'org.apache.xmlbeans:xmlbeans:5.0.3' + api 'org.apache.xmlbeans:xmlbeans:5.1.0' api 'org.apache.commons:commons-collections4:4.4' // MS Office api "org.apache.poi:poi-scratchpad:${versions.poi}" diff --git a/plugins/ingest-attachment/licenses/xmlbeans-5.0.3.jar.sha1 b/plugins/ingest-attachment/licenses/xmlbeans-5.0.3.jar.sha1 deleted file mode 100644 index 7451ee17640d6..0000000000000 --- a/plugins/ingest-attachment/licenses/xmlbeans-5.0.3.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -e1ef1382ae9dfb2438b82b6dd575566355c2f30f \ No newline at end of file diff --git a/plugins/ingest-attachment/licenses/xmlbeans-5.1.0.jar.sha1 b/plugins/ingest-attachment/licenses/xmlbeans-5.1.0.jar.sha1 new file mode 100644 index 0000000000000..85f757b61048c --- /dev/null +++ b/plugins/ingest-attachment/licenses/xmlbeans-5.1.0.jar.sha1 @@ -0,0 +1 @@ +3534ab896663e6f6d8a2cf46882d7407641d7a31 \ No newline at end of file diff --git a/plugins/repository-gcs/build.gradle b/plugins/repository-gcs/build.gradle index 0e1c2125f5d81..097e96fcd8fdc 100644 --- a/plugins/repository-gcs/build.gradle +++ b/plugins/repository-gcs/build.gradle @@ -58,7 +58,7 @@ dependencies { api 'com.google.cloud:google-cloud-core:2.5.10' runtimeOnly 'com.google.guava:guava:30.1.1-jre' api 'com.google.guava:failureaccess:1.0.1' - api 'com.google.http-client:google-http-client:1.35.0' + api 'com.google.http-client:google-http-client:1.42.0' api "commons-logging:commons-logging:${versions.commonslogging}" api "org.apache.logging.log4j:log4j-1.2-api:${versions.log4j}" api "commons-codec:commons-codec:${versions.commonscodec}" @@ -82,7 +82,7 @@ dependencies { api 'io.grpc:grpc-context:1.46.0' api 'io.opencensus:opencensus-api:0.18.0' api 'io.opencensus:opencensus-contrib-http-util:0.18.0' - api 'com.google.apis:google-api-services-storage:v1-rev20200814-1.30.10' + api 'com.google.apis:google-api-services-storage:v1-rev20220608-1.32.1' testImplementation project(':test:fixtures:gcs-fixture') } diff --git a/plugins/repository-gcs/licenses/google-api-services-storage-v1-rev20200814-1.30.10.jar.sha1 b/plugins/repository-gcs/licenses/google-api-services-storage-v1-rev20200814-1.30.10.jar.sha1 deleted file mode 100644 index e399aa5865413..0000000000000 --- a/plugins/repository-gcs/licenses/google-api-services-storage-v1-rev20200814-1.30.10.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -fe3b480958961fc7144da10ce3653065d5eb5490 \ No newline at end of file diff --git a/plugins/repository-gcs/licenses/google-api-services-storage-v1-rev20220608-1.32.1.jar.sha1 b/plugins/repository-gcs/licenses/google-api-services-storage-v1-rev20220608-1.32.1.jar.sha1 new file mode 100644 index 0000000000000..07aaadb2664b2 --- /dev/null +++ b/plugins/repository-gcs/licenses/google-api-services-storage-v1-rev20220608-1.32.1.jar.sha1 @@ -0,0 +1 @@ +74724addc6cecac408dad3a6a26423b7647b3724 \ No newline at end of file diff --git a/plugins/repository-gcs/licenses/google-http-client-1.35.0.jar.sha1 b/plugins/repository-gcs/licenses/google-http-client-1.35.0.jar.sha1 deleted file mode 100644 index 802a6ab3a8d04..0000000000000 --- a/plugins/repository-gcs/licenses/google-http-client-1.35.0.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -f2348dd57d5417c29388bd430f5055dca863c600 \ No newline at end of file diff --git a/plugins/repository-gcs/licenses/google-http-client-1.42.0.jar.sha1 b/plugins/repository-gcs/licenses/google-http-client-1.42.0.jar.sha1 new file mode 100644 index 0000000000000..9c20d9f12d4b0 --- /dev/null +++ b/plugins/repository-gcs/licenses/google-http-client-1.42.0.jar.sha1 @@ -0,0 +1 @@ +4f319ce80ba6888d04a38234916c43d5486842a5 \ No newline at end of file diff --git a/release-notes/opensearch.release-notes-1.3.3.md b/release-notes/opensearch.release-notes-1.3.3.md new file mode 100644 index 0000000000000..fd80e526166f0 --- /dev/null +++ b/release-notes/opensearch.release-notes-1.3.3.md @@ -0,0 +1,10 @@ +## Version 1.3.3 Release Notes + +### Upgrades +* Upgrade google-oauth-client to 1.33.3 ([#3502](https://github.com/opensearch-project/OpenSearch/pull/3502)) +* Upgrade log4j-core to 2.17.1 ([#3508](https://github.com/opensearch-project/OpenSearch/pull/3508)) +* Upgrade jdom2 to 2.0.6.1 ([#3509](https://github.com/opensearch-project/OpenSearch/pull/3509)) + +### Bug Fixes +* Fixing org.opensearch.monitor.os.OsProbeTests::testLogWarnCpuMessageOnlyOnes when CGroups are not available ([#2101](https://github.com/opensearch-project/OpenSearch/pull/2101)) +* Fixing org.opensearch.monitor.os.OsProbeTests > testLogWarnCpuMessageOnlyOnes when cgroups are available but cgroup stats is not ([#3448](https://github.com/opensearch-project/OpenSearch/pull/3448)) diff --git a/server/licenses/lucene-analysis-common-9.3.0-snapshot-823df23.jar.sha1 b/server/licenses/lucene-analysis-common-9.3.0-snapshot-823df23.jar.sha1 deleted file mode 100644 index ab4abfd7d6a49..0000000000000 --- a/server/licenses/lucene-analysis-common-9.3.0-snapshot-823df23.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -8dbb5828e79780989a8758b7cbb5a1aacac0004f \ No newline at end of file diff --git a/server/licenses/lucene-analysis-common-9.3.0-snapshot-b7231bb.jar.sha1 b/server/licenses/lucene-analysis-common-9.3.0-snapshot-b7231bb.jar.sha1 new file mode 100644 index 0000000000000..4cb292ad20c1f --- /dev/null +++ b/server/licenses/lucene-analysis-common-9.3.0-snapshot-b7231bb.jar.sha1 @@ -0,0 +1 @@ +6c6a9569777e4f01c90ed840e5a04234dfcaf42e \ No newline at end of file diff --git a/server/licenses/lucene-backward-codecs-9.3.0-snapshot-823df23.jar.sha1 b/server/licenses/lucene-backward-codecs-9.3.0-snapshot-823df23.jar.sha1 deleted file mode 100644 index 8ff6a25c9547e..0000000000000 --- a/server/licenses/lucene-backward-codecs-9.3.0-snapshot-823df23.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -68ebd183f1e9edde9f2f37c60f784e4f03555eec \ No newline at end of file diff --git a/server/licenses/lucene-backward-codecs-9.3.0-snapshot-b7231bb.jar.sha1 b/server/licenses/lucene-backward-codecs-9.3.0-snapshot-b7231bb.jar.sha1 new file mode 100644 index 0000000000000..3878ed346c9ce --- /dev/null +++ b/server/licenses/lucene-backward-codecs-9.3.0-snapshot-b7231bb.jar.sha1 @@ -0,0 +1 @@ +a7ef963f9f9f15fc5018c5fa68bae5cf65692ca9 \ No newline at end of file diff --git a/server/licenses/lucene-core-9.3.0-snapshot-823df23.jar.sha1 b/server/licenses/lucene-core-9.3.0-snapshot-823df23.jar.sha1 deleted file mode 100644 index 2ec15eb0012c5..0000000000000 --- a/server/licenses/lucene-core-9.3.0-snapshot-823df23.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -ea3cb640597d93168765174207542c6765c1fe15 \ No newline at end of file diff --git a/server/licenses/lucene-core-9.3.0-snapshot-b7231bb.jar.sha1 b/server/licenses/lucene-core-9.3.0-snapshot-b7231bb.jar.sha1 new file mode 100644 index 0000000000000..9f9f6be85c57c --- /dev/null +++ b/server/licenses/lucene-core-9.3.0-snapshot-b7231bb.jar.sha1 @@ -0,0 +1 @@ +da113c963d62f0c8786d7c294dbbb63d5d7953ab \ No newline at end of file diff --git a/server/licenses/lucene-grouping-9.3.0-snapshot-823df23.jar.sha1 b/server/licenses/lucene-grouping-9.3.0-snapshot-823df23.jar.sha1 deleted file mode 100644 index 7b6c561ddeedf..0000000000000 --- a/server/licenses/lucene-grouping-9.3.0-snapshot-823df23.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -ab2bcdbade5976e127c7e9393bf7a7e25a957d9a \ No newline at end of file diff --git a/server/licenses/lucene-grouping-9.3.0-snapshot-b7231bb.jar.sha1 b/server/licenses/lucene-grouping-9.3.0-snapshot-b7231bb.jar.sha1 new file mode 100644 index 0000000000000..92d0c41c6f4d2 --- /dev/null +++ b/server/licenses/lucene-grouping-9.3.0-snapshot-b7231bb.jar.sha1 @@ -0,0 +1 @@ +54f65917cfa6c9c54cd0354ba333aa7e0f2980e5 \ No newline at end of file diff --git a/server/licenses/lucene-highlighter-9.3.0-snapshot-823df23.jar.sha1 b/server/licenses/lucene-highlighter-9.3.0-snapshot-823df23.jar.sha1 deleted file mode 100644 index b2aa53fcdfb83..0000000000000 --- a/server/licenses/lucene-highlighter-9.3.0-snapshot-823df23.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -31ce6ff9188dea49dc4b4d082b498332cc7b86e7 \ No newline at end of file diff --git a/server/licenses/lucene-highlighter-9.3.0-snapshot-b7231bb.jar.sha1 b/server/licenses/lucene-highlighter-9.3.0-snapshot-b7231bb.jar.sha1 new file mode 100644 index 0000000000000..ecab2abeb6220 --- /dev/null +++ b/server/licenses/lucene-highlighter-9.3.0-snapshot-b7231bb.jar.sha1 @@ -0,0 +1 @@ +d73ebe32147c9a12d321c0b1273d5e5d797b705f \ No newline at end of file diff --git a/server/licenses/lucene-join-9.3.0-snapshot-823df23.jar.sha1 b/server/licenses/lucene-join-9.3.0-snapshot-823df23.jar.sha1 deleted file mode 100644 index 7918597d46763..0000000000000 --- a/server/licenses/lucene-join-9.3.0-snapshot-823df23.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -c387884f0bc00fb1c064754a69e1e81dff12c755 \ No newline at end of file diff --git a/server/licenses/lucene-join-9.3.0-snapshot-b7231bb.jar.sha1 b/server/licenses/lucene-join-9.3.0-snapshot-b7231bb.jar.sha1 new file mode 100644 index 0000000000000..725fc883c272b --- /dev/null +++ b/server/licenses/lucene-join-9.3.0-snapshot-b7231bb.jar.sha1 @@ -0,0 +1 @@ +797c92ffe35af37ab1783906fb93ed95a145a701 \ No newline at end of file diff --git a/server/licenses/lucene-memory-9.3.0-snapshot-823df23.jar.sha1 b/server/licenses/lucene-memory-9.3.0-snapshot-823df23.jar.sha1 deleted file mode 100644 index a87d3de9e2310..0000000000000 --- a/server/licenses/lucene-memory-9.3.0-snapshot-823df23.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -e278a2cfe1500b76da770aa29ecd487fea5f8dc3 \ No newline at end of file diff --git a/server/licenses/lucene-memory-9.3.0-snapshot-b7231bb.jar.sha1 b/server/licenses/lucene-memory-9.3.0-snapshot-b7231bb.jar.sha1 new file mode 100644 index 0000000000000..312a65edb6e24 --- /dev/null +++ b/server/licenses/lucene-memory-9.3.0-snapshot-b7231bb.jar.sha1 @@ -0,0 +1 @@ +5714d64c39021c65dece8ee979d9ea39a327bb87 \ No newline at end of file diff --git a/server/licenses/lucene-misc-9.3.0-snapshot-823df23.jar.sha1 b/server/licenses/lucene-misc-9.3.0-snapshot-823df23.jar.sha1 deleted file mode 100644 index 18a165097d2be..0000000000000 --- a/server/licenses/lucene-misc-9.3.0-snapshot-823df23.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -77933cdffbcd0f56888a50fd1d9fb39cf6148f1a \ No newline at end of file diff --git a/server/licenses/lucene-misc-9.3.0-snapshot-b7231bb.jar.sha1 b/server/licenses/lucene-misc-9.3.0-snapshot-b7231bb.jar.sha1 new file mode 100644 index 0000000000000..b701384ab601d --- /dev/null +++ b/server/licenses/lucene-misc-9.3.0-snapshot-b7231bb.jar.sha1 @@ -0,0 +1 @@ +4d401c55114367e574ed51e914661f0a97f91e88 \ No newline at end of file diff --git a/server/licenses/lucene-queries-9.3.0-snapshot-823df23.jar.sha1 b/server/licenses/lucene-queries-9.3.0-snapshot-823df23.jar.sha1 deleted file mode 100644 index 4d148f3a840c8..0000000000000 --- a/server/licenses/lucene-queries-9.3.0-snapshot-823df23.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -8d521efa3a111e2feab1a7f07a0cc944bbdcddf4 \ No newline at end of file diff --git a/server/licenses/lucene-queries-9.3.0-snapshot-b7231bb.jar.sha1 b/server/licenses/lucene-queries-9.3.0-snapshot-b7231bb.jar.sha1 new file mode 100644 index 0000000000000..ec2f7508d35cc --- /dev/null +++ b/server/licenses/lucene-queries-9.3.0-snapshot-b7231bb.jar.sha1 @@ -0,0 +1 @@ +0f165ff86546565d32a508c82ca80ac2840bcf38 \ No newline at end of file diff --git a/server/licenses/lucene-queryparser-9.3.0-snapshot-823df23.jar.sha1 b/server/licenses/lucene-queryparser-9.3.0-snapshot-823df23.jar.sha1 deleted file mode 100644 index c6e913767696a..0000000000000 --- a/server/licenses/lucene-queryparser-9.3.0-snapshot-823df23.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -30d6f8f757a007248804ed5db624a125ada24154 \ No newline at end of file diff --git a/server/licenses/lucene-queryparser-9.3.0-snapshot-b7231bb.jar.sha1 b/server/licenses/lucene-queryparser-9.3.0-snapshot-b7231bb.jar.sha1 new file mode 100644 index 0000000000000..40a125ccada21 --- /dev/null +++ b/server/licenses/lucene-queryparser-9.3.0-snapshot-b7231bb.jar.sha1 @@ -0,0 +1 @@ +d6fb5af1873628dc026e18b5438042143a9a9824 \ No newline at end of file diff --git a/server/licenses/lucene-sandbox-9.3.0-snapshot-823df23.jar.sha1 b/server/licenses/lucene-sandbox-9.3.0-snapshot-823df23.jar.sha1 deleted file mode 100644 index 22b7769ee3b4d..0000000000000 --- a/server/licenses/lucene-sandbox-9.3.0-snapshot-823df23.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -8dd68761fade2dc4d2ea0d9d476a5172cfd22cd2 \ No newline at end of file diff --git a/server/licenses/lucene-sandbox-9.3.0-snapshot-b7231bb.jar.sha1 b/server/licenses/lucene-sandbox-9.3.0-snapshot-b7231bb.jar.sha1 new file mode 100644 index 0000000000000..b4784be40d072 --- /dev/null +++ b/server/licenses/lucene-sandbox-9.3.0-snapshot-b7231bb.jar.sha1 @@ -0,0 +1 @@ +c48ab8982e6bf9429eded6a06d640db922eb2b69 \ No newline at end of file diff --git a/server/licenses/lucene-spatial-extras-9.3.0-snapshot-823df23.jar.sha1 b/server/licenses/lucene-spatial-extras-9.3.0-snapshot-823df23.jar.sha1 deleted file mode 100644 index 22d9211a3b623..0000000000000 --- a/server/licenses/lucene-spatial-extras-9.3.0-snapshot-823df23.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -044ac03b461aaae4568f64948f783e87dae85a8b \ No newline at end of file diff --git a/server/licenses/lucene-spatial-extras-9.3.0-snapshot-b7231bb.jar.sha1 b/server/licenses/lucene-spatial-extras-9.3.0-snapshot-b7231bb.jar.sha1 new file mode 100644 index 0000000000000..6f39582081758 --- /dev/null +++ b/server/licenses/lucene-spatial-extras-9.3.0-snapshot-b7231bb.jar.sha1 @@ -0,0 +1 @@ +d757dc379fee639f54d0574443c5a6fd0b70613a \ No newline at end of file diff --git a/server/licenses/lucene-spatial3d-9.3.0-snapshot-823df23.jar.sha1 b/server/licenses/lucene-spatial3d-9.3.0-snapshot-823df23.jar.sha1 deleted file mode 100644 index 66998393ed970..0000000000000 --- a/server/licenses/lucene-spatial3d-9.3.0-snapshot-823df23.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -53a02ec5b0eabe7fdf97fea1b19eeca5a6cf1122 \ No newline at end of file diff --git a/server/licenses/lucene-spatial3d-9.3.0-snapshot-b7231bb.jar.sha1 b/server/licenses/lucene-spatial3d-9.3.0-snapshot-b7231bb.jar.sha1 new file mode 100644 index 0000000000000..b5986970cb4da --- /dev/null +++ b/server/licenses/lucene-spatial3d-9.3.0-snapshot-b7231bb.jar.sha1 @@ -0,0 +1 @@ +6a4e6de9b40cd027233a3ed00774810c36457a6c \ No newline at end of file diff --git a/server/licenses/lucene-suggest-9.3.0-snapshot-823df23.jar.sha1 b/server/licenses/lucene-suggest-9.3.0-snapshot-823df23.jar.sha1 deleted file mode 100644 index e5aca63b21732..0000000000000 --- a/server/licenses/lucene-suggest-9.3.0-snapshot-823df23.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -a57b91ee1c6f3f666dcac697ce6a7de9bd5abba7 \ No newline at end of file diff --git a/server/licenses/lucene-suggest-9.3.0-snapshot-b7231bb.jar.sha1 b/server/licenses/lucene-suggest-9.3.0-snapshot-b7231bb.jar.sha1 new file mode 100644 index 0000000000000..682a0ee88868f --- /dev/null +++ b/server/licenses/lucene-suggest-9.3.0-snapshot-b7231bb.jar.sha1 @@ -0,0 +1 @@ +e793761c4a4292de0d52f066787ab5f3133382cd \ No newline at end of file diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/allocation/AwarenessAllocationIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/allocation/AwarenessAllocationIT.java index 224db09d99a99..2b73c5da27606 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/allocation/AwarenessAllocationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/allocation/AwarenessAllocationIT.java @@ -45,14 +45,17 @@ import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider; import org.opensearch.common.Priority; import org.opensearch.common.settings.Settings; +import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import static org.opensearch.cluster.routing.ShardRoutingState.STARTED; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.empty; @@ -351,4 +354,140 @@ public void testAwarenessZonesIncrementalNodes() { assertThat(counts.get(B_1), equalTo(2)); assertThat(counts.get(noZoneNode), equalTo(2)); } + + public void testThreeZoneOneReplicaWithForceZoneValueAndLoadAwareness() throws Exception { + int nodeCountPerAZ = 5; + int numOfShards = 30; + int numOfReplica = 1; + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .put("cluster.routing.allocation.load_awareness.skew_factor", "0.0") + .put("cluster.routing.allocation.load_awareness.provisioned_capacity", Integer.toString(nodeCountPerAZ * 3)) + .build(); + + logger.info("--> starting 15 nodes on zones 'a' & 'b' & 'c'"); + List nodes_in_zone_a = internalCluster().startNodes( + nodeCountPerAZ, + Settings.builder().put(commonSettings).put("node.attr.zone", "a").build() + ); + List nodes_in_zone_b = internalCluster().startNodes( + nodeCountPerAZ, + Settings.builder().put(commonSettings).put("node.attr.zone", "b").build() + ); + List nodes_in_zone_c = internalCluster().startNodes( + nodeCountPerAZ, + Settings.builder().put(commonSettings).put("node.attr.zone", "c").build() + ); + + // Creating index with 30 primary and 1 replica + createIndex( + "test-1", + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numOfShards) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numOfReplica) + .build() + ); + + ClusterHealthResponse health = client().admin() + .cluster() + .prepareHealth() + .setIndices("test-1") + .setWaitForEvents(Priority.LANGUID) + .setWaitForGreenStatus() + .setWaitForNodes(Integer.toString(nodeCountPerAZ * 3)) + .setWaitForNoRelocatingShards(true) + .setWaitForNoInitializingShards(true) + .execute() + .actionGet(); + assertFalse(health.isTimedOut()); + + ClusterState clusterState = client().admin().cluster().prepareState().execute().actionGet().getState(); + ObjectIntHashMap counts = new ObjectIntHashMap<>(); + + for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) { + for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) { + for (ShardRouting shardRouting : indexShardRoutingTable) { + counts.addTo(clusterState.nodes().get(shardRouting.currentNodeId()).getName(), 1); + } + } + } + + assertThat(counts.size(), equalTo(nodeCountPerAZ * 3)); + // All shards should be started + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(numOfShards * (numOfReplica + 1))); + + // stopping half nodes in zone a + int nodesToStop = nodeCountPerAZ / 2; + List nodeDataPathSettings = new ArrayList<>(); + for (int i = 0; i < nodesToStop; i++) { + nodeDataPathSettings.add(internalCluster().dataPathSettings(nodes_in_zone_a.get(i))); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodes_in_zone_a.get(i))); + } + + client().admin().cluster().prepareReroute().setRetryFailed(true).get(); + health = client().admin() + .cluster() + .prepareHealth() + .setIndices("test-1") + .setWaitForEvents(Priority.LANGUID) + .setWaitForNodes(Integer.toString(nodeCountPerAZ * 3 - nodesToStop)) + .setWaitForNoRelocatingShards(true) + .setWaitForNoInitializingShards(true) + .execute() + .actionGet(); + assertFalse(health.isTimedOut()); + + // Creating another index with 30 primary and 1 replica + createIndex( + "test-2", + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numOfShards) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numOfReplica) + .build() + ); + + health = client().admin() + .cluster() + .prepareHealth() + .setIndices("test-1", "test-2") + .setWaitForEvents(Priority.LANGUID) + .setWaitForNodes(Integer.toString(nodeCountPerAZ * 3 - nodesToStop)) + .setWaitForNoRelocatingShards(true) + .setWaitForNoInitializingShards(true) + .execute() + .actionGet(); + assertFalse(health.isTimedOut()); + + // Restarting the nodes back + for (int i = 0; i < nodesToStop; i++) { + internalCluster().startNode( + Settings.builder() + .put("node.name", nodes_in_zone_a.get(i)) + .put(nodeDataPathSettings.get(i)) + .put(commonSettings) + .put("node.attr.zone", "a") + .build() + ); + } + client().admin().cluster().prepareReroute().setRetryFailed(true).get(); + + health = client().admin() + .cluster() + .prepareHealth() + .setIndices("test-1", "test-2") + .setWaitForEvents(Priority.LANGUID) + .setWaitForNodes(Integer.toString(nodeCountPerAZ * 3)) + .setWaitForGreenStatus() + .setWaitForActiveShards(2 * numOfShards * (numOfReplica + 1)) + .setWaitForNoRelocatingShards(true) + .setWaitForNoInitializingShards(true) + .execute() + .actionGet(); + clusterState = client().admin().cluster().prepareState().execute().actionGet().getState(); + + // All shards should be started now and cluster health should be green + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2 * numOfShards * (numOfReplica + 1))); + assertThat(health.isTimedOut(), equalTo(false)); + } } diff --git a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java index 888881d43eb11..2bf73b34247b3 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java @@ -675,7 +675,8 @@ public static final IndexShard newIndexShard( () -> {}, RetentionLeaseSyncer.EMPTY, cbs, - SegmentReplicationCheckpointPublisher.EMPTY + SegmentReplicationCheckpointPublisher.EMPTY, + null ); } diff --git a/server/src/main/java/org/opensearch/Version.java b/server/src/main/java/org/opensearch/Version.java index 04907ee5d054b..2cc8cde2cf0f3 100644 --- a/server/src/main/java/org/opensearch/Version.java +++ b/server/src/main/java/org/opensearch/Version.java @@ -87,9 +87,10 @@ public class Version implements Comparable, ToXContentFragment { public static final Version V_1_3_1 = new Version(1030199, org.apache.lucene.util.Version.LUCENE_8_10_1); public static final Version V_1_3_2 = new Version(1030299, org.apache.lucene.util.Version.LUCENE_8_10_1); public static final Version V_1_3_3 = new Version(1030399, org.apache.lucene.util.Version.LUCENE_8_10_1); + public static final Version V_1_3_4 = new Version(1030499, org.apache.lucene.util.Version.LUCENE_8_10_1); public static final Version V_2_0_0 = new Version(2000099, org.apache.lucene.util.Version.LUCENE_9_1_0); public static final Version V_2_0_1 = new Version(2000199, org.apache.lucene.util.Version.LUCENE_9_1_0); - public static final Version V_2_1_0 = new Version(2010099, org.apache.lucene.util.Version.LUCENE_9_2_0); + public static final Version V_2_1_0 = new Version(2010099, org.apache.lucene.util.Version.LUCENE_9_3_0); public static final Version V_3_0_0 = new Version(3000099, org.apache.lucene.util.Version.LUCENE_9_3_0); public static final Version CURRENT = V_3_0_0; diff --git a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java index ec70e642ababc..442137fb70e1f 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java @@ -283,6 +283,17 @@ public Iterator> settings() { Property.Final ); + public static final String SETTING_REMOTE_STORE = "index.remote_store"; + /** + * Used to specify if the index data should be persisted in the remote store. + */ + public static final Setting INDEX_REMOTE_STORE_SETTING = Setting.boolSetting( + SETTING_REMOTE_STORE, + false, + Property.IndexScope, + Property.Final + ); + public static final String SETTING_AUTO_EXPAND_REPLICAS = "index.auto_expand_replicas"; public static final Setting INDEX_AUTO_EXPAND_REPLICAS_SETTING = AutoExpandReplicas.SETTING; diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java index a873129723577..3d7ba09c839fc 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java @@ -33,11 +33,14 @@ package org.opensearch.cluster.routing.allocation.decider; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.function.Function; import com.carrotsearch.hppc.ObjectIntHashMap; +import com.carrotsearch.hppc.cursors.ObjectCursor; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.routing.RoutingNode; import org.opensearch.cluster.routing.ShardRouting; @@ -207,12 +210,14 @@ private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, Rout int numberOfAttributes = nodesPerAttribute.size(); List fullValues = forcedAwarenessAttributes.get(awarenessAttribute); + if (fullValues != null) { - for (String fullValue : fullValues) { - if (shardPerAttribute.containsKey(fullValue) == false) { - numberOfAttributes++; - } + // If forced awareness is enabled, numberOfAttributes = count(distinct((union(discovered_attributes, forced_attributes))) + Set attributesSet = new HashSet<>(fullValues); + for (ObjectCursor stringObjectCursor : nodesPerAttribute.keys()) { + attributesSet.add(stringObjectCursor.value); } + numberOfAttributes = attributesSet.size(); } // TODO should we remove ones that are not part of full list? diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/NodeLoadAwareAllocationDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/NodeLoadAwareAllocationDecider.java index 8e2824163709d..c43fb3be214a9 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/NodeLoadAwareAllocationDecider.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/NodeLoadAwareAllocationDecider.java @@ -33,11 +33,13 @@ * *

* and prevent allocation on the surviving nodes of the under capacity cluster - * based on overload factor defined as a percentage by + * based on overload factor defined as a percentage and flat skew as absolute allowed skewness by + *

*
  * cluster.routing.allocation.load_awareness.skew_factor: X
+ * cluster.routing.allocation.load_awareness.flat_skew: N
  * 
- * The total limit per node based on skew_factor doesn't limit primaries that previously + * The total limit per node based on skew_factor and flat_skew doesn't limit primaries that previously * existed on the disk as those shards are force allocated by * {@link AllocationDeciders#canForceAllocatePrimary(ShardRouting, RoutingNode, RoutingAllocation)} * however new primaries due to index creation, snapshot restore etc can be controlled via the below settings. @@ -74,6 +76,13 @@ public class NodeLoadAwareAllocationDecider extends AllocationDecider { Setting.Property.Dynamic, Property.NodeScope ); + public static final Setting CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_FLAT_SKEW_SETTING = Setting.intSetting( + "cluster.routing.allocation.load_awareness.flat_skew", + 2, + 2, + Property.Dynamic, + Property.NodeScope + ); private volatile int provisionedCapacity; @@ -81,12 +90,15 @@ public class NodeLoadAwareAllocationDecider extends AllocationDecider { private volatile boolean allowUnassignedPrimaries; + private volatile int flatSkew; + private static final Logger logger = LogManager.getLogger(NodeLoadAwareAllocationDecider.class); public NodeLoadAwareAllocationDecider(Settings settings, ClusterSettings clusterSettings) { this.skewFactor = CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.get(settings); this.provisionedCapacity = CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.get(settings); this.allowUnassignedPrimaries = CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING.get(settings); + this.flatSkew = CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_FLAT_SKEW_SETTING.get(settings); clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING, this::setSkewFactor); clusterSettings.addSettingsUpdateConsumer( CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING, @@ -96,6 +108,7 @@ public NodeLoadAwareAllocationDecider(Settings settings, ClusterSettings cluster CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING, this::setAllowUnassignedPrimaries ); + clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_FLAT_SKEW_SETTING, this::setFlatSkew); } private void setAllowUnassignedPrimaries(boolean allowUnassignedPrimaries) { @@ -110,6 +123,10 @@ private void setProvisionedCapacity(int provisionedCapacity) { this.provisionedCapacity = provisionedCapacity; } + private void setFlatSkew(int flatSkew) { + this.flatSkew = flatSkew; + } + @Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { return underCapacity(shardRouting, node, allocation, (count, limit) -> count >= limit); @@ -146,7 +163,7 @@ private Decision underCapacity( Metadata metadata = allocation.metadata(); float expectedAvgShardsPerNode = (float) metadata.getTotalNumberOfShards() / provisionedCapacity; int nodeShardCount = node.numberOfOwningShards(); - int limit = (int) Math.ceil(expectedAvgShardsPerNode * (1 + skewFactor / 100.0)); + int limit = flatSkew + (int) Math.ceil(expectedAvgShardsPerNode * (1 + skewFactor / 100.0)); if (decider.test(nodeShardCount, limit)) { logger.debug( () -> new ParameterizedMessage( @@ -163,10 +180,11 @@ private Decision underCapacity( Decision.NO, NAME, "too many shards [%d] allocated to this node, limit per node [%d] considering" - + " overload factor [%.2f] based on capacity [%d]", + + " overload factor [%.2f] and flat skew [%d] based on capacity [%d]", nodeShardCount, limit, skewFactor, + flatSkew, provisionedCapacity ); } diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index be92bf1643aee..9ba56dfa6456f 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -556,6 +556,7 @@ public void apply(Settings value, Settings current, Settings previous) { NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING, NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING, NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_FLAT_SKEW_SETTING, ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED, ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENFORCED, ShardIndexingPressureSettings.REQUEST_SIZE_WINDOW, diff --git a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java index ba2666b53d7a8..75d7081e7729a 100644 --- a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java @@ -217,7 +217,9 @@ public final class IndexScopedSettings extends AbstractScopedSettings { */ public static final Map FEATURE_FLAGGED_INDEX_SETTINGS = Map.of( FeatureFlags.REPLICATION_TYPE, - IndexMetadata.INDEX_REPLICATION_TYPE_SETTING + IndexMetadata.INDEX_REPLICATION_TYPE_SETTING, + FeatureFlags.REMOTE_STORE, + IndexMetadata.INDEX_REMOTE_STORE_SETTING ); public static final IndexScopedSettings DEFAULT_SCOPED_SETTINGS = new IndexScopedSettings(Settings.EMPTY, BUILT_IN_INDEX_SETTINGS); diff --git a/server/src/main/java/org/opensearch/common/util/FeatureFlags.java b/server/src/main/java/org/opensearch/common/util/FeatureFlags.java index 0b31e3814667a..fa39dc9ac5aa0 100644 --- a/server/src/main/java/org/opensearch/common/util/FeatureFlags.java +++ b/server/src/main/java/org/opensearch/common/util/FeatureFlags.java @@ -23,6 +23,12 @@ public class FeatureFlags { */ public static final String REPLICATION_TYPE = "opensearch.experimental.feature.replication_type.enabled"; + /** + * Gates the visibility of the index setting that allows persisting data to remote store along with local disk. + * Once the feature is ready for production release, this feature flag can be removed. + */ + public static final String REMOTE_STORE = "opensearch.experimental.feature.remote_store.enabled"; + /** * Used to test feature flags whose values are expected to be booleans. * This method returns true if the value is "true" (case-insensitive), diff --git a/server/src/main/java/org/opensearch/index/IndexModule.java b/server/src/main/java/org/opensearch/index/IndexModule.java index 49daf8293656c..2cea0e4e3e95c 100644 --- a/server/src/main/java/org/opensearch/index/IndexModule.java +++ b/server/src/main/java/org/opensearch/index/IndexModule.java @@ -70,6 +70,7 @@ import org.opensearch.index.shard.SearchOperationListener; import org.opensearch.index.similarity.SimilarityService; import org.opensearch.index.store.FsDirectoryFactory; +import org.opensearch.index.store.RemoteDirectoryFactory; import org.opensearch.indices.IndicesQueryCache; import org.opensearch.indices.breaker.CircuitBreakerService; import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache; @@ -118,6 +119,8 @@ public final class IndexModule { private static final FsDirectoryFactory DEFAULT_DIRECTORY_FACTORY = new FsDirectoryFactory(); + private static final RemoteDirectoryFactory REMOTE_DIRECTORY_FACTORY = new RemoteDirectoryFactory(); + private static final IndexStorePlugin.RecoveryStateFactory DEFAULT_RECOVERY_STATE_FACTORY = RecoveryState::new; public static final Setting INDEX_STORE_TYPE_SETTING = new Setting<>( @@ -516,6 +519,7 @@ public IndexService newIndexService( client, queryCache, directoryFactory, + REMOTE_DIRECTORY_FACTORY, eventListener, readerWrapperFactory, mapperRegistry, diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index 0a6d1501f2bea..f699278919d6b 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -81,6 +81,7 @@ import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardClosedException; import org.opensearch.index.shard.IndexingOperationListener; +import org.opensearch.index.shard.RemoteStoreRefreshListener; import org.opensearch.index.shard.SearchOperationListener; import org.opensearch.index.shard.ShardId; import org.opensearch.index.shard.ShardNotFoundException; @@ -96,6 +97,9 @@ import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.plugins.IndexStorePlugin; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.Repository; +import org.opensearch.repositories.RepositoryMissingException; import org.opensearch.script.ScriptService; import org.opensearch.search.aggregations.support.ValuesSourceRegistry; import org.opensearch.threadpool.ThreadPool; @@ -136,6 +140,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust private final NodeEnvironment nodeEnv; private final ShardStoreDeleter shardStoreDeleter; private final IndexStorePlugin.DirectoryFactory directoryFactory; + private final IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory; private final IndexStorePlugin.RecoveryStateFactory recoveryStateFactory; private final CheckedFunction readerWrapper; private final IndexCache indexCache; @@ -190,6 +195,7 @@ public IndexService( Client client, QueryCache queryCache, IndexStorePlugin.DirectoryFactory directoryFactory, + IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory, IndexEventListener eventListener, Function> wrapperFactory, MapperRegistry mapperRegistry, @@ -260,6 +266,7 @@ public IndexService( this.eventListener = eventListener; this.nodeEnv = nodeEnv; this.directoryFactory = directoryFactory; + this.remoteDirectoryFactory = remoteDirectoryFactory; this.recoveryStateFactory = recoveryStateFactory; this.engineFactory = Objects.requireNonNull(engineFactory); this.engineConfigFactory = Objects.requireNonNull(engineConfigFactory); @@ -430,7 +437,8 @@ public synchronized IndexShard createShard( final ShardRouting routing, final Consumer globalCheckpointSyncer, final RetentionLeaseSyncer retentionLeaseSyncer, - final SegmentReplicationCheckpointPublisher checkpointPublisher + final SegmentReplicationCheckpointPublisher checkpointPublisher, + final RepositoriesService repositoriesService ) throws IOException { Objects.requireNonNull(retentionLeaseSyncer); /* @@ -504,6 +512,21 @@ public synchronized IndexShard createShard( } }; Directory directory = directoryFactory.newDirectory(this.indexSettings, path); + Directory remoteDirectory = null; + RemoteStoreRefreshListener remoteStoreRefreshListener = null; + if (this.indexSettings.isRemoteStoreEnabled()) { + try { + Repository repository = repositoriesService.repository(clusterService.state().metadata().clusterUUID()); + remoteDirectory = remoteDirectoryFactory.newDirectory(this.indexSettings, path, repository); + remoteStoreRefreshListener = new RemoteStoreRefreshListener(directory, remoteDirectory); + } catch (RepositoryMissingException e) { + throw new IllegalArgumentException( + "Repository should be created before creating index with remote_store enabled setting", + e + ); + } + } + store = new Store( shardId, this.indexSettings, @@ -533,7 +556,8 @@ public synchronized IndexShard createShard( () -> globalCheckpointSyncer.accept(shardId), retentionLeaseSyncer, circuitBreakerService, - this.indexSettings.isSegRepEnabled() && routing.primary() ? checkpointPublisher : null + this.indexSettings.isSegRepEnabled() && routing.primary() ? checkpointPublisher : null, + remoteStoreRefreshListener ); eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); eventListener.afterIndexShardCreated(indexShard); diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index e40acb94ee498..ed3f6002be073 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -534,6 +534,7 @@ public final class IndexSettings { private final Settings nodeSettings; private final int numberOfShards; private final ReplicationType replicationType; + private final boolean isRemoteStoreEnabled; // volatile fields are updated via #updateIndexMetadata(IndexMetadata) under lock private volatile Settings settings; private volatile IndexMetadata indexMetadata; @@ -686,6 +687,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti this.indexMetadata = indexMetadata; numberOfShards = settings.getAsInt(IndexMetadata.SETTING_NUMBER_OF_SHARDS, null); replicationType = ReplicationType.parseString(settings.get(IndexMetadata.SETTING_REPLICATION_TYPE)); + isRemoteStoreEnabled = settings.getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE, false); this.searchThrottled = INDEX_SEARCH_THROTTLED.get(settings); this.queryStringLenient = QUERY_STRING_LENIENT_SETTING.get(settings); @@ -927,6 +929,13 @@ public boolean isSegRepEnabled() { return ReplicationType.SEGMENT.equals(replicationType); } + /** + * Returns if remote store is enabled for this index. + */ + public boolean isRemoteStoreEnabled() { + return isRemoteStoreEnabled; + } + /** * Returns the node settings. The settings returned from {@link #getSettings()} are a merged version of the * index settings and the node settings where node settings are overwritten by index settings. diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 20d97c77672fd..32d1db647732c 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -303,6 +303,8 @@ Runnable getGlobalCheckpointSyncer() { private volatile boolean useRetentionLeasesInPeerRecovery; private final ReferenceManager.RefreshListener checkpointRefreshListener; + private final RemoteStoreRefreshListener remoteStoreRefreshListener; + public IndexShard( final ShardRouting shardRouting, final IndexSettings indexSettings, @@ -324,7 +326,8 @@ public IndexShard( final Runnable globalCheckpointSyncer, final RetentionLeaseSyncer retentionLeaseSyncer, final CircuitBreakerService circuitBreakerService, - @Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher + @Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher, + @Nullable final RemoteStoreRefreshListener remoteStoreRefreshListener ) throws IOException { super(shardRouting.shardId(), indexSettings); assert shardRouting.initializing(); @@ -412,6 +415,7 @@ public boolean shouldCache(Query query) { } else { this.checkpointRefreshListener = null; } + this.remoteStoreRefreshListener = remoteStoreRefreshListener; } public ThreadPool getThreadPool() { @@ -3171,11 +3175,13 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) { } }; - final List internalRefreshListener; + final List internalRefreshListener = new ArrayList<>(); + internalRefreshListener.add(new RefreshMetricUpdater(refreshMetric)); + if (remoteStoreRefreshListener != null && shardRouting.primary()) { + internalRefreshListener.add(remoteStoreRefreshListener); + } if (this.checkpointRefreshListener != null) { - internalRefreshListener = Arrays.asList(new RefreshMetricUpdater(refreshMetric), checkpointRefreshListener); - } else { - internalRefreshListener = Collections.singletonList(new RefreshMetricUpdater(refreshMetric)); + internalRefreshListener.add(checkpointRefreshListener); } return this.engineConfigFactory.newEngineConfig( diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java new file mode 100644 index 0000000000000..4b549ec485c0e --- /dev/null +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -0,0 +1,87 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.shard; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.search.ReferenceManager; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; + +import java.io.IOException; +import java.nio.file.NoSuchFileException; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + +/** + * RefreshListener implementation to upload newly created segment files to the remote store + */ +public class RemoteStoreRefreshListener implements ReferenceManager.RefreshListener { + + private final Directory storeDirectory; + private final Directory remoteDirectory; + // ToDo: This can be a map with metadata of the uploaded file as value of the map (GitHub #3398) + private final Set filesUploadedToRemoteStore; + private static final Logger logger = LogManager.getLogger(RemoteStoreRefreshListener.class); + + public RemoteStoreRefreshListener(Directory storeDirectory, Directory remoteDirectory) throws IOException { + this.storeDirectory = storeDirectory; + this.remoteDirectory = remoteDirectory; + // ToDo: Handle failures in reading list of files (GitHub #3397) + this.filesUploadedToRemoteStore = new HashSet<>(Arrays.asList(remoteDirectory.listAll())); + } + + @Override + public void beforeRefresh() throws IOException { + // Do Nothing + } + + /** + * Upload new segment files created as part of the last refresh to the remote segment store. + * The method also deletes segment files from remote store which are not part of local filesystem. + * @param didRefresh true if the refresh opened a new reference + * @throws IOException in case of I/O error in reading list of local files + */ + @Override + public void afterRefresh(boolean didRefresh) throws IOException { + if (didRefresh) { + Set localFiles = Set.of(storeDirectory.listAll()); + localFiles.stream().filter(file -> !filesUploadedToRemoteStore.contains(file)).forEach(file -> { + try { + remoteDirectory.copyFrom(storeDirectory, file, file, IOContext.DEFAULT); + filesUploadedToRemoteStore.add(file); + } catch (NoSuchFileException e) { + logger.info( + () -> new ParameterizedMessage("The file {} does not exist anymore. It can happen in case of temp files", file), + e + ); + } catch (IOException e) { + // ToDO: Handle transient and permanent un-availability of the remote store (GitHub #3397) + logger.warn(() -> new ParameterizedMessage("Exception while uploading file {} to the remote segment store", file), e); + } + }); + + Set remoteFilesToBeDeleted = new HashSet<>(); + // ToDo: Instead of deleting files in sync, mark them and delete in async/periodic flow (GitHub #3142) + filesUploadedToRemoteStore.stream().filter(file -> !localFiles.contains(file)).forEach(file -> { + try { + remoteDirectory.deleteFile(file); + remoteFilesToBeDeleted.add(file); + } catch (IOException e) { + // ToDO: Handle transient and permanent un-availability of the remote store (GitHub #3397) + logger.warn(() -> new ParameterizedMessage("Exception while deleting file {} from the remote segment store", file), e); + } + }); + + remoteFilesToBeDeleted.forEach(filesUploadedToRemoteStore::remove); + } + } +} diff --git a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java new file mode 100644 index 0000000000000..2f8f977537327 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java @@ -0,0 +1,193 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store; + +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.store.Lock; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobMetadata; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.file.NoSuchFileException; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.Set; + +/** + * A {@code RemoteDirectory} provides an abstraction layer for storing a list of files to a remote store. + * A remoteDirectory contains only files (no sub-folder hierarchy). This class does not support all the methods in + * the Directory interface. Currently, it contains implementation of methods which are used to copy files to/from + * the remote store. Implementation of remaining methods will be added as remote store is integrated with + * replication, peer recovery etc. + * + * @opensearch.internal + */ +public final class RemoteDirectory extends Directory { + + private final BlobContainer blobContainer; + + public RemoteDirectory(BlobContainer blobContainer) { + this.blobContainer = blobContainer; + } + + /** + * Returns names of all files stored in this directory. The output must be in sorted (UTF-16, + * java's {@link String#compareTo}) order. + */ + @Override + public String[] listAll() throws IOException { + return blobContainer.listBlobs().keySet().stream().sorted().toArray(String[]::new); + } + + /** + * Removes an existing file in the directory. + * + *

This method will not throw an exception when the file doesn't exist and simply ignores this case. + * This is a deviation from the {@code Directory} interface where it is expected to throw either + * {@link NoSuchFileException} or {@link FileNotFoundException} if {@code name} points to a non-existing file. + * + * @param name the name of an existing file. + * @throws IOException if the file exists but could not be deleted. + */ + @Override + public void deleteFile(String name) throws IOException { + // ToDo: Add a check for file existence + blobContainer.deleteBlobsIgnoringIfNotExists(Collections.singletonList(name)); + } + + /** + * Creates and returns a new instance of {@link RemoteIndexOutput} which will be used to copy files to the remote + * store. + * + *

In the {@link Directory} interface, it is expected to throw {@link java.nio.file.FileAlreadyExistsException} + * if the file already exists in the remote store. As this method does not open a file, it does not throw the + * exception. + * + * @param name the name of the file to copy to remote store. + */ + @Override + public IndexOutput createOutput(String name, IOContext context) { + return new RemoteIndexOutput(name, blobContainer); + } + + /** + * Opens a stream for reading an existing file and returns {@link RemoteIndexInput} enclosing the stream. + * + * @param name the name of an existing file. + * @throws IOException in case of I/O error + * @throws NoSuchFileException if the file does not exist + */ + @Override + public IndexInput openInput(String name, IOContext context) throws IOException { + return new RemoteIndexInput(name, blobContainer.readBlob(name), fileLength(name)); + } + + /** + * Closes the directory by deleting all the files in this directory + */ + @Override + public void close() throws IOException { + blobContainer.delete(); + } + + /** + * Returns the byte length of a file in the directory. + * + * @param name the name of an existing file. + * @throws IOException in case of I/O error + * @throws NoSuchFileException if the file does not exist + */ + @Override + public long fileLength(String name) throws IOException { + // ToDo: Instead of calling remote store each time, keep a cache with segment metadata + Map metadata = blobContainer.listBlobsByPrefix(name); + if (metadata.containsKey(name)) { + return metadata.get(name).length(); + } + throw new NoSuchFileException(name); + } + + /** + * Guaranteed to throw an exception and leave the directory unmodified. + * Once soft deleting is supported segment files in the remote store, this method will provide details of + * number of files marked as deleted but not actually deleted from the remote store. + * + * @throws UnsupportedOperationException always + */ + @Override + public Set getPendingDeletions() throws IOException { + throw new UnsupportedOperationException(); + } + + /** + * Guaranteed to throw an exception and leave the directory unmodified. + * Temporary IndexOutput is not required while working with Remote store. + * + * @throws UnsupportedOperationException always + */ + @Override + public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) { + throw new UnsupportedOperationException(); + } + + /** + * Guaranteed to throw an exception and leave the directory unmodified. + * Segment upload to the remote store will be permanent and does not require a separate sync API. + * This may change in the future if segment upload to remote store happens via cache and we need sync API to write + * the cache contents to the store permanently. + * + * @throws UnsupportedOperationException always + */ + @Override + public void sync(Collection names) throws IOException { + throw new UnsupportedOperationException(); + } + + /** + * Guaranteed to throw an exception and leave the directory unmodified. + * Once metadata to be stored with each shard is finalized, syncMetaData method will be used to sync the directory + * metadata to the remote store. + * + * @throws UnsupportedOperationException always + */ + @Override + public void syncMetaData() { + throw new UnsupportedOperationException(); + } + + /** + * Guaranteed to throw an exception and leave the directory unmodified. + * As this method is used by IndexWriter to publish commits, the implementation of this method is required when + * IndexWriter is backed by RemoteDirectory. + * + * @throws UnsupportedOperationException always + */ + @Override + public void rename(String source, String dest) throws IOException { + throw new UnsupportedOperationException(); + + } + + /** + * Guaranteed to throw an exception and leave the directory unmodified. + * Once locking segment files in remote store is supported, implementation of this method is required with + * remote store specific LockFactory. + * + * @throws UnsupportedOperationException always + */ + @Override + public Lock obtainLock(String name) throws IOException { + throw new UnsupportedOperationException(); + } +} diff --git a/server/src/main/java/org/opensearch/index/store/RemoteDirectoryFactory.java b/server/src/main/java/org/opensearch/index/store/RemoteDirectoryFactory.java new file mode 100644 index 0000000000000..eb7912a1f4a2b --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/RemoteDirectoryFactory.java @@ -0,0 +1,37 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store; + +import org.apache.lucene.store.Directory; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.shard.ShardPath; +import org.opensearch.plugins.IndexStorePlugin; +import org.opensearch.repositories.Repository; +import org.opensearch.repositories.blobstore.BlobStoreRepository; + +import java.io.IOException; + +/** + * Factory for a remote store directory + * + * @opensearch.internal + */ +public class RemoteDirectoryFactory implements IndexStorePlugin.RemoteDirectoryFactory { + + @Override + public Directory newDirectory(IndexSettings indexSettings, ShardPath path, Repository repository) throws IOException { + assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository"; + BlobPath blobPath = new BlobPath(); + blobPath = blobPath.add(indexSettings.getIndex().getName()).add(String.valueOf(path.getShardId().getId())); + BlobContainer blobContainer = ((BlobStoreRepository) repository).blobStore().blobContainer(blobPath); + return new RemoteDirectory(blobContainer); + } +} diff --git a/server/src/main/java/org/opensearch/index/store/RemoteIndexInput.java b/server/src/main/java/org/opensearch/index/store/RemoteIndexInput.java new file mode 100644 index 0000000000000..24e1128dec1b5 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/RemoteIndexInput.java @@ -0,0 +1,85 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store; + +import org.apache.lucene.store.IndexInput; + +import java.io.IOException; +import java.io.InputStream; + +/** + * Class for input from a file in a {@link RemoteDirectory}. Used for all input operations from the remote store. + * Currently, only methods from {@link IndexInput} that are required for reading a file from remote store are + * implemented. Remaining methods will be implemented as we open up remote store for other use cases like replication, + * peer recovery etc. + * ToDo: Extend ChecksumIndexInput + * @see RemoteDirectory + * + * @opensearch.internal + */ +public class RemoteIndexInput extends IndexInput { + + private final InputStream inputStream; + private final long size; + + public RemoteIndexInput(String name, InputStream inputStream, long size) { + super(name); + this.inputStream = inputStream; + this.size = size; + } + + @Override + public byte readByte() throws IOException { + byte[] buffer = new byte[1]; + inputStream.read(buffer); + return buffer[0]; + } + + @Override + public void readBytes(byte[] b, int offset, int len) throws IOException { + inputStream.read(b, offset, len); + } + + @Override + public void close() throws IOException { + inputStream.close(); + } + + @Override + public long length() { + return size; + } + + @Override + public void seek(long pos) throws IOException { + inputStream.skip(pos); + } + + /** + * Guaranteed to throw an exception and leave the RemoteIndexInput unmodified. + * This method is not implemented as it is not used for the file transfer to/from the remote store. + * + * @throws UnsupportedOperationException always + */ + @Override + public long getFilePointer() { + throw new UnsupportedOperationException(); + } + + /** + * Guaranteed to throw an exception and leave the RemoteIndexInput unmodified. + * This method is not implemented as it is not used for the file transfer to/from the remote store. + * + * @throws UnsupportedOperationException always + */ + @Override + public IndexInput slice(String sliceDescription, long offset, long length) throws IOException { + throw new UnsupportedOperationException(); + } +} diff --git a/server/src/main/java/org/opensearch/index/store/RemoteIndexOutput.java b/server/src/main/java/org/opensearch/index/store/RemoteIndexOutput.java new file mode 100644 index 0000000000000..2af65452a6eac --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/RemoteIndexOutput.java @@ -0,0 +1,99 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store; + +import org.apache.lucene.store.DataInput; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.lucene.store.InputStreamIndexInput; + +import java.io.IOException; + +/** + * Class for output to a file in a {@link RemoteDirectory}. Used for all output operations to the remote store. + * Currently, only methods from {@link IndexOutput} that are required for uploading a segment file to remote store are + * implemented. Remaining methods will be implemented as we open up remote store for other use cases like replication, + * peer recovery etc. + * ToDo: Extend ChecksumIndexInput + * @see RemoteDirectory + * + * @opensearch.internal + */ +public class RemoteIndexOutput extends IndexOutput { + + private final BlobContainer blobContainer; + + public RemoteIndexOutput(String name, BlobContainer blobContainer) { + super(name, name); + this.blobContainer = blobContainer; + } + + @Override + public void copyBytes(DataInput input, long numBytes) throws IOException { + assert input instanceof IndexInput : "input should be instance of IndexInput"; + blobContainer.writeBlob(getName(), new InputStreamIndexInput((IndexInput) input, numBytes), numBytes, false); + } + + /** + * This is a no-op. Once segment file upload to the remote store is complete, we don't need to explicitly close + * the stream. It is taken care by internal APIs of client of the remote store. + */ + @Override + public void close() throws IOException { + // do nothing + } + + /** + * Guaranteed to throw an exception and leave the RemoteIndexOutput unmodified. + * This method is not implemented as it is not used for the file transfer to/from the remote store. + * + * @throws UnsupportedOperationException always + */ + @Override + public void writeByte(byte b) throws IOException { + throw new UnsupportedOperationException(); + } + + /** + * Guaranteed to throw an exception and leave the RemoteIndexOutput unmodified. + * This method is not implemented as it is not used for the file transfer to/from the remote store. + * + * @throws UnsupportedOperationException always + */ + @Override + public void writeBytes(byte[] byteArray, int offset, int length) throws IOException { + throw new UnsupportedOperationException(); + } + + /** + * Guaranteed to throw an exception and leave the RemoteIndexOutput unmodified. + * This method is not implemented as it is not used for the file transfer to/from the remote store. + * + * @throws UnsupportedOperationException always + */ + @Override + public long getFilePointer() { + throw new UnsupportedOperationException(); + } + + /** + * Guaranteed to throw an exception and leave the RemoteIndexOutput unmodified. + * This method is not implemented as it is not directly used for the file transfer to/from the remote store. + * But the checksum is important to verify integrity of the data and that means implementing this method will + * be required for the segment upload as well. + * + * @throws UnsupportedOperationException always + */ + @Override + public long getChecksum() throws IOException { + throw new UnsupportedOperationException(); + } + +} diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index 79fd2893fb78c..b2f6f10c19638 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -859,7 +859,13 @@ public IndexShard createShard( IndexService indexService = indexService(shardRouting.index()); assert indexService != null; RecoveryState recoveryState = indexService.createRecoveryState(shardRouting, targetNode, sourceNode); - IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer, checkpointPublisher); + IndexShard indexShard = indexService.createShard( + shardRouting, + globalCheckpointSyncer, + retentionLeaseSyncer, + checkpointPublisher, + repositoriesService + ); indexShard.addShardFailureCallback(onShardFailure); indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, repositoriesService, mapping -> { assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS diff --git a/server/src/main/java/org/opensearch/plugins/IndexStorePlugin.java b/server/src/main/java/org/opensearch/plugins/IndexStorePlugin.java index 2f549fec54759..52ddf6dcf2753 100644 --- a/server/src/main/java/org/opensearch/plugins/IndexStorePlugin.java +++ b/server/src/main/java/org/opensearch/plugins/IndexStorePlugin.java @@ -39,6 +39,7 @@ import org.opensearch.index.IndexSettings; import org.opensearch.index.shard.ShardPath; import org.opensearch.indices.recovery.RecoveryState; +import org.opensearch.repositories.Repository; import java.io.IOException; import java.util.Collections; @@ -66,6 +67,22 @@ interface DirectoryFactory { Directory newDirectory(IndexSettings indexSettings, ShardPath shardPath) throws IOException; } + /** + * An interface that describes how to create a new remote directory instance per shard. + */ + @FunctionalInterface + interface RemoteDirectoryFactory { + /** + * Creates a new remote directory per shard. This method is called once per shard on shard creation. + * @param indexSettings the shards index settings + * @param shardPath the path the shard is using + * @param repository to get the BlobContainer details + * @return a new RemoteDirectory instance + * @throws IOException if an IOException occurs while opening the directory + */ + Directory newDirectory(IndexSettings indexSettings, ShardPath shardPath, Repository repository) throws IOException; + } + /** * The {@link DirectoryFactory} mappings for this plugin. When an index is created the store type setting * {@link org.opensearch.index.IndexModule#INDEX_STORE_TYPE_SETTING} on the index will be examined and either use the default or a diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/AwarenessAllocationTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/AwarenessAllocationTests.java index c9e427a178515..b2adcd21cd8c9 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/AwarenessAllocationTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/AwarenessAllocationTests.java @@ -35,23 +35,32 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.Version; +import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.OpenSearchAllocationTestCase; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.RoutingNode; +import org.opensearch.cluster.routing.RoutingNodes; import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.cluster.routing.allocation.command.AllocationCommands; import org.opensearch.cluster.routing.allocation.command.CancelAllocationCommand; import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand; +import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders; import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; +import org.opensearch.cluster.routing.allocation.decider.Decision; import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; + +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import static java.util.Collections.singletonMap; @@ -971,4 +980,87 @@ public void testMultipleAwarenessAttributes() { assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2)); assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(0)); } + + public void testAllocationExplainForUnassignedShardsWithUnbalancedZones() { + Settings settings = Settings.builder() + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 10) + .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .build(); + + AllocationService strategy = createAllocationService(settings); + + logger.info("Building initial routing table for 'testAllocationExplainForUnassignedShardsWithUnbalancedZones'"); + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(2)) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build(); + + ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(initialRoutingTable) + .build(); + + logger.info("--> adding 3 nodes in different zones and do rerouting"); + clusterState = ClusterState.builder(clusterState) + .nodes( + DiscoveryNodes.builder() + .add(newNode("A-0", singletonMap("zone", "a"))) + .add(newNode("A-1", singletonMap("zone", "a"))) + .add(newNode("B-0", singletonMap("zone", "b"))) + ) + .build(); + clusterState = strategy.reroute(clusterState, "reroute"); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(0)); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1)); + + logger.info("--> start the shard (primary)"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(1)); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1)); + // One Shard is unassigned due to forced zone awareness + assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(1)); + + List unassignedShards = clusterState.getRoutingTable().shardsWithState(UNASSIGNED); + + ClusterSettings EMPTY_CLUSTER_SETTINGS = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + // Add a new node in zone c + clusterState = ClusterState.builder(clusterState) + .nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newNode("C-0", singletonMap("zone", "c")))) + .build(); + + final AwarenessAllocationDecider decider = new AwarenessAllocationDecider(settings, EMPTY_CLUSTER_SETTINGS); + + final RoutingAllocation allocation = new RoutingAllocation( + new AllocationDeciders(Collections.singleton(decider)), + clusterState.getRoutingNodes(), + clusterState, + null, + null, + 0L + ); + allocation.debugDecision(true); + + Decision decision = null; + RoutingNodes nodes = clusterState.getRoutingNodes(); + + for (RoutingNode node : nodes) { + // Try to allocate unassigned shard to A-0, fails because of forced zone awareness + if (node.nodeId().equals("A-0")) { + decision = decider.canAllocate(unassignedShards.get(0), node, allocation); + assertEquals(Decision.Type.NO, decision.type()); + assertEquals( + decision.getExplanation(), + "there are too many copies of the shard allocated to nodes with attribute" + + " [zone], there are [3] total configured shard copies for this shard id and [3]" + + " total attribute values, expected the allocated shard count per attribute [2] to" + + " be less than or equal to the upper bound of the required number of shards per attribute [1]" + ); + } + + } + } } diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/NodeLoadAwareAllocationTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/NodeLoadAwareAllocationTests.java index d2e7e0e7e636a..c4dcae84581cb 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/NodeLoadAwareAllocationTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/NodeLoadAwareAllocationTests.java @@ -22,7 +22,6 @@ import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.cluster.routing.UnassignedInfo; -import org.opensearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.NodeLoadAwareAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; import org.opensearch.common.settings.Settings; @@ -106,9 +105,11 @@ public void testNewUnassignedPrimaryAllocationOnOverload() { .nodes(DiscoveryNodes.builder(newState.nodes()).add(newNode("node1", singletonMap("zone", "zone_1")))) .build(); - // 4 existing shards from this node's local store get started + // 4 existing shards from this node's local store get started and cluster rebalances newState = strategy.reroute(newState, "reroute"); - newState = startInitializingShardsAndReroute(strategy, newState); + while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + newState = startInitializingShardsAndReroute(strategy, newState); + } assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(32)); // add back node2 when skewness is still breached @@ -282,11 +283,14 @@ public void testExistingPrimariesAllocationOnOverload() { newState = ClusterState.builder(newState).metadata(metadata).routingTable(updatedRoutingTable).build(); newState = strategy.reroute(newState, "reroute"); - newState = startInitializingShardsAndReroute(strategy, newState); + while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + newState = startInitializingShardsAndReroute(strategy, newState); + } + // 28 shards should be assigned (14 on each node -> 8 * 1.5 + 2) logger.info("limits should be applied on newly create primaries"); - assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(24)); - assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(16)); + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(28)); + assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(12)); assertEquals( 12L, @@ -298,7 +302,7 @@ public void testExistingPrimariesAllocationOnOverload() { ); assertEquals( - 4L, + 0L, newState.getRoutingNodes() .shardsWithState(UNASSIGNED) .stream() @@ -306,7 +310,7 @@ public void testExistingPrimariesAllocationOnOverload() { .count() ); - assertThat(newState.getRoutingNodes().node("node4").size(), equalTo(12)); + assertThat(newState.getRoutingNodes().node("node4").size(), equalTo(14)); logger.info("--> Remove node4 from zone holding primaries"); newState = removeNodes(newState, strategy, "node4"); @@ -339,10 +343,10 @@ public void testExistingPrimariesAllocationOnOverload() { logger.info("--> do another reroute, make sure nothing moves"); assertThat(strategy.reroute(newState, "reroute").routingTable(), sameInstance(newState.routingTable())); - assertThat(newState.getRoutingNodes().node("node4").size(), equalTo(12)); - assertThat(newState.getRoutingNodes().node("node5").size(), equalTo(12)); + assertThat(newState.getRoutingNodes().node("node4").size(), equalTo(14)); + assertThat(newState.getRoutingNodes().node("node5").size(), equalTo(14)); - assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(24)); + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(28)); newState = ClusterState.builder(newState) .nodes(DiscoveryNodes.builder(newState.nodes()).add(newNode("node1", singletonMap("zone", "zone_1")))) @@ -436,7 +440,8 @@ public void testSingleZoneOneReplicaLimitsShardAllocationOnOverload() { newState = startInitializingShardsAndReroute(strategy, newState); } - assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(30)); + // Each node can take 12 shards each (2 + ceil(8*1.2)) + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(36)); for (ShardRouting shard : newState.getRoutingNodes().shardsWithState(UNASSIGNED)) { assertEquals(shard.unassignedInfo().getReason(), UnassignedInfo.Reason.NODE_LEFT); @@ -458,10 +463,12 @@ public void testSingleZoneOneReplicaLimitsShardAllocationOnOverload() { newState = ClusterState.builder(newState).metadata(metadata).routingTable(updatedRoutingTable).build(); newState = strategy.reroute(newState, "reroute"); - newState = startInitializingShardsAndReroute(strategy, newState); + while (!newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty()) { + newState = startInitializingShardsAndReroute(strategy, newState); + } - assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(60)); - assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(20)); + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(66)); + assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(14)); logger.info("add another index with 60 shards"); metadata = Metadata.builder(newState.metadata()) @@ -482,8 +489,8 @@ public void testSingleZoneOneReplicaLimitsShardAllocationOnOverload() { newState = startInitializingShardsAndReroute(strategy, newState); } - assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(120)); - assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(20)); + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(126)); + assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(14)); logger.info("change settings to allow unassigned primaries"); strategy = createAllocationServiceWithAdditionalSettings( @@ -499,7 +506,7 @@ public void testSingleZoneOneReplicaLimitsShardAllocationOnOverload() { ); for (RoutingNode node : newState.getRoutingNodes()) { - assertThat(node.size(), equalTo(40)); + assertThat(node.size(), equalTo(42)); } logger.info("add another index with 5 shards"); @@ -513,15 +520,15 @@ public void testSingleZoneOneReplicaLimitsShardAllocationOnOverload() { ) .build(); updatedRoutingTable = RoutingTable.builder(newState.routingTable()).addAsNew(metadata.index("test3")).build(); - // increases avg shard per node to 145/5 = 29, overload factor 1.2, total allowed 35 per node and NO primaries get assigned - // since total owning shards are 40 per node already + // increases avg shard per node to 145/5 = 29, overload factor 1.2, total allowed 35+2=37 per node and NO primaries get assigned + // since total owning shards are 42 per node already newState = ClusterState.builder(newState).metadata(metadata).routingTable(updatedRoutingTable).build(); newState = strategy.reroute(newState, "reroute"); newState = startInitializingShardsAndReroute(strategy, newState); - assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(120)); - assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(25)); + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(126)); + assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(19)); assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).stream().filter(ShardRouting::primary).count(), equalTo(5L)); } @@ -600,21 +607,24 @@ public void testThreeZoneTwoReplicaLimitsShardAllocationOnOverload() { newState = startInitializingShardsAndReroute(strategy, newState); } - assertThat(newState.getRoutingNodes().node("node14").size(), equalTo(5)); - assertThat(newState.getRoutingNodes().node("node15").size(), equalTo(5)); + assertThat(newState.getRoutingNodes().node("node14").size(), equalTo(7)); + assertThat(newState.getRoutingNodes().node("node15").size(), equalTo(7)); // add the removed node newState = addNodes(newState, strategy, "zone3", "node11"); - assertThat(newState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(5)); - newState = startInitializingShardsAndReroute(strategy, newState); - assertThat(newState.getRoutingNodes().node("node11").size(), equalTo(5)); + assertThat(newState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(6)); + while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + newState = startInitializingShardsAndReroute(strategy, newState); + } + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(60)); // add the removed node newState = addNodes(newState, strategy, "zone3", "node12"); - assertThat(newState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(5)); - newState = startInitializingShardsAndReroute(strategy, newState); + while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + newState = startInitializingShardsAndReroute(strategy, newState); + } assertThat(newState.getRoutingNodes().node("node12").size(), equalTo(5)); // add the removed node @@ -674,13 +684,14 @@ public void testThreeZoneOneReplicaLimitsShardAllocationOnOverload() { logger.info("--> add five new node in new zone and reroute"); clusterState = addNodes(clusterState, strategy, "zone2", "node6", "node7", "node8", "node9", "node10"); + // Each node can take 7 shards each now (2 + ceil(4*1.2)) assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(30)); - assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(25)); + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(30)); logger.info("--> complete relocation"); clusterState = startInitializingShardsAndReroute(strategy, clusterState); - assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(55)); + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(60)); logger.info("--> do another reroute, make sure nothing moves"); assertThat(strategy.reroute(clusterState, "reroute").routingTable(), sameInstance(clusterState.routingTable())); @@ -707,6 +718,7 @@ public void testThreeZoneOneReplicaLimitsShardAllocationOnOverload() { newState = startInitializingShardsAndReroute(strategy, newState); } + // Each node can now have 5 shards each assertThat(newState.getRoutingNodes().node("node14").size(), equalTo(5)); assertThat(newState.getRoutingNodes().node("node15").size(), equalTo(5)); @@ -791,8 +803,9 @@ public void testThreeZoneTwoReplicaLimitsShardAllocationOnOverloadAcrossZones() newState = startInitializingShardsAndReroute(strategy, newState); } // ensure minority zone doesn't get overloaded - assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(53)); - assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(10)); + // each node can take 10 shards each (2 + ceil(7*1.1)) + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(61)); + assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(2)); for (ShardRouting shard : newState.getRoutingNodes().shardsWithState(UNASSIGNED)) { assertEquals(shard.unassignedInfo().getReason(), UnassignedInfo.Reason.NODE_LEFT); } @@ -912,15 +925,20 @@ public void testSingleZoneOneReplicaLimitsReplicaAllocationOnOverload() { clusterState = startInitializingShardsAndReroute(strategy, clusterState); assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(20)); - // assert replicas are not assigned but primaries are - logger.info("--> replicas are not initializing"); - assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(0)); + // Each node can take 11 shards each (2 + ceil(8*1.1)), hence 2 replicas will also start + logger.info("--> 2 replicas are initializing"); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(2)); for (ShardRouting shard : clusterState.getRoutingNodes().shardsWithState(UNASSIGNED)) { assertEquals(shard.unassignedInfo().getReason(), UnassignedInfo.Reason.INDEX_CREATED); assertFalse(shard.primary()); } + logger.info("--> start the shards (replicas)"); + while (clusterState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + } + logger.info("--> do another reroute, make sure nothing moves"); assertThat(strategy.reroute(clusterState, "reroute").routingTable(), sameInstance(clusterState.routingTable())); @@ -929,10 +947,12 @@ public void testSingleZoneOneReplicaLimitsReplicaAllocationOnOverload() { assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(18)); - clusterState = startInitializingShardsAndReroute(strategy, clusterState); + while (clusterState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + } logger.info("--> replicas are started"); - assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(38)); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(40)); for (ShardRouting shard : clusterState.getRoutingNodes().shardsWithState(UNASSIGNED)) { assertEquals(shard.unassignedInfo().getReason(), UnassignedInfo.Reason.INDEX_CREATED); @@ -1012,11 +1032,12 @@ public void testThreeZoneTwoReplicaLimitsReplicaAllocationUnderFullZoneFailure() newState = startInitializingShardsAndReroute(strategy, newState); } - assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(50)); - assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(10)); + // Each node can take 7 shards max ( 2 + ceil(4*1.2)) + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(60)); + assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(0)); for (RoutingNode node : newState.getRoutingNodes()) { - assertThat(node.size(), equalTo(5)); + assertThat(node.size(), equalTo(6)); } // add the removed node @@ -1025,9 +1046,7 @@ public void testThreeZoneTwoReplicaLimitsReplicaAllocationUnderFullZoneFailure() .build(); newState = strategy.reroute(newState, "reroute"); - assertThat(newState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(5)); newState = startInitializingShardsAndReroute(strategy, newState); - assertThat(newState.getRoutingNodes().node("node11").size(), equalTo(5)); // add the removed node newState = ClusterState.builder(newState) @@ -1035,9 +1054,7 @@ public void testThreeZoneTwoReplicaLimitsReplicaAllocationUnderFullZoneFailure() .build(); newState = strategy.reroute(newState, "reroute"); - assertThat(newState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(5)); newState = startInitializingShardsAndReroute(strategy, newState); - assertThat(newState.getRoutingNodes().node("node12").size(), equalTo(5)); // add the removed node newState = ClusterState.builder(newState) @@ -1068,6 +1085,120 @@ public void testThreeZoneTwoReplicaLimitsReplicaAllocationUnderFullZoneFailure() assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(0)); } + public void testThreeZoneOneReplicaWithSkewFactorZeroAllShardsAssignedAfterRecovery() { + AllocationService strategy = createAllocationServiceWithAdditionalSettings( + org.opensearch.common.collect.Map.of( + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.getKey(), + 15, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.getKey(), + 0, + "cluster.routing.allocation.awareness.force.zone.values", + "zone1,zone2,zone3" + ) + ); + + logger.info("Building initial routing table for 'testThreeZoneOneReplicaWithSkewFactorZeroAllShardsAssignedAfterRecovery'"); + + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(30).numberOfReplicas(1)) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build(); + + ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(initialRoutingTable) + .build(); + + logger.info("--> adding five nodes on same zone and do rerouting"); + clusterState = addNodes(clusterState, strategy, "zone1", "node1", "node2", "node3", "node4", "node5"); + clusterState = strategy.reroute(clusterState, "reroute"); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(30)); + + logger.info("--> start the shards (primaries)"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + logger.info("--> add five new node in new zone and reroute"); + clusterState = addNodes(clusterState, strategy, "zone2", "node6", "node7", "node8", "node9", "node10"); + + logger.info("--> complete relocation"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + ClusterState newState = addNodes(clusterState, strategy, "zone3", "node11", "node12", "node13", "node14", "node15"); + + while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + newState = startInitializingShardsAndReroute(strategy, newState); + } + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(60)); + assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(0)); + + assertThat(newState.getRoutingNodes().node("node11").size(), equalTo(4)); + assertThat(newState.getRoutingNodes().node("node12").size(), equalTo(4)); + assertThat(newState.getRoutingNodes().node("node13").size(), equalTo(4)); + assertThat(newState.getRoutingNodes().node("node14").size(), equalTo(4)); + assertThat(newState.getRoutingNodes().node("node15").size(), equalTo(4)); + + logger.info("--> Removing three nodes from zone3"); + newState = removeNodes(newState, strategy, "node11", "node12", "node13"); + + while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + newState = startInitializingShardsAndReroute(strategy, newState); + } + + // Each node can take 6 shards max (2 + ceil(4*1.0)), so all shards should be assigned + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(60)); + assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(0)); + + logger.info("add another index with 30 primary 1 replica"); + metadata = Metadata.builder(newState.metadata()) + .put( + IndexMetadata.builder("test1") + .settings( + settings(Version.CURRENT).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 30) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + ) + ) + .build(); + RoutingTable updatedRoutingTable = RoutingTable.builder(newState.routingTable()).addAsNew(metadata.index("test1")).build(); + + newState = ClusterState.builder(newState).metadata(metadata).routingTable(updatedRoutingTable).build(); + newState = strategy.reroute(newState, "reroute"); + + newState = startInitializingShardsAndReroute(strategy, newState); + + // add the removed node + newState = ClusterState.builder(newState) + .nodes(DiscoveryNodes.builder(newState.nodes()).add(newNode("node11", singletonMap("zone", "zone3")))) + .build(); + newState = strategy.reroute(newState, "reroute"); + + newState = startInitializingShardsAndReroute(strategy, newState); + + // add the removed node + newState = ClusterState.builder(newState) + .nodes(DiscoveryNodes.builder(newState.nodes()).add(newNode("node12", singletonMap("zone", "zone3")))) + .build(); + newState = strategy.reroute(newState, "reroute"); + + newState = startInitializingShardsAndReroute(strategy, newState); + + // add the removed node + newState = ClusterState.builder(newState) + .nodes(DiscoveryNodes.builder(newState.nodes()).add(newNode("node13", singletonMap("zone", "zone3")))) + .build(); + newState = strategy.reroute(newState, "reroute"); + + while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { + newState = startInitializingShardsAndReroute(strategy, newState); + } + assertThat(newState.getRoutingNodes().node("node13").size(), equalTo(8)); + assertThat(newState.getRoutingNodes().node("node12").size(), equalTo(8)); + assertThat(newState.getRoutingNodes().node("node11").size(), equalTo(8)); + // ensure all shards are assigned + assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(120)); + assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(0)); + } + private ClusterState removeNodes(ClusterState clusterState, AllocationService allocationService, String... nodeIds) { DiscoveryNodes.Builder nodeBuilder = DiscoveryNodes.builder(clusterState.getNodes()); org.opensearch.common.collect.List.of(nodeIds).forEach(nodeId -> nodeBuilder.remove(nodeId)); @@ -1097,7 +1228,6 @@ private Settings buildSettings(Map settingsValue) { .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), 20) .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING.getKey(), 20) .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 20) - .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.awareness.attributes", "zone"); settingsValue.forEach((k, v) -> { if (v instanceof Integer) settingsBuilder.put(k, (Integer) (v)); diff --git a/server/src/test/java/org/opensearch/common/lucene/LuceneTests.java b/server/src/test/java/org/opensearch/common/lucene/LuceneTests.java index 4c179309f16ba..776b44d346fb5 100644 --- a/server/src/test/java/org/opensearch/common/lucene/LuceneTests.java +++ b/server/src/test/java/org/opensearch/common/lucene/LuceneTests.java @@ -591,6 +591,9 @@ public void testWrapLiveDocsNotExposeAbortedDocuments() throws Exception { Directory dir = newDirectory(); IndexWriterConfig config = newIndexWriterConfig().setSoftDeletesField(Lucene.SOFT_DELETES_FIELD) .setMergePolicy(new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETES_FIELD, MatchAllDocsQuery::new, newMergePolicy())); + // override 500ms default introduced in + // https://issues.apache.org/jira/browse/LUCENE-10078 + config.setMaxFullFlushMergeWaitMillis(0); IndexWriter writer = new IndexWriter(dir, config); int numDocs = between(1, 10); List liveDocs = new ArrayList<>(); diff --git a/server/src/test/java/org/opensearch/common/util/FeatureFlagTests.java b/server/src/test/java/org/opensearch/common/util/FeatureFlagTests.java index 1084f9c658db4..a4f2b242564e2 100644 --- a/server/src/test/java/org/opensearch/common/util/FeatureFlagTests.java +++ b/server/src/test/java/org/opensearch/common/util/FeatureFlagTests.java @@ -21,6 +21,7 @@ public class FeatureFlagTests extends OpenSearchTestCase { @BeforeClass public static void enableFeature() { AccessController.doPrivileged((PrivilegedAction) () -> System.setProperty(FeatureFlags.REPLICATION_TYPE, "true")); + AccessController.doPrivileged((PrivilegedAction) () -> System.setProperty(FeatureFlags.REMOTE_STORE, "true")); } public void testReplicationTypeFeatureFlag() { @@ -40,4 +41,10 @@ public void testNonBooleanFeatureFlag() { assertNotNull(System.getProperty(javaVersionProperty)); assertFalse(FeatureFlags.isEnabled(javaVersionProperty)); } + + public void testRemoteStoreFeatureFlag() { + String remoteStoreFlag = FeatureFlags.REMOTE_STORE; + assertNotNull(System.getProperty(remoteStoreFlag)); + assertTrue(FeatureFlags.isEnabled(remoteStoreFlag)); + } } diff --git a/server/src/test/java/org/opensearch/index/IndexSettingsTests.java b/server/src/test/java/org/opensearch/index/IndexSettingsTests.java index 71433673eef5a..4b3dc041b9f54 100644 --- a/server/src/test/java/org/opensearch/index/IndexSettingsTests.java +++ b/server/src/test/java/org/opensearch/index/IndexSettingsTests.java @@ -41,6 +41,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.ByteSizeValue; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.index.translog.Translog; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.VersionUtils; @@ -56,6 +57,7 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.core.StringContains.containsString; import static org.hamcrest.object.HasToString.hasToString; +import static org.opensearch.common.settings.IndexScopedSettings.FEATURE_FLAGGED_INDEX_SETTINGS; public class IndexSettingsTests extends OpenSearchTestCase { @@ -753,4 +755,41 @@ public void testIgnoreTranslogRetentionSettingsIfSoftDeletesEnabled() { assertThat(indexSettings.getTranslogRetentionAge().millis(), equalTo(-1L)); assertThat(indexSettings.getTranslogRetentionSize().getBytes(), equalTo(-1L)); } + + public void testRemoteStoreDefaultSetting() { + IndexMetadata metadata = newIndexMeta( + "index", + Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT).build() + ); + IndexSettings settings = new IndexSettings(metadata, Settings.EMPTY); + assertFalse(settings.isRemoteStoreEnabled()); + } + + public void testRemoteStoreExplicitSetting() { + IndexMetadata metadata = newIndexMeta( + "index", + Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_REMOTE_STORE, true) + .build() + ); + IndexSettings settings = new IndexSettings(metadata, Settings.EMPTY); + assertTrue(settings.isRemoteStoreEnabled()); + } + + public void testUpdateRemoteStoreFails() { + Set> remoteStoreSettingSet = new HashSet<>(); + remoteStoreSettingSet.add(FEATURE_FLAGGED_INDEX_SETTINGS.get(FeatureFlags.REMOTE_STORE)); + IndexScopedSettings settings = new IndexScopedSettings(Settings.EMPTY, remoteStoreSettingSet); + IllegalArgumentException error = expectThrows( + IllegalArgumentException.class, + () -> settings.updateSettings( + Settings.builder().put("index.remote_store", randomBoolean()).build(), + Settings.builder(), + Settings.builder(), + "index" + ) + ); + assertEquals(error.getMessage(), "final index setting [index.remote_store], not updateable"); + } } diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java new file mode 100644 index 0000000000000..af92d821a9043 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -0,0 +1,139 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.shard; + +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.nio.file.NoSuchFileException; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.doThrow; + +public class RemoteStoreRefreshListenerTests extends OpenSearchTestCase { + private Directory storeDirectory; + private Directory remoteDirectory; + + private RemoteStoreRefreshListener remoteStoreRefreshListener; + + public void setup(String[] remoteFiles) throws IOException { + storeDirectory = mock(Directory.class); + remoteDirectory = mock(Directory.class); + when(remoteDirectory.listAll()).thenReturn(remoteFiles); + remoteStoreRefreshListener = new RemoteStoreRefreshListener(storeDirectory, remoteDirectory); + } + + public void testAfterRefreshFalse() throws IOException { + setup(new String[0]); + remoteStoreRefreshListener.afterRefresh(false); + verify(storeDirectory, times(0)).listAll(); + } + + public void testAfterRefreshTrueNoLocalFiles() throws IOException { + setup(new String[0]); + + when(storeDirectory.listAll()).thenReturn(new String[0]); + + remoteStoreRefreshListener.afterRefresh(true); + verify(storeDirectory).listAll(); + verify(remoteDirectory, times(0)).copyFrom(any(), any(), any(), any()); + verify(remoteDirectory, times(0)).deleteFile(any()); + } + + public void testAfterRefreshOnlyUploadFiles() throws IOException { + setup(new String[0]); + + String[] localFiles = new String[] { "segments_1", "0.si", "0.cfs", "0.cfe" }; + when(storeDirectory.listAll()).thenReturn(localFiles); + + remoteStoreRefreshListener.afterRefresh(true); + verify(storeDirectory).listAll(); + verify(remoteDirectory).copyFrom(storeDirectory, "segments_1", "segments_1", IOContext.DEFAULT); + verify(remoteDirectory).copyFrom(storeDirectory, "0.si", "0.si", IOContext.DEFAULT); + verify(remoteDirectory).copyFrom(storeDirectory, "0.cfs", "0.cfs", IOContext.DEFAULT); + verify(remoteDirectory).copyFrom(storeDirectory, "0.cfe", "0.cfe", IOContext.DEFAULT); + verify(remoteDirectory, times(0)).deleteFile(any()); + } + + public void testAfterRefreshOnlyUploadAndDelete() throws IOException { + setup(new String[] { "0.si", "0.cfs" }); + + String[] localFiles = new String[] { "segments_1", "1.si", "1.cfs", "1.cfe" }; + when(storeDirectory.listAll()).thenReturn(localFiles); + + remoteStoreRefreshListener.afterRefresh(true); + verify(storeDirectory).listAll(); + verify(remoteDirectory).copyFrom(storeDirectory, "segments_1", "segments_1", IOContext.DEFAULT); + verify(remoteDirectory).copyFrom(storeDirectory, "1.si", "1.si", IOContext.DEFAULT); + verify(remoteDirectory).copyFrom(storeDirectory, "1.cfs", "1.cfs", IOContext.DEFAULT); + verify(remoteDirectory).copyFrom(storeDirectory, "1.cfe", "1.cfe", IOContext.DEFAULT); + verify(remoteDirectory).deleteFile("0.si"); + verify(remoteDirectory).deleteFile("0.cfs"); + } + + public void testAfterRefreshOnlyDelete() throws IOException { + setup(new String[] { "0.si", "0.cfs" }); + + String[] localFiles = new String[] { "0.si" }; + when(storeDirectory.listAll()).thenReturn(localFiles); + + remoteStoreRefreshListener.afterRefresh(true); + verify(storeDirectory).listAll(); + verify(remoteDirectory, times(0)).copyFrom(any(), any(), any(), any()); + verify(remoteDirectory).deleteFile("0.cfs"); + } + + public void testAfterRefreshTempLocalFile() throws IOException { + setup(new String[0]); + + String[] localFiles = new String[] { "segments_1", "0.si", "0.cfs.tmp" }; + when(storeDirectory.listAll()).thenReturn(localFiles); + doThrow(new NoSuchFileException("0.cfs.tmp")).when(remoteDirectory) + .copyFrom(storeDirectory, "0.cfs.tmp", "0.cfs.tmp", IOContext.DEFAULT); + + remoteStoreRefreshListener.afterRefresh(true); + verify(storeDirectory).listAll(); + verify(remoteDirectory).copyFrom(storeDirectory, "segments_1", "segments_1", IOContext.DEFAULT); + verify(remoteDirectory).copyFrom(storeDirectory, "0.si", "0.si", IOContext.DEFAULT); + verify(remoteDirectory, times(0)).deleteFile(any()); + } + + public void testAfterRefreshConsecutive() throws IOException { + setup(new String[0]); + + String[] localFiles = new String[] { "segments_1", "0.si", "0.cfs", "0.cfe" }; + when(storeDirectory.listAll()).thenReturn(localFiles); + doThrow(new IOException("0.cfs")).when(remoteDirectory).copyFrom(storeDirectory, "0.cfs", "0.cfe", IOContext.DEFAULT); + doThrow(new IOException("0.cfe")).when(remoteDirectory).copyFrom(storeDirectory, "0.cfe", "0.cfe", IOContext.DEFAULT); + + remoteStoreRefreshListener.afterRefresh(true); + verify(storeDirectory).listAll(); + verify(remoteDirectory).copyFrom(storeDirectory, "segments_1", "segments_1", IOContext.DEFAULT); + verify(remoteDirectory).copyFrom(storeDirectory, "0.si", "0.si", IOContext.DEFAULT); + verify(remoteDirectory).copyFrom(storeDirectory, "0.cfs", "0.cfs", IOContext.DEFAULT); + verify(remoteDirectory).copyFrom(storeDirectory, "0.cfe", "0.cfe", IOContext.DEFAULT); + verify(remoteDirectory, times(0)).deleteFile(any()); + + String[] localFilesSecondRefresh = new String[] { "segments_1", "0.cfs", "1.cfs", "1.cfe" }; + when(storeDirectory.listAll()).thenReturn(localFilesSecondRefresh); + + remoteStoreRefreshListener.afterRefresh(true); + + verify(remoteDirectory).copyFrom(storeDirectory, "0.cfs", "0.cfs", IOContext.DEFAULT); + verify(remoteDirectory).copyFrom(storeDirectory, "1.cfs", "1.cfs", IOContext.DEFAULT); + verify(remoteDirectory).copyFrom(storeDirectory, "1.cfe", "1.cfe", IOContext.DEFAULT); + verify(remoteDirectory).deleteFile("0.si"); + } +} diff --git a/server/src/test/java/org/opensearch/index/store/RemoteDirectoryFactoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteDirectoryFactoryTests.java new file mode 100644 index 0000000000000..d781fad9ab99c --- /dev/null +++ b/server/src/test/java/org/opensearch/index/store/RemoteDirectoryFactoryTests.java @@ -0,0 +1,65 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store; + +import org.apache.lucene.store.Directory; +import org.junit.Before; +import org.mockito.ArgumentCaptor; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.BlobStore; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.shard.ShardId; +import org.opensearch.index.shard.ShardPath; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.test.IndexSettingsModule; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.Collections; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.verify; + +public class RemoteDirectoryFactoryTests extends OpenSearchTestCase { + + private RemoteDirectoryFactory remoteDirectoryFactory; + + @Before + public void setup() { + remoteDirectoryFactory = new RemoteDirectoryFactory(); + } + + public void testNewDirectory() throws IOException { + Settings settings = Settings.builder().build(); + IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("foo", settings); + Path tempDir = createTempDir().resolve(indexSettings.getUUID()).resolve("0"); + ShardPath shardPath = new ShardPath(false, tempDir, tempDir, new ShardId(indexSettings.getIndex(), 0)); + BlobStoreRepository repository = mock(BlobStoreRepository.class); + BlobStore blobStore = mock(BlobStore.class); + BlobContainer blobContainer = mock(BlobContainer.class); + when(repository.blobStore()).thenReturn(blobStore); + when(blobStore.blobContainer(any())).thenReturn(blobContainer); + when(blobContainer.listBlobs()).thenReturn(Collections.emptyMap()); + + Directory directory = remoteDirectoryFactory.newDirectory(indexSettings, shardPath, repository); + assertTrue(directory instanceof RemoteDirectory); + ArgumentCaptor blobPathCaptor = ArgumentCaptor.forClass(BlobPath.class); + verify(blobStore).blobContainer(blobPathCaptor.capture()); + BlobPath blobPath = blobPathCaptor.getValue(); + assertEquals("foo/0/", blobPath.buildAsString()); + + directory.listAll(); + verify(blobContainer).listBlobs(); + } +} diff --git a/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java new file mode 100644 index 0000000000000..c2c365d9140df --- /dev/null +++ b/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java @@ -0,0 +1,158 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store; + +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; +import org.junit.Before; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobMetadata; +import org.opensearch.common.blobstore.support.PlainBlobMetadata; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.NoSuchFileException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.mockito.Mockito.*; + +public class RemoteDirectoryTests extends OpenSearchTestCase { + private BlobContainer blobContainer; + + private RemoteDirectory remoteDirectory; + + @Before + public void setup() { + blobContainer = mock(BlobContainer.class); + remoteDirectory = new RemoteDirectory(blobContainer); + } + + public void testListAllEmpty() throws IOException { + when(blobContainer.listBlobs()).thenReturn(Collections.emptyMap()); + + String[] actualFileNames = remoteDirectory.listAll(); + String[] expectedFileName = new String[] {}; + assertArrayEquals(expectedFileName, actualFileNames); + } + + public void testListAll() throws IOException { + Map fileNames = Stream.of("abc", "xyz", "pqr", "lmn", "jkl") + .collect(Collectors.toMap(filename -> filename, filename -> new PlainBlobMetadata(filename, 100))); + + when(blobContainer.listBlobs()).thenReturn(fileNames); + + String[] actualFileNames = remoteDirectory.listAll(); + String[] expectedFileName = new String[] { "abc", "jkl", "lmn", "pqr", "xyz" }; + assertArrayEquals(expectedFileName, actualFileNames); + } + + public void testListAllException() throws IOException { + when(blobContainer.listBlobs()).thenThrow(new IOException("Error reading blob store")); + + assertThrows(IOException.class, () -> remoteDirectory.listAll()); + } + + public void testDeleteFile() throws IOException { + remoteDirectory.deleteFile("segment_1"); + + verify(blobContainer).deleteBlobsIgnoringIfNotExists(Collections.singletonList("segment_1")); + } + + public void testDeleteFileException() throws IOException { + doThrow(new IOException("Error writing to blob store")).when(blobContainer) + .deleteBlobsIgnoringIfNotExists(Collections.singletonList("segment_1")); + + assertThrows(IOException.class, () -> remoteDirectory.deleteFile("segment_1")); + } + + public void testCreateOutput() { + IndexOutput indexOutput = remoteDirectory.createOutput("segment_1", IOContext.DEFAULT); + assertTrue(indexOutput instanceof RemoteIndexOutput); + assertEquals("segment_1", indexOutput.getName()); + } + + public void testOpenInput() throws IOException { + InputStream mockInputStream = mock(InputStream.class); + when(blobContainer.readBlob("segment_1")).thenReturn(mockInputStream); + Map fileInfo = new HashMap<>(); + fileInfo.put("segment_1", new PlainBlobMetadata("segment_1", 100)); + when(blobContainer.listBlobsByPrefix("segment_1")).thenReturn(fileInfo); + + IndexInput indexInput = remoteDirectory.openInput("segment_1", IOContext.DEFAULT); + assertTrue(indexInput instanceof RemoteIndexInput); + assertEquals(100, indexInput.length()); + } + + public void testOpenInputIOException() throws IOException { + when(blobContainer.readBlob("segment_1")).thenThrow(new IOException("Error while reading")); + + assertThrows(IOException.class, () -> remoteDirectory.openInput("segment_1", IOContext.DEFAULT)); + } + + public void testOpenInputNoSuchFileException() throws IOException { + InputStream mockInputStream = mock(InputStream.class); + when(blobContainer.readBlob("segment_1")).thenReturn(mockInputStream); + when(blobContainer.listBlobsByPrefix("segment_1")).thenThrow(new NoSuchFileException("segment_1")); + + assertThrows(NoSuchFileException.class, () -> remoteDirectory.openInput("segment_1", IOContext.DEFAULT)); + } + + public void testClose() throws IOException { + remoteDirectory.close(); + + verify(blobContainer).delete(); + } + + public void testCloseIOException() throws IOException { + when(blobContainer.delete()).thenThrow(new IOException("Error while writing to blob store")); + + assertThrows(IOException.class, () -> remoteDirectory.close()); + } + + public void testFileLength() throws IOException { + Map fileInfo = new HashMap<>(); + fileInfo.put("segment_1", new PlainBlobMetadata("segment_1", 100)); + when(blobContainer.listBlobsByPrefix("segment_1")).thenReturn(fileInfo); + + assertEquals(100, remoteDirectory.fileLength("segment_1")); + } + + public void testFileLengthIOException() throws IOException { + when(blobContainer.listBlobsByPrefix("segment_1")).thenThrow(new NoSuchFileException("segment_1")); + + assertThrows(IOException.class, () -> remoteDirectory.fileLength("segment_1")); + } + + public void testGetPendingDeletions() { + assertThrows(UnsupportedOperationException.class, () -> remoteDirectory.getPendingDeletions()); + } + + public void testCreateTempOutput() { + assertThrows(UnsupportedOperationException.class, () -> remoteDirectory.createTempOutput("segment_1", "tmp", IOContext.DEFAULT)); + } + + public void testSync() { + assertThrows(UnsupportedOperationException.class, () -> remoteDirectory.sync(Collections.emptyList())); + } + + public void testRename() { + assertThrows(UnsupportedOperationException.class, () -> remoteDirectory.rename("segment_1", "segment_2")); + } + + public void testObtainLock() { + assertThrows(UnsupportedOperationException.class, () -> remoteDirectory.obtainLock("segment_1")); + } + +} diff --git a/server/src/test/java/org/opensearch/index/store/RemoteIndexInputTests.java b/server/src/test/java/org/opensearch/index/store/RemoteIndexInputTests.java new file mode 100644 index 0000000000000..c2f81c035e424 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/store/RemoteIndexInputTests.java @@ -0,0 +1,99 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store; + +import org.junit.Before; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.io.InputStream; + +import static org.mockito.Mockito.*; + +public class RemoteIndexInputTests extends OpenSearchTestCase { + + private static final String FILENAME = "segment_1"; + private static final long FILESIZE = 200; + + private InputStream inputStream; + private RemoteIndexInput remoteIndexInput; + + @Before + public void setup() { + inputStream = mock(InputStream.class); + remoteIndexInput = new RemoteIndexInput(FILENAME, inputStream, FILESIZE); + } + + public void testReadByte() throws IOException { + InputStream inputStream = spy(InputStream.class); + remoteIndexInput = new RemoteIndexInput(FILENAME, inputStream, FILESIZE); + + when(inputStream.read()).thenReturn(10); + + assertEquals(10, remoteIndexInput.readByte()); + + verify(inputStream).read(any()); + } + + public void testReadByteIOException() throws IOException { + when(inputStream.read(any())).thenThrow(new IOException("Error reading")); + + assertThrows(IOException.class, () -> remoteIndexInput.readByte()); + } + + public void testReadBytes() throws IOException { + byte[] buffer = new byte[10]; + remoteIndexInput.readBytes(buffer, 10, 20); + + verify(inputStream).read(buffer, 10, 20); + } + + public void testReadBytesIOException() throws IOException { + byte[] buffer = new byte[10]; + when(inputStream.read(buffer, 10, 20)).thenThrow(new IOException("Error reading")); + + assertThrows(IOException.class, () -> remoteIndexInput.readBytes(buffer, 10, 20)); + } + + public void testClose() throws IOException { + remoteIndexInput.close(); + + verify(inputStream).close(); + } + + public void testCloseIOException() throws IOException { + doThrow(new IOException("Error closing")).when(inputStream).close(); + + assertThrows(IOException.class, () -> remoteIndexInput.close()); + } + + public void testLength() { + assertEquals(FILESIZE, remoteIndexInput.length()); + } + + public void testSeek() throws IOException { + remoteIndexInput.seek(10); + + verify(inputStream).skip(10); + } + + public void testSeekIOException() throws IOException { + when(inputStream.skip(10)).thenThrow(new IOException("Error reading")); + + assertThrows(IOException.class, () -> remoteIndexInput.seek(10)); + } + + public void testGetFilePointer() { + assertThrows(UnsupportedOperationException.class, () -> remoteIndexInput.getFilePointer()); + } + + public void testSlice() { + assertThrows(UnsupportedOperationException.class, () -> remoteIndexInput.slice("Slice middle", 50, 100)); + } +} diff --git a/server/src/test/java/org/opensearch/index/store/RemoteIndexOutputTests.java b/server/src/test/java/org/opensearch/index/store/RemoteIndexOutputTests.java new file mode 100644 index 0000000000000..64975f2ac4892 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/store/RemoteIndexOutputTests.java @@ -0,0 +1,68 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store; + +import org.apache.lucene.store.IndexInput; +import org.junit.Before; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.lucene.store.InputStreamIndexInput; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.*; + +public class RemoteIndexOutputTests extends OpenSearchTestCase { + private static final String FILENAME = "segment_1"; + + private BlobContainer blobContainer; + + private RemoteIndexOutput remoteIndexOutput; + + @Before + public void setup() { + blobContainer = mock(BlobContainer.class); + remoteIndexOutput = new RemoteIndexOutput(FILENAME, blobContainer); + } + + public void testCopyBytes() throws IOException { + IndexInput indexInput = mock(IndexInput.class); + remoteIndexOutput.copyBytes(indexInput, 100); + + verify(blobContainer).writeBlob(eq(FILENAME), any(InputStreamIndexInput.class), eq(100L), eq(false)); + } + + public void testCopyBytesIOException() throws IOException { + doThrow(new IOException("Error writing")).when(blobContainer) + .writeBlob(eq(FILENAME), any(InputStreamIndexInput.class), eq(100L), eq(false)); + + IndexInput indexInput = mock(IndexInput.class); + assertThrows(IOException.class, () -> remoteIndexOutput.copyBytes(indexInput, 100)); + } + + public void testWriteByte() { + byte b = 10; + assertThrows(UnsupportedOperationException.class, () -> remoteIndexOutput.writeByte(b)); + } + + public void testWriteBytes() { + byte[] buffer = new byte[10]; + assertThrows(UnsupportedOperationException.class, () -> remoteIndexOutput.writeBytes(buffer, 50, 60)); + } + + public void testGetFilePointer() { + assertThrows(UnsupportedOperationException.class, () -> remoteIndexOutput.getFilePointer()); + } + + public void testGetChecksum() { + assertThrows(UnsupportedOperationException.class, () -> remoteIndexOutput.getChecksum()); + } +} diff --git a/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java b/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java index 0989bf869f18e..213a22539971f 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java @@ -153,7 +153,8 @@ public void afterIndexRemoved(Index index, IndexSettings indexSettings, IndexRem newRouting, s -> {}, RetentionLeaseSyncer.EMPTY, - SegmentReplicationCheckpointPublisher.EMPTY + SegmentReplicationCheckpointPublisher.EMPTY, + null ); IndexShardTestCase.updateRoutingEntry(shard, newRouting); assertEquals(5, counter.get()); diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 371fa6d102304..62c52ab636255 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -525,7 +525,8 @@ protected IndexShard newShard( globalCheckpointSyncer, retentionLeaseSyncer, breakerService, - checkpointPublisher + checkpointPublisher, + null ); indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER); success = true;