Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Iterable ordered map alternative with improved performance #1152

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion bind.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ package clickhouse
import (
std_driver "database/sql/driver"
"fmt"
"github.com/ClickHouse/clickhouse-go/v2/lib/column"
"reflect"
"regexp"
"strings"
"time"

"github.com/ClickHouse/clickhouse-go/v2/lib/column"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
)

Expand Down Expand Up @@ -320,6 +320,23 @@ func format(tz *time.Location, scale TimeUnit, v any) (string, error) {
values = append(values, fmt.Sprintf("%s, %s", name, val))
}

return "map(" + strings.Join(values, ", ") + ")", nil
case column.IterableOrderedMap:
values := make([]string, 0)
iter := v.Iterator()
for iter.Next() {
key, value := iter.Key(), iter.Value()
name, err := format(tz, scale, key)
if err != nil {
return "", err
}
val, err := format(tz, scale, value)
if err != nil {
return "", err
}
values = append(values, fmt.Sprintf("%s, %s", name, val))
}

return "map(" + strings.Join(values, ", ") + ")", nil
}
switch v := reflect.ValueOf(v); v.Kind() {
Expand Down
9 changes: 7 additions & 2 deletions examples/clickhouse_api/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ package clickhouse_api
import (
"context"
"fmt"
clickhouse_tests "github.com/ClickHouse/clickhouse-go/v2/tests"
"github.com/stretchr/testify/require"
"math/rand"
"os"
"strconv"
"testing"
"time"

clickhouse_tests "github.com/ClickHouse/clickhouse-go/v2/tests"
"github.com/stretchr/testify/require"
)

func TestMain(m *testing.M) {
Expand Down Expand Up @@ -155,6 +156,10 @@ func TestMapInsertRead(t *testing.T) {
require.NoError(t, MapInsertRead())
}

func TestIterableOrderedMapInsertRead(t *testing.T) {
require.NoError(t, IterableOrderedMapInsertRead())
}

func TestMultiHostConnect(t *testing.T) {
require.NoError(t, MultiHostVersion())
require.NoError(t, MultiHostRoundRobinVersion())
Expand Down
96 changes: 96 additions & 0 deletions examples/clickhouse_api/map.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"context"
"fmt"
"strconv"

"github.com/ClickHouse/clickhouse-go/v2/lib/column"
)

func MapInsertRead() error {
Expand Down Expand Up @@ -80,3 +82,97 @@ func MapInsertRead() error {
rows.Close()
return rows.Err()
}

func IterableOrderedMapInsertRead() error {
conn, err := GetNativeConnection(nil, nil, nil)
if err != nil {
return err
}
ctx := context.Background()
defer func() {
conn.Exec(ctx, "DROP TABLE example")
}()
conn.Exec(context.Background(), "DROP TABLE IF EXISTS example")
err = conn.Exec(ctx, `
CREATE TABLE example (
Col1 Map(String, String)
) Engine Memory
`)
if err != nil {
return err
}

batch, err := conn.PrepareBatch(ctx, "INSERT INTO example")
if err != nil {
return err
}
var i int64
for i = 0; i < 10; i++ {
om := NewOrderedMap()
kv1 := strconv.Itoa(int(i))
kv2 := strconv.Itoa(int(i + 1))
om.Put(kv1, kv1)
om.Put(kv2, kv2)
err := batch.Append(om)
if err != nil {
return err
}
}
if err := batch.Send(); err != nil {
return err
}
rows, err := conn.Query(ctx, "SELECT * FROM example")
if err != nil {
return err
}
for rows.Next() {
var col1 OrderedMap
if err := rows.Scan(&col1); err != nil {
return err
}
fmt.Printf("row: col1=%v\n", col1)
}
rows.Close()
return rows.Err()
}

// OrderedMap is a simple (non thread safe) ordered map
type OrderedMap struct {
Keys []any
Values []any
}

func NewOrderedMap() column.IterableOrderedMap {
return &OrderedMap{}
}

func (om *OrderedMap) Put(key any, value any) {
om.Keys = append(om.Keys, key)
om.Values = append(om.Values, value)
}

func (om *OrderedMap) Iterator() column.MapIterator {
return NewOrderedMapIterator(om)
}

type OrderedMapIter struct {
om *OrderedMap
iterIndex int
}

func NewOrderedMapIterator(om *OrderedMap) column.MapIterator {
return &OrderedMapIter{om: om, iterIndex: -1}
}

func (i *OrderedMapIter) Next() bool {
i.iterIndex++
return i.iterIndex < len(i.om.Keys)
}

func (i *OrderedMapIter) Key() any {
return i.om.Keys[i.iterIndex]
}

func (i *OrderedMapIter) Value() any {
return i.om.Values[i.iterIndex]
}
42 changes: 41 additions & 1 deletion lib/column/map.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ package column
import (
"database/sql/driver"
"fmt"
"github.com/ClickHouse/ch-go/proto"
"reflect"
"strings"
"time"

"github.com/ClickHouse/ch-go/proto"
)

// https://github.com/ClickHouse/ClickHouse/blob/master/src/Columns/ColumnMap.cpp
Expand All @@ -42,6 +43,17 @@ type OrderedMap interface {
Keys() <-chan any
}

type MapIterator interface {
Next() bool
Key() any
Value() any
}

type IterableOrderedMap interface {
Put(key any, value any)
Iterator() MapIterator
}

func (col *Map) Reset() {
col.keys.Reset()
col.values.Reset()
Expand Down Expand Up @@ -94,6 +106,13 @@ func (col *Map) ScanRow(dest any, i int) error {
value.Set(col.row(i))
return nil
}
if om, ok := dest.(IterableOrderedMap); ok {
keys, values := col.orderedRow(i)
for i := range keys {
om.Put(keys[i], values[i])
}
return nil
}
if om, ok := dest.(OrderedMap); ok {
keys, values := col.orderedRow(i)
for i := range keys {
Expand Down Expand Up @@ -163,6 +182,27 @@ func (col *Map) AppendRow(v any) error {
return nil
}

if orderedMap, ok := v.(IterableOrderedMap); ok {
var size int64
iter := orderedMap.Iterator()
for iter.Next() {
key, value := iter.Key(), iter.Value()
size++
if err := col.keys.AppendRow(key); err != nil {
return err
}
if err := col.values.AppendRow(value); err != nil {
return err
}
}
var prev int64
if n := col.offsets.Rows(); n != 0 {
prev = col.offsets.col.Row(n - 1)
}
col.offsets.col.Append(prev + size)
return nil
}

if orderedMap, ok := v.(OrderedMap); ok {
var size int64
for key := range orderedMap.Keys() {
Expand Down
126 changes: 123 additions & 3 deletions tests/map_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ import (
"context"
"database/sql/driver"
"fmt"
"github.com/stretchr/testify/require"
"reflect"
"testing"

"github.com/stretchr/testify/require"

"github.com/ClickHouse/clickhouse-go/v2"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -215,8 +217,9 @@ func TestMapFlush(t *testing.T) {

// a simple (non thread safe) ordered map
type OrderedMap struct {
keys []any
values map[any]any
keys []any
values map[any]any
valuesIter []any
}

func NewOrderedMap() *OrderedMap {
Expand All @@ -240,6 +243,7 @@ func (om *OrderedMap) Put(key any, value any) {
}
om.keys = append(om.keys, key)
om.values[key] = value
om.valuesIter = append(om.valuesIter, value)
}

func (om *OrderedMap) Keys() <-chan any {
Expand Down Expand Up @@ -354,3 +358,119 @@ func TestMapValuer(t *testing.T) {
}
require.Equal(t, 1000, i)
}

func (om *OrderedMap) KeysUseChanNoGo() <-chan any {
ch := make(chan any, len(om.keys))
for _, key := range om.keys {
ch <- key
}
close(ch)
return ch
}

func (om *OrderedMap) KeysUseSlice() []any {
return om.keys
}

func (om *OrderedMap) Iter() MapIter {
return &mapIter{om: om, iterIndex: -1}
}

type MapIter interface {
Next() bool
Key() any
Value() any
}

type mapIter struct {
om *OrderedMap
iterIndex int
}

func (i *mapIter) Next() bool {
i.iterIndex++
return i.iterIndex < len(i.om.keys)
}

func (i *mapIter) Key() any {
return i.om.keys[i.iterIndex]
}

func (i *mapIter) Value() any {
return i.om.valuesIter[i.iterIndex]
}

func BenchmarkOrderedMapUseChanGo(b *testing.B) {
m := NewOrderedMap()
for i := 0; i < 10; i++ {
m.Put(i, i)
}
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
for key := range m.Keys() {
_, _ = m.Get(key)
}
}
}

func BenchmarkOrderedMapKeysUseChanNoGo(b *testing.B) {
m := NewOrderedMap()
for i := 0; i < 10; i++ {
m.Put(i, i)
}
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
for key := range m.KeysUseChanNoGo() {
_, _ = m.Get(key)
}
}
}

func BenchmarkOrderedMapKeysUseSlice(b *testing.B) {
m := NewOrderedMap()
for i := 0; i < 10; i++ {
m.Put(i, i)
}
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
for key := range m.KeysUseSlice() {
_, _ = m.Get(key)
}
}
}

func BenchmarkOrderedMapKeysUseIter(b *testing.B) {
m := NewOrderedMap()
for i := 0; i < 10; i++ {
m.Put(i, i)
}
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
iter := m.Iter()
for iter.Next() {
_ = iter.Key()
_ = iter.Value()
}
}
}

func BenchmarkOrderedMapReflectMapIter(b *testing.B) {
m := NewOrderedMap()
for i := 0; i < 10; i++ {
m.Put(i, i)
}
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
value := reflect.Indirect(reflect.ValueOf(m.values))
iter := value.MapRange()
for iter.Next() {
_ = iter.Key().Interface()
_ = iter.Value().Interface()
}
}
}
Loading