forked from apache/seatunnel
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[Feature] Add SaveMode support for hive connector
- Loading branch information
Showing
5 changed files
with
536 additions
and
166 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
253 changes: 253 additions & 0 deletions
253
...ive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/catalog/HiveCatalog.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,253 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.seatunnel.connectors.seatunnel.hive.catalog; | ||
|
||
import org.apache.seatunnel.shade.com.typesafe.config.Config; | ||
|
||
import org.apache.seatunnel.api.table.catalog.Catalog; | ||
import org.apache.seatunnel.api.table.catalog.CatalogTable; | ||
import org.apache.seatunnel.api.table.catalog.Column; | ||
import org.apache.seatunnel.api.table.catalog.TableIdentifier; | ||
import org.apache.seatunnel.api.table.catalog.TablePath; | ||
import org.apache.seatunnel.api.table.catalog.TableSchema; | ||
import org.apache.seatunnel.api.table.catalog.exception.CatalogException; | ||
import org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException; | ||
import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException; | ||
import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException; | ||
import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException; | ||
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig; | ||
import org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils; | ||
import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig; | ||
import org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorErrorCode; | ||
import org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorException; | ||
|
||
import org.apache.hadoop.conf.Configuration; | ||
import org.apache.hadoop.hive.conf.HiveConf; | ||
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; | ||
import org.apache.hadoop.hive.metastore.api.Database; | ||
import org.apache.hadoop.hive.metastore.api.FieldSchema; | ||
import org.apache.hadoop.hive.metastore.api.MetaException; | ||
import org.apache.hadoop.hive.metastore.api.StorageDescriptor; | ||
import org.apache.hadoop.hive.metastore.api.Table; | ||
import org.apache.thrift.TException; | ||
|
||
import java.io.File; | ||
import java.net.MalformedURLException; | ||
import java.util.List; | ||
import java.util.stream.Collectors; | ||
|
||
public class HiveCatalog implements Catalog { | ||
|
||
private HiveMetaStoreClient hiveMetaStoreClient; | ||
|
||
public HiveCatalog(Config config) { | ||
String metastoreUri = config.getString(HiveConfig.METASTORE_URI.key()); | ||
HiveConf hiveConf = new HiveConf(); | ||
hiveConf.set("hive.metastore.uris", metastoreUri); | ||
if (config.hasPath(BaseSourceConfig.KERBEROS_PRINCIPAL.key()) | ||
&& config.hasPath(BaseSourceConfig.KERBEROS_KEYTAB_PATH.key())) { | ||
String principal = config.getString(BaseSourceConfig.KERBEROS_PRINCIPAL.key()); | ||
String keytabPath = config.getString(BaseSourceConfig.KERBEROS_KEYTAB_PATH.key()); | ||
Configuration configuration = new Configuration(); | ||
FileSystemUtils.doKerberosAuthentication(configuration, principal, keytabPath); | ||
} | ||
try { | ||
if (config.hasPath(HiveConfig.HIVE_SITE_PATH.key())) { | ||
String hiveSitePath = config.getString(HiveConfig.HIVE_SITE_PATH.key()); | ||
hiveConf.addResource(new File(hiveSitePath).toURI().toURL()); | ||
} | ||
hiveMetaStoreClient = new HiveMetaStoreClient(hiveConf); | ||
} catch (MetaException e) { | ||
String errorMsg = | ||
String.format( | ||
"Using this hive uris [%s] to initialize " | ||
+ "hive metastore client instance failed", | ||
metastoreUri); | ||
throw new HiveConnectorException( | ||
HiveConnectorErrorCode.INITIALIZE_HIVE_METASTORE_CLIENT_FAILED, errorMsg, e); | ||
} catch (MalformedURLException e) { | ||
String errorMsg = | ||
String.format( | ||
"Using this hive uris [%s], hive conf [%s] to initialize " | ||
+ "hive metastore client instance failed", | ||
metastoreUri, config.getString(HiveConfig.HIVE_SITE_PATH.key())); | ||
throw new HiveConnectorException( | ||
HiveConnectorErrorCode.INITIALIZE_HIVE_METASTORE_CLIENT_FAILED, errorMsg, e); | ||
} | ||
} | ||
|
||
@Override | ||
public void open() throws CatalogException {} | ||
|
||
@Override | ||
public void close() { | ||
hiveMetaStoreClient.close(); | ||
} | ||
|
||
@Override | ||
public String getDefaultDatabase() throws CatalogException { | ||
return HiveCatalogUtils.DEFAULT_DB; | ||
} | ||
|
||
@Override | ||
public boolean databaseExists(String databaseName) throws CatalogException { | ||
return listDatabases().contains(databaseName); | ||
} | ||
|
||
@Override | ||
public List<String> listDatabases() throws CatalogException { | ||
try { | ||
return hiveMetaStoreClient.getAllDatabases(); | ||
} catch (MetaException e) { | ||
throw new CatalogException( | ||
String.format("Failed to list databases. %s", e.getMessage())); | ||
} | ||
} | ||
|
||
@Override | ||
public List<String> listTables(String databaseName) | ||
throws CatalogException, DatabaseNotExistException { | ||
try { | ||
return hiveMetaStoreClient.getAllTables(databaseName); | ||
} catch (MetaException e) { | ||
throw new CatalogException(String.format("Failed to list tables. %s", e.getMessage())); | ||
} | ||
} | ||
|
||
@Override | ||
public boolean tableExists(TablePath tablePath) throws CatalogException { | ||
try { | ||
return hiveMetaStoreClient | ||
.getAllTables(tablePath.getDatabaseName()) | ||
.contains(tablePath.getTableName()); | ||
} catch (MetaException e) { | ||
throw new CatalogException( | ||
String.format("Failed to check table is exist. %s", e.getMessage())); | ||
} | ||
} | ||
|
||
@Override | ||
public CatalogTable getTable(TablePath tablePath) | ||
throws CatalogException, TableNotExistException { | ||
try { | ||
Table table = | ||
hiveMetaStoreClient.getTable( | ||
tablePath.getDatabaseName(), tablePath.getTableName()); | ||
List<Column> cols = | ||
table.getSd().getCols().stream() | ||
.map(HiveCatalogUtils::hiveFieldSchemaToSTColumn) | ||
.collect(Collectors.toList()); | ||
List<String> partitionKeys = | ||
table.getPartitionKeys().stream() | ||
.map(FieldSchema::getName) | ||
.collect(Collectors.toList()); | ||
TableIdentifier tableIdentifier = | ||
TableIdentifier.of( | ||
HiveCatalogUtils.CATALOG_NAME, table.getDbName(), table.getTableName()); | ||
TableSchema tableSchema = new TableSchema.Builder().columns(cols).build(); | ||
CatalogTable catalogTable = | ||
CatalogTable.of( | ||
tableIdentifier, | ||
tableSchema, | ||
table.getParameters(), | ||
partitionKeys, | ||
null, | ||
HiveCatalogUtils.CATALOG_NAME); | ||
return CatalogTable.of(tableIdentifier, catalogTable); | ||
} catch (TException e) { | ||
throw new CatalogException( | ||
String.format("Failed to get table information. %s", e.getMessage())); | ||
} | ||
} | ||
|
||
@Override | ||
public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists) | ||
throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { | ||
if (tableExists(tablePath) && ignoreIfExists) { | ||
return; | ||
} | ||
try { | ||
Table tbl = new Table(); | ||
tbl.setDbName(tablePath.getDatabaseName()); | ||
tbl.setTableName(tablePath.getTableName()); | ||
StorageDescriptor storageDescriptor = new StorageDescriptor(); | ||
List<FieldSchema> cols = | ||
table.getTableSchema().getColumns().stream() | ||
.map(HiveCatalogUtils::stColumnToHiveFieldSchema) | ||
.collect(Collectors.toList()); | ||
List<FieldSchema> partitionKeys = | ||
cols.stream() | ||
.filter(c -> table.getPartitionKeys().contains(c.getName())) | ||
.collect(Collectors.toList()); | ||
storageDescriptor.setLocation(table.getOptions().get(HiveCatalogUtils.LOCATION)); | ||
tbl.setPartitionKeys(partitionKeys); | ||
storageDescriptor.setCols(cols); | ||
tbl.setParameters(table.getOptions()); | ||
|
||
tbl.setSd(storageDescriptor); | ||
hiveMetaStoreClient.createTable(tbl); | ||
} catch (TException e) { | ||
throw new CatalogException(String.format("Failed to create table. %s", e.getMessage())); | ||
} | ||
} | ||
|
||
@Override | ||
public void dropTable(TablePath tablePath, boolean ignoreIfNotExists) | ||
throws TableNotExistException, CatalogException { | ||
try { | ||
hiveMetaStoreClient.dropTable(tablePath.getDatabaseName(), tablePath.getTableName()); | ||
} catch (TException e) { | ||
throw new CatalogException(String.format("Failed to drop table. %s", e.getMessage())); | ||
} | ||
} | ||
|
||
@Override | ||
public void createDatabase(TablePath tablePath, boolean ignoreIfExists) | ||
throws DatabaseAlreadyExistException, CatalogException { | ||
try { | ||
Database db = new Database(); | ||
db.setName(tablePath.getDatabaseName()); | ||
hiveMetaStoreClient.createDatabase(db); | ||
} catch (TException e) { | ||
throw new CatalogException( | ||
String.format("Failed to create database. %s", e.getMessage())); | ||
} | ||
} | ||
|
||
@Override | ||
public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists) | ||
throws DatabaseNotExistException, CatalogException { | ||
try { | ||
hiveMetaStoreClient.dropDatabase(tablePath.getDatabaseName()); | ||
} catch (TException e) { | ||
throw new CatalogException( | ||
String.format("Failed to drop database. %s", e.getMessage())); | ||
} | ||
} | ||
|
||
@Override | ||
public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists) | ||
throws TableNotExistException, CatalogException { | ||
throw new CatalogException("Hive Catalog is not support truncate Table!"); | ||
} | ||
|
||
@Override | ||
public void executeSql(TablePath tablePath, String sql) { | ||
throw new CatalogException("Hive Catalog is not support execute custom sql!"); | ||
} | ||
} |
50 changes: 50 additions & 0 deletions
50
...rc/main/java/org/apache/seatunnel/connectors/seatunnel/hive/catalog/HiveCatalogUtils.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.seatunnel.connectors.seatunnel.hive.catalog; | ||
|
||
import org.apache.seatunnel.api.table.catalog.Column; | ||
import org.apache.seatunnel.api.table.catalog.PhysicalColumn; | ||
|
||
import org.apache.hadoop.hive.metastore.api.FieldSchema; | ||
|
||
public class HiveCatalogUtils { | ||
|
||
public static final String CATALOG_NAME = "Hive"; | ||
public static final String DEFAULT_DB = "default"; | ||
public static final String LOCATION = "LOCATION"; | ||
|
||
public static Column hiveFieldSchemaToSTColumn(FieldSchema fieldSchema) { | ||
return PhysicalColumn.of( | ||
fieldSchema.getName(), | ||
null, | ||
0, | ||
true, | ||
null, | ||
fieldSchema.getComment(), | ||
fieldSchema.getType(), | ||
false, | ||
false, | ||
null, | ||
null, | ||
null); | ||
} | ||
|
||
public static FieldSchema stColumnToHiveFieldSchema(Column column) { | ||
return new FieldSchema(column.getName(), column.getSourceType(), null); | ||
} | ||
} |
Oops, something went wrong.