Fragment Watch Response Messages #8371

Closed
137 changes: 137 additions & 0 deletions clientv3/integration/fragment_test.go
@@ -0,0 +1,137 @@
// Copyright 2017 The etcd Authors
Contributor

this should be part of clientv3/integration/watch_test.go

//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package integration

import (
"context"
"fmt"
"log"
"testing"
"time"

"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/integration"
"github.com/coreos/etcd/pkg/testutil"
"github.com/etcd/proxy/grpcproxy"
"github.com/etcd/proxy/grpcproxy/adapter"
)

// TestFragmentationStopsAfterServerFailure tests the edge case where either
// the server or the watch proxy fails to send a message due to an error that is
// unrelated to the fragment size, such as a member failure. In that case,
// the server or watch proxy should keep reducing the message size (the default
// action, chosen because there is no way of telling whether the error returned
// by the send operation is message-size related or caused by some other issue)
// until the message size reaches zero, at which point we are certain that the
// message size is not what is causing the send operation to fail.
func TestFragmentationStopsAfterServerFailure(t *testing.T) {
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t)

cfg := clientv3.Config{
Endpoints: []string{
clus.Members[0].GRPCAddr(),
clus.Members[1].GRPCAddr(),
clus.Members[2].GRPCAddr(),
},
}
cli, err := clientv3.New(cfg)
Contributor

don't make a special client if it's not needed; just use clus.Client(0)

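A minimal sketch of the simplification suggested above (hypothetical, not part of this diff): reuse the cluster's pre-built client instead of constructing a separate clientv3.Config and calling clientv3.New.

```go
// Hypothetical rewrite per the review comment: use the cluster's client
// directly instead of building a new one with clientv3.New.
cli := clus.Client(0)
firstLease, err := cli.Grant(context.Background(), 10000)
if err != nil {
	t.Fatal(err)
}
```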

cli.SetEndpoints(clus.Members[0].GRPCAddr())
firstLease, err := clus.Client(0).Grant(context.Background(), 10000)
if err != nil {
t.Error(err)
}

kv := clus.Client(0)
for i := 0; i < 25; i++ {
_, err = kv.Put(context.TODO(), fmt.Sprintf("foo%d", i), "bar", clientv3.WithLease(firstLease.ID))
if err != nil {
t.Error(err)
}
}

kv.Watch(context.TODO(), "foo", clientv3.WithRange("z"))
_, err = clus.Client(0).Revoke(context.Background(), firstLease.ID)
Contributor

instead of using a lease to bulk delete, `Delete(context.TODO(), "foo", clientv3.WithPrefix())` would be clearer / less code

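A hedged sketch of the reviewer's alternative: delete the keys in bulk by prefix instead of attaching them to a lease and revoking it.

```go
// Hypothetical replacement for the Grant/Revoke pattern in this test:
// bulk-delete all keys that start with "foo".
if _, err := kv.Delete(context.TODO(), "foo", clientv3.WithPrefix()); err != nil {
	t.Fatal(err)
}
```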
if err != nil {
t.Error(err)
}
clus.Members[0].Stop(t)
time.Sleep(10 * time.Second)
log.Fatal("Printed the log")
}

// TestFragmentingWithOverlappingWatchers tests that events are fragmented
// on the server and watch proxy and pieced back together properly on the
// client side.
func TestFragmentingWithOverlappingWatchers(t *testing.T) {
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t)

cfg := clientv3.Config{
Endpoints: []string{
clus.Members[0].GRPCAddr(),
clus.Members[1].GRPCAddr(),
clus.Members[2].GRPCAddr(),
},
}
cli, err := clientv3.New(cfg)
Contributor

just use clus.Client(0)?


cli.SetEndpoints(clus.Members[0].GRPCAddr())
firstLease, err := clus.Client(0).Grant(context.Background(), 10000)
if err != nil {
t.Error(err)
}
secondLease, err := clus.Client(0).Grant(context.Background(), 10000)
if err != nil {
t.Error(err)
}

// Create and register watch proxy
wp, _ := grpcproxy.NewWatchProxy(clus.Client(0))
wc := adapter.WatchServerToWatchClient(wp)
w := clientv3.NewWatchFromWatchClient(wc)
Contributor

can remove proxy stuff since picked up by cluster_proxy

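Assuming the cluster_proxy build tag already routes the client through the watch proxy for this suite, a sketch of the simplification: drop the explicit grpcproxy/adapter wiring and watch through the ordinary client, which embeds clientv3.Watcher.

```go
// Hypothetical simplification: clus.Client(0) already satisfies the
// clientv3.Watcher interface, so it can be used as `w` directly.
w := clus.Client(0)
```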

kv := clus.Client(0)
for i := 0; i < 25; i++ {
_, err = kv.Put(context.TODO(), fmt.Sprintf("foo%d", i), "bar", clientv3.WithLease(firstLease.ID))
if err != nil {
t.Error(err)
}
_, err = kv.Put(context.TODO(), fmt.Sprintf("buzz%d", i), "fizz", clientv3.WithLease(secondLease.ID))
if err != nil {
t.Error(err)
}
}

w.Watch(context.TODO(), "foo", clientv3.WithRange("z"))
w.Watch(context.TODO(), "buzz", clientv3.WithRange("z "))

_, err = clus.Client(0).Revoke(context.Background(), firstLease.ID)
Contributor

`Delete(context.TODO(), "foo", clientv3.WithPrefix())`

if err != nil {
t.Error(err)
}
_, err = clus.Client(0).Revoke(context.Background(), secondLease.ID)
if err != nil {
t.Error(err)
}

// Wait for the revocation process to finish
time.Sleep(10 * time.Second)
log.Fatal("Printed the log")

}
20 changes: 17 additions & 3 deletions clientv3/watch.go
@@ -424,7 +424,7 @@ func (w *watchGrpcStream) run() {
}

cancelSet := make(map[int64]struct{})

var combinedFragments *pb.WatchResponse
for {
select {
// Watch() requested
@@ -450,11 +450,24 @@ func (w *watchGrpcStream) run() {
}
// New events from the watch client
case pbresp := <-w.respc:
fmt.Printf("Clientv3/watcher - watchid: %d, count: %d, frag: %d, events: %v\n", pbresp.WatchId, pbresp.FragmentCount, pbresp.CurrFragment, pbresp.Events)
if combinedFragments == nil || combinedFragments.WatchId != pbresp.WatchId || combinedFragments.CurrFragment+1 != pbresp.CurrFragment {
combinedFragments = pbresp
} else {
combinedFragments.Events = append(combinedFragments.Events, pbresp.Events...)
combinedFragments.CurrFragment = pbresp.CurrFragment
}
if combinedFragments.FragmentCount != combinedFragments.CurrFragment {
fmt.Printf("Waiting for more fragments ...\n")
break
}
fmt.Println("All fragments receieved\n")
switch {
case pbresp.Created:
// response to head of queue creation
if ws := w.resuming[0]; ws != nil {
w.addSubstream(pbresp, ws)
w.addSubstream(combinedFragments, ws)
combinedFragments = nil
w.dispatchEvent(pbresp)
w.resuming[0] = nil
}
@@ -470,7 +483,8 @@ func (w *watchGrpcStream) run() {
}
default:
// dispatch to appropriate watch stream
if ok := w.dispatchEvent(pbresp); ok {
if ok := w.dispatchEvent(combinedFragments); ok {
combinedFragments = nil
break
}
// watch response on unexpected watch id; cancel id
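For reference, a self-contained sketch of the reassembly rule this hunk implements, using illustrative types and data rather than the real pb.WatchResponse: an incoming response is merged into the accumulator only when it continues the same watch ID with the next fragment number, and otherwise starts a new accumulation.

```go
package main

import "fmt"

// watchFrag models only the fields the reassembly above depends on; the
// names are illustrative, not the real pb.WatchResponse type.
type watchFrag struct {
	WatchID       int64
	FragmentCount int64
	CurrFragment  int64
	Events        []string
}

// merge appends an incoming fragment to the accumulator, or starts a new
// accumulation when the fragment does not continue the current sequence
// (different watch ID or a non-consecutive fragment number).
func merge(acc, in *watchFrag) *watchFrag {
	if acc == nil || acc.WatchID != in.WatchID || acc.CurrFragment+1 != in.CurrFragment {
		return in
	}
	acc.Events = append(acc.Events, in.Events...)
	acc.CurrFragment = in.CurrFragment
	return acc
}

func main() {
	var acc *watchFrag
	fragments := []*watchFrag{
		{WatchID: 1, FragmentCount: 2, CurrFragment: 1, Events: []string{"put foo0", "put foo1"}},
		{WatchID: 1, FragmentCount: 2, CurrFragment: 2, Events: []string{"put foo2"}},
	}
	for _, f := range fragments {
		acc = merge(acc, f)
		if acc.CurrFragment == acc.FragmentCount {
			fmt.Println("all fragments received:", acc.Events)
			acc = nil
		}
	}
}
```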
61 changes: 58 additions & 3 deletions etcdserver/api/v3rpc/watch.go
@@ -15,7 +15,9 @@
package v3rpc

import (
"fmt"
"io"
"math"
"sync"
"time"

@@ -320,7 +322,6 @@ func (sws *serverWatchStream) sendLoop() {
}
}
}

wr := &pb.WatchResponse{
Header: sws.newResponseHeader(wresp.Revision),
WatchId: int64(wresp.WatchID),
@@ -336,8 +337,35 @@ func (sws *serverWatchStream) sendLoop() {
}

mvcc.ReportEventReceived(len(evs))
if err := sws.gRPCStream.Send(wr); err != nil {
return
// The grpc package's defaultServerMaxSendMessageSize and defaultServerMaxReceiveMessageSize,
// along with maxEventsPerMsg below, are set to lower values in order to
// demonstrate the fragmentation.
maxEventsPerMsg := 20
watchRespSent := false
for !watchRespSent {
// There isn't a reasonable way of deciphering the cause of the send error.
// One solution that comes to mind is comparing the error string against
// the message grpc produces when a message is too large, but that approach
// would break if grpc ever changed its error message. The approach here is
// to assume that the error is caused by the message being too large and to
// keep increasing the number of fragments (by halving maxEventsPerMsg)
// until maxEventsPerMsg reaches 0, at which point we can be sure that the
// send operation is not failing because of the message size.
if maxEventsPerMsg == 0 {
return
}
for _, fragment := range FragmentWatchResponse(maxEventsPerMsg, wr) {
Contributor

the WatchCreateRequest protobuf should have a `bool fragment;` field to optionally enable this feature; if it's not opt-in then older clients will have trouble with revisions being split across responses

Contributor Author

Ok. I'll have to think about how to implement this. Do you think a map from watchId to a fragment boolean, stored in the serverWatchStream, would be a good idea? It seems like once the WatchCreateRequest reaches the serverWatchStream's recvLoop, the fragment boolean has to be stored somewhere rather than passed along to the watch request on the mvcc backend.

Contributor

Yes, that sounds OK

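To make the idea above concrete, here is a self-contained sketch of a per-watch opt-in map of the kind being discussed; every name in it (fragmentRegistry, its methods, and the proposed WatchCreateRequest "fragment" flag) is illustrative rather than taken from the patch.

```go
package main

import (
	"fmt"
	"sync"
)

// fragmentRegistry is a hypothetical sketch of the approach discussed above:
// recvLoop records, per watch ID, whether the client opted in to fragmented
// responses (via a proposed "fragment" flag on WatchCreateRequest), and
// sendLoop consults it before splitting a WatchResponse.
type fragmentRegistry struct {
	mu       sync.Mutex
	fragment map[int64]bool // watch ID -> opted in to fragmentation
}

func newFragmentRegistry() *fragmentRegistry {
	return &fragmentRegistry{fragment: make(map[int64]bool)}
}

// set is what recvLoop would call when a WatchCreateRequest arrives.
func (r *fragmentRegistry) set(watchID int64, enabled bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.fragment[watchID] = enabled
}

// enabled is what sendLoop would check before fragmenting a response.
func (r *fragmentRegistry) enabled(watchID int64) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.fragment[watchID]
}

func main() {
	r := newFragmentRegistry()
	r.set(7, true)                          // a watcher that opted in
	fmt.Println(r.enabled(7), r.enabled(8)) // true false
}
```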
if err := sws.gRPCStream.Send(fragment); err != nil {
fmt.Printf("Was about to size out. Setting max to %d from %d\n %v\n", maxEventsPerMsg/2, maxEventsPerMsg, err)
maxEventsPerMsg /= 2
watchRespSent = false
break
}
fmt.Printf("etcdserver - watchid: %d, count: %d, frag: %d, events: %v\n", fragment.WatchId, fragment.FragmentCount, fragment.CurrFragment, fragment.Events)
watchRespSent = true
}
}

sws.mu.Lock()
@@ -388,6 +416,33 @@ func (sws *serverWatchStream) sendLoop() {
}
}

func FragmentWatchResponse(maxSendMesgSize int, wr *pb.WatchResponse) []*pb.WatchResponse {
var fragmentWrs []*pb.WatchResponse
totalFragments := int64(math.Ceil(float64(len(wr.Events)) / float64(maxSendMesgSize)))
currFragmentCount := 1
for i := 0; i < len(wr.Events); i += maxSendMesgSize {
eventRangeEnd := i + maxSendMesgSize
if eventRangeEnd > len(wr.Events) {
eventRangeEnd = len(wr.Events)
}
wresp := &pb.WatchResponse{
Header: wr.Header,
WatchId: int64(wr.WatchId),
Events: wr.Events[i:eventRangeEnd],
CompactRevision: wr.CompactRevision,
FragmentCount: int64(totalFragments),
CurrFragment: int64(currFragmentCount),
}
currFragmentCount++
fmt.Printf("Apending: total: %d, curr: %d\n", wresp.FragmentCount, wresp.CurrFragment)
fragmentWrs = append(fragmentWrs, wresp)
}
if len(fragmentWrs) == 0 {
return []*pb.WatchResponse{wr}
}
return fragmentWrs
}

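As a quick worked example of the slicing arithmetic in FragmentWatchResponse, here is a standalone sketch with illustrative numbers (25 events, 10 events per message) that avoids the pb types entirely.

```go
package main

import (
	"fmt"
	"math"
)

// Standalone sketch of the fragmentation arithmetic used above:
// 25 events split 10 per message yields ceil(25/10) = 3 fragments,
// numbered 1 through 3, with the last fragment carrying the remainder.
func main() {
	totalEvents := 25
	maxEventsPerMsg := 10
	totalFragments := int(math.Ceil(float64(totalEvents) / float64(maxEventsPerMsg)))
	curr := 1
	for i := 0; i < totalEvents; i += maxEventsPerMsg {
		end := i + maxEventsPerMsg
		if end > totalEvents {
			end = totalEvents
		}
		fmt.Printf("fragment %d/%d carries events [%d:%d)\n", curr, totalFragments, i, end)
		curr++
	}
}
```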
func (sws *serverWatchStream) close() {
sws.watchStream.Close()
close(sws.closec)