Skip to content

Commit

Permalink
Merge pull request #1380 from karalabe/unify-expose-metadata
Browse files Browse the repository at this point in the history
nsqd: use metadata struct for both marshal and unmarshal
  • Loading branch information
mreiferson authored Aug 2, 2022
2 parents 784d911 + 64eac42 commit ae2e77a
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 37 deletions.
79 changes: 44 additions & 35 deletions nsqd/nsqd.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,15 +276,23 @@ func (n *NSQD) Main() error {
return err
}

type meta struct {
Topics []struct {
Name string `json:"name"`
Paused bool `json:"paused"`
Channels []struct {
Name string `json:"name"`
Paused bool `json:"paused"`
} `json:"channels"`
} `json:"topics"`
// Metadata is the collection of persistent information about the current NSQD.
type Metadata struct {
Topics []TopicMetadata `json:"topics"`
Version string `json:"version"`
}

// TopicMetadata is the collection of persistent information about a topic.
type TopicMetadata struct {
Name string `json:"name"`
Paused bool `json:"paused"`
Channels []ChannelMetadata `json:"channels"`
}

// ChannelMetadata is the collection of persistent information about a channel.
type ChannelMetadata struct {
Name string `json:"name"`
Paused bool `json:"paused"`
}

func newMetadataFile(opts *Options) string {
Expand Down Expand Up @@ -329,7 +337,7 @@ func (n *NSQD) LoadMetadata() error {
return nil // fresh start
}

var m meta
var m Metadata
err = json.Unmarshal(data, &m)
if err != nil {
return fmt.Errorf("failed to parse metadata in %s - %s", fn, err)
Expand Down Expand Up @@ -359,46 +367,47 @@ func (n *NSQD) LoadMetadata() error {
return nil
}

func (n *NSQD) PersistMetadata() error {
// persist metadata about what topics/channels we have, across restarts
fileName := newMetadataFile(n.getOpts())

n.logf(LOG_INFO, "NSQ: persisting topic/channel metadata to %s", fileName)

js := make(map[string]interface{})
topics := []interface{}{}
// GetMetadata retrieves the current topic and channel set of the NSQ daemon. If
// the ephemeral flag is set, ephemeral topics are also returned even though these
// are not saved to disk.
func (n *NSQD) GetMetadata(ephemeral bool) *Metadata {
meta := &Metadata{
Version: version.Binary,
}
for _, topic := range n.topicMap {
if topic.ephemeral {
if topic.ephemeral && !ephemeral {
continue
}
topicData := make(map[string]interface{})
topicData["name"] = topic.name
topicData["paused"] = topic.IsPaused()
channels := []interface{}{}
topicData := TopicMetadata{
Name: topic.name,
Paused: topic.IsPaused(),
}
topic.Lock()
for _, channel := range topic.channelMap {
if channel.ephemeral {
continue
}
channel.Lock()
channelData := make(map[string]interface{})
channelData["name"] = channel.name
channelData["paused"] = channel.IsPaused()
channel.Unlock()
channels = append(channels, channelData)
topicData.Channels = append(topicData.Channels, ChannelMetadata{
Name: channel.name,
Paused: channel.IsPaused(),
})
}
topic.Unlock()
topicData["channels"] = channels
topics = append(topics, topicData)
meta.Topics = append(meta.Topics, topicData)
}
js["version"] = version.Binary
js["topics"] = topics
return meta
}

func (n *NSQD) PersistMetadata() error {
// persist metadata about what topics/channels we have, across restarts
fileName := newMetadataFile(n.getOpts())

data, err := json.Marshal(&js)
n.logf(LOG_INFO, "NSQ: persisting topic/channel metadata to %s", fileName)

data, err := json.Marshal(n.GetMetadata(false))
if err != nil {
return err
}

tmpFileName := fmt.Sprintf("%s.%d.tmp", fileName, rand.Int())

err = writeSyncFile(tmpFileName, data)
Expand Down
4 changes: 2 additions & 2 deletions nsqd/nsqd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ const (
RequestTimeout = 5 * time.Second
)

func getMetadata(n *NSQD) (*meta, error) {
func getMetadata(n *NSQD) (*Metadata, error) {
fn := newMetadataFile(n.getOpts())
data, err := ioutil.ReadFile(fn)
if err != nil {
return nil, err
}

var m meta
var m Metadata
err = json.Unmarshal(data, &m)
if err != nil {
return nil, err
Expand Down

0 comments on commit ae2e77a

Please sign in to comment.