[HUDI-892] revert test log
lw309637554 committed Nov 2, 2020
1 parent b76a393 commit 3a331d1
Showing 6 changed files with 238 additions and 30 deletions.
17 changes: 5 additions & 12 deletions docker/demo/sparksql-incremental.commands
@@ -30,9 +30,8 @@ val hoodieIncQueryDF = spark.read.format("org.apache.hudi").
option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, beginInstantTime).
load("/user/hive/warehouse/stock_ticks_cow");
hoodieIncQueryDF.registerTempTable("stock_ticks_cow_incr")
println("111111111111")
spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow_incr where symbol = 'GOOG'").show(100, false);
println("222222222222")

spark.sql("select key, `_hoodie_partition_path` as datestr, symbol, ts, open, close from stock_ticks_cow_incr").
write.format("org.apache.hudi").
option("hoodie.insert.shuffle.parallelism", "2").
@@ -52,20 +51,17 @@ spark.sql("select key, `_hoodie_partition_path` as datestr, symbol, ts, open, cl
option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "datestr").
mode(SaveMode.Overwrite).
save("/user/hive/warehouse/stock_ticks_derived_mor");
println("33333333333")

spark.sql("select count(*) from stock_ticks_derived_mor_ro").show(20, false)
println("4444444444444")
spark.sql("select count(*) from stock_ticks_derived_mor_rt").show(20, false)
println("555555555553333333")
spark.sql("select * from stock_ticks_derived_mor_rt").show(20, false)
println("55555555555222222")

val hoodieIncQueryBsDF = spark.read.format("org.apache.hudi").
option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL).
option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "00000000000001").
load("/user/hive/warehouse/stock_ticks_cow_bs");
hoodieIncQueryBsDF.registerTempTable("stock_ticks_cow_bs_incr")
spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow_bs_incr where symbol = 'GOOG'").show(100, false);
println("6666666666666")

spark.sql("select key, `_hoodie_partition_path` as datestr, symbol, ts, open, close from stock_ticks_cow_bs_incr").
write.format("org.apache.hudi").
option("hoodie.insert.shuffle.parallelism", "2").
@@ -85,12 +81,9 @@ spark.sql("select key, `_hoodie_partition_path` as datestr, symbol, ts, open, cl
option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "datestr").
mode(SaveMode.Overwrite).
save("/user/hive/warehouse/stock_ticks_derived_mor_bs");
println("77777777777777")

spark.sql("show tables").show(20, false)
println("88888888888888")
spark.sql("select count(*) from stock_ticks_derived_mor_bs_ro").show(20, false)
println("999999999999999")
spark.sql("select count(*) from stock_ticks_derived_mor_bs_rt").show(20, false)
println("1000000000000")

System.exit(0);
@@ -58,9 +58,9 @@ public abstract class AbstractRealtimeRecordReader {
public AbstractRealtimeRecordReader(RealtimeSplit split, JobConf job) {
this.split = split;
this.jobConf = job;
System.out.println("cfg ==> " + job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR));
System.out.println("columnIds ==> " + job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
System.out.println("partitioningColumns ==> " + job.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, ""));
LOG.info("cfg ==> " + job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR));
LOG.info("columnIds ==> " + job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
LOG.info("partitioningColumns ==> " + job.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, ""));
try {
this.usesCustomPayload = usesCustomPayload();
LOG.info("usesCustomPayload ==> " + this.usesCustomPayload);
@@ -85,10 +85,10 @@ private void init() throws IOException {
Schema schemaFromLogFile = LogReaderUtils.readLatestSchemaFromLogFiles(split.getBasePath(), split.getDeltaLogPaths(), jobConf);
if (schemaFromLogFile == null) {
writerSchema = InputSplitUtils.getBaseFileSchema((FileSplit)split, jobConf);
System.out.println("Writer Schema From Parquet => " + writerSchema.getFields());
LOG.info("Writer Schema From Parquet => " + writerSchema.getFields());
} else {
writerSchema = schemaFromLogFile;
System.out.println("Writer Schema From Log => " + writerSchema.toString(true));
LOG.info("Writer Schema From Log => " + writerSchema.toString(true));
}
// Add partitioning fields to writer schema for resulting row to contain null values for these fields
String partitionFields = jobConf.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "");
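Note on the LOG target these hunks switch the output back to: the logger field itself sits outside the diff, but the surrounding LOG.info and LOG.error calls show each class already carries one. A minimal sketch of the assumed declaration, in the log4j style these classes appear to use; the imports and the exact field are an assumption, not part of this commit:

// Assumed logger wiring (not shown in this diff): a static per-class log4j Logger, so the
// reverted debug output flows through the normal logging configuration instead of stdout.
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

public abstract class AbstractRealtimeRecordReader {
  private static final Logger LOG = LogManager.getLogger(AbstractRealtimeRecordReader.class);
  // ... fields, constructor and init() then log via LOG.info(...) as in the hunks above.
}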
@@ -87,7 +87,7 @@ void addProjectionToJobConf(final RealtimeSplit realtimeSplit, final JobConf job
// actual heavy lifting of reading the parquet files happen.
if (jobConf.get(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP) == null) {
synchronized (jobConf) {
-System.out.println(
+LOG.info(
"Before adding Hoodie columns, Projections :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
+ ", Ids :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
if (jobConf.get(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP) == null) {
@@ -116,10 +116,8 @@ public RecordReader<NullWritable, ArrayWritable> getRecordReader(final InputSpli
ValidationUtils.checkArgument(split instanceof RealtimeSplit,
"HoodieRealtimeRecordReader can only work on RealtimeSplit and not with " + split);
RealtimeSplit realtimeSplit = (RealtimeSplit) split;
System.out.println("1111Creating record reader with readCols :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
+ ", Ids :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
addProjectionToJobConf(realtimeSplit, jobConf);
System.out.println("Creating record reader with readCols :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
LOG.info("Creating record reader with readCols :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
+ ", Ids :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
return new HoodieRealtimeRecordReader(realtimeSplit, jobConf,
super.getRecordReader(split, jobConf, reporter));
@@ -63,10 +63,10 @@ private static RecordReader<NullWritable, ArrayWritable> constructRecordReader(R
JobConf jobConf, RecordReader<NullWritable, ArrayWritable> realReader) {
try {
if (canSkipMerging(jobConf)) {
System.out.println("Enabling un-merged reading of realtime records");
LOG.info("Enabling un-merged reading of realtime records");
return new RealtimeUnmergedRecordReader(split, jobConf, realReader);
}
System.out.println("Enabling merged reading of realtime records for split " + split);
LOG.info("Enabling merged reading of realtime records for split " + split);
return new RealtimeCompactedRecordReader(split, jobConf, realReader);
} catch (IOException ex) {
LOG.error("Got exception when constructing record reader", ex);
@@ -249,14 +249,9 @@ public static List<String> orderFields(String fieldNameCsv, String fieldOrderCsv
// /org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java#L188}
// Field Names -> {@link https://github.com/apache/hive/blob/f37c5de6c32b9395d1b34fa3c02ed06d1bfbf6eb/serde/src/java
// /org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java#L229}

System.out.println("fieldNameCsv = " + fieldNameCsv + ",fieldOrderCsv = "
+ fieldOrderCsv + ",partitioningFields = " + partitioningFields.toString());
String[] fieldOrdersWithDups = fieldOrderCsv.isEmpty() ? new String[0] : fieldOrderCsv.split(",");
Set<String> fieldOrdersSet = new LinkedHashSet<>(Arrays.asList(fieldOrdersWithDups));
String[] fieldOrders = fieldOrdersSet.toArray(new String[0]);
System.out.println("fieldOrdersWithDups = " + fieldOrdersWithDups.toString() + ",fieldOrdersSet = "
+ fieldOrdersSet.toString() + ",fieldOrders=" + fieldOrders.toString());
List<String> fieldNames = fieldNameCsv.isEmpty() ? new ArrayList<>() : Arrays.stream(fieldNameCsv.split(","))
.filter(fn -> !partitioningFields.contains(fn)).collect(Collectors.toList());
Set<String> fieldNamesSet = new LinkedHashSet<>(fieldNames);
@@ -268,8 +263,6 @@ public static List<String> orderFields(String fieldNameCsv, String fieldOrderCsv
}
TreeMap<Integer, String> orderedFieldMap = new TreeMap<>();
String[] fieldNamesArray = fieldNamesSet.toArray(new String[0]);
System.out.println("fieldNamesSet = " + fieldNamesSet.toString() + ",orderedFieldMap = "
+ orderedFieldMap.toString());
for (int ox = 0; ox < fieldOrders.length; ox++) {
orderedFieldMap.put(Integer.parseInt(fieldOrders[ox]), fieldNamesArray[ox]);
}
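For context on the method these last prints were instrumenting: the visible lines of orderFields deduplicate the comma-separated Hive column IDs, drop partitioning fields from the column-name list, and map each parsed ID to its field name through a TreeMap so the result comes back in ID order. A standalone sketch with hypothetical inputs (the real values arrive via Hive's ColumnProjectionUtils configuration keys, and the real method also guards empty CSV inputs, as the hunk shows):

// Standalone sketch of the column-ordering logic; inputs below are made up for illustration.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class OrderFieldsSketch {
  public static void main(String[] args) {
    String fieldNameCsv = "symbol,ts,volume,datestr";   // hypothetical READ_COLUMN_NAMES_CONF_STR
    String fieldOrderCsv = "0,1,2,2";                   // hypothetical READ_COLUMN_IDS_CONF_STR (may contain duplicates)
    Set<String> partitioningFields = new HashSet<>(Collections.singleton("datestr"));

    // Deduplicate the projected column IDs while preserving their first-seen order.
    String[] fieldOrders = new LinkedHashSet<>(Arrays.asList(fieldOrderCsv.split(","))).toArray(new String[0]);
    // Drop partition columns from the name list, then deduplicate the names the same way.
    List<String> fieldNames = Arrays.stream(fieldNameCsv.split(","))
        .filter(fn -> !partitioningFields.contains(fn)).collect(Collectors.toList());
    String[] fieldNamesArray = new LinkedHashSet<>(fieldNames).toArray(new String[0]);

    // Pair each ID with its name; TreeMap iteration then returns the names in ID order.
    TreeMap<Integer, String> orderedFieldMap = new TreeMap<>();
    for (int ox = 0; ox < fieldOrders.length; ox++) {
      orderedFieldMap.put(Integer.parseInt(fieldOrders[ox]), fieldNamesArray[ox]);
    }
    System.out.println(new ArrayList<>(orderedFieldMap.values()));   // prints [symbol, ts, volume]
  }
}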
@@ -0,0 +1,224 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.integ;

import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
* Smoke tests to run as part of verification.
*/
public class ITTestHoodieSanity extends ITTestBase {

private static final String HDFS_BASE_URL = "hdfs://namenode";
private static final String HDFS_STREAMING_SOURCE = HDFS_BASE_URL + "/streaming/source/";
private static final String HDFS_STREAMING_CKPT = HDFS_BASE_URL + "/streaming/ckpt/";

enum PartitionType {
SINGLE_KEY_PARTITIONED, MULTI_KEYS_PARTITIONED, NON_PARTITIONED,
}

@Test
/**
* A basic integration test that runs HoodieJavaApp to create a sample COW Hoodie dataset with a single partition key
* and performs upserts on it. Hive integration and upsert functionality are checked by running a count query in the
* Hive console.
*/
public void testRunHoodieJavaAppOnSinglePartitionKeyCOWTable() throws Exception {
String hiveTableName = "docker_hoodie_single_partition_key_cow_test_" + HoodieActiveTimeline.createNewInstantTime();
testRunHoodieJavaApp(hiveTableName, HoodieTableType.COPY_ON_WRITE.name(),
PartitionType.SINGLE_KEY_PARTITIONED);
dropHiveTables(hiveTableName, HoodieTableType.COPY_ON_WRITE.name());
}

@ParameterizedTest
@ValueSource(strings = { HOODIE_JAVA_APP, HOODIE_JAVA_STREAMING_APP })
/**
* A basic integration test that runs HoodieJavaApp to create a sample COW Hoodie dataset with multiple partition keys
* and performs upserts on it. Hive integration and upsert functionality are checked by running a count query in the
* Hive console.
*/
public void testRunHoodieJavaAppOnMultiPartitionKeysCOWTable(String command) throws Exception {
String hiveTableName = "docker_hoodie_multi_partition_key_cow_test_" + HoodieActiveTimeline.createNewInstantTime();
testRunHoodieJavaApp(command, hiveTableName, HoodieTableType.COPY_ON_WRITE.name(),
PartitionType.MULTI_KEYS_PARTITIONED);
dropHiveTables(hiveTableName, HoodieTableType.COPY_ON_WRITE.name());
}

@Test
/**
* A basic integration test that runs HoodieJavaApp to create a sample non-partitioned COW Hoodie dataset and
* performs upserts on it. Hive integration and upsert functionality are checked by running a count query in the
* Hive console.
*/
public void testRunHoodieJavaAppOnNonPartitionedCOWTable() throws Exception {
String hiveTableName = "docker_hoodie_non_partition_key_cow_test_" + HoodieActiveTimeline.createNewInstantTime();
testRunHoodieJavaApp(hiveTableName, HoodieTableType.COPY_ON_WRITE.name(), PartitionType.NON_PARTITIONED);
dropHiveTables(hiveTableName, HoodieTableType.COPY_ON_WRITE.name());
}

@Test
/**
* A basic integration test that runs HoodieJavaApp to create a sample MOR Hoodie dataset with a single partition key
* and performs upserts on it. Hive integration and upsert functionality are checked by running a count query in the
* Hive console.
*/
public void testRunHoodieJavaAppOnSinglePartitionKeyMORTable() throws Exception {
String hiveTableName = "docker_hoodie_single_partition_key_mor_test_" + HoodieActiveTimeline.createNewInstantTime();
testRunHoodieJavaApp(hiveTableName, HoodieTableType.MERGE_ON_READ.name(),
PartitionType.SINGLE_KEY_PARTITIONED);
dropHiveTables(hiveTableName, HoodieTableType.MERGE_ON_READ.name());
}

@ParameterizedTest
@ValueSource(strings = { HOODIE_JAVA_APP, HOODIE_JAVA_STREAMING_APP })
/**
* A basic integration test that runs HoodieJavaApp to create a sample MOR Hoodie dataset with multiple partition keys
* and performs upserts on it. Hive integration and upsert functionality are checked by running a count query in the
* Hive console.
*/
public void testRunHoodieJavaAppOnMultiPartitionKeysMORTable(String command) throws Exception {
String hiveTableName = "docker_hoodie_multi_partition_key_mor_test_" + HoodieActiveTimeline.createNewInstantTime();
testRunHoodieJavaApp(command, hiveTableName, HoodieTableType.MERGE_ON_READ.name(),
PartitionType.MULTI_KEYS_PARTITIONED);
dropHiveTables(hiveTableName, HoodieTableType.MERGE_ON_READ.name());
}

@Test
/**
* A basic integration test that runs HoodieJavaApp to create a sample non-partitioned MOR Hoodie dataset and
* performs upserts on it. Hive integration and upsert functionality are checked by running a count query in the
* Hive console.
*/
public void testRunHoodieJavaAppOnNonPartitionedMORTable() throws Exception {
String hiveTableName = "docker_hoodie_non_partition_key_mor_test_" + HoodieActiveTimeline.createNewInstantTime();
testRunHoodieJavaApp(hiveTableName, HoodieTableType.MERGE_ON_READ.name(), PartitionType.NON_PARTITIONED);
dropHiveTables(hiveTableName, HoodieTableType.MERGE_ON_READ.name());
}

/**
* A basic integration test that runs HoodieJavaApp to create a sample Hoodie dataset and performs upserts on it.
* Hive integration and upsert functionality are checked by running a count query in the Hive console. TODO: Add a
* spark-shell test case.
*/
public void testRunHoodieJavaApp(String command, String hiveTableName, String tableType, PartitionType partitionType)
throws Exception {

String hdfsPath = "/" + hiveTableName;
String hdfsUrl = HDFS_BASE_URL + hdfsPath;

// Delete hdfs path if it exists
try {
executeCommandStringInDocker(ADHOC_1_CONTAINER, "hdfs dfs -rm -r " + hdfsUrl, true);
} catch (AssertionError ex) {
// Path does not exist, nothing to delete
}

// Drop Table if it exists
try {
dropHiveTables(hiveTableName, tableType);
} catch (AssertionError ex) {
// In Travis, the Hive metastore is sometimes not ready even though we wait for the port to be up.
// Workaround: sleep for 5 seconds and retry.
Thread.sleep(5000);
dropHiveTables(hiveTableName, tableType);
}

// Ensure table does not exist
Pair<String, String> stdOutErr = executeHiveCommand("show tables like '" + hiveTableName + "'");
assertTrue(stdOutErr.getLeft().isEmpty(), "Dropped table " + hiveTableName + " exists!");

// Run Hoodie Java App
String cmd;
if (partitionType == PartitionType.SINGLE_KEY_PARTITIONED) {
cmd = command + " --hive-sync --table-path " + hdfsUrl + " --hive-url " + HIVE_SERVER_JDBC_URL
+ " --table-type " + tableType + " --hive-table " + hiveTableName;
} else if (partitionType == PartitionType.MULTI_KEYS_PARTITIONED) {
cmd = command + " --hive-sync --table-path " + hdfsUrl + " --hive-url " + HIVE_SERVER_JDBC_URL
+ " --table-type " + tableType + " --hive-table " + hiveTableName + " --use-multi-partition-keys";
} else {
cmd = command + " --hive-sync --table-path " + hdfsUrl + " --hive-url " + HIVE_SERVER_JDBC_URL
+ " --table-type " + tableType + " --hive-table " + hiveTableName + " --non-partitioned";
}

if (command.equals(HOODIE_JAVA_STREAMING_APP)) {
String streamingSourcePath = HDFS_STREAMING_SOURCE + hiveTableName;
String streamingCkptPath = HDFS_STREAMING_CKPT + hiveTableName;
cmd = cmd + " --streaming-source-path " + streamingSourcePath
+ " --streaming-checkpointing-path " + streamingCkptPath;
}
executeCommandStringInDocker(ADHOC_1_CONTAINER, cmd, true);

String snapshotTableName = tableType.equals(HoodieTableType.MERGE_ON_READ.name())
? hiveTableName + "_rt" : hiveTableName;
Option<String> roTableName = tableType.equals(HoodieTableType.MERGE_ON_READ.name())
? Option.of(hiveTableName + "_ro") : Option.empty();

// Ensure table does exist
stdOutErr = executeHiveCommand("show tables like '" + snapshotTableName + "'");
assertEquals(snapshotTableName, stdOutErr.getLeft(), "Table exists");

// Ensure row count is 80 (without duplicates) (100 - 20 deleted)
stdOutErr = executeHiveCommand("select count(1) from " + snapshotTableName);
assertEquals(80, Integer.parseInt(stdOutErr.getLeft().trim()),
"Expecting 80 rows to be present in the snapshot table");

if (roTableName.isPresent()) {
stdOutErr = executeHiveCommand("select count(1) from " + roTableName.get());
assertEquals(80, Integer.parseInt(stdOutErr.getLeft().trim()),
"Expecting 80 rows to be present in the snapshot table");
}

// Make the HDFS dataset non-hoodie and run the same query; Checks for interoperability with non-hoodie tables
// Delete Hoodie directory to make it non-hoodie dataset
executeCommandStringInDocker(ADHOC_1_CONTAINER, "hdfs dfs -rm -r " + hdfsPath + "/.hoodie", true);

// Run the count query again. Without Hoodie, all versions are included. So we get a wrong count
if (tableType.equals(HoodieTableType.MERGE_ON_READ.name())) {
stdOutErr = executeHiveCommand("select count(1) from " + roTableName.get());
} else {
stdOutErr = executeHiveCommand("select count(1) from " + snapshotTableName);
}
assertEquals(280, Integer.parseInt(stdOutErr.getLeft().trim()),
"Expecting 280 rows to be present in the new table");
}

public void testRunHoodieJavaApp(String hiveTableName, String tableType, PartitionType partitionType)
throws Exception {
testRunHoodieJavaApp(HOODIE_JAVA_APP, hiveTableName, tableType, partitionType);
}

private void dropHiveTables(String hiveTableName, String tableType) throws Exception {
if (tableType.equals(HoodieTableType.MERGE_ON_READ.name())) {
executeHiveCommand("drop table if exists " + hiveTableName + "_rt");
executeHiveCommand("drop table if exists " + hiveTableName + "_ro");
} else {
executeHiveCommand("drop table if exists " + hiveTableName);
}
}
}
