Skip to content

Commit

Permalink
Avoid using connections until they are healthy. (#498)
Browse files Browse the repository at this point in the history
* Avoid using connections until they are healthy.

* Change Balancer to track the set of healthy connections.
* Added state machine to clientConnection.
* Create clientConnection as soon as a resolver returns an address.
* Do version handshake on connection before adding it to the balancer.
* Removed CallOptions.Balancer (call.Connection now has one balancer).
* Dropped unused Sharded balancer.
* Dropped some obsolete tests.
* Tweaked some tests to account for changed behavior.
* Added deadlines to some tests to make them behave better when things
  hang due to a bug.
* Undid earlier bad renaming of object to component.
* Split a large test into multiple tests.

We also disable the method call panic weavertest since it does not work.

Explanation: if a remote weavelet paniced, it's exit raced with
weavertest cleanup. If the code reading from the remote weavelet
detected the broken connection before weavertest got a chance to mark
the test as done, we would print an error message and exit the test
process. This interacted poorly with the weavertest/internal/generate
test that intentionally triggers a panic in a remote component.

To elaborate on what is going wrong, if a subprocess in a weavertest
panics, this code will error out:

```
weaver/weavertest/deployer.go Line 374 in c893d9a
err := e.Serve(handler)
```

Which leads to stopLocked being called which cancels a context:

```
weaver/weavertest/deployer.go Line 218 in c893d9a
d.ctxCancel()
```

This context is used to create all weavelets. When this context is
cancelled, the pipes between all envelopes and weavelets shut
down. The main weavelet detects this and self-terminates, causing the
test to fail even though it should pass.

```
weaver/internal/weaver/remoteweavelet.go Line 178 in c893d9a
return w.conn.Serve(w)
```
  • Loading branch information
ghemawat authored Aug 3, 2023
1 parent c893d9a commit dae5362
Show file tree
Hide file tree
Showing 10 changed files with 592 additions and 476 deletions.
1 change: 0 additions & 1 deletion godeps.txt
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,6 @@ github.com/ServiceWeaver/weaver/internal/net/call
go.opentelemetry.io/otel/trace
golang.org/x/exp/slog
io
math/rand
net
strings
sync
Expand Down
114 changes: 52 additions & 62 deletions internal/net/call/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,63 +14,55 @@

package call

import (
"fmt"
"math/rand"
)
// ReplicaConnection is a connection to a single replica. A single Connection
// may consist of many ReplicaConnections (typically one per replica).
type ReplicaConnection interface {
// Address returns the name of the endpoint to which the ReplicaConnection
// is connected.
Address() string
}

// A Balancer picks the endpoint to which which an RPC client performs a call. A
// Balancer should only be used by a single goroutine.
// Balancer manages a set of ReplicaConnections and picks one of them per
// call. A Balancer requires external synchronization (no concurrent calls
// should be made to the same Balancer).
//
// TODO(mwhittaker): Right now, balancers have no load information about
// endpoints. In the short term, we can at least add information about the
// number of pending requests for every endpoint.
//
// TODO(mwhittaker): Right now, we pass a balancer the set of all endpoints. We
// instead probably want to pass it only the endpoints for which we have a
// connection. This means we may have to form connections more eagerly.
//
// TODO(mwhittaker): We may want to guarantee that Update() is never called
// with an empty list of addresses. If we don't have addresses, then we don't
// need to do balancing.
type Balancer interface {
// Update updates the current set of endpoints from which the Balancer can
// pick. Before Update is called for the first time, the set of endpoints
// is empty.
Update(endpoints []Endpoint)

// Pick picks an endpoint. Pick is guaranteed to return an endpoint that
// was passed to the most recent call of Update. If there are no endpoints,
// then Pick returns an error that includes Unreachable.
Pick(CallOptions) (Endpoint, error)
// Add adds a ReplicaConnection to the set of connections.
Add(ReplicaConnection)

// Remove removes a ReplicaConnection from the set of connections.
Remove(ReplicaConnection)

// Pick picks a ReplicaConnection from the set of connections.
// Pick returns _,false if no connections are available.
Pick(CallOptions) (ReplicaConnection, bool)
}

// balancerFuncImpl is the implementation of the "functional" balancer
// returned by BalancerFunc.
type balancerFuncImpl struct {
endpoints []Endpoint
pick func([]Endpoint, CallOptions) (Endpoint, error)
connList
pick func([]ReplicaConnection, CallOptions) (ReplicaConnection, bool)
}

var _ Balancer = &balancerFuncImpl{}

// BalancerFunc returns a stateless, purely functional load balancer that uses
// the provided picking function.
func BalancerFunc(pick func([]Endpoint, CallOptions) (Endpoint, error)) Balancer {
// BalancerFunc returns a stateless, purely functional load balancer that calls
// pick to pick the connection to use.
func BalancerFunc(pick func([]ReplicaConnection, CallOptions) (ReplicaConnection, bool)) Balancer {
return &balancerFuncImpl{pick: pick}
}

func (bf *balancerFuncImpl) Update(endpoints []Endpoint) {
bf.endpoints = endpoints
}

func (bf *balancerFuncImpl) Pick(opts CallOptions) (Endpoint, error) {
return bf.pick(bf.endpoints, opts)
func (bf *balancerFuncImpl) Pick(opts CallOptions) (ReplicaConnection, bool) {
return bf.pick(bf.list, opts)
}

type roundRobin struct {
endpoints []Endpoint
next int
connList
next int
}

var _ Balancer = &roundRobin{}
Expand All @@ -80,37 +72,35 @@ func RoundRobin() *roundRobin {
return &roundRobin{}
}

func (rr *roundRobin) Update(endpoints []Endpoint) {
rr.endpoints = endpoints
}

func (rr *roundRobin) Pick(CallOptions) (Endpoint, error) {
if len(rr.endpoints) == 0 {
return nil, fmt.Errorf("%w: no endpoints available", Unreachable)
func (rr *roundRobin) Pick(CallOptions) (ReplicaConnection, bool) {
if len(rr.list) == 0 {
return nil, false
}
if rr.next >= len(rr.endpoints) {
if rr.next >= len(rr.list) {
rr.next = 0
}
endpoint := rr.endpoints[rr.next]
c := rr.list[rr.next]
rr.next += 1
return endpoint, nil
return c, true
}

// Sharded returns a new sharded balancer.
//
// Given a list of n endpoints e1, ..., en, for a request with shard key k, a
// sharded balancer will pick endpoint ei where i = k mod n. If no shard key is
// provided, an endpoint is picked at random.
func Sharded() Balancer {
return BalancerFunc(func(endpoints []Endpoint, opts CallOptions) (Endpoint, error) {
n := len(endpoints)
if n == 0 {
return nil, fmt.Errorf("%w: no endpoints available", Unreachable)
}
if opts.ShardKey == 0 {
// There is no ShardKey. Pick an endpoint at random.
return endpoints[rand.Intn(n)], nil
// connList is a helper type used by balancers to maintain set of connections.
type connList struct {
list []ReplicaConnection
}

func (cl *connList) Add(c ReplicaConnection) {
cl.list = append(cl.list, c)
}

func (cl *connList) Remove(c ReplicaConnection) {
for i, elem := range cl.list {
if elem != c {
continue
}
return endpoints[opts.ShardKey%uint64(n)], nil
})
// Replace removed entry with last entry.
cl.list[i] = cl.list[len(cl.list)-1]
cl.list = cl.list[:len(cl.list)-1]
return
}
}
Loading

0 comments on commit dae5362

Please sign in to comment.