Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stop partition while marking recovered #75

Merged
merged 2 commits into from
Jan 8, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 36 additions & 6 deletions partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (p *partition) start() error {
return err
}
} else {
p.markRecovered()
p.markRecovered(false)
}

// if stopped, just return
Expand Down Expand Up @@ -241,7 +241,7 @@ func (p *partition) load(catchup bool) error {

if ev.Offset == ev.Hwm {
// nothing to recover
if err := p.markRecovered(); err != nil {
if err := p.markRecovered(false); err != nil {
return fmt.Errorf("error setting recovered: %v", err)
}
}
Expand All @@ -250,7 +250,7 @@ func (p *partition) load(catchup bool) error {
p.offset = ev.Hwm - 1
p.hwm = ev.Hwm

if err := p.markRecovered(); err != nil {
if err := p.markRecovered(catchup); err != nil {
return fmt.Errorf("error setting recovered: %v", err)
}

Expand All @@ -270,7 +270,7 @@ func (p *partition) load(catchup bool) error {
}
p.offset = ev.Offset
if p.offset >= p.hwm-1 {
if err := p.markRecovered(); err != nil {
if err := p.markRecovered(catchup); err != nil {
return fmt.Errorf("error setting recovered: %v", err)
}
}
Expand Down Expand Up @@ -326,13 +326,43 @@ func (p *partition) storeEvent(msg *kafka.Message) error {
}

// mark storage as recovered
func (p *partition) markRecovered() (err error) {
func (p *partition) markRecovered(catchup bool) (err error) {
p.recoveredOnce.Do(func() {
p.lastStats = newPartitionStats().init(p.stats, p.offset, p.hwm)
p.lastStats.Table.Status = PartitionPreparing

err = p.st.MarkRecovered()
if catchup {
// if catching up (views), stop reading from topic before marking
// partition as recovered to avoid blocking other partitions when
// p.ch gets full
p.proxy.Remove(p.topic)

// drain events channel -- we'll fetch them again later
done := make(chan bool)
go func() {
for {
select {
case <-p.ch:
case <-done:
return

}
}
}()
defer close(done)
}

// mark storage as recovered -- this may take a long time
if err = p.st.MarkRecovered(); err != nil {
return
}

if catchup {
// start reading from topic again if in catchup mode
p.proxy.Add(p.topic, p.hwm)
}

// update stats
p.stats.Table.Status = PartitionRunning
p.stats.Table.RecoveryTime = time.Now()

Expand Down