Skip to content

Commit

Permalink
Refactoring for tags usage in test files and also added explicit denl…
Browse files Browse the repository at this point in the history
…y list setting. (#2383)

Signed-off-by: Vamsi Manohar <reddyvam@amazon.com>
  • Loading branch information
vamsi-amazon committed Oct 26, 2023
1 parent ff38081 commit 88b1f03
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public class SparkQueryDispatcher {
private static final Logger LOG = LogManager.getLogger();
public static final String INDEX_TAG_KEY = "index";
public static final String DATASOURCE_TAG_KEY = "datasource";
public static final String CLUSTER_NAME_TAG_KEY = "cluster";
public static final String CLUSTER_NAME_TAG_KEY = "domain_ident";
public static final String JOB_TYPE_TAG_KEY = "type";

private EMRServerlessClient emrServerlessClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@

package org.opensearch.sql.spark.asyncquery;

import static org.opensearch.sql.opensearch.setting.OpenSearchSettings.SPARK_EXECUTION_SESSION_ENABLED_SETTING;
import static org.opensearch.sql.opensearch.setting.OpenSearchSettings.SPARK_EXECUTION_SESSION_LIMIT_SETTING;
import static org.opensearch.sql.opensearch.setting.OpenSearchSettings.*;
import static org.opensearch.sql.spark.data.constants.SparkConstants.DEFAULT_CLASS_NAME;
import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_JOB_REQUEST_INDEX;
import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_JOB_SESSION_ID;
Expand All @@ -28,12 +27,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.*;
import lombok.Getter;
import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -105,9 +99,18 @@ public List<Setting<?>> getSettings() {
@Before
public void setup() {
clusterService = clusterService();
client = (NodeClient) cluster().client();
client
.admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(
Settings.builder()
.putList(DATASOURCE_URI_HOSTS_DENY_LIST.getKey(), Collections.emptyList())
.build())
.get();
clusterSettings = clusterService.getClusterSettings();
pluginSettings = new OpenSearchSettings(clusterSettings);
client = (NodeClient) cluster().client();
dataSourceService = createDataSourceService();
dataSourceService.createDataSource(
new DataSourceMetadata(
Expand Down Expand Up @@ -144,6 +147,13 @@ public void clean() {
.setTransientSettings(
Settings.builder().putNull(SPARK_EXECUTION_SESSION_LIMIT_SETTING.getKey()).build())
.get();
client
.admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(
Settings.builder().putNull(DATASOURCE_URI_HOSTS_DENY_LIST.getKey()).build())
.get();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_AUTH_USERNAME;
import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_AWSREGION_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.STATUS_FIELD;
import static org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher.*;

import com.amazonaws.services.emrserverless.model.CancelJobRunResult;
import com.amazonaws.services.emrserverless.model.GetJobRunResult;
Expand Down Expand Up @@ -120,9 +121,9 @@ void setUp() {
@Test
void testDispatchSelectQuery() {
HashMap<String, String> tags = new HashMap<>();
tags.put("datasource", "my_glue");
tags.put("cluster", TEST_CLUSTER_NAME);
tags.put("type", JobType.BATCH.getText());
tags.put(DATASOURCE_TAG_KEY, "my_glue");
tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME);
tags.put(JOB_TYPE_TAG_KEY, JobType.BATCH.getText());
String query = "select * from my_glue.default.http_logs";
String sparkSubmitParameters =
constructExpectedSparkSubmitParameterString(
Expand Down Expand Up @@ -175,9 +176,9 @@ void testDispatchSelectQuery() {
@Test
void testDispatchSelectQueryWithBasicAuthIndexStoreDatasource() {
HashMap<String, String> tags = new HashMap<>();
tags.put("datasource", "my_glue");
tags.put("cluster", TEST_CLUSTER_NAME);
tags.put("type", JobType.BATCH.getText());
tags.put(DATASOURCE_TAG_KEY, "my_glue");
tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME);
tags.put(JOB_TYPE_TAG_KEY, JobType.BATCH.getText());
String query = "select * from my_glue.default.http_logs";
String sparkSubmitParameters =
constructExpectedSparkSubmitParameterString(
Expand Down Expand Up @@ -231,9 +232,9 @@ void testDispatchSelectQueryWithBasicAuthIndexStoreDatasource() {
@Test
void testDispatchSelectQueryWithNoAuthIndexStoreDatasource() {
HashMap<String, String> tags = new HashMap<>();
tags.put("datasource", "my_glue");
tags.put("cluster", TEST_CLUSTER_NAME);
tags.put("type", JobType.BATCH.getText());
tags.put(DATASOURCE_TAG_KEY, "my_glue");
tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME);
tags.put(JOB_TYPE_TAG_KEY, JobType.BATCH.getText());
String query = "select * from my_glue.default.http_logs";
String sparkSubmitParameters =
constructExpectedSparkSubmitParameterString(
Expand Down Expand Up @@ -346,10 +347,10 @@ void testDispatchSelectQueryFailedCreateSession() {
@Test
void testDispatchIndexQuery() {
HashMap<String, String> tags = new HashMap<>();
tags.put("datasource", "my_glue");
tags.put("index", "flint_my_glue_default_http_logs_elb_and_requesturi_index");
tags.put("cluster", TEST_CLUSTER_NAME);
tags.put("type", JobType.STREAMING.getText());
tags.put(DATASOURCE_TAG_KEY, "my_glue");
tags.put(INDEX_TAG_KEY, "flint_my_glue_default_http_logs_elb_and_requesturi_index");
tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME);
tags.put(JOB_TYPE_TAG_KEY, JobType.STREAMING.getText());
String query =
"CREATE INDEX elb_and_requestUri ON my_glue.default.http_logs(l_orderkey, l_quantity) WITH"
+ " (auto_refresh = true)";
Expand Down Expand Up @@ -405,9 +406,9 @@ void testDispatchIndexQuery() {
@Test
void testDispatchWithPPLQuery() {
HashMap<String, String> tags = new HashMap<>();
tags.put("datasource", "my_glue");
tags.put("cluster", TEST_CLUSTER_NAME);
tags.put("type", JobType.BATCH.getText());
tags.put(DATASOURCE_TAG_KEY, "my_glue");
tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME);
tags.put(JOB_TYPE_TAG_KEY, JobType.BATCH.getText());
String query = "source = my_glue.default.http_logs";
String sparkSubmitParameters =
constructExpectedSparkSubmitParameterString(
Expand Down Expand Up @@ -460,9 +461,9 @@ void testDispatchWithPPLQuery() {
@Test
void testDispatchQueryWithoutATableAndDataSourceName() {
HashMap<String, String> tags = new HashMap<>();
tags.put("datasource", "my_glue");
tags.put("cluster", TEST_CLUSTER_NAME);
tags.put("type", JobType.BATCH.getText());
tags.put(DATASOURCE_TAG_KEY, "my_glue");
tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME);
tags.put(JOB_TYPE_TAG_KEY, JobType.BATCH.getText());
String query = "show tables";
String sparkSubmitParameters =
constructExpectedSparkSubmitParameterString(
Expand Down Expand Up @@ -515,10 +516,10 @@ void testDispatchQueryWithoutATableAndDataSourceName() {
@Test
void testDispatchIndexQueryWithoutADatasourceName() {
HashMap<String, String> tags = new HashMap<>();
tags.put("datasource", "my_glue");
tags.put("index", "flint_my_glue_default_http_logs_elb_and_requesturi_index");
tags.put("cluster", TEST_CLUSTER_NAME);
tags.put("type", JobType.STREAMING.getText());
tags.put(DATASOURCE_TAG_KEY, "my_glue");
tags.put(INDEX_TAG_KEY, "flint_my_glue_default_http_logs_elb_and_requesturi_index");
tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME);
tags.put(JOB_TYPE_TAG_KEY, JobType.STREAMING.getText());
String query =
"CREATE INDEX elb_and_requestUri ON default.http_logs(l_orderkey, l_quantity) WITH"
+ " (auto_refresh = true)";
Expand Down Expand Up @@ -574,10 +575,10 @@ void testDispatchIndexQueryWithoutADatasourceName() {
@Test
void testDispatchMaterializedViewQuery() {
HashMap<String, String> tags = new HashMap<>();
tags.put("datasource", "my_glue");
tags.put("index", "flint_mv_1");
tags.put("cluster", TEST_CLUSTER_NAME);
tags.put("type", JobType.STREAMING.getText());
tags.put(DATASOURCE_TAG_KEY, "my_glue");
tags.put(INDEX_TAG_KEY, "flint_mv_1");
tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME);
tags.put(JOB_TYPE_TAG_KEY, JobType.STREAMING.getText());
String query =
"CREATE MATERIALIZED VIEW mv_1 AS query=select * from my_glue.default.logs WITH"
+ " (auto_refresh = true)";
Expand Down Expand Up @@ -633,8 +634,8 @@ void testDispatchMaterializedViewQuery() {
@Test
void testDispatchShowMVQuery() {
HashMap<String, String> tags = new HashMap<>();
tags.put("datasource", "my_glue");
tags.put("cluster", TEST_CLUSTER_NAME);
tags.put(DATASOURCE_TAG_KEY, "my_glue");
tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME);
String query = "SHOW MATERIALIZED VIEW IN mys3.default";
String sparkSubmitParameters =
constructExpectedSparkSubmitParameterString(
Expand Down Expand Up @@ -687,8 +688,8 @@ void testDispatchShowMVQuery() {
@Test
void testRefreshIndexQuery() {
HashMap<String, String> tags = new HashMap<>();
tags.put("datasource", "my_glue");
tags.put("cluster", TEST_CLUSTER_NAME);
tags.put(DATASOURCE_TAG_KEY, "my_glue");
tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME);
String query = "REFRESH SKIPPING INDEX ON my_glue.default.http_logs";
String sparkSubmitParameters =
constructExpectedSparkSubmitParameterString(
Expand Down Expand Up @@ -741,8 +742,8 @@ void testRefreshIndexQuery() {
@Test
void testDispatchDescribeIndexQuery() {
HashMap<String, String> tags = new HashMap<>();
tags.put("datasource", "my_glue");
tags.put("cluster", TEST_CLUSTER_NAME);
tags.put(DATASOURCE_TAG_KEY, "my_glue");
tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME);
String query = "DESCRIBE SKIPPING INDEX ON mys3.default.http_logs";
String sparkSubmitParameters =
constructExpectedSparkSubmitParameterString(
Expand Down

0 comments on commit 88b1f03

Please sign in to comment.