Skip to content

Commit

Permalink
Merge pull request #1867 from influxdb/fix-1866
Browse files Browse the repository at this point in the history
fix #1866: race accessing topic replicas map
  • Loading branch information
dgnorton committed Mar 6, 2015
2 parents 20b964b + ac8a062 commit fca0a95
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 4 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## v0.9.0-rc9 [2015-03-??]

### Bugfixes

- [#1867](https://github.com/influxdb/influxdb/pull/1867): Fix race accessing topic replicas map

## v0.9.0-rc8 [2015-03-05]

### Bugfixes
Expand Down
27 changes: 23 additions & 4 deletions messaging/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -762,9 +762,24 @@ type topic struct {

file *os.File // on-disk representation

mu sync.RWMutex
replicas map[uint64]*Replica // replicas subscribed to topic
}

// addReplica adds a replica to the topic.
func (t *topic) addReplica(r *Replica) {
t.mu.Lock()
defer t.mu.Unlock()
t.replicas[r.id] = r
}

// replica returns the replica with the specified ID.
func (t *topic) replica(id uint64) *Replica {
t.mu.RLock()
defer t.mu.RUnlock()
return t.replicas[id]
}

// open opens a topic for writing.
func (t *topic) open() error {
assert(t.file == nil, "topic already open: %d", t.id)
Expand Down Expand Up @@ -890,9 +905,13 @@ func (t *topic) encode(m *Message) error {
t.index = m.Index

// Write message out to all replicas.
for _, r := range t.replicas {
_, _ = r.Write(b)
}
func() {
t.mu.Lock()
defer t.mu.Unlock()
for _, r := range t.replicas {
_, _ = r.Write(b)
}
}()

return nil
}
Expand Down Expand Up @@ -1010,7 +1029,7 @@ func (r *Replica) WriteTo(w io.Writer) (int64, error) {
}

// Attach replica to topic to tail new messages.
t.replicas[r.id] = r
t.addReplica(r)
}

// Wait for writer to close and then return.
Expand Down

0 comments on commit fca0a95

Please sign in to comment.