Skip to content
This repository has been archived by the owner on Dec 14, 2020. It is now read-only.

Commit

Permalink
Fix settlement handling and add Message.SendSettled
Browse files Browse the repository at this point in the history
* Message.SendSettled allows sending messages settled when
  LinkSenderSettle is ModeMixed.
  • Loading branch information
vcabbage committed Aug 22, 2019
1 parent ad1b5de commit ffafbe4
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 25 deletions.
19 changes: 9 additions & 10 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,8 +390,7 @@ func (s *Sender) send(ctx context.Context, msg *Message) (chan deliveryState, er
var (
maxPayloadSize = int64(s.link.session.conn.peerMaxFrameSize) - maxTransferFrameHeader
sndSettleMode = s.link.senderSettleMode
rcvSettleMode = s.link.receiverSettleMode
senderSettled = sndSettleMode != nil && *sndSettleMode == ModeSettled
senderSettled = sndSettleMode != nil && (*sndSettleMode == ModeSettled || (*sndSettleMode == ModeMixed && msg.SendSettled))
deliveryID = atomic.AddUint32(&s.link.session.nextDeliveryID, 1)
)

Expand All @@ -416,16 +415,16 @@ func (s *Sender) send(ctx context.Context, msg *Message) (chan deliveryState, er
fr.Payload = append([]byte(nil), buf...)
fr.More = s.buf.len() > 0
if !fr.More {
// SSM=settled: overrides RSM; no acks.
// SSM=unsettled: sender should wait for receiver to ack
// RSM=first: receiver considers it settled immediately, but must still send ack (SSM=unsettled only)
// RSM=second: receiver sends ack and waits for return ack from sender (SSM=unsettled only)

// mark final transfer as settled when sender mode is settled
fr.Settled = senderSettled

// set done on last frame to be closed after network transmission
//
// If confirmSettlement is true (ReceiverSettleMode == "second"),
// Session.mux will intercept the done channel and close it when the
// receiver has confirmed settlement instead of on net transmit.
// set done on last frame
fr.done = make(chan deliveryState, 1)
fr.confirmSettlement = rcvSettleMode != nil && *rcvSettleMode == ModeSecond
}

select {
Expand Down Expand Up @@ -750,9 +749,9 @@ func (s *Session) mux(remoteBegin *performBegin) {
delete(handlesByDeliveryID, deliveryID)
}

// if confirmSettlement requested, add done chan to map
// if not settled, add done chan to map
// and clear from frame so conn doesn't close it.
if fr.confirmSettlement && fr.done != nil {
if !fr.Settled && fr.done != nil {
settlementByDeliveryID[deliveryID] = fr.done
fr.done = nil
}
Expand Down
13 changes: 2 additions & 11 deletions integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1131,6 +1131,8 @@ func TestIssue48_ReceiverModeSecond(t *testing.T) {
// Create a sender
sender, err := session.NewSender(
amqp.LinkTargetAddress(hubName),
amqp.LinkSenderSettle(amqp.ModeUnsettled),
amqp.LinkReceiverSettle(amqp.ModeFirst),
)
if err != nil {
t.Fatalf("%+v\n", err)
Expand All @@ -1144,17 +1146,6 @@ func TestIssue48_ReceiverModeSecond(t *testing.T) {
[]byte("there"),
},
})
time.Sleep(1 * time.Second) // Have to wait long enough for disposition to come through.
if err != nil {
t.Fatalf("Unexpected error response: %+v", err)
}

// Second send should get async error
err = sender.Send(context.Background(), &amqp.Message{
Data: [][]byte{
[]byte("hello"),
},
})
if err == nil {
t.Fatal("Expected error, got nil")
}
Expand Down
13 changes: 9 additions & 4 deletions types.go
Original file line number Diff line number Diff line change
Expand Up @@ -1262,10 +1262,10 @@ type performTransfer struct {
Payload []byte

// optional channel to indicate to sender that transfer has completed
//
// Settled=true: closed when the transferred on network.
// Settled=false: closed when the receiver has confirmed settlement.
done chan deliveryState
// complete when receiver has responded with disposition (ReceiverSettleMode = second)
// instead of when this message has been sent on network
confirmSettlement bool
}

func (t *performTransfer) frameBody() {}
Expand Down Expand Up @@ -1726,6 +1726,11 @@ type Message struct {
// encryption details).
Footer Annotations

// Mark the message as settled when LinkSenderSettle is ModeMixed.
//
// This field is ignored when LinkSenderSettle is not ModeMixed.
SendSettled bool

receiver *Receiver // Receiver the message was received from
deliveryID uint32 // used when sending disposition
settled bool // whether transfer was settled by sender
Expand Down Expand Up @@ -1810,7 +1815,7 @@ func (m *Message) MarshalBinary() ([]byte, error) {
}

func (m *Message) shouldSendDisposition() bool {
return !m.settled || (m.receiver.link.receiverSettleMode != nil && *m.receiver.link.receiverSettleMode == ModeSecond)
return !m.settled
}

func (m *Message) marshal(wr *buffer) error {
Expand Down

0 comments on commit ffafbe4

Please sign in to comment.