Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

nsqd: use metadata struct for both marshal and unmarshal #1380

Merged
merged 1 commit into from
Aug 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 44 additions & 35 deletions nsqd/nsqd.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,15 +264,23 @@ func (n *NSQD) Main() error {
return err
}

type meta struct {
Topics []struct {
Name string `json:"name"`
Paused bool `json:"paused"`
Channels []struct {
Name string `json:"name"`
Paused bool `json:"paused"`
} `json:"channels"`
} `json:"topics"`
// Metadata is the collection of persistent information about the current NSQD.
type Metadata struct {
Topics []TopicMetadata `json:"topics"`
Version string `json:"version"`
}

// TopicMetadata is the collection of persistent information about a topic.
type TopicMetadata struct {
Name string `json:"name"`
Paused bool `json:"paused"`
Channels []ChannelMetadata `json:"channels"`
}

// ChannelMetadata is the collection of persistent information about a channel.
type ChannelMetadata struct {
Name string `json:"name"`
Paused bool `json:"paused"`
}

func newMetadataFile(opts *Options) string {
Expand Down Expand Up @@ -317,7 +325,7 @@ func (n *NSQD) LoadMetadata() error {
return nil // fresh start
}

var m meta
var m Metadata
err = json.Unmarshal(data, &m)
if err != nil {
return fmt.Errorf("failed to parse metadata in %s - %s", fn, err)
Expand Down Expand Up @@ -347,46 +355,47 @@ func (n *NSQD) LoadMetadata() error {
return nil
}

func (n *NSQD) PersistMetadata() error {
// persist metadata about what topics/channels we have, across restarts
fileName := newMetadataFile(n.getOpts())

n.logf(LOG_INFO, "NSQ: persisting topic/channel metadata to %s", fileName)

js := make(map[string]interface{})
topics := []interface{}{}
// GetMetadata retrieves the current topic and channel set of the NSQ daemon. If
// the ephemeral flag is set, ephemeral topics are also returned even though these
// are not saved to disk.
func (n *NSQD) GetMetadata(ephemeral bool) *Metadata {
meta := &Metadata{
Version: version.Binary,
}
for _, topic := range n.topicMap {
if topic.ephemeral {
if topic.ephemeral && !ephemeral {
continue
}
topicData := make(map[string]interface{})
topicData["name"] = topic.name
topicData["paused"] = topic.IsPaused()
channels := []interface{}{}
topicData := TopicMetadata{
Name: topic.name,
Paused: topic.IsPaused(),
}
topic.Lock()
for _, channel := range topic.channelMap {
if channel.ephemeral {
continue
}
channel.Lock()
channelData := make(map[string]interface{})
channelData["name"] = channel.name
channelData["paused"] = channel.IsPaused()
channel.Unlock()
channels = append(channels, channelData)
topicData.Channels = append(topicData.Channels, ChannelMetadata{
Name: channel.name,
Paused: channel.IsPaused(),
})
}
topic.Unlock()
topicData["channels"] = channels
topics = append(topics, topicData)
meta.Topics = append(meta.Topics, topicData)
}
js["version"] = version.Binary
js["topics"] = topics
return meta
}

func (n *NSQD) PersistMetadata() error {
// persist metadata about what topics/channels we have, across restarts
fileName := newMetadataFile(n.getOpts())

data, err := json.Marshal(&js)
n.logf(LOG_INFO, "NSQ: persisting topic/channel metadata to %s", fileName)

data, err := json.Marshal(n.GetMetadata(false))
if err != nil {
return err
}

tmpFileName := fmt.Sprintf("%s.%d.tmp", fileName, rand.Int())

err = writeSyncFile(tmpFileName, data)
Expand Down
4 changes: 2 additions & 2 deletions nsqd/nsqd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ const (
RequestTimeout = 5 * time.Second
)

func getMetadata(n *NSQD) (*meta, error) {
func getMetadata(n *NSQD) (*Metadata, error) {
fn := newMetadataFile(n.getOpts())
data, err := ioutil.ReadFile(fn)
if err != nil {
return nil, err
}

var m meta
var m Metadata
err = json.Unmarshal(data, &m)
if err != nil {
return nil, err
Expand Down