|
19 | 19 | package grpc
|
20 | 20 |
|
21 | 21 | import (
|
| 22 | + "context" |
22 | 23 | "fmt"
|
23 | 24 | "strings"
|
24 | 25 | "sync"
|
25 | 26 |
|
26 | 27 | "google.golang.org/grpc/balancer"
|
| 28 | + "google.golang.org/grpc/codes" |
27 | 29 | "google.golang.org/grpc/connectivity"
|
28 | 30 | "google.golang.org/grpc/internal/balancer/gracefulswitch"
|
29 | 31 | "google.golang.org/grpc/internal/buffer"
|
30 | 32 | "google.golang.org/grpc/internal/channelz"
|
31 | 33 | "google.golang.org/grpc/internal/grpcsync"
|
32 | 34 | "google.golang.org/grpc/resolver"
|
| 35 | + "google.golang.org/grpc/status" |
33 | 36 | )
|
34 | 37 |
|
35 | 38 | // ccBalancerWrapper sits between the ClientConn and the Balancer.
|
@@ -305,7 +308,7 @@ func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer
|
305 | 308 | channelz.Warningf(logger, ccb.cc.channelzID, "acBalancerWrapper: NewSubConn: failed to newAddrConn: %v", err)
|
306 | 309 | return nil, err
|
307 | 310 | }
|
308 |
| - acbw := &acBalancerWrapper{ac: ac} |
| 311 | + acbw := &acBalancerWrapper{ac: ac, producers: make(map[balancer.ProducerBuilder]*refCountedProducer)} |
309 | 312 | acbw.ac.mu.Lock()
|
310 | 313 | ac.acbw = acbw
|
311 | 314 | acbw.ac.mu.Unlock()
|
@@ -359,8 +362,9 @@ func (ccb *ccBalancerWrapper) Target() string {
|
359 | 362 | // acBalancerWrapper is a wrapper on top of ac for balancers.
|
360 | 363 | // It implements balancer.SubConn interface.
|
361 | 364 | type acBalancerWrapper struct {
|
362 |
| - mu sync.Mutex |
363 |
| - ac *addrConn |
| 365 | + mu sync.Mutex |
| 366 | + ac *addrConn |
| 367 | + producers map[balancer.ProducerBuilder]*refCountedProducer |
364 | 368 | }
|
365 | 369 |
|
366 | 370 | func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) {
|
@@ -414,3 +418,64 @@ func (acbw *acBalancerWrapper) getAddrConn() *addrConn {
|
414 | 418 | defer acbw.mu.Unlock()
|
415 | 419 | return acbw.ac
|
416 | 420 | }
|
| 421 | + |
| 422 | +var errSubConnNotReady = status.Error(codes.Unavailable, "SubConn not currently connected") |
| 423 | + |
| 424 | +// NewStream begins a streaming RPC on the addrConn. If the addrConn is not |
| 425 | +// ready, returns errSubConnNotReady. |
| 426 | +func (acbw *acBalancerWrapper) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) { |
| 427 | + transport := acbw.ac.getReadyTransport() |
| 428 | + if transport == nil { |
| 429 | + return nil, errSubConnNotReady |
| 430 | + } |
| 431 | + return newNonRetryClientStream(ctx, desc, method, transport, acbw.ac, opts...) |
| 432 | +} |
| 433 | + |
| 434 | +// Invoke performs a unary RPC. If the addrConn is not ready, returns |
| 435 | +// errSubConnNotReady. |
| 436 | +func (acbw *acBalancerWrapper) Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...CallOption) error { |
| 437 | + cs, err := acbw.NewStream(ctx, unaryStreamDesc, method, opts...) |
| 438 | + if err != nil { |
| 439 | + return err |
| 440 | + } |
| 441 | + if err := cs.SendMsg(args); err != nil { |
| 442 | + return err |
| 443 | + } |
| 444 | + return cs.RecvMsg(reply) |
| 445 | +} |
| 446 | + |
| 447 | +type refCountedProducer struct { |
| 448 | + producer balancer.Producer |
| 449 | + refs int // number of current refs to the producer |
| 450 | + close func() // underlying producer's close function |
| 451 | +} |
| 452 | + |
| 453 | +func (acbw *acBalancerWrapper) GetOrBuildProducer(pb balancer.ProducerBuilder) (balancer.Producer, func()) { |
| 454 | + acbw.mu.Lock() |
| 455 | + defer acbw.mu.Unlock() |
| 456 | + |
| 457 | + // Look up existing producer from this builder. |
| 458 | + pData := acbw.producers[pb] |
| 459 | + if pData == nil { |
| 460 | + // Not found; create a new one and add it to the producers map. |
| 461 | + p, close := pb.Build(acbw) |
| 462 | + pData = &refCountedProducer{producer: p, close: close} |
| 463 | + acbw.producers[pb] = pData |
| 464 | + } |
| 465 | + // Account for this new reference. |
| 466 | + pData.refs++ |
| 467 | + |
| 468 | + // Return a cleanup function wrapped in a OnceFunc to remove this reference |
| 469 | + // and delete the refCountedProducer from the map if the total reference |
| 470 | + // count goes to zero. |
| 471 | + unref := func() { |
| 472 | + acbw.mu.Lock() |
| 473 | + pData.refs-- |
| 474 | + if pData.refs == 0 { |
| 475 | + defer pData.close() // Run outside the acbw mutex |
| 476 | + delete(acbw.producers, pb) |
| 477 | + } |
| 478 | + acbw.mu.Unlock() |
| 479 | + } |
| 480 | + return pData.producer, grpcsync.OnceFunc(unref) |
| 481 | +} |
0 commit comments