Skip to content

Commit

Permalink
Merge pull request #1055 from ploxiln/rm_nsqd_old_meta
Browse files Browse the repository at this point in the history
nsqd: remove support for old metadata filename
  • Loading branch information
mreiferson authored Aug 20, 2018
2 parents 4585cdd + 0eeb50b commit e319f8c
Show file tree
Hide file tree
Showing 2 changed files with 1 addition and 153 deletions.
54 changes: 1 addition & 53 deletions nsqd/nsqd.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package nsqd

import (
"bytes"
"crypto/tls"
"crypto/x509"
"encoding/json"
Expand All @@ -13,7 +12,6 @@ import (
"net"
"os"
"path"
"runtime"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -274,10 +272,6 @@ func newMetadataFile(opts *Options) string {
return path.Join(opts.DataPath, "nsqd.dat")
}

func oldMetadataFile(opts *Options) string {
return path.Join(opts.DataPath, fmt.Sprintf("nsqd.%d.dat", opts.ID))
}

func readOrEmpty(fn string) ([]byte, error) {
data, err := ioutil.ReadFile(fn)
if err != nil {
Expand Down Expand Up @@ -307,30 +301,13 @@ func (n *NSQD) LoadMetadata() error {
defer atomic.StoreInt32(&n.isLoading, 0)

fn := newMetadataFile(n.getOpts())
// old metadata filename with ID, maintained in parallel to enable roll-back
fnID := oldMetadataFile(n.getOpts())

data, err := readOrEmpty(fn)
if err != nil {
return err
}
dataID, errID := readOrEmpty(fnID)
if errID != nil {
return errID
}

if data == nil && dataID == nil {
return nil // fresh start
}
if data != nil && dataID != nil {
if bytes.Compare(data, dataID) != 0 {
return fmt.Errorf("metadata in %s and %s do not match (delete one)", fn, fnID)
}
}
if data == nil {
// only old metadata file exists, use it
fn = fnID
data = dataID
return nil // fresh start
}

var m meta
Expand Down Expand Up @@ -366,8 +343,6 @@ func (n *NSQD) LoadMetadata() error {
func (n *NSQD) PersistMetadata() error {
// persist metadata about what topics/channels we have, across restarts
fileName := newMetadataFile(n.getOpts())
// old metadata filename with ID, maintained in parallel to enable roll-back
fileNameID := oldMetadataFile(n.getOpts())

n.logf(LOG_INFO, "NSQ: persisting topic/channel metadata to %s", fileName)

Expand Down Expand Up @@ -418,33 +393,6 @@ func (n *NSQD) PersistMetadata() error {
}
// technically should fsync DataPath here

stat, err := os.Lstat(fileNameID)
if err == nil && stat.Mode()&os.ModeSymlink != 0 {
return nil
}

// if no symlink (yet), race condition:
// crash right here may cause next startup to see metadata conflict and abort

tmpFileNameID := fmt.Sprintf("%s.%d.tmp", fileNameID, rand.Int())

if runtime.GOOS != "windows" {
err = os.Symlink(fileName, tmpFileNameID)
} else {
// on Windows need Administrator privs to Symlink
// instead write copy every time
err = writeSyncFile(tmpFileNameID, data)
}
if err != nil {
return err
}

err = os.Rename(tmpFileNameID, fileNameID)
if err != nil {
return err
}
// technically should fsync DataPath here

return nil
}

Expand Down
100 changes: 0 additions & 100 deletions nsqd/nsqd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,106 +158,6 @@ func TestStartup(t *testing.T) {
<-doneExitChan
}

func TestMetadataMigrate(t *testing.T) {
old_meta := `
{
"topics": [
{
"channels": [
{"name": "c1", "paused": false},
{"name": "c2", "paused": false}
],
"name": "t1",
"paused": false
}
],
"version": "1.0.0-alpha"
}`

tmpDir, err := ioutil.TempDir("", "nsq-test-")
if err != nil {
panic(err)
}
defer os.RemoveAll(tmpDir)

opts := NewOptions()
opts.DataPath = tmpDir
opts.Logger = test.NewTestLogger(t)

oldFn := oldMetadataFile(opts)
err = ioutil.WriteFile(oldFn, []byte(old_meta), 0600)
if err != nil {
panic(err)
}

_, _, nsqd := mustStartNSQD(opts)
err = nsqd.LoadMetadata()
test.Nil(t, err)
err = nsqd.PersistMetadata()
test.Nil(t, err)
nsqd.Exit()

oldFi, err := os.Lstat(oldFn)
test.Nil(t, err)
test.Equal(t, oldFi.Mode()&os.ModeType, os.ModeSymlink)

_, _, nsqd = mustStartNSQD(opts)
err = nsqd.LoadMetadata()
test.Nil(t, err)

t1, err := nsqd.GetExistingTopic("t1")
test.Nil(t, err)
test.NotNil(t, t1)
c2, err := t1.GetExistingChannel("c2")
test.Nil(t, err)
test.NotNil(t, c2)

nsqd.Exit()
}

func TestMetadataConflict(t *testing.T) {
old_meta := `
{
"topics": [{
"name": "t1", "paused": false,
"channels": [{"name": "c1", "paused": false}]
}],
"version": "1.0.0-alpha"
}`
new_meta := `
{
"topics": [{
"name": "t2", "paused": false,
"channels": [{"name": "c2", "paused": false}]
}],
"version": "1.0.0-alpha"
}`

tmpDir, err := ioutil.TempDir("", "nsq-test-")
if err != nil {
panic(err)
}
defer os.RemoveAll(tmpDir)

opts := NewOptions()
opts.DataPath = tmpDir
opts.Logger = test.NewTestLogger(t)

err = ioutil.WriteFile(oldMetadataFile(opts), []byte(old_meta), 0600)
if err != nil {
panic(err)
}
err = ioutil.WriteFile(newMetadataFile(opts), []byte(new_meta), 0600)
if err != nil {
panic(err)
}

_, _, nsqd := mustStartNSQD(opts)
err = nsqd.LoadMetadata()
test.NotNil(t, err)
nsqd.Exit()
}

func TestEphemeralTopicsAndChannels(t *testing.T) {
// ephemeral topics/channels are lazily removed after the last channel/client is removed
opts := NewOptions()
Expand Down

0 comments on commit e319f8c

Please sign in to comment.