-
Notifications
You must be signed in to change notification settings - Fork 5.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for decimal batch reader #22636
Conversation
|
b0972c0
to
96f2271
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice work, @wypb
looks nice. a few comments about comments, and code structure
switch (length) { | ||
case 8: | ||
value |= bytes[startOffset + 7] & 0xFFL; | ||
// fall through |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this line comment seems not useful
same for following lines
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
} | ||
|
||
DecimalLogicalTypeAnnotation decimalLogicalTypeAnnotation = (DecimalLogicalTypeAnnotation) logicalTypeAnnotation; | ||
return decimalLogicalTypeAnnotation.getPrecision() <= Decimals.MAX_SHORT_PRECISION; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
static import Decimals.MAX_SHORT_PRECISION
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
public class ShortDecimalFixedWidthByteArrayBatchDecoder | ||
{ | ||
private static final ShortDecimalDecoder[] VALUE_DECODERS = new ShortDecimalDecoder[] { | ||
new BigEndianReader1(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need 7 readers? add a comment?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This actually further optimizes the reading speed of short decimals. The implementation of ShortDecimalFixedWidthByteArrayBatchDecoder
actually refers to the implementation of Trino: trinodb/trino@f71a815
import static com.facebook.presto.parquet.ParquetTypeUtils.getShortDecimalValue; | ||
import static com.google.common.base.Preconditions.checkArgument; | ||
|
||
public class BinaryShortDecimalDeltaValuesDecoder |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shall we have an abstract class, where BinaryLongDecimalDeltaValuesDecoder and BinaryShortDecimalDeltaValuesDecoder are its subclasses?
The code of BinaryLongDecimalDeltaValuesDecoder and BinaryShortDecimalDeltaValuesDecoder has lots of same code
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll see how to refactor these two classes because they implement different interfaces.
import static com.google.common.base.Preconditions.checkArgument; | ||
import static java.util.Objects.requireNonNull; | ||
|
||
public class Int64ShortDecimalDeltaValuesDecoder |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shall we have an abstract class, and Int32ShortDecimalDeltaValuesDecoder, Int64ShortDecimalDeltaValuesDecoder becomes its sub-classes? The classes share many same code
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The common parts of Int32ShortDecimalDeltaValuesDecoder
and Int64ShortDecimalDeltaValuesDecoder
have been extracted into AbstractInt64AndInt32ShortDecimalDeltaValuesDecoder
.
import static io.airlift.slice.SizeOf.SIZE_OF_LONG; | ||
import static java.util.Objects.requireNonNull; | ||
|
||
public class LongDecimalApacheParquetValueDecoder |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
bad naming. LongDecimalApacheParquetValueDecoder
why Apache and Parquet appears in the name?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have renamed LongDecimalApacheParquetValueDecoder
and ShortDecimalApacheParquetValueDecoder
to FixedLenByteArrayShortDecimalDeltaValueDecoder
and FixedLenByteArrayLongDecimalDeltaValueDecoder
respectively.
import static com.google.common.base.Preconditions.checkArgument; | ||
import static java.util.Objects.requireNonNull; | ||
|
||
public class ShortDecimalApacheParquetValueDecoder |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
bad naming
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@wypb Thank you so much for this great work. I wonder if you could add tests?
presto-common/src/main/java/com/facebook/presto/common/type/UnscaledDecimal128Arithmetic.java
Outdated
Show resolved
Hide resolved
return decimalLogicalTypeAnnotation.getPrecision() <= Decimals.MAX_SHORT_PRECISION; | ||
} | ||
|
||
public static boolean isLongDecimalType(ColumnDescriptor descriptor) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This function is not used
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Already removed.
presto-parquet/src/test/java/com/facebook/presto/parquet/BenchmarkParquetReader.java
Outdated
Show resolved
Hide resolved
presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/Decoders.java
Outdated
Show resolved
Hide resolved
presto-parquet/src/main/java/com/facebook/presto/parquet/ColumnReaderFactory.java
Show resolved
Hide resolved
int inputBytesOffset = input.getByteArrayOffset(); | ||
for (int i = offset; i < offset + length; i++) { | ||
checkBytesFitInShortDecimal(inputBytes, inputBytesOffset, extraBytesLength, columnDescriptor); | ||
values[i] = getShortDecimalValue(inputBytes, inputBytesOffset + extraBytesLength, Long.BYTES); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if extraBytesLength < 0?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
According to the implementation of the code, extraBytesLength
will not be less than 0, but must be greater than 0.
if (typeLength <= Long.BYTES) {
....
}
int extraBytesLength = typeLength - Long.BYTES;
// Equivalent to expectedValue = bytes[endOffset] < 0 ? -1 : 0 | ||
byte expectedValue = (byte) (bytes[endOffset] >> 7); | ||
for (int i = offset; i < endOffset; i++) { | ||
if (bytes[i] != expectedValue) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you explain how this works? Suppose inputBytesOffset = 0 and typeLength=9 here, then your extraBytesLength = 1, and checkBytesFitInShortDecimal(inputBytes, 0, 1, descriptor) is called. And your expectedValue is to check if inputBytes[1] < 0 ? -1 : 0. Since the values are encoded big-endian byte order, I assume you wanted to check the the most significant byte which should be inputBytes[0], but you're checking the second most significant byte inputBytes[1]. Does that work?
Also The largest precision for short decimal is 18 and the value is 999,999,999,999,999,999. It can be expressed with 60 bits value 0xDE0B6B3A763FFFF. If you really need to verify there is no overflow, the bits 61-64 also need to be checked. I don't see it's done here. Could you explain a little bit your idea?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The bytes[endOffset] of the above code is actually the most significant byte. To illustrate this better, I built a test locally. The original data is 123456789012.12345678, typeLength=14. the contents of bytes
are [2, 0, 0, 0, 3, 1, 0, 0, 0, 0, 0, 0, -85, 84, -87, -116, -23, -53, -11, 78]
NOTE: -85, 84, -87, -116, -23, -53, -11, 78 converted to binary is actually the high 56-bit data of 12345678901212345678 converted to binary.
-85, 84, -87, -116, -23, -53, -11, 78 binary representation:
10101011010101001010100110001100111010011100101111110101
12345678901212345678 binary representation:
1010101101010100101010011000110011101001110010111111010101001110
In this scenario, inputBytesOffset = 6, extraBytesLength = 6, so endOffset = 12, bytes[endOffset] = -85, which is actually the most significant byte. Then we check whether bytes[6.. 11] is -1. Attached is the file I tested.
86216465-ad5d-4acc-bc8b-0d1972149d8c.tgz
6a1eda5
to
49c8f19
Compare
Codenotify: Notifying subscribers in CODENOTIFY files for diff 9898886...cabc10c. No notifications. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This PR seems short on new tests. I would have expected quite a number given all the new code.
presto-parquet/src/main/java/com/facebook/presto/parquet/ParquetTypeUtils.java
Outdated
Show resolved
Hide resolved
|
||
for (int i = 0; i < bytes.length; i++) { | ||
value |= ((long) bytes[bytes.length - i - 1] & 0xFFL) << (8 * i); | ||
public static long getShortDecimalValue(byte[] bytes, int startOffset, int length) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can this be private or not public?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, this method is used in many places.
return data; | ||
} | ||
|
||
private static int propagateSignBit(int value, int bitsToPad) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why here and in ByteUtils?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I looked at the code and found that this function can actually be deleted.
@@ -61,4 +61,9 @@ public static void unpack8Values(byte inByte, byte[] out, int outPos) | |||
out[6 + outPos] = (byte) (inByte >> 6 & 1); | |||
out[7 + outPos] = (byte) (inByte >> 7 & 1); | |||
} | |||
|
|||
public static long propagateSignBit(long value, int bitsToPad) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method could use a unit test that addresses it directly
public SimpleSliceInputStream(Slice slice, int offset) | ||
{ | ||
this.slice = requireNonNull(slice, "slice is null"); | ||
checkArgument(slice.length() == 0 || slice.hasByteArray(), "SimpleSliceInputStream supports only slices backed by byte array"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
only supports
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
@zhenxiao @yingsu00 @elharo sorry for the late reply. I have made modifications according to the previous review comments.
I generated Parquet files with different encodings locally. I will see how to add them to the test set. |
Nit, suggest release note entry change:
|
153aa02
to
c182a88
Compare
@zhenxiao @yingsu00 I added three new tests in |
7fa98f4
to
fd3a640
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks good to me
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's a lot of new public API here that feels like it needs unit tests, e.g. SimpleSliceInputStream
HI @tdcmeehan @elharo Thank you for your review, I'll add some test cases soon. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@@ -173,11 +234,46 @@ private static final ValuesDecoder createValuesDecoder(ColumnDescriptor columnDe | |||
|
|||
if ((encoding == DELTA_BYTE_ARRAY || encoding == DELTA_LENGTH_BYTE_ARRAY) && type == PrimitiveTypeName.BINARY) { | |||
ByteBufferInputStream inputStream = ByteBufferInputStream.wrap(ByteBuffer.wrap(buffer, offset, length)); | |||
|
|||
Optional<Type> prestoType = createDecimalType(columnDescriptor); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know this is from existing code, but it is confusing to detect if the logical type is decimal by calling createDecimalType() and check its returned type. It'll be clearer to make the type detecting consistent with other encodings, e.g. as what you did on line 195, 196
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Already refactored, thanks.
|
||
public FixedLenByteArrayShortDecimalPlainValuesDecoder(ColumnDescriptor columnDescriptor, byte[] byteBuffer, int bufferOffset, int length) | ||
{ | ||
this.columnDescriptor = columnDescriptor; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know the existing decoders code has the same pattern, but can you please add requireNonNull() on columnDescriptor and byteBuffer? Same for all other constructors for the classes you add. Thanks!
HI @elharo @tdcmeehan @yingsu00 thanks for your inputs and sorry for the delay. I modified some code according to the previous review and added a new test case |
throws Exception | ||
{ | ||
for (int precision = 1; precision <= MAX_SHORT_PRECISION; precision++) { | ||
int scale = ThreadLocalRandom.current().nextInt(precision); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I get very nervous seeing random in tests. If something fails it's not reproducible, Use fixed constant test data instead, even if the fixed values are initially randomly chosen.
throws Exception | ||
{ | ||
for (int precision = 1; precision <= MAX_SHORT_PRECISION; precision++) { | ||
int scale = ThreadLocalRandom.current().nextInt(precision); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
if (!isNested && isShortDecimalType(descriptor)) { | ||
int precision = ((DecimalLogicalTypeAnnotation) descriptor.getPrimitiveType().getLogicalTypeAnnotation()).getPrecision(); | ||
if (precision < 10) { | ||
log.warn("PrimitiveTypeName is INT64 but precision is less then 10."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
delete the warning; it's not actionable
or if I'm wrong and this is a real problem, then a waring isn't enough. This should fail outright
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The spec says we need to produce a warning when precision < 10 for INT64. See https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#decimal
* is not a common one, just use the existing one provided by Parquet library and add a wrapper around it that satisfies the | ||
* {@link ShortDecimalValuesDecoder} interface. | ||
*/ | ||
public class FixedLenByteArrayShortDecimalDeltaValueDecoder |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
avoid abbreviations; thus FixedLen --> FixedLength
but aren't all arrays in java fixed length? So maybe just ByteArrayShortDecimalDeltaValueDecoder
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The naming convention of the Decoder class name here is ParquetPrimitiveType + [Short|Long]Decimal + encoding + ValuesDecoder. The PrimitiveTypeName corresponding to this class is FIXED_LEN_BYTE_ARRAY, so this name is used.
int positionOffset = offsets[i]; | ||
int positionLength = offsets[i + 1] - positionOffset; | ||
byte[] temp = new byte[positionLength]; | ||
System.arraycopy(byteBuffer, positionOffset, temp, 0, positionLength); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Personally I find Arrays.copyOf a little easier to read; up to you
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed it to byte[] temp = Arrays.copyOfRange(byteBuffer, positionOffset, positionOffset + positionLength);
@Override | ||
public void readNext(long[] values, int offset, int length) | ||
{ | ||
final byte[] localByteBuffer = byteBuffer; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Danger! Since this local variable is just a reference, byteBuffer is modified anyway when localByteBuffer is. Code below might be correct, I'm not sure, but this variable should be removed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice catch.
byte expectedValue = (byte) (bytes[endOffset] >> 7); | ||
for (int i = offset; i < endOffset; i++) { | ||
if (bytes[i] != expectedValue) { | ||
throw new PrestoException(NOT_SUPPORTED, format( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
string concatenation is simpler here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
|
||
// We first shift the byte as left as possible. Then, when shifting back right, | ||
// the sign bit will get propagated | ||
values[offset] = value << 56 >> 56; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Order of operations is foggy. Please use parentheses to make this explicit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
long[] result = new long[size]; | ||
for (int i = 0; i < size; i++) { | ||
result[i] = Math.max( | ||
Math.min(randomLong(random, bitWidth), max), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
avoid random numbers in tests. Test results need to be reproducible.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
Thank you @elharo for the review! I have updated with a new (squashed) commit, and I have addressed all your comments. Let me know if I missed anything. |
@wypb There is a test failure: Failures: Can you please investigate? THanks! |
@yingsu00 this is actually likely due to facebookincubator/velox#10261 and is being backed out in facebookincubator/velox#10431. It was added in #23138 (which appeared to have been merged in spite of the failure it introduced). See: #23156 |
Hi @tdcmeehan thank you for sharing the information. @yingsu00 I synchronized the latest code, and now CI is all green. |
Thank you @tdcmeehan for letting me know! |
Description
Add support for decimal batch reader
Benchmark(The lower the better)
Impact
When we enable Parquet batch reader(
parquet_batch_read_optimization_enabled
=true), the Decimal type will read data in Batch mode.Test Plan
Contributor checklist
Release Notes
CC: @zhenxiao