tracing support #227

Merged Nov 19, 2019 · 25 commits
Changes from 4 commits
67275a6
tracing scaffolding
vyzo Nov 4, 2019
89c7ed4
trace publish
vyzo Nov 4, 2019
fd73973
add tracing to floodsub/randomsub
vyzo Nov 4, 2019
958e09a
remove useless nil check when initializing subsystem tracers
vyzo Nov 4, 2019
fb11aa9
initialize tracer with peer ID, trace RPC from join/leave announcements
vyzo Nov 11, 2019
0a25f24
trace event protobuf
vyzo Nov 11, 2019
040cabe
some minor fixes in trace pb
vyzo Nov 11, 2019
151ec25
implement tracing details
vyzo Nov 11, 2019
ae0fcc6
add traces for send/drop rpc
vyzo Nov 11, 2019
3f30acd
track topics in message tracing
vyzo Nov 11, 2019
3545acf
json tracer
vyzo Nov 12, 2019
8ff321c
move tracer implementation to its own file
vyzo Nov 12, 2019
f134d65
add protobuf file tracer
vyzo Nov 12, 2019
0aa629c
use *pb.TraceEvent as argument for Trace in the EventTracer interface
vyzo Nov 13, 2019
57ea27e
remote tracer
vyzo Nov 14, 2019
2fc5518
remote tracer: wait a second to accumulate batches
vyzo Nov 14, 2019
db8e219
remove CompressedTraceEventBatch from trace protobuf
vyzo Nov 18, 2019
abe4763
compress entire stream in remote tracer
vyzo Nov 18, 2019
cce30a4
reset remote tracer stream on errors
vyzo Nov 18, 2019
91527e2
lossy tracing for remote tracers
vyzo Nov 18, 2019
24a1181
move niling of trace buffer to the end
vyzo Nov 18, 2019
7a5aaa8
don't blanket wait for 1s to accumulate a batch.
vyzo Nov 18, 2019
40e5a49
store the remote trace peer address in the peerstore
vyzo Nov 18, 2019
cd7f42e
make tracer.Close safer
vyzo Nov 18, 2019
7065297
nits and beauty
vyzo Nov 19, 2019
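
Taken together, these commits add an EventTracer abstraction (taking a *pb.TraceEvent, per commit 0aa629c) with JSON-file, protobuf-file, and remote backends. A sketch of how a consumer would plug one in; pubsub.WithEventTracer and pubsub.NewGossipSub are assumed from the library's option pattern rather than shown in this diff:

package main

import (
	"context"

	"github.com/libp2p/go-libp2p-core/host"
	pubsub "github.com/libp2p/go-libp2p-pubsub"
)

// setupTracing wires a JSONTracer (added in this PR) into a pubsub
// instance via the WithEventTracer option (assumed here).
func setupTracing(ctx context.Context, h host.Host) (*pubsub.PubSub, error) {
	tracer, err := pubsub.NewJSONTracer("/tmp/pubsub-trace.json")
	if err != nil {
		return nil, err
	}
	return pubsub.NewGossipSub(ctx, h, pubsub.WithEventTracer(tracer))
}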
81 changes: 38 additions & 43 deletions tracer.go
@@ -15,22 +15,30 @@ import (
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/protocol"

ggio "github.com/gogo/protobuf/io"
)

var TraceBufferSize = 1 << 16 // 64K ought to be enough for everyone; famous last words.
var MinTraceBatchSize = 16

type basicTracer struct {
- ch    chan struct{}
- mx    sync.Mutex
- buf   []*pb.TraceEvent
- lossy bool
+ ch     chan struct{}
+ mx     sync.Mutex
+ buf    []*pb.TraceEvent
+ lossy  bool
+ closed bool
}

func (t *basicTracer) Trace(evt *pb.TraceEvent) {
t.mx.Lock()
+ if t.closed {
+ t.mx.Unlock()
+ return
+ }
+
if t.lossy && len(t.buf) > TraceBufferSize {
log.Warningf("trace buffer overflow; dropping trace event")
} else {
@@ -45,7 +53,12 @@ func (t *basicTracer) Trace(evt *pb.TraceEvent) {
}

func (t *basicTracer) Close() {
- close(t.ch)
+ t.mx.Lock()
+ defer t.mx.Unlock()
+ if !t.closed {
+ t.closed = true
+ close(t.ch)
+ }
}
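
The guard matters because closing an already-closed Go channel panics; with the flag held under the mutex, Close becomes idempotent. A standalone sketch of the same pattern (names here are illustrative, not from the PR):

package main

import "sync"

// closer mirrors the guarded-close pattern above: close(ch) panics on a
// second call, so a flag under the mutex makes Close a no-op after the
// first invocation.
type closer struct {
	mx     sync.Mutex
	ch     chan struct{}
	closed bool
}

func (c *closer) Close() {
	c.mx.Lock()
	defer c.mx.Unlock()
	if !c.closed {
		c.closed = true
		close(c.ch)
	}
}

func main() {
	c := &closer{ch: make(chan struct{}, 1)}
	c.Close()
	c.Close() // second call is now a no-op instead of a panic
}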

// JSONTracer is a tracer that writes events to a file, encoded in ndjson.
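
ndjson means one JSON object per line, so a trace file can be read back with a plain line scanner. A sketch of a reader, assuming the pb.TraceEvent type from this PR; the file path is a placeholder:

package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"os"

	pb "github.com/libp2p/go-libp2p-pubsub/pb"
)

// Counts the events in a trace file written by JSONTracer: one
// JSON-encoded TraceEvent per line.
func main() {
	f, err := os.Open("/tmp/pubsub-trace.json")
	if err != nil {
		panic(err)
	}
	defer f.Close()

	var count int
	scanner := bufio.NewScanner(f)
	for scanner.Scan() {
		var evt pb.TraceEvent
		if err := json.Unmarshal(scanner.Bytes(), &evt); err != nil {
			panic(err)
		}
		count++
	}
	fmt.Println("trace events:", count)
}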
@@ -160,12 +173,15 @@ type RemoteTracer struct {
basicTracer
ctx context.Context
host host.Host
- pi   peer.AddrInfo
+ peer peer.ID
}

// NewRemoteTracer constructs a RemoteTracer, tracing to the peer identified by pi
func NewRemoteTracer(ctx context.Context, host host.Host, pi peer.AddrInfo) (*RemoteTracer, error) {
- tr := &RemoteTracer{ctx: ctx, host: host, pi: pi, basicTracer: basicTracer{ch: make(chan struct{}, 1), lossy: true}}
+ tr := &RemoteTracer{ctx: ctx, host: host, peer: pi.ID, basicTracer: basicTracer{ch: make(chan struct{}, 1), lossy: true}}
+ for _, addr := range pi.Addrs {
+ host.Peerstore().AddAddr(pi.ID, addr, peerstore.PermanentAddrTTL)
Member: host.Peerstore().AddAddrs(pi.ID, pi.Addrs, ...)?

Member: (we can do them all at once)

Collaborator (Author): ah, good point; will do.

+ }
go tr.doWrite()
return tr, nil
}
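
For reference, the batched form suggested in the thread above; AddAddrs with a TTL is part of the go-libp2p-core peerstore interface:

// Single call instead of a loop; same permanent TTL as above.
host.Peerstore().AddAddrs(pi.ID, pi.Addrs, peerstore.PermanentAddrTTL)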
@@ -187,15 +203,17 @@ func (t *RemoteTracer) doWrite() {
for {
_, ok := <-t.ch

- // nil out the buffer to gc events when swapping buffers
- for i := range buf {
- buf[i] = nil
- }
-
- // wait a bit to accumulate a batch
- time.Sleep(time.Second)
+ // deadline for batch accumulation
+ deadline := time.Now().Add(time.Second)
+
+ again:
t.mx.Lock()
+ if len(t.buf) < MinTraceBatchSize && time.Now().Before(deadline) {
+ t.mx.Unlock()
+ time.Sleep(100 * time.Millisecond)
+ goto again
+ }
Member: for loop?

t.mx.Lock()
for len(t.buf) < MinTraceBatchSize && time.Now().Before(deadline) {
  t.mx.Unlock()
  time.Sleep(100 * time.Millisecond)
  t.mx.Lock()
}

Collaborator (Author): the allergy to gotos strikes again! OK, I'll write it as a for loop.


tmp := t.buf
t.buf = buf[:0]
buf = tmp
@@ -220,6 +238,11 @@
}

end:
+ // nil out the buffer to gc consumed events
+ for i := range buf {
+ buf[i] = nil
+ }

if !ok {
if err != nil {
s.Reset()
@@ -243,38 +266,10 @@
}
}
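
Per the commits above ("compress entire stream in remote tracer", "remove CompressedTraceEventBatch from trace protobuf"), the wire format is a single gzip stream carrying length-delimited protobuf batches. A sketch of what a collector on the other end of the RemoteTracerProtoID stream would presumably do to decode it; pb.TraceEventBatch, its Batch field, and the 1 MiB read limit are assumptions, not taken from this diff:

package tracecollector

import (
	"compress/gzip"
	"io"

	ggio "github.com/gogo/protobuf/io"
	pb "github.com/libp2p/go-libp2p-pubsub/pb"
)

// readTraceStream decodes a remote trace stream: gunzip the whole
// stream, then read length-delimited TraceEventBatch messages until EOF.
func readTraceStream(s io.Reader, handle func(*pb.TraceEvent)) error {
	gzr, err := gzip.NewReader(s)
	if err != nil {
		return err
	}
	defer gzr.Close()

	r := ggio.NewDelimitedReader(gzr, 1<<20)
	for {
		var batch pb.TraceEventBatch
		if err := r.ReadMsg(&batch); err != nil {
			if err == io.EOF {
				return nil
			}
			return err
		}
		for _, evt := range batch.GetBatch() {
			handle(evt)
		}
	}
}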

- func (t *RemoteTracer) connect() error {
- for {
- ctx, cancel := context.WithTimeout(t.ctx, time.Minute)
- err := t.host.Connect(ctx, t.pi)
- cancel()
- if err != nil {
- if t.ctx.Err() != nil {
- return err
- }
-
- // wait a minute and try again, to account for transient server downtime
- select {
- case <-time.After(time.Minute):
- continue
- case <-t.ctx.Done():
- return t.ctx.Err()
- }
- }
-
- return nil
- }
- }

func (t *RemoteTracer) openStream() (network.Stream, error) {
for {
- err := t.connect()
- if err != nil {
- return nil, err
- }

ctx, cancel := context.WithTimeout(t.ctx, time.Minute)
- s, err := t.host.NewStream(ctx, t.pi.ID, RemoteTracerProtoID)
+ s, err := t.host.NewStream(ctx, t.peer, RemoteTracerProtoID)
cancel()
if err != nil {
if t.ctx.Err() != nil {