diff --git a/docs/en/connector-v2/source/Hive.md b/docs/en/connector-v2/source/Hive.md index f9f35aaf733..afa9893d5b2 100644 --- a/docs/en/connector-v2/source/Hive.md +++ b/docs/en/connector-v2/source/Hive.md @@ -33,17 +33,18 @@ Read all the data in a split in a pollNext call. What splits are read will be sa ## Options -| name | type | required | default value | -|----------------------|--------|----------|---------------| -| table_name | string | yes | - | -| metastore_uri | string | yes | - | -| kerberos_principal | string | no | - | -| kerberos_keytab_path | string | no | - | -| hdfs_site_path | string | no | - | -| hive_site_path | string | no | - | -| read_partitions | list | no | - | -| read_columns | list | no | - | -| common-options | | no | - | +| name | type | required | default value | +|-------------------------------|---------|----------|---------------| +| table_name | string | yes | - | +| metastore_uri | string | yes | - | +| kerberos_principal | string | no | - | +| kerberos_keytab_path | string | no | - | +| hdfs_site_path | string | no | - | +| hive_site_path | string | no | - | +| read_partitions | list | no | - | +| read_columns | list | no | - | +| abort_drop_partition_metadata | boolean | no | false | +| common-options | | no | - | ### table_name [string] @@ -80,6 +81,10 @@ The keytab file path of kerberos authentication The read column list of the data source, user can use it to implement field projection. +### abort_drop_partition_metadata [boolean] + +Flag to decide whether to drop partition metadata from Hive Metastore during an abort operation. Note: this only affects the metadata in the metastore, the data in the partition will always be deleted (data generated during the synchronization process). 
+ ### common options Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java index 7d7c271e1da..4934cc2aa12 100644 --- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java +++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java @@ -34,11 +34,14 @@ import java.util.Map; import java.util.stream.Collectors; +import static org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.ABORT_DROP_PARTITION_METADATA; + @Slf4j public class HiveSinkAggregatedCommitter extends FileSinkAggregatedCommitter { private final Config pluginConfig; private final String dbName; private final String tableName; + private final boolean abortDropPartitionMetadata; public HiveSinkAggregatedCommitter( Config pluginConfig, String dbName, String tableName, FileSystemUtils fileSystemUtils) { @@ -46,6 +49,10 @@ public HiveSinkAggregatedCommitter( this.pluginConfig = pluginConfig; this.dbName = dbName; this.tableName = tableName; + this.abortDropPartitionMetadata = + pluginConfig.hasPath(ABORT_DROP_PARTITION_METADATA.key()) + ? 
pluginConfig.getBoolean(ABORT_DROP_PARTITION_METADATA.key()) + : ABORT_DROP_PARTITION_METADATA.defaultValue(); } @Override @@ -79,21 +86,27 @@ public List<FileAggregatedCommitInfo> commit( @Override public void abort(List<FileAggregatedCommitInfo> aggregatedCommitInfos) throws Exception { super.abort(aggregatedCommitInfos); - HiveMetaStoreProxy hiveMetaStore = HiveMetaStoreProxy.getInstance(pluginConfig); - for (FileAggregatedCommitInfo aggregatedCommitInfo : aggregatedCommitInfos) { - Map<String, List<String>> partitionDirAndValuesMap = - aggregatedCommitInfo.getPartitionDirAndValuesMap(); - List<String> partitions = - partitionDirAndValuesMap.keySet().stream() - .map(partition -> partition.replaceAll("\\\\", "/")) - .collect(Collectors.toList()); - try { - hiveMetaStore.dropPartitions(dbName, tableName, partitions); - log.info("Remove these partitions {}", partitions); - } catch (TException e) { - log.error("Failed to remove these partitions {}", partitions, e); + if (abortDropPartitionMetadata) { + HiveMetaStoreProxy hiveMetaStore = HiveMetaStoreProxy.getInstance(pluginConfig); + // Close the metastore client even if an unchecked exception escapes the loop. + try { + for (FileAggregatedCommitInfo aggregatedCommitInfo : aggregatedCommitInfos) { + Map<String, List<String>> partitionDirAndValuesMap = + aggregatedCommitInfo.getPartitionDirAndValuesMap(); + List<String> partitions = + partitionDirAndValuesMap.keySet().stream() + .map(partition -> partition.replaceAll("\\\\", "/")) + .collect(Collectors.toList()); + try { + hiveMetaStore.dropPartitions(dbName, tableName, partitions); + log.info("Remove these partitions {}", partitions); + } catch (TException e) { + log.error("Failed to remove these partitions {}", partitions, e); + } + } + } finally { + hiveMetaStore.close(); } } - hiveMetaStore.close(); } } diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveConfig.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveConfig.java index 142863b5135..8cf00b8c307 100644 --- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveConfig.java +++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveConfig.java @@ -38,11 +38,20 @@ public class HiveConfig { .noDefaultValue() .withDescription("Hive metastore uri"); + /** Whether abort() also drops the partition metadata from the Hive Metastore; disabled by default. */ + public static final Option<Boolean> ABORT_DROP_PARTITION_METADATA = + Options.key("abort_drop_partition_metadata") + .booleanType() + .defaultValue(false) + .withDescription( + "Flag to decide whether to drop partition metadata from Hive Metastore during an abort operation. Note: this only affects the metadata in the metastore, the data in the partition will always be deleted(data generated during the synchronization process)."); + public static final Option<String> HIVE_SITE_PATH = Options.key("hive_site_path") .stringType() .noDefaultValue() .withDescription("The path of hive-site.xml"); + public static final String TEXT_INPUT_FORMAT_CLASSNAME = "org.apache.hadoop.mapred.TextInputFormat"; public static final String TEXT_OUTPUT_FORMAT_CLASSNAME = diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkFactory.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkFactory.java index 6674b778c4a..b98f6cffa50 100644 --- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkFactory.java +++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkFactory.java @@ -24,6 +24,8 @@ import com.google.auto.service.AutoService; +import static org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.ABORT_DROP_PARTITION_METADATA; + @AutoService(Factory.class) public class HiveSinkFactory implements TableSinkFactory { @Override @@ -36,6 +38,7 @@ public OptionRule optionRule() { return 
OptionRule.builder() .required(HiveConfig.TABLE_NAME) .required(HiveConfig.METASTORE_URI) + .optional(ABORT_DROP_PARTITION_METADATA) .build(); } }