Skip to content

Commit

Permalink
initial qpt datatype conversion in synapse
Browse files Browse the repository at this point in the history
  • Loading branch information
Jithendar12 committed Feb 8, 2024
1 parent ff99d07 commit 2410267
Showing 1 changed file with 79 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,11 @@
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand All @@ -82,6 +84,7 @@
import java.util.stream.Collectors;

import static com.amazonaws.athena.connector.lambda.domain.predicate.functions.StandardFunctions.IS_DISTINCT_FROM_OPERATOR_FUNCTION_NAME;
import static java.util.Map.entry;

public class SynapseMetadataHandler extends JdbcMetadataHandler
{
Expand Down Expand Up @@ -332,6 +335,82 @@ public GetTableResponse doGetTable(final BlockAllocator blockAllocator, final Ge
}
}

@Override
public GetTableResponse doGetQueryPassthroughSchema(final BlockAllocator blockAllocator, final GetTableRequest getTableRequest)
throws Exception
{
if (!getTableRequest.isQueryPassthrough()) {
throw new IllegalArgumentException("No Query passed through [{}]" + getTableRequest);
}

JdbcQueryPassthrough qpt = JdbcQueryPassthrough.getInstance();
QueryPassthroughSignature.verifyQueryPassthroughArguments(
getTableRequest.getQueryPassthroughArguments(), qpt.arguments());
//For JDBC; generally speaking we only have one argument; namely "QUERY" -- Other connectors might need more
String customerPassedQuery = getTableRequest.getQueryPassthroughArguments().get(qpt.getQueryArgument());

try (Connection connection = getJdbcConnectionFactory().getConnection(getCredentialProvider())) {
PreparedStatement preparedStatement = connection.prepareStatement(customerPassedQuery);
ResultSetMetaData metadata = preparedStatement.getMetaData();
if (metadata == null) {
throw new UnsupportedOperationException("Query not supported: ResultSetMetaData not available for query: " + customerPassedQuery);
}
SchemaBuilder schemaBuilder = SchemaBuilder.newBuilder();

for (int columnIndex = 1; columnIndex <= metadata.getColumnCount(); columnIndex++) {
String columnName = metadata.getColumnName(columnIndex);
String columnLabel = metadata.getColumnLabel(columnIndex);
//todo; is there a mechanism to pass both back to the engine?
columnName = columnName.equals(columnLabel) ? columnName : columnLabel;

int precision = metadata.getPrecision(columnIndex);
int scale = metadata.getScale(columnIndex);

ArrowType columnType = JdbcArrowTypeConverter.toArrowType(
metadata.getColumnType(columnIndex),
precision,
scale,
configOptions);
String dataType = metadata.getColumnTypeName(columnIndex);
final Map<String, ArrowType> stringArrowTypeMap = Map.ofEntries(
entry("BIT", Types.MinorType.TINYINT.getType()),
entry("TINYINT", Types.MinorType.SMALLINT.getType()),
entry("NUMERIC", Types.MinorType.FLOAT8.getType()),
entry("SMALLMONEY", Types.MinorType.FLOAT8.getType()),
entry("DATE", Types.MinorType.DATEDAY.getType()),
entry("DATETIME", Types.MinorType.DATEMILLI.getType()),
entry("DATETIME2", Types.MinorType.DATEMILLI.getType()),
entry("SMALLDATETIME", Types.MinorType.DATEMILLI.getType()),
entry("DATETIMEOFFSET", Types.MinorType.DATEMILLI.getType())
);
if (dataType != null && stringArrowTypeMap.containsKey(dataType.toUpperCase())) {
columnType = stringArrowTypeMap.get(dataType.toUpperCase());
}

if (columnType != null && SupportedTypes.isSupported(columnType)) {
if (columnType instanceof ArrowType.List) {
schemaBuilder.addListField(columnName, getArrayArrowTypeFromTypeName(
metadata.getTableName(columnIndex),
metadata.getColumnDisplaySize(columnIndex),
precision));
}
else {
schemaBuilder.addField(FieldBuilder.newBuilder(columnName, columnType).build());
}
}
else {
// Default to VARCHAR ArrowType
LOGGER.warn("getSchema: Unable to map type for column[" + columnName +
"] to a supported type, attempted " + columnType + " - defaulting type to VARCHAR.");
schemaBuilder.addField(FieldBuilder.newBuilder(columnName, new ArrowType.Utf8()).build());
}
}

Schema schema = schemaBuilder.build();
return new GetTableResponse(getTableRequest.getCatalogName(), getTableRequest.getTableName(), schema, Collections.emptySet());
}
}

/**
* Appropriate datatype to arrow type conversions will be done by fetching data types of columns
* @param jdbcConnection
Expand Down

0 comments on commit 2410267

Please sign in to comment.