Added tests for different filter types: Binary and Numeric #115

Merged · 17 commits · Nov 3, 2023
Changes from 6 commits
cloudbuild/presubmit.sh (3 changes: 1 addition & 2 deletions)

```diff
@@ -35,8 +35,7 @@ case $STEP in
     ;;

   acceptance-test)
-    $MVN test -T 1C -Dtest=DataprocImage20AcceptanceTest
-    $MVN test -T 1C -Dtest=DataprocImage21AcceptanceTest
+    $MVN test -T 2C -Dtest=DataprocImage20AcceptanceTest,DataprocImage21AcceptanceTest
     ;;

   *)
```
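The two sequential acceptance-test runs are collapsed into one Maven invocation: Surefire's `-Dtest` accepts a comma-separated list of test classes, and the parallel-build flag goes from `-T 1C` (one thread per CPU core) to `-T 2C` (two per core), so both Dataproc image tests execute in a single, more parallel pass.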
SpannerScanBuilder.java

```diff
@@ -15,9 +15,10 @@
 package com.google.cloud.spark.spanner;

 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import org.apache.spark.sql.connector.read.Scan;
 import org.apache.spark.sql.connector.read.ScanBuilder;
@@ -36,36 +37,53 @@
 public class SpannerScanBuilder
     implements ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns {
   private CaseInsensitiveStringMap opts;
-  private Set<Filter> filters;
+  private Set<Filter> pushedFilters;
   private List<String> requiredColumns;
   private SpannerScanner scanner;
   private static final Logger log = LoggerFactory.getLogger(SpannerScanBuilder.class);
+  private SpannerTable spannerTable;
+  private Map<String, StructField> fields;

   public SpannerScanBuilder(CaseInsensitiveStringMap options) {
     this.opts = options;
-    this.filters = new HashSet<Filter>();
+    this.pushedFilters = new HashSet<Filter>();
+    this.spannerTable = new SpannerTable(options);
+    this.fields = new LinkedHashMap<>();
+    for (StructField field : spannerTable.schema().fields()) {
+      fields.put(field.name(), field);
+    }
   }

   @Override
   public Scan build() {
-    this.scanner = new SpannerScanner(this.opts.asCaseSensitiveMap());
+    this.scanner =
+        new SpannerScanner(this.opts.asCaseSensitiveMap(), this.spannerTable, this.fields);
     this.scanner.setFilters(this.pushedFilters());
     return this.scanner;
   }

   @Override
   public Filter[] pushedFilters() {
-    return this.filters.toArray(new Filter[this.filters.size()]);
+    return this.pushedFilters.toArray(new Filter[this.pushedFilters.size()]);
   }

   @Override
   public Filter[] pushFilters(Filter[] filters) {
-    this.filters.addAll(Arrays.asList(filters));
-    Filter[] allSetFilters = this.filters.toArray(new Filter[this.filters.size()]);
+    List<Filter> handledFilters = new ArrayList<>();
+    List<Filter> unhandledFilters = new ArrayList<>();
+    for (Filter filter : filters) {
+      if (SparkFilterUtils.isTopLevelFieldHandled(false, filter, fields)) {
+        handledFilters.add(filter);
+      } else {
+        unhandledFilters.add(filter);
+      }
+    }
+
+    this.pushedFilters.addAll(handledFilters);
     if (this.scanner != null) {
-      this.scanner.setFilters(allSetFilters);
+      this.scanner.setFilters(this.pushedFilters.toArray(new Filter[this.pushedFilters.size()]));
     }
-    return allSetFilters;
+    return unhandledFilters.stream().toArray(Filter[]::new);
   }

   public StructType readSchema() {
```
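With this change `pushFilters` follows the DataSource V2 contract: only filters that `SparkFilterUtils.isTopLevelFieldHandled` accepts are recorded as pushed, and the unhandled remainder is returned so Spark re-applies those predicates itself. A minimal sketch of how the builder would be exercised (the options map and which filters Spanner actually handles are assumptions, not taken from this PR):

```java
import java.util.Map;
import org.apache.spark.sql.sources.EqualTo;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.sources.StringContains;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

class PushdownSketch {
  static void demo(Map<String, String> options) { // hypothetical connector options
    SpannerScanBuilder builder = new SpannerScanBuilder(new CaseInsensitiveStringMap(options));

    // Spark hands every candidate filter to pushFilters(); the returned array is
    // the subset the source could NOT handle, which Spark then evaluates itself.
    Filter[] residual =
        builder.pushFilters(
            new Filter[] {
              new EqualTo("id", 1L), // assumed handled -> pushed down to Spanner
              new StringContains("name", "x") // assumed unhandled -> returned to Spark
            });

    // pushedFilters() reports only the accepted filters, so the physical plan
    // shows exactly which predicates were pushed down.
    Filter[] accepted = builder.pushedFilters();
  }
}
```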
SpannerScanner.java

```diff
@@ -33,6 +33,7 @@
 import org.apache.spark.sql.connector.read.PartitionReaderFactory;
 import org.apache.spark.sql.connector.read.Scan;
 import org.apache.spark.sql.sources.Filter;
+import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,10 +48,13 @@ public class SpannerScanner implements Batch, Scan {
   private Map<String, String> opts;
   private static final Logger log = LoggerFactory.getLogger(SpannerScanner.class);
   private final Timestamp INIT_TIME = Timestamp.now();
+  private Map<String, StructField> fields;

-  public SpannerScanner(Map<String, String> opts) {
+  public SpannerScanner(
+      Map<String, String> opts, SpannerTable spannerTable, Map<String, StructField> fields) {
     this.opts = opts;
-    this.spannerTable = new SpannerTable(opts);
+    this.spannerTable = spannerTable;
+    this.fields = fields;
   }

   @Override
@@ -98,6 +102,7 @@ public InputPartition[] planInputPartitions() {
           true,
           Optional.empty(),
           batchClient.databaseClient.getDialect().equals(Dialect.POSTGRESQL),
+          fields,
           filters);
 }
```
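Construction is inverted here: `SpannerScanner` no longer builds its own `SpannerTable` from the raw options (which would re-resolve the schema on every scan); the builder hands in the already-constructed table plus the name-to-`StructField` map, and the same `fields` map is threaded through `planInputPartitions` alongside the pushed `filters`.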
SpannerTable.java

```diff
@@ -33,6 +33,7 @@
 import org.apache.spark.sql.connector.write.WriteBuilder;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.MetadataBuilder;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 import org.slf4j.Logger;
@@ -99,16 +100,34 @@ public StructType createSchema(String tableName, ResultSet rs, boolean isPostgreSql
       String columnName = row.getString(0);
       // Integer ordinalPosition = column.getInt(1);
       boolean isNullable = row.getBoolean(2);
+      String colType = row.getString(3);
       DataType catalogType =
           isPostgreSql
-              ? SpannerTable.ofSpannerStrTypePg(row.getString(3), isNullable)
-              : SpannerTable.ofSpannerStrType(row.getString(3), isNullable);
-      schema = schema.add(columnName, catalogType, isNullable, "" /* No comments for the text */);
+              ? SpannerTable.ofSpannerStrTypePg(colType, isNullable)
+              : SpannerTable.ofSpannerStrType(colType, isNullable);
+      MetadataBuilder metadataBuilder = new MetadataBuilder();
+      if (isJson(colType)) {
+        metadataBuilder.putString(SpannerUtils.COLUMN_TYPE, "json");
+        schema = schema.add(columnName, catalogType, isNullable, metadataBuilder.build());
+      } else if (isJsonb(colType)) {
+        metadataBuilder.putString(SpannerUtils.COLUMN_TYPE, "jsonb");
+        schema = schema.add(columnName, catalogType, isNullable, metadataBuilder.build());
+      } else {
+        schema = schema.add(columnName, catalogType, isNullable, "" /* No comments for the text */);
+      }
     }
     this.tableSchema = schema;
     return schema;
   }

+  public static boolean isJson(String spannerStrType) {
+    return "JSON".equals(spannerStrType.trim().toUpperCase());
+  }
+
+  public static boolean isJsonb(String spannerStrType) {
+    return "jsonb".equals(spannerStrType.trim().toLowerCase());
+  }
+
   public static DataType ofSpannerStrType(String spannerStrType, boolean isNullable) {
     // Trim both ends of the string firstly, it could have come in as:
     //    "  STRUCT<a STRING(10), b INT64>  "
```
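JSON and JSONB columns are now tagged in the Spark schema's per-field metadata under `SpannerUtils.COLUMN_TYPE` (the constant added below), so downstream code can distinguish them from plain strings. A minimal sketch of reading that tag back out (the column name `payload` is hypothetical):

```java
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

class ColumnTypeTagSketch {
  static String columnTag(StructType schema) {
    StructField field = schema.apply("payload"); // hypothetical JSON column
    Metadata md = field.metadata();
    // Only JSON/JSONB columns carry the tag; all other columns have no entry.
    return md.contains(SpannerUtils.COLUMN_TYPE)
        ? md.getString(SpannerUtils.COLUMN_TYPE) // "json" or "jsonb"
        : null;
  }
}
```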
SpannerUtils.java

```diff
@@ -64,6 +64,7 @@ public class SpannerUtils {
           .setMaxAttempts(100)
           .build();
   private static final ObjectMapper jsonMapper = new ObjectMapper();
+  public static final String COLUMN_TYPE = "col_type";

   public static Long SECOND_TO_DAYS = 60 * 60 * 24L;
```