diff --git a/go/parquet/metadata/statistics.go b/go/parquet/metadata/statistics.go index 245b06a7348c4..97ba289885fda 100644 --- a/go/parquet/metadata/statistics.go +++ b/go/parquet/metadata/statistics.go @@ -23,6 +23,7 @@ import ( "unsafe" "github.com/apache/arrow/go/v14/arrow" + "github.com/apache/arrow/go/v14/arrow/float16" "github.com/apache/arrow/go/v14/arrow/memory" "github.com/apache/arrow/go/v14/internal/utils" "github.com/apache/arrow/go/v14/parquet" @@ -32,7 +33,7 @@ import ( "github.com/apache/arrow/go/v14/parquet/schema" ) -//go:generate go run ../../arrow/_tools/tmpl/main.go -i -data=../internal/encoding/physical_types.tmpldata statistics_types.gen.go.tmpl +//go:generate go run ../../arrow/_tools/tmpl/main.go -i -data=statistics_types.tmpldata statistics_types.gen.go.tmpl type StatProvider interface { GetMin() []byte @@ -373,6 +374,9 @@ var ( defaultMinUInt96 parquet.Int96 defaultMaxInt96 parquet.Int96 defaultMaxUInt96 parquet.Int96 + + defaultMinFloat16 parquet.FixedLenByteArray = float16.MaxNum.ToLEBytes() + defaultMaxFloat16 parquet.FixedLenByteArray = float16.MinNum.ToLEBytes() ) func init() { @@ -407,6 +411,14 @@ func (s *Int96Statistics) defaultMax() parquet.Int96 { return defaultMaxInt96 } +func (Float16Statistics) defaultMin() parquet.FixedLenByteArray { + return defaultMinFloat16 +} + +func (Float16Statistics) defaultMax() parquet.FixedLenByteArray { + return defaultMaxFloat16 +} + func (Float32Statistics) defaultMin() float32 { return math.MaxFloat32 } func (Float32Statistics) defaultMax() float32 { return -math.MaxFloat32 } func (Float64Statistics) defaultMin() float64 { return math.MaxFloat64 } @@ -427,6 +439,10 @@ func (FixedLenByteArrayStatistics) equal(a, b parquet.FixedLenByteArray) bool { return bytes.Equal(a, b) } +func (Float16Statistics) equal(a, b parquet.FixedLenByteArray) bool { + return float16.FromLEBytes(a).Equal(float16.FromLEBytes(b)) +} + func (BooleanStatistics) less(a, b bool) bool { return !a && b } @@ -481,6 +497,10 @@ 
func (s *FixedLenByteArrayStatistics) less(a, b parquet.FixedLenByteArray) bool return signedByteLess([]byte(a), []byte(b)) } +func (Float16Statistics) less(a, b parquet.FixedLenByteArray) bool { + return float16.FromLEBytes(a).Less(float16.FromLEBytes(b)) +} + func (BooleanStatistics) cleanStat(minMax minmaxPairBoolean) *minmaxPairBoolean { return &minMax } func (Int32Statistics) cleanStat(minMax minmaxPairInt32) *minmaxPairInt32 { return &minMax } func (Int64Statistics) cleanStat(minMax minmaxPairInt64) *minmaxPairInt64 { return &minMax } @@ -535,6 +555,29 @@ func (Float64Statistics) cleanStat(minMax minmaxPairFloat64) *minmaxPairFloat64 return &minMax } +func (Float16Statistics) cleanStat(minMax minmaxPairFloat16) *minmaxPairFloat16 { + min := float16.FromLEBytes(minMax[0][:]) + max := float16.FromLEBytes(minMax[1][:]) + + if min.IsNaN() || max.IsNaN() { + return nil + } + + if min.Equal(float16.MaxNum) && max.Equal(float16.MinNum) { + return nil + } + + zero := float16.New(0) + if min.Equal(zero) && !min.Signbit() { + minMax[0] = min.Negate().ToLEBytes() + } + if max.Equal(zero) && max.Signbit() { + minMax[1] = max.Negate().ToLEBytes() + } + + return &minMax +} + func (ByteArrayStatistics) cleanStat(minMax minmaxPairByteArray) *minmaxPairByteArray { if minMax[0] == nil || minMax[1] == nil { return nil diff --git a/go/parquet/metadata/statistics_test.go b/go/parquet/metadata/statistics_test.go index 35d8b7821c51a..a241e6946baec 100644 --- a/go/parquet/metadata/statistics_test.go +++ b/go/parquet/metadata/statistics_test.go @@ -22,6 +22,7 @@ import ( "testing" "github.com/apache/arrow/go/v14/arrow/bitutil" + "github.com/apache/arrow/go/v14/arrow/float16" "github.com/apache/arrow/go/v14/arrow/memory" "github.com/apache/arrow/go/v14/parquet" "github.com/apache/arrow/go/v14/parquet/metadata" @@ -32,24 +33,36 @@ import ( // NOTE(zeroshade): tests will be added and updated after merging the "file" package // since the tests that I wrote relied on the file 
writer/reader for ease of use. +func newFloat16Node(name string, rep parquet.Repetition, fieldID int32) *schema.PrimitiveNode { + return schema.MustPrimitive(schema.NewPrimitiveNodeLogical(name, rep, schema.Float16LogicalType{}, parquet.Types.FixedLenByteArray, 2, fieldID)) +} + func TestCheckNaNs(t *testing.T) { const ( numvals = 8 min = -4.0 max = 3.0 ) - nan := math.NaN() + var ( + nan = math.NaN() + f16Min parquet.FixedLenByteArray = float16.New(float32(min)).ToLEBytes() + f16Max parquet.FixedLenByteArray = float16.New(float32(max)).ToLEBytes() + ) allNans := []float64{nan, nan, nan, nan, nan, nan, nan, nan} allNansf32 := make([]float32, numvals) + allNansf16 := make([]parquet.FixedLenByteArray, numvals) for idx, v := range allNans { allNansf32[idx] = float32(v) + allNansf16[idx] = float16.New(float32(v)).ToLEBytes() } someNans := []float64{nan, max, -3.0, -1.0, nan, 2.0, min, nan} someNansf32 := make([]float32, numvals) + someNansf16 := make([]parquet.FixedLenByteArray, numvals) for idx, v := range someNans { someNansf32[idx] = float32(v) + someNansf16[idx] = float16.New(float32(v)).ToLEBytes() } validBitmap := []byte{0x7F} // 0b01111111 @@ -62,6 +75,8 @@ func TestCheckNaNs(t *testing.T) { s.Update(values.([]float32), 0) case *metadata.Float64Statistics: s.Update(values.([]float64), 0) + case *metadata.Float16Statistics: + s.Update(values.([]parquet.FixedLenByteArray), 0) } assert.False(t, stats.HasMinMax()) } else { @@ -72,6 +87,8 @@ func TestCheckNaNs(t *testing.T) { s.UpdateSpaced(values.([]float32), bitmap, 0, int64(nullCount)) case *metadata.Float64Statistics: s.UpdateSpaced(values.([]float64), bitmap, 0, int64(nullCount)) + case *metadata.Float16Statistics: + s.UpdateSpaced(values.([]parquet.FixedLenByteArray), bitmap, 0, int64(nullCount)) } assert.False(t, stats.HasMinMax()) } @@ -89,6 +106,11 @@ func TestCheckNaNs(t *testing.T) { assert.True(t, stats.HasMinMax()) assert.Equal(t, expectedMin, s.Min()) assert.Equal(t, expectedMax, s.Max()) + case 
*metadata.Float16Statistics: + s.Update(values.([]parquet.FixedLenByteArray), 0) + assert.True(t, stats.HasMinMax()) + assert.Equal(t, expectedMin, s.Min()) + assert.Equal(t, expectedMax, s.Max()) } } @@ -106,34 +128,48 @@ func TestCheckNaNs(t *testing.T) { assert.True(t, s.HasMinMax()) assert.Equal(t, expectedMin, s.Min()) assert.Equal(t, expectedMax, s.Max()) + case *metadata.Float16Statistics: + s.UpdateSpaced(values.([]parquet.FixedLenByteArray), bitmap, 0, int64(nullCount)) + assert.True(t, s.HasMinMax()) + assert.Equal(t, expectedMin, s.Min()) + assert.Equal(t, expectedMax, s.Max()) } } f32Col := schema.NewColumn(schema.NewFloat32Node("f", parquet.Repetitions.Optional, -1), 1, 1) f64Col := schema.NewColumn(schema.NewFloat64Node("f", parquet.Repetitions.Optional, -1), 1, 1) + f16Col := schema.NewColumn(newFloat16Node("f", parquet.Repetitions.Required, -1), 1, 1) // test values someNanStats := metadata.NewStatistics(f64Col, memory.DefaultAllocator) someNanStatsf32 := metadata.NewStatistics(f32Col, memory.DefaultAllocator) + someNanStatsf16 := metadata.NewStatistics(f16Col, memory.DefaultAllocator) // ingesting only nans should not yield a min or max assertUnsetMinMax(someNanStats, allNans, nil) assertUnsetMinMax(someNanStatsf32, allNansf32, nil) + assertUnsetMinMax(someNanStatsf16, allNansf16, nil) // ingesting a mix should yield a valid min/max assertMinMaxAre(someNanStats, someNans, min, max) assertMinMaxAre(someNanStatsf32, someNansf32, float32(min), float32(max)) + assertMinMaxAre(someNanStatsf16, someNansf16, f16Min, f16Max) // ingesting only nans after a valid min/max should have no effect assertMinMaxAre(someNanStats, allNans, min, max) assertMinMaxAre(someNanStatsf32, allNansf32, float32(min), float32(max)) + assertMinMaxAre(someNanStatsf16, allNansf16, f16Min, f16Max) someNanStats = metadata.NewStatistics(f64Col, memory.DefaultAllocator) someNanStatsf32 = metadata.NewStatistics(f32Col, memory.DefaultAllocator) + someNanStatsf16 = 
metadata.NewStatistics(f16Col, memory.DefaultAllocator) assertUnsetMinMax(someNanStats, allNans, validBitmap) assertUnsetMinMax(someNanStatsf32, allNansf32, validBitmap) + assertUnsetMinMax(someNanStatsf16, allNansf16, validBitmap) // nans should not pollute min/max when excluded via null bitmap assertMinMaxAreSpaced(someNanStats, someNans, validBitmapNoNaNs, min, max) assertMinMaxAreSpaced(someNanStatsf32, someNansf32, validBitmapNoNaNs, float32(min), float32(max)) + assertMinMaxAreSpaced(someNanStatsf16, someNansf16, validBitmapNoNaNs, f16Min, f16Max) // ingesting nans with a null bitmap should not change the result assertMinMaxAreSpaced(someNanStats, someNans, validBitmap, min, max) assertMinMaxAreSpaced(someNanStatsf32, someNansf32, validBitmap, float32(min), float32(max)) + assertMinMaxAreSpaced(someNanStatsf16, someNansf16, validBitmap, f16Min, f16Max) } func TestCheckNegativeZeroStats(t *testing.T) { @@ -155,37 +191,61 @@ func TestCheckNegativeZeroStats(t *testing.T) { assert.True(t, math.Signbit(s.Min())) assert.Equal(t, zero, s.Max()) assert.False(t, math.Signbit(s.Max())) + case *metadata.Float16Statistics: + s.Update(values.([]parquet.FixedLenByteArray), 0) + assert.True(t, s.HasMinMax()) + var zero float64 + min := float64(float16.FromLEBytes(s.Min()).Float32()) + max := float64(float16.FromLEBytes(s.Max()).Float32()) + assert.Equal(t, zero, min) + assert.True(t, math.Signbit(min)) + assert.Equal(t, zero, max) + assert.False(t, math.Signbit(max)) } } fcol := schema.NewColumn(schema.NewFloat32Node("f", parquet.Repetitions.Optional, -1), 1, 1) dcol := schema.NewColumn(schema.NewFloat64Node("d", parquet.Repetitions.Optional, -1), 1, 1) + hcol := schema.NewColumn(newFloat16Node("h", parquet.Repetitions.Optional, -1), 1, 1) var f32zero float32 var f64zero float64 + var f16PosZero parquet.FixedLenByteArray = float16.New(+f32zero).ToLEBytes() + var f16NegZero parquet.FixedLenByteArray = float16.New(-f32zero).ToLEBytes() + + assert.False(t, 
float16.FromLEBytes(f16PosZero).Signbit()) + assert.True(t, float16.FromLEBytes(f16NegZero).Signbit()) { fstats := metadata.NewStatistics(fcol, memory.DefaultAllocator) dstats := metadata.NewStatistics(dcol, memory.DefaultAllocator) + hstats := metadata.NewStatistics(hcol, memory.DefaultAllocator) assertMinMaxZeroesSign(fstats, []float32{-f32zero, f32zero}) assertMinMaxZeroesSign(dstats, []float64{-f64zero, f64zero}) + assertMinMaxZeroesSign(hstats, []parquet.FixedLenByteArray{f16NegZero, f16PosZero}) } { fstats := metadata.NewStatistics(fcol, memory.DefaultAllocator) dstats := metadata.NewStatistics(dcol, memory.DefaultAllocator) + hstats := metadata.NewStatistics(hcol, memory.DefaultAllocator) assertMinMaxZeroesSign(fstats, []float32{f32zero, -f32zero}) assertMinMaxZeroesSign(dstats, []float64{f64zero, -f64zero}) + assertMinMaxZeroesSign(hstats, []parquet.FixedLenByteArray{f16PosZero, f16NegZero}) } { fstats := metadata.NewStatistics(fcol, memory.DefaultAllocator) dstats := metadata.NewStatistics(dcol, memory.DefaultAllocator) + hstats := metadata.NewStatistics(hcol, memory.DefaultAllocator) assertMinMaxZeroesSign(fstats, []float32{-f32zero, -f32zero}) assertMinMaxZeroesSign(dstats, []float64{-f64zero, -f64zero}) + assertMinMaxZeroesSign(hstats, []parquet.FixedLenByteArray{f16NegZero, f16NegZero}) } { fstats := metadata.NewStatistics(fcol, memory.DefaultAllocator) dstats := metadata.NewStatistics(dcol, memory.DefaultAllocator) + hstats := metadata.NewStatistics(hcol, memory.DefaultAllocator) assertMinMaxZeroesSign(fstats, []float32{f32zero, f32zero}) assertMinMaxZeroesSign(dstats, []float64{f64zero, f64zero}) + assertMinMaxZeroesSign(hstats, []parquet.FixedLenByteArray{f16PosZero, f16PosZero}) } } diff --git a/go/parquet/metadata/statistics_types.gen.go b/go/parquet/metadata/statistics_types.gen.go index e6aa7f1801a0f..6a0723f3a0895 100644 --- a/go/parquet/metadata/statistics_types.gen.go +++ b/go/parquet/metadata/statistics_types.gen.go @@ -24,6 +24,7 @@ import 
( "github.com/apache/arrow/go/v14/arrow" "github.com/apache/arrow/go/v14/arrow/array" + "github.com/apache/arrow/go/v14/arrow/float16" "github.com/apache/arrow/go/v14/arrow/memory" "github.com/apache/arrow/go/v14/internal/bitutils" shared_utils "github.com/apache/arrow/go/v14/internal/utils" @@ -2432,6 +2433,314 @@ func (s *FixedLenByteArrayStatistics) Encode() (enc EncodedStatistics, err error return } +type minmaxPairFloat16 [2]parquet.FixedLenByteArray + +// Float16Statistics is the typed interface for managing stats for a column +// of Float16 type. +type Float16Statistics struct { + statistics + min parquet.FixedLenByteArray + max parquet.FixedLenByteArray + + bitSetReader bitutils.SetBitRunReader +} + +// NewFloat16Statistics constructs an appropriate stat object type using the +// given column descriptor and allocator. +// +// Panics if the physical type of descr is not parquet.Type.FixedLenByteArray +// Panics if the logical type of descr is not schema.Float16LogicalType +func NewFloat16Statistics(descr *schema.Column, mem memory.Allocator) *Float16Statistics { + if descr.PhysicalType() != parquet.Types.FixedLenByteArray { + panic(fmt.Errorf("parquet: invalid type %s for constructing a Float16 stat object", descr.PhysicalType())) + } + if !descr.LogicalType().Equals(schema.Float16LogicalType{}) { + panic(fmt.Errorf("parquet: invalid logical type %s for constructing a Float16 stat object", descr.LogicalType().String())) + } + + return &Float16Statistics{ + statistics: statistics{ + descr: descr, + hasNullCount: true, + hasDistinctCount: true, + order: descr.SortOrder(), + encoder: encoding.NewEncoder(descr.PhysicalType(), parquet.Encodings.Plain, false, descr, mem), + mem: mem, + }, + } +} + +// NewFloat16StatisticsFromEncoded will construct a properly typed statistics object +// initializing it with the provided information. 
+func NewFloat16StatisticsFromEncoded(descr *schema.Column, mem memory.Allocator, nvalues int64, encoded StatProvider) *Float16Statistics { + ret := NewFloat16Statistics(descr, mem) + ret.nvalues += nvalues + if encoded.IsSetNullCount() { + ret.IncNulls(encoded.GetNullCount()) + } + if encoded.IsSetDistinctCount() { + ret.IncDistinct(encoded.GetDistinctCount()) + } + + encodedMin := encoded.GetMin() + if encodedMin != nil && len(encodedMin) > 0 { + ret.min = ret.plainDecode(encodedMin) + } + encodedMax := encoded.GetMax() + if encodedMax != nil && len(encodedMax) > 0 { + ret.max = ret.plainDecode(encodedMax) + } + ret.hasMinMax = encoded.IsSetMax() || encoded.IsSetMin() + return ret +} + +func (s *Float16Statistics) plainEncode(src parquet.FixedLenByteArray) []byte { + s.encoder.(encoding.FixedLenByteArrayEncoder).Put([]parquet.FixedLenByteArray{src}) + buf, err := s.encoder.FlushValues() + if err != nil { + panic(err) // recovered by Encode + } + defer buf.Release() + + out := make([]byte, buf.Len()) + copy(out, buf.Bytes()) + return out +} + +func (s *Float16Statistics) plainDecode(src []byte) parquet.FixedLenByteArray { + var buf [1]parquet.FixedLenByteArray + + decoder := encoding.NewDecoder(s.descr.PhysicalType(), parquet.Encodings.Plain, s.descr, s.mem) + decoder.SetData(1, src) + decoder.(encoding.FixedLenByteArrayDecoder).Decode(buf[:]) + return buf[0] +} + +func (s *Float16Statistics) minval(a, b parquet.FixedLenByteArray) parquet.FixedLenByteArray { + switch { + case a == nil: + return b + case b == nil: + return a + case s.less(a, b): + return a + default: + return b + } +} + +func (s *Float16Statistics) maxval(a, b parquet.FixedLenByteArray) parquet.FixedLenByteArray { + switch { + case a == nil: + return b + case b == nil: + return a + case s.less(a, b): + return b + default: + return a + } +} + +// MinMaxEqual returns true if both stat objects have the same Min and Max values +func (s *Float16Statistics) MinMaxEqual(rhs *Float16Statistics) bool { + 
return s.equal(s.min, rhs.min) && s.equal(s.max, rhs.max) +} + +// Equals returns true only if both objects are the same type, have the same min and +// max values, null count, distinct count and number of values. +func (s *Float16Statistics) Equals(other TypedStatistics) bool { + if s.Type() != other.Type() || !s.descr.LogicalType().Equals(other.Descr().LogicalType()) { + return false + } + rhs, ok := other.(*Float16Statistics) + if !ok { + return false + } + + if s.HasMinMax() != rhs.HasMinMax() { + return false + } + return (s.hasMinMax && s.MinMaxEqual(rhs)) && + s.NullCount() == rhs.NullCount() && + s.DistinctCount() == rhs.DistinctCount() && + s.NumValues() == rhs.NumValues() +} + +func (s *Float16Statistics) coalesce(val, fallback parquet.FixedLenByteArray) parquet.FixedLenByteArray { + if float16.FromLEBytes(val).IsNaN() { + return fallback + } + return val +} + +func (s *Float16Statistics) getMinMax(values []parquet.FixedLenByteArray) (min, max parquet.FixedLenByteArray) { + defMin := s.defaultMin() + defMax := s.defaultMax() + + min = defMin + max = defMax + + for _, v := range values { + min = s.minval(min, s.coalesce(v, defMin)) + max = s.maxval(max, s.coalesce(v, defMax)) + } + return +} + +func (s *Float16Statistics) getMinMaxSpaced(values []parquet.FixedLenByteArray, validBits []byte, validBitsOffset int64) (min, max parquet.FixedLenByteArray) { + min = s.defaultMin() + max = s.defaultMax() + + if s.bitSetReader == nil { + s.bitSetReader = bitutils.NewSetBitRunReader(validBits, validBitsOffset, int64(len(values))) + } else { + s.bitSetReader.Reset(validBits, validBitsOffset, int64(len(values))) + } + + for { + run := s.bitSetReader.NextRun() + if run.Length == 0 { + break + } + for _, v := range values[int(run.Pos):int(run.Pos+run.Length)] { + min = s.minval(min, coalesce(v, s.defaultMin()).(parquet.FixedLenByteArray)) + max = s.maxval(max, coalesce(v, s.defaultMax()).(parquet.FixedLenByteArray)) + } + } + return +} + +func (s *Float16Statistics) 
Min() parquet.FixedLenByteArray { return s.min } +func (s *Float16Statistics) Max() parquet.FixedLenByteArray { return s.max } + +// Merge merges the stats from other into this stat object, updating +// the null count, distinct count, number of values and the min/max if +// appropriate. +func (s *Float16Statistics) Merge(other TypedStatistics) { + rhs, ok := other.(*Float16Statistics) + if !ok { + panic("incompatible stat type merge") + } + + s.statistics.merge(rhs) + if rhs.HasMinMax() { + s.SetMinMax(rhs.Min(), rhs.Max()) + } +} + +// Update is used to add more values to the current stat object, finding the +// min and max values etc. +func (s *Float16Statistics) Update(values []parquet.FixedLenByteArray, numNull int64) { + s.IncNulls(numNull) + s.nvalues += int64(len(values)) + + if len(values) == 0 { + return + } + + s.SetMinMax(s.getMinMax(values)) +} + +// UpdateSpaced is just like Update, but for spaced values using validBits to determine +// and skip null values. +func (s *Float16Statistics) UpdateSpaced(values []parquet.FixedLenByteArray, validBits []byte, validBitsOffset, numNull int64) { + s.IncNulls(numNull) + notnull := int64(len(values)) - numNull + s.nvalues += notnull + + if notnull == 0 { + return + } + + s.SetMinMax(s.getMinMaxSpaced(values, validBits, validBitsOffset)) +} + +func (s *Float16Statistics) UpdateFromArrow(values arrow.Array, updateCounts bool) error { + if updateCounts { + s.IncNulls(int64(values.NullN())) + s.nvalues += int64(values.Len() - values.NullN()) + } + + if values.NullN() == values.Len() { + return nil + } + + return fmt.Errorf("%w: update float16 stats from Arrow", arrow.ErrNotImplemented) +} + +// SetMinMax updates the min and max values only if they are not currently set +// or if argMin is less than the current min / argMax is greater than the current max +func (s *Float16Statistics) SetMinMax(argMin, argMax parquet.FixedLenByteArray) { + maybeMinMax := s.cleanStat([2]parquet.FixedLenByteArray{argMin, argMax}) + if 
maybeMinMax == nil { + return + } + + min := (*maybeMinMax)[0] + max := (*maybeMinMax)[1] + + if !s.hasMinMax { + s.hasMinMax = true + s.min = min + s.max = max + } else { + if !s.less(s.min, min) { + s.min = min + } + if s.less(s.max, max) { + s.max = max + } + } +} + +// EncodeMin returns the encoded min value with plain encoding. +// +// ByteArray stats do not include the length in the encoding. +func (s *Float16Statistics) EncodeMin() []byte { + if s.HasMinMax() { + return s.plainEncode(s.min) + } + return nil +} + +// EncodeMax returns the current encoded max value with plain encoding +// +// ByteArray stats do not include the length in the encoding +func (s *Float16Statistics) EncodeMax() []byte { + if s.HasMinMax() { + return s.plainEncode(s.max) + } + return nil +} + +// Encode returns a populated EncodedStatistics object +func (s *Float16Statistics) Encode() (enc EncodedStatistics, err error) { + defer func() { + if r := recover(); r != nil { + switch r := r.(type) { + case error: + err = r + case string: + err = xerrors.New(r) + default: + err = fmt.Errorf("unknown error type thrown from panic: %v", r) + } + } + }() + if s.HasMinMax() { + enc.SetMax(s.EncodeMax()) + enc.SetMin(s.EncodeMin()) + } + if s.HasNullCount() { + enc.SetNullCount(s.NullCount()) + } + if s.HasDistinctCount() { + enc.SetDistinctCount(s.DistinctCount()) + } + return +} + // NewStatistics uses the type in the column descriptor to construct the appropriate // typed stats object. If mem is nil, then memory.DefaultAllocator will be used. 
func NewStatistics(descr *schema.Column, mem memory.Allocator) TypedStatistics { @@ -2454,6 +2763,9 @@ func NewStatistics(descr *schema.Column, mem memory.Allocator) TypedStatistics { case parquet.Types.ByteArray: return NewByteArrayStatistics(descr, mem) case parquet.Types.FixedLenByteArray: + if descr.LogicalType().Equals(schema.Float16LogicalType{}) { + return NewFloat16Statistics(descr, mem) + } return NewFixedLenByteArrayStatistics(descr, mem) default: panic("not implemented") @@ -2484,6 +2796,9 @@ func NewStatisticsFromEncoded(descr *schema.Column, mem memory.Allocator, nvalue case parquet.Types.ByteArray: return NewByteArrayStatisticsFromEncoded(descr, mem, nvalues, encoded) case parquet.Types.FixedLenByteArray: + if descr.LogicalType().Equals(schema.Float16LogicalType{}) { + return NewFloat16StatisticsFromEncoded(descr, mem, nvalues, encoded) + } return NewFixedLenByteArrayStatisticsFromEncoded(descr, mem, nvalues, encoded) default: panic("not implemented") diff --git a/go/parquet/metadata/statistics_types.gen.go.tmpl b/go/parquet/metadata/statistics_types.gen.go.tmpl index 35470f06046bf..cf2de0b55d7e4 100644 --- a/go/parquet/metadata/statistics_types.gen.go.tmpl +++ b/go/parquet/metadata/statistics_types.gen.go.tmpl @@ -45,10 +45,18 @@ type {{.Name}}Statistics struct { // given column descriptor and allocator. 
// // Panics if the physical type of descr is not parquet.Type.{{if .physical}}{{.physical}}{{else}}{{.Name}}{{end}} +{{- if eq .Name "Float16"}} +// Panics if the logical type of descr is not schema.Float16LogicalType +{{- end}} func New{{.Name}}Statistics(descr *schema.Column, mem memory.Allocator) *{{.Name}}Statistics { if descr.PhysicalType() != parquet.Types.{{if .physical}}{{.physical}}{{else}}{{.Name}}{{end}} { panic(fmt.Errorf("parquet: invalid type %s for constructing a {{.Name}} stat object", descr.PhysicalType())) } +{{- if eq .Name "Float16"}} + if !descr.LogicalType().Equals(schema.Float16LogicalType{}) { + panic(fmt.Errorf("parquet: invalid logical type %s for constructing a {{.Name}} stat object", descr.LogicalType().String())) + } +{{- end}} return &{{.Name}}Statistics{ statistics: statistics{ @@ -96,7 +104,7 @@ func (s *{{.Name}}Statistics) plainEncode(src {{.name}}) []byte { copy(out, src) return out {{- else}} - s.encoder.(encoding.{{.Name}}Encoder).Put([]{{.name}}{src}) + s.encoder.(encoding.{{if .logical}}{{.physical}}{{else}}{{.Name}}{{end}}Encoder).Put([]{{.name}}{src}) buf, err := s.encoder.FlushValues() if err != nil { panic(err) // recovered by Encode @@ -117,12 +125,12 @@ func (s *{{.Name}}Statistics) plainDecode(src []byte) {{.name}} { decoder := encoding.NewDecoder(s.descr.PhysicalType(), parquet.Encodings.Plain, s.descr, s.mem) decoder.SetData(1, src) - decoder.(encoding.{{.Name}}Decoder).Decode(buf[:]) + decoder.(encoding.{{if .logical}}{{.physical}}{{else}}{{.Name}}{{end}}Decoder).Decode(buf[:]) return buf[0] {{- end}} } -{{if and (ne .Name "ByteArray") (ne .Name "FixedLenByteArray")}} +{{if and (ne .Name "ByteArray") (ne .Name "FixedLenByteArray") (ne .Name "Float16")}} func (s *{{.Name}}Statistics) minval(a, b {{.name}}) {{.name}} { if s.less(a, b) { return a @@ -172,7 +180,11 @@ func (s *{{.Name}}Statistics) MinMaxEqual(rhs *{{.Name}}Statistics) bool { // Equals returns true only if both objects are the same type, have the same 
min and // max values, null count, distinct count and number of values. func (s *{{.Name}}Statistics) Equals(other TypedStatistics) bool { +{{- if .logical}} + if s.Type() != other.Type() || !s.descr.LogicalType().Equals(other.Descr().LogicalType()) { +{{- else}} if s.Type() != other.Type() { +{{- end}} return false } rhs, ok := other.(*{{.Name}}Statistics) @@ -194,6 +206,13 @@ func (s *{{.Name}}Statistics) coalesce(val, fallback {{.name}}) {{.name}} { } return val } +{{else if eq .Name "Float16"}} +func (s *{{.Name}}Statistics) coalesce(val, fallback {{.name}}) {{.name}} { + if float16.FromLEBytes(val).IsNaN() { + return fallback + } + return val +} {{end}} func (s *{{.Name}}Statistics) getMinMax(values []{{.name}}) (min, max {{.name}}) { @@ -212,7 +231,7 @@ func (s *{{.Name}}Statistics) getMinMax(values []{{.name}}) (min, max {{.name}}) max = defMax for _, v := range values { -{{- if or (eq .name "float32") (eq .name "float64") }} +{{- if or (eq .name "float32") (eq .name "float64") (eq .Name "Float16") }} min = s.minval(min, s.coalesce(v, defMin)) max = s.maxval(max, s.coalesce(v, defMax)) {{- else}} @@ -261,7 +280,7 @@ func (s *{{.Name}}Statistics) getMinMaxSpaced(values []{{.name}}, validBits []by } {{- else}} for _, v := range values[int(run.Pos):int(run.Pos+run.Length)] { -{{- if or (eq .name "float32") (eq .name "float64") }} +{{- if or (eq .name "float32") (eq .name "float64") (eq .Name "Float16") }} min = s.minval(min, coalesce(v, s.defaultMin()).({{.name}})) max = s.maxval(max, coalesce(v, s.defaultMax()).({{.name}})) {{- else}} @@ -381,7 +400,9 @@ func (s *{{.Name}}Statistics) UpdateFromArrow(values arrow.Array, updateCounts b s.SetMinMax(min, max) return nil {{else if eq .Name "Boolean"}} - return fmt.Errorf("%w: update boolean stats from Arrow", arrow.ErrNotImplemented) + return fmt.Errorf("%w: update boolean stats from Arrow", arrow.ErrNotImplemented) +{{else if eq .Name "Float16"}} + return fmt.Errorf("%w: update float16 stats from Arrow", 
arrow.ErrNotImplemented) {{else}} if values.DataType().(arrow.FixedWidthDataType).Bytes() != arrow.{{.Name}}SizeBytes { return fmt.Errorf("%w: cannot update {{.name}} stats with %s arrow array", @@ -475,8 +496,15 @@ func NewStatistics(descr *schema.Column, mem memory.Allocator) TypedStatistics { } switch descr.PhysicalType() { {{- range .In}} + {{- if not .logical}} case parquet.Types.{{if .physical}}{{.physical}}{{else}}{{.Name}}{{end}}: + {{- if eq .Name "FixedLenByteArray"}} + if descr.LogicalType().Equals(schema.Float16LogicalType{}) { + return NewFloat16Statistics(descr, mem) + } + {{- end}} return New{{.Name}}Statistics(descr, mem) + {{- end}} {{- end}} default: panic("not implemented") @@ -493,8 +521,15 @@ func NewStatisticsFromEncoded(descr *schema.Column, mem memory.Allocator, nvalue } switch descr.PhysicalType() { {{- range .In}} + {{- if not .logical}} case parquet.Types.{{if .physical}}{{.physical}}{{else}}{{.Name}}{{end}}: + {{- if eq .Name "FixedLenByteArray"}} + if descr.LogicalType().Equals(schema.Float16LogicalType{}) { + return NewFloat16StatisticsFromEncoded(descr, mem, nvalues, encoded) + } + {{- end}} return New{{.Name}}StatisticsFromEncoded(descr, mem, nvalues, encoded) + {{- end}} {{- end}} default: panic("not implemented") diff --git a/go/parquet/metadata/statistics_types.tmpldata b/go/parquet/metadata/statistics_types.tmpldata new file mode 100644 index 0000000000000..400c0a3ca515d --- /dev/null +++ b/go/parquet/metadata/statistics_types.tmpldata @@ -0,0 +1,60 @@ +[ + { + "Name": "Int32", + "name": "int32", + "lower": "int32", + "prefix": "arrow" + }, + { + "Name": "Int64", + "name": "int64", + "lower": "int64", + "prefix": "arrow" + }, + { + "Name": "Int96", + "name": "parquet.Int96", + "lower": "int96", + "prefix": "parquet" + }, + { + "Name": "Float32", + "name": "float32", + "lower": "float32", + "prefix": "arrow", + "physical": "Float" + }, + { + "Name": "Float64", + "name": "float64", + "lower": "float64", + "prefix": "arrow", + 
"physical": "Double" + }, + { + "Name": "Boolean", + "name": "bool", + "lower": "bool", + "prefix": "arrow" + }, + { + "Name": "ByteArray", + "name": "parquet.ByteArray", + "lower": "byteArray", + "prefix": "parquet" + }, + { + "Name": "FixedLenByteArray", + "name": "parquet.FixedLenByteArray", + "lower": "fixedLenByteArray", + "prefix": "parquet" + }, + { + "Name": "Float16", + "name": "parquet.FixedLenByteArray", + "lower": "float16", + "prefix": "parquet", + "physical": "FixedLenByteArray", + "logical": "Float16LogicalType" + } +]