From 64cc1fb0f605694db8d799c72c64cfde366a1838 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Sat, 21 Feb 2015 08:19:28 -0700 Subject: [PATCH 1/3] Lower wait interval to speed up tests. --- raft/log.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/raft/log.go b/raft/log.go index 9c674b6e200..c2280597a37 100644 --- a/raft/log.go +++ b/raft/log.go @@ -45,7 +45,7 @@ const logEntryHeaderSize = 8 + 8 + 8 // sz+index+term // WaitInterval represents the amount of time between checks to the applied index. // This is used by clients wanting to wait until a given index is processed. -const WaitInterval = 100 * time.Millisecond +const WaitInterval = 1 * time.Millisecond // State represents whether the log is a follower, candidate, or leader. type State int From a5692b71ee0ffaa7507f6903c599290727d588f5 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Sat, 21 Feb 2015 08:21:51 -0700 Subject: [PATCH 2/3] Add proper broker recovery. This commit fixes the broker recovery so that it determines the last index from the various topic logs instead of persisting the snapshot on every message that comes in. --- messaging/broker.go | 58 ++++++++++++++++++++++++++++++++----- messaging/broker_test.go | 62 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 113 insertions(+), 7 deletions(-) diff --git a/messaging/broker.go b/messaging/broker.go index 1ac3ce0754b..e317f498494 100644 --- a/messaging/broker.go +++ b/messaging/broker.go @@ -57,6 +57,14 @@ func (b *Broker) metaPath() string { return filepath.Join(b.path, "meta") } +// Index returns the highest index seen by the broker. +// Returns 0 if the broker is closed. +func (b *Broker) Index() uint64 { + b.mu.Lock() + b.mu.Unlock() + return b.index +} + func (b *Broker) opened() bool { return b.path != "" } // SetLogOutput sets writer for all Broker log output. @@ -181,9 +189,23 @@ func (b *Broker) load() error { } } - // Set the broker's index to the last index seen across all topics. - b.index = hdr.maxIndex() + // Read the highest index from each of the topic files. + if err := b.loadIndex(); err != nil { + return fmt.Errorf("load index: %s", err) + } + + return nil +} +// loadIndex reads through all topics to find the highest known index. +func (b *Broker) loadIndex() error { + for _, t := range b.topics { + if err := t.loadIndex(); err != nil { + return fmt.Errorf("topic(%d): %s", t.id, err) + } else if t.index > b.index { + b.index = t.index + } + } return nil } @@ -568,12 +590,7 @@ func (fsm *brokerFSM) MustApply(e *raft.LogEntry) { } // Save highest applied index. - // TODO: Persist to disk for raft commands. b.index = e.Index - - // HACK: Persist metadata after each apply. - // This should be derived on startup from the topic logs. - b.mustSave() } // Index returns the highest index that the broker has seen. @@ -774,6 +791,33 @@ func (t *topic) Close() error { return nil } +// loadIndex reads the highest available index for a topic from disk. +func (t *topic) loadIndex() error { + // Open topic file for reading. + f, err := os.Open(t.path) + if os.IsNotExist(err) { + return nil + } else if err != nil { + return err + } + defer func() { _ = f.Close() }() + + // Read all messages. + dec := NewMessageDecoder(bufio.NewReader(f)) + for { + // Decode message. + var m Message + if err := dec.Decode(&m); err == io.EOF { + return nil + } else if err != nil { + return fmt.Errorf("decode: %s", err) + } + + // Update the topic's highest index. + t.index = m.Index + } +} + // writeTo writes the topic to a replica since a given index. // Returns an error if the starting index is unavailable. func (t *topic) writeTo(r *Replica, index uint64) (int64, error) { diff --git a/messaging/broker_test.go b/messaging/broker_test.go index fda760514e6..1d0f6c7c843 100644 --- a/messaging/broker_test.go +++ b/messaging/broker_test.go @@ -194,6 +194,31 @@ func TestBroker_Unsubscribe_ErrReplicaNotFound(t *testing.T) { } } +// Ensure the broker can reopen and recover correctly. +func TestBroker_Reopen(t *testing.T) { + b := NewBroker(nil) + defer b.Close() + b.MustCreateReplica(2000, &url.URL{Host: "localhost"}) + b.MustSubscribe(2000, 20) + b.MustPublishSync(&messaging.Message{TopicID: 20, Data: []byte("0000")}) + b.MustPublishSync(&messaging.Message{TopicID: 20, Data: []byte("0000")}) + index := b.MustPublishSync(&messaging.Message{TopicID: 20, Data: []byte("0000")}) + + // Close broker and reopen with a new broker instance. + path, u := b.Path(), b.URL() + b.Broker.Close() + b.Broker = messaging.NewBroker() + if err := b.Broker.Open(path, u); err != nil { + t.Fatal(err) + } + + // Verify the broker is up to date. + newIndex := b.Index() + if newIndex != index { + t.Fatalf("index mismatch: exp=%d, got=%d", index, newIndex) + } +} + // Benchmarks a single broker without HTTP. func BenchmarkBroker_Publish(b *testing.B) { br := NewBroker(nil) @@ -276,6 +301,43 @@ func (b *Broker) MustReadAll(replicaID uint64) (a []*messaging.Message) { return } +// MustCreateReplica creates a new replica. Panic on error. +func (b *Broker) MustCreateReplica(replicaID uint64, u *url.URL) { + if err := b.CreateReplica(replicaID, u); err != nil { + panic(err.Error()) + } +} + +// MustSubscribe subscribes a replica to a topic. Panic on error. +func (b *Broker) MustSubscribe(replicaID, topicID uint64) { + if err := b.Subscribe(replicaID, topicID); err != nil { + panic(err.Error()) + } +} + +// MustSync syncs to a broker index. Panic on error. +func (b *Broker) MustSync(index uint64) { + if err := b.Sync(index); err != nil { + panic(err.Error()) + } +} + +// MustPublish publishes a message to the broker. Panic on error. +func (b *Broker) MustPublish(m *messaging.Message) uint64 { + index, err := b.Publish(&messaging.Message{Type: 100, TopicID: 20, Data: []byte("0000")}) + if err != nil { + panic(err.Error()) + } + return index +} + +// MustPublishSync publishes a message to the broker and syncs to that index. Panic on error. +func (b *Broker) MustPublishSync(m *messaging.Message) uint64 { + index := b.MustPublish(m) + b.MustSync(index) + return index +} + // Messages represents a collection of messages. // This type provides helper functions. type Messages []*messaging.Message From 408cf37e5a599f3060b52c049294c0164078e43b Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Sat, 21 Feb 2015 15:25:11 -0700 Subject: [PATCH 3/3] Fix broker reopen test, docs. --- messaging/broker.go | 2 +- messaging/broker_test.go | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/messaging/broker.go b/messaging/broker.go index e317f498494..2d232a66b4b 100644 --- a/messaging/broker.go +++ b/messaging/broker.go @@ -57,7 +57,7 @@ func (b *Broker) metaPath() string { return filepath.Join(b.path, "meta") } -// Index returns the highest index seen by the broker. +// Index returns the highest index seen by the broker across all topics. // Returns 0 if the broker is closed. func (b *Broker) Index() uint64 { b.mu.Lock() diff --git a/messaging/broker_test.go b/messaging/broker_test.go index 1d0f6c7c843..85f1a86fce4 100644 --- a/messaging/broker_test.go +++ b/messaging/broker_test.go @@ -200,9 +200,12 @@ func TestBroker_Reopen(t *testing.T) { defer b.Close() b.MustCreateReplica(2000, &url.URL{Host: "localhost"}) b.MustSubscribe(2000, 20) + b.MustSubscribe(2000, 21) b.MustPublishSync(&messaging.Message{TopicID: 20, Data: []byte("0000")}) b.MustPublishSync(&messaging.Message{TopicID: 20, Data: []byte("0000")}) + b.MustPublishSync(&messaging.Message{TopicID: 21, Data: []byte("0000")}) index := b.MustPublishSync(&messaging.Message{TopicID: 20, Data: []byte("0000")}) + time.Sleep(100 * time.Millisecond) // Close broker and reopen with a new broker instance. path, u := b.Path(), b.URL()