Skip to content

Commit

Permalink
[IMPROVED] Return account not enabled for JetStream when accessing vi…
Browse files Browse the repository at this point in the history
…a system account (#4910)

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison authored Dec 26, 2023
2 parents c5c6898 + a02c35c commit 26f0a9b
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 3 deletions.
44 changes: 41 additions & 3 deletions server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,9 @@ const (
JSApiServerStreamCancelMove = "$JS.API.ACCOUNT.STREAM.CANCEL_MOVE.*.*"
JSApiServerStreamCancelMoveT = "$JS.API.ACCOUNT.STREAM.CANCEL_MOVE.%s.%s"

// The prefix for system level account API.
jsAPIAccountPre = "$JS.API.ACCOUNT."

// jsAckT is the template for the ack message stream coming back from a consumer
// when they ACK/NAK, etc a message.
jsAckT = "$JS.ACK.%s.%s"
Expand Down Expand Up @@ -347,6 +350,8 @@ type ApiResponse struct {
Error *ApiError `json:"error,omitempty"`
}

const JSApiSystemResponseType = "io.nats.jetstream.api.v1.system_response"

// When passing back to the clients generalize store failures.
var (
errStreamStoreFailed = errors.New("error creating store for stream")
Expand Down Expand Up @@ -739,26 +744,59 @@ type jsAPIRoutedReq struct {
}

func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, subject, reply string, rmsg []byte) {
// Ignore system level directives meta stepdown and peer remove requests here.
if subject == JSApiLeaderStepDown ||
subject == JSApiRemoveServer ||
strings.HasPrefix(subject, jsAPIAccountPre) {
return
}
// No lock needed, those are immutable.
s, rr := js.srv, js.apiSubs.Match(subject)

hdr, _ := c.msgParts(rmsg)
hdr, msg := c.msgParts(rmsg)
if len(getHeader(ClientInfoHdr, hdr)) == 0 {
// Check if this is the system account. We will let these through for the account info only.
if s.SystemAccount() != acc || subject != JSApiAccountInfo {
sacc := s.SystemAccount()
if sacc != acc {
return
}
if subject != JSApiAccountInfo {
// Only respond from the initial server entry to the NATS system.
if c.kind == CLIENT || c.kind == LEAF {
var resp = ApiResponse{
Type: JSApiSystemResponseType,
Error: NewJSNotEnabledForAccountError(),
}
s.sendAPIErrResponse(nil, acc, subject, reply, string(msg), s.jsonResponse(&resp))
}
return
}
}

// Short circuit for no interest.
if len(rr.psubs)+len(rr.qsubs) == 0 {
if (c.kind == CLIENT || c.kind == LEAF) && acc != s.SystemAccount() {
ci, acc, _, _, _ := s.getRequestInfo(c, rmsg)
var resp = ApiResponse{
Type: JSApiSystemResponseType,
Error: NewJSBadRequestError(),
}
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
}
return
}

// We should only have psubs and only 1 per result.
// FIXME(dlc) - Should we respond here with NoResponders or error?
if len(rr.psubs) != 1 {
s.Warnf("Malformed JetStream API Request: [%s] %q", subject, rmsg)
if c.kind == CLIENT || c.kind == LEAF {
ci, acc, _, _, _ := s.getRequestInfo(c, rmsg)
var resp = ApiResponse{
Type: JSApiSystemResponseType,
Error: NewJSBadRequestError(),
}
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
}
return
}
jsub := rr.psubs[0]
Expand Down
28 changes: 28 additions & 0 deletions server/jetstream_cluster_3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5893,3 +5893,31 @@ func TestJetStreamClusterStreamLimitsOnScaleUpAndMove(t *testing.T) {
})
require_Error(t, err, errors.New("insufficient storage resources"))
}

func TestJetStreamClusterAPIAccessViaSystemAccount(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

// Connect to system account.
nc, js := jsClientConnect(t, c.randomServer(), nats.UserInfo("admin", "s3cr3t!"))
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{Name: "TEST"})
require_Error(t, err, NewJSNotEnabledForAccountError())

// Make sure same behavior swith single server.
tmpl := `
listen: 127.0.0.1:-1
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
accounts { $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } }
`
conf := createConfFile(t, []byte(fmt.Sprintf(tmpl, t.TempDir())))
s, _ := RunServerWithConfig(conf)
defer s.Shutdown()

nc, js = jsClientConnect(t, s, nats.UserInfo("admin", "s3cr3t!"))
defer nc.Close()

_, err = js.AddStream(&nats.StreamConfig{Name: "TEST"})
require_Error(t, err, NewJSNotEnabledForAccountError())
}

0 comments on commit 26f0a9b

Please sign in to comment.