Skip to content

Commit

Permalink
fix(storage): set flush and get_state to false on the last write in g…
Browse files Browse the repository at this point in the history
…RPC (#9013)

* chore(storage): set flush and get_state to false on the last write in gRPC

* clarify comment
  • Loading branch information
BrennaEpp authored Nov 15, 2023
1 parent 6bfaf37 commit c1e9fe5
Showing 1 changed file with 24 additions and 23 deletions.
47 changes: 24 additions & 23 deletions storage/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1629,8 +1629,8 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st
},
WriteOffset: writeOffset,
FinishWrite: lastWriteOfEntireObject,
Flush: remainingDataFitsInSingleReq,
StateLookup: remainingDataFitsInSingleReq,
Flush: remainingDataFitsInSingleReq && !lastWriteOfEntireObject,
StateLookup: remainingDataFitsInSingleReq && !lastWriteOfEntireObject,
}

// Open a new stream if necessary and set the first_message field on
Expand Down Expand Up @@ -1723,32 +1723,33 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st
return nil, writeOffset, nil
}

// Done sending data (remainingDataFitsInSingleReq should == true if we
// reach this code). Receive from the stream to confirm the persisted data.
resp, err := w.stream.Recv()
// Done sending the data in the buffer (remainingDataFitsInSingleReq
// should == true if we reach this code).
// If we are done sending the whole object, close the stream and get the final
// object. Otherwise, receive from the stream to confirm the persisted data.
if !lastWriteOfEntireObject {
resp, err := w.stream.Recv()

// Retriable errors mean we should start over and attempt to
// resend the entire buffer via a new stream.
// If not retriable, falling through will return the error received
// from closing the stream.
if shouldRetry(err) {
writeOffset, err = w.determineOffset(start)
if err != nil {
return nil, 0, err
}
sent = int(writeOffset) - int(start)

// Drop the stream reference as a new one will need to be created.
w.stream = nil

// Retriable errors mean we should start over and attempt to
// resend the entire buffer via a new stream.
// If not retriable, falling through will return the error received
// from closing the stream.
if shouldRetry(err) {
writeOffset, err = w.determineOffset(start)
continue
}
if err != nil {
return nil, 0, err
}
sent = int(writeOffset) - int(start)

// Drop the stream reference as a new one will need to be created.
w.stream = nil

continue
}
if err != nil {
return nil, 0, err
}

// Confirm the persisted data if we have not finished uploading the object.
if !lastWriteOfEntireObject {
if resp.GetPersistedSize() != writeOffset {
// Retry if not all bytes were persisted.
writeOffset = resp.GetPersistedSize()
Expand Down

0 comments on commit c1e9fe5

Please sign in to comment.