DHT Request pipelining #92

Closed · wants to merge 39 commits

Commits (39)
33d9ca1  dht_net: pipelined requests with a pump goroutine for reading (vyzo, Sep 17, 2017)
db09dc6  dht_net: implement messageReceiver, address XXXs (vyzo, Sep 17, 2017)
cb20a6f  dht_net: limit the scope of the lock in fallback (vyzo, Sep 17, 2017)
9f72b80  dht_net: unlock in context completions when the pipeline stalls (vyzo, Sep 17, 2017)
0a07fc9  dht_net: more consistent handling of retry resets; check for streamRe… (vyzo, Sep 17, 2017)
b686575  dht_net: increase requestResultBuffer to 64 (vyzo, Sep 17, 2017)
d367bf9  dht_net: more consistent event logging for dhtSendMessage; measure la… (vyzo, Sep 17, 2017)
9b38b31  dht_net: resetHard on invalidate (vyzo, Sep 20, 2017)
b2c23ed  dht_net: use context with timeout instead of timer for reads (vyzo, Sep 21, 2017)
0490df7  dht_net: reset stream on write errors in single message per stream fa… (vyzo, Sep 21, 2017)
e19692d  dht_net: denser code for messageReceiver (vyzo, Sep 27, 2017)
298641a  dht_test: adds tests for concurrent requests (vyzo, Dec 27, 2017)
995da89  dht_net: eliminate rcount contraption for stream resetting (vyzo, Jan 8, 2018)
67c0356  dht_net: move reset definition closer to prep for better readability (vyzo, Jan 8, 2018)
d41c165  dht_net: address TODO about the lock in SendMessage (vyzo, Jan 8, 2018)
bc50216  dht_net: clean up the log event names; dht prefix not needed. (vyzo, Jan 8, 2018)
bab37ad  dht_net: kill off the stream in case of read timeouts (vyzo, Jan 8, 2018)
a562c8f  dht_net: messageReceiver should poll for reset before unqueuing the n… (vyzo, Jan 8, 2018)
a5c987e  dht_net: fix reset race (vyzo, Feb 9, 2018)
e553dac  dht_net: reset stream on single request context completion (vyzo, Feb 9, 2018)
a6343b6  use delayed datastore to test pipelined requests (laser, May 17, 2018)
8d99629  add comments to copypasted delayed package and fix imports (laser, May 17, 2018)
b126c52  datastore.DiskUsage is not defined in go-datastore 1.4.1 (laser, May 17, 2018)
dbc46ae  add timings (laser, May 17, 2018)
ef3f144  add flag for opting in to slow tests (laser, May 17, 2018)
fa1eee2  go fmt (laser, May 17, 2018)
9061800  symbol rename to convey intent (laser, May 17, 2018)
169b260  remove flag (laser, May 17, 2018)
93fe9f7  update autobatch to 0.2.9 (laser, May 30, 2018)
5d7748f  upgrade go-datastore to 2.2.0 (laser, May 30, 2018)
5df8091  add delay (laser, May 31, 2018)
c40fd0e  use autobatch from go-datastore (laser, May 31, 2018)
79de04c  increase timeout threshhold to accommodate new PutValue perf (laser, May 31, 2018)
dda877c  initialize stream with DHT protocols as per new API post-rebase (laser, Jun 4, 2018)
cb5284f  replace delayed datastore with mocknet + latency (laser, Jun 5, 2018)
314c5db  remove unused gx deps (Stebalien, Jun 20, 2018)
7578cd8  Briefly comment singleMes (bigs, Aug 21, 2018)
fc3cb0f  fix broken package.json (vyzo, Sep 6, 2018)
4737c8a  fix broken test (vyzo, Sep 6, 2018)
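
For orientation before the diff: the change moves reads off the request path into a pump goroutine that answers queued callers in FIFO order, so several requests can be in flight on one stream. A minimal, runnable sketch of that channel-of-channels pattern (all names hypothetical, not code from this PR):

package main

import "fmt"

type result struct {
	mes string // stands in for *pb.Message
	err error
}

// pump plays the role of messageReceiver: it answers queued callers in
// FIFO order with whatever arrives on the wire.
func pump(queue <-chan chan result, wire <-chan string) {
	for next := range queue {
		next <- result{mes: <-wire}
	}
}

func main() {
	queue := make(chan chan result, 64) // cf. requestResultBuffer
	wire := make(chan string, 2)
	go pump(queue, wire)

	// Two pipelined requests: both are enqueued before either reply is
	// consumed; replies are matched to callers by queue order.
	a, b := make(chan result, 1), make(chan result, 1)
	queue <- a
	queue <- b
	wire <- "reply-1"
	wire <- "reply-2"

	fmt.Println((<-a).mes, (<-b).mes) // prints: reply-1 reply-2
}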
289 changes: 233 additions & 56 deletions dht_net.go
@@ -3,6 +3,7 @@ package dht
import (
"bufio"
"context"
"errors"
"fmt"
"io"
"sync"
@@ -188,26 +189,34 @@ func (dht *IpfsDHT) messageSenderForPeer(p peer.ID) (*messageSender, error) {
}

type messageSender struct {
s inet.Stream
r ggio.ReadCloser
w bufferedWriteCloser
lk sync.Mutex
p peer.ID
dht *IpfsDHT

invalid bool
s inet.Stream
w ggio.WriteCloser
rch chan chan requestResult
rctl chan struct{}
lk sync.Mutex
p peer.ID
dht *IpfsDHT

invalid bool
// singleMes tracks the number of times a message or request has failed to
// send via this messageSender, triggering a stream reset if its limit is
// reached.
singleMes int
}

type requestResult struct {
mes *pb.Message
err error
}

const requestResultBuffer = 64

// invalidate is called before this messageSender is removed from the strmap.
// It prevents the messageSender from being reused/reinitialized and then
// forgotten (leaving the stream open).
func (ms *messageSender) invalidate() {
ms.invalid = true
if ms.s != nil {
ms.s.Reset()
ms.s = nil
}
ms.reset()
}

func (ms *messageSender) prepOrInvalidate() error {
@@ -224,6 +233,7 @@ func (ms *messageSender) prep() error {
if ms.invalid {
return fmt.Errorf("message sender has been invalidated")
}

if ms.s != nil {
return nil
}
@@ -233,33 +243,61 @@
return err
}

ms.r = ggio.NewDelimitedReader(nstr, inet.MessageSizeMax)
ms.w = newBufferedDelimitedWriter(nstr)
r := ggio.NewDelimitedReader(nstr, inet.MessageSizeMax)
rch := make(chan chan requestResult, requestResultBuffer)
rctl := make(chan struct{}, 1)
go ms.messageReceiver(rch, rctl, r)

ms.rch = rch
ms.rctl = rctl
ms.w = ggio.NewDelimitedWriter(nstr)
ms.s = nstr

return nil
}

// Resets the stream and shuts down the goroutine pump
// Mutex must be locked.
func (ms *messageSender) reset() {
if ms.s != nil {
close(ms.rch)
ms.s.Reset()
ms.s = nil
}
}

func (ms *messageSender) resetStream(s inet.Stream) {
if ms.s == s {
ms.reset()
}
}

// streamReuseTries is the number of times we will try to reuse a stream to a
// given peer before giving up and reverting to the old one-message-per-stream
// behaviour.
const streamReuseTries = 3

func (ms *messageSender) SendMessage(ctx context.Context, pmes *pb.Message) error {
defer log.EventBegin(ctx, "SendMessage", ms.dht.self, ms.p, pmes).Done()
ms.lk.Lock()
defer ms.lk.Unlock()
retry := false
[review comment · Contributor] i think this could be renamed to retried

for {
if ms.singleMes > streamReuseTries {
ms.lk.Unlock()
return ms.sendMessageSingle(ctx, pmes)
}

if err := ms.prep(); err != nil {
ms.lk.Unlock()
return err
}

if err := ms.writeMsg(pmes); err != nil {
ms.s.Reset()
ms.s = nil
if err := ms.w.WriteMsg(pmes); err != nil {
ms.reset()

if retry {
log.Info("error writing message, bailing: ", err)
ms.lk.Unlock()
return err
} else {
log.Info("error writing message, trying again: ", err)
@@ -268,31 +306,37 @@ func (ms *messageSender) SendMessage(ctx context.Context, pmes *pb.Message) erro
}
}

log.Event(ctx, "dhtSentMessage", ms.dht.self, ms.p, pmes)

if ms.singleMes > streamReuseTries {
go inet.FullClose(ms.s)
ms.s = nil
} else if retry {
if retry {
ms.singleMes++
if ms.singleMes > streamReuseTries {
ms.reset()
}
}

ms.lk.Unlock()
return nil
}
}

func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb.Message, error) {
ms.lk.Lock()
defer ms.lk.Unlock()
defer log.EventBegin(ctx, "SendRequest", ms.dht.self, ms.p, pmes).Done()
retry := false
for {
ms.lk.Lock()

if ms.singleMes > streamReuseTries {
ms.lk.Unlock()
return ms.sendRequestSingle(ctx, pmes)
}

if err := ms.prep(); err != nil {
ms.lk.Unlock()
return nil, err
}

if err := ms.writeMsg(pmes); err != nil {
ms.s.Reset()
ms.s = nil
if err := ms.w.WriteMsg(pmes); err != nil {
ms.reset()
ms.lk.Unlock()

if retry {
log.Info("error writing message, bailing: ", err)
@@ -304,56 +348,189 @@ func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb
}
}

mes := new(pb.Message)
if err := ms.ctxReadMsg(ctx, mes); err != nil {
ms.s.Reset()
ms.s = nil
resch := make(chan requestResult, 1)
select {
case ms.rch <- resch:
default:
// pipeline stall, log it and time it
evt := log.EventBegin(ctx, "SendRequestStall", ms.dht.self, ms.p, pmes)
[review comment · Contributor] this could use the updated go-log api and furthermore the cancellations (caught via the two ctx.Done clauses) could either FinishWithError or LogKV relevant info (e.g. "timed out" or "request cancelled")
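
A hedged sketch of that suggestion, assuming go-log's tracing-style helpers (Start, LogKV, Finish, FinishWithErr); illustrative only, not code from this PR:

// Illustrative rewrite of the stall event using the assumed tracing API.
sctx := log.Start(ctx, "SendRequestStall")
select {
case ms.rch <- resch:
	log.Finish(sctx)
case <-ctx.Done():
	log.LogKV(sctx, "outcome", "request cancelled")
	log.FinishWithErr(sctx, ctx.Err())
	ms.lk.Unlock()
	return nil, ctx.Err()
case <-ms.dht.ctx.Done():
	log.LogKV(sctx, "outcome", "dht shutting down")
	log.FinishWithErr(sctx, ms.dht.ctx.Err())
	ms.lk.Unlock()
	return nil, ms.dht.ctx.Err()
}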

select {
case ms.rch <- resch:
evt.Done()
case <-ctx.Done():
evt.Done()
ms.lk.Unlock()
return nil, ctx.Err()
case <-ms.dht.ctx.Done():
evt.Done()
ms.lk.Unlock()
return nil, ms.dht.ctx.Err()
}
}

rctl := ms.rctl
s := ms.s
[review comment · Contributor] not necessary
[follow-up] wasn't considering the threaded context

ms.lk.Unlock()

rctx, cancel := context.WithTimeout(ctx, dhtReadMessageTimeout)
defer cancel()

var res requestResult
select {
case res = <-resch:

case <-rctx.Done():
// A read timeout will cause the entire pipeline to time out.
// So signal for a stream reset to avoid clogging subsequent requests.
[review comment · Member] Any reason to not just throw away the response in this case? Throwing everything away seems like bad manners.
[reply · vyzo (author)] the rationale is that if the first request has timed out, then the subsequent requests will likely also time out.
[reply · vyzo (author)] also, more requests might be added in the pipeline in the meantime (while the first one has timed out), potentially leading to an avalanche of timeouts; i think it's better to just flush the pipeline and start over.

select {
case <-ctx.Done():
// not a read timeout
default:
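// non-blocking send: if a reset signal is already pending, don't queue another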
select {
case rctl <- struct{}{}:
default:
}
}

return nil, rctx.Err()

case <-ms.dht.ctx.Done():
return nil, ms.dht.ctx.Err()
}

if res.err != nil {
if retry {
log.Info("error reading message, bailing: ", err)
return nil, err
log.Info("error reading message, bailing: ", res.err)
return nil, res.err
} else {
log.Info("error reading message, trying again: ", err)
log.Info("error reading message, trying again: ", res.err)
retry = true
continue
}
}

log.Event(ctx, "dhtSentMessage", ms.dht.self, ms.p, pmes)

if ms.singleMes > streamReuseTries {
go inet.FullClose(ms.s)
ms.s = nil
} else if retry {
if retry {
ms.lk.Lock()
ms.singleMes++
if ms.singleMes > streamReuseTries {
ms.resetStream(s)
[review comment · Contributor] this can just be a ms.reset()
[follow-up] wasn't considering the threaded context

}
ms.lk.Unlock()
}

return mes, nil
return res.mes, nil
}
}

func (ms *messageSender) writeMsg(pmes *pb.Message) error {
if err := ms.w.WriteMsg(pmes); err != nil {
func (ms *messageSender) sendMessageSingle(ctx context.Context, pmes *pb.Message) error {
s, err := ms.dht.host.NewStream(ctx, ms.p, ms.dht.protocols...)
if err != nil {
return err
}
return ms.w.Flush()
defer s.Close()

w := ggio.NewDelimitedWriter(s)

err = w.WriteMsg(pmes)
if err != nil {
s.Reset()
}

return err
}

func (ms *messageSender) ctxReadMsg(ctx context.Context, mes *pb.Message) error {
func (ms *messageSender) sendRequestSingle(ctx context.Context, pmes *pb.Message) (*pb.Message, error) {
s, err := ms.dht.host.NewStream(ctx, ms.p, ms.dht.protocols...)
if err != nil {
return nil, err
}
defer s.Close()

r := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
w := ggio.NewDelimitedWriter(s)

if err := w.WriteMsg(pmes); err != nil {
s.Reset()
return nil, err
}

mes := new(pb.Message)

errc := make(chan error, 1)
go func(r ggio.ReadCloser) {
go func() {
errc <- r.ReadMsg(mes)
}(ms.r)
}()

t := time.NewTimer(dhtReadMessageTimeout)
defer t.Stop()
rctx, cancel := context.WithTimeout(ctx, dhtReadMessageTimeout)
defer cancel()

select {
case err := <-errc:
return err
case <-ctx.Done():
return ctx.Err()
case <-t.C:
return ErrReadTimeout
if err != nil {
return nil, err
}
case <-rctx.Done():
[review comment · Member] Should reset the stream here so as not to send an EOF (which indicates success).
[reply · vyzo (author), Feb 9, 2018] probably, although it should happen lazily.
[reply · vyzo (author)] Added a reset.

s.Reset()
return nil, rctx.Err()
}

return mes, nil
}
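
On the Close-versus-Reset point above, a hedged sketch of the distinction (assumed semantics of the gx-era go-libp2p-net streams used here): Close delivers a clean EOF that the remote side reads as success, while Reset tears the stream down and signals failure.

package dhtsketch

import (
	inet "github.com/libp2p/go-libp2p-net"
)

// finish closes the stream cleanly on success and resets it on failure,
// so the peer never sees a clean EOF for a request that actually failed.
func finish(s inet.Stream, err error) {
	if err != nil {
		s.Reset() // abnormal termination; remote observes an error
		return
	}
	inet.FullClose(s) // graceful close; remote reads EOF, i.e. success
}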

func (ms *messageSender) messageReceiver(rch chan chan requestResult, rctl chan struct{}, r ggio.ReadCloser) {
loop:
for {
select {
case <-rctl:
// poll for reset due to timeouts first, there might be requests queued
break loop

default:
select {
case next, ok := <-rch:
if !ok {
return
}

mes := new(pb.Message)
err := r.ReadMsg(mes)
if err != nil {
next <- requestResult{err: err}
break loop
} else {
next <- requestResult{mes: mes}
}

case <-rctl:
break loop

case <-ms.dht.ctx.Done():
return
}
}
}

// reset once; needs to happen in a goroutine to avoid deadlock
// in case of pipeline stalls
[review comment · Member] I think there's a race condition here. What happens if another thread calls reset() then prep()? As far as I can tell, we'll end up resetting their stream. Worse, this issue could repeat itself forever because the new messageReceiver, spawned by that prep() call, will fail and call reset() itself (which again, may not be the same stream).
[reply · vyzo (author)] i think you are right, we should track which stream we are resetting -- previously, this was protected by the rcount contraption.
[reply · vyzo (author), Feb 9, 2018] I added a guarded reset version, resetStream which will only reset if it's still the same stream.
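
To make the race concrete, an illustrative interleaving (not code from the PR) and the guard that closes it:

// Illustrative interleaving of the race described above:
//
//   goroutine A: reset()  -> resets stream S1, sets ms.s = nil
//   goroutine B: prep()   -> opens fresh stream S2, sets ms.s = S2
//   goroutine A: a stale reset fires again -> would kill B's fresh S2
//
// The guarded variant below only resets if the captured stream is still
// the current one (this is the resetStream added earlier in the diff):
//
//   func (ms *messageSender) resetStream(s inet.Stream) {
//       if ms.s == s {
//           ms.reset()
//       }
//   }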

go func(s inet.Stream) {
ms.lk.Lock()
ms.resetStream(s)
ms.lk.Unlock()
}(ms.s)

// drain the pipeline
err := errors.New("Stream has been abandoned due to earlier errors")
for {
select {
case next, ok := <-rch:
if !ok {
return
}
next <- requestResult{err: err}

case <-ms.dht.ctx.Done():
return
}
}
}
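
A closing aside: the nested select in messageReceiver is a priority poll, so a pending reset wins over queued work. A minimal runnable illustration of the idiom (hypothetical names, not code from this PR):

package main

import "fmt"

// drain processes work but checks quit first on every iteration, so a
// pending quit is observed even when work is also ready.
func drain(work <-chan int, quit <-chan struct{}) {
	for {
		select {
		case <-quit: // polled first: a pending quit takes priority
			fmt.Println("quit")
			return
		default:
			select {
			case w := <-work:
				fmt.Println("work", w)
			case <-quit:
				fmt.Println("quit")
				return
			}
		}
	}
}

func main() {
	work := make(chan int, 2)
	quit := make(chan struct{}, 1)
	work <- 1
	quit <- struct{}{} // both ready, but quit is seen first
	drain(work, quit)
}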