From 740fe23f85869c816e1dc86405d5b2d0aa61bf04 Mon Sep 17 00:00:00 2001 From: Diogo Behrens Date: Mon, 8 Jan 2018 14:54:49 +0100 Subject: [PATCH 1/2] stop partition while marking recovered --- partition.go | 27 ++++++++++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/partition.go b/partition.go index c5795855..7d93a481 100644 --- a/partition.go +++ b/partition.go @@ -331,8 +331,33 @@ func (p *partition) markRecovered() (err error) { p.lastStats = newPartitionStats().init(p.stats, p.offset, p.hwm) p.lastStats.Table.Status = PartitionPreparing - err = p.st.MarkRecovered() + // before marking partition as recovered, stop reading from topic + p.proxy.Remove(p.topic) + + // drain events channel + done := make(chan bool) + go func() { + for { + select { + case <-p.ch: + case <-done: + return + } + } + }() + defer close(done) + + // while storage is being marked as recovered, we drop all messages + // from topic since this may take long + if err = p.st.MarkRecovered(); err != nil { + return + } + + // start reading from topic again + p.proxy.Add(p.topic, p.hwm) + + // update stats p.stats.Table.Status = PartitionRunning p.stats.Table.RecoveryTime = time.Now() From d6bf356e23f5cc9aba71907009dc29531d371672 Mon Sep 17 00:00:00 2001 From: Diogo Behrens Date: Mon, 8 Jan 2018 15:25:08 +0100 Subject: [PATCH 2/2] use catchup flag in markRecovered --- partition.go | 51 ++++++++++++++++++++++++++++----------------------- 1 file changed, 28 insertions(+), 23 deletions(-) diff --git a/partition.go b/partition.go index 7d93a481..6623646b 100644 --- a/partition.go +++ b/partition.go @@ -90,7 +90,7 @@ func (p *partition) start() error { return err } } else { - p.markRecovered() + p.markRecovered(false) } // if stopped, just return @@ -241,7 +241,7 @@ func (p *partition) load(catchup bool) error { if ev.Offset == ev.Hwm { // nothing to recover - if err := p.markRecovered(); err != nil { + if err := p.markRecovered(false); err != nil { return fmt.Errorf("error setting recovered: %v", err) } } @@ -250,7 +250,7 @@ func (p *partition) load(catchup bool) error { p.offset = ev.Hwm - 1 p.hwm = ev.Hwm - if err := p.markRecovered(); err != nil { + if err := p.markRecovered(catchup); err != nil { return fmt.Errorf("error setting recovered: %v", err) } @@ -270,7 +270,7 @@ func (p *partition) load(catchup bool) error { } p.offset = ev.Offset if p.offset >= p.hwm-1 { - if err := p.markRecovered(); err != nil { + if err := p.markRecovered(catchup); err != nil { return fmt.Errorf("error setting recovered: %v", err) } } @@ -326,36 +326,41 @@ func (p *partition) storeEvent(msg *kafka.Message) error { } // mark storage as recovered -func (p *partition) markRecovered() (err error) { +func (p *partition) markRecovered(catchup bool) (err error) { p.recoveredOnce.Do(func() { p.lastStats = newPartitionStats().init(p.stats, p.offset, p.hwm) p.lastStats.Table.Status = PartitionPreparing - // before marking partition as recovered, stop reading from topic - p.proxy.Remove(p.topic) - - // drain events channel - done := make(chan bool) - go func() { - for { - select { - case <-p.ch: - case <-done: - return + if catchup { + // if catching up (views), stop reading from topic before marking + // partition as recovered to avoid blocking other partitions when + // p.ch gets full + p.proxy.Remove(p.topic) + + // drain events channel -- we'll fetch them again later + done := make(chan bool) + go func() { + for { + select { + case <-p.ch: + case <-done: + return + } } - } - }() - defer close(done) + }() + defer close(done) + } - // while storage is being marked as recovered, we drop all messages - // from topic since this may take long + // mark storage as recovered -- this may take long if err = p.st.MarkRecovered(); err != nil { return } - // start reading from topic again - p.proxy.Add(p.topic, p.hwm) + if catchup { + // start reading from topic again if in catchup mode + p.proxy.Add(p.topic, p.hwm) + } // update stats p.stats.Table.Status = PartitionRunning