Skip to content

Commit

Permalink
apacheGH-39466: [Go][Parquet] Align Arrow and Parquet Timestamp Insta…
Browse files Browse the repository at this point in the history
…nt/Local Semantics (apache#39467)

### Rationale for this change

Closes: apache#39466 

### What changes are included in this PR?

- Update logic for determining whether an Arrow Timestamp should have `isAdjustedToUTC=true` on conversion to Parquet.
- Update conversion from Parquet Timestamp to Arrow Timestamp to align with Parquet Format [backward-compatibility](https://github.com/apache/parquet-format/blob/eb4b31c1d64a01088d02a2f9aefc6c17c54cc6fc/LogicalTypes.md?plain=1#L480-L485) rules.
- Refactor Timestamp serialization methods to reduce duplicated code

### Are these changes tested?

Yes,
- Logical type mapping in existing test updated.
- New tests for roundtrip behavior of timestamps with various timezone settings, with/without store_schema enabled.
- New test to clarify equality behavior of timestamps with instant semantics, as well as Go-related quirks with timezone-unaware timestamps.

### Are there any user-facing changes?

Yes, users of `pqarrow.FileWriter` will produce Parquet files in which the `TIMESTAMP` type is normalized to UTC IFF the Arrow type provided has a timezone specified. This is different from the current Go behavior but aligned with that of other implementations.

The conversion from Parquet to Arrow has been updated as well to reflect the Parquet format [document](https://github.com/apache/parquet-format/blob/eb4b31c1d64a01088d02a2f9aefc6c17c54cc6fc/LogicalTypes.md?plain=1#L480-L485). Rust already [implements](https://github.com/apache/arrow-rs/blob/a61e824abdd7b38ea214828480430ff2a13f2ead/parquet/src/arrow/schema/primitive.rs#L211-L239) the spec as described and apache#39489 has been reported due to a mismatch in the handling of convertedTypes in C++.

* Closes: apache#39466

Authored-by: Joel Lubinitsky <joel@cherre.com>
Signed-off-by: Matt Topol <zotthewizard@gmail.com>
  • Loading branch information
joellubi authored and zanmato1984 committed Feb 28, 2024
1 parent eba55e0 commit 18264f5
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 28 deletions.
11 changes: 5 additions & 6 deletions go/arrow/array/timestamp.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,16 +91,15 @@ func (a *Timestamp) ValueStr(i int) string {
return NullValueStr
}

dt := a.DataType().(*arrow.TimestampType)
z, _ := dt.GetZone()
return a.values[i].ToTime(dt.Unit).In(z).Format("2006-01-02 15:04:05.999999999Z0700")
toTime, _ := a.DataType().(*arrow.TimestampType).GetToTimeFunc()
return toTime(a.values[i]).Format("2006-01-02 15:04:05.999999999Z0700")
}

func (a *Timestamp) GetOneForMarshal(i int) interface{} {
if a.IsNull(i) {
return nil
if val := a.ValueStr(i); val != NullValueStr {
return val
}
return a.values[i].ToTime(a.DataType().(*arrow.TimestampType).Unit).Format("2006-01-02 15:04:05.999999999")
return nil
}

func (a *Timestamp) MarshalJSON() ([]byte, error) {
Expand Down
49 changes: 48 additions & 1 deletion go/arrow/array/timestamp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func TestTimestampBuilder_Resize(t *testing.T) {
assert.Equal(t, 5, ab.Len())
}

func TestTimestampValueStr(t *testing.T) {
func TestTimestampValueStr(t *testing.T) {
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
defer mem.AssertSize(t, 0)

Expand All @@ -251,3 +251,50 @@ func TestTimestampValueStr(t *testing.T) {
assert.Equal(t, "1968-11-30 13:30:45-0700", arr.ValueStr(0))
assert.Equal(t, "2016-02-29 10:42:23-0700", arr.ValueStr(1))
}

// TestTimestampEquality builds the same two raw second-resolution values into
// three timestamp arrays (no time zone, "UTC", and "America/Phoenix") and
// checks that the rendered strings follow the array's time zone while the
// stored values are identical across all three arrays.
func TestTimestampEquality(t *testing.T) {
	mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
	defer mem.AssertSize(t, 0)

	tsDatatypes := []*arrow.TimestampType{
		{Unit: arrow.Second},
		{Unit: arrow.Second, TimeZone: "UTC"},
		{Unit: arrow.Second, TimeZone: "America/Phoenix"},
	}

	arrs := make([]*array.Timestamp, len(tsDatatypes))
	for idx, dt := range tsDatatypes {
		builder := array.NewTimestampBuilder(mem, dt)
		defer builder.Release()

		builder.Append(-34226955)
		builder.Append(1456767743)

		built := builder.NewTimestampArray()
		defer built.Release()

		arrs[idx] = built
	}

	// Expected string form of rows 0 and 1, indexed like tsDatatypes:
	//   0: no time zone ("wall clock" semantics), still rendered as UTC per Go conventions
	//   1: UTC time zone ("instant" semantics)
	//   2: America/Phoenix time zone ("instant" semantics)
	wantStrs := [][2]string{
		{"1968-11-30 20:30:45Z", "2016-02-29 17:42:23Z"},
		{"1968-11-30 20:30:45Z", "2016-02-29 17:42:23Z"},
		{"1968-11-30 13:30:45-0700", "2016-02-29 10:42:23-0700"},
	}
	for idx, want := range wantStrs {
		assert.Equal(t, want[0], arrs[idx].ValueStr(0))
		assert.Equal(t, want[1], arrs[idx].ValueStr(1))
	}

	// Whatever the time zone and semantics, the physical values must match pairwise.
	for row := 0; row < 2; row++ {
		assert.Equal(t, arrs[0].Value(row), arrs[1].Value(row))
		assert.Equal(t, arrs[0].Value(row), arrs[2].Value(row))
		assert.Equal(t, arrs[1].Value(row), arrs[2].Value(row))
	}
}
19 changes: 6 additions & 13 deletions go/arrow/datatype_fixedwidth.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,8 +348,11 @@ type TemporalWithUnit interface {
}

// TimestampType is encoded as a 64-bit signed integer since the UNIX epoch (1970-01-01T00:00:00Z).
// The zero-value is a second and time zone neutral. Time zone neutral can be
// considered UTC without having "UTC" as a time zone.
// The zero-value is a second and time zone neutral. In Arrow semantics, time zone neutral does not
// represent a physical point in time, but rather a "wall clock" time that only has meaning within
// the context that produced it. In Go, time.Time can only represent instants; there is no notion
// of "wall clock" time. Therefore, time zone neutral timestamps are represented as UTC per Go
// conventions even though the Arrow type itself has no time zone.
type TimestampType struct {
Unit TimeUnit
TimeZone string
Expand Down Expand Up @@ -454,17 +457,7 @@ func (t *TimestampType) GetToTimeFunc() (func(Timestamp) time.Time, error) {
return nil, err
}

switch t.Unit {
case Second:
return func(v Timestamp) time.Time { return time.Unix(int64(v), 0).In(tz) }, nil
case Millisecond:
return func(v Timestamp) time.Time { return time.UnixMilli(int64(v)).In(tz) }, nil
case Microsecond:
return func(v Timestamp) time.Time { return time.UnixMicro(int64(v)).In(tz) }, nil
case Nanosecond:
return func(v Timestamp) time.Time { return time.Unix(0, int64(v)).In(tz) }, nil
}
return nil, fmt.Errorf("invalid timestamp unit: %s", t.Unit)
return func(v Timestamp) time.Time { return v.ToTime(t.Unit).In(tz) }, nil
}

// Time32Type is encoded as a 32-bit signed integer, representing either seconds or milliseconds since midnight.
Expand Down
70 changes: 70 additions & 0 deletions go/parquet/pqarrow/encode_arrow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,41 @@ func makeDateTypeTable(mem memory.Allocator, expected bool, partialDays bool) ar
return array.NewTableFromRecords(arrsc, []arrow.Record{rec})
}

// makeTimestampTypeTable returns a single-record table with two nullable
// millisecond timestamp columns holding the same values and validity mask.
//
//   - f0 has no time zone (relative/local semantics) and is the same for both
//     values of expected.
//   - f1 has a time zone (absolute/instant semantics): "EST" when expected is
//     false and "UTC" when expected is true. The zone only affects display, so
//     swapping it does not change the instants the values refer to.
func makeTimestampTypeTable(mem memory.Allocator, expected bool) arrow.Table {
	f1Zone := "EST"
	if expected {
		f1Zone = "UTC"
	}

	fields := []arrow.Field{
		{
			Name:     "f0",
			Type:     &arrow.TimestampType{Unit: arrow.Millisecond},
			Nullable: true,
			Metadata: arrow.NewMetadata([]string{"PARQUET:field_id"}, []string{"1"}),
		},
		{
			Name:     "f1",
			Type:     &arrow.TimestampType{Unit: arrow.Millisecond, TimeZone: f1Zone},
			Nullable: true,
			Metadata: arrow.NewMetadata([]string{"PARQUET:field_id"}, []string{"2"}),
		},
	}
	sc := arrow.NewSchema(fields, nil)

	values := []arrow.Timestamp{1489269, 1489270, 1489271, 1489272, 1489272, 1489273}
	valid := []bool{true, true, true, false, true, true}

	recBldr := array.NewRecordBuilder(mem, sc)
	defer recBldr.Release()

	// Both columns receive identical data; only their declared types differ.
	for col := range fields {
		recBldr.Field(col).(*array.TimestampBuilder).AppendValues(values, valid)
	}

	record := recBldr.NewRecord()
	defer record.Release()

	return array.NewTableFromRecords(sc, []arrow.Record{record})
}

func TestWriteArrowCols(t *testing.T) {
mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
defer mem.AssertSize(t, 0)
Expand Down Expand Up @@ -954,6 +989,25 @@ func (ps *ParquetIOTestSuite) TestDate64ReadWriteTable() {
ps.Truef(array.TableEqual(date32ExpectedOutputTable, roundTripOutputTable), "expected table: %s\ngot table: %s", date32ExpectedOutputTable, roundTripOutputTable)
}

// TestTimestampTZReadWriteTable writes the input timestamp table using writer
// properties that only set the allocator, reads it back, and compares the
// result with the table produced by makeTimestampTypeTable(mem, true).
func (ps *ParquetIOTestSuite) TestTimestampTZReadWriteTable() {
	mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
	defer mem.AssertSize(ps.T(), 0)

	src := makeTimestampTypeTable(mem, false)
	defer src.Release()

	props := pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem))
	written := writeTableToBuffer(ps.T(), mem, src, src.NumRows(), props)
	defer written.Release()

	got := ps.readTable(ps.createReader(mem, written.Bytes()))
	defer got.Release()

	want := makeTimestampTypeTable(mem, true)
	defer want.Release()

	ps.Truef(array.TableEqual(want, got), "expected table: %s\ngot table: %s", want, got)
}

func (ps *ParquetIOTestSuite) TestDate64ReadWriteTableWithPartialDays() {
mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
defer mem.AssertSize(ps.T(), 0)
Expand All @@ -973,6 +1027,22 @@ func (ps *ParquetIOTestSuite) TestDate64ReadWriteTableWithPartialDays() {
ps.Truef(array.TableEqual(date32ExpectedOutputTable, roundTripOutputTable), "expected table: %s\ngot table: %s", date32ExpectedOutputTable, roundTripOutputTable)
}

// TestTimestampTZStoreSchemaReadWriteTable writes the input timestamp table
// with WithStoreSchema enabled, reads it back, and requires the result to
// equal the input table itself.
func (ps *ParquetIOTestSuite) TestTimestampTZStoreSchemaReadWriteTable() {
	mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
	defer mem.AssertSize(ps.T(), 0)

	src := makeTimestampTypeTable(mem, false)
	defer src.Release()

	props := pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem), pqarrow.WithStoreSchema())
	written := writeTableToBuffer(ps.T(), mem, src, src.NumRows(), props)
	defer written.Release()

	got := ps.readTable(ps.createReader(mem, written.Bytes()))
	defer got.Release()

	ps.Truef(array.TableEqual(src, got), "expected table: %s\ngot table: %s", src, got)
}

func (ps *ParquetIOTestSuite) TestLargeBinaryReadWriteTable() {
mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
defer mem.AssertSize(ps.T(), 0)
Expand Down
13 changes: 8 additions & 5 deletions go/parquet/pqarrow/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func isDictionaryReadSupported(dt arrow.DataType) bool {
}

func arrowTimestampToLogical(typ *arrow.TimestampType, unit arrow.TimeUnit) schema.LogicalType {
utc := typ.TimeZone == "" || typ.TimeZone == "UTC"
isAdjustedToUTC := typ.TimeZone != ""

// for forward compatibility reasons, and because there's no other way
// to signal to old readers that values are timestamps, we force
Expand All @@ -146,7 +146,7 @@ func arrowTimestampToLogical(typ *arrow.TimestampType, unit arrow.TimeUnit) sche
return schema.NoLogicalType{}
}

return schema.NewTimestampLogicalTypeForce(utc, scunit)
return schema.NewTimestampLogicalTypeForce(isAdjustedToUTC, scunit)
}

func getTimestampMeta(typ *arrow.TimestampType, props *parquet.WriterProperties, arrprops ArrowWriterProperties) (parquet.Type, schema.LogicalType, error) {
Expand Down Expand Up @@ -519,9 +519,12 @@ func arrowTime64(logical *schema.TimeLogicalType) (arrow.DataType, error) {
}

func arrowTimestamp(logical *schema.TimestampLogicalType) (arrow.DataType, error) {
tz := "UTC"
if logical.IsFromConvertedType() {
tz = ""
tz := ""

// ConvertedTypes are adjusted to UTC per backward compatibility guidelines
// https://github.com/apache/parquet-format/blob/eb4b31c1d64a01088d02a2f9aefc6c17c54cc6fc/LogicalTypes.md?plain=1#L480-L485
if logical.IsAdjustedToUTC() || logical.IsFromConvertedType() {
tz = "UTC"
}

switch logical.TimeUnit() {
Expand Down
6 changes: 3 additions & 3 deletions go/parquet/pqarrow/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ func TestCoerceTImestampV1(t *testing.T) {
arrowFields := make([]arrow.Field, 0)

parquetFields = append(parquetFields, schema.Must(schema.NewPrimitiveNodeLogical("timestamp", parquet.Repetitions.Required,
schema.NewTimestampLogicalTypeForce(false, schema.TimeUnitMicros), parquet.Types.Int64, 0, -1)))
schema.NewTimestampLogicalTypeForce(true, schema.TimeUnitMicros), parquet.Types.Int64, 0, -1)))
arrowFields = append(arrowFields, arrow.Field{Name: "timestamp", Type: &arrow.TimestampType{Unit: arrow.Millisecond, TimeZone: "EST"}})

arrowSchema := arrow.NewSchema(arrowFields, nil)
Expand All @@ -323,11 +323,11 @@ func TestAutoCoerceTImestampV1(t *testing.T) {
arrowFields := make([]arrow.Field, 0)

parquetFields = append(parquetFields, schema.Must(schema.NewPrimitiveNodeLogical("timestamp", parquet.Repetitions.Required,
schema.NewTimestampLogicalTypeForce(false, schema.TimeUnitMicros), parquet.Types.Int64, 0, -1)))
schema.NewTimestampLogicalTypeForce(true, schema.TimeUnitMicros), parquet.Types.Int64, 0, -1)))
arrowFields = append(arrowFields, arrow.Field{Name: "timestamp", Type: &arrow.TimestampType{Unit: arrow.Nanosecond, TimeZone: "EST"}})

parquetFields = append(parquetFields, schema.Must(schema.NewPrimitiveNodeLogical("timestamp[ms]", parquet.Repetitions.Required,
schema.NewTimestampLogicalTypeForce(true, schema.TimeUnitMillis), parquet.Types.Int64, 0, -1)))
schema.NewTimestampLogicalTypeForce(false, schema.TimeUnitMillis), parquet.Types.Int64, 0, -1)))
arrowFields = append(arrowFields, arrow.Field{Name: "timestamp[ms]", Type: &arrow.TimestampType{Unit: arrow.Second}})

arrowSchema := arrow.NewSchema(arrowFields, nil)
Expand Down

0 comments on commit 18264f5

Please sign in to comment.