Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(storage): retry configs for gRPC media ops #6754

Merged
merged 3 commits into from
Sep 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 34 additions & 11 deletions storage/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -943,6 +943,7 @@ func (c *grpcStorageClient) NewRangeReader(ctx context.Context, params *newRange
// Store the content from the first Recv in the
// client buffer for reading later.
leftovers: msg.GetChecksummedData().GetContent(),
settings: s,
},
}

Expand All @@ -964,13 +965,19 @@ func (c *grpcStorageClient) NewRangeReader(ctx context.Context, params *newRange
}

func (c *grpcStorageClient) OpenWriter(params *openWriterParams, opts ...storageOption) (*io.PipeWriter, error) {
s := callSettings(c.settings, opts...)

var offset int64
errorf := params.setError
progress := params.progress
setObj := params.setObj

pr, pw := io.Pipe()
gw := newGRPCWriter(c, params, pr)
gw.settings = s
if s.userProject != "" {
gw.ctx = setUserProjectMetadata(gw.ctx, s.userProject)
}

// This function reads the data sent to the pipe and sends sets of messages
// on the gRPC client-stream as the buffer is filled.
Expand Down Expand Up @@ -1315,6 +1322,7 @@ type gRPCReader struct {
reopen func(seen int64) (*readStreamResponse, context.CancelFunc, error)
leftovers []byte
cancel context.CancelFunc
settings *settings
}

// Read reads bytes into the user's buffer from an open gRPC stream.
Expand Down Expand Up @@ -1390,7 +1398,11 @@ func (r *gRPCReader) Close() error {
// an attempt to reopen the stream.
func (r *gRPCReader) recv() (*storagepb.ReadObjectResponse, error) {
msg, err := r.stream.Recv()
if err != nil && ShouldRetry(err) {
var shouldRetry = ShouldRetry
if r.settings.retry != nil {
shouldRetry = r.settings.retry.shouldRetry
}
if err != nil && shouldRetry(err) {
// This will "close" the existing stream and immediately attempt to
// reopen the stream, but will backoff if further attempts are necessary.
// Reopening the stream Recvs the first message, so if retrying is
Expand Down Expand Up @@ -1454,6 +1466,7 @@ type gRPCWriter struct {
attrs *ObjectAttrs
conds *Conditions
encryptionKey []byte
settings *settings

sendCRC32C bool

Expand All @@ -1471,21 +1484,27 @@ func (w *gRPCWriter) startResumableUpload() error {
if err != nil {
return err
}
upres, err := w.c.raw.StartResumableWrite(w.ctx, &storagepb.StartResumableWriteRequest{
WriteObjectSpec: spec,
})

w.upid = upres.GetUploadId()
return err
return run(w.ctx, func() error {
upres, err := w.c.raw.StartResumableWrite(w.ctx, &storagepb.StartResumableWriteRequest{
WriteObjectSpec: spec,
})
w.upid = upres.GetUploadId()
return err
}, w.settings.retry, w.settings.idempotent, setRetryHeaderGRPC(w.ctx))
}

// queryProgress is a helper that queries the status of the resumable upload
// associated with the given upload ID.
func (w *gRPCWriter) queryProgress() (int64, error) {
q, err := w.c.raw.QueryWriteStatus(w.ctx, &storagepb.QueryWriteStatusRequest{UploadId: w.upid})
var persistedSize int64
err := run(w.ctx, func() error {
q, err := w.c.raw.QueryWriteStatus(w.ctx, &storagepb.QueryWriteStatusRequest{UploadId: w.upid})
persistedSize = q.GetPersistedSize()
return err
}, w.settings.retry, true, setRetryHeaderGRPC(w.ctx))

// q.GetCommittedSize() will return 0 if q is nil.
return q.GetPersistedSize(), err
return persistedSize, err
}

// uploadBuffer opens a Write stream and uploads the buffer at the given offset (if
Expand All @@ -1500,6 +1519,10 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st
var err error
var finishWrite bool
var sent, limit int = 0, maxPerMessageWriteSize
var shouldRetry = ShouldRetry
if w.settings.retry != nil {
shouldRetry = w.settings.retry.shouldRetry
}
offset := start
toWrite := w.buf[:recvd]
for {
Expand Down Expand Up @@ -1570,7 +1593,7 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st
// resend the entire buffer via a new stream.
// If not retriable, falling through will return the error received
// from closing the stream.
if ShouldRetry(err) {
if shouldRetry(err) {
sent = 0
finishWrite = false
// TODO: Add test case for failure modes of querying progress.
Expand Down Expand Up @@ -1601,7 +1624,7 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st
// resend the entire buffer via a new stream.
// If not retriable, falling through will return the error received
// from closing the stream.
if ShouldRetry(err) {
if shouldRetry(err) {
sent = 0
finishWrite = false
offset, err = w.determineOffset(start)
Expand Down
3 changes: 1 addition & 2 deletions storage/http_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1033,9 +1033,8 @@ func (c *httpStorageClient) OpenWriter(params *openWriterParams, opts ...storage
// there is no need to add retries here.

// Retry only when the operation is idempotent or the retry policy is RetryAlways.
isIdempotent := params.conds != nil && (params.conds.GenerationMatch >= 0 || params.conds.DoesNotExist == true)
var useRetry bool
if (s.retry == nil || s.retry.policy == RetryIdempotent) && isIdempotent {
if (s.retry == nil || s.retry.policy == RetryIdempotent) && s.idempotent {
useRetry = true
} else if s.retry != nil && s.retry.policy == RetryAlways {
useRetry = true
Expand Down