Skip to content

Commit

Permalink
Merge pull request #1739 from yichengq/230
Browse files Browse the repository at this point in the history
rafthttp: send takes raft message instead of bytes
  • Loading branch information
yichengq committed Nov 18, 2014
2 parents 6cac631 + 1a72143 commit f94ff96
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 17 deletions.
11 changes: 2 additions & 9 deletions etcdserver/sendhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,18 +70,11 @@ func (h *sendHub) Send(msgs []raftpb.Message) {
continue
}

// TODO: don't block. we should be able to have 1000s
// of messages out at a time.
data, err := m.Marshal()
if err != nil {
log.Println("sender: dropping message:", err)
return // drop bad message
}
if m.Type == raftpb.MsgApp {
h.ss.SendAppendReq(len(data))
h.ss.SendAppendReq(m.Size())
}

s.Send(data)
s.Send(m)
}
}

Expand Down
3 changes: 2 additions & 1 deletion etcdserver/sendhub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/coreos/etcd/etcdserver/stats"
"github.com/coreos/etcd/pkg/testutil"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft/raftpb"
)

func TestSendHubInitSenders(t *testing.T) {
Expand Down Expand Up @@ -98,7 +99,7 @@ func TestSendHubShouldStop(t *testing.T) {
t.Fatalf("received unexpected shouldstop notification")
case <-time.After(10 * time.Millisecond):
}
h.senders[1].Send([]byte("somedata"))
h.senders[1].Send(raftpb.Message{})

testutil.ForceGosched()
select {
Expand Down
9 changes: 7 additions & 2 deletions rafthttp/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ import (
"time"

"github.com/coreos/etcd/etcdserver/stats"
"github.com/coreos/etcd/pkg/pbutil"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft/raftpb"
)

const (
Expand All @@ -37,7 +39,7 @@ type Sender interface {
Update(u string)
// Send sends the data to the remote node. It is always non-blocking.
// It may be fail to send data if it returns nil error.
Send(data []byte) error
Send(m raftpb.Message) error
// Stop performs any necessary finalization and terminates the Sender
// elegantly.
Stop()
Expand Down Expand Up @@ -77,7 +79,10 @@ func (s *sender) Update(u string) {
}

// TODO (xiangli): reasonable retry logic
func (s *sender) Send(data []byte) error {
func (s *sender) Send(m raftpb.Message) error {
// TODO: don't block. we should be able to have 1000s
// of messages out at a time.
data := pbutil.MustMarshal(&m)
select {
case s.q <- data:
return nil
Expand Down
11 changes: 6 additions & 5 deletions rafthttp/sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/coreos/etcd/etcdserver/stats"
"github.com/coreos/etcd/pkg/testutil"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft/raftpb"
)

// TestSenderSend tests that send func could post data using roundtripper
Expand All @@ -35,7 +36,7 @@ func TestSenderSend(t *testing.T) {
fs := &stats.FollowerStats{}
s := NewSender(tr, "http://10.0.0.1", types.ID(1), fs, nil)

if err := s.Send([]byte("some data")); err != nil {
if err := s.Send(raftpb.Message{}); err != nil {
t.Fatalf("unexpect send error: %v", err)
}
s.Stop()
Expand All @@ -58,15 +59,15 @@ func TestSenderExceedMaximalServing(t *testing.T) {
// keep the sender busy and make the buffer full
// nothing can go out as we block the sender
for i := 0; i < connPerSender+senderBufSize; i++ {
if err := s.Send([]byte("some data")); err != nil {
if err := s.Send(raftpb.Message{}); err != nil {
t.Errorf("send err = %v, want nil", err)
}
// force the sender to grab data
testutil.ForceGosched()
}

// try to send a data when we are sure the buffer is full
if err := s.Send([]byte("some data")); err == nil {
if err := s.Send(raftpb.Message{}); err == nil {
t.Errorf("unexpect send success")
}

Expand All @@ -75,7 +76,7 @@ func TestSenderExceedMaximalServing(t *testing.T) {
testutil.ForceGosched()

// It could send new data after previous ones succeed
if err := s.Send([]byte("some data")); err != nil {
if err := s.Send(raftpb.Message{}); err != nil {
t.Errorf("send err = %v, want nil", err)
}
s.Stop()
Expand All @@ -87,7 +88,7 @@ func TestSenderSendFailed(t *testing.T) {
fs := &stats.FollowerStats{}
s := NewSender(newRespRoundTripper(0, errors.New("blah")), "http://10.0.0.1", types.ID(1), fs, nil)

if err := s.Send([]byte("some data")); err != nil {
if err := s.Send(raftpb.Message{}); err != nil {
t.Fatalf("unexpect Send error: %v", err)
}
s.Stop()
Expand Down

0 comments on commit f94ff96

Please sign in to comment.