Skip to content

Commit

Permalink
Merge pull request #44 from lovoo/recovered
Browse files Browse the repository at this point in the history
methods for checking recovery status
  • Loading branch information
db7 committed Oct 9, 2017
2 parents 418b608 + b00d27c commit 965e50d
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 3 deletions.
27 changes: 26 additions & 1 deletion processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,7 @@ func (g *Processor) createPartition(id int32) error {
}
}
for _, v := range g.views {
wait = append(wait, v.Ready)
wait = append(wait, v.Recovered)
}

g.partitions[id] = newPartition(
Expand Down Expand Up @@ -710,3 +710,28 @@ func (g *Processor) process(msg *message, st storage.Storage, wg *sync.WaitGroup

return nil
}

// Recovered returns true when the processor has caught up with events from kafka.
func (g *Processor) Recovered() bool {
for _, v := range g.views {
if !v.Recovered() {
return false
}
}

for _, part := range g.partitionViews {
for _, topicPart := range part {
if !topicPart.ready() {
return false
}
}
}

for _, p := range g.partitions {
if !p.ready() {
return false
}
}

return true
}
4 changes: 2 additions & 2 deletions view.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,8 +302,8 @@ func (v *View) run() {
}
}

// Ready returns true when the view has caught up with events from kafka.
func (v *View) Ready() bool {
// Recovered returns true when the view has caught up with events from kafka.
func (v *View) Recovered() bool {
for _, p := range v.partitions {
if !p.ready() {
return false
Expand Down

0 comments on commit 965e50d

Please sign in to comment.