Skip to content

Commit

Permalink
Avoid grpc/metadata.NewContext and FromContext
Browse files Browse the repository at this point in the history
Use appropriate Incoming/Outgoing versions instead.

Change-Id: I94d88915417e2140cb939b9cc0d5fbab591212c0
Reviewed-on: https://code-review.googlesource.com/13094
Reviewed-by: kokoro <noreply+kokoro@google.com>
Reviewed-by: Gary Elliott <garyelliott@google.com>
Reviewed-by: Michael Darakananda <pongad@google.com>
  • Loading branch information
dfawley authored and garye committed May 19, 2017
1 parent 99e4baf commit dc9545a
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 47 deletions.
24 changes: 12 additions & 12 deletions bigtable/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (ac *AdminClient) instancePrefix() string {

// Tables returns a list of the tables in the instance.
func (ac *AdminClient) Tables(ctx context.Context) ([]string, error) {
ctx = mergeMetadata(ctx, ac.md)
ctx = mergeOutgoingMetadata(ctx, ac.md)
prefix := ac.instancePrefix()
req := &btapb.ListTablesRequest{
Parent: prefix,
Expand All @@ -94,7 +94,7 @@ func (ac *AdminClient) Tables(ctx context.Context) ([]string, error) {
// CreateTable creates a new table in the instance.
// This method may return before the table's creation is complete.
func (ac *AdminClient) CreateTable(ctx context.Context, table string) error {
ctx = mergeMetadata(ctx, ac.md)
ctx = mergeOutgoingMetadata(ctx, ac.md)
prefix := ac.instancePrefix()
req := &btapb.CreateTableRequest{
Parent: prefix,
Expand All @@ -114,7 +114,7 @@ func (ac *AdminClient) CreatePresplitTable(ctx context.Context, table string, sp
for _, split := range split_keys {
req_splits = append(req_splits, &btapb.CreateTableRequest_Split{[]byte(split)})
}
ctx = mergeMetadata(ctx, ac.md)
ctx = mergeOutgoingMetadata(ctx, ac.md)
prefix := ac.instancePrefix()
req := &btapb.CreateTableRequest{
Parent: prefix,
Expand All @@ -128,7 +128,7 @@ func (ac *AdminClient) CreatePresplitTable(ctx context.Context, table string, sp
// CreateColumnFamily creates a new column family in a table.
func (ac *AdminClient) CreateColumnFamily(ctx context.Context, table, family string) error {
// TODO(dsymonds): Permit specifying gcexpr and any other family settings.
ctx = mergeMetadata(ctx, ac.md)
ctx = mergeOutgoingMetadata(ctx, ac.md)
prefix := ac.instancePrefix()
req := &btapb.ModifyColumnFamiliesRequest{
Name: prefix + "/tables/" + table,
Expand All @@ -143,7 +143,7 @@ func (ac *AdminClient) CreateColumnFamily(ctx context.Context, table, family str

// DeleteTable deletes a table and all of its data.
func (ac *AdminClient) DeleteTable(ctx context.Context, table string) error {
ctx = mergeMetadata(ctx, ac.md)
ctx = mergeOutgoingMetadata(ctx, ac.md)
prefix := ac.instancePrefix()
req := &btapb.DeleteTableRequest{
Name: prefix + "/tables/" + table,
Expand All @@ -154,7 +154,7 @@ func (ac *AdminClient) DeleteTable(ctx context.Context, table string) error {

// DeleteColumnFamily deletes a column family in a table and all of its data.
func (ac *AdminClient) DeleteColumnFamily(ctx context.Context, table, family string) error {
ctx = mergeMetadata(ctx, ac.md)
ctx = mergeOutgoingMetadata(ctx, ac.md)
prefix := ac.instancePrefix()
req := &btapb.ModifyColumnFamiliesRequest{
Name: prefix + "/tables/" + table,
Expand All @@ -174,7 +174,7 @@ type TableInfo struct {

// TableInfo retrieves information about a table.
func (ac *AdminClient) TableInfo(ctx context.Context, table string) (*TableInfo, error) {
ctx = mergeMetadata(ctx, ac.md)
ctx = mergeOutgoingMetadata(ctx, ac.md)
prefix := ac.instancePrefix()
req := &btapb.GetTableRequest{
Name: prefix + "/tables/" + table,
Expand All @@ -194,7 +194,7 @@ func (ac *AdminClient) TableInfo(ctx context.Context, table string) (*TableInfo,
// GC executes opportunistically in the background; table reads may return data
// matching the GC policy.
func (ac *AdminClient) SetGCPolicy(ctx context.Context, table, family string, policy GCPolicy) error {
ctx = mergeMetadata(ctx, ac.md)
ctx = mergeOutgoingMetadata(ctx, ac.md)
prefix := ac.instancePrefix()
req := &btapb.ModifyColumnFamiliesRequest{
Name: prefix + "/tables/" + table,
Expand All @@ -209,7 +209,7 @@ func (ac *AdminClient) SetGCPolicy(ctx context.Context, table, family string, po

// DropRowRange permanently deletes a row range from the specified table.
func (ac *AdminClient) DropRowRange(ctx context.Context, table, rowKeyPrefix string) error {
ctx = mergeMetadata(ctx, ac.md)
ctx = mergeOutgoingMetadata(ctx, ac.md)
prefix := ac.instancePrefix()
req := &btapb.DropRowRangeRequest{
Name: prefix + "/tables/" + table,
Expand Down Expand Up @@ -291,7 +291,7 @@ var instanceNameRegexp = regexp.MustCompile(`^projects/([^/]+)/instances/([a-z][
// CreateInstance creates a new instance in the project.
// This method will return when the instance has been created or when an error occurs.
func (iac *InstanceAdminClient) CreateInstance(ctx context.Context, conf *InstanceConf) error {
ctx = mergeMetadata(ctx, iac.md)
ctx = mergeOutgoingMetadata(ctx, iac.md)
req := &btapb.CreateInstanceRequest{
Parent: "projects/" + iac.project,
InstanceId: conf.InstanceId,
Expand All @@ -315,15 +315,15 @@ func (iac *InstanceAdminClient) CreateInstance(ctx context.Context, conf *Instan

// DeleteInstance deletes an instance from the project.
func (iac *InstanceAdminClient) DeleteInstance(ctx context.Context, instanceId string) error {
ctx = mergeMetadata(ctx, iac.md)
ctx = mergeOutgoingMetadata(ctx, iac.md)
req := &btapb.DeleteInstanceRequest{"projects/" + iac.project + "/instances/" + instanceId}
_, err := iac.iClient.DeleteInstance(ctx, req)
return err
}

// Instances returns a list of instances in the project.
func (iac *InstanceAdminClient) Instances(ctx context.Context) ([]*InstanceInfo, error) {
ctx = mergeMetadata(ctx, iac.md)
ctx = mergeOutgoingMetadata(ctx, iac.md)
req := &btapb.ListInstancesRequest{
Parent: "projects/" + iac.project,
}
Expand Down
24 changes: 12 additions & 12 deletions bigtable/bigtable.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (c *Client) Close() error {
}

var (
idempotentRetryCodes = []codes.Code{codes.DeadlineExceeded, codes.Unavailable, codes.Aborted,codes.Internal}
idempotentRetryCodes = []codes.Code{codes.DeadlineExceeded, codes.Unavailable, codes.Aborted, codes.Internal}
isIdempotentRetryCode = make(map[codes.Code]bool)
retryOptions = []gax.CallOption{
gax.WithDelayTimeoutSettings(100*time.Millisecond, 2000*time.Millisecond, 1.2),
Expand Down Expand Up @@ -120,7 +120,7 @@ func (c *Client) Open(table string) *Table {
// By default, the yielded rows will contain all values in all cells.
// Use RowFilter to limit the cells returned.
func (t *Table) ReadRows(ctx context.Context, arg RowSet, f func(Row) bool, opts ...ReadOption) error {
ctx = mergeMetadata(ctx, t.md)
ctx = mergeOutgoingMetadata(ctx, t.md)

var prevRowKey string
err := gax.Invoke(ctx, func(ctx context.Context) error {
Expand Down Expand Up @@ -337,7 +337,7 @@ func (r RowRangeList) retainRowsAfter(lastRowKey string) RowSet {
}

func (r RowRangeList) valid() bool {
for _, rr := range r {
for _, rr := range r {
if rr.valid() {
return true
}
Expand Down Expand Up @@ -420,7 +420,7 @@ func mutationsAreRetryable(muts []*btpb.Mutation) bool {

// Apply applies a Mutation to a specific row.
func (t *Table) Apply(ctx context.Context, row string, m *Mutation, opts ...ApplyOption) error {
ctx = mergeMetadata(ctx, t.md)
ctx = mergeOutgoingMetadata(ctx, t.md)
after := func(res proto.Message) {
for _, o := range opts {
o.after(res)
Expand Down Expand Up @@ -582,7 +582,7 @@ type entryErr struct {
//
// Conditional mutations cannot be applied in bulk and providing one will result in an error.
func (t *Table) ApplyBulk(ctx context.Context, rowKeys []string, muts []*Mutation, opts ...ApplyOption) ([]error, error) {
ctx = mergeMetadata(ctx, t.md)
ctx = mergeOutgoingMetadata(ctx, t.md)
if len(rowKeys) != len(muts) {
return nil, fmt.Errorf("mismatched rowKeys and mutation array lengths: %d, %d", len(rowKeys), len(muts))
}
Expand Down Expand Up @@ -711,13 +711,13 @@ func (ts Timestamp) TruncateToMilliseconds() Timestamp {
if ts == ServerTime {
return ts
}
return ts - ts % 1000
return ts - ts%1000
}

// ApplyReadModifyWrite applies a ReadModifyWrite to a specific row.
// It returns the newly written cells.
func (t *Table) ApplyReadModifyWrite(ctx context.Context, row string, m *ReadModifyWrite) (Row, error) {
ctx = mergeMetadata(ctx, t.md)
ctx = mergeOutgoingMetadata(ctx, t.md)
req := &btpb.ReadModifyWriteRowRequest{
TableName: t.c.fullTableName(t.table),
RowKey: []byte(row),
Expand Down Expand Up @@ -774,9 +774,9 @@ func (m *ReadModifyWrite) Increment(family, column string, delta int64) {
})
}

// mergeMetadata returns a context populated by the existing metadata, if any,
// joined with internal metadata.
func mergeMetadata(ctx context.Context, md metadata.MD) context.Context {
mdCopy, _ := metadata.FromContext(ctx)
return metadata.NewContext(ctx, metadata.Join(mdCopy, md))
// mergeOutgoingMetadata returns a context populated by the existing outgoing metadata,
// if any, joined with internal metadata.
func mergeOutgoingMetadata(ctx context.Context, md metadata.MD) context.Context {
mdCopy, _ := metadata.FromOutgoingContext(ctx)
return metadata.NewOutgoingContext(ctx, metadata.Join(mdCopy, md))
}
12 changes: 6 additions & 6 deletions datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,27 +71,27 @@ func newDatastoreClient(conn *grpc.ClientConn, projectID string) pb.DatastoreCli
}

func (dc *datastoreClient) Lookup(ctx context.Context, in *pb.LookupRequest, opts ...grpc.CallOption) (*pb.LookupResponse, error) {
return dc.c.Lookup(metadata.NewContext(ctx, dc.md), in, opts...)
return dc.c.Lookup(metadata.NewOutgoingContext(ctx, dc.md), in, opts...)
}

func (dc *datastoreClient) RunQuery(ctx context.Context, in *pb.RunQueryRequest, opts ...grpc.CallOption) (*pb.RunQueryResponse, error) {
return dc.c.RunQuery(metadata.NewContext(ctx, dc.md), in, opts...)
return dc.c.RunQuery(metadata.NewOutgoingContext(ctx, dc.md), in, opts...)
}

func (dc *datastoreClient) BeginTransaction(ctx context.Context, in *pb.BeginTransactionRequest, opts ...grpc.CallOption) (*pb.BeginTransactionResponse, error) {
return dc.c.BeginTransaction(metadata.NewContext(ctx, dc.md), in, opts...)
return dc.c.BeginTransaction(metadata.NewOutgoingContext(ctx, dc.md), in, opts...)
}

func (dc *datastoreClient) Commit(ctx context.Context, in *pb.CommitRequest, opts ...grpc.CallOption) (*pb.CommitResponse, error) {
return dc.c.Commit(metadata.NewContext(ctx, dc.md), in, opts...)
return dc.c.Commit(metadata.NewOutgoingContext(ctx, dc.md), in, opts...)
}

func (dc *datastoreClient) Rollback(ctx context.Context, in *pb.RollbackRequest, opts ...grpc.CallOption) (*pb.RollbackResponse, error) {
return dc.c.Rollback(metadata.NewContext(ctx, dc.md), in, opts...)
return dc.c.Rollback(metadata.NewOutgoingContext(ctx, dc.md), in, opts...)
}

func (dc *datastoreClient) AllocateIds(ctx context.Context, in *pb.AllocateIdsRequest, opts ...grpc.CallOption) (*pb.AllocateIdsResponse, error) {
return dc.c.AllocateIds(metadata.NewContext(ctx, dc.md), in, opts...)
return dc.c.AllocateIds(metadata.NewOutgoingContext(ctx, dc.md), in, opts...)
}

// Client is a client for reading and writing data in a datastore dataset.
Expand Down
6 changes: 3 additions & 3 deletions spanner/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,12 @@ func errDial(ci int, err error) error {
return e
}

func contextWithMetadata(ctx context.Context, md metadata.MD) context.Context {
existing, ok := metadata.FromContext(ctx)
func contextWithOutgoingMetadata(ctx context.Context, md metadata.MD) context.Context {
existing, ok := metadata.FromOutgoingContext(ctx)
if ok {
md = metadata.Join(existing, md)
}
return metadata.NewContext(ctx, md)
return metadata.NewOutgoingContext(ctx, md)
}

// NewClient creates a client to a database. A valid database name has the
Expand Down
12 changes: 6 additions & 6 deletions spanner/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func (s *session) ping() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
return runRetryable(ctx, func(ctx context.Context) error {
_, err := s.client.GetSession(contextWithMetadata(ctx, s.pool.md), &sppb.GetSessionRequest{Name: s.getID()}) // s.getID is safe even when s is invalid.
_, err := s.client.GetSession(contextWithOutgoingMetadata(ctx, s.pool.md), &sppb.GetSessionRequest{Name: s.getID()}) // s.getID is safe even when s is invalid.
return err
})
}
Expand All @@ -185,7 +185,7 @@ func (s *session) refreshIdle() bool {
defer cancel()
var sid string
err := runRetryable(ctx, func(ctx context.Context) error {
session, e := s.client.CreateSession(contextWithMetadata(ctx, s.pool.md), &sppb.CreateSessionRequest{Database: s.pool.db})
session, e := s.client.CreateSession(contextWithOutgoingMetadata(ctx, s.pool.md), &sppb.CreateSessionRequest{Database: s.pool.db})
if e != nil {
return e
}
Expand Down Expand Up @@ -216,7 +216,7 @@ func (s *session) refreshIdle() bool {
// If we fail to explicitly destroy the session, it will be eventually garbage collected by
// Cloud Spanner.
if err = runRetryable(ctx, func(ctx context.Context) error {
_, e := s.client.DeleteSession(contextWithMetadata(ctx, s.pool.md), &sppb.DeleteSessionRequest{Name: sid})
_, e := s.client.DeleteSession(contextWithOutgoingMetadata(ctx, s.pool.md), &sppb.DeleteSessionRequest{Name: sid})
return e
}); err != nil {
return false
Expand Down Expand Up @@ -541,7 +541,7 @@ func (p *sessionPool) isHealthy(s *session) bool {
// take returns a cached session if there are available ones; if there isn't any, it tries to allocate a new one.
// Session returned by take should be used for read operations.
func (p *sessionPool) take(ctx context.Context) (*sessionHandle, error) {
ctx = contextWithMetadata(ctx, p.md)
ctx = contextWithOutgoingMetadata(ctx, p.md)
for {
var (
s *session
Expand Down Expand Up @@ -595,7 +595,7 @@ func (p *sessionPool) take(ctx context.Context) (*sessionHandle, error) {
// takeWriteSession returns a write prepared cached session if there are available ones; if there isn't any, it tries to allocate a new one.
// Session returned should be used for read write transactions.
func (p *sessionPool) takeWriteSession(ctx context.Context) (*sessionHandle, error) {
ctx = contextWithMetadata(ctx, p.md)
ctx = contextWithOutgoingMetadata(ctx, p.md)
for {
var (
s *session
Expand Down Expand Up @@ -927,7 +927,7 @@ func (hc *healthChecker) worker(i int) {
if ws != nil {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
ws.prepareForWrite(contextWithMetadata(ctx, hc.pool.md))
ws.prepareForWrite(contextWithOutgoingMetadata(ctx, hc.pool.md))
hc.pool.recycle(ws)
hc.pool.mu.Lock()
hc.pool.prepareReqs--
Expand Down
14 changes: 7 additions & 7 deletions spanner/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (t *txReadOnly) ReadUsingIndex(ctx context.Context, table, index string, ke
return &RowIterator{err: errSessionClosed(sh)}
}
return stream(
contextWithMetadata(ctx, sh.getMetadata()),
contextWithOutgoingMetadata(ctx, sh.getMetadata()),
func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) {
return client.StreamingRead(ctx,
&sppb.ReadRequest{
Expand Down Expand Up @@ -150,7 +150,7 @@ func (t *txReadOnly) Query(ctx context.Context, statement Statement) *RowIterato
return &RowIterator{err: err}
}
return stream(
contextWithMetadata(ctx, sh.getMetadata()),
contextWithOutgoingMetadata(ctx, sh.getMetadata()),
func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) {
req.ResumeToken = resumeToken
return client.ExecuteStreamingSql(ctx, req)
Expand Down Expand Up @@ -279,7 +279,7 @@ func (t *ReadOnlyTransaction) begin(ctx context.Context) error {
if err != nil {
return err
}
err = runRetryable(contextWithMetadata(ctx, sh.getMetadata()), func(ctx context.Context) error {
err = runRetryable(contextWithOutgoingMetadata(ctx, sh.getMetadata()), func(ctx context.Context) error {
res, e := sh.getClient().BeginTransaction(ctx, &sppb.BeginTransactionRequest{
Session: sh.getID(),
Options: &sppb.TransactionOptions{
Expand Down Expand Up @@ -629,7 +629,7 @@ func (t *ReadWriteTransaction) begin(ctx context.Context) error {
t.state = txActive
return nil
}
tx, err := beginTransaction(contextWithMetadata(ctx, t.sh.getMetadata()), t.sh.getID(), t.sh.getClient())
tx, err := beginTransaction(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), t.sh.getID(), t.sh.getClient())
if err == nil {
t.tx = tx
t.state = txActive
Expand All @@ -656,7 +656,7 @@ func (t *ReadWriteTransaction) commit(ctx context.Context) (time.Time, error) {
if sid == "" || client == nil {
return ts, errSessionClosed(t.sh)
}
err = runRetryable(contextWithMetadata(ctx, t.sh.getMetadata()), func(ctx context.Context) error {
err = runRetryable(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), func(ctx context.Context) error {
var trailer metadata.MD
res, e := client.Commit(ctx, &sppb.CommitRequest{
Session: sid,
Expand Down Expand Up @@ -690,7 +690,7 @@ func (t *ReadWriteTransaction) rollback(ctx context.Context) {
if sid == "" || client == nil {
return
}
err := runRetryable(contextWithMetadata(ctx, t.sh.getMetadata()), func(ctx context.Context) error {
err := runRetryable(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), func(ctx context.Context) error {
_, e := client.Rollback(ctx, &sppb.RollbackRequest{
Session: sid,
TransactionId: t.tx,
Expand Down Expand Up @@ -759,7 +759,7 @@ func (t *writeOnlyTransaction) applyAtLeastOnce(ctx context.Context, ms ...*Muta
return e
}
}
res, e := sh.getClient().Commit(contextWithMetadata(ctx, sh.getMetadata()), &sppb.CommitRequest{
res, e := sh.getClient().Commit(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.CommitRequest{
Session: sh.getID(),
Transaction: &sppb.CommitRequest_SingleUseTransaction{
SingleUseTransaction: &sppb.TransactionOptions{
Expand Down
2 changes: 1 addition & 1 deletion trace/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func grpcUnaryInterceptor(ctx context.Context, method string, req, reply interfa
md = md.Copy() // metadata is immutable, copy.
md[grpcMetadataKey] = []string{header}
}
ctx = metadata.NewContext(ctx, md)
ctx = metadata.NewOutgoingContext(ctx, md)
}

err := invoker(ctx, method, req, reply, cc, opts...)
Expand Down

0 comments on commit dc9545a

Please sign in to comment.