Skip to content

Commit

Permalink
Merge pull request #9291 from gyuho/fragment-watch
Browse files Browse the repository at this point in the history
*: fragment watch response by server request limit
  • Loading branch information
gyuho authored May 14, 2018
2 parents 0a8dd44 + 4cbfcfe commit 53373fe
Show file tree
Hide file tree
Showing 77 changed files with 848 additions and 361 deletions.
26 changes: 21 additions & 5 deletions CHANGELOG-3.4.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,12 @@ See [code changes](https://github.com/coreos/etcd/compare/v3.3.0...v3.4.0) and [
- Previously, `Repair(dirpath string) bool`, now `Repair(lg *zap.Logger, dirpath string) bool`.
- Previously, `Create(dirpath string, metadata []byte) (*WAL, error)`, now `Create(lg *zap.Logger, dirpath string, metadata []byte) (*WAL, error)`.
- Remove [`pkg/cors` package](https://github.com/coreos/etcd/pull/9490).
- Change [`--experimental-enable-v2v3`](TODO) flag to `--enable-v2v3`; v2 storage emulation is now stable.
- Move internal package `"github.com/coreos/etcd/snap"` to [`"github.com/coreos/etcd/raftsnap"`](https://github.com/coreos/etcd/pull/9211).
- Move internal package `"github.com/coreos/etcd/etcdserver/auth"` to [`"github.com/coreos/etcd/etcdserver/v2auth"`](https://github.com/coreos/etcd/pull/9275).
- Move internal package `"github.com/coreos/etcd/error"` to [`"github.com/coreos/etcd/etcdserver/v2error"`](https://github.com/coreos/etcd/pull/9274).
- Move internal package `"github.com/coreos/etcd/store"` to [`"github.com/coreos/etcd/etcdserver/v2store"`](https://github.com/coreos/etcd/pull/9274).
- [`--experimental-enable-v2v3`](TODO) has been deprecated, `--enable-v2v3` flag is now stable.
- Move internal package `"github.com/coreos/etcd/etcdserver/auth"` to `"github.com/coreos/etcd/etcdserver/api/v2auth"`.
- Move internal package `"github.com/coreos/etcd/etcdserver/stats"` to `"github.com/coreos/etcd/etcdserver/api/v2stats"`.
- Move internal package `"github.com/coreos/etcd/error"` to `"github.com/coreos/etcd/etcdserver/api/v2error"`.
- Move internal package `"github.com/coreos/etcd/store"` to `"github.com/coreos/etcd/etcdserver/api/v2store"`.

### Dependency

Expand Down Expand Up @@ -198,8 +199,14 @@ See [security doc](https://github.com/coreos/etcd/blob/master/Documentation/op-g
### API

- Add [`snapshot`](https://github.com/coreos/etcd/pull/9118) package for snapshot restore/save operations (see [`godoc.org/github.com/etcd/snapshot`](https://godoc.org/github.com/coreos/etcd/snapshot) for more).
- Add [`watch_id` field to `etcdserverpb.WatchCreateRequest`](https://github.com/coreos/etcd/pull/9065), allow user-provided watch ID to `mvcc`.
- Add [`watch_id` field to `etcdserverpb.WatchCreateRequest`](https://github.com/coreos/etcd/pull/9065) to allow user-provided watch ID to `mvcc`.
- Corresponding `watch_id` is returned via `etcdserverpb.WatchResponse`, if any.
- Add [`fragment` field to `etcdserverpb.WatchCreateRequest`](https://github.com/coreos/etcd/pull/9291) to request etcd server to [split watch events](https://github.com/coreos/etcd/issues/9294) when the total size of events exceeds `--max-request-bytes` flag value plus gRPC-overhead 512 bytes.
- The default server-side request bytes limit is `embed.DefaultMaxRequestBytes` which is 1.5 MiB plus gRPC-overhead 512 bytes.
- If watch response events exceed this server-side request limit and watch request is created with `fragment` field `true`, the server will split watch events into a set of chunks, each of which is a subset of watch events below server-side request limit.
- For example, watch response contains 10 events, where each event is 1 MiB. And server `--max-request-bytes` flag value is 1 MiB. Then, server will send 10 separate fragmented events to the client.
- For example, watch response contains 5 events, where each event is 2 MiB. And server `--max-request-bytes` flag value is 1 MiB and `clientv3.Config.MaxCallRecvMsgSize` is 1 MiB. Then, server will try to send 5 separate fragmented events to the client, and the client will error with `"code = ResourceExhausted desc = grpc: received message larger than max (...)"`.
- Client must implement fragmented watch event merge (which `clientv3` does in etcd v3.4).
- Add [`raftAppliedIndex` field to `etcdserverpb.StatusResponse`](https://github.com/coreos/etcd/pull/9176) for current Raft applied index.
- Add [`errors` field to `etcdserverpb.StatusResponse`](https://github.com/coreos/etcd/pull/9206) for server-side error.
- e.g. `"etcdserver: no leader", "NOSPACE", "CORRUPT"`
Expand All @@ -223,6 +230,15 @@ See [security doc](https://github.com/coreos/etcd/blob/master/Documentation/op-g
- Add [`CLUSTER_DEBUG` to enable test cluster logging](https://github.com/coreos/etcd/pull/9678).
- Deprecated `capnslog` in integration tests.

### client v3

- Add [`WithFragment` `OpOption`](https://github.com/coreos/etcd/pull/9291) to support [watch events fragmentation](https://github.com/coreos/etcd/issues/9294) when the total size of events exceeds `--max-request-bytes` flag value plus gRPC-overhead 512 bytes.
- Watch fragmentation is disabled by default.
- The default server-side request bytes limit is `embed.DefaultMaxRequestBytes` which is 1.5 MiB plus gRPC-overhead 512 bytes.
- If watch response events exceed this server-side request limit and watch request is created with `fragment` field `true`, the server will split watch events into a set of chunks, each of which is a subset of watch events below server-side request limit.
- For example, watch response contains 10 events, where each event is 1 MiB. And server `--max-request-bytes` flag value is 1 MiB. Then, server will send 10 separate fragmented events to the client.
- For example, watch response contains 5 events, where each event is 2 MiB. And server `--max-request-bytes` flag value is 1 MiB and `clientv3.Config.MaxCallRecvMsgSize` is 1 MiB. Then, server will try to send 5 separate fragmented events to the client, and the client will error with `"code = ResourceExhausted desc = grpc: received message larger than max (...)"`.

### etcdctl v3

- Add [`check datascale`](https://github.com/coreos/etcd/pull/9185) command.
Expand Down
2 changes: 2 additions & 0 deletions Documentation/dev-guide/api_reference_v3.md
Original file line number Diff line number Diff line change
Expand Up @@ -836,6 +836,7 @@ From google paxosdb paper: Our implementation hinges around a powerful primitive
| filters | filters filter the events at server side before it sends back to the watcher. | (slice of) FilterType |
| prev_kv | If prev_kv is set, created watcher gets the previous KV before the event happens. If the previous KV is already compacted, nothing will be returned. | bool |
| watch_id | If watch_id is provided and non-zero, it will be assigned to this watcher. Since creating a watcher in etcd is not a synchronous operation, this can be used ensure that ordering is correct when creating multiple watchers on the same stream. Creating a watcher with an ID already in use on the stream will cause an error to be returned. | int64 |
| fragment | fragment enables splitting large revisions into multiple watch responses. | bool |



Expand All @@ -859,6 +860,7 @@ From google paxosdb paper: Our implementation hinges around a powerful primitive
| canceled | canceled is set to true if the response is for a cancel watch request. No further events will be sent to the canceled watcher. | bool |
| compact_revision | compact_revision is set to the minimum index if a watcher tries to watch at a compacted index. This happens when creating a watcher at a compacted revision or the watcher cannot catch up with the progress of the key-value store. The client should treat the watcher as canceled and should not try to create any watcher with the same start_revision again. | int64 |
| cancel_reason | cancel_reason indicates the reason for canceling the watcher. | string |
| fragment | framgment is true if large watch response was split over multiple responses. | bool |
| events | | (slice of) mvccpb.Event |


Expand Down
10 changes: 10 additions & 0 deletions Documentation/dev-guide/apispec/swagger/rpc.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -2359,6 +2359,11 @@
"$ref": "#/definitions/WatchCreateRequestFilterType"
}
},
"fragment": {
"description": "fragment enables splitting large revisions into multiple watch responses.",
"type": "boolean",
"format": "boolean"
},
"key": {
"description": "key is the key to register for watching.",
"type": "string",
Expand Down Expand Up @@ -2430,6 +2435,11 @@
"$ref": "#/definitions/mvccpbEvent"
}
},
"fragment": {
"description": "framgment is true if large watch response was split over multiple responses.",
"type": "boolean",
"format": "boolean"
},
"header": {
"$ref": "#/definitions/etcdserverpbResponseHeader"
},
Expand Down
123 changes: 123 additions & 0 deletions clientv3/integration/watch_fragment_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package integration

import (
"context"
"fmt"
"strings"
"testing"
"time"

"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/integration"
"github.com/coreos/etcd/pkg/testutil"
)

// TestWatchFragmentDisable ensures that large watch
// response exceeding server-side request limit can
// arrive even without watch response fragmentation.
func TestWatchFragmentDisable(t *testing.T) {
testWatchFragment(t, false, false)
}

// TestWatchFragmentDisableWithGRPCLimit verifies
// large watch response exceeding server-side request
// limit and client-side gRPC response receive limit
// cannot arrive without watch events fragmentation,
// because multiple events exceed client-side gRPC
// response receive limit.
func TestWatchFragmentDisableWithGRPCLimit(t *testing.T) {
testWatchFragment(t, false, true)
}

// TestWatchFragmentEnable ensures that large watch
// response exceeding server-side request limit arrive
// with watch response fragmentation.
func TestWatchFragmentEnable(t *testing.T) {
testWatchFragment(t, true, false)
}

// TestWatchFragmentEnableWithGRPCLimit verifies
// large watch response exceeding server-side request
// limit and client-side gRPC response receive limit
// can arrive only when watch events are fragmented.
func TestWatchFragmentEnableWithGRPCLimit(t *testing.T) {
testWatchFragment(t, true, true)
}

// testWatchFragment triggers watch response that spans over multiple
// revisions exceeding server request limits when combined.
func testWatchFragment(t *testing.T, fragment, exceedRecvLimit bool) {
cfg := &integration.ClusterConfig{
Size: 1,
MaxRequestBytes: 1.5 * 1024 * 1024,
}
if exceedRecvLimit {
cfg.ClientMaxCallRecvMsgSize = 1.5 * 1024 * 1024
}
clus := integration.NewClusterV3(t, cfg)
defer clus.Terminate(t)

cli := clus.Client(0)
errc := make(chan error)
for i := 0; i < 10; i++ {
go func(i int) {
_, err := cli.Put(context.TODO(),
fmt.Sprint("foo", i),
strings.Repeat("a", 1024*1024),
)
errc <- err
}(i)
}
for i := 0; i < 10; i++ {
if err := <-errc; err != nil {
t.Fatalf("failed to put: %v", err)
}
}

opts := []clientv3.OpOption{clientv3.WithPrefix(), clientv3.WithRev(1)}
if fragment {
opts = append(opts, clientv3.WithFragment())
}
wch := cli.Watch(context.TODO(), "foo", opts...)

// expect 10 MiB watch response
select {
case ws := <-wch:
// without fragment, should exceed gRPC client receive limit
if !fragment && exceedRecvLimit {
if len(ws.Events) != 0 {
t.Fatalf("expected 0 events with watch fragmentation, got %d", len(ws.Events))
}
exp := "code = ResourceExhausted desc = grpc: received message larger than max ("
if !strings.Contains(ws.Err().Error(), exp) {
t.Fatalf("expected 'ResourceExhausted' error, got %v", ws.Err())
}
return
}

// still expect merged watch events
if len(ws.Events) != 10 {
t.Fatalf("expected 10 events with watch fragmentation, got %d", len(ws.Events))
}
if ws.Err() != nil {
t.Fatalf("unexpected error %v", ws.Err())
}

case <-time.After(testutil.RequestTimeout):
t.Fatalf("took too long to receive events")
}
}
36 changes: 31 additions & 5 deletions clientv3/op.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@ const (
tTxn
)

var (
noPrefixEnd = []byte{0}
)
var noPrefixEnd = []byte{0}

// Op represents an Operation that kv can execute.
type Op struct {
Expand All @@ -53,6 +51,12 @@ type Op struct {
// for watch, put, delete
prevKV bool

// for watch
// fragmentation should be disabled by default
// if true, split watch events when total exceeds
// "--max-request-bytes" flag value + 512-byte
fragment bool

// for put
ignoreValue bool
ignoreLease bool
Expand All @@ -77,8 +81,15 @@ type Op struct {

// accessors / mutators

func (op Op) IsTxn() bool { return op.t == tTxn }
func (op Op) Txn() ([]Cmp, []Op, []Op) { return op.cmps, op.thenOps, op.elseOps }
// IsTxn returns true if the "Op" type is transaction.
func (op Op) IsTxn() bool {
return op.t == tTxn
}

// Txn returns the comparison(if) operations, "then" operations, and "else" operations.
func (op Op) Txn() ([]Cmp, []Op, []Op) {
return op.cmps, op.thenOps, op.elseOps
}

// KeyBytes returns the byte slice holding the Op's key.
func (op Op) KeyBytes() []byte { return op.key }
Expand Down Expand Up @@ -205,12 +216,14 @@ func (op Op) isWrite() bool {
return op.t != tRange
}

// OpGet returns "get" operation based on given key and operation options.
func OpGet(key string, opts ...OpOption) Op {
ret := Op{t: tRange, key: []byte(key)}
ret.applyOpts(opts)
return ret
}

// OpDelete returns "delete" operation based on given key and operation options.
func OpDelete(key string, opts ...OpOption) Op {
ret := Op{t: tDeleteRange, key: []byte(key)}
ret.applyOpts(opts)
Expand Down Expand Up @@ -239,6 +252,7 @@ func OpDelete(key string, opts ...OpOption) Op {
return ret
}

// OpPut returns "put" operation based on given key-value and operation options.
func OpPut(key, val string, opts ...OpOption) Op {
ret := Op{t: tPut, key: []byte(key), val: []byte(val)}
ret.applyOpts(opts)
Expand Down Expand Up @@ -267,6 +281,7 @@ func OpPut(key, val string, opts ...OpOption) Op {
return ret
}

// OpTxn returns "txn" operation based on given transaction conditions.
func OpTxn(cmps []Cmp, thenOps []Op, elseOps []Op) Op {
return Op{t: tTxn, cmps: cmps, thenOps: thenOps, elseOps: elseOps}
}
Expand Down Expand Up @@ -466,6 +481,17 @@ func WithPrevKV() OpOption {
}
}

// WithFragment to receive raw watch response with fragmentation.
// Fragmentation is disabled by default. If fragmentation is enabled,
// etcd watch server will split watch response before sending to clients
// when the total size of watch events exceed server-side request limit.
// The default server-side request limit is 1.5 MiB, which can be configured
// as "--max-request-bytes" flag value + gRPC-overhead 512 bytes.
// See "etcdserver/api/v3rpc/watch.go" for more details.
func WithFragment() OpOption {
return func(op *Op) { op.fragment = true }
}

// WithIgnoreValue updates the key using its current value.
// This option can not be combined with non-empty values.
// Returns an error if the key does not exist.
Expand Down
Loading

0 comments on commit 53373fe

Please sign in to comment.