Skip to content

Commit

Permalink
reprovider: make reprovide cmd error if reprovider is active
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: Łukasz Magiera <magik6k@gmail.com>
  • Loading branch information
magik6k committed Aug 3, 2017
1 parent 4ae8b14 commit a07e3f9
Showing 1 changed file with 35 additions and 6 deletions.
41 changes: 35 additions & 6 deletions exchange/reprovide/reprovide.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@ import (
var log = logging.Logger("reprovider")

type keyChanFunc func(context.Context) (<-chan *cid.Cid, error)
type doneFunc func(error)

type Reprovider struct {
ctx context.Context
trigger chan context.CancelFunc
trigger chan doneFunc

// The routing system to provide values through
rsys routing.ContentRouting
Expand All @@ -29,7 +30,7 @@ type Reprovider struct {
func NewReprovider(ctx context.Context, rsys routing.ContentRouting, keyProvider keyChanFunc) *Reprovider {
return &Reprovider{
ctx: ctx,
trigger: make(chan context.CancelFunc),
trigger: make(chan doneFunc),

rsys: rsys,
keyProvider: keyProvider,
Expand All @@ -42,7 +43,7 @@ func (rp *Reprovider) ProvideEvery(tick time.Duration) {
// may have just started the daemon and shutting it down immediately.
// probability( up another minute | uptime ) increases with uptime.
after := time.After(time.Minute)
var done context.CancelFunc
var done doneFunc
for {
select {
case <-rp.ctx.Done():
Expand All @@ -51,14 +52,19 @@ func (rp *Reprovider) ProvideEvery(tick time.Duration) {
case <-after:
}

unmute := rp.muteTrigger()

err := rp.Reprovide()
if err != nil {
log.Debug(err)
}

if done != nil {
done()
done(err)
}

unmute()

after = time.After(tick)
}
}
Expand Down Expand Up @@ -93,13 +99,36 @@ func (rp *Reprovider) Reprovide() error {
func (rp *Reprovider) Trigger(ctx context.Context) error {
progressCtx, done := context.WithCancel(ctx)

var err error
df := func(e error) {
err = e
done()
}

select {
case <-rp.ctx.Done():
return context.Canceled
case <-ctx.Done():
return context.Canceled
case rp.trigger <- done:
case rp.trigger <- df:
<-progressCtx.Done()
return nil
return err
}
}

func (rp *Reprovider) muteTrigger() context.CancelFunc {
ctx, cf := context.WithCancel(rp.ctx)
go func() {
defer cf()
for {
select {
case <-ctx.Done():
return
case done := <-rp.trigger:
done(fmt.Errorf("reprovider is already running"))
}
}
}()

return cf
}

0 comments on commit a07e3f9

Please sign in to comment.