Skip to content

Commit

Permalink
chore(bigtable): add timeouts to proxy methods (#6973)
Browse files Browse the repository at this point in the history
* chore(bigtable): add timeouts to proxy methods

* per reviewer

* per reviewer
  • Loading branch information
telpirion authored Nov 1, 2022
1 parent a9189fa commit 5d2dc01
Showing 1 changed file with 30 additions and 4 deletions.
34 changes: 30 additions & 4 deletions bigtable/internal/testproxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,15 @@ type testClient struct {
isOpen bool // isOpen indicates whether this client is open for new requests
}

// timeout adds a timeout setting to a context if perOperationTimeout is set on
// the testClient object.
func (tc *testClient) timeout(ctx context.Context) (context.Context, context.CancelFunc) {
if tc.perOperationTimeout != nil {
return context.WithTimeout(ctx, tc.perOperationTimeout.AsDuration())
}
return context.WithCancel(ctx)
}

// credentialsBundle implements credentials.Bundle interface
// [See documentation for usage](https://pkg.go.dev/google.golang.org/grpc/credentials#Bundle).
type credentialsBundle struct {
Expand Down Expand Up @@ -617,10 +626,8 @@ func (s *goTestProxyServer) ReadRow(ctx context.Context, req *pb.ReadRowRequest)
Row: &btpb.Row{},
}

if btc.perOperationTimeout != nil {
ct, _ := context.WithTimeout(ctx, btc.perOperationTimeout.AsDuration())
ctx = ct
}
ctx, cancel := btc.timeout(ctx)
defer cancel()

r, err := t.ReadRow(ctx, req.RowKey)
if err != nil {
Expand Down Expand Up @@ -674,6 +681,9 @@ func (s *goTestProxyServer) ReadRows(ctx context.Context, req *pb.ReadRowsReques
rs = bigtable.InfiniteRange("")
}

ctx, cancel := btc.timeout(ctx)
defer cancel()

var c int32
var rowsPb []*btpb.Row
lim := req.GetCancelAfterRows()
Expand Down Expand Up @@ -743,6 +753,9 @@ func (s *goTestProxyServer) MutateRow(ctx context.Context, req *pb.MutateRowRequ
},
}

ctx, cancel := btc.timeout(ctx)
defer cancel()

err := t.Apply(ctx, string(row), m)
if err != nil {
res.Status = statusFromError(err)
Expand Down Expand Up @@ -794,6 +807,9 @@ func (s *goTestProxyServer) BulkMutateRows(ctx context.Context, req *pb.MutateRo
},
}

ctx, cancel := btc.timeout(ctx)
defer cancel()

errs, err := t.ApplyBulk(ctx, keys, muts)
if err != nil {
log.Printf("received error from Table.ApplyBulk(): %v", err)
Expand Down Expand Up @@ -863,6 +879,9 @@ func (s *goTestProxyServer) CheckAndMutateRow(ctx context.Context, req *pb.Check
var matched bool
ao := bigtable.GetCondMutationResult(&matched)

ctx, cancel := btc.timeout(ctx)
defer cancel()

err := t.Apply(ctx, rowKey, c, ao)
if err != nil {
log.Printf("received error from Table.Apply: %v", err)
Expand Down Expand Up @@ -902,6 +921,9 @@ func (s *goTestProxyServer) SampleRowKeys(ctx context.Context, req *pb.SampleRow
},
}

ctx, cancel := btc.timeout(ctx)
defer cancel()

t := btc.c.Open(rrq.TableName)
keys, err := t.SampleRowKeys(ctx)
if err != nil {
Expand Down Expand Up @@ -964,6 +986,10 @@ func (s *goTestProxyServer) ReadModifyWriteRow(ctx context.Context, req *pb.Read

t := btc.c.Open(rrq.TableName)
k := string(rrq.RowKey)

ctx, cancel := btc.timeout(ctx)
defer cancel()

r, err := t.ApplyReadModifyWrite(ctx, k, rmw)
if err != nil {
return nil, err
Expand Down

0 comments on commit 5d2dc01

Please sign in to comment.