Skip to content

Commit

Permalink
Expire idle connections no longer acquired during lifetime (#945)
Browse files Browse the repository at this point in the history
* Expire idle connections

* fix race in connection close func

* atomic closed flag

* atomic closed flag replaced with uint32

* sync closed flag

* add integration test

* add close lock in a few places

* fix race conditions

* skip two tests in cloud env

* fix cloud tests

* Skip connection idling test on cloud
  • Loading branch information
jkaflik authored Mar 31, 2023
1 parent 5df7295 commit 0d10a70
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 5 deletions.
1 change: 1 addition & 0 deletions clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ func (ch *clickhouse) dial(ctx context.Context) (conn *connect, err error) {
if err != nil {
return nil, err
}
go result.conn.closeAfterMaxLifeTime()
return result.conn, nil
}

Expand Down
45 changes: 40 additions & 5 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"log"
"net"
"os"
"sync"
"syscall"
"time"

Expand Down Expand Up @@ -128,6 +129,8 @@ type connect struct {
readTimeout time.Duration
blockBufferSize uint8
maxCompressionBuffer int

rwLock sync.Mutex
}

func (c *connect) settings(querySettings Settings) []proto.Setting {
Expand All @@ -148,8 +151,7 @@ func (c *connect) settings(querySettings Settings) []proto.Setting {
}

func (c *connect) isBad() bool {
switch {
case c.closed:
if c.isClosed() {
return true
}

Expand All @@ -163,15 +165,48 @@ func (c *connect) isBad() bool {
return false
}

// closeAfterMaxLifeTime closes the connection if it has been used for longer than ConnMaxLifeTime
func (c *connect) closeAfterMaxLifeTime() {
t := time.NewTimer(c.opt.ConnMaxLifetime)
defer t.Stop()

// check if connection should be closed after duration of ConnMaxLifeTime
// if connection is closed, return
for {
select {
case <-t.C:
c.close()
return
default:
if c.isClosed() {
return
}

time.Sleep(time.Second)
}
}
}

func (c *connect) isClosed() bool {
c.rwLock.Lock()
defer c.rwLock.Unlock()

return c.closed
}

func (c *connect) close() error {
c.rwLock.Lock()
defer c.rwLock.Unlock()

if c.closed {
return nil
}

c.closed = true
c.buffer = nil
c.reader = nil
if err := c.conn.Close(); err != nil {
return err
c.debugf("[close] %s", err)
}
return nil
}
Expand Down Expand Up @@ -236,10 +271,10 @@ func (c *connect) sendData(block *proto.Block, name string) error {
if err := c.flush(); err != nil {
if errors.Is(err, syscall.EPIPE) {
c.debugf("[send data] pipe is broken, closing connection")
c.closed = true
c.close()
} else if errors.Is(err, io.EOF) {
c.debugf("[send data] unexpected EOF, closing connection")
c.closed = true
c.close()
} else {
c.debugf("[send data] unexpected error: %v", err)
}
Expand Down
6 changes: 6 additions & 0 deletions conn_handshake.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ import (
)

func (c *connect) handshake(database, username, password string) error {
c.rwLock.Lock()
defer c.rwLock.Unlock()

defer c.buffer.Reset()
c.debugf("[handshake] -> %s", proto.ClientHandshake{})
// set a read deadline - alternative to context.Read operation will fail if no data is received after deadline.
Expand Down Expand Up @@ -83,6 +86,9 @@ func (c *connect) handshake(database, username, password string) error {
}

func (c *connect) sendAddendum() error {
c.rwLock.Lock()
defer c.rwLock.Unlock()

if c.revision >= proto.DBMS_MIN_PROTOCOL_VERSION_WITH_QUOTA_KEY {
c.buffer.PutString("") // todo quota key support
}
Expand Down
3 changes: 3 additions & 0 deletions conn_ping.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ import (
// Connection::ping
// https://github.com/ClickHouse/ClickHouse/blob/master/src/Client/Connection.cpp
func (c *connect) ping(ctx context.Context) (err error) {
c.rwLock.Lock()
defer c.rwLock.Unlock()

// set a read deadline - alternative to context.Read operation will fail if no data is received after deadline.
c.conn.SetReadDeadline(time.Now().Add(c.readTimeout))
defer c.conn.SetReadDeadline(time.Time{})
Expand Down
5 changes: 5 additions & 0 deletions conn_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ func (c *connect) process(ctx context.Context, on *onProcess) error {
return ctx.Err()
default:
}
c.rwLock.Lock()
packet, err := c.reader.ReadByte()
c.rwLock.Unlock()
if err != nil {
return err
}
Expand All @@ -82,6 +84,9 @@ func (c *connect) process(ctx context.Context, on *onProcess) error {
}

func (c *connect) handle(ctx context.Context, packet byte, on *onProcess) error {
c.rwLock.Lock()
defer c.rwLock.Unlock()

switch packet {
case proto.ServerData, proto.ServerTotals, proto.ServerExtremes:
block, err := c.readData(ctx, packet, true)
Expand Down
3 changes: 3 additions & 0 deletions conn_send_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ import (
// Connection::sendQuery
// https://github.com/ClickHouse/ClickHouse/blob/master/src/Client/Connection.cpp
func (c *connect) sendQuery(body string, o *QueryOptions) error {
c.rwLock.Lock()
defer c.rwLock.Unlock()

c.debugf("[send query] compression=%q %s", c.compression, body)
c.buffer.PutByte(proto.ClientQuery)
q := proto.Query{
Expand Down
55 changes: 55 additions & 0 deletions tests/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"fmt"
"os"
"strconv"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -305,3 +306,57 @@ func TestEmptyDatabaseConfig(t *testing.T) {
err = anotherConn.Ping(context.Background())
require.NoError(t, err)
}

func TestConnectionExpiresIdleConnection(t *testing.T) {
runInDocker, _ := strconv.ParseBool(GetEnv("CLICKHOUSE_USE_DOCKER", "true"))
if !runInDocker {
t.Skip("Skip test in cloud environment. This test is not stable in cloud environment, due to race conditions.")
}

// given
ctx := context.Background()
testEnv, err := GetTestEnvironment(testSet)
require.NoError(t, err)

baseConn, err := testClientWithDefaultSettings(testEnv)
require.NoError(t, err)

expectedConnections := getActiveConnections(t, baseConn)

// when the client is configured to expire idle connections after 1/10 of a second
opts := clientOptionsFromEnv(testEnv, clickhouse.Settings{})
opts.MaxIdleConns = 20
opts.MaxOpenConns = 20
opts.ConnMaxLifetime = time.Second / 10
conn, err := clickhouse.Open(&opts)
require.NoError(t, err)

// run 1000 queries in parallel
var wg sync.WaitGroup
const selectToRunAtOnce = 1000
for i := 0; i < selectToRunAtOnce; i++ {
wg.Add(1)
go func() {
defer wg.Done()
r, err := conn.Query(ctx, "SELECT 1")
require.NoError(t, err)

r.Close()
}()
}
wg.Wait()

// then we expect that all connections will be closed when they are idle
// retrying for 10 seconds to make sure that the connections are closed
assert.Eventuallyf(t, func() bool {
return getActiveConnections(t, baseConn) == expectedConnections
}, time.Second*10, opts.ConnMaxLifetime, "expected connections to be reset back to %d", expectedConnections)
}

func getActiveConnections(t *testing.T, client clickhouse.Conn) (conns int64) {
ctx := context.Background()
r := client.QueryRow(ctx, "SELECT sum(value) as conns FROM system.metrics WHERE metric LIKE '%Connection'")
require.NoError(t, r.Err())
require.NoError(t, r.Scan(&conns))
return conns
}

0 comments on commit 0d10a70

Please sign in to comment.