Fragment Watch Response Messages #8371
Changes from 1 commit
@@ -0,0 +1,137 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package integration

import (
	"context"
	"fmt"
	"log"
	"testing"
	"time"

	"github.com/coreos/etcd/clientv3"
	"github.com/coreos/etcd/integration"
	"github.com/coreos/etcd/pkg/testutil"
	"github.com/coreos/etcd/proxy/grpcproxy"
	"github.com/coreos/etcd/proxy/grpcproxy/adapter"
)

// TestFragmentationStopsAfterServerFailure tests the edge case where either
// the server or the watch proxy fails to send a message due to errors unrelated
// to the fragment size, such as a member failure. In that case, the server or
// watch proxy should continue to reduce the message size (a default action
// chosen because there is no way of telling whether the error returned by the
// send operation is message-size related or caused by some other issue) until
// the message size becomes zero and we are therefore certain that the message
// size is not what is causing the send operation to fail.
func TestFragmentationStopsAfterServerFailure(t *testing.T) {
	defer testutil.AfterTest(t)
	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
	defer clus.Terminate(t)

	cfg := clientv3.Config{
		Endpoints: []string{
			clus.Members[0].GRPCAddr(),
			clus.Members[1].GRPCAddr(),
			clus.Members[2].GRPCAddr(),
		},
	}
	cli, err := clientv3.New(cfg)
Review comment: don't make a special client if it's not needed; just use `clus.Client(0)`
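A minimal sketch of that suggestion, assuming the test only needs one member's client: drop the hand-rolled `clientv3.Config` above and reuse the client the test cluster already owns. This is illustrative only, not code from the PR.

```go
// Sketch of the reviewer's suggestion (illustrative, not part of this PR):
// reuse the cluster's pre-built client for member 0 instead of constructing
// a separate clientv3 client and resetting its endpoints.
cli := clus.Client(0)

firstLease, err := cli.Grant(context.Background(), 10000)
if err != nil {
	t.Fatal(err)
}
```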
	cli.SetEndpoints(clus.Members[0].GRPCAddr())
	firstLease, err := clus.Client(0).Grant(context.Background(), 10000)
	if err != nil {
		t.Error(err)
	}

	kv := clus.Client(0)
	for i := 0; i < 25; i++ {
		_, err = kv.Put(context.TODO(), fmt.Sprintf("foo%d", i), "bar", clientv3.WithLease(firstLease.ID))
		if err != nil {
			t.Error(err)
		}
	}

	kv.Watch(context.TODO(), "foo", clientv3.WithRange("z"))
	_, err = clus.Client(0).Revoke(context.Background(), firstLease.ID)
Review comment: instead of using a lease to bulk delete, `Delete(context.TODO(), "foo", clientv3.WithPrefix())` would be clearer / less code
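A short sketch of that alternative, assuming the keys are only written so they can be removed in one burst (so no lease is needed); illustrative only, not code from the PR.

```go
// Sketch of the reviewer's suggestion (illustrative only): write the keys
// without a lease, then delete them by prefix so the watcher sees one large
// batch of delete events, with no Grant/Revoke bookkeeping.
for i := 0; i < 25; i++ {
	if _, err := kv.Put(context.TODO(), fmt.Sprintf("foo%d", i), "bar"); err != nil {
		t.Fatal(err)
	}
}
kv.Watch(context.TODO(), "foo", clientv3.WithRange("z"))
if _, err := kv.Delete(context.TODO(), "foo", clientv3.WithPrefix()); err != nil {
	t.Fatal(err)
}
```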
	if err != nil {
		t.Error(err)
	}
	clus.Members[0].Stop(t)
	time.Sleep(10 * time.Second)
	log.Fatal("Printed the log")
}

// TestFragmentingWithOverlappingWatchers tests that events are fragmented
// on the server and watch proxy and pieced back together properly on the
// client side.
func TestFragmentingWithOverlappingWatchers(t *testing.T) {
	defer testutil.AfterTest(t)
	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
	defer clus.Terminate(t)

	cfg := clientv3.Config{
		Endpoints: []string{
			clus.Members[0].GRPCAddr(),
			clus.Members[1].GRPCAddr(),
			clus.Members[2].GRPCAddr(),
		},
	}
	cli, err := clientv3.New(cfg)
Review comment: just use `clus.Client(0)`?
	cli.SetEndpoints(clus.Members[0].GRPCAddr())
	firstLease, err := clus.Client(0).Grant(context.Background(), 10000)
	if err != nil {
		t.Error(err)
	}
	secondLease, err := clus.Client(0).Grant(context.Background(), 10000)
	if err != nil {
		t.Error(err)
	}

	// Create and register watch proxy
	wp, _ := grpcproxy.NewWatchProxy(clus.Client(0))
	wc := adapter.WatchServerToWatchClient(wp)
	w := clientv3.NewWatchFromWatchClient(wc)
Review comment: can remove proxy stuff since picked up by cluster_proxy
	kv := clus.Client(0)
	for i := 0; i < 25; i++ {
		_, err = kv.Put(context.TODO(), fmt.Sprintf("foo%d", i), "bar", clientv3.WithLease(firstLease.ID))
		if err != nil {
			t.Error(err)
		}
		_, err = kv.Put(context.TODO(), fmt.Sprintf("buzz%d", i), "fizz", clientv3.WithLease(secondLease.ID))
		if err != nil {
			t.Error(err)
		}
	}

	w.Watch(context.TODO(), "foo", clientv3.WithRange("z"))
	w.Watch(context.TODO(), "buzz", clientv3.WithRange("z "))

	_, err = clus.Client(0).Revoke(context.Background(), firstLease.ID)
	if err != nil {
		t.Error(err)
	}
	_, err = clus.Client(0).Revoke(context.Background(), secondLease.ID)
	if err != nil {
		t.Error(err)
	}

	// Wait for the revocation process to finish
	time.Sleep(10 * time.Second)
	log.Fatal("Printed the log")
}
@@ -15,7 +15,9 @@
package v3rpc

import (
	"fmt"
	"io"
	"math"
	"sync"
	"time"

@@ -320,7 +322,6 @@ func (sws *serverWatchStream) sendLoop() {
					}
				}
			}

			wr := &pb.WatchResponse{
				Header:  sws.newResponseHeader(wresp.Revision),
				WatchId: int64(wresp.WatchID),
@@ -336,8 +337,35 @@
			}

			mvcc.ReportEventReceived(len(evs))
			if err := sws.gRPCStream.Send(wr); err != nil {
				return
			// The grpc package's defaultServerMaxSendMessageSize, defaultServerMaxReceiveMessageSize
			// and the maxSendMesgSize are set to lower numbers in order to demonstrate
			// the fragmenting.
			maxEventsPerMsg := 20
			watchRespSent := false
			for !watchRespSent {
				// There isn't a reasonable way of deciphering the cause of the send error.
				// One solution that comes to mind is comparing the error string against
				// the message grpc returns when a message is too large, but that approach
				// would break if grpc ever changed its error message semantics. The
				// approach here is to assume that the error is caused by the message
				// being too large and to keep halving the number of events per message
				// (thereby increasing the number of fragments) until maxEventsPerMsg
				// becomes 0 and we are therefore sure that the send operation is not
				// failing because of the message size.
				if maxEventsPerMsg == 0 {
					return
				}
				for _, fragment := range FragmentWatchResponse(maxEventsPerMsg, wr) {
Review comment: the …
Reply: Ok. I'll have to think about how to implement this. Do you think a map between the watchId and fragment boolean that would be stored in the …
Review comment: Yes, that sounds OK
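A rough sketch of what that map could look like if kept alongside `serverWatchStream`; the `fragmentFlags` type and its method names are hypothetical, chosen here only to illustrate the idea discussed in the thread, and are not code from this PR.

```go
package v3rpc

import (
	"sync"

	"github.com/coreos/etcd/mvcc"
)

// fragmentFlags is an illustrative sketch (names hypothetical): a mutex-guarded
// map from watch ID to a "fragmenting enabled" flag, so the send loop can decide
// per watcher whether its responses may be split into fragments.
type fragmentFlags struct {
	mu       sync.Mutex
	fragment map[mvcc.WatchID]bool
}

func newFragmentFlags() *fragmentFlags {
	return &fragmentFlags{fragment: make(map[mvcc.WatchID]bool)}
}

// set records whether fragmenting is enabled for the given watch ID.
func (f *fragmentFlags) set(id mvcc.WatchID, enabled bool) {
	f.mu.Lock()
	f.fragment[id] = enabled
	f.mu.Unlock()
}

// enabled reports whether fragmenting was requested for the given watch ID.
func (f *fragmentFlags) enabled(id mvcc.WatchID) bool {
	f.mu.Lock()
	defer f.mu.Unlock()
	return f.fragment[id]
}
```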
					if err := sws.gRPCStream.Send(fragment); err != nil {
						fmt.Printf("Was about to size out. Setting max to %d from %d\n %v\n", maxEventsPerMsg/2, maxEventsPerMsg, err)
						maxEventsPerMsg /= 2
						watchRespSent = false
						break
					}
					fmt.Printf("etcdserver - watchid: %d, count: %d, frag: %d, events: %v\n", fragment.WatchId, fragment.FragmentCount, fragment.CurrFragment, fragment.Events)
					watchRespSent = true
				}
			}

			sws.mu.Lock()
@@ -388,6 +416,33 @@ func (sws *serverWatchStream) sendLoop() {
	}
}

func FragmentWatchResponse(maxSendMesgSize int, wr *pb.WatchResponse) []*pb.WatchResponse {
	var fragmentWrs []*pb.WatchResponse
	totalFragments := int64(math.Ceil(float64(len(wr.Events)) / float64(maxSendMesgSize)))
	currFragmentCount := 1
	for i := 0; i < len(wr.Events); i += maxSendMesgSize {
		eventRangeEnd := i + maxSendMesgSize
		if eventRangeEnd > len(wr.Events) {
			eventRangeEnd = len(wr.Events)
		}
		wresp := &pb.WatchResponse{
			Header:          wr.Header,
			WatchId:         int64(wr.WatchId),
			Events:          wr.Events[i:eventRangeEnd],
			CompactRevision: wr.CompactRevision,
			FragmentCount:   int64(totalFragments),
			CurrFragment:    int64(currFragmentCount),
		}
		currFragmentCount++
		fmt.Printf("Appending: total: %d, curr: %d\n", wresp.FragmentCount, wresp.CurrFragment)
		fragmentWrs = append(fragmentWrs, wresp)
	}
	if len(fragmentWrs) == 0 {
		return []*pb.WatchResponse{wr}
	}
	return fragmentWrs
}

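For reference, a minimal sketch of how a receiver could piece these fragments back together using the `FragmentCount` and `CurrFragment` fields introduced above. It assumes fragments of one response arrive in order; it is an illustration, not the actual clientv3 or watch proxy logic.

```go
package v3rpc

import (
	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
)

// mergeFragment is an illustrative helper (not part of this PR or clientv3):
// it folds one fragment into the pending response for its watch and reports
// whether the logical response is now complete. It assumes the last fragment
// arrives with CurrFragment == FragmentCount.
func mergeFragment(pending, frag *pb.WatchResponse) (*pb.WatchResponse, bool) {
	if pending == nil {
		pending = &pb.WatchResponse{
			Header:          frag.Header,
			WatchId:         frag.WatchId,
			CompactRevision: frag.CompactRevision,
		}
	}
	pending.Events = append(pending.Events, frag.Events...)
	// Unfragmented responses carry no fragment metadata and complete at once.
	done := frag.FragmentCount == 0 || frag.CurrFragment >= frag.FragmentCount
	return pending, done
}
```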
func (sws *serverWatchStream) close() {
	sws.watchStream.Close()
	close(sws.closec)
Review comment: this should be part of clientv3/integration/watch_test.go