Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Adds empty indexer, mapper, metadata, runner, search classes and crea… #324

Merged
merged 2 commits into from
Nov 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagemen
import com.amazon.opendistroforelasticsearch.indexmanagement.refreshanalyzer.RefreshSearchAnalyzerAction
import com.amazon.opendistroforelasticsearch.indexmanagement.refreshanalyzer.RestRefreshSearchAnalyzerAction
import com.amazon.opendistroforelasticsearch.indexmanagement.refreshanalyzer.TransportRefreshSearchAnalyzerAction
import com.amazon.opendistroforelasticsearch.indexmanagement.rollup.RollupIndexer
import com.amazon.opendistroforelasticsearch.indexmanagement.rollup.RollupMapperService
import com.amazon.opendistroforelasticsearch.indexmanagement.rollup.RollupMetadataService
import com.amazon.opendistroforelasticsearch.indexmanagement.rollup.RollupRunner
import com.amazon.opendistroforelasticsearch.indexmanagement.rollup.RollupSearchService
import com.amazon.opendistroforelasticsearch.indexmanagement.rollup.action.delete.DeleteRollupAction
import com.amazon.opendistroforelasticsearch.indexmanagement.rollup.action.delete.TransportDeleteRollupAction
import com.amazon.opendistroforelasticsearch.indexmanagement.rollup.action.get.GetRollupAction
Expand Down Expand Up @@ -112,6 +117,7 @@ internal class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, Act
private val logger = LogManager.getLogger(javaClass)
lateinit var indexManagementIndices: IndexManagementIndices
lateinit var clusterService: ClusterService
lateinit var indexNameExpressionResolver: IndexNameExpressionResolver

companion object {
const val PLUGIN_NAME = "opendistro-im"
Expand All @@ -129,7 +135,7 @@ internal class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, Act

override fun getJobType(): String = INDEX_MANAGEMENT_JOB_TYPE

override fun getJobRunner(): ScheduledJobRunner = ManagedIndexRunner
override fun getJobRunner(): ScheduledJobRunner = IndexManagementRunner

override fun getJobParser(): ScheduledJobParser {
return ScheduledJobParser { xcp, id, jobDocVersion ->
Expand Down Expand Up @@ -211,7 +217,19 @@ internal class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, Act
.registerScriptService(scriptService)
.registerSettings(settings)
.registerConsumers() // registerConsumers must happen after registerSettings/clusterService

val rollupRunner = RollupRunner
.registerClient(client)
.registerClusterService(clusterService)
.registerNamedXContentRegistry(xContentRegistry)
.registerScriptService(scriptService)
.registerSettings(settings)
.registerThreadPool(threadPool)
.registerMapperService(RollupMapperService(client, clusterService, indexNameExpressionResolver))
.registerIndexer(RollupIndexer(settings, clusterService, client))
.registerSearcher(RollupSearchService(settings, clusterService, client))
.registerMetadataServices(RollupMetadataService(client, xContentRegistry))
.registerConsumers()
this.indexNameExpressionResolver = indexNameExpressionResolver
indexManagementIndices = IndexManagementIndices(client.admin().indices(), clusterService)
val indexStateManagementHistory =
IndexStateManagementHistory(
Expand All @@ -225,7 +243,7 @@ internal class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, Act
val managedIndexCoordinator = ManagedIndexCoordinator(environment.settings(),
client, clusterService, threadPool, indexManagementIndices)

return listOf(managedIndexRunner, indexManagementIndices, managedIndexCoordinator, indexStateManagementHistory)
return listOf(managedIndexRunner, rollupRunner, indexManagementIndices, managedIndexCoordinator, indexStateManagementHistory)
}

override fun getSettings(): List<Setting<*>> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexmanagement

import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.ManagedIndexRunner
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ManagedIndexConfig
import com.amazon.opendistroforelasticsearch.indexmanagement.rollup.RollupRunner
import com.amazon.opendistroforelasticsearch.indexmanagement.rollup.model.Rollup
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.JobExecutionContext
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.ScheduledJobParameter
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.ScheduledJobRunner
import org.apache.logging.log4j.LogManager

object IndexManagementRunner : ScheduledJobRunner {

private val logger = LogManager.getLogger(javaClass)

override fun runJob(job: ScheduledJobParameter, context: JobExecutionContext) {
when (job) {
is ManagedIndexConfig -> ManagedIndexRunner.runJob(job, context)
is Rollup -> RollupRunner.runJob(job, context)
else -> {
val errorMessage = "Invalid job type, found ${job.javaClass.simpleName} with id: ${context.jobId}"
logger.error(errorMessage)
throw IllegalArgumentException(errorMessage)
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexmanagement.rollup

import org.apache.logging.log4j.LogManager
import org.elasticsearch.client.Client
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.common.settings.Settings

class RollupIndexer(
settings: Settings,
clusterService: ClusterService,
private val client: Client
) {

private val logger = LogManager.getLogger(javaClass)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexmanagement.rollup

import org.apache.logging.log4j.LogManager
import org.elasticsearch.client.Client
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver
import org.elasticsearch.cluster.service.ClusterService

class RollupMapperService(
val client: Client,
val clusterService: ClusterService,
private val indexNameExpressionResolver: IndexNameExpressionResolver
) {

private val logger = LogManager.getLogger(javaClass)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexmanagement.rollup

import org.apache.logging.log4j.LogManager
import org.elasticsearch.client.Client
import org.elasticsearch.common.xcontent.NamedXContentRegistry

@Suppress("TooManyFunctions")
class RollupMetadataService(val client: Client, val xContentRegistry: NamedXContentRegistry) {

private val logger = LogManager.getLogger(javaClass)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexmanagement.rollup

import com.amazon.opendistroforelasticsearch.indexmanagement.rollup.model.Rollup
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.JobExecutionContext
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.ScheduledJobParameter
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.ScheduledJobRunner
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import org.apache.logging.log4j.LogManager
import org.elasticsearch.client.Client
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.xcontent.NamedXContentRegistry
import org.elasticsearch.script.ScriptService
import org.elasticsearch.threadpool.ThreadPool

@Suppress("TooManyFunctions")
object RollupRunner : ScheduledJobRunner,
CoroutineScope by CoroutineScope(SupervisorJob() + Dispatchers.Default + CoroutineName("RollupRunner")) {

private val logger = LogManager.getLogger(javaClass)

private lateinit var clusterService: ClusterService
private lateinit var client: Client
private lateinit var xContentRegistry: NamedXContentRegistry
private lateinit var scriptService: ScriptService
private lateinit var settings: Settings
private lateinit var threadPool: ThreadPool
private lateinit var rollupMapperService: RollupMapperService
private lateinit var rollupIndexer: RollupIndexer
private lateinit var rollupSearchService: RollupSearchService
private lateinit var rollupMetadataService: RollupMetadataService

fun registerClusterService(clusterService: ClusterService): RollupRunner {
this.clusterService = clusterService
return this
}

fun registerClient(client: Client): RollupRunner {
this.client = client
return this
}

fun registerNamedXContentRegistry(xContentRegistry: NamedXContentRegistry): RollupRunner {
this.xContentRegistry = xContentRegistry
return this
}

fun registerScriptService(scriptService: ScriptService): RollupRunner {
this.scriptService = scriptService
return this
}

fun registerSettings(settings: Settings): RollupRunner {
this.settings = settings
return this
}

fun registerThreadPool(threadPool: ThreadPool): RollupRunner {
this.threadPool = threadPool
return this
}

fun registerMapperService(rollupMapperService: RollupMapperService): RollupRunner {
this.rollupMapperService = rollupMapperService
return this
}

fun registerIndexer(rollupIndexer: RollupIndexer): RollupRunner {
this.rollupIndexer = rollupIndexer
return this
}

fun registerSearcher(rollupSearchService: RollupSearchService): RollupRunner {
this.rollupSearchService = rollupSearchService
return this
}

fun registerMetadataServices(
rollupMetadataService: RollupMetadataService
): RollupRunner {
this.rollupMetadataService = rollupMetadataService
return this
}

fun registerConsumers(): RollupRunner {
return this
}

@Suppress("ComplexMethod")
override fun runJob(job: ScheduledJobParameter, context: JobExecutionContext) {
if (job !is Rollup) {
throw IllegalArgumentException("Invalid job type, found ${job.javaClass.simpleName} with id: ${context.jobId}")
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexmanagement.rollup

import org.apache.logging.log4j.LogManager
import org.elasticsearch.client.Client
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.common.settings.Settings

class RollupSearchService(
settings: Settings,
clusterService: ClusterService,
val client: Client
) {

private val logger = LogManager.getLogger(javaClass)
}