diff --git a/server/accounts.go b/server/accounts.go index 962b54d5d4c..ebb75ebfc9d 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -54,6 +54,7 @@ type Account struct { Name string Nkey string Issuer string + TraceDest string claimJWT string updated time.Time mu sync.RWMutex @@ -260,6 +261,7 @@ func (a *Account) String() string { func (a *Account) shallowCopy(na *Account) { na.Nkey = a.Nkey na.Issuer = a.Issuer + na.TraceDest = a.TraceDest if a.imports.streams != nil { na.imports.streams = make([]*streamImport, 0, len(a.imports.streams)) diff --git a/server/accounts_test.go b/server/accounts_test.go index e48b92b169f..ae3ad9ff28c 100644 --- a/server/accounts_test.go +++ b/server/accounts_test.go @@ -509,7 +509,8 @@ func TestAccountSimpleConfig(t *testing.T) { } func TestAccountParseConfig(t *testing.T) { - confFileName := createConfFile(t, []byte(` + traceDest := "my.trace.dest" + confFileName := createConfFile(t, []byte(fmt.Sprintf(` accounts { synadia { users = [ @@ -518,13 +519,14 @@ func TestAccountParseConfig(t *testing.T) { ] } nats.io { + trace_dest: %q users = [ {user: derek, password: foo} {user: ivan, password: bar} ] } } - `)) + `, traceDest))) opts, err := ProcessConfigFile(confFileName) if err != nil { t.Fatalf("Received an error processing config file: %v", err) @@ -548,6 +550,7 @@ func TestAccountParseConfig(t *testing.T) { if natsAcc == nil { t.Fatalf("Error retrieving account for 'nats.io'") } + require_Equal[string](t, natsAcc.TraceDest, traceDest) for _, u := range opts.Users { if u.Username == "derek" { diff --git a/server/client.go b/server/client.go index d3466ae820c..e55c0e5d1dd 100644 --- a/server/client.go +++ b/server/client.go @@ -4349,11 +4349,11 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt // We do so by setting the c.pa.trace to nil (it will be restored // with c.pa = pacopy). c.pa.trace = nil - // We also need to disable the trace destination header so that - // if message is routed, it does not initialize tracing in the + // We also need to disable the message trace headers so that + // if the message is routed, it does not initialize tracing in the // remote. - pos := mt.disableTraceHeader(c, msg) - defer mt.enableTraceHeader(c, msg, pos) + positions := mt.disableTraceHeaders(c, msg) + defer mt.enableTraceHeaders(c, msg, positions) } } } diff --git a/server/config_check_test.go b/server/config_check_test.go index f1cf644fb9b..9634d07dee7 100644 --- a/server/config_check_test.go +++ b/server/config_check_test.go @@ -913,6 +913,28 @@ func TestConfigCheck(t *testing.T) { errorLine: 7, errorPos: 25, }, + { + name: "when account trace destination is of the wrong type", + config: ` + accounts { + A { trace_dest: 123 } + } + `, + err: errors.New(`interface conversion: interface {} is int64, not string`), + errorLine: 3, + errorPos: 23, + }, + { + name: "when account trace destination is not a valid destination", + config: ` + accounts { + A { trace_dest: "invalid..dest" } + } + `, + err: errors.New(`Trace destination "invalid..dest" is not valid`), + errorLine: 3, + errorPos: 23, + }, { name: "when user authorization config has both token and users", config: ` diff --git a/server/msgtrace.go b/server/msgtrace.go index 44bde6ea4db..4008e5119bc 100644 --- a/server/msgtrace.go +++ b/server/msgtrace.go @@ -330,12 +330,25 @@ func (c *client) initMsgTrace() *msgTrace { return nil } hdr := c.msgBuf[:c.pa.hdr] + var external bool // Do not call c.parseState.getHeader() yet for performance reasons. // We first do a "manual" search of the "send-to" destination's header. // If not present, no need to lift the message headers. td := getHeader(MsgTraceSendTo, hdr) if len(td) <= 0 { - return nil + // If the NATS trace destination header is not present, look for + // the external trace header. + trcCtxHdrVal := getHeader(trcCtx, hdr) + if len(trcCtxHdrVal) <= 0 { + return nil + } + // See if we need to trigger the race based on the last token + // that should be set to "01". + tk := bytes.Split(trcCtxHdrVal, stringToBytes("-")) + if len(tk) != 4 || len([]byte(tk[3])) != 2 || !bytes.Equal(tk[3], stringToBytes("01")) { + return nil + } + external = true } // Now we know that this is a message that requested tracing, we // will lift the headers since we also need to transfer them to @@ -346,11 +359,14 @@ func (c *client) initMsgTrace() *msgTrace { } ct := getCompressionType(headers.Get(acceptEncodingHeader)) var traceOnly bool - if to := headers.Get(MsgTraceOnly); to != _EMPTY_ { - tos := strings.ToLower(to) - switch tos { - case "1", "true", "on": - traceOnly = true + // Check for traceOnly only if not external. + if !external { + if to := headers.Get(MsgTraceOnly); to != _EMPTY_ { + tos := strings.ToLower(to) + switch tos { + case "1", "true", "on": + traceOnly = true + } } } var ( @@ -403,11 +419,26 @@ func (c *client) initMsgTrace() *msgTrace { acc = c.acc ian = acc.GetName() } + // If external, we need to have the account's trace destination set, + // otherwise, we are not enabling tracing. + var dest string + if external { + if acc != nil { + acc.mu.RLock() + dest = acc.TraceDest + acc.mu.RUnlock() + } + if dest == _EMPTY_ { + return nil + } + } else { + dest = string(td) + } c.pa.trace = &msgTrace{ srv: c.srv, acc: acc, oan: oan, - dest: string(td), + dest: dest, ct: ct, hop: hop, event: &MsgTraceEvent{ @@ -496,49 +527,61 @@ func (t *msgTrace) setHopHeader(c *client, msg []byte) []byte { return c.setHeader(MsgTraceHop, t.nhop, msg) } -// Will look for the MsgTraceSendTo header and change the first character -// to an 'X' so that if this message is sent to a remote, the remote will -// not initialize tracing since it won't find the actual MsgTraceSendTo -// header. The function returns the position of the header so it can -// efficiently be re-enabled by calling enableTraceHeader. +// Will look for the MsgTraceSendTo and trcCtx headers and change the first +// character to an 'X' so that if this message is sent to a remote, the remote +// will not initialize tracing since it won't find the actual trace headers. +// The function returns the position of the headers so it can efficiently be +// re-enabled by calling enableTraceHeaders. // Note that if `msg` can be either the header alone or the full message // (header and payload). This function will use c.pa.hdr to limit the // search to the header section alone. -func (t *msgTrace) disableTraceHeader(c *client, msg []byte) int { +func (t *msgTrace) disableTraceHeaders(c *client, msg []byte) []int { // Code largely copied from getHeader(), except that we don't need the value if c.pa.hdr <= 0 { - return -1 + return []int{-1, -1} } hdr := msg[:c.pa.hdr] - key := stringToBytes(MsgTraceSendTo) - pos := bytes.Index(hdr, key) - if pos < 0 { - return -1 - } - // Make sure this key does not have additional prefix. - if pos < 2 || hdr[pos-1] != '\n' || hdr[pos-2] != '\r' { - return -1 - } - index := pos + len(key) - if index >= len(hdr) { - return -1 - } - if hdr[index] != ':' { - return -1 + headers := [2]string{MsgTraceSendTo, trcCtx} + positions := [2]int{} + for i := 0; i < 2; i++ { + key := stringToBytes(headers[i]) + pos := bytes.Index(hdr, key) + if pos < 0 { + positions[i] = -1 + continue + } + // Make sure this key does not have additional prefix. + if pos < 2 || hdr[pos-1] != '\n' || hdr[pos-2] != '\r' { + positions[i] = -1 + continue + } + index := pos + len(key) + if index >= len(hdr) { + positions[i] = -1 + continue + } + if hdr[index] != ':' { + positions[i] = -1 + continue + } + // Disable the trace by altering the first character of the header + hdr[pos] = 'X' + positions[i] = pos } - // Disable the trace by altering the first character of the header - hdr[pos] = 'X' - // Return the position of that character so we can re-enable it. - return pos + // Return the positions of those characters so we can re-enable the headers. + return positions[:2] } // Changes back the character at the given position `pos` in the `msg` // byte slice to the first character of the MsgTraceSendTo header. -func (t *msgTrace) enableTraceHeader(c *client, msg []byte, pos int) { - if pos <= 0 { - return +func (t *msgTrace) enableTraceHeaders(c *client, msg []byte, positions []int) { + headers := [2]string{MsgTraceSendTo, trcCtx} + for i, pos := range positions { + if pos == -1 { + continue + } + msg[pos] = headers[i][0] } - msg[pos] = MsgTraceSendTo[0] } func (t *msgTrace) setIngressError(err string) { diff --git a/server/msgtrace_test.go b/server/msgtrace_test.go index 222ecb23a49..e23f6c3a72d 100644 --- a/server/msgtrace_test.go +++ b/server/msgtrace_test.go @@ -1841,15 +1841,18 @@ func TestMsgTraceServiceImportWithSuperCluster(t *testing.T) { mappings = { bar: bozo } + trace_dest: "a.trace.subj" } B { users: [{user: b, password: pwd}] imports: [ { service: {account: "A", subject:">"} } ] exports: [ { service: ">" , allow_trace: ` + mainTest.allowStr + ` } ] + trace_dest: "b.trace.subj" } C { users: [{user: c, password: pwd}] exports: [ { service: ">" , allow_trace: ` + mainTest.allowStr + ` } ] + trace_dest: "c.trace.subj" } D { users: [{user: d, password: pwd}] @@ -1860,6 +1863,7 @@ func TestMsgTraceServiceImportWithSuperCluster(t *testing.T) { mappings = { bat: baz } + trace_dest: "d.trace.subj" } $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } } @@ -1886,6 +1890,15 @@ func TestMsgTraceServiceImportWithSuperCluster(t *testing.T) { subSvcC := natsSubSync(t, nc3, "baz") natsFlush(t, nc3) + // Create a subscription for each account trace destination to make + // sure that we are not sending it there. + var accSubs []*nats.Subscription + for _, user := range []string{"a", "b", "c", "d"} { + nc := natsConnect(t, sfornc.ClientURL(), nats.UserInfo(user, "pwd")) + defer nc.Close() + accSubs = append(accSubs, natsSubSync(t, nc, user+".trace.subj")) + } + for _, test := range []struct { name string deliverMsg bool @@ -1899,6 +1912,10 @@ func TestMsgTraceServiceImportWithSuperCluster(t *testing.T) { if !test.deliverMsg { msg.Header.Set(MsgTraceOnly, "true") } + // We add the trcCtx header to make sure that it is deactivated + // in addition to the Nats-Trace-Dest header too when needed. + trcCtxVal := "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01" + msg.Header.Set(trcCtx, trcCtxVal) if !test.deliverMsg { msg.Data = []byte("request1") } else { @@ -1924,10 +1941,16 @@ func TestMsgTraceServiceImportWithSuperCluster(t *testing.T) { if hv := appMsg.Header.Get(MsgTraceSendTo); hv != traceSub.Subject { t.Fatalf("Expecting header with %q, but got %q", traceSub.Subject, hv) } + if hv := appMsg.Header.Get(trcCtx); hv != trcCtxVal { + t.Fatalf("Expecting header with %q, but got %q", trcCtxVal, hv) + } } else { if hv := appMsg.Header.Get(MsgTraceSendTo); hv != _EMPTY_ { t.Fatalf("Expecting no header, but header was present with value: %q", hv) } + if hv := appMsg.Header.Get(trcCtx); hv != _EMPTY_ { + t.Fatalf("Expecting no header, but header was present with value: %q", hv) + } // We don't really need to check that, but we // should see the header with the first letter // being an `X`. @@ -1937,6 +1960,12 @@ func TestMsgTraceServiceImportWithSuperCluster(t *testing.T) { if hv := appMsg.Header.Get(hn); hv != traceSub.Subject { t.Fatalf("Expected header %q to be %q, got %q", hn, traceSub.Subject, hv) } + hnb = []byte(trcCtx) + hnb[0] = 'X' + hn = string(hnb) + if hv := appMsg.Header.Get(hn); hv != trcCtxVal { + t.Fatalf("Expected header %q to be %q, got %q", hn, trcCtxVal, hv) + } } appMsg.Respond(appMsg.Data) } @@ -2059,7 +2088,13 @@ func TestMsgTraceServiceImportWithSuperCluster(t *testing.T) { if tm, err := traceSub.NextMsg(250 * time.Millisecond); err == nil { t.Fatalf("Should not have received trace message: %s", tm.Data) } - + // Make sure that we never receive on any of the account + // trace destination's sub. + for _, sub := range accSubs { + if tm, err := sub.NextMsg(100 * time.Millisecond); err == nil { + t.Fatalf("Should not have received trace message on account's trace sub, got %s", tm.Data) + } + } // Make sure we properly remove the responses. checkResp := func(an string) { for _, s := range []*Server{sfornc, sfornc2, sfornc3} { @@ -3400,6 +3435,25 @@ func TestMsgTraceJetStreamWithSuperCluster(t *testing.T) { sc := createJetStreamSuperCluster(t, 3, 2) defer sc.shutdown() + traceDest := "my.trace.subj" + + // Hack to set the trace destination for the global account in order + // to make sure that the trcCtx header is disabled when a message + // is stored in JetStream, which will prevent emitting a trace + // when such message is retrieved and traverses a route. + // Without the account destination set, the trace would not be + // triggered, but that does not mean that we would have been + // doing the right thing of disabling the header. + for _, cl := range sc.clusters { + for _, s := range cl.servers { + acc, err := s.LookupAccount(globalAccountName) + require_NoError(t, err) + acc.mu.Lock() + acc.TraceDest = traceDest + acc.mu.Unlock() + } + } + c1 := sc.clusters[0] c2 := sc.clusters[1] nc, js := jsClientConnect(t, c1.randomServer()) @@ -3463,7 +3517,7 @@ func TestMsgTraceJetStreamWithSuperCluster(t *testing.T) { nct := natsConnect(t, s.ClientURL(), nats.Name("Tracer")) defer nct.Close() - traceSub := natsSubSync(t, nct, "my.trace.subj") + traceSub := natsSubSync(t, nct, traceDest) natsFlush(t, nct) for _, test := range []struct { @@ -3479,6 +3533,9 @@ func TestMsgTraceJetStreamWithSuperCluster(t *testing.T) { if !test.deliverMsg { msg.Header.Set(MsgTraceOnly, "true") } + // We add the trcCtx header to make sure that it is deactivated + // in addition to the Nats-Trace-Dest header too when needed. + msg.Header.Set(trcCtx, "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01") msg.Header.Set(JSMsgId, "MyId") msg.Data = payload err = nct.PublishMsg(msg) @@ -3589,6 +3646,7 @@ func TestMsgTraceJetStreamWithSuperCluster(t *testing.T) { msg := nats.NewMsg(mainTest.stream) msg.Header.Set(MsgTraceSendTo, traceSub.Subject) msg.Header.Set(MsgTraceOnly, "true") + msg.Header.Set(trcCtx, "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01") msg.Data = []byte("hello") return msg } @@ -3722,7 +3780,7 @@ func TestMsgTraceJetStreamWithSuperCluster(t *testing.T) { // been properly removed so that they don't trigger it. nct := natsConnect(t, s.ClientURL(), nats.Name("Tracer")) defer nct.Close() - traceSub := natsSubSync(t, nct, "my.trace.subj") + traceSub := natsSubSync(t, nct, traceDest) natsFlush(t, nct) jct, err := nct.JetStream() @@ -4199,3 +4257,263 @@ func TestMsgTraceHops(t *testing.T) { require_Equal[int](t, len(egress), 1) checkEgressClient(egress[0], "sub8") } + +func TestMsgTraceTriggeredByExternalHeader(t *testing.T) { + tmpl := ` + port: -1 + server_name: "%s" + accounts { + A { + users: [{user:A, password: pwd}] + trace_dest: "acc.trace.dest" + } + B { + users: [{user:B, password: pwd}] + %s + } + } + cluster { + name: "local" + port: -1 + %s + } + ` + conf1 := createConfFile(t, []byte(fmt.Sprintf(tmpl, "A", _EMPTY_, _EMPTY_))) + s1, o1 := RunServerWithConfig(conf1) + defer s1.Shutdown() + + conf2 := createConfFile(t, []byte(fmt.Sprintf(tmpl, "B", _EMPTY_, fmt.Sprintf(`routes: ["nats://127.0.0.1:%d"]`, o1.Cluster.Port)))) + s2, _ := RunServerWithConfig(conf2) + defer s2.Shutdown() + + checkClusterFormed(t, s1, s2) + + nc2 := natsConnect(t, s2.ClientURL(), nats.UserInfo("A", "pwd")) + defer nc2.Close() + appSub := natsSubSync(t, nc2, "foo") + natsFlush(t, nc2) + + checkSubInterest(t, s1, "A", "foo", time.Second) + + nc1 := natsConnect(t, s1.ClientURL(), nats.UserInfo("A", "pwd")) + defer nc1.Close() + + traceSub := natsSubSync(t, nc1, "trace.dest") + accTraceSub := natsSubSync(t, nc1, "acc.trace.dest") + natsFlush(t, nc1) + + checkSubInterest(t, s1, "A", traceSub.Subject, time.Second) + checkSubInterest(t, s1, "A", accTraceSub.Subject, time.Second) + + var msgCount int + for _, test := range []struct { + name string + setHeaders func(h nats.Header) + traceTriggered bool + traceOnly bool + expectedAccSub bool + }{ + // Tests with external header only (no Nats-Trace-Dest). In this case, the + // trace is triggered based on sampling (last token is `-01`). The presence + // of Nats-Trace-Only has no effect and message should always be delivered + // to the application. + {"only external header sampling", + func(h nats.Header) { + h.Set(trcCtx, "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01") + }, + true, + false, + true}, + {"only external header no sampling", + func(h nats.Header) { + h.Set(trcCtx, "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-00") + }, + false, + false, + false}, + {"external header sampling and trace only", + func(h nats.Header) { + h.Set(trcCtx, "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01") + h.Set(MsgTraceOnly, "true") + }, + true, + false, + true}, + {"external header no sampling and trace only", + func(h nats.Header) { + h.Set(trcCtx, "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-00") + h.Set(MsgTraceOnly, "true") + }, + false, + false, + false}, + // Tests where Nats-Trace-Dest is present, so ignore external header and + // always deliver to the Nats-Trace-Dest, not the account. + {"trace dest and external header sampling", + func(h nats.Header) { + h.Set(MsgTraceSendTo, traceSub.Subject) + h.Set(trcCtx, "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01") + }, + true, + false, + false}, + {"trace dest and external header no sampling", + func(h nats.Header) { + h.Set(MsgTraceSendTo, traceSub.Subject) + h.Set(trcCtx, "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-00") + }, + true, + false, + false}, + {"trace dest with trace only and external header sampling", + func(h nats.Header) { + h.Set(MsgTraceSendTo, traceSub.Subject) + h.Set(MsgTraceOnly, "true") + h.Set(trcCtx, "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01") + }, + true, + true, + false}, + {"trace dest with trace only and external header no sampling", + func(h nats.Header) { + h.Set(MsgTraceSendTo, traceSub.Subject) + h.Set(MsgTraceOnly, "true") + h.Set(trcCtx, "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-00") + }, + true, + true, + false}, + } { + t.Run(test.name, func(t *testing.T) { + msg := nats.NewMsg("foo") + test.setHeaders(msg.Header) + msgCount++ + msgPayload := fmt.Sprintf("msg%d", msgCount) + msg.Data = []byte(msgPayload) + err := nc1.PublishMsg(msg) + require_NoError(t, err) + + if !test.traceOnly { + appMsg := natsNexMsg(t, appSub, time.Second) + require_Equal[string](t, string(appMsg.Data), msgPayload) + } + // Make sure we don't receive more (or not if trace only) + if appMsg, err := appSub.NextMsg(100 * time.Millisecond); err != nats.ErrTimeout { + t.Fatalf("Expected no app message, got %q", appMsg.Data) + } + + checkTrace := func(sub *nats.Subscription) { + // We should receive 2 traces, 1 per server. + for i := 0; i < 2; i++ { + tm := natsNexMsg(t, sub, time.Second) + var e MsgTraceEvent + err := json.Unmarshal(tm.Data, &e) + require_NoError(t, err) + } + } + + if test.traceTriggered { + if test.expectedAccSub { + checkTrace(accTraceSub) + } else { + checkTrace(traceSub) + } + } + // Make sure no trace is received in the other trace sub + // or no trace received at all. + for _, sub := range []*nats.Subscription{accTraceSub, traceSub} { + if tm, err := sub.NextMsg(100 * time.Millisecond); err != nats.ErrTimeout { + t.Fatalf("Expected no trace for the trace sub on %q, got %q", sub.Subject, tm.Data) + } + } + }) + } + + nc1.Close() + nc2.Close() + + // Now replace connections and subs for account "B" + nc2 = natsConnect(t, s2.ClientURL(), nats.UserInfo("B", "pwd")) + defer nc2.Close() + appSub = natsSubSync(t, nc2, "foo") + natsFlush(t, nc2) + + checkSubInterest(t, s1, "B", "foo", time.Second) + + nc1 = natsConnect(t, s1.ClientURL(), nats.UserInfo("B", "pwd")) + defer nc1.Close() + + traceSub = natsSubSync(t, nc1, "trace.dest") + accTraceSub = natsSubSync(t, nc1, "acc.trace.dest") + natsFlush(t, nc1) + + checkSubInterest(t, s1, "B", traceSub.Subject, time.Second) + checkSubInterest(t, s1, "B", accTraceSub.Subject, time.Second) + + for _, test := range []struct { + name string + reload bool + }{ + {"external header but no account destination", true}, + {"external header with account destination added through config reload", false}, + } { + t.Run(test.name, func(t *testing.T) { + msg := nats.NewMsg("foo") + msg.Header.Set(trcCtx, "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01") + msg.Data = []byte("hello") + err := nc1.PublishMsg(msg) + require_NoError(t, err) + + // Application should receive the message + appMsg := natsNexMsg(t, appSub, time.Second) + require_Equal[string](t, string(appMsg.Data), "hello") + // Only once... + if appMsg, err := appSub.NextMsg(100 * time.Millisecond); err != nats.ErrTimeout { + t.Fatalf("Expected no app message, got %q", appMsg.Data) + } + if !test.reload { + // We should receive the traces (1 per server) on the account destination + for i := 0; i < 2; i++ { + tm := natsNexMsg(t, accTraceSub, time.Second) + var e MsgTraceEvent + err := json.Unmarshal(tm.Data, &e) + require_NoError(t, err) + } + } + // No (or no more) trace message should be received. + for _, sub := range []*nats.Subscription{accTraceSub, traceSub} { + if tm, err := sub.NextMsg(100 * time.Millisecond); err != nats.ErrTimeout { + t.Fatalf("Expected no trace for the trace sub on %q, got %q", sub.Subject, tm.Data) + } + } + // Do the config reload and we will repeat the test and now + // we should receive the trace message into the account + // destination trace. + if test.reload { + reloadUpdateConfig(t, s1, conf1, fmt.Sprintf(tmpl, "A", `trace_dest: "acc.trace.dest"`, _EMPTY_)) + reloadUpdateConfig(t, s2, conf2, fmt.Sprintf(tmpl, "B", `trace_dest: "acc.trace.dest"`, fmt.Sprintf(`routes: ["nats://127.0.0.1:%d"]`, o1.Cluster.Port))) + } + }) + } + + nc1.Close() + nc2.Close() + s2.Shutdown() + s1.Shutdown() + s1 = nil + + t.Run("check account trace destination valid when specified programmatically", func(t *testing.T) { + o1.Accounts[0].TraceDest = "invalid..dest" + o1.Accounts = o1.Accounts[:1] + accName := o1.Accounts[0].Name + o1.Port, o1.Cluster.Port = -1, -1 + s1, err := NewServer(o1) + if err == nil || !strings.Contains(err.Error(), fmt.Sprintf("trace_dest %q of account %q is not valid", "invalid..dest", accName)) { + if s1 != nil { + s1.Shutdown() + } + + } + s1.Shutdown() + }) +} diff --git a/server/opts.go b/server/opts.go index 73635f95c9e..00724e59a92 100644 --- a/server/opts.go +++ b/server/opts.go @@ -3025,6 +3025,14 @@ func parseAccounts(v interface{}, opts *Options, errors *[]error, warnings *[]er *errors = append(*errors, err) continue } + case "trace_dest", "trace_destination", "trace_subject": + td := mv.(string) + if !IsValidSubject(td) { + err := &configErr{tk, fmt.Sprintf("Trace destination %q is not valid", mv)} + *errors = append(*errors, err) + continue + } + acc.TraceDest = td default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ diff --git a/server/server.go b/server/server.go index f437cbceca6..6c25e5b60b0 100644 --- a/server/server.go +++ b/server/server.go @@ -1046,6 +1046,15 @@ func validateOptions(o *Options) error { return fmt.Errorf("max_payload (%v) cannot be higher than max_pending (%v)", o.MaxPayload, o.MaxPending) } + // Check that account's trace_dest is a valid subject. + for _, acc := range o.Accounts { + if acc.TraceDest == _EMPTY_ { + continue + } + if !IsValidSubject(acc.TraceDest) { + return fmt.Errorf("trace_dest %q of account %q is not valid", acc.TraceDest, acc.Name) + } + } // Check that the trust configuration is correct. if err := validateTrustedOperators(o); err != nil { return err diff --git a/server/stream.go b/server/stream.go index 8e1d75682a7..a2613866383 100644 --- a/server/stream.go +++ b/server/stream.go @@ -4213,11 +4213,11 @@ func (mset *stream) processInboundJetStreamMsg(_ *subscription, c *client, _ *Ac msg = copyBytes(msg) } if mt, traceOnly := c.isMsgTraceEnabled(); mt != nil { - // If message is delivered, we need to disable the message trace destination - // header to prevent a trace event to be generated when a stored message + // If message is delivered, we need to disable the message trace headers + // to prevent a trace event to be generated when a stored message // is delivered to a consumer and routed. if !traceOnly { - mt.disableTraceHeader(c, hdr) + mt.disableTraceHeaders(c, hdr) } // This will add the jetstream event while in the client read loop. // Since the event will be updated in a different go routine, the