Skip to content

Commit

Permalink
refactor(bigquery/storage/managedwriter): simplify signatures (#11158)
Browse files Browse the repository at this point in the history
Another unparam related PR, this one focused on reducing unused params
in the managedwriter and its adapt subpackage.

Followup to #11149
  • Loading branch information
shollyman authored Nov 21, 2024
1 parent f198452 commit 044c077
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 41 deletions.
19 changes: 5 additions & 14 deletions bigquery/storage/managedwriter/adapt/protoconversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,10 +277,7 @@ func storageSchemaToDescriptorInternal(inSchema *storagepb.TableSchema, scope st
deps = append(deps, foundDesc.ParentFile())
}
// Construct field descriptor for the message.
fdp, err := tableFieldSchemaToFieldDescriptorProto(f, fNumber, string(foundDesc.FullName()), useProto3)
if err != nil {
return nil, newConversionError(scope, fmt.Errorf("couldn't convert field to FieldDescriptorProto: %w", err))
}
fdp := tableFieldSchemaToFieldDescriptorProto(f, fNumber, string(foundDesc.FullName()), useProto3)
fields = append(fields, fdp)
} else {
// Wrap the current struct's fields in a TableSchema outer message, and then build the submessage.
Expand All @@ -298,10 +295,7 @@ func storageSchemaToDescriptorInternal(inSchema *storagepb.TableSchema, scope st
if err != nil {
return nil, newConversionError(currentScope, fmt.Errorf("failed to add descriptor to dependency cache: %w", err))
}
fdp, err := tableFieldSchemaToFieldDescriptorProto(f, fNumber, currentScope, useProto3)
if err != nil {
return nil, newConversionError(currentScope, fmt.Errorf("couldn't compute field schema : %w", err))
}
fdp := tableFieldSchemaToFieldDescriptorProto(f, fNumber, currentScope, useProto3)
fields = append(fields, fdp)
}
} else {
Expand Down Expand Up @@ -329,10 +323,7 @@ func storageSchemaToDescriptorInternal(inSchema *storagepb.TableSchema, scope st
}
}
}
fd, err := tableFieldSchemaToFieldDescriptorProto(f, fNumber, currentScope, useProto3)
if err != nil {
return nil, newConversionError(currentScope, err)
}
fd := tableFieldSchemaToFieldDescriptorProto(f, fNumber, currentScope, useProto3)
fields = append(fields, fd)
}
}
Expand Down Expand Up @@ -411,7 +402,7 @@ func messageDependsOnFile(msg protoreflect.MessageDescriptor, file protoreflect.
// For proto2, we propagate the mode->label annotation as expected.
//
// Messages are always nullable, and repeated fields are as well.
func tableFieldSchemaToFieldDescriptorProto(field *storagepb.TableFieldSchema, idx int32, scope string, useProto3 bool) (*descriptorpb.FieldDescriptorProto, error) {
func tableFieldSchemaToFieldDescriptorProto(field *storagepb.TableFieldSchema, idx int32, scope string, useProto3 bool) *descriptorpb.FieldDescriptorProto {
name := field.GetName()
var fdp *descriptorpb.FieldDescriptorProto

Expand Down Expand Up @@ -474,7 +465,7 @@ func tableFieldSchemaToFieldDescriptorProto(field *storagepb.TableFieldSchema, i
}
proto.SetExtension(fdp.Options, storagepb.E_ColumnName, name)
}
return fdp, nil
return fdp
}

// nameRequiresAnnotation determines whether a field name requires unicode-annotation.
Expand Down
6 changes: 3 additions & 3 deletions bigquery/storage/managedwriter/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ func connRecvProcessor(ctx context.Context, co *connection, arc storagepb.BigQue
// TODO: Determine if/how we should report this case, as we have no viable context for propagating.

// Because we can't tell locally if this write is done, we pass it back to the retrier for possible re-enqueue.
pw.writer.processRetry(pw, co, nil, doneErr)
pw.writer.processRetry(pw, nil, doneErr)
}
case nextWrite, ok := <-ch:
if !ok {
Expand All @@ -557,7 +557,7 @@ func connRecvProcessor(ctx context.Context, co *connection, arc storagepb.BigQue
}
recordStat(metricCtx, AppendResponseErrors, 1)

nextWrite.writer.processRetry(nextWrite, co, nil, err)
nextWrite.writer.processRetry(nextWrite, nil, err)
continue
}
// Record that we did in fact get a response from the backend.
Expand All @@ -573,7 +573,7 @@ func connRecvProcessor(ctx context.Context, co *connection, arc storagepb.BigQue
recordStat(metricCtx, AppendResponseErrors, 1)
respErr := grpcstatus.ErrorProto(status)

nextWrite.writer.processRetry(nextWrite, co, resp, respErr)
nextWrite.writer.processRetry(nextWrite, resp, respErr)

continue
}
Expand Down
4 changes: 2 additions & 2 deletions bigquery/storage/managedwriter/managed_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func (ms *ManagedStream) Finalize(ctx context.Context, opts ...gax.CallOption) (
// appendWithRetry handles the details of adding sending an append request on a stream. Appends are sent on a long
// lived bidirectional network stream, with it's own managed context (ms.ctx), and there's a per-request context
// attached to the pendingWrite.
func (ms *ManagedStream) appendWithRetry(pw *pendingWrite, opts ...gax.CallOption) error {
func (ms *ManagedStream) appendWithRetry(pw *pendingWrite) error {
for {
ms.mu.Lock()
err := ms.err
Expand Down Expand Up @@ -355,7 +355,7 @@ func (ms *ManagedStream) AppendRows(ctx context.Context, data [][]byte, opts ...

// processRetry is responsible for evaluating and re-enqueing an append.
// If the append is not retried, it is marked complete.
func (ms *ManagedStream) processRetry(pw *pendingWrite, srcConn *connection, appendResp *storagepb.AppendRowsResponse, initialErr error) {
func (ms *ManagedStream) processRetry(pw *pendingWrite, appendResp *storagepb.AppendRowsResponse, initialErr error) {
err := initialErr
for {
pause, shouldRetry := ms.statelessRetryer().Retry(err, pw.attemptCount)
Expand Down
36 changes: 14 additions & 22 deletions bigquery/storage/managedwriter/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,20 @@ var (

// This retry predicate is used for higher level retries, enqueing appends onto to a bidi
// channel and evaluating whether an append should be retried (re-enqueued).
func retryPredicate(err error) (shouldRetry, aggressiveBackoff bool) {
func retryPredicate(err error) bool {
if err == nil {
return
return false
}

s, ok := status.FromError(err)
// non-status based error conditions.
if !ok {
// EOF can happen in the case of connection close.
if errors.Is(err, io.EOF) {
shouldRetry = true
return
return true
}
// All other non-status errors are treated as non-retryable (including context errors).
return
return false
}
switch s.Code() {
case codes.Aborted,
Expand All @@ -56,17 +55,15 @@ func retryPredicate(err error) (shouldRetry, aggressiveBackoff bool) {
codes.FailedPrecondition,
codes.Internal,
codes.Unavailable:
shouldRetry = true
return
return true
case codes.ResourceExhausted:
if strings.HasPrefix(s.Message(), "Exceeds 'AppendRows throughput' quota") {
// Note: internal b/246031522 opened to give this a structured error
// and avoid string parsing. Should be a QuotaFailure or similar.
shouldRetry = true
return
return true
}
}
return
return false
}

// unaryRetryer is for retrying a unary-style operation, like (re)-opening the bidi connection.
Expand All @@ -75,7 +72,7 @@ type unaryRetryer struct {
}

func (ur *unaryRetryer) Retry(err error) (time.Duration, bool) {
shouldRetry, _ := retryPredicate(err)
shouldRetry := retryPredicate(err)
return ur.bo.Pause(), shouldRetry
}

Expand All @@ -86,10 +83,9 @@ type statelessRetryer struct {
mu sync.Mutex // guards r
r *rand.Rand

minBackoff time.Duration
jitter time.Duration
aggressiveFactor int
maxAttempts int
minBackoff time.Duration
jitter time.Duration
maxAttempts int
}

func newStatelessRetryer() *statelessRetryer {
Expand All @@ -101,27 +97,23 @@ func newStatelessRetryer() *statelessRetryer {
}
}

func (sr *statelessRetryer) pause(aggressiveBackoff bool) time.Duration {
func (sr *statelessRetryer) pause() time.Duration {
jitter := sr.jitter.Nanoseconds()
if jitter > 0 {
sr.mu.Lock()
jitter = sr.r.Int63n(jitter)
sr.mu.Unlock()
}
pause := sr.minBackoff.Nanoseconds() + jitter
if aggressiveBackoff {
pause = pause * int64(sr.aggressiveFactor)
}
return time.Duration(pause)
}

func (sr *statelessRetryer) Retry(err error, attemptCount int) (time.Duration, bool) {
if attemptCount >= sr.maxAttempts {
return 0, false
}
shouldRetry, aggressive := retryPredicate(err)
if shouldRetry {
return sr.pause(aggressive), true
if retryPredicate(err) {
return sr.pause(), true
}
return 0, false
}
Expand Down

0 comments on commit 044c077

Please sign in to comment.