Skip to content

Commit

Permalink
Use prefix interfaces to enable Array(JSON)
Browse files Browse the repository at this point in the history
  • Loading branch information
SpencerTorres committed Feb 16, 2025
1 parent fca08b4 commit 3bd05b7
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 10 deletions.
45 changes: 37 additions & 8 deletions lib/column/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,15 +576,32 @@ func (c *JSON) encodeStringData(buffer *proto.Buffer) {
c.jsonStrings.Encode(buffer)
}

func (c *JSON) Encode(buffer *proto.Buffer) {
func (c *JSON) WriteStatePrefix(buffer *proto.Buffer) error {
switch c.serializationVersion {
case JSONObjectSerializationVersion:
buffer.PutUInt64(JSONObjectSerializationVersion)
c.encodeObjectHeader(buffer)

return nil
case JSONStringSerializationVersion:
buffer.PutUInt64(JSONStringSerializationVersion)

return nil
default:
// If the column is an array, it can be empty but still require a prefix.
// Use string encoding since it's smaller
buffer.PutUInt64(JSONStringSerializationVersion)

return nil
}
}

func (c *JSON) Encode(buffer *proto.Buffer) {
switch c.serializationVersion {
case JSONObjectSerializationVersion:
c.encodeObjectData(buffer)
return
case JSONStringSerializationVersion:
buffer.PutUInt64(JSONStringSerializationVersion)
c.encodeStringData(buffer)
return
}
Expand Down Expand Up @@ -686,9 +703,7 @@ func (c *JSON) decodeStringData(reader *proto.Reader, rows int) error {
return c.jsonStrings.Decode(reader, rows)
}

func (c *JSON) Decode(reader *proto.Reader, rows int) error {
c.rows = rows

func (c *JSON) ReadStatePrefix(reader *proto.Reader) error {
jsonSerializationVersion, err := reader.UInt64()
if err != nil {
return fmt.Errorf("failed to read json serialization version: %w", err)
Expand All @@ -703,20 +718,34 @@ func (c *JSON) Decode(reader *proto.Reader, rows int) error {
return fmt.Errorf("failed to decode json object header: %w", err)
}

err = c.decodeObjectData(reader, rows)
return nil
case JSONStringSerializationVersion:
return nil
default:
return fmt.Errorf("unsupported JSON serialization version for prefix decode: %d", jsonSerializationVersion)
}
}

func (c *JSON) Decode(reader *proto.Reader, rows int) error {
c.rows = rows

switch c.serializationVersion {
case JSONObjectSerializationVersion:
err := c.decodeObjectData(reader, rows)
if err != nil {
return fmt.Errorf("failed to decode json object data: %w", err)
}

return nil
case JSONStringSerializationVersion:
err = c.decodeStringData(reader, rows)
err := c.decodeStringData(reader, rows)
if err != nil {
return fmt.Errorf("failed to decode json string data: %w", err)
}

return nil
default:
return fmt.Errorf("unsupported JSON serialization version for decode: %d", jsonSerializationVersion)
return fmt.Errorf("unsupported JSON serialization version for decode: %d", c.serializationVersion)
}
}

Expand Down
98 changes: 96 additions & 2 deletions tests/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"encoding/json"
"fmt"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/chcol"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"github.com/stretchr/testify/require"
"testing"
Expand Down Expand Up @@ -72,7 +71,7 @@ func TestJSONPaths(t *testing.T) {
rows, err := conn.Query(ctx, "SELECT c FROM test_json")
require.NoError(t, err)

var row chcol.JSON
var row clickhouse.JSON

require.True(t, rows.Next())
err = rows.Scan(&row)
Expand All @@ -99,6 +98,101 @@ func TestJSONPaths(t *testing.T) {
}
}

func TestJSONArray(t *testing.T) {
ctx := context.Background()
conn := setupJSONTest(t)

const ddl = `
CREATE TABLE IF NOT EXISTS test_json (
c Array(JSON)
) Engine = MergeTree() ORDER BY tuple()
`
require.NoError(t, conn.Exec(ctx, ddl))
defer func() {
require.NoError(t, conn.Exec(ctx, "DROP TABLE IF EXISTS test_json"))
}()

batch, err := conn.PrepareBatch(ctx, "INSERT INTO test_json (c)")
require.NoError(t, err)

arrJsonRow := []*clickhouse.JSON{clickhouse.NewJSON(), BuildTestJSONPaths()}

require.NoError(t, batch.Append(arrJsonRow))
require.NoError(t, batch.Send())

rows, err := conn.Query(ctx, "SELECT c FROM test_json")
require.NoError(t, err)

var arrRow []*clickhouse.JSON

require.True(t, rows.Next())
err = rows.Scan(&arrRow)
require.NoError(t, err)
require.Len(t, arrRow, 2)

actualValuesByPathEmpty := arrRow[0].ValuesByPath()
for _, actualValue := range actualValuesByPathEmpty {
// Allow Nil func to compare values without Dynamic wrapper
if v, ok := actualValue.(clickhouse.Dynamic); ok {
actualValue = v.Any()
}

require.Nil(t, actualValue)
}

expectedValuesByPath := arrJsonRow[1].ValuesByPath()
actualValuesByPath := arrRow[1].ValuesByPath()
for path, expectedValue := range expectedValuesByPath {
actualValue, ok := actualValuesByPath[path]
if !ok {
t.Fatalf("result JSON is missing path: %s", path)
}

// Allow Equal func to compare values without Dynamic wrapper
if v, ok := expectedValue.(clickhouse.Dynamic); ok {
expectedValue = v.Any()
}

if v, ok := actualValue.(clickhouse.Dynamic); ok {
actualValue = v.Any()
}

require.Equal(t, expectedValue, actualValue)
}
}

func TestJSONEmptyArray(t *testing.T) {
ctx := context.Background()
conn := setupJSONTest(t)

const ddl = `
CREATE TABLE IF NOT EXISTS test_json (
c Array(JSON)
) Engine = MergeTree() ORDER BY tuple()
`
require.NoError(t, conn.Exec(ctx, ddl))
defer func() {
require.NoError(t, conn.Exec(ctx, "DROP TABLE IF EXISTS test_json"))
}()

batch, err := conn.PrepareBatch(ctx, "INSERT INTO test_json (c)")
require.NoError(t, err)

var arrJsonRow []*clickhouse.JSON
require.NoError(t, batch.Append(arrJsonRow))
require.NoError(t, batch.Send())

rows, err := conn.Query(ctx, "SELECT c FROM test_json")
require.NoError(t, err)

var arrRow []*clickhouse.JSON

require.True(t, rows.Next())
err = rows.Scan(&arrRow)
require.NoError(t, err)
require.Len(t, arrRow, 0)
}

func TestJSONStruct(t *testing.T) {
ctx := context.Background()
conn := setupJSONTest(t)
Expand Down

0 comments on commit 3bd05b7

Please sign in to comment.