Broker log recovery #2167

Merged
merged 2 commits on Apr 4, 2015
CHANGELOG.md: 3 changes (2 additions, 1 deletion)
@@ -11,9 +11,10 @@
- [#2152](https://github.com/influxdb/influxdb/issues/2152): Influxd process with stats enabled crashing with 'Unsuported protocol scheme for ""'
- [#2156](https://github.com/influxdb/influxdb/pull/2156): Propagate error when resolving UDP address in Graphite UDP server.
- [#2163](https://github.com/influxdb/influxdb/pull/2163): Fix up paths for default data and run storage.
- [#2164](https://github.com/influxdb/influxdb/pull/2164): Append STDOUT/STDERR in init script.
- [#2164](https://github.com/influxdb/influxdb/pull/2164): Append STDOUT/STDERR in initscript.
- [#2165](https://github.com/influxdb/influxdb/pull/2165): Better name for config section for stats and diags.
- [#2165](https://github.com/influxdb/influxdb/pull/2165): Monitoring database and retention policy are not configurable.
- [#2167](https://github.com/influxdb/influxdb/pull/2167): Add broker log recovery.

## v0.9.0-rc19 [2015-04-01]

messaging/broker.go: 108 changes (63 additions, 45 deletions)
@@ -468,7 +468,10 @@ func (b *Broker) Publish(m *Message) (uint64, error) {
}

// TopicReader returns a new topic reader for a topic starting from a given index.
func (b *Broker) TopicReader(topicID, index uint64, streaming bool) io.ReadCloser {
func (b *Broker) TopicReader(topicID, index uint64, streaming bool) interface {
io.ReadCloser
io.Seeker
} {
return NewTopicReader(b.TopicPath(topicID), index, streaming)
}

@@ -733,9 +736,9 @@ func (t *Topic) Open() error {
s := segments.Last()

// Read the last segment and extract the last message index.
index, err := ReadSegmentMaxIndex(s.Path)
index, err := RecoverSegment(s.Path)
if err != nil {
return fmt.Errorf("read segment max index: %s", err)
return fmt.Errorf("recover segment: %s", err)
}
t.index = index

@@ -775,28 +778,6 @@ func (t *Topic) close() error {
return nil
}

// ReadIndex reads the highest available index for a topic from disk.
func (t *Topic) ReadIndex() (uint64, error) {
// Read a list of all segments.
segments, err := ReadSegments(t.path)
if err != nil && !os.IsNotExist(err) {
return 0, fmt.Errorf("read segments: %s", err)
}

// Ignore if there are no available segments.
if len(segments) == 0 {
return 0, nil
}

// Read highest index on the last segment.
index, err := ReadSegmentMaxIndex(segments.Last().Path)
if err != nil {
return 0, fmt.Errorf("read segment max index: %s", err)
}

return index, nil
}

// WriteMessage writes a message to the end of the topic.
func (t *Topic) WriteMessage(m *Message) error {
t.mu.Lock()
@@ -827,9 +808,9 @@ func (t *Topic) WriteMessage(m *Message) error {
}

// Encode message.
b := make([]byte, messageHeaderSize+len(m.Data))
b := make([]byte, MessageHeaderSize+len(m.Data))
copy(b, m.marshalHeader())
copy(b[messageHeaderSize:], m.Data)
copy(b[MessageHeaderSize:], m.Data)

// Write to last segment.
if _, err := t.file.Write(b); err != nil {
@@ -977,8 +958,9 @@ func ReadSegmentByIndex(path string, index uint64) (*Segment, error) {
return segments[len(segments)-1], nil
}

// ReadSegmentMaxIndex returns the highest index recorded in a segment.
func ReadSegmentMaxIndex(path string) (uint64, error) {
// RecoverSegment parses the entire segment and truncates at any partial messages.
// Returns the last index seen in the segment.
func RecoverSegment(path string) (uint64, error) {
// Open segment file.
f, err := os.Open(path)
if os.IsNotExist(err) {
@@ -994,6 +976,16 @@ func ReadSegmentMaxIndex(path string) (uint64, error) {
for {
var m Message
if err := dec.Decode(&m); err == io.EOF {
return index, nil
} else if err == io.ErrUnexpectedEOF {
// The decoder will unread any partially read data so we can
// simply truncate at current position.
if n, err := f.Seek(0, os.SEEK_CUR); err != nil {
return 0, fmt.Errorf("seek: %s", err)
} else if err := os.Truncate(path, n); err != nil {
return 0, fmt.Errorf("truncate: n=%d, err=%s", n-1, err)
}

return index, nil
} else if err != nil {
return 0, fmt.Errorf("decode: %s", err)
@@ -1028,6 +1020,19 @@ func NewTopicReader(path string, index uint64, streaming bool) *TopicReader {
}
}

// Seek seeks to a position in the current segment.
func (r *TopicReader) Seek(offset int64, whence int) (int64, error) {
assert(whence == os.SEEK_CUR, "topic reader can only seek to a relative position")
Contributor: Seems like you could remove this assert by simply having this function ignore whence, and then calling Seek() with os.SEEK_CUR. Of course, you may wish to catch errors in the future.

Contributor (author): If someone passes a non-SEEK_CUR in the future I want to catch it immediately. If I silently override it then it'll be a pain to debug.

[A sketch of the suggested alternative appears after this method.]

r.mu.Lock()
defer r.mu.Unlock()

if r.file == nil {
return 0, nil
}
return r.file.Seek(offset, whence)
}
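
For context on the review exchange above, here is a minimal sketch of the reviewer's suggested alternative: a variant that ignores the caller-supplied whence and always seeks relative to the current position. It assumes only the TopicReader fields (mu, file) shown in this diff and is not part of the merged change.

// seekRelativeSketch is a hypothetical variant of Seek that ignores whence,
// as suggested in the review discussion above. Not part of this PR.
func (r *TopicReader) seekRelativeSketch(offset int64) (int64, error) {
	r.mu.Lock()
	defer r.mu.Unlock()

	// No segment is open yet, so there is nothing to seek.
	if r.file == nil {
		return 0, nil
	}

	// Always seek relative to the current position, regardless of what the
	// caller intended.
	return r.file.Seek(offset, os.SEEK_CUR)
}

The merged version keeps the assert so that an unsupported whence fails loudly instead of being silently rewritten.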

// Read reads the next bytes from the reader into the buffer.
func (r *TopicReader) Read(p []byte) (int, error) {
for {
@@ -1120,7 +1125,7 @@ func (r *TopicReader) seekAfterIndex(f *os.File, seek uint64) error {
return err
} else if m.Index >= seek {
// Seek to message start.
if _, err := f.Seek(-int64(messageHeaderSize+len(m.Data)), os.SEEK_CUR); err != nil {
if _, err := f.Seek(-int64(MessageHeaderSize+len(m.Data)), os.SEEK_CUR); err != nil {
return fmt.Errorf("seek: %s", err)
}
return nil
@@ -1214,7 +1219,7 @@ const (
)

// The size of the encoded message header, in bytes.
const messageHeaderSize = 2 + 8 + 8 + 4
const MessageHeaderSize = 2 + 8 + 8 + 4

// Message represents a single item in a topic.
type Message struct {
@@ -1230,34 +1235,34 @@ func (m *Message) WriteTo(w io.Writer) (n int64, err error) {
return int64(n), err
}
if n, err := w.Write(m.Data); err != nil {
return int64(messageHeaderSize + n), err
return int64(MessageHeaderSize + n), err
}
return int64(messageHeaderSize + len(m.Data)), nil
return int64(MessageHeaderSize + len(m.Data)), nil
}

// MarshalBinary returns a binary representation of the message.
// This implements encoding.BinaryMarshaler. An error cannot be returned.
func (m *Message) MarshalBinary() ([]byte, error) {
b := make([]byte, messageHeaderSize+len(m.Data))
b := make([]byte, MessageHeaderSize+len(m.Data))
copy(b, m.marshalHeader())
copy(b[messageHeaderSize:], m.Data)
copy(b[MessageHeaderSize:], m.Data)
return b, nil
}

// UnmarshalBinary reads a message from a binary encoded slice.
// This implements encoding.BinaryUnmarshaler.
func (m *Message) UnmarshalBinary(b []byte) error {
m.unmarshalHeader(b)
if len(b[messageHeaderSize:]) < len(m.Data) {
return fmt.Errorf("message data too short: %d < %d", len(b[messageHeaderSize:]), len(m.Data))
if len(b[MessageHeaderSize:]) < len(m.Data) {
return fmt.Errorf("message data too short: %d < %d", len(b[MessageHeaderSize:]), len(m.Data))
}
copy(m.Data, b[messageHeaderSize:])
copy(m.Data, b[MessageHeaderSize:])
return nil
}

// marshalHeader returns a byte slice with the message header.
func (m *Message) marshalHeader() []byte {
b := make([]byte, messageHeaderSize)
b := make([]byte, MessageHeaderSize)
binary.BigEndian.PutUint16(b[0:2], uint16(m.Type))
binary.BigEndian.PutUint64(b[2:10], m.TopicID)
binary.BigEndian.PutUint64(b[10:18], m.Index)
@@ -1276,28 +1281,41 @@ func (m *Message) unmarshalHeader(b []byte) {

// MessageDecoder decodes messages from a reader.
type MessageDecoder struct {
r io.Reader
r io.ReadSeeker
}

// NewMessageDecoder returns a new instance of the MessageDecoder.
func NewMessageDecoder(r io.Reader) *MessageDecoder {
func NewMessageDecoder(r io.ReadSeeker) *MessageDecoder {
return &MessageDecoder{r: r}
}

// Decode reads a message from the decoder's reader.
func (dec *MessageDecoder) Decode(m *Message) error {
// Read header bytes.
var b [messageHeaderSize]byte
if _, err := io.ReadFull(dec.r, b[:]); err == io.EOF {
// Unread if there is a partial read.
var b [MessageHeaderSize]byte
if n, err := io.ReadFull(dec.r, b[:]); err == io.EOF {
return err
} else if err == io.ErrUnexpectedEOF {
if _, err := dec.r.Seek(-int64(n), os.SEEK_CUR); err != nil {
return fmt.Errorf("cannot unread header: n=%d, err=%s", n, err)
}
warnf("unexpected eof(0): len=%d, n=%d, err=%s", len(b), n, err)
Contributor: Ditto? Want this, or is it debug?

return err
} else if err != nil {
return fmt.Errorf("read header: %s", err)
}
m.unmarshalHeader(b[:])

// Read data.
if _, err := io.ReadFull(dec.r, m.Data); err != nil {
return fmt.Errorf("read body: %s", err)
if n, err := io.ReadFull(dec.r, m.Data); err == io.EOF || err == io.ErrUnexpectedEOF {
if _, err := dec.r.Seek(-int64(MessageHeaderSize+n), os.SEEK_CUR); err != nil {
return fmt.Errorf("cannot unread header+data: n=%d, err=%s", n, err)
}
warnf("unexpected eof(1): len=%d, n=%d, err=%s", len(m.Data), n, err)
Contributor: Do you want this output here?

Contributor (author): Yeah, for the short term. It'll give us a little extra data. We don't have these variables returned to the caller and there's no logger in here so I couldn't think of a better way to inject it. We can remove the warns in the future.

return io.ErrUnexpectedEOF
} else if err != nil {
return fmt.Errorf("read data: %s", err)
}

return nil
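The warnf calls in Decode refer to a package-level helper that is outside this diff. A plausible minimal form, assuming it simply formats a line to standard error (the real helper in the messaging package may differ), is:

// warnf is assumed here to write a formatted warning to stderr; the actual
// helper is not shown in this diff and may be implemented differently.
func warnf(format string, v ...interface{}) {
	fmt.Fprintf(os.Stderr, format+"\n", v...)
}
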
messaging/broker_test.go: 102 changes (90 additions, 12 deletions)
@@ -335,6 +335,77 @@ func (b *RaftFSMBroker) Index() uint64 { return 0 }
func (b *RaftFSMBroker) WriteTo(w io.Writer) (n int64, err error) { return 0, nil }
func (b *RaftFSMBroker) ReadFrom(r io.Reader) (n int64, err error) { return 0, nil }

// Ensure a topic can recover if it has a partial message.
func TestTopic_Recover(t *testing.T) {
topic := OpenTopic()
defer topic.Close()

// Write three messages.
if err := topic.WriteMessage(&messaging.Message{Index: 1, Data: make([]byte, 10)}); err != nil {
t.Fatal(err)
} else if err = topic.WriteMessage(&messaging.Message{Index: 2, Data: make([]byte, 10)}); err != nil {
t.Fatal(err)
} else if err = topic.WriteMessage(&messaging.Message{Index: 3, Data: make([]byte, 10)}); err != nil {
t.Fatal(err)
}

// Close topic and trim the file by a few bytes.
topic.Topic.Close()
if fi, err := os.Stat(filepath.Join(topic.Path(), "1")); err != nil {
t.Fatal(err)
} else if err = os.Truncate(filepath.Join(topic.Path(), "1"), fi.Size()-5); err != nil {
t.Fatal(err)
}

// Reopen topic.
if err := topic.Open(); err != nil {
t.Fatal(err)
}

// Rewrite the third message with a different data size.
if err := topic.WriteMessage(&messaging.Message{Index: 3, Data: make([]byte, 20)}); err != nil {
t.Fatal(err)
}

// Read all messages.
a := MustDecodeAllMessages(messaging.NewTopicReader(topic.Path(), 0, false))
if len(a) != 3 {
t.Fatalf("unexpected message count: %d", len(a))
} else if !reflect.DeepEqual(a[0], &messaging.Message{Index: 1, Data: make([]byte, 10)}) {
t.Fatalf("unexpected message(0): %#v", a[0])
} else if !reflect.DeepEqual(a[1], &messaging.Message{Index: 2, Data: make([]byte, 10)}) {
t.Fatalf("unexpected message(1): %#v", a[1])
} else if !reflect.DeepEqual(a[2], &messaging.Message{Index: 3, Data: make([]byte, 20)}) {
t.Fatalf("unexpected message(2): %#v", a[2])
}

}

// Topic is a wrapper for messaging.Topic that creates the topic in a temporary location.
type Topic struct {
*messaging.Topic
}

// NewTopic returns a new Topic instance.
func NewTopic() *Topic {
return &Topic{messaging.NewTopic(1, tempfile())}
}

// OpenTopic returns a new, open Topic instance.
func OpenTopic() *Topic {
t := NewTopic()
if err := t.Open(); err != nil {
panic("open: " + err.Error())
}
return t
}

// Close closes and deletes the temporary topic.
func (t *Topic) Close() {
defer os.RemoveAll(t.Path())
t.Topic.Close()
}

// Ensure a list of topics can be read from a directory.
func TestReadTopics(t *testing.T) {
path, _ := ioutil.TempDir("", "")
@@ -699,20 +770,10 @@ func (b *Broker) Log() *BrokerLog {
}

// MustReadAllTopic reads all messages on a topic. Panic on error.
func (b *Broker) MustReadAllTopic(topicID uint64) (a []*messaging.Message) {
func (b *Broker) MustReadAllTopic(topicID uint64) []*messaging.Message {
r := b.TopicReader(topicID, 0, false)
defer r.Close()

dec := messaging.NewMessageDecoder(r)
for {
m := &messaging.Message{}
if err := dec.Decode(m); err == io.EOF {
return
} else if err != nil {
panic("read all topic: " + err.Error())
}
a = append(a, m)
}
return MustDecodeAllMessages(r)
}

// BrokerLog is a mockable object that implements Broker.Log.
@@ -771,6 +832,23 @@ func MustWriteFile(filename string, data []byte) {
}
}

// MustDecodeAllMessages reads all messages on a reader.
func MustDecodeAllMessages(r interface {
io.ReadCloser
io.Seeker
}) (a []*messaging.Message) {
dec := messaging.NewMessageDecoder(r)
for {
m := &messaging.Message{}
if err := dec.Decode(m); err == io.EOF {
return
} else if err != nil {
panic("read all: " + err.Error())
}
a = append(a, m)
}
}

// MustMarshalMessages marshals a slice of messages to bytes. Panic on error.
func MustMarshalMessages(a []*messaging.Message) []byte {
var buf bytes.Buffer
messaging/client.go: 8 changes (7 additions, 1 deletion)
@@ -664,7 +664,7 @@ func (c *Conn) stream(req *http.Request, closing <-chan struct{}) error {
c.Logger.Printf("connected to broker: %s", req.URL.String())

// Continuously decode messages from request body in a separate goroutine.
dec := NewMessageDecoder(resp.Body)
dec := NewMessageDecoder(&nopSeeker{resp.Body})
for {
// Decode message from the stream.
m := &Message{}
@@ -702,3 +702,9 @@ func urlsEqual(a, b []url.URL) bool {
}
return true
}

type nopSeeker struct {
io.Reader
}

func (*nopSeeker) Seek(offset int64, whence int) (int64, error) { return 0, nil }
messaging/handler.go: 5 changes (4 additions, 1 deletion)
@@ -19,7 +19,10 @@ type Handler struct {
URLs() []url.URL
IsLeader() bool
LeaderURL() url.URL
TopicReader(topicID, index uint64, streaming bool) io.ReadCloser
TopicReader(topicID, index uint64, streaming bool) interface {
io.ReadCloser
io.Seeker
}
Publish(m *Message) (uint64, error)
SetTopicMaxIndex(topicID, index uint64, u url.URL) error
}
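
Because the Handler's broker dependency now requires TopicReader to return a value that is both an io.ReadCloser and an io.Seeker, a compile-time assertion (a sketch, not part of this change) can make the requirement on *TopicReader explicit:

// Hypothetical compile-time check that *TopicReader satisfies the combined
// interface returned by Broker.TopicReader.
var _ interface {
	io.ReadCloser
	io.Seeker
} = (*TopicReader)(nil)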