From cf068024f1066ee391191066039d7ba2668dd3f4 Mon Sep 17 00:00:00 2001 From: shollyman Date: Fri, 31 Mar 2023 09:38:13 -0700 Subject: [PATCH] feat(bigquery/storage/managedwriter): introduce location routing header (#7663) This allows for the backend to more efficient route traffic. Normally we'd extract this from the request, but location is not part of the write identifier. --- bigquery/storage/managedwriter/client.go | 10 ++++-- bigquery/storage/managedwriter/client_test.go | 31 ++++++++++++++++++- bigquery/storage/managedwriter/connection.go | 3 +- 3 files changed, 40 insertions(+), 4 deletions(-) diff --git a/bigquery/storage/managedwriter/client.go b/bigquery/storage/managedwriter/client.go index 41ebe352ebe4..6c57889e4022 100644 --- a/bigquery/storage/managedwriter/client.go +++ b/bigquery/storage/managedwriter/client.go @@ -28,6 +28,7 @@ import ( "github.com/google/uuid" "github.com/googleapis/gax-go/v2" "google.golang.org/api/option" + "google.golang.org/grpc/metadata" ) // DetectProjectID is a sentinel value that instructs NewClient to detect the @@ -217,7 +218,7 @@ func (c *Client) resolvePool(ctx context.Context, settings *streamSettings, stre } // No existing pool available, create one for the location and add to shared pools. - pool, err := c.createPool(ctx, nil, streamFunc) + pool, err := c.createPool(ctx, loc, nil, streamFunc) if err != nil { return nil, err } @@ -226,13 +227,17 @@ func (c *Client) resolvePool(ctx context.Context, settings *streamSettings, stre } // createPool builds a connectionPool. -func (c *Client) createPool(ctx context.Context, settings *streamSettings, streamFunc streamClientFunc) (*connectionPool, error) { +func (c *Client) createPool(ctx context.Context, location string, settings *streamSettings, streamFunc streamClientFunc) (*connectionPool, error) { cCtx, cancel := context.WithCancel(ctx) if c.cfg == nil { cancel() return nil, fmt.Errorf("missing client config") } + if location != "" { + // add location header to the retained pool context. + cCtx = metadata.AppendToOutgoingContext(ctx, "x-goog-request-params", fmt.Sprintf("write_location=%s", location)) + } fcRequests := c.cfg.defaultInflightRequests fcBytes := c.cfg.defaultInflightBytes arOpts := c.cfg.defaultAppendRowsCallOptions @@ -250,6 +255,7 @@ func (c *Client) createPool(ctx context.Context, settings *streamSettings, strea pool := &connectionPool{ id: newUUID(poolIDPrefix), + location: location, ctx: cCtx, cancel: cancel, open: createOpenF(ctx, streamFunc), diff --git a/bigquery/storage/managedwriter/client_test.go b/bigquery/storage/managedwriter/client_test.go index 8fffbf4362df..393860919fd2 100644 --- a/bigquery/storage/managedwriter/client_test.go +++ b/bigquery/storage/managedwriter/client_test.go @@ -20,6 +20,7 @@ import ( "github.com/googleapis/gax-go/v2" "google.golang.org/grpc" + "google.golang.org/grpc/metadata" ) func TestTableParentFromStreamName(t *testing.T) { @@ -53,6 +54,34 @@ func TestTableParentFromStreamName(t *testing.T) { } } +func TestCreatePool_Location(t *testing.T) { + c := &Client{ + cfg: &writerClientConfig{}, + } + pool, err := c.createPool(context.Background(), "foo", nil, nil) + if err != nil { + t.Fatalf("createPool: %v", err) + } + meta, ok := metadata.FromOutgoingContext(pool.ctx) + if !ok { + t.Fatalf("no metadata in outgoing context") + } + vals, ok := meta["x-goog-request-params"] + if !ok { + t.Fatalf("metadata key not present") + } + found := false + for _, v := range vals { + if v == "write_location=foo" { + found = true + break + } + } + if !found { + t.Fatal("expected location header not found") + } +} + // TestCreatePool tests the result of calling createPool with different combinations // of global configuration and per-writer configuration. func TestCreatePool(t *testing.T) { @@ -126,7 +155,7 @@ func TestCreatePool(t *testing.T) { c := &Client{ cfg: tc.cfg, } - got, err := c.createPool(context.Background(), tc.settings, nil) + got, err := c.createPool(context.Background(), "", tc.settings, nil) if err != nil { if !tc.wantErr { t.Errorf("case %q: createPool errored unexpectedly: %v", tc.desc, err) diff --git a/bigquery/storage/managedwriter/connection.go b/bigquery/storage/managedwriter/connection.go index 54685a873d67..c592aa6ca2db 100644 --- a/bigquery/storage/managedwriter/connection.go +++ b/bigquery/storage/managedwriter/connection.go @@ -43,7 +43,8 @@ var ( // The pool retains references to connections, and maintains the mapping between writers // and connections. type connectionPool struct { - id string + id string + location string // BQ region associated with this pool. // the pool retains the long-lived context responsible for opening/maintaining bidi connections. ctx context.Context