-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Broker recovery #1667
Broker recovery #1667
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -194,6 +194,34 @@ func TestBroker_Unsubscribe_ErrReplicaNotFound(t *testing.T) { | |
} | ||
} | ||
|
||
// Ensure the broker can reopen and recover correctly. | ||
func TestBroker_Reopen(t *testing.T) { | ||
b := NewBroker(nil) | ||
defer b.Close() | ||
b.MustCreateReplica(2000, &url.URL{Host: "localhost"}) | ||
b.MustSubscribe(2000, 20) | ||
b.MustSubscribe(2000, 21) | ||
b.MustPublishSync(&messaging.Message{TopicID: 20, Data: []byte("0000")}) | ||
b.MustPublishSync(&messaging.Message{TopicID: 20, Data: []byte("0000")}) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To fully test the code, should this test publish to at least two different topics? |
||
b.MustPublishSync(&messaging.Message{TopicID: 21, Data: []byte("0000")}) | ||
index := b.MustPublishSync(&messaging.Message{TopicID: 20, Data: []byte("0000")}) | ||
time.Sleep(100 * time.Millisecond) | ||
|
||
// Close broker and reopen with a new broker instance. | ||
path, u := b.Path(), b.URL() | ||
b.Broker.Close() | ||
b.Broker = messaging.NewBroker() | ||
if err := b.Broker.Open(path, u); err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
// Verify the broker is up to date. | ||
newIndex := b.Index() | ||
if newIndex != index { | ||
t.Fatalf("index mismatch: exp=%d, got=%d", index, newIndex) | ||
} | ||
} | ||
|
||
// Benchmarks a single broker without HTTP. | ||
func BenchmarkBroker_Publish(b *testing.B) { | ||
br := NewBroker(nil) | ||
|
@@ -276,6 +304,43 @@ func (b *Broker) MustReadAll(replicaID uint64) (a []*messaging.Message) { | |
return | ||
} | ||
|
||
// MustCreateReplica creates a new replica. Panic on error. | ||
func (b *Broker) MustCreateReplica(replicaID uint64, u *url.URL) { | ||
if err := b.CreateReplica(replicaID, u); err != nil { | ||
panic(err.Error()) | ||
} | ||
} | ||
|
||
// MustSubscribe subscribes a replica to a topic. Panic on error. | ||
func (b *Broker) MustSubscribe(replicaID, topicID uint64) { | ||
if err := b.Subscribe(replicaID, topicID); err != nil { | ||
panic(err.Error()) | ||
} | ||
} | ||
|
||
// MustSync syncs to a broker index. Panic on error. | ||
func (b *Broker) MustSync(index uint64) { | ||
if err := b.Sync(index); err != nil { | ||
panic(err.Error()) | ||
} | ||
} | ||
|
||
// MustPublish publishes a message to the broker. Panic on error. | ||
func (b *Broker) MustPublish(m *messaging.Message) uint64 { | ||
index, err := b.Publish(&messaging.Message{Type: 100, TopicID: 20, Data: []byte("0000")}) | ||
if err != nil { | ||
panic(err.Error()) | ||
} | ||
return index | ||
} | ||
|
||
// MustPublishSync publishes a message to the broker and syncs to that index. Panic on error. | ||
func (b *Broker) MustPublishSync(m *messaging.Message) uint64 { | ||
index := b.MustPublish(m) | ||
b.MustSync(index) | ||
return index | ||
} | ||
|
||
// Messages represents a collection of messages. | ||
// This type provides helper functions. | ||
type Messages []*messaging.Message | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -45,7 +45,7 @@ const logEntryHeaderSize = 8 + 8 + 8 // sz+index+term | |
|
||
// WaitInterval represents the amount of time between checks to the applied index. | ||
// This is used by clients wanting to wait until a given index is processed. | ||
const WaitInterval = 100 * time.Millisecond | ||
const WaitInterval = 1 * time.Millisecond | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Deliberate? Just want to be sure. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, you have an explicit commit for this, so all good. |
||
|
||
// State represents whether the log is a follower, candidate, or leader. | ||
type State int | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems like it's going to take a long time if there are quite a bit of topic logs backed up.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When we add truncation (and log segments) then it'll only have to traverse through the latest log segment for each topic.