clientv3: when an endpoint is unavailable, switch it in retry #8327
Conversation
HardySimpson commented on Jul 28, 2017
this needs tests; dangerously broken as-is
clientv3/retry.go
Outdated
@@ -36,14 +36,22 @@ func (c *Client) newRetryWrapper() retryRpcFunc {
 		eErr := rpctypes.Error(err)
 		// always stop retry on etcd errors
 		if _, ok := eErr.(rpctypes.EtcdError); ok {
-			return err
+			if grpc.Code(err) != codes.Unavailable {
This isn't safe. Retrying will violate at-most-once semantics for Put/Del/Txn, since a "request timed out" error can be returned even if the request makes it through raft. This can happen if there's a partition between the time of submitting a raft proposal and the commit.
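To make the concern concrete, here is a minimal sketch (not code from this PR; `retryUnavailable` and `rpcFunc` are invented names) of a retry wrapper that only retries `codes.Unavailable` errors when the caller asserts the RPC is idempotent, so a mutating RPC is never sent twice:

```go
package retrysketch

import (
	"context"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

type rpcFunc func(ctx context.Context) error

// retryUnavailable retries f on Unavailable errors, but only when the
// caller asserts the RPC is idempotent (e.g. Range). Mutating RPCs
// (Put/Del/Txn) return the error immediately, preserving at-most-once
// semantics even if the request already committed through raft.
func retryUnavailable(ctx context.Context, idempotent bool, f rpcFunc) error {
	for {
		err := f(ctx)
		if err == nil {
			return nil
		}
		if !idempotent || status.Code(err) != codes.Unavailable {
			return err
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
			// retry; a real client would switch endpoints first
		}
	}
}
```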
code changed
clientv3/balancer.go
Outdated
	b.updateAddrs(allEpsWithoutPinAddr)

	// at most sleep 10 seconds here
depending on sleeps is slow and unreliable; the balancer probably needs to directly reason about graylisted endpoints
code also changed
I thought our balancer already knows how to switch endpoints on failures/timeouts. No?
@xiang90 no, it's still an open issue. It will only detect/switch on transport errors, not errors returned by etcd.
Isn't timeout a transport error?
@xiang90 transport errors between the client and server, not transport errors between peers.
If that is the case, then this should be done on the server side. The server will cut off the connection if it is partitioned.
@xiang90 I don't think that's enough; the client could retry the same endpoint without graylisting.
Force-pushed from 106bf59 to 0b87a29.
@heyitsanthony I agree with your opinion; a server-side switch is not enough, as the server cannot tell whether the client's request actually succeeded, failed, or timed out.
About the previous code comment:
clientv3/retry.go
Outdated
	// switch at most once per 30 seconds
	// safe to call concurrently
	c.SwitchEndpoint()
}
- About Put/Txn's at-most-once semantics: we only switch here, without retrying. We let this request fail and hope other RPCs will succeed.
@@ -44,6 +52,7 @@ func (c *Client) newRetryWrapper() retryRpcFunc {
 	return err
- But if the client hits a gRPC-level request timeout, that also violates at-most-once semantics, which is not OK.
these are only returned before the request is sent over the wire afaik
Yes, I searched the docs; see comments below.
Force-pushed from 4c29508 to 7398596.
this is not adequately tested
clientv3/balancer.go
Outdated
	b.mu.Lock()
	for prevPinAddr == b.pinAddr {
		b.pinAddrCond.Wait()
there is no way for the ctx attached to the rpc to cancel and break out of this wait
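One common way around this (a sketch with invented names, not the PR's code) is to signal pin changes by closing a channel instead of using a sync.Cond, so the waiter can select on the RPC's ctx at the same time:

```go
package balancersketch

import (
	"context"
	"sync"
)

// pinWatcher signals pinned-address changes by closing a channel, so a
// waiter can abort on ctx cancellation, which a sync.Cond.Wait cannot do.
type pinWatcher struct {
	mu       sync.Mutex
	pinAddr  string
	changedc chan struct{} // closed and replaced whenever pinAddr changes
}

func newPinWatcher() *pinWatcher {
	return &pinWatcher{changedc: make(chan struct{})}
}

// waitSwitch blocks until pinAddr differs from prev or ctx is done.
func (w *pinWatcher) waitSwitch(ctx context.Context, prev string) error {
	for {
		w.mu.Lock()
		pin, ch := w.pinAddr, w.changedc
		w.mu.Unlock()
		if pin != prev {
			return nil
		}
		select {
		case <-ch: // pinAddr changed; loop and re-check
		case <-ctx.Done():
			return ctx.Err() // the RPC's ctx can now break out of the wait
		}
	}
}

func (w *pinWatcher) setPin(addr string) {
	w.mu.Lock()
	w.pinAddr = addr
	close(w.changedc)                // wake all waiters
	w.changedc = make(chan struct{}) // fresh channel for the next change
	w.mu.Unlock()
}
```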
Same as the previous comment; the cond is associated with sb.mu.
clientv3/balancer.go
Outdated
	}

	b.mu.Lock()
this will deadlock because b.mu.Lock isn't released on the Wait; anything that tries to update pinAddrCond will wait forever to acquire the lock
I forgot to initialize the cond; added it.
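For reference, a minimal sketch of the fix being discussed (names are illustrative): the Cond must be constructed with the same lock the waiter holds, so Wait atomically releases it while blocked:

```go
package balancersketch

import "sync"

type pinWaiter struct {
	mu      sync.Mutex
	pinAddr string
	cond    *sync.Cond
}

func newPinWaiter() *pinWaiter {
	w := &pinWaiter{}
	w.cond = sync.NewCond(&w.mu) // Wait now unlocks mu while blocked
	return w
}

// waitUntilSwitched blocks until pinAddr differs from prev.
func (w *pinWaiter) waitUntilSwitched(prev string) {
	w.mu.Lock()
	for w.pinAddr == prev {
		w.cond.Wait() // releases mu; other goroutines can update pinAddr
	}
	w.mu.Unlock()
}

func (w *pinWaiter) setPin(addr string) {
	w.mu.Lock()
	w.pinAddr = addr
	w.mu.Unlock()
	w.cond.Broadcast() // wake waiters so they re-check the condition
}
```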
clientv3/retry.go
Outdated
@@ -44,6 +52,7 @@ func (c *Client) newRetryWrapper() retryRpcFunc {
 	return err
 }
please don't add blank lines if there's no new code attached
OK, fixed
clientv3/retry.go
Outdated
	// try to request another endpoint at next rpc
	// leave this fail
	// already do anti-too-frequently switch
	// at most switch one time per 30 seconds
the implementation details of the balancer policy shouldn't be included in the comment at the callsite since if the balancer code changes, this comment will have to be updated (which probably won't happen)
Yes, I deleted all the comments and renamed it to TrySwitchEndpoint; that is a better name that tells what the function does.
clientv3/balancer.go
Outdated
	b.mu.RUnlock()

	if lastSwitchTime.Add(30 * time.Second).After(tNow) {
This needs to be endpoint graylisting; a single timeout isn't enough. Suppose there are 5 members with 2 partitioned. The client could end up bouncing between both partitioned endpoints for a while.
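As an illustration only (not from the PR; names and the TTL are invented), a time-based graylist where an endpoint returns to service after a fixed interval:

```go
package balancersketch

import (
	"sync"
	"time"
)

// graylist tracks endpoints that recently failed; an endpoint becomes
// usable again once its entry is older than ttl.
type graylist struct {
	mu  sync.Mutex
	bad map[string]time.Time // endpoint -> time it was graylisted
	ttl time.Duration
}

func newGraylist(ttl time.Duration) *graylist {
	return &graylist{bad: make(map[string]time.Time), ttl: ttl}
}

func (g *graylist) add(ep string) {
	g.mu.Lock()
	g.bad[ep] = time.Now()
	g.mu.Unlock()
}

// usable reports whether ep may be dialed, expiring stale entries.
func (g *graylist) usable(ep string) bool {
	g.mu.Lock()
	defer g.mu.Unlock()
	t, ok := g.bad[ep]
	if !ok {
		return true
	}
	if time.Since(t) > g.ttl {
		delete(g.bad, ep) // TTL elapsed; give the endpoint another chance
		return true
	}
	return false
}
```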
A graylist has one problem: when should we put a graylisted endpoint back to normal? In the client we don't have enough info.
I think a good way is to put a counter on each endpoint; every time we switch, we choose the endpoint with the minimal retry count to connect to. All counters would be reset every time updateEndpoints happens, periodically.
Still, this would change a lot of code in the balancer, and it may cause some imbalance between multiple clients: after a while, most clients might attach to one healthy member and leave a just-recovered member without any clients.
The current algorithm is random choice: when a client finds a good endpoint, it sticks to it. Random choice is close to balanced, since once a client finds a healthy member it sticks to it; it will bounce between partitioned endpoints for a while but eventually work well.
Maybe a better way is to choose the endpoint based on both randomness and retry count. For example, we give every endpoint 100 points, and every time a switch happens we reduce its count by one. The switch then selects by weighted probability. If there are 3 members, their points might look like this:
| member | points | choose probability |
|---|---|---|
| etcd-0 | 100 | 100/(100+80+60) = 42% |
| etcd-1 | 80 | 80/(100+80+60) = 33% |
| etcd-2 | 60 | 60/(100+80+60) = 25% |
If this is OK, it should be implemented in another PR.
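A sketch of the weighted-random pick described above (illustrative only, not code from this PR): with the table's points, etcd-0 is chosen with probability 100/240 ≈ 42%.

```go
package balancersketch

import "math/rand"

// pickWeighted returns an endpoint chosen with probability
// points[ep] / sum(points); returns "" if no endpoint has points.
func pickWeighted(points map[string]int) string {
	total := 0
	for _, p := range points {
		total += p
	}
	if total <= 0 {
		return ""
	}
	n := rand.Intn(total) // uniform in [0, total)
	for ep, p := range points {
		if n < p {
			return ep // ep owns an interval of length p
		}
		n -= p
	}
	return "" // unreachable
}
```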
Force-pushed from 1a3ef27 to fc2fcd5.
About retry: I found in gRPC's doc, as it mentions, when facing a ... But in etcd's errors now ... So, as a result, should we re-classify these errors? The retry code would then be like this:
Force-pushed from 9da2545 to 6c4e421.
Force-pushed from 1e1ca75 to 8421b28.
Force-pushed from f5910ea to a621ee2.
@heyitsanthony I tried for a week to implement a good switch; the best I could achieve is in this PR. I found that because the balancer code is now async, not sync, I cannot write a stable unit test; the unit test's output now looks like this.
Though it works: with sb.trySwitchEndpoint(0), i.e. a duration of 0 seconds, it actually does the switch. So, what's next? Could you solve this problem properly in the next version of etcd? We really need it.
@HardySimpson I'll try to put some time into this. The balancer doesn't really need synchronous endpoint switching as much as the client should eventually switch endpoints if RPCs begin returning unavailable responses, so that it can at least make forward progress.
Force-pushed from 978a6ea to 441c9cf.
Force-pushed from 441c9cf to 5c434bd.
Commit: "when a endpoint is unavailable, switch it in retry" (fix: etcd-io#8326)
Force-pushed from 5c434bd to fcee836.
@heyitsanthony, I have refactored some code based on comparing the last pinAddr with the current pinAddr, which behaves something like a graylist, and I added a unit test to verify correctness. So I think maybe this PR can be merged?
// TrySwitchEndpoint tries to make the balancer change its pinned endpoint.
// It may do nothing when there is only one endpoint, or when a switch has just happened.
// It is called when the current endpoint is not available, e.g. when a network partition happens.
func (c *Client) TrySwitchEndpoint() {
I don't think this should be a public interface; the user shouldn't have to give hints to the client to switch endpoints. Just call c.balancer.trySwitchEndpoint in retry.go?
@@ -237,3 +238,82 @@ func (kcl *killConnListener) close() {
 	close(kcl.stopc)
 	kcl.wg.Wait()
 }
+
+func TestBalancerTrySwitch(t *testing.T) {
The mock grpc notifications this test uses could diverge from grpc behavior and the test itself is rather complicated. A clientv3/integration/kv_test.go test would be better instead since what matters is the Get failing over:
func TestKVGetFailoverPartition(t *testing.T) {
	defer testutil.AfterTest(t)
	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
	defer clus.Terminate(t)
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   clus.Client(0).Endpoints(),
		DialTimeout: time.Second,
	})
	testutil.AssertNil(t, err)
	defer cli.Close()
	// force to connect to member 0
	_, err = cli.Get(context.TODO(), "abc")
	testutil.AssertNil(t, err)
	// permit failover to 1
	cli.SetEndpoints(append(cli.Endpoints(), clus.Client(1).Endpoints()...)...)
	// partition member 0 from rest of cluster
	clus.Members[0].InjectPartition(t, clus.Members[1:])
	// may have triggered a leader election; expect timeouts
	timeout := clus.Members[1].ServerConfig.ReqTimeout()
	// member 0 fails to get consensus for the l-read, balancer switches
	ctx, cancel := context.WithTimeout(context.TODO(), 3*timeout)
	_, err = cli.Get(ctx, "abc")
	cancel()
	testutil.AssertNil(t, err)
}
@@ -310,7 +323,9 @@ func (b *simpleBalancer) Get(ctx context.Context, opts grpc.BalancerGetOptions)
 	return grpc.Address{Addr: addr}, func() {}, nil
 }

-func (b *simpleBalancer) Notify() <-chan []grpc.Address { return b.notifyCh }
+func (b *simpleBalancer) Notify() <-chan []grpc.Address {
don't change
 	// notify client that a connection is up
-	b.readyOnce.Do(func() { close(b.readyc) })
+	b.readyOnce.Do(func() {
don't change
 	close(sb.downc)
 	go sb.updateNotifyLoop()
 	return sb
 }

-func (b *simpleBalancer) Start(target string, config grpc.BalancerConfig) error { return nil }
+func (b *simpleBalancer) Start(target string, config grpc.BalancerConfig) error {
don't change
	func() {
		b.mu.Lock()
		defer b.mu.Unlock()
deferring will deadlock
- acquire b.mu.Lock
- block on b.updateAddrsC <- struct{}{}
- updateNotifyLoop tries to acquire b.mu.RLock and deadlocks.
OK, I put it outside the func().
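A sketch of the resulting pattern (invented names, not the PR's exact code): mutate state under the lock, unlock, and only then send on the channel, so the notify loop can acquire the lock in the meantime:

```go
package balancersketch

import "sync"

type addrUpdater struct {
	mu           sync.RWMutex
	addrs        []string
	updateAddrsC chan struct{}
	stopc        chan struct{}
}

func (b *addrUpdater) updateAddrs(eps []string) {
	b.mu.Lock()
	b.addrs = eps // mutate state while holding the lock
	b.mu.Unlock() // release BEFORE the send; a deferred unlock would deadlock
	// against a notify loop that needs b.mu.RLock to drain this channel

	select {
	case b.updateAddrsC <- struct{}{}: // wake the notify loop
	case <-b.stopc:
	}
}
```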
		b.pinAddrCond.Wait()
	}

	b.host2ep[prevPinAddr] = prevEp
This could corrupt the balancer endpoint list.
- start trySwitchEndpoint, delete prevPinAddr "a" from the addr list
- some other goroutine calls client.SetEndpoints("b", "c")
- trySwitchEndpoint gets notification from pinAddrCond, puts prevEp back into endpoint list.
- client.Endpoints() == {"a", "b", "c"}
OK, I think this is the main reason this implementation is not good. A better way, I think, is not to modify the balancer's endpoint list, but to erase the pinAddr and then put (endpoints - pinAddr) directly on notifyCh, waiting for grpc to Up() one of them. Is this OK?
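A sketch of that alternative using the old grpc.Balancer API this PR targets (names illustrative; this is not the PR's code): build the address list minus the pinned endpoint and hand it to the Notify channel, leaving host2ep untouched:

```go
package balancersketch

import "google.golang.org/grpc"

// addrsMinusPinned builds the address list for grpc's Notify channel,
// excluding the currently pinned (unavailable) endpoint so grpc dials
// and Up()s some other member.
func addrsMinusPinned(eps []string, pinAddr string) []grpc.Address {
	addrs := make([]grpc.Address, 0, len(eps))
	for _, ep := range eps {
		if ep != pinAddr {
			addrs = append(addrs, grpc.Address{Addr: ep})
		}
	}
	return addrs
}

// usage inside a balancer, assuming notifyCh is its Notify channel:
//   b.notifyCh <- addrsMinusPinned(b.eps, b.pinAddr)
```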
@HardySimpson Closing in favor of #8545.