open reader only once with correct schema (apache#59)
robert3005 authored and pwoody committed Nov 17, 2016
1 parent 6928f1a commit aa4e13d
Showing 1 changed file with 3 additions and 3 deletions.
@@ -106,13 +106,12 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont
       // then we need to apply the predicate push down filter
       footer = readFooter(configuration, file, range(split.getStart(), split.getEnd()));
       FilterCompat.Filter filter = getFilter(configuration);
-      ParquetFileReader reader = ParquetFileReader.open(taskAttemptContext.getConfiguration(), file, footer);
+      this.reader = ParquetFileReader.open(configuration, file, footer);
       blocks = filterRowGroups(
           ImmutableList.of(RowGroupFilter.FilterLevel.STATISTICS, RowGroupFilter.FilterLevel.DICTIONARY),
           filter,
           footer.getBlocks(),
           reader);
-      reader.close();
     } else {
       // otherwise we find the row groups that were selected on the client
       footer = readFooter(configuration, file, NO_FILTER);
@@ -141,17 +140,18 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont
             + " out of: " + Arrays.toString(foundRowGroupOffsets)
             + " in range " + split.getStart() + ", " + split.getEnd());
       }
+      this.reader = new ParquetFileReader(configuration, file, footer);
     }
     this.fileSchema = footer.getFileMetaData().getSchema();
     Map<String, String> fileMetadata = footer.getFileMetaData().getKeyValueMetaData();
     ReadSupport<T> readSupport = getReadSupportInstance(getReadSupportClass(configuration));
     ReadSupport.ReadContext readContext = readSupport.init(new InitContext(
         taskAttemptContext.getConfiguration(), toSetMultiMap(fileMetadata), fileSchema));
     this.requestedSchema = readContext.getRequestedSchema();
+    reader.setRequestedSchema(requestedSchema);
     String sparkRequestedSchemaString =
         configuration.get(ParquetReadSupport$.MODULE$.SPARK_ROW_REQUESTED_SCHEMA());
     this.sparkSchema = StructType$.MODULE$.fromString(sparkRequestedSchemaString);
-    this.reader = new ParquetFileReader(configuration, file, footer);
     for (BlockMetaData block : blocks) {
       this.totalRowCount += block.getRowCount();
     }
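
For context, below is a minimal, self-contained sketch (not the committed code) of the "open the reader once, then reuse it" pattern this commit moves to. It only uses calls that appear in the hunks above: reading the footer, opening a ParquetFileReader with that footer, passing the same reader to RowGroupFilter.filterRowGroups, and calling setRequestedSchema before reading. The class name OpenOnceSketch, the command-line path, and the identity projection are illustrative, and the three-argument ParquetFileReader.open(conf, path, footer) overload is assumed to be the one available to this codebase's Parquet version.

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.compat.RowGroupFilter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetInputFormat;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.MessageType;

import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;

public class OpenOnceSketch {
  public static void main(String[] args) throws IOException {
    Configuration configuration = new Configuration();
    Path file = new Path(args[0]);  // illustrative: path to a Parquet file

    // Read the footer once. The committed code restricts it to the split via
    // range(start, end); NO_FILTER keeps this sketch whole-file.
    ParquetMetadata footer = ParquetFileReader.readFooter(configuration, file, NO_FILTER);
    FilterCompat.Filter filter = ParquetInputFormat.getFilter(configuration);

    // Open a single reader and keep it: the same instance is handed to the
    // row-group filter instead of opening a throwaway reader and closing it.
    ParquetFileReader reader = ParquetFileReader.open(configuration, file, footer);
    try {
      List<BlockMetaData> blocks = RowGroupFilter.filterRowGroups(
          Arrays.asList(RowGroupFilter.FilterLevel.STATISTICS, RowGroupFilter.FilterLevel.DICTIONARY),
          filter,
          footer.getBlocks(),
          reader);

      // Set the projection on the same reader before reading any row group.
      MessageType fileSchema = footer.getFileMetaData().getSchema();
      MessageType requestedSchema = fileSchema;  // illustrative: a real reader projects a column subset
      reader.setRequestedSchema(requestedSchema);

      long totalRowCount = 0;
      for (BlockMetaData block : blocks) {
        totalRowCount += block.getRowCount();
      }
      System.out.println("rows in row groups surviving the filter: " + totalRowCount);

      // Pages for the next row group, or null when the file is exhausted.
      PageReadStore pages = reader.readNextRowGroup();
      System.out.println("rows in first row group read: " + (pages == null ? 0 : pages.getRowCount()));
    } finally {
      reader.close();
    }
  }
}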
