@@ -16,18 +16,16 @@ package executor
16
16
import (
17
17
"context"
18
18
"encoding/json"
19
+ "sort"
19
20
"time"
20
21
21
22
"github.com/opentracing/basictracer-go"
22
23
"github.com/opentracing/opentracing-go"
23
24
"github.com/pingcap/errors"
24
25
"github.com/pingcap/parser/ast"
25
- "github.com/pingcap/tidb/planner"
26
- plannercore "github.com/pingcap/tidb/planner/core"
27
26
"github.com/pingcap/tidb/sessionctx"
28
27
"github.com/pingcap/tidb/util/chunk"
29
28
"github.com/pingcap/tidb/util/sqlexec"
30
- "github.com/pingcap/tidb/util/tracing"
31
29
"sourcegraph.com/sourcegraph/appdash"
32
30
traceImpl "sourcegraph.com/sourcegraph/appdash/opentracing"
33
31
)
@@ -55,103 +53,63 @@ func (e *TraceExec) Next(ctx context.Context, chk *chunk.Chunk) error {
55
53
if e .exhausted {
56
54
return nil
57
55
}
58
-
59
- if e .format == "json" {
60
- if se , ok := e .ctx .(sqlexec.SQLExecutor ); ok {
61
- store := appdash .NewMemoryStore ()
62
- tracer := traceImpl .NewTracer (store )
63
- span := tracer .StartSpan ("trace" )
64
- defer span .Finish ()
65
- ctx = opentracing .ContextWithSpan (ctx , span )
66
- recordSets , err := se .Execute (ctx , e .stmtNode .Text ())
67
- if err != nil {
68
- return errors .Trace (err )
69
- }
70
-
71
- for _ , rs := range recordSets {
72
- _ , err = drainRecordSet (ctx , e .ctx , rs )
73
- if err != nil {
74
- return errors .Trace (err )
75
- }
76
- if err = rs .Close (); err != nil {
77
- return errors .Trace (err )
78
- }
79
- }
80
-
81
- traces , err := store .Traces (appdash.TracesOpts {})
82
- if err != nil {
83
- return errors .Trace (err )
84
- }
85
- data , err := json .Marshal (traces )
86
- if err != nil {
87
- return errors .Trace (err )
88
- }
89
-
90
- // Split json data into rows to avoid the max packet size limitation.
91
- const maxRowLen = 4096
92
- for len (data ) > maxRowLen {
93
- chk .AppendString (0 , string (data [:maxRowLen ]))
94
- data = data [maxRowLen :]
95
- }
96
- chk .AppendString (0 , string (data ))
97
- }
56
+ se , ok := e .ctx .(sqlexec.SQLExecutor )
57
+ if ! ok {
98
58
e .exhausted = true
99
59
return nil
100
60
}
101
61
102
- // TODO: If the following code is never used, remove it later.
103
- // record how much time was spent for optimizeing plan
104
- optimizeSp := e .rootTrace .Tracer ().StartSpan ("plan_optimize" , opentracing .FollowsFrom (e .rootTrace .Context ()))
105
- stmtPlan , err := planner .Optimize (e .builder .ctx , e .stmtNode , e .builder .is )
62
+ store := appdash .NewMemoryStore ()
63
+ tracer := traceImpl .NewTracer (store )
64
+ span := tracer .StartSpan ("trace" )
65
+ defer span .Finish ()
66
+ ctx = opentracing .ContextWithSpan (ctx , span )
67
+ recordSets , err := se .Execute (ctx , e .stmtNode .Text ())
106
68
if err != nil {
107
- return err
69
+ return errors . Trace ( err )
108
70
}
109
- optimizeSp .Finish ()
110
71
111
- pp , ok := stmtPlan .(plannercore.PhysicalPlan )
112
- if ! ok {
113
- return errors .New ("cannot cast logical plan to physical plan" )
72
+ for _ , rs := range recordSets {
73
+ _ , err = drainRecordSet (ctx , e .ctx , rs )
74
+ if err != nil {
75
+ return errors .Trace (err )
76
+ }
77
+ if err = rs .Close (); err != nil {
78
+ return errors .Trace (err )
79
+ }
114
80
}
115
81
116
- // append select executor to trace executor
117
- stmtExec := e .builder .build (pp )
118
-
119
- e .rootTrace = tracing .NewRecordedTrace ("trace_exec" , func (sp basictracer.RawSpan ) {
120
- e .CollectedSpans = append (e .CollectedSpans , sp )
121
- })
122
- err = stmtExec .Open (ctx )
82
+ traces , err := store .Traces (appdash.TracesOpts {})
123
83
if err != nil {
124
84
return errors .Trace (err )
125
85
}
126
- stmtExecChk := stmtExec .newFirstChunk ()
127
86
128
- // store span into context
129
- ctx = opentracing .ContextWithSpan (ctx , e .rootTrace )
130
-
131
- for {
132
- if err := stmtExec .Next (ctx , stmtExecChk ); err != nil {
133
- return errors .Trace (err )
134
- }
135
- if stmtExecChk .NumRows () == 0 {
136
- break
87
+ // Row format.
88
+ if e .format != "json" {
89
+ if len (traces ) < 1 {
90
+ e .exhausted = true
91
+ return nil
137
92
}
93
+ trace := traces [0 ]
94
+ sortTraceByStartTime (trace )
95
+ dfsTree (trace , "" , false , chk )
96
+ e .exhausted = true
97
+ return nil
138
98
}
139
99
140
- e .rootTrace .LogKV ("event" , "tracing completed" )
141
- e .rootTrace .Finish ()
142
- var rootSpan basictracer.RawSpan
143
-
144
- treeSpans := make (map [uint64 ][]basictracer.RawSpan )
145
- for _ , sp := range e .CollectedSpans {
146
- treeSpans [sp .ParentSpanID ] = append (treeSpans [sp .ParentSpanID ], sp )
147
- // if a span's parentSpanID is 0, then it is root span
148
- // this is by design
149
- if sp .ParentSpanID == 0 {
150
- rootSpan = sp
151
- }
100
+ // Json format.
101
+ data , err := json .Marshal (traces )
102
+ if err != nil {
103
+ return errors .Trace (err )
152
104
}
153
105
154
- dfsTree (rootSpan , treeSpans , "" , false , chk )
106
+ // Split json data into rows to avoid the max packet size limitation.
107
+ const maxRowLen = 4096
108
+ for len (data ) > maxRowLen {
109
+ chk .AppendString (0 , string (data [:maxRowLen ]))
110
+ data = data [maxRowLen :]
111
+ }
112
+ chk .AppendString (0 , string (data ))
155
113
e .exhausted = true
156
114
return nil
157
115
}
@@ -173,14 +131,34 @@ func drainRecordSet(ctx context.Context, sctx sessionctx.Context, rs sqlexec.Rec
173
131
}
174
132
}
175
133
176
- func dfsTree (span basictracer.RawSpan , tree map [uint64 ][]basictracer.RawSpan , prefix string , isLast bool , chk * chunk.Chunk ) {
177
- suffix := ""
178
- spans := tree [span .Context .SpanID ]
179
- var newPrefix string
180
- if span .ParentSpanID == 0 {
181
- newPrefix = prefix
134
+ type sortByStartTime []* appdash.Trace
135
+
136
+ func (t sortByStartTime ) Len () int { return len (t ) }
137
+ func (t sortByStartTime ) Less (i , j int ) bool {
138
+ return getStartTime (t [j ]).After (getStartTime (t [i ]))
139
+ }
140
+ func (t sortByStartTime ) Swap (i , j int ) { t [i ], t [j ] = t [j ], t [i ] }
141
+
142
+ func getStartTime (trace * appdash.Trace ) (t time.Time ) {
143
+ if e , err := trace .TimespanEvent (); err == nil {
144
+ t = e .Start ()
145
+ }
146
+ return
147
+ }
148
+
149
+ func sortTraceByStartTime (trace * appdash.Trace ) {
150
+ sort .Sort (sortByStartTime (trace .Sub ))
151
+ for _ , t := range trace .Sub {
152
+ sortTraceByStartTime (t )
153
+ }
154
+ }
155
+
156
+ func dfsTree (t * appdash.Trace , prefix string , isLast bool , chk * chunk.Chunk ) {
157
+ var newPrefix , suffix string
158
+ if len (prefix ) == 0 {
159
+ newPrefix = prefix + " "
182
160
} else {
183
- if len ( tree [ span . ParentSpanID ]) > 0 && ! isLast {
161
+ if ! isLast {
184
162
suffix = "├─"
185
163
newPrefix = prefix + "│ "
186
164
} else {
@@ -189,11 +167,19 @@ func dfsTree(span basictracer.RawSpan, tree map[uint64][]basictracer.RawSpan, pr
189
167
}
190
168
}
191
169
192
- chk .AppendString (0 , prefix + suffix + span .Operation )
193
- chk .AppendString (1 , span .Start .Format (time .StampNano ))
194
- chk .AppendString (2 , span .Duration .String ())
170
+ var start time.Time
171
+ var duration time.Duration
172
+ if e , err := t .TimespanEvent (); err == nil {
173
+ start = e .Start ()
174
+ end := e .End ()
175
+ duration = end .Sub (start )
176
+ }
177
+
178
+ chk .AppendString (0 , prefix + suffix + t .Span .Name ())
179
+ chk .AppendString (1 , start .Format ("15:04:05.000000" ))
180
+ chk .AppendString (2 , duration .String ())
195
181
196
- for i , sp := range spans {
197
- dfsTree (sp , tree , newPrefix , i == (len (spans ))- 1 /*last element of array*/ , chk )
182
+ for i , sp := range t . Sub {
183
+ dfsTree (sp , newPrefix , i == (len (t . Sub ))- 1 /*last element of array*/ , chk )
198
184
}
199
185
}
0 commit comments