Skip to content

Commit

Permalink
Merge pull request #356 from ClickHouse/feat/batch-profile-and-log-op…
Browse files Browse the repository at this point in the history
…tions

feat: batch On{ProfileEvent,Log}
  • Loading branch information
ernado authored Nov 25, 2023
2 parents 6d9066b + 90590d8 commit 680b0ab
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 60 deletions.
7 changes: 2 additions & 5 deletions .github/workflows/e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,9 @@ jobs:
- v22.3.11.12-lts
- v22.8.5.29-lts
- v22.8.20.11-lts
- v22.12.4.76-stable
- v23.2.2.20-stable
- v23.3.8.21-lts
- v23.4.6.25-stable
- v23.6.2.18-stable
- v23.7.2.25-stable
- v23.8.7.24-lts
- v23.10.3.5-stable
steps:
- uses: actions/checkout@v4

Expand Down
32 changes: 31 additions & 1 deletion proto/profile_enum.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 7 additions & 7 deletions proto/profile_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (d *ProfileEvents) Result() Results {
}
}

//go:generate go run github.com/dmarkham/enumer -type ProfileEventType -trimprefix Profile -output profile_enum.go
//go:generate go run github.com/dmarkham/enumer -type ProfileEventType -trimprefix Profile -text -json -output profile_enum.go

type ProfileEventType byte

Expand All @@ -61,10 +61,10 @@ const (

// ProfileEvent is detailed profiling event from Server.
type ProfileEvent struct {
ThreadID uint64
Host string
Time time.Time
Type ProfileEventType
Name string
Value int64
Type ProfileEventType `json:"type"`
Name string `json:"name"`
Value int64 `json:"value"`
Host string `json:"host_name"`
Time time.Time `json:"current_time"`
ThreadID uint64 `json:"thread_id"`
}
14 changes: 7 additions & 7 deletions proto/server_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ import "time"

// Log from server.
type Log struct {
Time time.Time
Host string
QueryID string
ThreadID uint64
Priority int8
Source string
Text string
QueryID string `json:"query_id"`
Source string `json:"source"`
Text string `json:"text"`
Time time.Time `json:"event_time"`
Host string `json:"host_name"`
ThreadID uint64 `json:"thread_id"`
Priority int8 `json:"priority"`
}

// Logs from ServerCodeLog packet.
Expand Down
100 changes: 65 additions & 35 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,19 @@ type Query struct {
// OnProfile is optional handler for profiling data.
OnProfile func(ctx context.Context, p proto.Profile) error
// OnProfileEvent is optional handler for profiling event stream data.
//
// Deprecated: use OnProfileEvents instead. This option will be removed in
// next major release.
OnProfileEvent func(ctx context.Context, e ProfileEvent) error
// OnProfileEvents is same as OnProfileEvent but is called on each event batch.
OnProfileEvents func(ctx context.Context, e []ProfileEvent) error
// OnLog is optional handler for server log entry.
//
// Deprecated: use OnLogs instead. This option will be removed in
// next major release.
OnLog func(ctx context.Context, l Log) error
// OnLogs is optional handler for server log events.
OnLogs func(ctx context.Context, l []Log) error

// Settings are optional query-scoped settings. Can override client settings.
Settings []Setting
Expand Down Expand Up @@ -306,22 +316,32 @@ func (c *Client) sendInput(ctx context.Context, info proto.ColInfoInput, q Query
if len(q.Input) == 0 {
return nil
}
// Handling input columns that require inference, e.g. enums.

// Handling input columns that require inference, e.g. enums, dates with precision, etc.
//
// Some debug structures and initializations if on debug logging level.
var inferenceColumns map[string]proto.ColumnType
inferenceDebug := c.lg.Check(zap.DebugLevel, "Inferring columns")
if inferenceDebug != nil {
inferenceColumns = make(map[string]proto.ColumnType, len(info))
}
for _, v := range info {
for _, inCol := range q.Input {
infer, ok := inCol.Data.(proto.Inferable)
if !ok || inCol.Name != v.Name {
continue
}
c.lg.Debug("Inferring column",
zap.String("column.name", v.Name),
zap.Stringer("column.type", v.Type),
)
if inferenceDebug != nil {
inferenceColumns[inCol.Name] = v.Type
}
if err := infer.Infer(v.Type); err != nil {
return errors.Wrapf(err, "infer %q", inCol.Name)
return errors.Wrapf(err, "infer %q %q", inCol.Name, v.Type)
}
}
}
if inferenceDebug != nil && len(inferenceColumns) > 0 {
inferenceDebug.Write(zap.Any("columns", inferenceColumns))
}
var (
rows = q.Input[0].Data.Rows()
f = q.OnInput
Expand Down Expand Up @@ -467,27 +487,32 @@ func (c *Client) handlePacket(ctx context.Context, p proto.ServerCode, q Query)
case proto.ServerProfileEvents:
var data proto.ProfileEvents
onResult := func(ctx context.Context, b proto.Block) error {
ce := c.lg.Check(zap.DebugLevel, "ProfileEvents")
if ce == nil && q.OnProfileEvents == nil && q.OnProfileEvent == nil {
// No handlers, skipping.
return nil
}
events, err := data.All()
if err != nil {
return errors.Wrap(err, "events")
}
for _, e := range events {
if ce := c.lg.Check(zap.DebugLevel, "ProfileEvent"); ce != nil {
ce.Write(
zap.Time("event.time", e.Time),
zap.String("event.host_name", e.Host),
zap.Uint64("event.thread_id", e.ThreadID),
zap.Stringer("event.type", e.Type),
zap.String("event.name", e.Name),
zap.Int64("event.value", e.Value),
)
if f := q.OnProfileEvents; f != nil {
if err := f(ctx, events); err != nil {
return errors.Wrap(err, "profile events")
}
if f := q.OnProfileEvent; f != nil {
}
if f := q.OnProfileEvent; f != nil {
// Deprecated.
// TODO: Remove in next major release.
for _, e := range events {
if err := f(ctx, e); err != nil {
return errors.Wrap(err, "profile event")
}
}
}
if ce != nil {
ce.Write(zap.Any("events", events))
}
return nil
}
if err := c.decodeBlock(ctx, decodeOptions{
Expand All @@ -502,19 +527,24 @@ func (c *Client) handlePacket(ctx context.Context, p proto.ServerCode, q Query)
case proto.ServerCodeLog:
var data proto.Logs
onResult := func(ctx context.Context, b proto.Block) error {
for _, l := range data.All() {
if ce := c.lg.Check(zap.DebugLevel, "Profile"); ce != nil {
ce.Write(
zap.Time("event_time", l.Time),
zap.String("host", l.Host),
zap.String("query_id", l.QueryID),
zap.Uint64("thread_id", l.ThreadID),
zap.Int8("priority", l.Priority),
zap.String("source", l.Source),
zap.String("text", l.Text),
)
ce := c.lg.Check(zap.DebugLevel, "Logs")
if ce == nil && q.OnLogs == nil && q.OnLog == nil {
// No handlers, skipping.
return nil
}
logs := data.All()
if ce != nil {
ce.Write(zap.Any("logs", logs))
}
if f := q.OnLogs; f != nil {
if err := f(ctx, logs); err != nil {
return errors.Wrap(err, "logs")
}
if f := q.OnLog; f != nil {
}
if f := q.OnLog; f != nil {
// Deprecated.
// TODO: Remove in next major release.
for _, l := range logs {
if err := f(ctx, l); err != nil {
return errors.Wrap(err, "log")
}
Expand Down Expand Up @@ -602,12 +632,12 @@ func (c *Client) Do(ctx context.Context, q Query) (err error) {
q.Result = &result
colInfo = make(chan proto.ColInfoInput, 1)
q.OnResult = func(ctx context.Context, block proto.Block) error {
c.lg.Debug("Received column info")
for _, v := range result {
c.lg.Debug("Column",
zap.String("column.name", v.Name),
zap.Stringer("column.type", v.Type),
)
if ce := c.lg.Check(zap.DebugLevel, "Received column info"); ce != nil {
info := make(map[string]proto.ColumnType, len(result))
for _, v := range result {
info[v.Name] = v.Type
}
ce.Write(zap.Any("columns", info))
}
select {
case <-ctx.Done():
Expand Down
22 changes: 19 additions & 3 deletions query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1061,11 +1061,17 @@ func TestClient_ServerLog(t *testing.T) {
Body: "SELECT 'foo' as s",
QueryID: qID,
OnLog: func(ctx context.Context, l Log) error {
t.Logf("Log: %s", l.Text)
logs++
assert.Equal(t, qID, l.QueryID)
return nil
},
OnLogs: func(ctx context.Context, events []Log) error {
logs += len(events)
for _, l := range events {
t.Logf("Log: %s", l.Text)
assert.Equal(t, qID, l.QueryID)
}
return nil
},
Result: proto.Results{
{
Name: "s",
Expand Down Expand Up @@ -1147,13 +1153,22 @@ func TestClient_ServerProfileEvents(t *testing.T) {
if !conn.ServerInfo().Has(proto.FeatureProfileEvents) {
t.Skip("Profile events not supported")
}
var events int
var (
events int
eventsBatch int
)
selectStr := Query{
Body: "SELECT 1",
OnProfileEvent: func(ctx context.Context, p ProfileEvent) error {
// Deprecated.
// TODO: remove
events++
return nil
},
OnProfileEvents: func(ctx context.Context, e []ProfileEvent) error {
eventsBatch += len(e)
return nil
},
Result: proto.Results{
proto.AutoResult("1"),
},
Expand All @@ -1163,6 +1178,7 @@ func TestClient_ServerProfileEvents(t *testing.T) {
if events == 0 {
t.Fatal("No profile events")
}
require.Equal(t, events, eventsBatch)
}

func TestClient_Query_Bool(t *testing.T) {
Expand Down
6 changes: 4 additions & 2 deletions tracing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,12 @@ func TestClient_Do_tracing(t *testing.T) {
require.NoError(t, conn.Do(ctx, Query{
Body: "SELECT 1",
Result: discardResult(),
OnLog: func(ctx context.Context, l Log) error {
OnLogs: func(ctx context.Context, logs []Log) error {
sc := trace.SpanContextFromContext(ctx)
traceID = sc.TraceID()
t.Log(l.Text, sc.TraceID(), sc.SpanID())
for _, l := range logs {
t.Log(l.Text, sc.TraceID(), sc.SpanID())
}
return nil
},
}))
Expand Down

0 comments on commit 680b0ab

Please sign in to comment.