Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Broker recovery #1667

Merged
merged 3 commits into from
Feb 21, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 51 additions & 7 deletions messaging/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@ func (b *Broker) metaPath() string {
return filepath.Join(b.path, "meta")
}

// Index returns the highest index seen by the broker across all topics.
// Returns 0 if the broker is closed.
func (b *Broker) Index() uint64 {
b.mu.Lock()
b.mu.Unlock()
return b.index
}

func (b *Broker) opened() bool { return b.path != "" }

// SetLogOutput sets writer for all Broker log output.
Expand Down Expand Up @@ -181,9 +189,23 @@ func (b *Broker) load() error {
}
}

// Set the broker's index to the last index seen across all topics.
b.index = hdr.maxIndex()
// Read the highest index from each of the topic files.
if err := b.loadIndex(); err != nil {
return fmt.Errorf("load index: %s", err)
}

return nil
}

// loadIndex reads through all topics to find the highest known index.
func (b *Broker) loadIndex() error {
for _, t := range b.topics {
if err := t.loadIndex(); err != nil {
return fmt.Errorf("topic(%d): %s", t.id, err)
} else if t.index > b.index {
b.index = t.index
}
}
return nil
}

Expand Down Expand Up @@ -568,12 +590,7 @@ func (fsm *brokerFSM) MustApply(e *raft.LogEntry) {
}

// Save highest applied index.
// TODO: Persist to disk for raft commands.
b.index = e.Index

// HACK: Persist metadata after each apply.
// This should be derived on startup from the topic logs.
b.mustSave()
}

// Index returns the highest index that the broker has seen.
Expand Down Expand Up @@ -774,6 +791,33 @@ func (t *topic) Close() error {
return nil
}

// loadIndex reads the highest available index for a topic from disk.
func (t *topic) loadIndex() error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like it's going to take a long time if there are quite a bit of topic logs backed up.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When we add truncation (and log segments) then it'll only have to traverse through the latest log segment for each topic.

// Open topic file for reading.
f, err := os.Open(t.path)
if os.IsNotExist(err) {
return nil
} else if err != nil {
return err
}
defer func() { _ = f.Close() }()

// Read all messages.
dec := NewMessageDecoder(bufio.NewReader(f))
for {
// Decode message.
var m Message
if err := dec.Decode(&m); err == io.EOF {
return nil
} else if err != nil {
return fmt.Errorf("decode: %s", err)
}

// Update the topic's highest index.
t.index = m.Index
}
}

// writeTo writes the topic to a replica since a given index.
// Returns an error if the starting index is unavailable.
func (t *topic) writeTo(r *Replica, index uint64) (int64, error) {
Expand Down
65 changes: 65 additions & 0 deletions messaging/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,34 @@ func TestBroker_Unsubscribe_ErrReplicaNotFound(t *testing.T) {
}
}

// Ensure the broker can reopen and recover correctly.
func TestBroker_Reopen(t *testing.T) {
b := NewBroker(nil)
defer b.Close()
b.MustCreateReplica(2000, &url.URL{Host: "localhost"})
b.MustSubscribe(2000, 20)
b.MustSubscribe(2000, 21)
b.MustPublishSync(&messaging.Message{TopicID: 20, Data: []byte("0000")})
b.MustPublishSync(&messaging.Message{TopicID: 20, Data: []byte("0000")})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To fully test the code, should this test publish to at least two different topics?

b.MustPublishSync(&messaging.Message{TopicID: 21, Data: []byte("0000")})
index := b.MustPublishSync(&messaging.Message{TopicID: 20, Data: []byte("0000")})
time.Sleep(100 * time.Millisecond)

// Close broker and reopen with a new broker instance.
path, u := b.Path(), b.URL()
b.Broker.Close()
b.Broker = messaging.NewBroker()
if err := b.Broker.Open(path, u); err != nil {
t.Fatal(err)
}

// Verify the broker is up to date.
newIndex := b.Index()
if newIndex != index {
t.Fatalf("index mismatch: exp=%d, got=%d", index, newIndex)
}
}

// Benchmarks a single broker without HTTP.
func BenchmarkBroker_Publish(b *testing.B) {
br := NewBroker(nil)
Expand Down Expand Up @@ -276,6 +304,43 @@ func (b *Broker) MustReadAll(replicaID uint64) (a []*messaging.Message) {
return
}

// MustCreateReplica creates a new replica. Panic on error.
func (b *Broker) MustCreateReplica(replicaID uint64, u *url.URL) {
if err := b.CreateReplica(replicaID, u); err != nil {
panic(err.Error())
}
}

// MustSubscribe subscribes a replica to a topic. Panic on error.
func (b *Broker) MustSubscribe(replicaID, topicID uint64) {
if err := b.Subscribe(replicaID, topicID); err != nil {
panic(err.Error())
}
}

// MustSync syncs to a broker index. Panic on error.
func (b *Broker) MustSync(index uint64) {
if err := b.Sync(index); err != nil {
panic(err.Error())
}
}

// MustPublish publishes a message to the broker. Panic on error.
func (b *Broker) MustPublish(m *messaging.Message) uint64 {
index, err := b.Publish(&messaging.Message{Type: 100, TopicID: 20, Data: []byte("0000")})
if err != nil {
panic(err.Error())
}
return index
}

// MustPublishSync publishes a message to the broker and syncs to that index. Panic on error.
func (b *Broker) MustPublishSync(m *messaging.Message) uint64 {
index := b.MustPublish(m)
b.MustSync(index)
return index
}

// Messages represents a collection of messages.
// This type provides helper functions.
type Messages []*messaging.Message
Expand Down
2 changes: 1 addition & 1 deletion raft/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ const logEntryHeaderSize = 8 + 8 + 8 // sz+index+term

// WaitInterval represents the amount of time between checks to the applied index.
// This is used by clients wanting to wait until a given index is processed.
const WaitInterval = 100 * time.Millisecond
const WaitInterval = 1 * time.Millisecond
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deliberate? Just want to be sure.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, you have an explicit commit for this, so all good.


// State represents whether the log is a follower, candidate, or leader.
type State int
Expand Down