Skip to content

Commit

Permalink
Add advisories for pinning and unpinning consumers
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Pietrek <tomasz@nats.io>
  • Loading branch information
Jarema committed Aug 13, 2024
1 parent e45ac9c commit a707ba8
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 2 deletions.
70 changes: 69 additions & 1 deletion server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1564,6 +1564,62 @@ func (o *consumer) sendDeleteAdvisoryLocked() {
o.sendAdvisory(subj, j)
}

func (o *consumer) sendPinnedAdvisoryLocked() {
group := _EMPTY_
if len(o.cfg.PriorityGroups) == 0 {
group = o.cfg.PriorityGroups[0]
}
e := JSStreamGroupPinnedAdvisory{
TypedEvent: TypedEvent{
Type: JSStreamGroupPinnedAdvisoryType,
ID: nuid.Next(),
Time: time.Now().UTC(),
},
Account: o.acc.Name,
Stream: o.stream,
Consumer: o.name,
Domain: o.srv.getOpts().JetStreamDomain,
PinnedClientId: o.currentNuid,
Group: group,
}

j, err := json.Marshal(e)
if err != nil {
return
}

subj := JSAdvisoryConsumerPinnedPre + "." + o.stream + "." + o.name
o.sendAdvisory(subj, j)

}
func (o *consumer) sendUnpinnedAdvisoryLocked() {
group := _EMPTY_
if len(o.cfg.PriorityGroups) == 0 {
group = o.cfg.PriorityGroups[0]
}
e := JSStreamGroupUnPinnedAdvisory{
TypedEvent: TypedEvent{
Type: JSStreamGroupPinnedAdvisoryType,
ID: nuid.Next(),
Time: time.Now().UTC(),
},
Account: o.acc.Name,
Stream: o.stream,
Consumer: o.name,
Domain: o.srv.getOpts().JetStreamDomain,
Group: group,
}

j, err := json.Marshal(e)
if err != nil {
return
}

subj := JSAdvisoryConsumerUnpinnedPre + "." + o.stream + "." + o.name
o.sendAdvisory(subj, j)

}

func (o *consumer) sendCreateAdvisory() {
o.mu.Lock()
defer o.mu.Unlock()
Expand Down Expand Up @@ -3424,7 +3480,6 @@ func (o *consumer) nextWaiting(sz int) *waitingRequest {
o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, []byte(JSPullRequestWrongPinID), nil, nil, 0))
o.waiting.removeCurrent()
if o.node != nil {
needNewPin = false
o.removeClusterPendingRequest(wr.reply)
}
wr.recycle()
Expand All @@ -3446,10 +3501,19 @@ func (o *consumer) nextWaiting(sz int) *waitingRequest {
}
}
if len(rr.psubs)+len(rr.qsubs) > 0 {
if needNewPin {
o.sendPinnedAdvisoryLocked()
}
return o.waiting.pop()
} else if time.Since(wr.received) < defaultGatewayRecentSubExpiration && (o.srv.leafNodeEnabled || o.srv.gateway.enabled) {
if needNewPin {
o.sendPinnedAdvisoryLocked()
}
return o.waiting.pop()
} else if o.srv.gateway.enabled && o.srv.hasGatewayInterest(wr.acc.Name, wr.interest) {
if needNewPin {
o.sendPinnedAdvisoryLocked()
}
return o.waiting.pop()
}
} else {
Expand All @@ -3475,6 +3539,8 @@ func (o *consumer) nextWaiting(sz int) *waitingRequest {
o.removeClusterPendingRequest(wr.reply)
}
wr.recycle()
// We did not find any wr, so let's reset the newly set pin.
o.currentNuid = _EMPTY_
}

return nil
Expand Down Expand Up @@ -3515,6 +3581,7 @@ func (o *consumer) processUnpinRequest(_ *subscription, c *client, _ *Account, _
o.mu.Lock()
defer o.mu.Unlock()
o.currentNuid = _EMPTY_
o.sendUnpinnedAdvisoryLocked()
}

// processNextMsgReq will process a request for the next message available. A nil message payload means deliver
Expand Down Expand Up @@ -3590,6 +3657,7 @@ func (o *consumer) processNextMsgRequest(reply string, msg []byte) {
o.pinnedTtl = time.AfterFunc(o.cfg.PinnedTTL, func() {
o.mu.Lock()
o.currentNuid = _EMPTY_
o.sendUnpinnedAdvisoryLocked()
o.mu.Unlock()
o.signalNewMessages()
})
Expand Down
6 changes: 6 additions & 0 deletions server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,12 @@ const (
// JSAdvisoryConsumerPausePre notification that a consumer paused/unpaused.
JSAdvisoryConsumerPausePre = "$JS.EVENT.ADVISORY.CONSUMER.PAUSE"

// JSAdvisoryConsumerPinnedPre notification that a consumer was pinned.
JSAdvisoryConsumerPinnedPre = "$JS.EVENT.ADVISORY.CONSUMER.PINNED"

// JSAdvisoryConsumerUnpinnedPre notification that a consumer was unpinned.
JSAdvisoryConsumerUnpinnedPre = "$JS.EVENT.ADVISORY.CONSUMER.UNPINNED"

// JSAdvisoryStreamSnapshotCreatePre notification that a snapshot was created.
JSAdvisoryStreamSnapshotCreatePre = "$JS.EVENT.ADVISORY.STREAM.SNAPSHOT_CREATE"

Expand Down
12 changes: 11 additions & 1 deletion server/jetstream_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1328,7 +1328,10 @@ func TestJetStreamConsumerPinned(t *testing.T) {
require_NoError(t, err)
require_NotNil(t, msg)

// Send a new request without pin ID, which should wit
advisories, err := nc.SubscribeSync("$JS.EVENT.ADVISORY.CONSUMER.*.TEST.C")
require_NoError(t, err)

// Send a new request without pin ID, which should work after the TTL.
req = JSApiConsumerGetNextRequest{Batch: 3, Expires: 30 * time.Second, PriorityGroups: PriorityGroups{}}
reqb, _ = json.Marshal(req)
reply = "FIVE"
Expand All @@ -1343,6 +1346,13 @@ func TestJetStreamConsumerPinned(t *testing.T) {
return err
})

advisory, err := advisories.NextMsg(time.Second)
require_NoError(t, err)
require_Equal(t, fmt.Sprintf("%s.TEST.C", JSAdvisoryConsumerUnpinnedPre), advisory.Subject)
advisory, err = advisories.NextMsg(time.Second)
require_NoError(t, err)
require_Equal(t, fmt.Sprintf("%s.TEST.C", JSAdvisoryConsumerPinnedPre), advisory.Subject)

}

// Simple happy path test for consumer with overflow.
Expand Down
28 changes: 28 additions & 0 deletions server/jetstream_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,34 @@ type JSConsumerQuorumLostAdvisory struct {
Domain string `json:"domain,omitempty"`
}

const JSStreamGroupPinnedAdvisoryType = "io.nats.jetstream.advisory.v1.consumer_group_pinned"

// JSStreamGroupPinnedAdvisory that a group switched to a new pinned client
type JSStreamGroupPinnedAdvisory struct {
TypedEvent
Account string `json:"account,omitempty"`
Stream string `json:"stream"`
Consumer string `json:"consumer"`
Domain string `json:"domain,omitempty"`
Group string `json:"group"`
PinnedClientId string `json:"pinned_id"`
Client *ClientInfo `json:"client"` // if available
}

const JSStreamGroupUnPinnedAdvisoryType = "io.nats.jetstream.advisory.v1.consumer_group_unpinned"

// JSStreamGroupUnPinnedAdvisory indicates that a pin was lost
type JSStreamGroupUnPinnedAdvisory struct {
TypedEvent
Account string `json:"account,omitempty"`
Stream string `json:"stream"`
Consumer string `json:"consumer"`
Domain string `json:"domain,omitempty"`
Group string `json:"group"`
// one of "admin" or "timeout", could be an enum up to the implementor to decide
Reason string `json:"reason"`
}

// JSServerOutOfStorageAdvisoryType is sent when the server is out of storage space.
const JSServerOutOfStorageAdvisoryType = "io.nats.jetstream.advisory.v1.server_out_of_space"

Expand Down

0 comments on commit a707ba8

Please sign in to comment.