Skip to content

Commit

Permalink
refactor(output): logic for outputting the first data
Browse files Browse the repository at this point in the history
Moved channel reading into `select` block.
  • Loading branch information
takuo committed Jan 9, 2024
1 parent 4e3718f commit 2c70c46
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 11 deletions.
18 changes: 12 additions & 6 deletions output/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,27 +79,33 @@ func (m *Mqtt) Initialize(ctx context.Context) error {

go func() {
var cur *types.Data
m.publish(<-m.r) // publish first data immediately
tick := time.NewTicker(time.Second * time.Duration(m.Interval))
var tick *time.Ticker
var c <-chan time.Time
for {
select {
case <-ctx.Done():
cur = nil
m.Close()
tick.Stop()
case <-tick.C:
m.Close()
case <-c:
if cur == nil {
continue
}
m.publish(cur)
cur = nil // dismiss
cur = nil
case d, more := <-m.r:
if !more {
slog.Debug("reader channel has been closed", "outputter", m.Name())
tick.Stop()
return
} else if tick == nil {
// publish first data
m.publish(<-m.r)
tick = time.NewTicker(time.Second * time.Duration(m.Interval))
c = tick.C
} else {
cur = d
}
cur = d
}
}
}()
Expand Down
17 changes: 12 additions & 5 deletions output/stdout.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,26 +53,33 @@ func (s *Stdout) Close() {

func (s *Stdout) run(ctx context.Context) {
var cur *types.Data
s.write(<-s.r) // output first data immediately
tick := time.NewTicker(time.Second * time.Duration(s.Interval))
var tick *time.Ticker
var c <-chan time.Time
for {
select {
case <-ctx.Done():
cur = nil
tick.Stop()
s.Close()
case <-tick.C:
case <-c:
if cur == nil {
continue
}
s.write(cur)
cur = nil // dismiss
cur = nil
case d, more := <-s.r:
if !more {
slog.Debug("Output channel has been closed", "outputter", s.Name())
tick.Stop()
return
} else if tick == nil {
// output first data
s.write(d)
tick = time.NewTicker(time.Second * time.Duration(s.Interval))
c = tick.C
} else {
cur = d
}
cur = d
}
}
}
Expand Down

0 comments on commit 2c70c46

Please sign in to comment.