diff --git a/bigtable/admin.go b/bigtable/admin.go index c5666e51e36c..d0f03664cdee 100644 --- a/bigtable/admin.go +++ b/bigtable/admin.go @@ -75,7 +75,7 @@ func (ac *AdminClient) instancePrefix() string { // Tables returns a list of the tables in the instance. func (ac *AdminClient) Tables(ctx context.Context) ([]string, error) { - ctx = mergeMetadata(ctx, ac.md) + ctx = mergeOutgoingMetadata(ctx, ac.md) prefix := ac.instancePrefix() req := &btapb.ListTablesRequest{ Parent: prefix, @@ -94,7 +94,7 @@ func (ac *AdminClient) Tables(ctx context.Context) ([]string, error) { // CreateTable creates a new table in the instance. // This method may return before the table's creation is complete. func (ac *AdminClient) CreateTable(ctx context.Context, table string) error { - ctx = mergeMetadata(ctx, ac.md) + ctx = mergeOutgoingMetadata(ctx, ac.md) prefix := ac.instancePrefix() req := &btapb.CreateTableRequest{ Parent: prefix, @@ -114,7 +114,7 @@ func (ac *AdminClient) CreatePresplitTable(ctx context.Context, table string, sp for _, split := range split_keys { req_splits = append(req_splits, &btapb.CreateTableRequest_Split{[]byte(split)}) } - ctx = mergeMetadata(ctx, ac.md) + ctx = mergeOutgoingMetadata(ctx, ac.md) prefix := ac.instancePrefix() req := &btapb.CreateTableRequest{ Parent: prefix, @@ -128,7 +128,7 @@ func (ac *AdminClient) CreatePresplitTable(ctx context.Context, table string, sp // CreateColumnFamily creates a new column family in a table. func (ac *AdminClient) CreateColumnFamily(ctx context.Context, table, family string) error { // TODO(dsymonds): Permit specifying gcexpr and any other family settings. - ctx = mergeMetadata(ctx, ac.md) + ctx = mergeOutgoingMetadata(ctx, ac.md) prefix := ac.instancePrefix() req := &btapb.ModifyColumnFamiliesRequest{ Name: prefix + "/tables/" + table, @@ -143,7 +143,7 @@ func (ac *AdminClient) CreateColumnFamily(ctx context.Context, table, family str // DeleteTable deletes a table and all of its data. func (ac *AdminClient) DeleteTable(ctx context.Context, table string) error { - ctx = mergeMetadata(ctx, ac.md) + ctx = mergeOutgoingMetadata(ctx, ac.md) prefix := ac.instancePrefix() req := &btapb.DeleteTableRequest{ Name: prefix + "/tables/" + table, @@ -154,7 +154,7 @@ func (ac *AdminClient) DeleteTable(ctx context.Context, table string) error { // DeleteColumnFamily deletes a column family in a table and all of its data. func (ac *AdminClient) DeleteColumnFamily(ctx context.Context, table, family string) error { - ctx = mergeMetadata(ctx, ac.md) + ctx = mergeOutgoingMetadata(ctx, ac.md) prefix := ac.instancePrefix() req := &btapb.ModifyColumnFamiliesRequest{ Name: prefix + "/tables/" + table, @@ -174,7 +174,7 @@ type TableInfo struct { // TableInfo retrieves information about a table. func (ac *AdminClient) TableInfo(ctx context.Context, table string) (*TableInfo, error) { - ctx = mergeMetadata(ctx, ac.md) + ctx = mergeOutgoingMetadata(ctx, ac.md) prefix := ac.instancePrefix() req := &btapb.GetTableRequest{ Name: prefix + "/tables/" + table, @@ -194,7 +194,7 @@ func (ac *AdminClient) TableInfo(ctx context.Context, table string) (*TableInfo, // GC executes opportunistically in the background; table reads may return data // matching the GC policy. func (ac *AdminClient) SetGCPolicy(ctx context.Context, table, family string, policy GCPolicy) error { - ctx = mergeMetadata(ctx, ac.md) + ctx = mergeOutgoingMetadata(ctx, ac.md) prefix := ac.instancePrefix() req := &btapb.ModifyColumnFamiliesRequest{ Name: prefix + "/tables/" + table, @@ -209,7 +209,7 @@ func (ac *AdminClient) SetGCPolicy(ctx context.Context, table, family string, po // DropRowRange permanently deletes a row range from the specified table. func (ac *AdminClient) DropRowRange(ctx context.Context, table, rowKeyPrefix string) error { - ctx = mergeMetadata(ctx, ac.md) + ctx = mergeOutgoingMetadata(ctx, ac.md) prefix := ac.instancePrefix() req := &btapb.DropRowRangeRequest{ Name: prefix + "/tables/" + table, @@ -291,7 +291,7 @@ var instanceNameRegexp = regexp.MustCompile(`^projects/([^/]+)/instances/([a-z][ // CreateInstance creates a new instance in the project. // This method will return when the instance has been created or when an error occurs. func (iac *InstanceAdminClient) CreateInstance(ctx context.Context, conf *InstanceConf) error { - ctx = mergeMetadata(ctx, iac.md) + ctx = mergeOutgoingMetadata(ctx, iac.md) req := &btapb.CreateInstanceRequest{ Parent: "projects/" + iac.project, InstanceId: conf.InstanceId, @@ -315,7 +315,7 @@ func (iac *InstanceAdminClient) CreateInstance(ctx context.Context, conf *Instan // DeleteInstance deletes an instance from the project. func (iac *InstanceAdminClient) DeleteInstance(ctx context.Context, instanceId string) error { - ctx = mergeMetadata(ctx, iac.md) + ctx = mergeOutgoingMetadata(ctx, iac.md) req := &btapb.DeleteInstanceRequest{"projects/" + iac.project + "/instances/" + instanceId} _, err := iac.iClient.DeleteInstance(ctx, req) return err @@ -323,7 +323,7 @@ func (iac *InstanceAdminClient) DeleteInstance(ctx context.Context, instanceId s // Instances returns a list of instances in the project. func (iac *InstanceAdminClient) Instances(ctx context.Context) ([]*InstanceInfo, error) { - ctx = mergeMetadata(ctx, iac.md) + ctx = mergeOutgoingMetadata(ctx, iac.md) req := &btapb.ListInstancesRequest{ Parent: "projects/" + iac.project, } diff --git a/bigtable/bigtable.go b/bigtable/bigtable.go index 374ed573273f..5c2129cb7723 100644 --- a/bigtable/bigtable.go +++ b/bigtable/bigtable.go @@ -73,7 +73,7 @@ func (c *Client) Close() error { } var ( - idempotentRetryCodes = []codes.Code{codes.DeadlineExceeded, codes.Unavailable, codes.Aborted,codes.Internal} + idempotentRetryCodes = []codes.Code{codes.DeadlineExceeded, codes.Unavailable, codes.Aborted, codes.Internal} isIdempotentRetryCode = make(map[codes.Code]bool) retryOptions = []gax.CallOption{ gax.WithDelayTimeoutSettings(100*time.Millisecond, 2000*time.Millisecond, 1.2), @@ -120,7 +120,7 @@ func (c *Client) Open(table string) *Table { // By default, the yielded rows will contain all values in all cells. // Use RowFilter to limit the cells returned. func (t *Table) ReadRows(ctx context.Context, arg RowSet, f func(Row) bool, opts ...ReadOption) error { - ctx = mergeMetadata(ctx, t.md) + ctx = mergeOutgoingMetadata(ctx, t.md) var prevRowKey string err := gax.Invoke(ctx, func(ctx context.Context) error { @@ -337,7 +337,7 @@ func (r RowRangeList) retainRowsAfter(lastRowKey string) RowSet { } func (r RowRangeList) valid() bool { - for _, rr := range r { + for _, rr := range r { if rr.valid() { return true } @@ -420,7 +420,7 @@ func mutationsAreRetryable(muts []*btpb.Mutation) bool { // Apply applies a Mutation to a specific row. func (t *Table) Apply(ctx context.Context, row string, m *Mutation, opts ...ApplyOption) error { - ctx = mergeMetadata(ctx, t.md) + ctx = mergeOutgoingMetadata(ctx, t.md) after := func(res proto.Message) { for _, o := range opts { o.after(res) @@ -582,7 +582,7 @@ type entryErr struct { // // Conditional mutations cannot be applied in bulk and providing one will result in an error. func (t *Table) ApplyBulk(ctx context.Context, rowKeys []string, muts []*Mutation, opts ...ApplyOption) ([]error, error) { - ctx = mergeMetadata(ctx, t.md) + ctx = mergeOutgoingMetadata(ctx, t.md) if len(rowKeys) != len(muts) { return nil, fmt.Errorf("mismatched rowKeys and mutation array lengths: %d, %d", len(rowKeys), len(muts)) } @@ -711,13 +711,13 @@ func (ts Timestamp) TruncateToMilliseconds() Timestamp { if ts == ServerTime { return ts } - return ts - ts % 1000 + return ts - ts%1000 } // ApplyReadModifyWrite applies a ReadModifyWrite to a specific row. // It returns the newly written cells. func (t *Table) ApplyReadModifyWrite(ctx context.Context, row string, m *ReadModifyWrite) (Row, error) { - ctx = mergeMetadata(ctx, t.md) + ctx = mergeOutgoingMetadata(ctx, t.md) req := &btpb.ReadModifyWriteRowRequest{ TableName: t.c.fullTableName(t.table), RowKey: []byte(row), @@ -774,9 +774,9 @@ func (m *ReadModifyWrite) Increment(family, column string, delta int64) { }) } -// mergeMetadata returns a context populated by the existing metadata, if any, -// joined with internal metadata. -func mergeMetadata(ctx context.Context, md metadata.MD) context.Context { - mdCopy, _ := metadata.FromContext(ctx) - return metadata.NewContext(ctx, metadata.Join(mdCopy, md)) +// mergeOutgoingMetadata returns a context populated by the existing outgoing metadata, +// if any, joined with internal metadata. +func mergeOutgoingMetadata(ctx context.Context, md metadata.MD) context.Context { + mdCopy, _ := metadata.FromOutgoingContext(ctx) + return metadata.NewOutgoingContext(ctx, metadata.Join(mdCopy, md)) } diff --git a/datastore/datastore.go b/datastore/datastore.go index 91f92c437b51..e9ffbfab7f7c 100644 --- a/datastore/datastore.go +++ b/datastore/datastore.go @@ -71,27 +71,27 @@ func newDatastoreClient(conn *grpc.ClientConn, projectID string) pb.DatastoreCli } func (dc *datastoreClient) Lookup(ctx context.Context, in *pb.LookupRequest, opts ...grpc.CallOption) (*pb.LookupResponse, error) { - return dc.c.Lookup(metadata.NewContext(ctx, dc.md), in, opts...) + return dc.c.Lookup(metadata.NewOutgoingContext(ctx, dc.md), in, opts...) } func (dc *datastoreClient) RunQuery(ctx context.Context, in *pb.RunQueryRequest, opts ...grpc.CallOption) (*pb.RunQueryResponse, error) { - return dc.c.RunQuery(metadata.NewContext(ctx, dc.md), in, opts...) + return dc.c.RunQuery(metadata.NewOutgoingContext(ctx, dc.md), in, opts...) } func (dc *datastoreClient) BeginTransaction(ctx context.Context, in *pb.BeginTransactionRequest, opts ...grpc.CallOption) (*pb.BeginTransactionResponse, error) { - return dc.c.BeginTransaction(metadata.NewContext(ctx, dc.md), in, opts...) + return dc.c.BeginTransaction(metadata.NewOutgoingContext(ctx, dc.md), in, opts...) } func (dc *datastoreClient) Commit(ctx context.Context, in *pb.CommitRequest, opts ...grpc.CallOption) (*pb.CommitResponse, error) { - return dc.c.Commit(metadata.NewContext(ctx, dc.md), in, opts...) + return dc.c.Commit(metadata.NewOutgoingContext(ctx, dc.md), in, opts...) } func (dc *datastoreClient) Rollback(ctx context.Context, in *pb.RollbackRequest, opts ...grpc.CallOption) (*pb.RollbackResponse, error) { - return dc.c.Rollback(metadata.NewContext(ctx, dc.md), in, opts...) + return dc.c.Rollback(metadata.NewOutgoingContext(ctx, dc.md), in, opts...) } func (dc *datastoreClient) AllocateIds(ctx context.Context, in *pb.AllocateIdsRequest, opts ...grpc.CallOption) (*pb.AllocateIdsResponse, error) { - return dc.c.AllocateIds(metadata.NewContext(ctx, dc.md), in, opts...) + return dc.c.AllocateIds(metadata.NewOutgoingContext(ctx, dc.md), in, opts...) } // Client is a client for reading and writing data in a datastore dataset. diff --git a/spanner/client.go b/spanner/client.go index 92e7381b6ac6..3c979e740a85 100644 --- a/spanner/client.go +++ b/spanner/client.go @@ -96,12 +96,12 @@ func errDial(ci int, err error) error { return e } -func contextWithMetadata(ctx context.Context, md metadata.MD) context.Context { - existing, ok := metadata.FromContext(ctx) +func contextWithOutgoingMetadata(ctx context.Context, md metadata.MD) context.Context { + existing, ok := metadata.FromOutgoingContext(ctx) if ok { md = metadata.Join(existing, md) } - return metadata.NewContext(ctx, md) + return metadata.NewOutgoingContext(ctx, md) } // NewClient creates a client to a database. A valid database name has the diff --git a/spanner/session.go b/spanner/session.go index 58eb96d317d2..95c88782d45e 100644 --- a/spanner/session.go +++ b/spanner/session.go @@ -166,7 +166,7 @@ func (s *session) ping() error { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() return runRetryable(ctx, func(ctx context.Context) error { - _, err := s.client.GetSession(contextWithMetadata(ctx, s.pool.md), &sppb.GetSessionRequest{Name: s.getID()}) // s.getID is safe even when s is invalid. + _, err := s.client.GetSession(contextWithOutgoingMetadata(ctx, s.pool.md), &sppb.GetSessionRequest{Name: s.getID()}) // s.getID is safe even when s is invalid. return err }) } @@ -185,7 +185,7 @@ func (s *session) refreshIdle() bool { defer cancel() var sid string err := runRetryable(ctx, func(ctx context.Context) error { - session, e := s.client.CreateSession(contextWithMetadata(ctx, s.pool.md), &sppb.CreateSessionRequest{Database: s.pool.db}) + session, e := s.client.CreateSession(contextWithOutgoingMetadata(ctx, s.pool.md), &sppb.CreateSessionRequest{Database: s.pool.db}) if e != nil { return e } @@ -216,7 +216,7 @@ func (s *session) refreshIdle() bool { // If we fail to explicitly destroy the session, it will be eventually garbage collected by // Cloud Spanner. if err = runRetryable(ctx, func(ctx context.Context) error { - _, e := s.client.DeleteSession(contextWithMetadata(ctx, s.pool.md), &sppb.DeleteSessionRequest{Name: sid}) + _, e := s.client.DeleteSession(contextWithOutgoingMetadata(ctx, s.pool.md), &sppb.DeleteSessionRequest{Name: sid}) return e }); err != nil { return false @@ -541,7 +541,7 @@ func (p *sessionPool) isHealthy(s *session) bool { // take returns a cached session if there are available ones; if there isn't any, it tries to allocate a new one. // Session returned by take should be used for read operations. func (p *sessionPool) take(ctx context.Context) (*sessionHandle, error) { - ctx = contextWithMetadata(ctx, p.md) + ctx = contextWithOutgoingMetadata(ctx, p.md) for { var ( s *session @@ -595,7 +595,7 @@ func (p *sessionPool) take(ctx context.Context) (*sessionHandle, error) { // takeWriteSession returns a write prepared cached session if there are available ones; if there isn't any, it tries to allocate a new one. // Session returned should be used for read write transactions. func (p *sessionPool) takeWriteSession(ctx context.Context) (*sessionHandle, error) { - ctx = contextWithMetadata(ctx, p.md) + ctx = contextWithOutgoingMetadata(ctx, p.md) for { var ( s *session @@ -927,7 +927,7 @@ func (hc *healthChecker) worker(i int) { if ws != nil { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() - ws.prepareForWrite(contextWithMetadata(ctx, hc.pool.md)) + ws.prepareForWrite(contextWithOutgoingMetadata(ctx, hc.pool.md)) hc.pool.recycle(ws) hc.pool.mu.Lock() hc.pool.prepareReqs-- diff --git a/spanner/transaction.go b/spanner/transaction.go index 275cadef53a0..49977e9d2d79 100644 --- a/spanner/transaction.go +++ b/spanner/transaction.go @@ -84,7 +84,7 @@ func (t *txReadOnly) ReadUsingIndex(ctx context.Context, table, index string, ke return &RowIterator{err: errSessionClosed(sh)} } return stream( - contextWithMetadata(ctx, sh.getMetadata()), + contextWithOutgoingMetadata(ctx, sh.getMetadata()), func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) { return client.StreamingRead(ctx, &sppb.ReadRequest{ @@ -150,7 +150,7 @@ func (t *txReadOnly) Query(ctx context.Context, statement Statement) *RowIterato return &RowIterator{err: err} } return stream( - contextWithMetadata(ctx, sh.getMetadata()), + contextWithOutgoingMetadata(ctx, sh.getMetadata()), func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) { req.ResumeToken = resumeToken return client.ExecuteStreamingSql(ctx, req) @@ -279,7 +279,7 @@ func (t *ReadOnlyTransaction) begin(ctx context.Context) error { if err != nil { return err } - err = runRetryable(contextWithMetadata(ctx, sh.getMetadata()), func(ctx context.Context) error { + err = runRetryable(contextWithOutgoingMetadata(ctx, sh.getMetadata()), func(ctx context.Context) error { res, e := sh.getClient().BeginTransaction(ctx, &sppb.BeginTransactionRequest{ Session: sh.getID(), Options: &sppb.TransactionOptions{ @@ -629,7 +629,7 @@ func (t *ReadWriteTransaction) begin(ctx context.Context) error { t.state = txActive return nil } - tx, err := beginTransaction(contextWithMetadata(ctx, t.sh.getMetadata()), t.sh.getID(), t.sh.getClient()) + tx, err := beginTransaction(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), t.sh.getID(), t.sh.getClient()) if err == nil { t.tx = tx t.state = txActive @@ -656,7 +656,7 @@ func (t *ReadWriteTransaction) commit(ctx context.Context) (time.Time, error) { if sid == "" || client == nil { return ts, errSessionClosed(t.sh) } - err = runRetryable(contextWithMetadata(ctx, t.sh.getMetadata()), func(ctx context.Context) error { + err = runRetryable(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), func(ctx context.Context) error { var trailer metadata.MD res, e := client.Commit(ctx, &sppb.CommitRequest{ Session: sid, @@ -690,7 +690,7 @@ func (t *ReadWriteTransaction) rollback(ctx context.Context) { if sid == "" || client == nil { return } - err := runRetryable(contextWithMetadata(ctx, t.sh.getMetadata()), func(ctx context.Context) error { + err := runRetryable(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), func(ctx context.Context) error { _, e := client.Rollback(ctx, &sppb.RollbackRequest{ Session: sid, TransactionId: t.tx, @@ -759,7 +759,7 @@ func (t *writeOnlyTransaction) applyAtLeastOnce(ctx context.Context, ms ...*Muta return e } } - res, e := sh.getClient().Commit(contextWithMetadata(ctx, sh.getMetadata()), &sppb.CommitRequest{ + res, e := sh.getClient().Commit(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.CommitRequest{ Session: sh.getID(), Transaction: &sppb.CommitRequest_SingleUseTransaction{ SingleUseTransaction: &sppb.TransactionOptions{ diff --git a/trace/grpc.go b/trace/grpc.go index a55aa66d14ee..16ecf5f96f60 100644 --- a/trace/grpc.go +++ b/trace/grpc.go @@ -49,7 +49,7 @@ func grpcUnaryInterceptor(ctx context.Context, method string, req, reply interfa md = md.Copy() // metadata is immutable, copy. md[grpcMetadataKey] = []string{header} } - ctx = metadata.NewContext(ctx, md) + ctx = metadata.NewOutgoingContext(ctx, md) } err := invoker(ctx, method, req, reply, cc, opts...)