From e6eac9bfe5ac462ce459f4ea5163bc46ded9004b Mon Sep 17 00:00:00 2001 From: Vamsi Manohar Date: Mon, 13 Nov 2023 10:34:35 -0800 Subject: [PATCH] Revert "add InteractiveSession and SessionManager (#2290) (#2293) (#2315)" This reverts commit 6ac197bb59e7f8b1e4e6698e20951c85368fc0cb. --- spark/build.gradle | 39 +--- .../session/CreateSessionRequest.java | 15 -- .../execution/session/InteractiveSession.java | 61 ----- .../sql/spark/execution/session/Session.java | 19 -- .../spark/execution/session/SessionId.java | 23 -- .../execution/session/SessionManager.java | 50 ---- .../spark/execution/session/SessionModel.java | 143 ------------ .../spark/execution/session/SessionState.java | 36 --- .../spark/execution/session/SessionType.java | 33 --- .../statestore/SessionStateStore.java | 87 ------- .../session/InteractiveSessionTest.java | 213 ------------------ .../execution/session/SessionManagerTest.java | 38 ---- .../execution/session/SessionStateTest.java | 20 -- .../execution/session/SessionTypeTest.java | 20 -- .../statestore/SessionStateStoreTest.java | 42 ---- 15 files changed, 5 insertions(+), 834 deletions(-) delete mode 100644 spark/src/main/java/org/opensearch/sql/spark/execution/session/CreateSessionRequest.java delete mode 100644 spark/src/main/java/org/opensearch/sql/spark/execution/session/InteractiveSession.java delete mode 100644 spark/src/main/java/org/opensearch/sql/spark/execution/session/Session.java delete mode 100644 spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionId.java delete mode 100644 spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionManager.java delete mode 100644 spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionModel.java delete mode 100644 spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionState.java delete mode 100644 spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionType.java delete mode 100644 spark/src/main/java/org/opensearch/sql/spark/execution/statestore/SessionStateStore.java delete mode 100644 spark/src/test/java/org/opensearch/sql/spark/execution/session/InteractiveSessionTest.java delete mode 100644 spark/src/test/java/org/opensearch/sql/spark/execution/session/SessionManagerTest.java delete mode 100644 spark/src/test/java/org/opensearch/sql/spark/execution/session/SessionStateTest.java delete mode 100644 spark/src/test/java/org/opensearch/sql/spark/execution/session/SessionTypeTest.java delete mode 100644 spark/src/test/java/org/opensearch/sql/spark/execution/statestore/SessionStateStoreTest.java diff --git a/spark/build.gradle b/spark/build.gradle index c2c925ecaf..c06b5b6ecf 100644 --- a/spark/build.gradle +++ b/spark/build.gradle @@ -52,38 +52,15 @@ dependencies { api group: 'com.amazonaws', name: 'aws-java-sdk-emrserverless', version: '1.12.545' implementation group: 'commons-io', name: 'commons-io', version: '2.8.0' - testImplementation(platform("org.junit:junit-bom:5.6.2")) - - testImplementation('org.junit.jupiter:junit-jupiter') + testImplementation('org.junit.jupiter:junit-jupiter:5.6.2') testImplementation group: 'org.mockito', name: 'mockito-core', version: '5.2.0' testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: '5.2.0' - - testCompileOnly('junit:junit:4.13.1') { - exclude group: 'org.hamcrest', module: 'hamcrest-core' - } - testRuntimeOnly("org.junit.vintage:junit-vintage-engine") { - exclude group: 'org.hamcrest', module: 'hamcrest-core' - } - testRuntimeOnly("org.junit.platform:junit-platform-launcher") { - because 'allows tests to run from IDEs that bundle older version of launcher' - } - testImplementation("org.opensearch.test:framework:${opensearch_version}") + testImplementation 'junit:junit:4.13.1' + testImplementation "org.opensearch.test:framework:${opensearch_version}" } test { - useJUnitPlatform { - includeEngines("junit-jupiter") - } - testLogging { - events "failed" - exceptionFormat "full" - } -} -task junit4(type: Test) { - useJUnitPlatform { - includeEngines("junit-vintage") - } - systemProperty 'tests.security.manager', 'false' + useJUnitPlatform() testLogging { events "failed" exceptionFormat "full" @@ -91,8 +68,6 @@ task junit4(type: Test) { } jacocoTestReport { - dependsOn test, junit4 - executionData test, junit4 reports { html.enabled true xml.enabled true @@ -103,10 +78,9 @@ jacocoTestReport { })) } } +test.finalizedBy(project.tasks.jacocoTestReport) jacocoTestCoverageVerification { - dependsOn test, junit4 - executionData test, junit4 violationRules { rule { element = 'CLASS' @@ -118,9 +92,6 @@ jacocoTestCoverageVerification { 'org.opensearch.sql.spark.asyncquery.exceptions.*', 'org.opensearch.sql.spark.dispatcher.model.*', 'org.opensearch.sql.spark.flint.FlintIndexType', - // ignore because XContext IOException - 'org.opensearch.sql.spark.execution.statestore.SessionStateStore', - 'org.opensearch.sql.spark.execution.session.SessionModel' ] limit { counter = 'LINE' diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/session/CreateSessionRequest.java b/spark/src/main/java/org/opensearch/sql/spark/execution/session/CreateSessionRequest.java deleted file mode 100644 index 17e3346248..0000000000 --- a/spark/src/main/java/org/opensearch/sql/spark/execution/session/CreateSessionRequest.java +++ /dev/null @@ -1,15 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.execution.session; - -import lombok.Data; -import org.opensearch.sql.spark.client.StartJobRequest; - -@Data -public class CreateSessionRequest { - private final StartJobRequest startJobRequest; - private final String datasourceName; -} diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/session/InteractiveSession.java b/spark/src/main/java/org/opensearch/sql/spark/execution/session/InteractiveSession.java deleted file mode 100644 index 620e46b9be..0000000000 --- a/spark/src/main/java/org/opensearch/sql/spark/execution/session/InteractiveSession.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.execution.session; - -import static org.opensearch.sql.spark.execution.session.SessionModel.initInteractiveSession; - -import java.util.Optional; -import lombok.Builder; -import lombok.Getter; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.opensearch.index.engine.VersionConflictEngineException; -import org.opensearch.sql.spark.client.EMRServerlessClient; -import org.opensearch.sql.spark.execution.statestore.SessionStateStore; - -/** - * Interactive session. - * - *

ENTRY_STATE: not_started - */ -@Getter -@Builder -public class InteractiveSession implements Session { - private static final Logger LOG = LogManager.getLogger(); - - private final SessionId sessionId; - private final SessionStateStore sessionStateStore; - private final EMRServerlessClient serverlessClient; - - private SessionModel sessionModel; - - @Override - public void open(CreateSessionRequest createSessionRequest) { - try { - String jobID = serverlessClient.startJobRun(createSessionRequest.getStartJobRequest()); - String applicationId = createSessionRequest.getStartJobRequest().getApplicationId(); - - sessionModel = - initInteractiveSession( - applicationId, jobID, sessionId, createSessionRequest.getDatasourceName()); - sessionStateStore.create(sessionModel); - } catch (VersionConflictEngineException e) { - String errorMsg = "session already exist. " + sessionId; - LOG.error(errorMsg); - throw new IllegalStateException(errorMsg); - } - } - - @Override - public void close() { - Optional model = sessionStateStore.get(sessionModel.getSessionId()); - if (model.isEmpty()) { - throw new IllegalStateException("session not exist. " + sessionModel.getSessionId()); - } else { - serverlessClient.cancelJobRun(sessionModel.getApplicationId(), sessionModel.getJobId()); - } - } -} diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/session/Session.java b/spark/src/main/java/org/opensearch/sql/spark/execution/session/Session.java deleted file mode 100644 index ec9775e60a..0000000000 --- a/spark/src/main/java/org/opensearch/sql/spark/execution/session/Session.java +++ /dev/null @@ -1,19 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.execution.session; - -/** Session define the statement execution context. Each session is binding to one Spark Job. */ -public interface Session { - /** open session. */ - void open(CreateSessionRequest createSessionRequest); - - /** close session. */ - void close(); - - SessionModel getSessionModel(); - - SessionId getSessionId(); -} diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionId.java b/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionId.java deleted file mode 100644 index a2847cde18..0000000000 --- a/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionId.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.execution.session; - -import lombok.Data; -import org.apache.commons.lang3.RandomStringUtils; - -@Data -public class SessionId { - private final String sessionId; - - public static SessionId newSessionId() { - return new SessionId(RandomStringUtils.random(10, true, true)); - } - - @Override - public String toString() { - return "sessionId=" + sessionId; - } -} diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionManager.java b/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionManager.java deleted file mode 100644 index 3d0916bac8..0000000000 --- a/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionManager.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.execution.session; - -import static org.opensearch.sql.spark.execution.session.SessionId.newSessionId; - -import java.util.Optional; -import lombok.RequiredArgsConstructor; -import org.opensearch.sql.spark.client.EMRServerlessClient; -import org.opensearch.sql.spark.execution.statestore.SessionStateStore; - -/** - * Singleton Class - * - *

todo. add Session cache and Session sweeper. - */ -@RequiredArgsConstructor -public class SessionManager { - private final SessionStateStore stateStore; - private final EMRServerlessClient emrServerlessClient; - - public Session createSession(CreateSessionRequest request) { - InteractiveSession session = - InteractiveSession.builder() - .sessionId(newSessionId()) - .sessionStateStore(stateStore) - .serverlessClient(emrServerlessClient) - .build(); - session.open(request); - return session; - } - - public Optional getSession(SessionId sid) { - Optional model = stateStore.get(sid); - if (model.isPresent()) { - InteractiveSession session = - InteractiveSession.builder() - .sessionId(sid) - .sessionStateStore(stateStore) - .serverlessClient(emrServerlessClient) - .sessionModel(model.get()) - .build(); - return Optional.ofNullable(session); - } - return Optional.empty(); - } -} diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionModel.java b/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionModel.java deleted file mode 100644 index 656f0ec8ce..0000000000 --- a/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionModel.java +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.execution.session; - -import static org.opensearch.sql.spark.execution.session.SessionState.NOT_STARTED; -import static org.opensearch.sql.spark.execution.session.SessionType.INTERACTIVE; - -import java.io.IOException; -import lombok.Builder; -import lombok.Data; -import lombok.SneakyThrows; -import org.opensearch.core.xcontent.ToXContentObject; -import org.opensearch.core.xcontent.XContentBuilder; -import org.opensearch.core.xcontent.XContentParser; -import org.opensearch.core.xcontent.XContentParserUtils; -import org.opensearch.index.seqno.SequenceNumbers; - -/** Session data in flint.ql.sessions index. */ -@Data -@Builder -public class SessionModel implements ToXContentObject { - public static final String VERSION = "version"; - public static final String TYPE = "type"; - public static final String SESSION_TYPE = "sessionType"; - public static final String SESSION_ID = "sessionId"; - public static final String SESSION_STATE = "state"; - public static final String DATASOURCE_NAME = "dataSourceName"; - public static final String LAST_UPDATE_TIME = "lastUpdateTime"; - public static final String APPLICATION_ID = "applicationId"; - public static final String JOB_ID = "jobId"; - public static final String ERROR = "error"; - public static final String UNKNOWN = "unknown"; - public static final String SESSION_DOC_TYPE = "session"; - - private final String version; - private final SessionType sessionType; - private final SessionId sessionId; - private final SessionState sessionState; - private final String applicationId; - private final String jobId; - private final String datasourceName; - private final String error; - private final long lastUpdateTime; - - private final long seqNo; - private final long primaryTerm; - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder - .startObject() - .field(VERSION, version) - .field(TYPE, SESSION_DOC_TYPE) - .field(SESSION_TYPE, sessionType.getSessionType()) - .field(SESSION_ID, sessionId.getSessionId()) - .field(SESSION_STATE, sessionState.getSessionState()) - .field(DATASOURCE_NAME, datasourceName) - .field(APPLICATION_ID, applicationId) - .field(JOB_ID, jobId) - .field(LAST_UPDATE_TIME, lastUpdateTime) - .field(ERROR, error) - .endObject(); - return builder; - } - - public static SessionModel of(SessionModel copy, long seqNo, long primaryTerm) { - return builder() - .version(copy.version) - .sessionType(copy.sessionType) - .sessionId(new SessionId(copy.sessionId.getSessionId())) - .sessionState(copy.sessionState) - .datasourceName(copy.datasourceName) - .seqNo(seqNo) - .primaryTerm(primaryTerm) - .build(); - } - - @SneakyThrows - public static SessionModel fromXContent(XContentParser parser, long seqNo, long primaryTerm) { - SessionModelBuilder builder = new SessionModelBuilder(); - XContentParserUtils.ensureExpectedToken( - XContentParser.Token.START_OBJECT, parser.currentToken(), parser); - while (!XContentParser.Token.END_OBJECT.equals(parser.nextToken())) { - String fieldName = parser.currentName(); - parser.nextToken(); - switch (fieldName) { - case VERSION: - builder.version(parser.text()); - break; - case SESSION_TYPE: - builder.sessionType(SessionType.fromString(parser.text())); - break; - case SESSION_ID: - builder.sessionId(new SessionId(parser.text())); - break; - case SESSION_STATE: - builder.sessionState(SessionState.fromString(parser.text())); - break; - case DATASOURCE_NAME: - builder.datasourceName(parser.text()); - break; - case ERROR: - builder.error(parser.text()); - break; - case APPLICATION_ID: - builder.applicationId(parser.text()); - break; - case JOB_ID: - builder.jobId(parser.text()); - break; - case LAST_UPDATE_TIME: - builder.lastUpdateTime(parser.longValue()); - break; - case TYPE: - // do nothing. - break; - } - } - builder.seqNo(seqNo); - builder.primaryTerm(primaryTerm); - return builder.build(); - } - - public static SessionModel initInteractiveSession( - String applicationId, String jobId, SessionId sid, String datasourceName) { - return builder() - .version("1.0") - .sessionType(INTERACTIVE) - .sessionId(sid) - .sessionState(NOT_STARTED) - .datasourceName(datasourceName) - .applicationId(applicationId) - .jobId(jobId) - .error(UNKNOWN) - .lastUpdateTime(System.currentTimeMillis()) - .seqNo(SequenceNumbers.UNASSIGNED_SEQ_NO) - .primaryTerm(SequenceNumbers.UNASSIGNED_PRIMARY_TERM) - .build(); - } -} diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionState.java b/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionState.java deleted file mode 100644 index 509d5105e9..0000000000 --- a/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionState.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.execution.session; - -import java.util.Arrays; -import java.util.Map; -import java.util.stream.Collectors; -import lombok.Getter; - -@Getter -public enum SessionState { - NOT_STARTED("not_started"), - RUNNING("running"), - DEAD("dead"), - FAIL("fail"); - - private final String sessionState; - - SessionState(String sessionState) { - this.sessionState = sessionState; - } - - private static Map STATES = - Arrays.stream(SessionState.values()) - .collect(Collectors.toMap(t -> t.name().toLowerCase(), t -> t)); - - public static SessionState fromString(String key) { - if (STATES.containsKey(key)) { - return STATES.get(key); - } - throw new IllegalArgumentException("Invalid session state: " + key); - } -} diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionType.java b/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionType.java deleted file mode 100644 index dd179a1dc5..0000000000 --- a/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionType.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.execution.session; - -import java.util.Arrays; -import java.util.Map; -import java.util.stream.Collectors; -import lombok.Getter; - -@Getter -public enum SessionType { - INTERACTIVE("interactive"); - - private final String sessionType; - - SessionType(String sessionType) { - this.sessionType = sessionType; - } - - private static Map TYPES = - Arrays.stream(SessionType.values()) - .collect(Collectors.toMap(t -> t.name().toLowerCase(), t -> t)); - - public static SessionType fromString(String key) { - if (TYPES.containsKey(key)) { - return TYPES.get(key); - } - throw new IllegalArgumentException("Invalid session type: " + key); - } -} diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/SessionStateStore.java b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/SessionStateStore.java deleted file mode 100644 index 6ddce55360..0000000000 --- a/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/SessionStateStore.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.execution.statestore; - -import java.io.IOException; -import java.util.Locale; -import java.util.Optional; -import lombok.RequiredArgsConstructor; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.opensearch.action.DocWriteResponse; -import org.opensearch.action.get.GetRequest; -import org.opensearch.action.get.GetResponse; -import org.opensearch.action.index.IndexRequest; -import org.opensearch.action.index.IndexResponse; -import org.opensearch.action.support.WriteRequest; -import org.opensearch.client.Client; -import org.opensearch.common.xcontent.LoggingDeprecationHandler; -import org.opensearch.common.xcontent.XContentFactory; -import org.opensearch.common.xcontent.XContentType; -import org.opensearch.core.xcontent.NamedXContentRegistry; -import org.opensearch.core.xcontent.ToXContent; -import org.opensearch.core.xcontent.XContentParser; -import org.opensearch.sql.spark.execution.session.SessionId; -import org.opensearch.sql.spark.execution.session.SessionModel; - -@RequiredArgsConstructor -public class SessionStateStore { - private static final Logger LOG = LogManager.getLogger(); - - private final String indexName; - private final Client client; - - public SessionModel create(SessionModel session) { - try { - IndexRequest indexRequest = - new IndexRequest(indexName) - .id(session.getSessionId().getSessionId()) - .source(session.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) - .setIfSeqNo(session.getSeqNo()) - .setIfPrimaryTerm(session.getPrimaryTerm()) - .create(true) - .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); - IndexResponse indexResponse = client.index(indexRequest).actionGet(); - if (indexResponse.getResult().equals(DocWriteResponse.Result.CREATED)) { - LOG.debug("Successfully created doc. id: {}", session.getSessionId()); - return SessionModel.of(session, indexResponse.getSeqNo(), indexResponse.getPrimaryTerm()); - } else { - throw new RuntimeException( - String.format( - Locale.ROOT, - "Failed create doc. id: %s, error: %s", - session.getSessionId(), - indexResponse.getResult().getLowercase())); - } - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - public Optional get(SessionId sid) { - try { - GetRequest getRequest = new GetRequest().index(indexName).id(sid.getSessionId()); - GetResponse getResponse = client.get(getRequest).actionGet(); - if (getResponse.isExists()) { - XContentParser parser = - XContentType.JSON - .xContent() - .createParser( - NamedXContentRegistry.EMPTY, - LoggingDeprecationHandler.INSTANCE, - getResponse.getSourceAsString()); - parser.nextToken(); - return Optional.of( - SessionModel.fromXContent( - parser, getResponse.getSeqNo(), getResponse.getPrimaryTerm())); - } else { - return Optional.empty(); - } - } catch (IOException e) { - throw new RuntimeException(e); - } - } -} diff --git a/spark/src/test/java/org/opensearch/sql/spark/execution/session/InteractiveSessionTest.java b/spark/src/test/java/org/opensearch/sql/spark/execution/session/InteractiveSessionTest.java deleted file mode 100644 index 53dc211ded..0000000000 --- a/spark/src/test/java/org/opensearch/sql/spark/execution/session/InteractiveSessionTest.java +++ /dev/null @@ -1,213 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.execution.session; - -import static org.opensearch.sql.spark.execution.session.InteractiveSessionTest.TestSession.testSession; -import static org.opensearch.sql.spark.execution.session.SessionState.NOT_STARTED; - -import com.amazonaws.services.emrserverless.model.CancelJobRunResult; -import com.amazonaws.services.emrserverless.model.GetJobRunResult; -import java.util.HashMap; -import java.util.Optional; -import lombok.RequiredArgsConstructor; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; -import org.opensearch.action.delete.DeleteRequest; -import org.opensearch.sql.spark.client.EMRServerlessClient; -import org.opensearch.sql.spark.client.StartJobRequest; -import org.opensearch.sql.spark.execution.statestore.SessionStateStore; -import org.opensearch.test.OpenSearchSingleNodeTestCase; - -/** mock-maker-inline does not work with OpenSearchTestCase. */ -public class InteractiveSessionTest extends OpenSearchSingleNodeTestCase { - - private static final String indexName = "mockindex"; - - private TestEMRServerlessClient emrsClient; - private StartJobRequest startJobRequest; - private SessionStateStore stateStore; - - @Before - public void setup() { - emrsClient = new TestEMRServerlessClient(); - startJobRequest = new StartJobRequest("", "", "appId", "", "", new HashMap<>(), false, ""); - stateStore = new SessionStateStore(indexName, client()); - createIndex(indexName); - } - - @After - public void clean() { - client().admin().indices().delete(new DeleteIndexRequest(indexName)).actionGet(); - } - - @Test - public void openCloseSession() { - InteractiveSession session = - InteractiveSession.builder() - .sessionId(SessionId.newSessionId()) - .sessionStateStore(stateStore) - .serverlessClient(emrsClient) - .build(); - - // open session - TestSession testSession = testSession(session, stateStore); - testSession - .open(new CreateSessionRequest(startJobRequest, "datasource")) - .assertSessionState(NOT_STARTED) - .assertAppId("appId") - .assertJobId("jobId"); - emrsClient.startJobRunCalled(1); - - // close session - testSession.close(); - emrsClient.cancelJobRunCalled(1); - } - - @Test - public void openSessionFailedConflict() { - SessionId sessionId = new SessionId("duplicate-session-id"); - InteractiveSession session = - InteractiveSession.builder() - .sessionId(sessionId) - .sessionStateStore(stateStore) - .serverlessClient(emrsClient) - .build(); - session.open(new CreateSessionRequest(startJobRequest, "datasource")); - - InteractiveSession duplicateSession = - InteractiveSession.builder() - .sessionId(sessionId) - .sessionStateStore(stateStore) - .serverlessClient(emrsClient) - .build(); - IllegalStateException exception = - assertThrows( - IllegalStateException.class, - () -> duplicateSession.open(new CreateSessionRequest(startJobRequest, "datasource"))); - assertEquals("session already exist. sessionId=duplicate-session-id", exception.getMessage()); - } - - @Test - public void closeNotExistSession() { - SessionId sessionId = SessionId.newSessionId(); - InteractiveSession session = - InteractiveSession.builder() - .sessionId(sessionId) - .sessionStateStore(stateStore) - .serverlessClient(emrsClient) - .build(); - session.open(new CreateSessionRequest(startJobRequest, "datasource")); - - client().delete(new DeleteRequest(indexName, sessionId.getSessionId())); - - IllegalStateException exception = assertThrows(IllegalStateException.class, session::close); - assertEquals("session not exist. " + sessionId, exception.getMessage()); - emrsClient.cancelJobRunCalled(0); - } - - @Test - public void sessionManagerCreateSession() { - Session session = - new SessionManager(stateStore, emrsClient) - .createSession(new CreateSessionRequest(startJobRequest, "datasource")); - - TestSession testSession = testSession(session, stateStore); - testSession.assertSessionState(NOT_STARTED).assertAppId("appId").assertJobId("jobId"); - } - - @Test - public void sessionManagerGetSession() { - SessionManager sessionManager = new SessionManager(stateStore, emrsClient); - Session session = - sessionManager.createSession(new CreateSessionRequest(startJobRequest, "datasource")); - - Optional managerSession = sessionManager.getSession(session.getSessionId()); - assertTrue(managerSession.isPresent()); - assertEquals(session.getSessionId(), managerSession.get().getSessionId()); - } - - @Test - public void sessionManagerGetSessionNotExist() { - SessionManager sessionManager = new SessionManager(stateStore, emrsClient); - - Optional managerSession = sessionManager.getSession(new SessionId("no-exist")); - assertTrue(managerSession.isEmpty()); - } - - @RequiredArgsConstructor - static class TestSession { - private final Session session; - private final SessionStateStore stateStore; - - public static TestSession testSession(Session session, SessionStateStore stateStore) { - return new TestSession(session, stateStore); - } - - public TestSession assertSessionState(SessionState expected) { - assertEquals(expected, session.getSessionModel().getSessionState()); - - Optional sessionStoreState = - stateStore.get(session.getSessionModel().getSessionId()); - assertTrue(sessionStoreState.isPresent()); - assertEquals(expected, sessionStoreState.get().getSessionState()); - - return this; - } - - public TestSession assertAppId(String expected) { - assertEquals(expected, session.getSessionModel().getApplicationId()); - return this; - } - - public TestSession assertJobId(String expected) { - assertEquals(expected, session.getSessionModel().getJobId()); - return this; - } - - public TestSession open(CreateSessionRequest req) { - session.open(req); - return this; - } - - public TestSession close() { - session.close(); - return this; - } - } - - static class TestEMRServerlessClient implements EMRServerlessClient { - - private int startJobRunCalled = 0; - private int cancelJobRunCalled = 0; - - @Override - public String startJobRun(StartJobRequest startJobRequest) { - startJobRunCalled++; - return "jobId"; - } - - @Override - public GetJobRunResult getJobRunResult(String applicationId, String jobId) { - return null; - } - - @Override - public CancelJobRunResult cancelJobRun(String applicationId, String jobId) { - cancelJobRunCalled++; - return null; - } - - public void startJobRunCalled(int expectedTimes) { - assertEquals(expectedTimes, startJobRunCalled); - } - - public void cancelJobRunCalled(int expectedTimes) { - assertEquals(expectedTimes, cancelJobRunCalled); - } - } -} diff --git a/spark/src/test/java/org/opensearch/sql/spark/execution/session/SessionManagerTest.java b/spark/src/test/java/org/opensearch/sql/spark/execution/session/SessionManagerTest.java deleted file mode 100644 index d35105f787..0000000000 --- a/spark/src/test/java/org/opensearch/sql/spark/execution/session/SessionManagerTest.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.execution.session; - -import static org.junit.jupiter.api.Assertions.*; - -import org.junit.After; -import org.junit.Before; -import org.mockito.MockMakers; -import org.mockito.MockSettings; -import org.mockito.Mockito; -import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; -import org.opensearch.sql.spark.execution.statestore.SessionStateStore; -import org.opensearch.test.OpenSearchSingleNodeTestCase; - -class SessionManagerTest extends OpenSearchSingleNodeTestCase { - private static final String indexName = "mockindex"; - - // mock-maker-inline does not work with OpenSearchTestCase. make sure use mockSettings when mock. - private static final MockSettings mockSettings = - Mockito.withSettings().mockMaker(MockMakers.SUBCLASS); - - private SessionStateStore stateStore; - - @Before - public void setup() { - stateStore = new SessionStateStore(indexName, client()); - createIndex(indexName); - } - - @After - public void clean() { - client().admin().indices().delete(new DeleteIndexRequest(indexName)).actionGet(); - } -} diff --git a/spark/src/test/java/org/opensearch/sql/spark/execution/session/SessionStateTest.java b/spark/src/test/java/org/opensearch/sql/spark/execution/session/SessionStateTest.java deleted file mode 100644 index a987c80d59..0000000000 --- a/spark/src/test/java/org/opensearch/sql/spark/execution/session/SessionStateTest.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.execution.session; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThrows; - -import org.junit.jupiter.api.Test; - -class SessionStateTest { - @Test - public void invalidSessionType() { - IllegalArgumentException exception = - assertThrows(IllegalArgumentException.class, () -> SessionState.fromString("invalid")); - assertEquals("Invalid session state: invalid", exception.getMessage()); - } -} diff --git a/spark/src/test/java/org/opensearch/sql/spark/execution/session/SessionTypeTest.java b/spark/src/test/java/org/opensearch/sql/spark/execution/session/SessionTypeTest.java deleted file mode 100644 index a2ab43e709..0000000000 --- a/spark/src/test/java/org/opensearch/sql/spark/execution/session/SessionTypeTest.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.execution.session; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThrows; - -import org.junit.jupiter.api.Test; - -class SessionTypeTest { - @Test - public void invalidSessionType() { - IllegalArgumentException exception = - assertThrows(IllegalArgumentException.class, () -> SessionType.fromString("invalid")); - assertEquals("Invalid session type: invalid", exception.getMessage()); - } -} diff --git a/spark/src/test/java/org/opensearch/sql/spark/execution/statestore/SessionStateStoreTest.java b/spark/src/test/java/org/opensearch/sql/spark/execution/statestore/SessionStateStoreTest.java deleted file mode 100644 index 9c779555d7..0000000000 --- a/spark/src/test/java/org/opensearch/sql/spark/execution/statestore/SessionStateStoreTest.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.execution.statestore; - -import static org.junit.Assert.assertThrows; -import static org.mockito.Answers.RETURNS_DEEP_STUBS; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.when; - -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; -import org.opensearch.action.DocWriteResponse; -import org.opensearch.action.index.IndexResponse; -import org.opensearch.client.Client; -import org.opensearch.sql.spark.execution.session.SessionId; -import org.opensearch.sql.spark.execution.session.SessionModel; - -@ExtendWith(MockitoExtension.class) -class SessionStateStoreTest { - @Mock(answer = RETURNS_DEEP_STUBS) - private Client client; - - @Mock private IndexResponse indexResponse; - - @Test - public void createWithException() { - when(client.index(any()).actionGet()).thenReturn(indexResponse); - doReturn(DocWriteResponse.Result.NOT_FOUND).when(indexResponse).getResult(); - SessionModel sessionModel = - SessionModel.initInteractiveSession( - "appId", "jobId", SessionId.newSessionId(), "datasource"); - SessionStateStore sessionStateStore = new SessionStateStore("indexName", client); - - assertThrows(RuntimeException.class, () -> sessionStateStore.create(sessionModel)); - } -}