Increment replication plugin version to 2.0
Signed-off-by: Sai Kumar <karanas@amazon.com>
saikaranam-amazon committed Apr 5, 2022
1 parent ecd95b2 commit 20bdec8
Showing 13 changed files with 32 additions and 122 deletions.
5 changes: 2 additions & 3 deletions .github/workflows/build.yml
@@ -12,8 +12,7 @@ jobs:
   build:
     strategy:
       matrix:
-        java:
-          - 8
+        java:
           - 11
           - 14
     # Job name
@@ -30,7 +29,7 @@
       uses: actions/checkout@v2
     - name: Build and run Replication tests
       run: |
-        ./gradlew clean release -Dbuild.snapshot=true -Dopensearch.version=1.3.0-SNAPSHOT
+        ./gradlew clean release -Dbuild.snapshot=true
     - name: Upload failed logs
       uses: actions/upload-artifact@v2
       if: failure()
2 changes: 1 addition & 1 deletion .github/workflows/bwc.yml
@@ -25,7 +25,7 @@ jobs:
     - name: Build and run Replication tests
       run: |
         echo "Running backwards compatibility tests ..."
-        ./gradlew clean release -Dbuild.snapshot=true -Dopensearch.version=1.3.0-SNAPSHOT -x test -x IntegTest
+        ./gradlew clean release -Dbuild.snapshot=true -x test -x IntegTest
        ./gradlew mixedClusterTask --stacktrace
        ./gradlew fullRestartClusterTask --stacktrace
     - name: Upload failed logs
57 changes: 0 additions & 57 deletions .github/workflows/security-tests.yml

This file was deleted.

44 changes: 15 additions & 29 deletions build.gradle
@@ -33,7 +33,7 @@ import org.opensearch.gradle.test.RestIntegTestTask
 buildscript {
     ext {
         isSnapshot = "true" == System.getProperty("build.snapshot", "false")
-        opensearch_version = System.getProperty("opensearch.version", "1.3.0-SNAPSHOT")
+        opensearch_version = System.getProperty("opensearch.version", "2.0.0-SNAPSHOT")
         // Taken from https://github.com/opensearch-project/alerting/blob/main/build.gradle#L33
         // 1.0.0 -> 1.0.0.0, and 1.0.0-SNAPSHOT -> 1.0.0.0-SNAPSHOT
         opensearch_build = opensearch_version.replaceAll(/(\.\d)([^\d]*)$/, '$1.0$2')
@@ -43,7 +43,7 @@ buildscript {
         plugin_previous_version = opensearch_previous_version.replaceAll(/(\.\d)([^\d]*)$/, '$1.0$2')

         common_utils_version = System.getProperty("common_utils.version", opensearch_build)
-        kotlin_version = System.getProperty("kotlin.version", "1.3.72")
+        kotlin_version = System.getProperty("kotlin.version", "1.6.0")

     }

@@ -109,21 +109,21 @@ configurations.all {
 }

 dependencies {
-    compileOnly "org.opensearch:opensearch:${opensearch_version}"
-    compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8"
-    compile "org.jetbrains.kotlin:kotlin-stdlib-jdk7"
-    compile "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}"
-    compile "org.jetbrains.kotlin:kotlin-stdlib-common:${kotlin_version}"
-    compile "org.jetbrains:annotations:13.0"
-    compile "com.github.seancfoley:ipaddress:5.3.3"
-    compile "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.5"
-    compile "org.opensearch:common-utils:${common_utils_version}"
-
-    testCompile "org.opensearch.test:framework:${opensearch_version}"
+    runtimeOnly "org.opensearch:opensearch:${opensearch_version}"
+    implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk8"
+    implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk7"
+    implementation "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}"
+    implementation "org.jetbrains.kotlin:kotlin-stdlib-common:${kotlin_version}"
+    implementation "org.jetbrains:annotations:13.0"
+    implementation "com.github.seancfoley:ipaddress:5.3.3"
+    implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:${kotlin_version}"
+    implementation "org.opensearch:common-utils:${common_utils_version}"
+
+    testImplementation "org.opensearch.test:framework:${opensearch_version}"
     testImplementation "org.assertj:assertj-core:3.17.2"
     testImplementation "org.opensearch.client:opensearch-rest-high-level-client:${opensearch_version}"
-    testImplementation "org.jetbrains.kotlinx:kotlinx-coroutines-test:1.3.5"
-    testCompile "org.jetbrains.kotlin:kotlin-test:${kotlin_version}"
+    testImplementation "org.jetbrains.kotlinx:kotlinx-coroutines-test:${kotlin_version}"
+    testImplementation "org.jetbrains.kotlin:kotlin-test:${kotlin_version}"
     testImplementation "com.nhaarman.mockitokotlin2:mockito-kotlin:2.2.0"
 }
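Note: Gradle 7 (see the wrapper upgrade to 7.3.3 below) removes the legacy compile/testCompile configurations, which is why every dependency moves to implementation/testImplementation. A minimal illustrative sketch of the same declarations, assuming the Gradle Kotlin DSL (the plugin itself uses the Groovy DSL; coordinates are taken from the block above):

    dependencies {
        // 'implementation' replaces the removed 'compile' configuration
        implementation("org.jetbrains.kotlin:kotlin-stdlib:1.6.0")
        // 'testImplementation' replaces the removed 'testCompile' configuration
        testImplementation("org.jetbrains.kotlin:kotlin-test:1.6.0")
    }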

@@ -434,20 +434,6 @@ integTest {
         }
     }

-    doLast {
-        for (int port=startJmxPort; port<endJmxPort; port++) {
-            def serverUrl = "service:jmx:rmi:///jndi/rmi://127.0.0.1:" + port + "/jmxrmi"
-            def connector = JMXConnectorFactory.connect(new JMXServiceURL(serverUrl))
-            try {
-                def jacocoMBean = new GroovyMBean(connector.MBeanServerConnection, "org.jacoco:type=Runtime")
-                byte[] data = jacocoMBean.getExecutionData(false)
-                file(jacoco.destinationFile).append(data)
-            } finally {
-                connector.close()
-            }
-        }
-    }
-
     systemProperty "build.dir", "${buildDir}"
     systemProperty "java.security.policy", "file://${projectDir}/src/test/resources/plugin-security.policy"
     finalizedBy jacocoTestReport
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
@@ -11,6 +11,6 @@

 distributionBase=GRADLE_USER_HOME
 distributionPath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-6.6.1-all.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-7.3.3-all.zip
 zipStoreBase=GRADLE_USER_HOME
 zipStorePath=wrapper/dists
@@ -175,7 +175,7 @@ class TransportReplayChangesAction @Inject constructor(settings: Settings, trans
         val options = IndicesOptions.strictSingleIndexNoExpandForbidClosed()
         val getMappingsRequest = GetMappingsRequest().indices(leaderIndex).indicesOptions(options)
         val getMappingsResponse = remoteClient.suspending(remoteClient.admin().indices()::getMappings, injectSecurityContext = true)(getMappingsRequest)
-        val mappingSource = getMappingsResponse?.mappings()?.get(leaderIndex)?.get(type)?.source()?.string()
+        val mappingSource = getMappingsResponse?.mappings()?.get(leaderIndex)?.source()?.string()
         if (null == mappingSource) {
             log.error("Mapping response: $getMappingsResponse")
             throw MappingNotAvailableException("Mapping for the index $leaderIndex is not available")
@@ -186,7 +186,7 @@ class TransportReplayChangesAction @Inject constructor(settings: Settings, trans
         // PutMappingRequest#setConcreteIndex has a bug where it throws an NPE.This is fixed upstream in
         // https://github.com/elastic/elasticsearch/pull/58419 and we should update to that when it is released.
         val putMappingRequest = PutMappingRequest().indices(followerIndex).indicesOptions(options)
-            .type(type).source(mappingSource, XContentType.JSON)
+            .source(mappingSource, XContentType.JSON)
         //TODO: call .masterNodeTimeout() with the setting indices.mapping.dynamic_timeout
         val updateMappingRequest = UpdateMetadataRequest(followerIndex, UpdateMetadataRequest.Type.MAPPING, putMappingRequest)
         client.suspendExecute(UpdateMetadataAction.INSTANCE, updateMappingRequest, injectSecurityContext = true)
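Note: OpenSearch 2.0 removes mapping types, so the leader mapping is read and re-applied without the .get(type) / .type(type) calls. A simplified, blocking sketch of the typeless flow, assuming the 2.x admin-client API (copyMapping is a hypothetical helper; the plugin itself goes through its suspending client wrappers):

    import org.opensearch.action.admin.indices.mapping.get.GetMappingsRequest
    import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest
    import org.opensearch.client.Client
    import org.opensearch.common.xcontent.XContentType

    fun copyMapping(remoteClient: Client, leaderIndex: String, followerIndex: String): PutMappingRequest {
        val response = remoteClient.admin().indices()
            .getMappings(GetMappingsRequest().indices(leaderIndex)).actionGet()
        // 2.x: mappings() is keyed by index name only; the per-type level is gone.
        val mappingSource = response.mappings().get(leaderIndex)?.source()?.string()
            ?: error("Mapping for the index $leaderIndex is not available")
        // 2.x: PutMappingRequest no longer takes a type.
        return PutMappingRequest().indices(followerIndex).source(mappingSource, XContentType.JSON)
    }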
@@ -249,10 +249,9 @@ class TransportUpdateMetadataAction @Inject constructor(
         listener: ActionListener<AcknowledgedResponse>, metadataMappingService: MetadataMappingService
     ) {
         val mappingRequest = request.request as PutMappingRequest
-        val updateRequest = PutMappingClusterStateUpdateRequest()
+        val updateRequest = PutMappingClusterStateUpdateRequest(mappingRequest.source())
             .ackTimeout(mappingRequest.timeout()).masterNodeTimeout(mappingRequest.masterNodeTimeout())
-            .indices(concreteIndices).type(mappingRequest.type())
-            .source(mappingRequest.source())
+            .indices(concreteIndices)

         metadataMappingService.putMapping(updateRequest,
             object : ActionListener<ClusterStateUpdateResponse> {
@@ -99,7 +99,7 @@ class ReplicationMetadataStore constructor(val client: Client, val clusterServic

         val id = getId(addReq.replicationMetadata.metadataType, addReq.replicationMetadata.connectionName,
                 addReq.replicationMetadata.followerContext.resource)
-        val indexReqBuilder = client.prepareIndex(REPLICATION_CONFIG_SYSTEM_INDEX, MAPPING_TYPE, id)
+        val indexReqBuilder = client.prepareIndex(REPLICATION_CONFIG_SYSTEM_INDEX).setId(id)
                 .setSource(addReq.replicationMetadata.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
         return client.suspending(indexReqBuilder::execute, defaultContext = true)("replication")
     }
@@ -114,7 +114,7 @@
         }

         if(REPLICATION_STORE_MAPPING_VERSION > currentSchemaVersion) {
-            val putMappingReq = PutMappingRequest(REPLICATION_CONFIG_SYSTEM_INDEX).type(MAPPING_TYPE)
+            val putMappingReq = PutMappingRequest(REPLICATION_CONFIG_SYSTEM_INDEX)
                     .source(REPLICATION_CONFIG_SYSTEM_INDEX_MAPPING, XContentType.JSON)
             val putMappingRes = client.suspending(client.admin().indices()::putMapping, defaultContext = true)(putMappingReq)
             if(!putMappingRes.isAcknowledged) {
@@ -227,7 +227,7 @@ class ReplicationMetadataStore constructor(val client: Client, val clusterServic
         checkAndWaitForStoreHealth()
         checkAndUpdateMapping()

-        val indexReqBuilder = client.prepareIndex(REPLICATION_CONFIG_SYSTEM_INDEX, MAPPING_TYPE, id)
+        val indexReqBuilder = client.prepareIndex(REPLICATION_CONFIG_SYSTEM_INDEX).setId(id)
                 .setSource(updateMetadataReq.replicationMetadata.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
                 .setIfSeqNo(updateMetadataReq.ifSeqno)
                 .setIfPrimaryTerm(updateMetadataReq.ifPrimaryTerm)
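Note: the same typeless migration applies to index requests against the metadata store; the three-argument prepareIndex(index, type, id) overload is gone in the 2.x client, and the document id is set on the builder instead. Roughly:

    // 1.x (typed):    client.prepareIndex(REPLICATION_CONFIG_SYSTEM_INDEX, MAPPING_TYPE, id)
    // 2.x (typeless): client.prepareIndex(REPLICATION_CONFIG_SYSTEM_INDEX).setId(id)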
@@ -22,7 +22,6 @@ import org.opensearch.common.inject.Inject
 import org.opensearch.common.inject.Singleton
 import org.opensearch.common.lucene.store.InputStreamIndexInput
 import org.opensearch.core.internal.io.IOUtils
-import org.opensearch.index.engine.Engine
 import org.opensearch.index.seqno.RetentionLeaseActions
 import org.opensearch.index.store.Store
 import org.opensearch.indices.IndicesService
@@ -95,7 +94,7 @@ class RemoteClusterRestoreLeaderService @Inject constructor(private val indicesS
          * lucene index. With the retention lock set - safe commit should have all the history
          * upto the current retention leases.
          */
-        val retentionLock = leaderIndexShard.acquireHistoryRetentionLock(Engine.HistorySource.INDEX)
+        val retentionLock = leaderIndexShard.acquireHistoryRetentionLock()
         closableResources.add(retentionLock)

         /**
@@ -37,7 +37,8 @@ class RemoteClusterTranslogService : AbstractLifecycleComponent(){

     public fun getHistoryOfOperations(indexShard: IndexShard, startSeqNo: Long, toSeqNo: Long): List<Translog.Operation> {
         log.trace("Fetching translog snapshot for $indexShard - from $startSeqNo to $toSeqNo")
-        val snapshot = indexShard.getHistoryOperations(SOURCE_NAME, Engine.HistorySource.TRANSLOG, startSeqNo, toSeqNo)
+        // TODO: Revisit the method after closing the issue: https://github.com/opensearch-project/OpenSearch/issues/2482
+        val snapshot = indexShard.getHistoryOperations(SOURCE_NAME, startSeqNo, toSeqNo)

         // Total ops to be fetched (both toSeqNo and startSeqNo are inclusive)
         val opsSize = toSeqNo - startSeqNo + 1
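Note: the Engine.HistorySource argument disappears because OpenSearch 2.0 serves operation history only from Lucene soft-deletes (translog-based history retrieval is gone), hence the linked issue about revisiting this path. A hedged sketch of how a caller could consume the snapshot under the new signature (the iteration loop is illustrative, not the plugin's actual replay code):

    // Fetch ops in [startSeqNo, toSeqNo]; no HistorySource is passed in 2.x.
    val snapshot = indexShard.getHistoryOperations(SOURCE_NAME, startSeqNo, toSeqNo)
    snapshot.use { s ->
        var op = s.next()
        while (op != null) {
            // hand the operation over for replay on the follower ...
            op = s.next()
        }
    }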
@@ -27,6 +27,7 @@ import kotlinx.coroutines.isActive
 import kotlinx.coroutines.launch
 import kotlinx.coroutines.withContext
 import kotlinx.coroutines.withTimeoutOrNull
+import kotlinx.coroutines.ObsoleteCoroutinesApi
 import org.apache.logging.log4j.Logger
 import org.opensearch.OpenSearchException
 import org.opensearch.action.ActionListener
@@ -178,6 +179,7 @@ abstract class CrossClusterReplicationTask(id: Long, type: String, action: Strin
         client.suspending(::updatePersistentTaskState)(state)
     }

+    @ObsoleteCoroutinesApi
     protected abstract suspend fun execute(scope: CoroutineScope, initialState: PersistentTaskState?)

     protected open suspend fun cleanup() {}
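Note: annotating the abstract execute method with @ObsoleteCoroutinesApi (from the added kotlinx.coroutines import above) propagates the opt-in requirement of kotlinx-coroutines 1.6 to every concrete task: overrides generally need to carry the annotation (or use @OptIn) to avoid opt-in warnings. A hypothetical override in a subclass:

    @ObsoleteCoroutinesApi
    override suspend fun execute(scope: CoroutineScope, initialState: PersistentTaskState?) {
        // task-specific replication work goes here
    }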
@@ -992,25 +992,6 @@ class StartReplicationIT: MultiClusterRestTestCase() {
         }
     }

-    fun `test that replication cannot be started when soft delete is disabled`() {
-        val followerClient = getClientForCluster(FOLLOWER)
-        val leaderClient = getClientForCluster(LEADER)
-
-        createConnectionBetweenClusters(FOLLOWER, LEADER)
-
-        val settings: Settings = Settings.builder()
-            .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.key, false)
-            .build()
-
-        val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName)
-            .settings(settings), RequestOptions.DEFAULT)
-        assertThat(createIndexResponse.isAcknowledged).isTrue()
-
-        assertThatThrownBy {
-            followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName))
-        }.isInstanceOf(ResponseException::class.java).hasMessageContaining("Cannot Replicate an index where the setting index.soft_deletes.enabled is disabled")
-    }
-
     fun `test leader stats`() {
         val followerClient = getClientForCluster(FOLLOWER)
         val leaderClient = getClientForCluster(LEADER)
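Note: the deleted test exercised starting replication against a leader index created with index.soft_deletes.enabled set to false; it goes away here presumably because soft deletes can no longer be disabled for indices created on OpenSearch 2.0 (the same change that removes Engine.HistorySource above), so the error path it asserted is no longer reachable.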
@@ -129,7 +129,7 @@ open class NoOpClient(testName :String) : NoOpNodeClient(testName) {

             var bytesReference = replicationMetadata.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)
             var by = BytesReference.bytes(bytesReference)
-            var result = GetResult(ReplicationMetadataStore.REPLICATION_CONFIG_SYSTEM_INDEX, "_doc", IndexReplicationTaskTests.followerIndex, 1, 1, 1, true, by, null, null)
+            var result = GetResult(ReplicationMetadataStore.REPLICATION_CONFIG_SYSTEM_INDEX, IndexReplicationTaskTests.followerIndex, 1, 1, 1, true, by, null, null)
             var getResponse = GetResponse(result)
             listener.onResponse(getResponse as Response)
         } else if (action == ClusterHealthAction.INSTANCE) {
