Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-35775: [Go][Parquet] Allow key value file metadata to be written after writing row groups #37786

Merged
merged 2 commits into from
Sep 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 37 additions & 25 deletions go/parquet/file/file_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,23 +41,24 @@ type Writer struct {

// The Schema of this writer
Schema *schema.Schema
// The current FileMetadata to write
FileMetadata *metadata.FileMetaData
// The current keyvalue metadata
KeyValueMetadata metadata.KeyValueMetadata
}

type WriteOption func(*Writer)
// writerConfig gathers the values set by WriteOption functions so they can be
// applied when a Writer is constructed.
type writerConfig struct {
// props holds the writer properties chosen via WithWriterProps.
props *parquet.WriterProperties
// keyValueMetadata holds the key/value metadata supplied via WithWriteMetadata.
keyValueMetadata metadata.KeyValueMetadata
}

type WriteOption func(*writerConfig)

func WithWriterProps(props *parquet.WriterProperties) WriteOption {
return func(w *Writer) {
w.props = props
return func(c *writerConfig) {
c.props = props
}
}

func WithWriteMetadata(meta metadata.KeyValueMetadata) WriteOption {
return func(w *Writer) {
w.KeyValueMetadata = meta
return func(c *writerConfig) {
c.keyValueMetadata = meta
}
}

Expand All @@ -66,19 +67,23 @@ func WithWriteMetadata(meta metadata.KeyValueMetadata) WriteOption {
// If props is nil, then the default Writer Properties will be used. If the key value metadata is not nil,
// it will be added to the file.
func NewParquetWriter(w io.Writer, sc *schema.GroupNode, opts ...WriteOption) *Writer {
config := &writerConfig{}
for _, o := range opts {
o(config)
}
if config.props == nil {
config.props = parquet.NewWriterProperties()
}

fileSchema := schema.NewSchema(sc)
fw := &Writer{
props: config.props,
sink: &utils.TellWrapper{Writer: w},
open: true,
Schema: fileSchema,
}
for _, o := range opts {
o(fw)
}
if fw.props == nil {
fw.props = parquet.NewWriterProperties()
}
fw.metadata = *metadata.NewFileMetadataBuilder(fw.Schema, fw.props, fw.KeyValueMetadata)

fw.metadata = *metadata.NewFileMetadataBuilder(fw.Schema, fw.props, config.keyValueMetadata)
fw.startFile()
return fw
}
Expand Down Expand Up @@ -154,6 +159,11 @@ func (fw *Writer) startFile() {
}
}

// AppendKeyValueMetadata adds one key/value pair to the key/value metadata
// tracked by the writer's file metadata builder. Any error reported by the
// builder is returned unchanged.
func (fw *Writer) AppendKeyValueMetadata(key string, value string) error {
	if err := fw.metadata.AppendKeyValueMetadata(key, value); err != nil {
		return err
	}
	return nil
}

// Close closes any open row group writer and writes the file footer. Subsequent
// calls to close will have no effect.
func (fw *Writer) Close() (err error) {
Expand All @@ -180,11 +190,12 @@ func (fw *Writer) Close() (err error) {

fileEncryptProps := fw.props.FileEncryptionProperties()
if fileEncryptProps == nil { // non encrypted file
if fw.FileMetadata, err = fw.metadata.Finish(); err != nil {
fileMetadata, err := fw.metadata.Finish()
if err != nil {
return err
}

_, err = writeFileMetadata(fw.FileMetadata, fw.sink)
_, err = writeFileMetadata(fileMetadata, fw.sink)
return err
}

Expand All @@ -193,12 +204,12 @@ func (fw *Writer) Close() (err error) {
return nil
}

func (fw *Writer) closeEncryptedFile(props *parquet.FileEncryptionProperties) (err error) {
func (fw *Writer) closeEncryptedFile(props *parquet.FileEncryptionProperties) error {
// encrypted file with encrypted footer
if props.EncryptedFooter() {
fw.FileMetadata, err = fw.metadata.Finish()
fileMetadata, err := fw.metadata.Finish()
if err != nil {
return
return err
}

footerLen := int64(0)
Expand All @@ -211,7 +222,7 @@ func (fw *Writer) closeEncryptedFile(props *parquet.FileEncryptionProperties) (e

footerLen += n
footerEncryptor := fw.fileEncryptor.GetFooterEncryptor()
n, err = writeEncryptedFileMetadata(fw.FileMetadata, fw.sink, footerEncryptor, true)
n, err = writeEncryptedFileMetadata(fileMetadata, fw.sink, footerEncryptor, true)
if err != nil {
return err
}
Expand All @@ -224,11 +235,12 @@ func (fw *Writer) closeEncryptedFile(props *parquet.FileEncryptionProperties) (e
return err
}
} else {
if fw.FileMetadata, err = fw.metadata.Finish(); err != nil {
return
fileMetadata, err := fw.metadata.Finish()
if err != nil {
return err
}
footerSigningEncryptor := fw.fileEncryptor.GetFooterSigningEncryptor()
if _, err = writeEncryptedFileMetadata(fw.FileMetadata, fw.sink, footerSigningEncryptor, false); err != nil {
if _, err = writeEncryptedFileMetadata(fileMetadata, fw.sink, footerSigningEncryptor, false); err != nil {
return err
}
}
Expand Down
29 changes: 29 additions & 0 deletions go/parquet/file/file_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/apache/arrow/go/v14/parquet/internal/testutils"
"github.com/apache/arrow/go/v14/parquet/schema"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
)

Expand Down Expand Up @@ -371,6 +372,34 @@ func TestAllNulls(t *testing.T) {
assert.Equal(t, []int16{0, 0, 0}, defLevels[:])
}

// TestKeyValueMetadata verifies that a key/value pair appended to a Writer
// after it has been created is written out on Close and can be read back
// from the resulting file's metadata.
func TestKeyValueMetadata(t *testing.T) {
	fields := schema.FieldList{
		schema.NewInt32Node("unused", parquet.Repetitions.Optional, -1),
	}
	sc, err := schema.NewGroupNode("root", parquet.Repetitions.Required, fields, -1)
	require.NoError(t, err)
	sink := encoding.NewBufferWriter(0, memory.DefaultAllocator)

	writer := file.NewParquetWriter(sink, sc)

	testKey := "testKey"
	testValue := "testValue"
	// Both calls return errors; a failure here would otherwise surface later
	// as a confusing read-side failure.
	require.NoError(t, writer.AppendKeyValueMetadata(testKey, testValue))
	require.NoError(t, writer.Close())

	buffer := sink.Finish()
	defer buffer.Release()
	props := parquet.NewReaderProperties(memory.DefaultAllocator)
	props.BufferedStreamEnabled = true

	reader, err := file.NewParquetReader(bytes.NewReader(buffer.Bytes()), file.WithReadProps(props))
	// require (not assert): the reader is used immediately below, so the test
	// must stop here if it could not be opened.
	require.NoError(t, err)

	got := reader.MetaData().KeyValueMetadata().FindValue(testKey)
	require.NotNil(t, got)
	assert.Equal(t, testValue, *got)
}

func createSerializeTestSuite(typ reflect.Type) suite.TestingSuite {
return &SerializeTestSuite{PrimitiveTypedTest: testutils.NewPrimitiveTypedTest(typ)}
}
Expand Down
5 changes: 5 additions & 0 deletions go/parquet/metadata/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ func (f *FileMetaDataBuilder) AppendRowGroup() *RowGroupMetaDataBuilder {
return f.currentRgBldr
}

// AppendKeyValueMetadata adds one key/value pair to the key/value metadata
// held by this builder and returns the result of that append.
func (f *FileMetaDataBuilder) AppendKeyValueMetadata(key string, value string) error {
	err := f.kvmeta.Append(key, value)
	return err
}

// Finish will finalize the metadata of the number of rows, row groups,
// version etc. This will clear out this filemetadatabuilder so it can
// be re-used
Expand Down
35 changes: 35 additions & 0 deletions go/parquet/metadata/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,41 @@ func TestKeyValueMetadata(t *testing.T) {
assert.True(t, faccessor.KeyValueMetadata().Equals(kvmeta))
}

// TestKeyValueMetadataAppend checks that a pair appended through the
// FileMetaDataBuilder appears in the finished metadata together with the pair
// that was supplied when the builder was created.
func TestKeyValueMetadataAppend(t *testing.T) {
	const (
		initialKey    = "test_key1"
		initialValue  = "test_value1"
		appendedKey   = "test_key2"
		appendedValue = "test_value2"
	)

	writerProps := parquet.NewWriterProperties(parquet.WithVersion(parquet.V1_0))

	root, err := schema.NewGroupNode("schema", parquet.Repetitions.Repeated, schema.FieldList{
		schema.NewInt32Node("int_col", parquet.Repetitions.Required, -1),
		schema.NewFloat32Node("float_col", parquet.Repetitions.Required, -1),
	}, -1)
	require.NoError(t, err)
	fileSchema := schema.NewSchema(root)

	// Seed the builder with one pair at construction time...
	initial := metadata.NewKeyValueMetadata()
	require.NoError(t, initial.Append(initialKey, initialValue))
	builder := metadata.NewFileMetadataBuilder(fileSchema, writerProps, initial)

	// ...then add a second pair through the builder itself.
	require.NoError(t, builder.AppendKeyValueMetadata(appendedKey, appendedValue))

	finished, err := builder.Finish()
	require.NoError(t, err)
	kv := finished.KeyValueMetadata()

	for _, want := range []struct{ key, value string }{
		{initialKey, initialValue},
		{appendedKey, appendedValue},
	} {
		got := kv.FindValue(want.key)
		require.NotNil(t, got)
		assert.Equal(t, want.value, *got)
	}
}

func TestApplicationVersion(t *testing.T) {
version := metadata.NewAppVersion("parquet-mr version 1.7.9")
version1 := metadata.NewAppVersion("parquet-mr version 1.8.0")
Expand Down
45 changes: 45 additions & 0 deletions go/parquet/pqarrow/encode_arrow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,51 @@ func simpleRoundTrip(t *testing.T, tbl arrow.Table, rowGroupSize int64) {
}
}

// TestWriteKeyValueMetadata writes a single record through the pqarrow
// FileWriter, appends key/value pairs after the record has been written, and
// confirms every pair can be found in the metadata of the file read back.
func TestWriteKeyValueMetadata(t *testing.T) {
	expected := map[string]string{
		"key1": "value1",
		"key2": "value2",
		"key3": "value3",
	}

	arrowSchema := arrow.NewSchema([]arrow.Field{
		{Name: "int32", Type: arrow.PrimitiveTypes.Int32, Nullable: true},
	}, nil)

	// Build a one-row record containing a null in every column.
	recBuilder := array.NewRecordBuilder(memory.DefaultAllocator, arrowSchema)
	defer recBuilder.Release()
	for _, fieldBuilder := range recBuilder.Fields() {
		fieldBuilder.AppendNull()
	}
	rec := recBuilder.NewRecord()
	defer rec.Release()

	writerProps := parquet.NewWriterProperties(
		parquet.WithVersion(parquet.V1_0),
	)
	var buf bytes.Buffer
	fw, err := pqarrow.NewFileWriter(arrowSchema, &buf, writerProps, pqarrow.DefaultWriterProps())
	require.NoError(t, err)
	require.NoError(t, fw.Write(rec))

	// The metadata is appended only after the record has been written.
	for k, v := range expected {
		require.NoError(t, fw.AppendKeyValueMetadata(k, v))
	}
	require.NoError(t, fw.Close())

	reader, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()))
	require.NoError(t, err)

	for k, v := range expected {
		got := reader.MetaData().KeyValueMetadata().FindValue(k)
		require.NotNil(t, got)
		assert.Equal(t, v, *got)
	}
}

func TestWriteEmptyLists(t *testing.T) {
sc := arrow.NewSchema([]arrow.Field{
{Name: "f1", Type: arrow.ListOf(arrow.FixedWidthTypes.Date32)},
Expand Down
5 changes: 5 additions & 0 deletions go/parquet/pqarrow/file_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,11 @@ func (fw *FileWriter) WriteTable(tbl arrow.Table, chunkSize int64) error {
return nil
}

// AppendKeyValueMetadata forwards one key/value pair to the wrapped parquet
// file writer so that it is added to the file's key/value metadata. The
// wrapped writer's error, if any, is returned as-is.
func (fw *FileWriter) AppendKeyValueMetadata(key string, value string) error {
	if err := fw.wr.AppendKeyValueMetadata(key, value); err != nil {
		return err
	}
	return nil
}

// Close flushes out the data and closes the file. It can be called multiple times,
// subsequent calls after the first will have no effect.
func (fw *FileWriter) Close() error {
Expand Down
Loading