Fragment Watch Response Messages #8371
@@ -0,0 +1,161 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package integration

import (
    "context"
    "fmt"
    "strconv"
    "testing"

    "github.com/coreos/etcd/clientv3"
    "github.com/coreos/etcd/integration"
    "github.com/coreos/etcd/mvcc/mvccpb"
    "github.com/coreos/etcd/pkg/testutil"
    "github.com/coreos/etcd/proxy/grpcproxy"
    "github.com/coreos/etcd/proxy/grpcproxy/adapter"
)

func TestResponseWithoutFragmenting(t *testing.T) {
Review comment: It's not clear to me what these two tests are trying to do. It seems like both should have a big delete; one test checks it combines correctly and another checks it returns fragments if requested.
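A rough sketch of the structure this comment suggests (hypothetical helper, not part of the diff): both tests could share one bulk write plus a single big prefix delete, and differ only in whether the watch opts in to fragmented responses.

func bigPrefixDelete(t *testing.T, kv clientv3.KV, n int) {
    // Write n keys under the "foo" prefix, then delete them all at once so the
    // watcher under test sees one large burst of DELETE events.
    for i := 0; i < n; i++ {
        if _, err := kv.Put(context.TODO(), fmt.Sprintf("foo%d", i), "bar"); err != nil {
            t.Fatal(err)
        }
    }
    if _, err := kv.Delete(context.TODO(), "foo", clientv3.WithPrefix()); err != nil {
        t.Fatal(err)
    }
}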
    defer testutil.AfterTest(t)
    // MaxResponseBytes will overflow to 1000 once the grpcOverheadBytes,
    // which have a value of 512 * 1024, are added to MaxResponseBytes.
    clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, MaxResponseBytes: ^uint(0) - (512*1024 - 1 - 1000)})
Review comment: can this be done in a way that doesn't rely on overflow?
Reply: I think another way would be to turn the
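For reference, a minimal illustration of the wraparound the code comment above relies on (not part of the diff; grpcOverheadBytes is assumed to be 512 * 1024, as that comment states):

    const grpcOverheadBytes = 512 * 1024 // assumed server-side overhead, per the comment above
    maxResponseBytes := ^uint(0) - (grpcOverheadBytes - 1 - 1000)
    fmt.Println(maxResponseBytes + grpcOverheadBytes) // the unsigned addition wraps around to exactly 1000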
    defer clus.Terminate(t)

    cfg := clientv3.Config{
        Endpoints: []string{
            clus.Members[0].GRPCAddr(),
            clus.Members[1].GRPCAddr(),
            clus.Members[2].GRPCAddr(),
        },
    }
    cli, err := clientv3.New(cfg)
    if err != nil {
        t.Fatal(err)
    }
Review comment: don't make a special client if it's not needed; just use clus.Client(0)
    cli.SetEndpoints(clus.Members[0].GRPCAddr())
    firstLease, err := clus.Client(0).Grant(context.Background(), 10000)
    if err != nil {
        t.Error(err)
    }

    // Create and register watch proxy.
    wp, _ := grpcproxy.NewWatchProxy(clus.Client(0))
Review comment: no need to create a proxy here; the tag
    wc := adapter.WatchServerToWatchClient(wp)
    w := clientv3.NewWatchFromWatchClient(wc)

    kv := clus.Client(0)
    for i := 0; i < 10; i++ {
        _, err = kv.Put(context.TODO(), fmt.Sprintf("foo%d", i), "bar", clientv3.WithLease(firstLease.ID))
        if err != nil {
            t.Error(err)
        }
    }

    // Does not include the clientv3.WithFragmentedResponse option.
    wChannel := w.Watch(context.TODO(), "foo", clientv3.WithRange("z"))
    _, err = clus.Client(0).Revoke(context.Background(), firstLease.ID)
Review comment: instead of using a lease to bulk delete, `Delete(context.TODO(), "foo", clientv3.WithPrefix())` would be clearer / less code
    if err != nil {
        t.Error(err)
    }
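A sketch of the reviewer's suggestion above, as a drop-in for the Grant/Revoke pair (assumes the same ten foo<i> keys written earlier in this test):

    // Trigger the burst of DELETE events with a single prefix delete instead of
    // attaching the keys to a lease and revoking it.
    if _, err := clus.Client(0).Delete(context.TODO(), "foo", clientv3.WithPrefix()); err != nil {
        t.Fatal(err)
    }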
    r, ok := <-wChannel
    if !ok {
        t.Fatal("watch channel closed")
    }
    keyDigitSum := 0
    responseSum := 0
    if len(r.Events) != 10 {
        t.Fatalf("Expected 10 events, got %d", len(r.Events))
    }
    for i := 0; i < 10; i++ {
        if r.Events[i].Type != mvccpb.DELETE {
            t.Errorf("Expected DELETE event, got %d", r.Events[i].Type)
        }
        keyDigitSum += i
        digit, err := strconv.Atoi(string(r.Events[i].Kv.Key)[3:])
        if err != nil {
            t.Errorf("Failed to convert %s to int", string(r.Events[i].Kv.Key)[3:])
        }
        responseSum += digit
    }
    if keyDigitSum != responseSum {
        t.Errorf("Expected digits of keys received in the response to sum to %d, but got %d", keyDigitSum, responseSum)
    }
}

func TestFragmenting(t *testing.T) {
    defer testutil.AfterTest(t)
    // MaxResponseBytes will overflow to 1000 once the grpcOverheadBytes,
    // which have a value of 512 * 1024, are added to MaxResponseBytes.
    clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, MaxResponseBytes: ^uint(0) - (512*1024 - 1 - 1000)})
    defer clus.Terminate(t)

    cfg := clientv3.Config{
        Endpoints: []string{
            clus.Members[0].GRPCAddr(),
            clus.Members[1].GRPCAddr(),
            clus.Members[2].GRPCAddr(),
        },
    }
    cli, err := clientv3.New(cfg)
    if err != nil {
        t.Fatal(err)
    }
Review comment: just use clus.Client(0)?
    cli.SetEndpoints(clus.Members[0].GRPCAddr())
    firstLease, err := clus.Client(0).Grant(context.Background(), 10000)
    if err != nil {
        t.Error(err)
    }

    // Create and register watch proxy
    wp, _ := grpcproxy.NewWatchProxy(clus.Client(0))
    wc := adapter.WatchServerToWatchClient(wp)
    w := clientv3.NewWatchFromWatchClient(wc)
Review comment: can remove proxy stuff since picked up by cluster_proxy
    kv := clus.Client(0)
    for i := 0; i < 100; i++ {
Review comment: probably do this concurrently; 100 puts is a little pricey
Reply: Where is the flag to concurrently put?
        _, err = kv.Put(context.TODO(), fmt.Sprintf("foo%d", i), "bar", clientv3.WithLease(firstLease.ID))
        if err != nil {
            t.Error(err)
        }
    }
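One way to address the concern about 100 sequential puts is to issue them concurrently. A sketch of a possible replacement for the loop above (assumes "sync" is added to the imports; the buffered error channel is illustrative):

    var wg sync.WaitGroup
    errc := make(chan error, 100)
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            if _, err := kv.Put(context.TODO(), fmt.Sprintf("foo%d", i), "bar", clientv3.WithLease(firstLease.ID)); err != nil {
                errc <- err
            }
        }(i)
    }
    wg.Wait()
    close(errc)
    for err := range errc {
        t.Error(err)
    }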
    wChannel := w.Watch(context.TODO(), "foo", clientv3.WithRange("z"), clientv3.WithFragmentedResponse())
    _, err = clus.Client(0).Revoke(context.Background(), firstLease.ID)
    if err != nil {
        t.Error(err)
    }

    r, ok := <-wChannel
    if !ok {
        t.Fatal("watch channel closed")
    }
    keyDigitSum := 0
    responseSum := 0
    if len(r.Events) != 100 {
        t.Fatalf("Expected 100 events, got %d", len(r.Events))
    }
    for i := 0; i < 100; i++ {
        if r.Events[i].Type != mvccpb.DELETE {
            t.Errorf("Expected DELETE event, got %d", r.Events[i].Type)
        }
        keyDigitSum += i
        digit, err := strconv.Atoi(string(r.Events[i].Kv.Key)[3:])
        if err != nil {
            t.Errorf("Failed to convert %s to int", string(r.Events[i].Kv.Key)[3:])
        }
        responseSum += digit
    }
    if keyDigitSum != responseSum {
        t.Errorf("Expected digits of keys received in the response to sum to %d, but got %d", keyDigitSum, responseSum)
    }
}
@@ -73,6 +73,15 @@ type Op struct
    cmps    []Cmp
    thenOps []Op
    elseOps []Op

    // fragmentResponse allows watch clients to toggle whether to send
    // watch responses that are too large to send over a rpc stream in fragments.
    // Sending in fragments is an opt-in feature in order to preserve compatibility
    // with older clients that can not handle responses being split into fragments.
    fragmentResponse bool
    // combineFragments indicates whether watch clients should combine
    // fragments or relay the watch response in fragmented form.
    combineFragments bool
}
Review comment: put these below
    // accessors / mutators
@@ -415,6 +424,22 @@ func WithCreatedNotify() OpOption {
    }
}
// WithFragmentedResponse makes the watch server send watch responses
// that are too large to send over the rpc stream in fragments.
// The fragmenting feature is an opt-in feature in order to maintain
// compatibility with older versions of clients.
func WithFragmentedResponse() OpOption {
Review comment: Can remove this since the client here can default to fragmenting. If it's an old server, it will ignore the flag and default to non-fragmenting. For backwards compatibility, it only needs to be optional on the server side.
Reply: I'm a little confused, what's the point of the fragment flag if I remove the code to toggle the flag? Won't it always be true?
    return func(op *Op) { op.fragmentResponse = true }
}
// WithFragments allows watch clients to passively relay their received
// fragmented watch responses.
// This option is handy for watch clients belonging to watch proxies,
// because these watch clients can be set to passively send along fragments
// rather than reassembling and then fragmenting them such that they can be
// sent to the user's watch client.
func WithFragments() OpOption { return func(op *Op) { op.combineFragments = true } }
Review comment: What would the use case be for having the client return raw watch responses to the user? I was under the impression that
// WithFilterPut discards PUT events from the watcher.
func WithFilterPut() OpOption {
    return func(op *Op) { op.filterPut = true }
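For context, a sketch of how a caller would opt in to the new behavior added above (illustrative function name; assumes an already-connected *clientv3.Client):

// watchLargeRange opts in to server-side fragmenting for a watch over a large
// key range, mirroring the integration test earlier in this PR.
func watchLargeRange(cli *clientv3.Client) {
    wch := cli.Watch(context.TODO(), "foo",
        clientv3.WithRange("z"),
        clientv3.WithFragmentedResponse(), // opt-in added by this PR
    )
    for resp := range wch {
        for _, ev := range resp.Events {
            _ = ev // events arrive reassembled unless WithFragments() is used
        }
    }
}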
@@ -68,6 +68,10 @@ type WatchResponse struct {

    // cancelReason is a reason of canceling watch
    cancelReason string

    // MoreFragments indicates that more fragments composing one large
    // watch response are expected.
    MoreFragments bool
}

// IsCreate returns true if the event tells that the key is newly created.
@@ -145,6 +149,10 @@ type watchGrpcStream struct {
    resumec chan struct{}
    // closeErr is the error that closed the watch stream
    closeErr error

    // combineFragments indicates whether watch clients should combine
    // fragments or relay the watch response in fragmented form.
    combineFragments bool
}

// watchRequest is issued by the subscriber to start a new watcher
@@ -157,6 +165,9 @@ type watchRequest struct {
    createdNotify bool
    // progressNotify is for progress updates
    progressNotify bool
    // fragmentResponse allows watch clients to toggle whether to send
    // watch responses that are too large to send over a rpc stream in fragments.
    fragmentResponse bool
    // filters is the list of events to filter out
    filters []pb.WatchCreateRequest_FilterType
    // get the previous key-value pair before the event happens
@@ -241,15 +252,16 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
    }

    wr := &watchRequest{
        ctx:            ctx,
        createdNotify:  ow.createdNotify,
        key:            string(ow.key),
        end:            string(ow.end),
        rev:            ow.rev,
        progressNotify: ow.progressNotify,
        filters:        filters,
        prevKV:         ow.prevKV,
        retc:           make(chan chan WatchResponse, 1),
        ctx:              ctx,
        createdNotify:    ow.createdNotify,
        key:              string(ow.key),
        end:              string(ow.end),
        rev:              ow.rev,
        progressNotify:   ow.progressNotify,
        fragmentResponse: ow.fragmentResponse,
        filters:          filters,
        prevKV:           ow.prevKV,
        retc:             make(chan chan WatchResponse, 1),
    }

    ok := false
@@ -267,6 +279,7 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
    wgs := w.streams[ctxKey]
    if wgs == nil {
        wgs = w.newWatcherGrpcStream(ctx)
        wgs.combineFragments = ow.combineFragments
        w.streams[ctxKey] = wgs
    }
    donec := wgs.donec
@@ -424,7 +437,7 @@ func (w *watchGrpcStream) run() {
    }

    cancelSet := make(map[int64]struct{})

    var fragmentsToSend *pb.WatchResponse

    for {
        select {
        // Watch() requested
@@ -450,12 +463,26 @@ func (w *watchGrpcStream) run() {
            }
        // New events from the watch client
        case pbresp := <-w.respc:
            if w.combineFragments {
                fragmentsToSend = pbresp
            } else {
                if fragmentsToSend == nil || fragmentsToSend.WatchId != pbresp.WatchId {
                    fragmentsToSend = pbresp
                } else {
                    fragmentsToSend.Events = append(fragmentsToSend.Events, pbresp.Events...)
                    fragmentsToSend.MoreFragments = pbresp.MoreFragments
                }
                if fragmentsToSend.MoreFragments {
                    break
                }
            }
            switch {
            case pbresp.Created:
                // response to head of queue creation
                if ws := w.resuming[0]; ws != nil {
                    w.addSubstream(pbresp, ws)
                    w.dispatchEvent(pbresp)
                    w.addSubstream(fragmentsToSend, ws)
                    w.dispatchEvent(fragmentsToSend)
                    fragmentsToSend = nil
                    w.resuming[0] = nil
                }
                if ws := w.nextResume(); ws != nil {
@@ -470,7 +497,8 @@ func (w *watchGrpcStream) run() {
            }
            default:
                // dispatch to appropriate watch stream
                if ok := w.dispatchEvent(pbresp); ok {
                if ok := w.dispatchEvent(fragmentsToSend); ok {
                    fragmentsToSend = nil
                    break
                }
                // watch response on unexpected watch id; cancel id
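A simplified restatement of the reassembly rule implemented in the two hunks above (illustrative helper, not part of the diff; pb.WatchResponse is the same protobuf type used here): fragments sharing a WatchId are accumulated until one arrives with MoreFragments unset, and only then is the combined response dispatched.

// combine merges the next fragment into the pending response and reports
// whether the pending response is complete and ready to dispatch.
func combine(pending, next *pb.WatchResponse) (*pb.WatchResponse, bool) {
    if pending == nil || pending.WatchId != next.WatchId {
        // First fragment, or a response for a different watcher: start fresh.
        pending = next
    } else {
        // Same watcher: accumulate events and carry the MoreFragments flag forward.
        pending.Events = append(pending.Events, next.Events...)
        pending.MoreFragments = next.MoreFragments
    }
    return pending, !pending.MoreFragments
}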
@@ -537,6 +565,7 @@ func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool {
        Created:       pbresp.Created,
        Canceled:      pbresp.Canceled,
        cancelReason:  pbresp.CancelReason,
        MoreFragments: pbresp.MoreFragments,
    }
    ws, ok := w.substreams[pbresp.WatchId]
    if !ok {
@@ -784,12 +813,13 @@ func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error)
// toPB converts an internal watch request structure to its protobuf message
func (wr *watchRequest) toPB() *pb.WatchRequest {
    req := &pb.WatchCreateRequest{
        StartRevision:  wr.rev,
        Key:            []byte(wr.key),
        RangeEnd:       []byte(wr.end),
        ProgressNotify: wr.progressNotify,
        Filters:        wr.filters,
        PrevKv:         wr.prevKV,
        StartRevision:    wr.rev,
        Key:              []byte(wr.key),
        RangeEnd:         []byte(wr.end),
        ProgressNotify:   wr.progressNotify,
        FragmentResponse: wr.fragmentResponse,
        Filters:          wr.filters,
        PrevKv:           wr.prevKV,
    }
    cr := &pb.WatchRequest_CreateRequest{CreateRequest: req}
    return &pb.WatchRequest{RequestUnion: cr}
Review comment: this should be part of clientv3/integration/watch_test.go