Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reuse streams for dht messaging #2817

Merged
merged 3 commits into from
Jun 8, 2016
Merged

Conversation

whyrusleeping
Copy link
Member

DHT messages are pretty small, and the overhead of creating a new stream and sending multistream headers over it is fairly significant. If we can do it, reusing streams for multiple messages is a great way to save bandwidth.

License: MIT
Signed-off-by: Jeromy <why@ipfs.io>
License: MIT
Signed-off-by: Jeromy <why@ipfs.io>
@Kubuxu
Copy link
Member

Kubuxu commented Jun 7, 2016

Isn't it a breaking change?

@whyrusleeping
Copy link
Member Author

@Kubuxu nope :) It tries to reuse streams, if it can, it does, if sending a message fails, it opens a new stream and tries to send the message again. If that retry succeeds, then we mark that and if it happens a certain number of times, we stop trying to reuse streams with that peer

@Kubuxu
Copy link
Member

Kubuxu commented Jun 7, 2016

Oh, didn't notice that right away (skimmed the change and asked the question).

Let me build it and check that it works just to be sure.

@Kubuxu
Copy link
Member

Kubuxu commented Jun 7, 2016

OK, LGTM

License: MIT
Signed-off-by: Jeromy <why@ipfs.io>
@whyrusleeping
Copy link
Member Author

@Kubuxu could you test out this branch for a bit? (just use this as your local ipfs node) I'm seeing a couple weird issues, but i'm not convinced they arent in master or caused by other bad nodes on the network

@Kubuxu
Copy link
Member

Kubuxu commented Jun 7, 2016

I've run it for a moment and it seemed OK but I will replace my system node with it just to check.

@whyrusleeping
Copy link
Member Author

Two day integration test has been great for me and @Kubuxu, lets ship it!

@whyrusleeping whyrusleeping merged commit 0e81124 into master Jun 8, 2016
@whyrusleeping whyrusleeping deleted the feat/dht-reuse-stream branch June 8, 2016 17:08

if err := ms.w.WriteMsg(pmes); err != nil {
return err
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this area is a bit error prone for those who may come in here to edit. AFAICT, the call graph is:

ms.SendMessage AND ms.SendRequest
  -> ms.prep
  -> ms.writeMessage
    -> ms.w.WriteMsg
    -> ms.prep
    -> ms.w.WriteMsg

With stream being set to nil in write message. i would imagine this being a bit safer to edits:

ms.SendMessage AND ms.SendRequest
  -> ms.writeMessage
    -> for (tries = 2)
      -> ms.prep
      -> ms.w.WriteMsg

Basically, push the prep call down into writeMessage and reframe it a bit.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

something like this could work. warning: have not tested.

// make sure it's always cleared exactly the same way
func (ms *messageSender) clearStream() {
  ms.s.Close()
  ms.s = nil
}

func (ms *messageSender) SendMessage(ctx context.Context, pmes *pb.Message) error {
  ms.lk.Lock()
  defer ms.lk.Unlock()

  if err := ms.writeMessage(pmes); err != nil {
    return err
  }

  if ms.singleMes > streamReuseTries {
    ms.clearStream()
  }

  return nil
}

func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb.Message, error) {
  ms.lk.Lock()
  defer ms.lk.Unlock()

  if err := ms.writeMessage(pmes); err != nil {
    return nil, err
  }

  log.Event(ctx, "dhtSentMessage", ms.dht.self, ms.p, pmes)

  mes := new(pb.Message)
  if err := ms.r.ReadMsg(mes); err != nil {
    ms.clearStream()
    return nil, err
  }

  if ms.singleMes > streamReuseTries {
    ms.clearStream()
  }

  return mes, nil
}

func (ms *messageSender) writeMessage(pmes *pb.Message) error {

  var didRetrySending = false

  // alternatively can use a for (i := 0; i < 2; i++) { ... }. but this seemed cleaner
trySending:

  if err := ms.prep(); err != nil {
    return err
  }

  if err := ms.w.WriteMsg(pmes); err != nil {

    // If the other side isnt expecting us to be reusing streams, we're gonna
    // end up erroring here. To make sure things work seamlessly, lets retry once
    // before continuing
    if !didRetrySending {
      log.Infof("error writing message: ", err)
      ms.clearStream()
      didRetrySending = true
      goto trySending
    }

    return err
  }

  if didRetrySending { // and succeeded the second time.
    // keep track of this happening. If it happens a few times, its
    // likely we can assume the otherside will never support stream reuse
    ms.singleMes++
  }

  return nil
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could maybe push this block into the beginning of writeMessage too:

if ms.singleMes > streamReuseTries {
    ms.clearStream()
}

But i'm not sure if leaving the stream hanging for a while like that is bad. probably, so i left it as is.

ariescodescream pushed a commit to ariescodescream/go-ipfs that referenced this pull request Oct 23, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants