From 044c0775fb3a5e2d1496d609981ad1115620a2eb Mon Sep 17 00:00:00 2001
From: shollyman
Date: Thu, 21 Nov 2024 10:36:43 -0800
Subject: [PATCH] refactor(bigquery/storage/managedwriter): simplify signatures
 (#11158)

Another unparam related PR, this one focused on reducing unused params
in the managedwriter and its adapt subpackage.

Followup to https://github.com/googleapis/google-cloud-go/pull/11149
---
 .../managedwriter/adapt/protoconversion.go   | 19 +++-------
 bigquery/storage/managedwriter/connection.go |  6 ++--
 .../storage/managedwriter/managed_stream.go  |  4 +--
 bigquery/storage/managedwriter/retry.go      | 36 ++++++++-----------
 4 files changed, 24 insertions(+), 41 deletions(-)

diff --git a/bigquery/storage/managedwriter/adapt/protoconversion.go b/bigquery/storage/managedwriter/adapt/protoconversion.go
index 5cc00c2ed17d..56eade970fd5 100644
--- a/bigquery/storage/managedwriter/adapt/protoconversion.go
+++ b/bigquery/storage/managedwriter/adapt/protoconversion.go
@@ -277,10 +277,7 @@ func storageSchemaToDescriptorInternal(inSchema *storagepb.TableSchema, scope st
 					deps = append(deps, foundDesc.ParentFile())
 				}
 				// Construct field descriptor for the message.
-				fdp, err := tableFieldSchemaToFieldDescriptorProto(f, fNumber, string(foundDesc.FullName()), useProto3)
-				if err != nil {
-					return nil, newConversionError(scope, fmt.Errorf("couldn't convert field to FieldDescriptorProto: %w", err))
-				}
+				fdp := tableFieldSchemaToFieldDescriptorProto(f, fNumber, string(foundDesc.FullName()), useProto3)
 				fields = append(fields, fdp)
 			} else {
 				// Wrap the current struct's fields in a TableSchema outer message, and then build the submessage.
@@ -298,10 +295,7 @@ func storageSchemaToDescriptorInternal(inSchema *storagepb.TableSchema, scope st
 				if err != nil {
 					return nil, newConversionError(currentScope, fmt.Errorf("failed to add descriptor to dependency cache: %w", err))
 				}
-				fdp, err := tableFieldSchemaToFieldDescriptorProto(f, fNumber, currentScope, useProto3)
-				if err != nil {
-					return nil, newConversionError(currentScope, fmt.Errorf("couldn't compute field schema : %w", err))
-				}
+				fdp := tableFieldSchemaToFieldDescriptorProto(f, fNumber, currentScope, useProto3)
 				fields = append(fields, fdp)
 			}
 		} else {
@@ -329,10 +323,7 @@ func storageSchemaToDescriptorInternal(inSchema *storagepb.TableSchema, scope st
 					}
 				}
 			}
-			fd, err := tableFieldSchemaToFieldDescriptorProto(f, fNumber, currentScope, useProto3)
-			if err != nil {
-				return nil, newConversionError(currentScope, err)
-			}
+			fd := tableFieldSchemaToFieldDescriptorProto(f, fNumber, currentScope, useProto3)
 			fields = append(fields, fd)
 		}
 	}
@@ -411,7 +402,7 @@ func messageDependsOnFile(msg protoreflect.MessageDescriptor, file protoreflect.
 // For proto2, we propagate the mode->label annotation as expected.
 //
 // Messages are always nullable, and repeated fields are as well.
-func tableFieldSchemaToFieldDescriptorProto(field *storagepb.TableFieldSchema, idx int32, scope string, useProto3 bool) (*descriptorpb.FieldDescriptorProto, error) {
+func tableFieldSchemaToFieldDescriptorProto(field *storagepb.TableFieldSchema, idx int32, scope string, useProto3 bool) *descriptorpb.FieldDescriptorProto {
 	name := field.GetName()
 	var fdp *descriptorpb.FieldDescriptorProto
 
@@ -474,7 +465,7 @@ func tableFieldSchemaToFieldDescriptorProto(field *storagepb.TableFieldSchema, i
 		}
 		proto.SetExtension(fdp.Options, storagepb.E_ColumnName, name)
 	}
-	return fdp, nil
+	return fdp
 }
 
 // nameRequiresAnnotation determines whether a field name requires unicode-annotation.
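For illustration only, not part of the patch: the adapt change above removes an error return that was always nil. A minimal standalone Go sketch of the same refactoring pattern (hypothetical names, nothing from the adapt package) shows why every call site gets shorter:

package main

import "fmt"

// before: the error result is never non-nil, so each caller carries an unreachable check.
func fieldLabelBefore(name string, idx int32) (string, error) {
	return fmt.Sprintf("%s_%d", name, idx), nil
}

// after: identical behavior, one fewer value to thread through every call site.
func fieldLabelAfter(name string, idx int32) string {
	return fmt.Sprintf("%s_%d", name, idx)
}

func main() {
	l, err := fieldLabelBefore("field", 1)
	if err != nil { // never taken, but still has to be written and reviewed
		panic(err)
	}
	fmt.Println(l)
	fmt.Println(fieldLabelAfter("field", 2)) // reads straight through
}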
diff --git a/bigquery/storage/managedwriter/connection.go b/bigquery/storage/managedwriter/connection.go
index 6b46e535608b..b7c548eb303f 100644
--- a/bigquery/storage/managedwriter/connection.go
+++ b/bigquery/storage/managedwriter/connection.go
@@ -537,7 +537,7 @@ func connRecvProcessor(ctx context.Context, co *connection, arc storagepb.BigQue
 				// TODO: Determine if/how we should report this case, as we have no viable context for propagating.
 				// Because we can't tell locally if this write is done, we pass it back to the retrier for possible re-enqueue.
-				pw.writer.processRetry(pw, co, nil, doneErr)
+				pw.writer.processRetry(pw, nil, doneErr)
 			}
 		case nextWrite, ok := <-ch:
 			if !ok {
@@ -557,7 +557,7 @@ func connRecvProcessor(ctx context.Context, co *connection, arc storagepb.BigQue
 				}
 				recordStat(metricCtx, AppendResponseErrors, 1)
-				nextWrite.writer.processRetry(nextWrite, co, nil, err)
+				nextWrite.writer.processRetry(nextWrite, nil, err)
 				continue
 			}
 			// Record that we did in fact get a response from the backend.
@@ -573,7 +573,7 @@ func connRecvProcessor(ctx context.Context, co *connection, arc storagepb.BigQue
 				recordStat(metricCtx, AppendResponseErrors, 1)
 				respErr := grpcstatus.ErrorProto(status)
-				nextWrite.writer.processRetry(nextWrite, co, resp, respErr)
+				nextWrite.writer.processRetry(nextWrite, resp, respErr)
 				continue
 			}
diff --git a/bigquery/storage/managedwriter/managed_stream.go b/bigquery/storage/managedwriter/managed_stream.go
index 090933d7e706..3ab104da6186 100644
--- a/bigquery/storage/managedwriter/managed_stream.go
+++ b/bigquery/storage/managedwriter/managed_stream.go
@@ -194,7 +194,7 @@ func (ms *ManagedStream) Finalize(ctx context.Context, opts ...gax.CallOption) (
 // appendWithRetry handles the details of adding sending an append request on a stream. Appends are sent on a long
 // lived bidirectional network stream, with it's own managed context (ms.ctx), and there's a per-request context
 // attached to the pendingWrite.
-func (ms *ManagedStream) appendWithRetry(pw *pendingWrite, opts ...gax.CallOption) error {
+func (ms *ManagedStream) appendWithRetry(pw *pendingWrite) error {
 	for {
 		ms.mu.Lock()
 		err := ms.err
@@ -355,7 +355,7 @@ func (ms *ManagedStream) AppendRows(ctx context.Context, data [][]byte, opts ...
 
 // processRetry is responsible for evaluating and re-enqueing an append.
 // If the append is not retried, it is marked complete.
-func (ms *ManagedStream) processRetry(pw *pendingWrite, srcConn *connection, appendResp *storagepb.AppendRowsResponse, initialErr error) {
+func (ms *ManagedStream) processRetry(pw *pendingWrite, appendResp *storagepb.AppendRowsResponse, initialErr error) {
 	err := initialErr
 	for {
 		pause, shouldRetry := ms.statelessRetryer().Retry(err, pw.attemptCount)
diff --git a/bigquery/storage/managedwriter/retry.go b/bigquery/storage/managedwriter/retry.go
index 60c5c347d1e2..8ec4355a8b80 100644
--- a/bigquery/storage/managedwriter/retry.go
+++ b/bigquery/storage/managedwriter/retry.go
@@ -33,9 +33,9 @@ var (
 
 // This retry predicate is used for higher level retries, enqueing appends onto to a bidi
 // channel and evaluating whether an append should be retried (re-enqueued).
-func retryPredicate(err error) (shouldRetry, aggressiveBackoff bool) {
+func retryPredicate(err error) bool {
 	if err == nil {
-		return
+		return false
 	}
 
 	s, ok := status.FromError(err)
@@ -43,11 +43,10 @@ func retryPredicate(err error) (shouldRetry, aggressiveBackoff bool) {
 	if !ok {
 		// EOF can happen in the case of connection close.
 		if errors.Is(err, io.EOF) {
-			shouldRetry = true
-			return
+			return true
 		}
 		// All other non-status errors are treated as non-retryable (including context errors).
-		return
+		return false
 	}
 	switch s.Code() {
 	case codes.Aborted,
@@ -56,17 +55,15 @@ func retryPredicate(err error) (shouldRetry, aggressiveBackoff bool) {
 		codes.FailedPrecondition,
 		codes.Internal,
 		codes.Unavailable:
-		shouldRetry = true
-		return
+		return true
 	case codes.ResourceExhausted:
 		if strings.HasPrefix(s.Message(), "Exceeds 'AppendRows throughput' quota") {
 			// Note: internal b/246031522 opened to give this a structured error
 			// and avoid string parsing. Should be a QuotaFailure or similar.
-			shouldRetry = true
-			return
+			return true
 		}
 	}
-	return
+	return false
 }
 
 // unaryRetryer is for retrying a unary-style operation, like (re)-opening the bidi connection.
@@ -75,7 +72,7 @@ type unaryRetryer struct {
 }
 
 func (ur *unaryRetryer) Retry(err error) (time.Duration, bool) {
-	shouldRetry, _ := retryPredicate(err)
+	shouldRetry := retryPredicate(err)
 	return ur.bo.Pause(), shouldRetry
 }
 
@@ -86,10 +83,9 @@ type statelessRetryer struct {
 	mu sync.Mutex // guards r
 	r  *rand.Rand
 
-	minBackoff       time.Duration
-	jitter           time.Duration
-	aggressiveFactor int
-	maxAttempts      int
+	minBackoff  time.Duration
+	jitter      time.Duration
+	maxAttempts int
 }
 
 func newStatelessRetryer() *statelessRetryer {
@@ -101,7 +97,7 @@ func newStatelessRetryer() *statelessRetryer {
 	}
 }
 
-func (sr *statelessRetryer) pause(aggressiveBackoff bool) time.Duration {
+func (sr *statelessRetryer) pause() time.Duration {
 	jitter := sr.jitter.Nanoseconds()
 	if jitter > 0 {
 		sr.mu.Lock()
@@ -109,9 +105,6 @@ func (sr *statelessRetryer) pause(aggressiveBackoff bool) time.Duration {
 		sr.mu.Unlock()
 	}
 	pause := sr.minBackoff.Nanoseconds() + jitter
-	if aggressiveBackoff {
-		pause = pause * int64(sr.aggressiveFactor)
-	}
 	return time.Duration(pause)
 }
 
@@ -119,9 +112,8 @@ func (sr *statelessRetryer) Retry(err error, attemptCount int) (time.Duration, b
 	if attemptCount >= sr.maxAttempts {
 		return 0, false
 	}
-	shouldRetry, aggressive := retryPredicate(err)
-	if shouldRetry {
-		return sr.pause(aggressive), true
+	if retryPredicate(err) {
+		return sr.pause(), true
 	}
 	return 0, false
 }
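Also for illustration only, outside the patch itself: with retryPredicate reduced to a single bool and the aggressive-backoff plumbing removed, the caller-side shape is simply "predicate says retry, so pause and try again." A self-contained sketch under those assumptions (invented names, fixed pause instead of the package's jittered backoff):

package main

import (
	"errors"
	"fmt"
	"time"
)

var errTransient = errors.New("transient failure")

// shouldRetry mirrors the simplified predicate shape: error in, single bool out.
func shouldRetry(err error) bool {
	return errors.Is(err, errTransient)
}

// doWithRetry retries op until it succeeds, the predicate says stop, or attempts run out.
func doWithRetry(op func() error, maxAttempts int, pause time.Duration) error {
	var err error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		if err = op(); err == nil {
			return nil
		}
		if !shouldRetry(err) {
			return err
		}
		time.Sleep(pause)
	}
	return err
}

func main() {
	calls := 0
	err := doWithRetry(func() error {
		calls++
		if calls < 3 {
			return errTransient
		}
		return nil
	}, 5, 10*time.Millisecond)
	fmt.Println("calls:", calls, "err:", err) // calls: 3 err: <nil>
}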