Allow creating tables with avro.schema.literal #14426

Merged (1 commit) on Oct 24, 2022
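
This change adds an avro_schema_literal table property to the Hive connector, stored under the avro.schema.literal key, so a table's Avro schema can be supplied inline as JSON instead of through avro_schema_url. A minimal sketch of the DDL this enables (catalog, schema, table, and field names are made up; the single declared column follows the placeholder-column pattern used with avro_schema_url):

CREATE TABLE hive.avro.avro_data (
   id bigint
)
WITH (
   format = 'AVRO',
   avro_schema_literal = '{"namespace": "example.avro", "name": "avro_data", "type": "record", "fields": [{"name": "id", "type": "long"}]}'
)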
@@ -117,6 +117,8 @@
import io.trino.spi.type.TimestampType;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeManager;
import org.apache.avro.Schema;
import org.apache.avro.SchemaParseException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
@@ -207,6 +209,7 @@
import static io.trino.plugin.hive.HiveSessionProperties.isSortedWritingEnabled;
import static io.trino.plugin.hive.HiveSessionProperties.isStatisticsEnabled;
import static io.trino.plugin.hive.HiveTableProperties.AUTO_PURGE;
import static io.trino.plugin.hive.HiveTableProperties.AVRO_SCHEMA_LITERAL;
import static io.trino.plugin.hive.HiveTableProperties.AVRO_SCHEMA_URL;
import static io.trino.plugin.hive.HiveTableProperties.BUCKETED_BY_PROPERTY;
import static io.trino.plugin.hive.HiveTableProperties.BUCKET_COUNT_PROPERTY;
@@ -224,6 +227,7 @@
import static io.trino.plugin.hive.HiveTableProperties.STORAGE_FORMAT_PROPERTY;
import static io.trino.plugin.hive.HiveTableProperties.TEXTFILE_FIELD_SEPARATOR;
import static io.trino.plugin.hive.HiveTableProperties.TEXTFILE_FIELD_SEPARATOR_ESCAPE;
import static io.trino.plugin.hive.HiveTableProperties.getAvroSchemaLiteral;
import static io.trino.plugin.hive.HiveTableProperties.getAvroSchemaUrl;
import static io.trino.plugin.hive.HiveTableProperties.getBucketProperty;
import static io.trino.plugin.hive.HiveTableProperties.getExternalLocation;
@@ -341,6 +345,7 @@ public class HiveMetadata
private static final String NULL_FORMAT_KEY = serdeConstants.SERIALIZATION_NULL_FORMAT;

public static final String AVRO_SCHEMA_URL_KEY = "avro.schema.url";
public static final String AVRO_SCHEMA_LITERAL_KEY = "avro.schema.literal";

private static final String CSV_SEPARATOR_KEY = OpenCSVSerde.SEPARATORCHAR;
private static final String CSV_QUOTE_KEY = OpenCSVSerde.QUOTECHAR;
@@ -640,6 +645,10 @@ private ConnectorTableMetadata doGetTableMetadata(ConnectorSession session, Sche
if (avroSchemaUrl != null) {
properties.put(AVRO_SCHEMA_URL, avroSchemaUrl);
}
String avroSchemaLiteral = table.getParameters().get(AVRO_SCHEMA_LITERAL_KEY);
if (avroSchemaLiteral != null) {
properties.put(AVRO_SCHEMA_LITERAL, avroSchemaLiteral);
}

// Textfile and CSV specific properties
getSerdeProperty(table, SKIP_HEADER_COUNT_KEY)
@@ -900,6 +909,10 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
throw new TrinoException(NOT_SUPPORTED, "Bucketing columns not supported when Avro schema url is set");
}

if (bucketProperty.isPresent() && getAvroSchemaLiteral(tableMetadata.getProperties()) != null) {
throw new TrinoException(NOT_SUPPORTED, "Bucketing/Partitioning columns not spported when Avro schema literal is set");
}

validateTimestampColumns(tableMetadata.getColumns(), getTimestampPrecision(session));
List<HiveColumnHandle> columnHandles = getColumnHandles(tableMetadata, ImmutableSet.copyOf(partitionedBy));
HiveStorageFormat hiveStorageFormat = getHiveStorageFormat(tableMetadata.getProperties());
@@ -994,10 +1007,16 @@ private Map<String, String> getEmptyTableProperties(ConnectorTableMetadata table

// Avro specific properties
String avroSchemaUrl = getAvroSchemaUrl(tableMetadata.getProperties());
String avroSchemaLiteral = getAvroSchemaLiteral(tableMetadata.getProperties());
checkAvroSchemaProperties(avroSchemaUrl, avroSchemaLiteral);
if (avroSchemaUrl != null) {
checkFormatForProperty(hiveStorageFormat, HiveStorageFormat.AVRO, AVRO_SCHEMA_URL);
tableProperties.put(AVRO_SCHEMA_URL_KEY, validateAndNormalizeAvroSchemaUrl(avroSchemaUrl, hdfsContext));
}
else if (avroSchemaLiteral != null) {
checkFormatForProperty(hiveStorageFormat, HiveStorageFormat.AVRO, AVRO_SCHEMA_LITERAL);
tableProperties.put(AVRO_SCHEMA_LITERAL_KEY, validateAvroSchemaLiteral(avroSchemaLiteral));
}

// Textfile and CSV specific properties
Set<HiveStorageFormat> csvAndTextFile = ImmutableSet.of(HiveStorageFormat.TEXTFILE, HiveStorageFormat.CSV);
@@ -1127,6 +1146,24 @@ private String validateAndNormalizeAvroSchemaUrl(String url, HdfsContext context
}
}

private static void checkAvroSchemaProperties(String avroSchemaUrl, String avroSchemaLiteral)
{
if (avroSchemaUrl != null && avroSchemaLiteral != null) {
throw new TrinoException(INVALID_TABLE_PROPERTY, "avro_schema_url and avro_schema_literal cannot both be set");
}
}

private static String validateAvroSchemaLiteral(String avroSchemaLiteral)
{
try {
new Schema.Parser().parse(avroSchemaLiteral);
return avroSchemaLiteral;
}
catch (SchemaParseException e) {
throw new TrinoException(INVALID_TABLE_PROPERTY, "Failed to parse Avro schema: " + avroSchemaLiteral, e);
}
}

private static Path getExternalLocationAsPath(String location)
{
try {
@@ -1263,6 +1300,9 @@ private void failIfAvroSchemaIsSet(HiveTableHandle handle)
if (table.getParameters().containsKey(AVRO_SCHEMA_URL_KEY) || table.getStorage().getSerdeParameters().containsKey(AVRO_SCHEMA_URL_KEY)) {
throw new TrinoException(NOT_SUPPORTED, "ALTER TABLE not supported when Avro schema url is set");
}
if (table.getParameters().containsKey(AVRO_SCHEMA_LITERAL_KEY) || table.getStorage().getSerdeParameters().containsKey(AVRO_SCHEMA_LITERAL_KEY)) {
throw new TrinoException(NOT_SUPPORTED, "ALTER TABLE not supported when Avro schema literal is set");
}
}

@Override
@@ -1424,6 +1464,10 @@ public HiveOutputTableHandle beginCreateTable(ConnectorSession session, Connecto
throw new TrinoException(NOT_SUPPORTED, "CREATE TABLE AS not supported when Avro schema url is set");
}

if (getAvroSchemaLiteral(tableMetadata.getProperties()) != null) {
throw new TrinoException(NOT_SUPPORTED, "CREATE TABLE AS not supported when Avro schema literal is set");
}

getHeaderSkipCount(tableMetadata.getProperties()).ifPresent(headerSkipCount -> {
if (headerSkipCount > 1) {
throw new TrinoException(NOT_SUPPORTED, format("Creating Hive table with data with value of %s property greater than 1 is not supported", SKIP_HEADER_COUNT_KEY));
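
The checkAvroSchemaProperties and validateAvroSchemaLiteral helpers above make the two Avro schema properties mutually exclusive and reject any literal that Avro's Schema.Parser cannot parse. A hypothetical statement that sets both properties (the names and URL are made up) would fail with "avro_schema_url and avro_schema_literal cannot both be set":

CREATE TABLE hive.avro.avro_data (id bigint)
WITH (
   format = 'AVRO',
   avro_schema_url = 's3://example-bucket/avro_data.avsc',
   avro_schema_literal = '{"name": "avro_data", "type": "record", "fields": [{"name": "id", "type": "long"}]}'
)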
@@ -56,6 +56,7 @@ public class HiveTableProperties
public static final String ORC_BLOOM_FILTER_COLUMNS = "orc_bloom_filter_columns";
public static final String ORC_BLOOM_FILTER_FPP = "orc_bloom_filter_fpp";
public static final String AVRO_SCHEMA_URL = "avro_schema_url";
public static final String AVRO_SCHEMA_LITERAL = "avro_schema_literal";
public static final String TEXTFILE_FIELD_SEPARATOR = "textfile_field_separator";
public static final String TEXTFILE_FIELD_SEPARATOR_ESCAPE = "textfile_field_separator_escape";
public static final String NULL_FORMAT_PROPERTY = "null_format";
@@ -143,6 +144,7 @@ public HiveTableProperties(
integerProperty(BUCKETING_VERSION, "Bucketing version", null, false),
integerProperty(BUCKET_COUNT_PROPERTY, "Number of buckets", 0, false),
stringProperty(AVRO_SCHEMA_URL, "URI pointing to Avro schema for the table", null, false),
stringProperty(AVRO_SCHEMA_LITERAL, "JSON-encoded Avro schema for the table", null, false),
integerProperty(SKIP_HEADER_LINE_COUNT, "Number of header lines", null, false),
integerProperty(SKIP_FOOTER_LINE_COUNT, "Number of footer lines", null, false),
stringProperty(TEXTFILE_FIELD_SEPARATOR, "TEXTFILE field separator character", null, false),
@@ -185,6 +187,11 @@ public static String getAvroSchemaUrl(Map<String, Object> tableProperties)
return (String) tableProperties.get(AVRO_SCHEMA_URL);
}

public static String getAvroSchemaLiteral(Map<String, Object> tableProperties)
{
return (String) tableProperties.get(AVRO_SCHEMA_LITERAL);
}

public static Optional<Integer> getHeaderSkipCount(Map<String, Object> tableProperties)
{
return Optional.ofNullable((Integer) tableProperties.get(SKIP_HEADER_LINE_COUNT));
@@ -59,6 +59,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Strings.isNullOrEmpty;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.plugin.hive.HiveMetadata.AVRO_SCHEMA_LITERAL_KEY;
import static io.trino.plugin.hive.HiveMetadata.AVRO_SCHEMA_URL_KEY;
import static io.trino.plugin.hive.HiveSplitManager.PRESTO_OFFLINE;
import static io.trino.plugin.hive.HiveStorageFormat.AVRO;
@@ -217,8 +218,10 @@ public static ProtectMode getProtectMode(Table table)
public static boolean isAvroTableWithSchemaSet(Table table)
{
return AVRO.getSerde().equals(table.getStorage().getStorageFormat().getSerDeNullable()) &&
(table.getParameters().get(AVRO_SCHEMA_URL_KEY) != null ||
(table.getStorage().getSerdeParameters().get(AVRO_SCHEMA_URL_KEY) != null));
((table.getParameters().get(AVRO_SCHEMA_URL_KEY) != null ||
(table.getStorage().getSerdeParameters().get(AVRO_SCHEMA_URL_KEY) != null)) ||
(table.getParameters().get(AVRO_SCHEMA_LITERAL_KEY) != null ||
(table.getStorage().getSerdeParameters().get(AVRO_SCHEMA_LITERAL_KEY) != null)));
}

public static String makePartitionName(Table table, Partition partition)
@@ -102,6 +102,7 @@
import static io.trino.plugin.hive.HiveColumnStatisticType.NUMBER_OF_TRUE_VALUES;
import static io.trino.plugin.hive.HiveColumnStatisticType.TOTAL_SIZE_IN_BYTES;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_INVALID_METADATA;
import static io.trino.plugin.hive.HiveMetadata.AVRO_SCHEMA_LITERAL_KEY;
import static io.trino.plugin.hive.HiveMetadata.AVRO_SCHEMA_URL_KEY;
import static io.trino.plugin.hive.HiveStorageFormat.AVRO;
import static io.trino.plugin.hive.HiveStorageFormat.CSV;
@@ -415,8 +416,10 @@ public static boolean isAvroTableWithSchemaSet(org.apache.hadoop.hive.metastore.
SerDeInfo serdeInfo = getSerdeInfo(table);

return serdeInfo.getSerializationLib() != null &&
(table.getParameters().get(AVRO_SCHEMA_URL_KEY) != null ||
(serdeInfo.getParameters() != null && serdeInfo.getParameters().get(AVRO_SCHEMA_URL_KEY) != null)) &&
((table.getParameters().get(AVRO_SCHEMA_URL_KEY) != null ||
(serdeInfo.getParameters() != null && serdeInfo.getParameters().get(AVRO_SCHEMA_URL_KEY) != null)) ||
(table.getParameters().get(AVRO_SCHEMA_LITERAL_KEY) != null ||
(serdeInfo.getParameters() != null && serdeInfo.getParameters().get(AVRO_SCHEMA_LITERAL_KEY) != null))) &&
serdeInfo.getSerializationLib().equals(AVRO.getSerde());
}

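
As with avro_schema_url, the new property can only be set through a plain CREATE TABLE: the HiveMetadata guards above reject ALTER TABLE and CREATE TABLE AS when an Avro schema literal is set. A hypothetical CTAS that would be rejected (names are made up):

CREATE TABLE hive.avro.avro_data_copy
WITH (
   format = 'AVRO',
   avro_schema_literal = '{"name": "avro_data", "type": "record", "fields": [{"name": "id", "type": "long"}]}'
)
AS SELECT id FROM hive.avro.avro_data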