Skip to content

Commit

Permalink
refactor(bigquery/storage/managedwriter): more advanced routing (#7587)
Browse files Browse the repository at this point in the history
  • Loading branch information
shollyman authored Mar 30, 2023
1 parent 3c4d7cf commit c24f0a1
Show file tree
Hide file tree
Showing 10 changed files with 1,040 additions and 219 deletions.
84 changes: 42 additions & 42 deletions bigquery/storage/managedwriter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,15 +88,21 @@ func NewClient(ctx context.Context, projectID string, opts ...option.ClientOptio
// Close releases resources held by the client.
//
// It closes every connection pool tracked in c.pools and then closes the
// underlying raw client. Every close is attempted even if an earlier one
// fails; the first error encountered (if any) is returned.
func (c *Client) Close() error {
	// Shutdown the per-region pools.
	c.mu.Lock()
	defer c.mu.Unlock()
	var firstErr error
	for _, pool := range c.pools {
		if err := pool.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}

	// Close the underlying client stub, surfacing its error only when no
	// pool has already reported one.
	if err := c.rawClient.Close(); err != nil && firstErr == nil {
		firstErr = err
	}
	return firstErr
}

// NewManagedStream establishes a new managed stream for appending data into a table.
Expand Down Expand Up @@ -155,17 +161,12 @@ func (c *Client) buildManagedStream(ctx context.Context, streamFunc streamClient
writer.streamSettings.streamID = streamName
}
}
// Resolve behavior for connectionPool interactions. In the multiplex case we use shared pools, in the
// default case we setup a connectionPool per writer, and that pool gets a single connection instance.
mode := simplexConnectionMode
if c.cfg != nil && c.cfg.useMultiplex {
mode = multiplexConnectionMode
}
pool, err := c.resolvePool(ctx, writer.streamSettings, streamFunc, newSimpleRouter(mode))
// we maintain a pool per region, and attach all exclusive and multiplex writers to that pool.
pool, err := c.resolvePool(ctx, writer.streamSettings, streamFunc)
if err != nil {
return nil, err
}
// Add the pool to the router, and set it's context based on the owning pool.
// Add the writer to the pool, and derive context from the pool.
if err := pool.addWriter(writer); err != nil {
return nil, err
}
Expand Down Expand Up @@ -202,32 +203,30 @@ func (c *Client) validateOptions(ctx context.Context, ms *ManagedStream) error {
return nil
}

// resolvePool either returns an existing connectionPool (in the case of multiplex), or returns a new pool.
func (c *Client) resolvePool(ctx context.Context, settings *streamSettings, streamFunc streamClientFunc, router poolRouter) (*connectionPool, error) {
// resolvePool either returns an existing connectionPool, or returns a new pool if this is the first writer in a given region.
func (c *Client) resolvePool(ctx context.Context, settings *streamSettings, streamFunc streamClientFunc) (*connectionPool, error) {
c.mu.Lock()
defer c.mu.Unlock()
if c.cfg.useMultiplex && canMultiplex(settings.streamID) {
resp, err := c.getWriteStream(ctx, settings.streamID, false)
if err != nil {
return nil, err
}
loc := resp.GetLocation()
if pool, ok := c.pools[loc]; ok {
return pool, nil
}
// No existing pool available, create one for the location and add to shared pools.
pool, err := c.createPool(ctx, nil, streamFunc, router, true)
if err != nil {
return nil, err
}
c.pools[loc] = pool
resp, err := c.getWriteStream(ctx, settings.streamID, false)
if err != nil {
return nil, err
}
loc := resp.GetLocation()
if pool, ok := c.pools[loc]; ok {
return pool, nil
}
return c.createPool(ctx, settings, streamFunc, router, false)

// No existing pool available, create one for the location and add to shared pools.
pool, err := c.createPool(ctx, nil, streamFunc)
if err != nil {
return nil, err
}
c.pools[loc] = pool
return pool, nil
}

// createPool builds a connectionPool.
func (c *Client) createPool(ctx context.Context, settings *streamSettings, streamFunc streamClientFunc, router poolRouter, allowMultipleWriters bool) (*connectionPool, error) {
func (c *Client) createPool(ctx context.Context, settings *streamSettings, streamFunc streamClientFunc) (*connectionPool, error) {
cCtx, cancel := context.WithCancel(ctx)

if c.cfg == nil {
Expand All @@ -250,13 +249,14 @@ func (c *Client) createPool(ctx context.Context, settings *streamSettings, strea
}

pool := &connectionPool{
ctx: cCtx,
allowMultipleWriters: allowMultipleWriters,
cancel: cancel,
open: createOpenF(ctx, streamFunc),
callOptions: arOpts,
baseFlowController: newFlowController(fcRequests, fcBytes),
id: newUUID(poolIDPrefix),
ctx: cCtx,
cancel: cancel,
open: createOpenF(ctx, streamFunc),
callOptions: arOpts,
baseFlowController: newFlowController(fcRequests, fcBytes),
}
router := newSharedRouter(c.cfg.useMultiplex, c.cfg.maxMultiplexPoolSize)
if err := pool.activateRouter(router); err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion bigquery/storage/managedwriter/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func TestCreatePool(t *testing.T) {
c := &Client{
cfg: tc.cfg,
}
got, err := c.createPool(context.Background(), tc.settings, nil, newSimpleRouter(""), false)
got, err := c.createPool(context.Background(), tc.settings, nil)
if err != nil {
if !tc.wantErr {
t.Errorf("case %q: createPool errored unexpectedly: %v", tc.desc, err)
Expand Down
169 changes: 55 additions & 114 deletions bigquery/storage/managedwriter/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,8 @@ var (
//
// The pool retains references to connections, and maintains the mapping between writers
// and connections.
//
// TODO: connection and writer mappings will be added in a subsequent PR.
type connectionPool struct {
id string
allowMultipleWriters bool // whether this pool can be used by multiple writers.
id string

// the pool retains the long-lived context responsible for opening/maintaining bidi connections.
ctx context.Context
Expand Down Expand Up @@ -118,12 +115,6 @@ func (pool *connectionPool) removeWriter(writer *ManagedStream) error {
return errNoRouterForPool
}
detachErr := pool.router.writerDetach(writer)
// trigger single-writer pool closure regardless of detach errors
if !pool.allowMultipleWriters {
if err := pool.Close(); detachErr == nil {
detachErr = err
}
}
return detachErr
}

Expand Down Expand Up @@ -192,6 +183,9 @@ type connection struct {
reconnect bool //
err error // terminal connection error
pending chan *pendingWrite

loadBytesThreshold int
loadCountThreshold int
}

type connectionMode string
Expand All @@ -212,16 +206,39 @@ func newConnection(pool *connectionPool, mode connectionMode) *connection {
if pool != nil {
fc = copyFlowController(pool.baseFlowController)
}
countLimit, byteLimit := computeLoadThresholds(fc)

return &connection{
id: newUUID(connIDPrefix),
pool: pool,
fc: fc,
ctx: connCtx,
cancel: cancel,
optimizer: optimizer(mode),
id: newUUID(connIDPrefix),
pool: pool,
fc: fc,
ctx: connCtx,
cancel: cancel,
optimizer: optimizer(mode),
loadBytesThreshold: byteLimit,
loadCountThreshold: countLimit,
}
}

// computeLoadThresholds derives the load thresholds for a connection from the
// limits of its flow controller.
//
// Defaults, used when fc is nil or the corresponding limit is not positive:
// countLimit is 1000 and byteLimit is 0.
//
// When fc.maxInsertBytes is positive, byteLimit is 20% of it (truncated toward
// zero). When fc.maxInsertCount is positive, countLimit is 20% of it, but never
// less than 1.
func computeLoadThresholds(fc *flowController) (countLimit, byteLimit int) {
	countLimit, byteLimit = 1000, 0
	if fc == nil {
		return countLimit, byteLimit
	}
	if fc.maxInsertBytes > 0 {
		// 20% of the byte limit.
		byteLimit = int(float64(fc.maxInsertBytes) * 0.2)
	}
	if fc.maxInsertCount > 0 {
		// 20% of the insert limit, floored at 1 so that a small limit does
		// not truncate to a zero threshold.
		countLimit = int(float64(fc.maxInsertCount) * 0.2)
		if countLimit < 1 {
			countLimit = 1
		}
	}
	return countLimit, byteLimit
}

func optimizer(mode connectionMode) sendOptimizer {
switch mode {
case multiplexConnectionMode:
Expand All @@ -239,6 +256,28 @@ func (co *connection) release(pw *pendingWrite) {
co.fc.release(pw.reqSize)
}

// isLoaded reports whether the connection's flow controller usage exceeds
// either of the connection's load thresholds. It is the signal that multiplex
// traffic is high enough to warrant adding more connections.
//
// A threshold that is not positive is ignored. The byte usage is only
// consulted when the count check did not already report load.
func (co *connection) isLoaded() bool {
	switch {
	case co.loadCountThreshold > 0 && co.fc.count() > co.loadCountThreshold:
		return true
	case co.loadBytesThreshold > 0 && co.fc.bytes() > co.loadBytesThreshold:
		return true
	default:
		return false
	}
}

// curLoad returns a number representing how loaded the connection is.
// Its main use is ranking connections against each other.
//
// The value is the flow controller's count divided by (loadCountThreshold+1).
// When the flow controller has a positive maxInsertBytes, the result is the
// mean of that ratio and bytes divided by (loadBytesThreshold+1).
func (co *connection) curLoad() float64 {
	countLoad := float64(co.fc.count()) / float64(co.loadCountThreshold+1)
	if co.fc.maxInsertBytes <= 0 {
		// No byte limit configured: the count ratio is the whole story.
		return countLoad
	}
	byteLoad := float64(co.fc.bytes()) / float64(co.loadBytesThreshold+1)
	return (countLoad + byteLoad) / 2
}

// close closes a connection.
func (co *connection) close() {
co.mu.Lock()
Expand Down Expand Up @@ -440,101 +479,3 @@ func connRecvProcessor(co *connection, arc storagepb.BigQueryWrite_AppendRowsCli
}
}
}

// poolRouter is the set of hooks a connectionPool uses to delegate writer
// bookkeeping and connection selection to a routing strategy.
type poolRouter interface {

// poolAttach is called once to signal a router that it is responsible for a given pool.
poolAttach(pool *connectionPool) error

// poolDetach is called as part of clean connectionPool shutdown.
// It provides an opportunity for the router to shut down internal state.
poolDetach() error

// writerAttach is a hook to notify the router that a new writer is being attached to the pool.
// It provides an opportunity for the router to allocate resources and update internal state.
writerAttach(writer *ManagedStream) error

// writerDetach signals the router that a given writer is being removed from the pool. The router
// does not have responsibility for closing the writer, but this is called as part of writer close.
writerDetach(writer *ManagedStream) error

// pickConnection is used to select a connection for a given pending write.
pickConnection(pw *pendingWrite) (*connection, error)
}

// simpleRouter is a primitive traffic router that routes all traffic to its single connection instance.
//
// This router is designed for our migration case, where a single ManagedStream writer has a 1:1 relationship
// with a connectionPool. You can multiplex with this router, but it will never scale beyond a single connection.
type simpleRouter struct {
// mode is the connection mode passed to newConnection when the router creates its connection.
mode connectionMode
// pool is the connectionPool this router was attached to via poolAttach.
pool *connectionPool

// mu guards conn and writers.
mu sync.RWMutex
// conn is the router's only connection; it is nil while no connection exists.
conn *connection
// writers is the set of IDs of the writers currently attached to the router.
writers map[string]struct{}
}

// poolAttach records pool as the pool this router serves. If the router
// already holds a pool, it is left untouched and an error naming that pool's
// ID is returned.
func (rtr *simpleRouter) poolAttach(pool *connectionPool) error {
	if rtr.pool != nil {
		return fmt.Errorf("router already attached to pool %q", rtr.pool.id)
	}
	rtr.pool = pool
	return nil
}

// poolDetach closes and forgets the router's connection, if it has one.
// It always returns nil.
func (rtr *simpleRouter) poolDetach() error {
	rtr.mu.Lock()
	defer rtr.mu.Unlock()

	if rtr.conn == nil {
		// Nothing to tear down.
		return nil
	}
	rtr.conn.close()
	rtr.conn = nil
	return nil
}

// writerAttach registers writer with the router, keyed by its ID, and creates
// the router's connection if none exists yet. A writer with an empty ID is
// rejected with an error before any state is touched.
func (rtr *simpleRouter) writerAttach(writer *ManagedStream) error {
	id := writer.id
	if len(id) == 0 {
		return fmt.Errorf("writer has no ID")
	}

	rtr.mu.Lock()
	defer rtr.mu.Unlock()

	rtr.writers[id] = struct{}{}
	if rtr.conn != nil {
		return nil
	}
	// First attachment since the router had no connection: create it now.
	rtr.conn = newConnection(rtr.pool, rtr.mode)
	return nil
}

// writerDetach removes writer from the router's set of attached writers.
// When that leaves the set empty and a connection exists, the connection is
// dropped from the router and closed. A writer with an empty ID is rejected
// with an error before any state is touched.
func (rtr *simpleRouter) writerDetach(writer *ManagedStream) error {
	id := writer.id
	if len(id) == 0 {
		return fmt.Errorf("writer has no ID")
	}

	rtr.mu.Lock()
	defer rtr.mu.Unlock()

	delete(rtr.writers, id)
	if len(rtr.writers) > 0 || rtr.conn == nil {
		return nil
	}

	// No attached writers remain: forget the connection and close it on the
	// way out. The deferred close runs before the deferred unlock.
	idle := rtr.conn
	rtr.conn = nil
	defer idle.close()
	return nil
}

// pickConnection returns the router's sole connection; there is never a choice
// to make. It returns an error when the router currently has no connection.
// The pw argument is not consulted.
func (rtr *simpleRouter) pickConnection(pw *pendingWrite) (*connection, error) {
	rtr.mu.RLock()
	defer rtr.mu.RUnlock()

	if rtr.conn == nil {
		return nil, fmt.Errorf("no connection available")
	}
	return rtr.conn, nil
}

// newSimpleRouter builds a simpleRouter for the given connection mode with an
// empty writer set. No connection is created here; that waits until a writer
// attaches.
func newSimpleRouter(mode connectionMode) *simpleRouter {
	rtr := new(simpleRouter)
	rtr.mode = mode
	rtr.writers = make(map[string]struct{})
	return rtr
}
44 changes: 0 additions & 44 deletions bigquery/storage/managedwriter/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,47 +389,3 @@ func TestConnection_Receiver(t *testing.T) {
cancel()
}
}

// TestSimpleRouter checks that a simpleRouter only yields a connection while
// a writer is attached to its pool: picking fails before the writer is added,
// succeeds afterwards, and fails again once the writer is removed.
func TestSimpleRouter(t *testing.T) {

	ctx := context.Background()

	pool := &connectionPool{
		ctx: ctx,
		open: func(opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
			return &testAppendRowsClient{}, nil
		},
	}

	router := newSimpleRouter("")
	if err := pool.activateRouter(router); err != nil {
		// Every step below calls through pool.router, so there is nothing
		// meaningful left to check if activation failed.
		t.Fatalf("activateRouter: %v", err)
	}

	ms := &ManagedStream{
		ctx:   ctx,
		retry: newStatelessRetryer(),
	}

	pw := newPendingWrite(ctx, ms, &storagepb.AppendRowsRequest{}, nil, "", "")

	// picking before attaching should yield error
	if _, err := pool.router.pickConnection(pw); err == nil {
		t.Errorf("pickConnection: expected error, got success")
	}
	writer := &ManagedStream{
		id: "writer",
	}
	if err := pool.addWriter(writer); err != nil {
		t.Errorf("addWriter: %v", err)
	}
	// with a writer attached, a connection should be available
	if _, err := pool.router.pickConnection(pw); err != nil {
		t.Errorf("pickConnection error: %v", err)
	}
	if err := pool.removeWriter(writer); err != nil {
		t.Errorf("removeWriter: %v", err)
	}
	// picking after the only writer is removed should yield error again
	if _, err := pool.router.pickConnection(pw); err == nil {
		t.Errorf("pickConnection: expected error, got success")
	}
}
Loading

0 comments on commit c24f0a1

Please sign in to comment.