Improve HIVE E2E test
loustler committed Sep 7, 2024
1 parent d1c75f7 commit b738db8
Showing 4 changed files with 117 additions and 44 deletions.
@@ -0,0 +1,65 @@
package org.apache.seatunnel.e2e.connector.hive;

import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.containers.wait.strategy.WaitStrategy;
import org.testcontainers.utility.DockerImageName;
import org.testcontainers.utility.DockerLoggerFactory;

public class HiveContainer extends GenericContainer<HiveContainer> {
    public static final String IMAGE = "apache/hive";
    public static final String DEFAULT_TAG = "3.1.3";

    private static final DockerImageName DEFAULT_IMAGE_NAME = DockerImageName.parse(IMAGE);

    public static final int HIVE_SERVER_PORT = 10000;

    public static final int HIVE_SERVER_WEBUI_PORT = 10002;

    public static final int HMS_PORT = 9083;

    public static final String HIVE_CUSTOM_CONF_DIR_ENV = "HIVE_CUSTOM_CONF_DIR";

    private static final String SERVICE_NAME_ENV = "SERVICE_NAME";

    public HiveContainer() {
        this(Role.HIVE_SERVER_WITH_EMBEDDING_HMS);
    }

    public HiveContainer(Role role) {
        super(DEFAULT_IMAGE_NAME.withTag(DEFAULT_TAG));
        this.addExposedPorts(HMS_PORT);
        this.addEnv(SERVICE_NAME_ENV, role.serviceName);
        this.setWaitStrategy(role.waitStrategy);
        this.withLogConsumer(
                new Slf4jLogConsumer(
                        DockerLoggerFactory.getLogger(
                                DEFAULT_IMAGE_NAME.withTag(DEFAULT_TAG).toString())));
    }

    public HiveContainer withCustomHiveConfigPath(String location) {
        return withEnv(HIVE_CUSTOM_CONF_DIR_ENV, location);
    }

    public enum Role {
        HIVE_SERVER_WITH_EMBEDDING_HMS(
                "hiveserver2",
                new int[] {HIVE_SERVER_PORT, HIVE_SERVER_WEBUI_PORT},
                Wait.forLogMessage(".*Starting HiveServer2.*", 1)),
        HMS_STANDALONE(
                "metastore",
                new int[] {HMS_PORT},
                Wait.forLogMessage(".*Starting Hive Metastore Server.*", 1));

        private final String serviceName;
        private final int[] ports;
        private final WaitStrategy waitStrategy;

        Role(String serviceName, int[] ports, WaitStrategy waitStrategy) {
            this.serviceName = serviceName;
            this.ports = ports;
            this.waitStrategy = waitStrategy;
        }
    }
}
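
For orientation, here is a minimal sketch of driving this container from a test. The standalone-HMS role, the network alias "metastore", and the printed URI below are illustrative choices, not part of this commit:

import org.testcontainers.containers.Network;

public class HiveContainerUsageSketch {
    public static void main(String[] args) {
        // Hypothetical usage: start a standalone Hive Metastore and resolve its Thrift endpoint.
        try (Network network = Network.newNetwork();
                HiveContainer hms =
                        new HiveContainer(HiveContainer.Role.HMS_STANDALONE)
                                .withNetwork(network)
                                .withNetworkAliases("metastore")) {
            hms.start();
            // Containers on the same Docker network can use thrift://metastore:9083 directly;
            // host-side clients must go through the mapped ephemeral port instead.
            String metastoreUri =
                    "thrift://" + hms.getHost() + ":" + hms.getMappedPort(HiveContainer.HMS_PORT);
            System.out.println("HMS reachable at " + metastoreUri);
        }
    }
}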
@@ -25,34 +25,36 @@
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.Container;
import org.testcontainers.utility.MountableFile;
import org.testcontainers.lifecycle.Startables;

import lombok.extern.slf4j.Slf4j;

import java.io.File;
import java.io.IOException;
import java.util.Collections;

@DisabledOnContainer(
        value = {},
        type = {EngineType.SPARK, EngineType.FLINK})
@Disabled(
        "[HDFS/COS/OSS/S3] is not available in CI; if you want to run this test, please set up your own environment in the test case file (hadoop_hive_conf_path_local and the IPs below)")
@Slf4j
public class HiveIT extends TestSuiteBase implements TestResource {
    private static final String HADOOP_HIVE_CONF_PATH_LOCAL =
            "/Users/dailai/software/hadoop-3.3.3/etc/hadoop";
    private static final String HADOOP_HIVE_CONF_PATH_IN_CONTAINER = "/tmp/hadoop";
    public static final String HOST = "hive-e2e";

    private String hiveExeUrl() {
        return "https://repo1.maven.org/maven2/org/apache/hive/hive-exec/2.3.9/hive-exec-2.3.9.jar";
        return "https://repo1.maven.org/maven2/org/apache/hive/hive-exec/3.1.3/hive-exec-3.1.3.jar";
    }

    private String libFb303Url() {
        return "https://repo1.maven.org/maven2/org/apache/thrift/libfb303/0.9.3/libfb303-0.9.3.jar";
    }

    private String hadoopAwsUrl() {
        return "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.6.5/hadoop-aws-2.6.5.jar";
        return "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.1.4/hadoop-aws-3.1.4.jar";
    }

    private String aliyunSdkOssUrl() {
@@ -71,50 +73,45 @@ private String hadoopCosUrl() {
        return "https://repo1.maven.org/maven2/com/qcloud/cos/hadoop-cos/2.6.5-8.0.2/hadoop-cos-2.6.5-8.0.2.jar";
    }

    public static final HiveContainer HIVE_CONTAINER =
            new HiveContainer().withNetwork(NETWORK).withNetworkAliases(HOST);

    @TestContainerExtension
    protected final ContainerExtendedFactory extendedFactory =
            container -> {
                container.execInContainer("sh", "-c", "chmod -R 777 /etc/hosts");
                container.execInContainer("sh", "-c", "echo \"${IP01} hadoop01\" >> /etc/hosts");
                container.execInContainer("sh", "-c", "echo \"${IP02} hadoop02\" >> /etc/hosts");
                container.execInContainer("sh", "-c", "echo \"${IP03} hadoop03\" >> /etc/hosts");
                container.execInContainer("sh", "-c", "echo \"${IP04} hadoop04\" >> /etc/hosts");
                container.execInContainer("sh", "-c", "echo \"${IP05} hadoop05\" >> /etc/hosts");
                container.execInContainer("sh", "-c", "echo \"${IP06} hadoop06\" >> /etc/hosts");
                Assertions.assertTrue(
                        new File(HADOOP_HIVE_CONF_PATH_LOCAL).exists(),
                        HADOOP_HIVE_CONF_PATH_LOCAL + " must exist");
                container.execInContainer(
                        "sh", "-c", "mkdir -p " + HADOOP_HIVE_CONF_PATH_IN_CONTAINER);
                container.execInContainer(
                        "sh", "-c", "chmod -R 777 " + HADOOP_HIVE_CONF_PATH_IN_CONTAINER);
                // Copy the local Hadoop conf and Hive conf into the container
                container.copyFileToContainer(
                        MountableFile.forHostPath(HADOOP_HIVE_CONF_PATH_LOCAL),
                        HADOOP_HIVE_CONF_PATH_IN_CONTAINER);

                // To avoid resolving a canonical host through the Docker DNS server
                container.execInContainer("sh", "-c", "echo `getent hosts hive-e2e` >> /etc/hosts");
                // The hive-exec jar
                Container.ExecResult extraCommands =
                Container.ExecResult downloadHiveExeCommands =
                        container.execInContainer(
                                "sh",
                                "-c",
                                "mkdir -p /tmp/seatunnel/plugins/Hive/lib && cd /tmp/seatunnel/plugins/Hive/lib && wget "
                                "mkdir -p /tmp/seatunnel/lib && cd /tmp/seatunnel/lib && wget "
                                        + hiveExeUrl());
                Assertions.assertEquals(0, extraCommands.getExitCode(), extraCommands.getStderr());
                Assertions.assertEquals(
                        0,
                        downloadHiveExeCommands.getExitCode(),
                        downloadHiveExeCommands.getStderr());
                Container.ExecResult downloadLibFb303Commands =
                        container.execInContainer(
                                "sh", "-c", "cd /tmp/seatunnel/lib && wget " + libFb303Url());
                Assertions.assertEquals(
                        0,
                        downloadLibFb303Commands.getExitCode(),
                        downloadLibFb303Commands.getStderr());
                // The S3 jar
                Container.ExecResult downloadS3Commands =
                        container.execInContainer(
                                "sh",
                                "-c",
                                "cd /tmp/seatunnel/plugins/Hive/lib && wget " + hadoopAwsUrl());
                                "sh", "-c", "cd /tmp/seatunnel/lib && wget " + hadoopAwsUrl());
                Assertions.assertEquals(
                        0, downloadS3Commands.getExitCode(), downloadS3Commands.getStderr());
                // The OSS jars
                Container.ExecResult downloadOssCommands =
                        container.execInContainer(
                                "sh",
                                "-c",
                                "cd /tmp/seatunnel/plugins/Hive/lib && wget "
                                "cd /tmp/seatunnel/lib && wget "
                                        + aliyunSdkOssUrl()
                                        + " && wget "
                                        + jdomUrl()
@@ -125,18 +122,24 @@ private String hadoopCosUrl() {
                // The COS jar
                Container.ExecResult downloadCosCommands =
                        container.execInContainer(
                                "sh",
                                "-c",
                                "cd /tmp/seatunnel/plugins/Hive/lib && wget " + hadoopCosUrl());
                                "sh", "-c", "cd /tmp/seatunnel/lib && wget " + hadoopCosUrl());
                Assertions.assertEquals(
                        0, downloadCosCommands.getExitCode(), downloadCosCommands.getStderr());
            };

    @BeforeAll
    @Override
    public void startUp() throws Exception {}
    public void startUp() throws Exception {
        // Bind the HMS port before starting the container; bindings set after start have no effect.
        HIVE_CONTAINER.setPortBindings(Collections.singletonList("9083:9083"));
        Startables.deepStart(HIVE_CONTAINER).join();
        log.info("Hive Container is started");
    }

    @AfterAll
    @Override
    public void tearDown() throws Exception {}
    public void tearDown() throws Exception {
        HIVE_CONTAINER.close();
    }

    private void executeJob(TestContainer container, String job1, String job2)
            throws IOException, InterruptedException {
@@ -153,16 +156,22 @@ public void testFakeSinkHiveOnHDFS(TestContainer container) throws Exception {
    }

    @TestTemplate
    @Disabled(
            "[HDFS/COS/OSS/S3] is not available in CI; if you want to run this test, please set up your own environment in the test case file (hadoop_hive_conf_path_local and the IPs below)")
    public void testFakeSinkHiveOnS3(TestContainer container) throws Exception {
        executeJob(container, "/fake_to_hive_on_s3.conf", "/hive_on_s3_to_assert.conf");
    }

    @TestTemplate
    @Disabled(
            "[HDFS/COS/OSS/S3] is not available in CI; if you want to run this test, please set up your own environment in the test case file (hadoop_hive_conf_path_local and the IPs below)")
    public void testFakeSinkHiveOnOSS(TestContainer container) throws Exception {
        executeJob(container, "/fake_to_hive_on_oss.conf", "/hive_on_oss_to_assert.conf");
    }

    @TestTemplate
    @Disabled(
            "[HDFS/COS/OSS/S3] is not available in CI; if you want to run this test, please set up your own environment in the test case file (hadoop_hive_conf_path_local and the IPs below)")
    public void testFakeSinkHiveOnCos(TestContainer container) throws Exception {
        executeJob(container, "/fake_to_hive_on_cos.conf", "/hive_on_cos_to_assert.conf");
    }
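The body of executeJob is collapsed in this view; the calls above suggest a write-then-verify pattern. A plausible sketch, assuming the TestContainer.executeJob(String) helper from the e2e common module and the imports already present in this file (this is not the committed body):

    private void executeJob(TestContainer container, String job1, String job2)
            throws IOException, InterruptedException {
        // Run the job that writes fake data into Hive...
        Container.ExecResult sinkResult = container.executeJob(job1);
        Assertions.assertEquals(0, sinkResult.getExitCode(), sinkResult.getStderr());
        // ...then run the job that reads the data back and asserts on it.
        Container.ExecResult assertResult = container.executeJob(job2);
        Assertions.assertEquals(0, assertResult.getExitCode(), assertResult.getStderr());
    }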
@@ -52,8 +52,7 @@ source {

sink {
  Hive {
    table_name = "test_hive.test_hive_sink_on_hdfs"
    metastore_uri = "thrift://hadoop04:9083"
    hive.hadoop.conf-path = "/tmp/hadoop"
    table_name = "default.test_hive_sink_on_hdfs"
    metastore_uri = "thrift://hive-e2e:9083"
  }
}
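
For context, this sink fragment sits inside SeaTunnel's usual env/source/sink job layout; a minimal sketch of the overall shape of such a job file, with an illustrative FakeSource schema (not the committed file):

env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    row.num = 100
    schema = {
      fields {
        id = int
        name = string
      }
    }
  }
}

sink {
  Hive {
    table_name = "default.test_hive_sink_on_hdfs"
    metastore_uri = "thrift://hive-e2e:9083"
  }
}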
@@ -22,8 +22,8 @@ env {

source {
  Hive {
    table_name = "test_hive.test_hive_sink_on_hdfs"
    metastore_uri = "thrift://hadoop04:9083"
    table_name = "default.test_hive_sink_on_hdfs"
    metastore_uri = "thrift://hive-e2e:9083"
    hive.hadoop.conf-path = "/tmp/hadoop"
    result_table_name = hive_source
  }
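The paired *_to_assert.conf jobs read the table back through a source like the one above; a sketch of the kind of Assert sink that typically closes the loop (the rule values below are illustrative assumptions, not the committed file):

sink {
  Assert {
    source_table_name = hive_source
    rules {
      row-rules = [
        {
          rule_type = MIN_ROW
          rule_value = 1
        }
      ]
    }
  }
}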
