diff --git a/clickhouse.go b/clickhouse.go index 32d9e434c9..e397fa1093 100644 --- a/clickhouse.go +++ b/clickhouse.go @@ -118,6 +118,7 @@ func (ch *clickhouse) Query(ctx context.Context, query string, args ...interface if err != nil { return nil, err } + conn.debugf("[acquired] connection [%d]", conn.id) return conn.query(ctx, ch.release, query, args...) } @@ -128,6 +129,7 @@ func (ch *clickhouse) QueryRow(ctx context.Context, query string, args ...interf err: err, } } + conn.debugf("[acquired] connection [%d]", conn.id) return conn.queryRow(ctx, ch.release, query, args...) } diff --git a/conn.go b/conn.go index 785bde05ef..afd37afe20 100644 --- a/conn.go +++ b/conn.go @@ -72,17 +72,18 @@ func dial(ctx context.Context, addr string, num int, opt *Options) (*connect, er var ( connect = &connect{ - opt: opt, - conn: conn, - debugf: debugf, - buffer: new(chproto.Buffer), - reader: chproto.NewReader(conn), - revision: proto.ClientTCPProtocolVersion, - structMap: &structMap{}, - compression: compression, - connectedAt: time.Now(), - compressor: compress.NewWriter(), - readTimeout: opt.ReadTimeout, + id: num, + opt: opt, + conn: conn, + debugf: debugf, + buffer: new(chproto.Buffer), + reader: chproto.NewReader(conn), + revision: proto.ClientTCPProtocolVersion, + structMap: &structMap{}, + compression: compression, + connectedAt: time.Now(), + compressor: compress.NewWriter(), + readTimeout: opt.ReadTimeout, blockBufferSize: opt.BlockBufferSize, } ) @@ -101,20 +102,21 @@ func dial(ctx context.Context, addr string, num int, opt *Options) (*connect, er // https://github.com/ClickHouse/ClickHouse/blob/master/src/Client/Connection.cpp type connect struct { - opt *Options - conn net.Conn - debugf func(format string, v ...interface{}) - server ServerVersion - closed bool - buffer *chproto.Buffer - reader *chproto.Reader - released bool - revision uint64 - structMap *structMap - compression CompressionMethod - connectedAt time.Time - compressor *compress.Writer - readTimeout time.Duration + id int + opt *Options + conn net.Conn + debugf func(format string, v ...interface{}) + server ServerVersion + closed bool + buffer *chproto.Buffer + reader *chproto.Reader + released bool + revision uint64 + structMap *structMap + compression CompressionMethod + connectedAt time.Time + compressor *compress.Writer + readTimeout time.Duration blockBufferSize uint8 } diff --git a/conn_process.go b/conn_process.go index ca7fec0bd3..b673c78240 100644 --- a/conn_process.go +++ b/conn_process.go @@ -20,10 +20,8 @@ package clickhouse import ( "context" "fmt" - "io" - "time" - "github.com/ClickHouse/clickhouse-go/v2/lib/proto" + "io" ) type onProcess struct { @@ -137,9 +135,12 @@ func (c *connect) handle(packet byte, on *onProcess) error { } func (c *connect) cancel() error { - c.conn.SetDeadline(time.Now().Add(2 * time.Second)) c.debugf("[cancel]") - c.closed = true c.buffer.PutUVarInt(proto.ClientCancel) - return c.flush() + wErr := c.flush() + // don't reuse a cancelled query as we don't drain the connection + if cErr := c.close(); cErr != nil { + return cErr + } + return wErr } diff --git a/tests/std/conn_test.go b/tests/std/conn_test.go index c54c7474ac..9f96e8ca9b 100644 --- a/tests/std/conn_test.go +++ b/tests/std/conn_test.go @@ -286,3 +286,4 @@ func TestMaxExecutionTime(t *testing.T) { }) } } +