diff --git a/go/parquet/file/file_writer.go b/go/parquet/file/file_writer.go index 64a21473c293a..c6289434bbe6e 100644 --- a/go/parquet/file/file_writer.go +++ b/go/parquet/file/file_writer.go @@ -41,23 +41,24 @@ type Writer struct { // The Schema of this writer Schema *schema.Schema - // The current FileMetadata to write - FileMetadata *metadata.FileMetaData - // The current keyvalue metadata - KeyValueMetadata metadata.KeyValueMetadata } -type WriteOption func(*Writer) +type writerConfig struct { + props *parquet.WriterProperties + keyValueMetadata metadata.KeyValueMetadata +} + +type WriteOption func(*writerConfig) func WithWriterProps(props *parquet.WriterProperties) WriteOption { - return func(w *Writer) { - w.props = props + return func(c *writerConfig) { + c.props = props } } func WithWriteMetadata(meta metadata.KeyValueMetadata) WriteOption { - return func(w *Writer) { - w.KeyValueMetadata = meta + return func(c *writerConfig) { + c.keyValueMetadata = meta } } @@ -66,19 +67,23 @@ func WithWriteMetadata(meta metadata.KeyValueMetadata) WriteOption { // If props is nil, then the default Writer Properties will be used. If the key value metadata is not nil, // it will be added to the file. func NewParquetWriter(w io.Writer, sc *schema.GroupNode, opts ...WriteOption) *Writer { + config := &writerConfig{} + for _, o := range opts { + o(config) + } + if config.props == nil { + config.props = parquet.NewWriterProperties() + } + fileSchema := schema.NewSchema(sc) fw := &Writer{ + props: config.props, sink: &utils.TellWrapper{Writer: w}, open: true, Schema: fileSchema, } - for _, o := range opts { - o(fw) - } - if fw.props == nil { - fw.props = parquet.NewWriterProperties() - } - fw.metadata = *metadata.NewFileMetadataBuilder(fw.Schema, fw.props, fw.KeyValueMetadata) + + fw.metadata = *metadata.NewFileMetadataBuilder(fw.Schema, fw.props, config.keyValueMetadata) fw.startFile() return fw } @@ -154,6 +159,11 @@ func (fw *Writer) startFile() { } } +// AppendKeyValueMetadata appends a key/value pair to the existing key/value metadata +func (fw *Writer) AppendKeyValueMetadata(key string, value string) error { + return fw.metadata.AppendKeyValueMetadata(key, value) +} + // Close closes any open row group writer and writes the file footer. Subsequent // calls to close will have no effect. func (fw *Writer) Close() (err error) { @@ -180,11 +190,12 @@ func (fw *Writer) Close() (err error) { fileEncryptProps := fw.props.FileEncryptionProperties() if fileEncryptProps == nil { // non encrypted file - if fw.FileMetadata, err = fw.metadata.Finish(); err != nil { + fileMetadata, err := fw.metadata.Finish() + if err != nil { return err } - _, err = writeFileMetadata(fw.FileMetadata, fw.sink) + _, err = writeFileMetadata(fileMetadata, fw.sink) return err } @@ -193,12 +204,12 @@ func (fw *Writer) Close() (err error) { return nil } -func (fw *Writer) closeEncryptedFile(props *parquet.FileEncryptionProperties) (err error) { +func (fw *Writer) closeEncryptedFile(props *parquet.FileEncryptionProperties) error { // encrypted file with encrypted footer if props.EncryptedFooter() { - fw.FileMetadata, err = fw.metadata.Finish() + fileMetadata, err := fw.metadata.Finish() if err != nil { - return + return err } footerLen := int64(0) @@ -211,7 +222,7 @@ func (fw *Writer) closeEncryptedFile(props *parquet.FileEncryptionProperties) (e footerLen += n footerEncryptor := fw.fileEncryptor.GetFooterEncryptor() - n, err = writeEncryptedFileMetadata(fw.FileMetadata, fw.sink, footerEncryptor, true) + n, err = writeEncryptedFileMetadata(fileMetadata, fw.sink, footerEncryptor, true) if err != nil { return err } @@ -224,11 +235,12 @@ func (fw *Writer) closeEncryptedFile(props *parquet.FileEncryptionProperties) (e return err } } else { - if fw.FileMetadata, err = fw.metadata.Finish(); err != nil { - return + fileMetadata, err := fw.metadata.Finish() + if err != nil { + return err } footerSigningEncryptor := fw.fileEncryptor.GetFooterSigningEncryptor() - if _, err = writeEncryptedFileMetadata(fw.FileMetadata, fw.sink, footerSigningEncryptor, false); err != nil { + if _, err = writeEncryptedFileMetadata(fileMetadata, fw.sink, footerSigningEncryptor, false); err != nil { return err } } diff --git a/go/parquet/file/file_writer_test.go b/go/parquet/file/file_writer_test.go index 0cca1cd40d4c9..af083ebe60e4f 100644 --- a/go/parquet/file/file_writer_test.go +++ b/go/parquet/file/file_writer_test.go @@ -30,6 +30,7 @@ import ( "github.com/apache/arrow/go/v14/parquet/internal/testutils" "github.com/apache/arrow/go/v14/parquet/schema" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" ) @@ -371,6 +372,34 @@ func TestAllNulls(t *testing.T) { assert.Equal(t, []int16{0, 0, 0}, defLevels[:]) } +func TestKeyValueMetadata(t *testing.T) { + fields := schema.FieldList{ + schema.NewInt32Node("unused", parquet.Repetitions.Optional, -1), + } + sc, _ := schema.NewGroupNode("root", parquet.Repetitions.Required, fields, -1) + sink := encoding.NewBufferWriter(0, memory.DefaultAllocator) + + writer := file.NewParquetWriter(sink, sc) + + testKey := "testKey" + testValue := "testValue" + writer.AppendKeyValueMetadata(testKey, testValue) + writer.Close() + + buffer := sink.Finish() + defer buffer.Release() + props := parquet.NewReaderProperties(memory.DefaultAllocator) + props.BufferedStreamEnabled = true + + reader, err := file.NewParquetReader(bytes.NewReader(buffer.Bytes()), file.WithReadProps(props)) + assert.NoError(t, err) + + metadata := reader.MetaData() + got := metadata.KeyValueMetadata().FindValue(testKey) + require.NotNil(t, got) + assert.Equal(t, testValue, *got) +} + func createSerializeTestSuite(typ reflect.Type) suite.TestingSuite { return &SerializeTestSuite{PrimitiveTypedTest: testutils.NewPrimitiveTypedTest(typ)} } diff --git a/go/parquet/metadata/file.go b/go/parquet/metadata/file.go index efe3c01c25b33..dddd95c5df670 100644 --- a/go/parquet/metadata/file.go +++ b/go/parquet/metadata/file.go @@ -95,6 +95,11 @@ func (f *FileMetaDataBuilder) AppendRowGroup() *RowGroupMetaDataBuilder { return f.currentRgBldr } +// AppendKeyValueMetadata appends a key/value pair to the existing key/value metadata +func (f *FileMetaDataBuilder) AppendKeyValueMetadata(key string, value string) error { + return f.kvmeta.Append(key, value) +} + // Finish will finalize the metadata of the number of rows, row groups, // version etc. This will clear out this filemetadatabuilder so it can // be re-used diff --git a/go/parquet/metadata/metadata_test.go b/go/parquet/metadata/metadata_test.go index 0db64d88ab0f4..b685dd2223274 100644 --- a/go/parquet/metadata/metadata_test.go +++ b/go/parquet/metadata/metadata_test.go @@ -272,6 +272,41 @@ func TestKeyValueMetadata(t *testing.T) { assert.True(t, faccessor.KeyValueMetadata().Equals(kvmeta)) } +func TestKeyValueMetadataAppend(t *testing.T) { + props := parquet.NewWriterProperties(parquet.WithVersion(parquet.V1_0)) + + fields := schema.FieldList{ + schema.NewInt32Node("int_col", parquet.Repetitions.Required, -1), + schema.NewFloat32Node("float_col", parquet.Repetitions.Required, -1), + } + root, err := schema.NewGroupNode("schema", parquet.Repetitions.Repeated, fields, -1) + require.NoError(t, err) + schema := schema.NewSchema(root) + + kvmeta := metadata.NewKeyValueMetadata() + key1 := "test_key1" + value1 := "test_value1" + require.NoError(t, kvmeta.Append(key1, value1)) + + fbuilder := metadata.NewFileMetadataBuilder(schema, props, kvmeta) + + key2 := "test_key2" + value2 := "test_value2" + require.NoError(t, fbuilder.AppendKeyValueMetadata(key2, value2)) + faccessor, err := fbuilder.Finish() + require.NoError(t, err) + + kv := faccessor.KeyValueMetadata() + + got1 := kv.FindValue(key1) + require.NotNil(t, got1) + assert.Equal(t, value1, *got1) + + got2 := kv.FindValue(key2) + require.NotNil(t, got2) + assert.Equal(t, value2, *got2) +} + func TestApplicationVersion(t *testing.T) { version := metadata.NewAppVersion("parquet-mr version 1.7.9") version1 := metadata.NewAppVersion("parquet-mr version 1.8.0") diff --git a/go/parquet/pqarrow/encode_arrow_test.go b/go/parquet/pqarrow/encode_arrow_test.go index 654d3d813cf85..3c20cf2d4757b 100644 --- a/go/parquet/pqarrow/encode_arrow_test.go +++ b/go/parquet/pqarrow/encode_arrow_test.go @@ -360,6 +360,51 @@ func simpleRoundTrip(t *testing.T, tbl arrow.Table, rowGroupSize int64) { } } +func TestWriteKeyValueMetadata(t *testing.T) { + kv := map[string]string{ + "key1": "value1", + "key2": "value2", + "key3": "value3", + } + + sc := arrow.NewSchema([]arrow.Field{ + {Name: "int32", Type: arrow.PrimitiveTypes.Int32, Nullable: true}, + }, nil) + bldr := array.NewRecordBuilder(memory.DefaultAllocator, sc) + defer bldr.Release() + for _, b := range bldr.Fields() { + b.AppendNull() + } + + rec := bldr.NewRecord() + defer rec.Release() + + props := parquet.NewWriterProperties( + parquet.WithVersion(parquet.V1_0), + ) + var buf bytes.Buffer + fw, err := pqarrow.NewFileWriter(sc, &buf, props, pqarrow.DefaultWriterProps()) + require.NoError(t, err) + err = fw.Write(rec) + require.NoError(t, err) + + for key, value := range kv { + require.NoError(t, fw.AppendKeyValueMetadata(key, value)) + } + + err = fw.Close() + require.NoError(t, err) + + reader, err := file.NewParquetReader(bytes.NewReader(buf.Bytes())) + require.NoError(t, err) + + for key, value := range kv { + got := reader.MetaData().KeyValueMetadata().FindValue(key) + require.NotNil(t, got) + assert.Equal(t, value, *got) + } +} + func TestWriteEmptyLists(t *testing.T) { sc := arrow.NewSchema([]arrow.Field{ {Name: "f1", Type: arrow.ListOf(arrow.FixedWidthTypes.Date32)}, diff --git a/go/parquet/pqarrow/file_writer.go b/go/parquet/pqarrow/file_writer.go index 052220e716c77..aa0bae7b1fdfb 100644 --- a/go/parquet/pqarrow/file_writer.go +++ b/go/parquet/pqarrow/file_writer.go @@ -272,6 +272,11 @@ func (fw *FileWriter) WriteTable(tbl arrow.Table, chunkSize int64) error { return nil } +// AppendKeyValueMetadata appends a key/value pair to the existing key/value metadata +func (fw *FileWriter) AppendKeyValueMetadata(key string, value string) error { + return fw.wr.AppendKeyValueMetadata(key, value) +} + // Close flushes out the data and closes the file. It can be called multiple times, // subsequent calls after the first will have no effect. func (fw *FileWriter) Close() error {