-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Broker log recovery #2167
Broker log recovery #2167
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -468,7 +468,10 @@ func (b *Broker) Publish(m *Message) (uint64, error) { | |
} | ||
|
||
// TopicReader returns a new topic reader for a topic starting from a given index. | ||
func (b *Broker) TopicReader(topicID, index uint64, streaming bool) io.ReadCloser { | ||
func (b *Broker) TopicReader(topicID, index uint64, streaming bool) interface { | ||
io.ReadCloser | ||
io.Seeker | ||
} { | ||
return NewTopicReader(b.TopicPath(topicID), index, streaming) | ||
} | ||
|
||
|
@@ -733,9 +736,9 @@ func (t *Topic) Open() error { | |
s := segments.Last() | ||
|
||
// Read the last segment and extract the last message index. | ||
index, err := ReadSegmentMaxIndex(s.Path) | ||
index, err := RecoverSegment(s.Path) | ||
if err != nil { | ||
return fmt.Errorf("read segment max index: %s", err) | ||
return fmt.Errorf("recover segment: %s", err) | ||
} | ||
t.index = index | ||
|
||
|
@@ -775,28 +778,6 @@ func (t *Topic) close() error { | |
return nil | ||
} | ||
|
||
// ReadIndex reads the highest available index for a topic from disk. | ||
func (t *Topic) ReadIndex() (uint64, error) { | ||
// Read a list of all segments. | ||
segments, err := ReadSegments(t.path) | ||
if err != nil && !os.IsNotExist(err) { | ||
return 0, fmt.Errorf("read segments: %s", err) | ||
} | ||
|
||
// Ignore if there are no available segments. | ||
if len(segments) == 0 { | ||
return 0, nil | ||
} | ||
|
||
// Read highest index on the last segment. | ||
index, err := ReadSegmentMaxIndex(segments.Last().Path) | ||
if err != nil { | ||
return 0, fmt.Errorf("read segment max index: %s", err) | ||
} | ||
|
||
return index, nil | ||
} | ||
|
||
// WriteMessage writes a message to the end of the topic. | ||
func (t *Topic) WriteMessage(m *Message) error { | ||
t.mu.Lock() | ||
|
@@ -827,9 +808,9 @@ func (t *Topic) WriteMessage(m *Message) error { | |
} | ||
|
||
// Encode message. | ||
b := make([]byte, messageHeaderSize+len(m.Data)) | ||
b := make([]byte, MessageHeaderSize+len(m.Data)) | ||
copy(b, m.marshalHeader()) | ||
copy(b[messageHeaderSize:], m.Data) | ||
copy(b[MessageHeaderSize:], m.Data) | ||
|
||
// Write to last segment. | ||
if _, err := t.file.Write(b); err != nil { | ||
|
@@ -977,8 +958,9 @@ func ReadSegmentByIndex(path string, index uint64) (*Segment, error) { | |
return segments[len(segments)-1], nil | ||
} | ||
|
||
// ReadSegmentMaxIndex returns the highest index recorded in a segment. | ||
func ReadSegmentMaxIndex(path string) (uint64, error) { | ||
// RecoverSegment parses the entire segment and truncates at any partial messages. | ||
// Returns the last index seen in the segment. | ||
func RecoverSegment(path string) (uint64, error) { | ||
// Open segment file. | ||
f, err := os.Open(path) | ||
if os.IsNotExist(err) { | ||
|
@@ -994,6 +976,16 @@ func ReadSegmentMaxIndex(path string) (uint64, error) { | |
for { | ||
var m Message | ||
if err := dec.Decode(&m); err == io.EOF { | ||
return index, nil | ||
} else if err == io.ErrUnexpectedEOF { | ||
// The decoder will unread any partially read data so we can | ||
// simply truncate at current position. | ||
if n, err := f.Seek(0, os.SEEK_CUR); err != nil { | ||
return 0, fmt.Errorf("seek: %s", err) | ||
} else if err := os.Truncate(path, n); err != nil { | ||
return 0, fmt.Errorf("truncate: n=%d, err=%s", n-1, err) | ||
} | ||
|
||
return index, nil | ||
} else if err != nil { | ||
return 0, fmt.Errorf("decode: %s", err) | ||
|
@@ -1028,6 +1020,19 @@ func NewTopicReader(path string, index uint64, streaming bool) *TopicReader { | |
} | ||
} | ||
|
||
// Seek seeks to a position in the current segment. | ||
func (r *TopicReader) Seek(offset int64, whence int) (int64, error) { | ||
assert(whence == os.SEEK_CUR, "topic reader can only seek to a relative position") | ||
|
||
r.mu.Lock() | ||
defer r.mu.Unlock() | ||
|
||
if r.file == nil { | ||
return 0, nil | ||
} | ||
return r.file.Seek(offset, whence) | ||
} | ||
|
||
// Read reads the next bytes from the reader into the buffer. | ||
func (r *TopicReader) Read(p []byte) (int, error) { | ||
for { | ||
|
@@ -1120,7 +1125,7 @@ func (r *TopicReader) seekAfterIndex(f *os.File, seek uint64) error { | |
return err | ||
} else if m.Index >= seek { | ||
// Seek to message start. | ||
if _, err := f.Seek(-int64(messageHeaderSize+len(m.Data)), os.SEEK_CUR); err != nil { | ||
if _, err := f.Seek(-int64(MessageHeaderSize+len(m.Data)), os.SEEK_CUR); err != nil { | ||
return fmt.Errorf("seek: %s", err) | ||
} | ||
return nil | ||
|
@@ -1214,7 +1219,7 @@ const ( | |
) | ||
|
||
// The size of the encoded message header, in bytes. | ||
const messageHeaderSize = 2 + 8 + 8 + 4 | ||
const MessageHeaderSize = 2 + 8 + 8 + 4 | ||
|
||
// Message represents a single item in a topic. | ||
type Message struct { | ||
|
@@ -1230,34 +1235,34 @@ func (m *Message) WriteTo(w io.Writer) (n int64, err error) { | |
return int64(n), err | ||
} | ||
if n, err := w.Write(m.Data); err != nil { | ||
return int64(messageHeaderSize + n), err | ||
return int64(MessageHeaderSize + n), err | ||
} | ||
return int64(messageHeaderSize + len(m.Data)), nil | ||
return int64(MessageHeaderSize + len(m.Data)), nil | ||
} | ||
|
||
// MarshalBinary returns a binary representation of the message. | ||
// This implements encoding.BinaryMarshaler. An error cannot be returned. | ||
func (m *Message) MarshalBinary() ([]byte, error) { | ||
b := make([]byte, messageHeaderSize+len(m.Data)) | ||
b := make([]byte, MessageHeaderSize+len(m.Data)) | ||
copy(b, m.marshalHeader()) | ||
copy(b[messageHeaderSize:], m.Data) | ||
copy(b[MessageHeaderSize:], m.Data) | ||
return b, nil | ||
} | ||
|
||
// UnmarshalBinary reads a message from a binary encoded slice. | ||
// This implements encoding.BinaryUnmarshaler. | ||
func (m *Message) UnmarshalBinary(b []byte) error { | ||
m.unmarshalHeader(b) | ||
if len(b[messageHeaderSize:]) < len(m.Data) { | ||
return fmt.Errorf("message data too short: %d < %d", len(b[messageHeaderSize:]), len(m.Data)) | ||
if len(b[MessageHeaderSize:]) < len(m.Data) { | ||
return fmt.Errorf("message data too short: %d < %d", len(b[MessageHeaderSize:]), len(m.Data)) | ||
} | ||
copy(m.Data, b[messageHeaderSize:]) | ||
copy(m.Data, b[MessageHeaderSize:]) | ||
return nil | ||
} | ||
|
||
// marshalHeader returns a byte slice with the message header. | ||
func (m *Message) marshalHeader() []byte { | ||
b := make([]byte, messageHeaderSize) | ||
b := make([]byte, MessageHeaderSize) | ||
binary.BigEndian.PutUint16(b[0:2], uint16(m.Type)) | ||
binary.BigEndian.PutUint64(b[2:10], m.TopicID) | ||
binary.BigEndian.PutUint64(b[10:18], m.Index) | ||
|
@@ -1276,28 +1281,41 @@ func (m *Message) unmarshalHeader(b []byte) { | |
|
||
// MessageDecoder decodes messages from a reader. | ||
type MessageDecoder struct { | ||
r io.Reader | ||
r io.ReadSeeker | ||
} | ||
|
||
// NewMessageDecoder returns a new instance of the MessageDecoder. | ||
func NewMessageDecoder(r io.Reader) *MessageDecoder { | ||
func NewMessageDecoder(r io.ReadSeeker) *MessageDecoder { | ||
return &MessageDecoder{r: r} | ||
} | ||
|
||
// Decode reads a message from the decoder's reader. | ||
func (dec *MessageDecoder) Decode(m *Message) error { | ||
// Read header bytes. | ||
var b [messageHeaderSize]byte | ||
if _, err := io.ReadFull(dec.r, b[:]); err == io.EOF { | ||
// Unread if there is a partial read. | ||
var b [MessageHeaderSize]byte | ||
if n, err := io.ReadFull(dec.r, b[:]); err == io.EOF { | ||
return err | ||
} else if err == io.ErrUnexpectedEOF { | ||
if _, err := dec.r.Seek(-int64(n), os.SEEK_CUR); err != nil { | ||
return fmt.Errorf("cannot unread header: n=%d, err=%s", n, err) | ||
} | ||
warnf("unexpected eof(0): len=%d, n=%d, err=%s", len(b), n, err) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Ditto? Want this, or is it debug? |
||
return err | ||
} else if err != nil { | ||
return fmt.Errorf("read header: %s", err) | ||
} | ||
m.unmarshalHeader(b[:]) | ||
|
||
// Read data. | ||
if _, err := io.ReadFull(dec.r, m.Data); err != nil { | ||
return fmt.Errorf("read body: %s", err) | ||
if n, err := io.ReadFull(dec.r, m.Data); err == io.EOF || err == io.ErrUnexpectedEOF { | ||
if _, err := dec.r.Seek(-int64(MessageHeaderSize+n), os.SEEK_CUR); err != nil { | ||
return fmt.Errorf("cannot unread header+data: n=%d, err=%s", n, err) | ||
} | ||
warnf("unexpected eof(1): len=%d, n=%d, err=%s", len(m.Data), n, err) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Do you want this output here? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yeah, for the short term. It'll give us a little extra data. We don't have these variables returned to the caller and there's no logger in here so I couldn't think of a better way to inject it. We can remove the warns in the future. |
||
return io.ErrUnexpectedEOF | ||
} else if err != nil { | ||
return fmt.Errorf("read data: %s", err) | ||
} | ||
|
||
return nil | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like you could remove this assert by simply having this function ignore `whence`, and then calling `Seek()` with `os.SEEK_CUR`. Of course, you may wish to catch errors in the future.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If someone passes a non-`SEEK_CUR` in the future I want to catch it immediately. If I silently override it then it'll be a pain to debug.