Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
loustler committed Sep 21, 2024
1 parent b738db8 commit bffaed5
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
<artifactId>connector-hive-e2e</artifactId>
<name>SeaTunnel : E2E : Connector V2 : Hive</name>

<properties>
<hive.version>3.1.3</hive.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
Expand All @@ -45,6 +49,12 @@
<classifier>optional</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>${hive.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -1,12 +1,21 @@
package org.apache.seatunnel.e2e.connector.hive;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.MetaException;

import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.containers.wait.strategy.WaitStrategy;
import org.testcontainers.utility.DockerImageName;
import org.testcontainers.utility.DockerLoggerFactory;

import java.sql.Connection;
import java.sql.Driver;
import java.sql.SQLException;
import java.util.Properties;

public class HiveContainer extends GenericContainer<HiveContainer> {
public static final String IMAGE = "apache/hive";
public static final String DEFAULT_TAG = "3.1.3";
Expand All @@ -15,21 +24,15 @@ public class HiveContainer extends GenericContainer<HiveContainer> {

public static final int HIVE_SERVER_PORT = 10000;

public static final int HIVE_SERVER_WEBUI_PORT = 10002;

public static final int HMS_PORT = 9083;

public static final String HIVE_CUSTOM_CONF_DIR_ENV = "HIVE_CUSTOM_CONF_DIR";

private static final String SERVICE_NAME_ENV = "SERVICE_NAME";

public HiveContainer() {
this(Role.HIVE_SERVER_WITH_EMBEDDING_HMS);
}
private static final String DRIVER_CLASS_NAME = "org.apache.hive.jdbc.HiveDriver";

public HiveContainer(Role role) {
super(DEFAULT_IMAGE_NAME.withTag(DEFAULT_TAG));
this.addExposedPorts(HMS_PORT);
this.addExposedPorts(role.exposePort);
this.addEnv(SERVICE_NAME_ENV, role.serviceName);
this.setWaitStrategy(role.waitStrategy);
this.withLogConsumer(
Expand All @@ -38,27 +41,62 @@ public HiveContainer(Role role) {
DEFAULT_IMAGE_NAME.withTag(DEFAULT_TAG).toString())));
}

public HiveContainer withCustomHiveConfigPath(String location) {
return withEnv(HIVE_CUSTOM_CONF_DIR_ENV, location);
public static HiveContainer hmsStandalone() {
return new HiveContainer(Role.HMS_STANDALONE);
}

/**
 * Creates a container configured with the {@code HIVE_SERVER_WITH_EMBEDDING_HMS} role.
 *
 * @return a new, not yet started, container instance
 */
public static HiveContainer hiveServer() {
    final Role serverRole = Role.HIVE_SERVER_WITH_EMBEDDING_HMS;
    return new HiveContainer(serverRole);
}

/**
 * Builds the thrift URI of the metastore as seen from the test host.
 *
 * @return a URI of the form {@code thrift://<host>:<mapped HMS port>}
 */
public String getMetastoreUri() {
    final String host = getHost();
    final Integer mappedHmsPort = getMappedPort(HMS_PORT);
    return "thrift://" + host + ":" + mappedHmsPort;
}

/**
 * Builds the JDBC URL of the {@code default} database as seen from the test host.
 *
 * @return a URL of the form {@code jdbc:hive2://<host>:<mapped HiveServer port>/default}
 */
public String getHiveJdbcUri() {
    final String host = getHost();
    final Integer mappedServerPort = getMappedPort(HIVE_SERVER_PORT);
    return "jdbc:hive2://" + host + ":" + mappedServerPort + "/default";
}

/**
 * Creates a metastore client whose {@code hive.metastore.uris} setting points at this
 * container.
 *
 * @return a new client; the caller owns it
 * @throws MetaException propagated from the {@code HiveMetaStoreClient} constructor
 */
public HiveMetaStoreClient createMetaStoreClient() throws MetaException {
    final HiveConf hiveConf = new HiveConf();
    final String metastoreUri = getMetastoreUri();
    hiveConf.set("hive.metastore.uris", metastoreUri);
    return new HiveMetaStoreClient(hiveConf);
}

/**
 * Opens a JDBC connection to the URL returned by {@link #getHiveJdbcUri()}.
 *
 * @return an open, non-null connection; the caller is responsible for closing it
 * @throws ClassNotFoundException if the Hive JDBC driver class cannot be found
 * @throws InstantiationException if the driver cannot be instantiated
 * @throws IllegalAccessException if the driver's constructor is not accessible
 * @throws SQLException if the connection attempt fails or the driver rejects the URL
 */
public Connection getConnection()
        throws ClassNotFoundException, InstantiationException, IllegalAccessException,
                SQLException {
    final String jdbcUri = getHiveJdbcUri();
    final Connection connection =
            loadHiveJdbcDriver().connect(jdbcUri, getJdbcConnectionConfig());
    // Driver#connect reports "wrong kind of driver for this URL" by returning null rather
    // than throwing. Surface that as an exception so retry loops do not mistake a null
    // connection for success.
    if (connection == null) {
        throw new SQLException("Hive JDBC driver did not accept the URL: " + jdbcUri);
    }
    return connection;
}

/**
 * Loads the class named by {@code DRIVER_CLASS_NAME} and instantiates it through its
 * no-argument constructor.
 *
 * @return a new driver instance
 * @throws ClassNotFoundException if the driver class is not on the classpath
 * @throws InstantiationException if the class is abstract, has no no-argument constructor,
 *     or its constructor throws (the original failure is attached as the cause)
 * @throws IllegalAccessException if the constructor is not accessible
 */
public Driver loadHiveJdbcDriver()
        throws ClassNotFoundException, InstantiationException, IllegalAccessException {
    final Class<?> driverClass = Class.forName(DRIVER_CLASS_NAME);
    try {
        // Class#newInstance is deprecated because it rethrows any checked exception raised
        // by the constructor without declaring it; going through the Constructor avoids that.
        return (Driver) driverClass.getDeclaredConstructor().newInstance();
    } catch (NoSuchMethodException | java.lang.reflect.InvocationTargetException e) {
        // Keep the declared exception contract: report both cases as an instantiation
        // failure and preserve the underlying cause.
        final InstantiationException failure =
                new InstantiationException("Cannot instantiate " + DRIVER_CLASS_NAME);
        failure.initCause(e);
        throw failure;
    }
}

/**
 * Supplies the properties passed to the JDBC driver when connecting.
 *
 * @return a fresh, empty {@link Properties} instance
 */
public Properties getJdbcConnectionConfig() {
    return new Properties();
}

public enum Role {
HIVE_SERVER_WITH_EMBEDDING_HMS(
"hiveserver2",
new int[] {HIVE_SERVER_PORT, HIVE_SERVER_WEBUI_PORT},
Wait.forLogMessage(".*Starting HiveServer2.*", 1)),
"hiveserver2", HIVE_SERVER_PORT, Wait.forLogMessage(".*Starting HiveServer2.*", 1)),
HMS_STANDALONE(
"metastore",
new int[] {HMS_PORT},
Wait.forLogMessage(".*Starting Hive Metastore Server.*", 1));
"metastore", HMS_PORT, Wait.forLogMessage(".*Starting Hive Metastore Server.*", 1));

private final String serviceName;
private final int[] ports;
private final int exposePort;
private final WaitStrategy waitStrategy;

Role(String serviceName, int[] ports, WaitStrategy waitStrategy) {
Role(String serviceName, int exposePort, WaitStrategy waitStrategy) {
this.serviceName = serviceName;
this.ports = ports;
this.exposePort = exposePort;
this.waitStrategy = waitStrategy;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.seatunnel.e2e.connector.hive;

import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
Expand All @@ -36,14 +37,31 @@
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import static org.awaitility.Awaitility.given;

@DisabledOnContainer(
value = {},
type = {EngineType.SPARK, EngineType.FLINK})
@Slf4j
public class HiveIT extends TestSuiteBase implements TestResource {
public static final String HOST = "hive-e2e";
private static final String CREATE_SQL =
"CREATE TABLE test_hive_sink_on_hdfs"
+ "("
+ " pk_id BIGINT,"
+ " name STRING,"
+ " score INT"
+ ")";

private static final String HMS_HOST = "hivee2e";
private static final String HIVE_SERVER_HOST = "hiveserver2e2e";

private String hiveExeUrl() {
return "https://repo1.maven.org/maven2/org/apache/hive/hive-exec/3.1.3/hive-exec-3.1.3.jar";
Expand Down Expand Up @@ -73,15 +91,16 @@ private String hadoopCosUrl() {
return "https://repo1.maven.org/maven2/com/qcloud/cos/hadoop-cos/2.6.5-8.0.2/hadoop-cos-2.6.5-8.0.2.jar";
}

public static final HiveContainer HIVE_CONTAINER =
new HiveContainer().withNetwork(NETWORK).withNetworkAliases(HOST);
private HiveContainer hiveServerContainer;
private HiveContainer hmsContainer;
private Connection hiveConnection;

@TestContainerExtension
protected final ContainerExtendedFactory extendedFactory =
container -> {
container.execInContainer("sh", "-c", "chmod -R 777 /etc/hosts");
// Avoid getting a canonical host name from the Docker DNS server
container.execInContainer("sh", "-c", "echo `getent hosts hive-e2e` >> /etc/hosts");
container.execInContainer("sh", "-c", "echo `getent hosts hivee2e` >> /etc/hosts");
// The jar of hive-exec
Container.ExecResult downloadHiveExeCommands =
container.execInContainer(
Expand Down Expand Up @@ -130,15 +149,63 @@ private String hadoopCosUrl() {
/**
 * Starts the Hive test environment: a standalone metastore container, then a HiveServer2
 * container pointed at it, then waits for a JDBC connection and creates the test table.
 *
 * @throws Exception if a container fails to start, no connection can be opened within the
 *     timeout, or table preparation fails
 */
@BeforeAll
@Override
public void startUp() throws Exception {
// NOTE(review): this text comes from a diff view without +/- markers; the three
// HIVE_CONTAINER statements below look like the removed (pre-change) side — confirm.
Startables.deepStart(HIVE_CONTAINER).join();
HIVE_CONTAINER.setPortBindings(Collections.singletonList("9083:9083"));
log.info("Hive Container is started");
// Standalone metastore, reachable on the shared network under the HMS_HOST alias.
hmsContainer =
HiveContainer.hmsStandalone().withNetwork(NETWORK).withNetworkAliases(HMS_HOST);
// Fixed 9083:9083 binding instead of a randomly mapped port.
hmsContainer.setPortBindings(Collections.singletonList("9083:9083"));

Startables.deepStart(Stream.of(hmsContainer)).join();
log.info("HMS just started");

// HiveServer2 is told to use the metastore above; the URI host "hivee2e" is the value of
// HMS_HOST.
// NOTE(review): IS_RESUME presumably makes the image skip schema initialisation — confirm
// against the apache/hive image documentation.
hiveServerContainer =
HiveContainer.hiveServer()
.withNetwork(NETWORK)
.withNetworkAliases(HIVE_SERVER_HOST)
.withEnv(
"SERVICE_OPTS",
"-Dhive.metastore.uris=thrift://hivee2e:9083")
.withEnv("IS_RESUME", "true")
.dependsOn(hmsContainer);
// Fixed 10000:10000 binding for the HiveServer2 port.
hiveServerContainer.setPortBindings(Collections.singletonList("10000:10000"));

Startables.deepStart(Stream.of(hiveServerContainer)).join();
log.info("HiveServer2 just started");

// Retry opening the JDBC connection, ignoring exceptions, for at most 360 seconds after an
// initial 10 second delay.
given().ignoreExceptions()
.await()
.atMost(360, TimeUnit.SECONDS)
.pollDelay(Duration.ofSeconds(10L))
.untilAsserted(this::initializeConnection);
prepareTable();
}

/**
 * Releases everything created by {@code startUp()}: the JDBC connection first, then the
 * HiveServer2 container, then the metastore container it depends on.
 *
 * @throws Exception if closing any of the resources fails
 */
@AfterAll
@Override
public void tearDown() throws Exception {
    // Nested finally blocks guarantee that a failure while closing one resource does not
    // prevent the remaining ones from being released.
    try {
        if (hiveConnection != null) {
            hiveConnection.close();
        }
    } finally {
        try {
            if (hiveServerContainer != null) {
                hiveServerContainer.close();
            }
        } finally {
            if (hmsContainer != null) {
                hmsContainer.close();
            }
        }
    }
}

/**
 * Opens a JDBC connection through the HiveServer2 container and stores it in
 * {@code hiveConnection}.
 *
 * @throws ClassNotFoundException propagated from {@code HiveContainer#getConnection()}
 * @throws InstantiationException propagated from {@code HiveContainer#getConnection()}
 * @throws IllegalAccessException propagated from {@code HiveContainer#getConnection()}
 * @throws SQLException propagated from {@code HiveContainer#getConnection()}
 */
private void initializeConnection()
        throws ClassNotFoundException, InstantiationException, IllegalAccessException,
                SQLException {
    final Connection openedConnection = hiveServerContainer.getConnection();
    hiveConnection = openedConnection;
}

/**
 * Logs the databases known to the metastore and creates the table described by
 * {@code CREATE_SQL} through the HiveServer2 JDBC connection.
 *
 * @throws Exception if the metastore cannot be queried or the statement fails; statement
 *     failures are logged before being rethrown
 */
private void prepareTable() throws Exception {
    // The metastore client is only needed for this log line, so close it right away
    // instead of leaving its thrift connection open for the rest of the test run.
    try (org.apache.hadoop.hive.metastore.HiveMetaStoreClient metaStoreClient =
            this.hmsContainer.createMetaStoreClient()) {
        log.info("Databases are {}", metaStoreClient.getAllDatabases());
    }
    try (Statement statement = this.hiveConnection.createStatement()) {
        statement.execute(CREATE_SQL);
    } catch (Exception exception) {
        log.error(ExceptionUtils.getMessage(exception));
        throw exception;
    }
}

private void executeJob(TestContainer container, String job1, String job2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,6 @@ source {
sink {
Hive {
table_name = "default.test_hive_sink_on_hdfs"
metastore_uri = "thrift://hive-e2e:9083"
metastore_uri = "thrift://hivee2e:9083"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ env {
source {
Hive {
table_name = "default.test_hive_sink_on_hdfs"
metastore_uri = "thrift://hive-e2e:9083"
metastore_uri = "thrift://hivee2e:9083"
hive.hadoop.conf-path = "/tmp/hadoop"
result_table_name = hive_source
}
Expand Down

0 comments on commit bffaed5

Please sign in to comment.