From 24d5fed0df2ee97c301ea7eb1b792a35ff792670 Mon Sep 17 00:00:00 2001 From: Takuo Kitame Date: Mon, 8 Jan 2024 23:06:50 +0900 Subject: [PATCH 1/3] feat(main): manage active outputters When an outputter is deactivated, the main loop will cease sending data to the outputter. Please note that this commit includes a few changes to the type of Outputter interface. --- main.go | 58 ++++++++++++++++++++++++++--------- options/options.go | 3 ++ output/base.go | 6 ++-- output/mqtt.go | 29 +++++++++++++----- output/outputter.go | 22 ++++++++++++-- output/stdout.go | 73 +++++++++++++++++++++++++++++---------------- 6 files changed, 141 insertions(+), 50 deletions(-) diff --git a/main.go b/main.go index cc6a72a..b9d76f3 100644 --- a/main.go +++ b/main.go @@ -13,6 +13,7 @@ import ( "regexp" "strconv" "strings" + "sync/atomic" "time" "github.com/alecthomas/kong" @@ -48,9 +49,15 @@ type Chissoku struct { // available outputters outputters map[string]output.Outputter + // active outputters + activeOutputters atomic.Value // reader channel rchan chan *types.Data + // deactivate outputter + dechan chan string + // cancel + cancel func() // serial device port serial.Port @@ -71,21 +78,26 @@ func (c *Chissoku) AfterApply(opts *options.Options) error { slog.SetDefault(slog.New(slog.NewJSONHandler(writer, &slog.HandlerOptions{Level: level}))) c.rchan = make(chan *types.Data) + c.dechan = make(chan string) + ctx := output.ContextWithDeactivateChannel(context.Background(), c.dechan) + ctx = context.WithValue(ctx, options.ContextKeyOptions{}, opts) + ctx, c.cancel = context.WithCancel(ctx) - enabled := opts.Output[:0] - for _, v := range opts.Output { - if o, ok := c.outputters[v]; ok { - if err := o.Initialize(opts); err != nil { + // initialize and filter outputters + a := make(map[string]output.Outputter, len(opts.Output)) + for _, name := range opts.Output { + if o, ok := c.outputters[name]; ok { + if err := o.Initialize(ctx); err != nil { slog.Error("Initialize outputter", "outputter", o.Name(), "error", err) continue } - enabled = append(enabled, v) + a[name] = o } } - opts.Output = enabled - if len(opts.Output) == 0 { - return fmt.Errorf("no outputters are avaiable") + if len(a) == 0 { + return fmt.Errorf("no active outputters are avaiable") } + c.activeOutputters.Store(a) return nil } @@ -103,6 +115,7 @@ const ( ) func (c *Chissoku) cleanup() { + c.cancel() if c.port != nil { slog.Debug("Closing Serial port") // nolint: errcheck @@ -111,8 +124,9 @@ func (c *Chissoku) cleanup() { // nolint: errcheck c.port.Close() } - for _, v := range c.Options.Output { - c.outputters[v].Close() + + for _, o := range c.activeOutputters.Load().(map[string]output.Outputter) { + o.Close() } } @@ -175,18 +189,34 @@ func (c *Chissoku) readDevice() error { } if c.scanner.Err() != nil { slog.Error("Scanner read error", "error", c.scanner.Err()) + c.cleanup() return c.scanner.Err() } return nil } func (c *Chissoku) dispatch() { - for d := range c.rchan { - for _, v := range c.Options.Output { - c.outputters[v].Output(d) + defer c.cleanup() + for { + select { + case deactivate := <-c.dechan: + a := c.activeOutputters.Load().(map[string]output.Outputter) + delete(a, deactivate) + if len(a) == 0 { + slog.Debug("No outputers are alive") + return + } + c.activeOutputters.Store(a) + case data, more := <-c.rchan: + if !more { + slog.Debug("Reader channel has ben closed") + return + } + for _, o := range c.activeOutputters.Load().(map[string]output.Outputter) { + o.Output(data) + } } } - slog.Debug("Reader channel has ben closed") } // initialize and prepare the device diff --git a/options/options.go b/options/options.go index b1c3c5e..a69c9ae 100644 --- a/options/options.go +++ b/options/options.go @@ -20,3 +20,6 @@ type Options struct { // Debug Debug bool `short:"d" help:"print debug log"` } + +// ContextKeyOptions context value key for global Options +type ContextKeyOptions struct{} diff --git a/output/base.go b/output/base.go index 2bdf926..4a1130e 100644 --- a/output/base.go +++ b/output/base.go @@ -2,10 +2,10 @@ package output import ( + "context" "reflect" "strings" - "github.com/northeye/chissoku/options" "github.com/northeye/chissoku/types" ) @@ -16,6 +16,8 @@ type Base struct { // receiver channel r chan *types.Data + // cancel + cancel func() } // Close sample implementation @@ -33,7 +35,7 @@ func (b *Base) Output(d *types.Data) { } // Initialize initialize outputter -func (b *Base) Initialize(_ *options.Options) (_ error) { +func (b *Base) Initialize(ctx context.Context) (_ error) { b.r = make(chan *types.Data) return } diff --git a/output/mqtt.go b/output/mqtt.go index fd40b50..ca3c946 100644 --- a/output/mqtt.go +++ b/output/mqtt.go @@ -10,10 +10,10 @@ import ( "os" "reflect" "strings" + "sync" "time" mqtt "github.com/eclipse/paho.mqtt.golang" - "github.com/northeye/chissoku/options" "github.com/northeye/chissoku/types" ) @@ -42,11 +42,16 @@ type Mqtt struct { // mqtt mqtt Client interface client mqtt.Client + + // close + close func() } // Initialize initialize outputter -func (m *Mqtt) Initialize(_ *options.Options) error { - m.Base.Initialize(nil) +func (m *Mqtt) Initialize(ctx context.Context) error { + m.Base.Initialize(ctx) + ctx, m.cancel = context.WithCancel(ctx) + o := mqtt.NewClientOptions() o.AddBroker(m.Address) if m.ClientID != "" { @@ -64,12 +69,24 @@ func (m *Mqtt) Initialize(_ *options.Options) error { return t.Error() } + m.close = sync.OnceFunc(func() { + deactivate(ctx, m) + close(m.r) + if m.client.IsConnected() { + m.client.Disconnect(1000) + } + }) + go func() { var cur *types.Data - m.publish(<-m.r) // publish first data + m.publish(<-m.r) // publish first data immediately tick := time.NewTicker(time.Second * time.Duration(m.Interval)) for { select { + case <-ctx.Done(): + cur = nil + m.Close() + tick.Stop() case <-tick.C: if cur == nil { continue @@ -98,9 +115,7 @@ func (m *Mqtt) Name() string { // Close outputter interface method // clsoe the MQTT connection func (m *Mqtt) Close() { - if m.client.IsConnected() { - m.client.Disconnect(1000) - } + m.close() } func (m *Mqtt) publish(d *types.Data) { diff --git a/output/outputter.go b/output/outputter.go index 4bd094b..9234082 100644 --- a/output/outputter.go +++ b/output/outputter.go @@ -2,7 +2,9 @@ package output import ( - "github.com/northeye/chissoku/options" + "context" + "log/slog" + "github.com/northeye/chissoku/types" ) @@ -14,7 +16,7 @@ type Outputter interface { // Intialize the outputter. // When it returns non-nil error the outputter will be disabled. - Initialize(*options.Options) error + Initialize(context.Context) error // Output the data. // This method must be non-blocking and light-weight. @@ -23,3 +25,19 @@ type Outputter interface { // Close cleanup the outputter. Close() } + +// contextKeyDeactivateOutputterChannel context value key for DeactivateOutputterChannel +type contextKeyDeactivateOutputterChannel struct{} + +// ContextWithDeactivateChannel new context with deactivate channel +func ContextWithDeactivateChannel(ctx context.Context, c chan string) context.Context { + return context.WithValue(ctx, contextKeyDeactivateOutputterChannel{}, c) +} + +// deactivate deactivate an outputter +func deactivate(ctx context.Context, o Outputter) { + if c, ok := ctx.Value(contextKeyDeactivateOutputterChannel{}).(chan string); ok { + slog.Debug("Deactivate", "outputter", o.Name()) + c <- o.Name() + } +} diff --git a/output/stdout.go b/output/stdout.go index c9a7dda..08893db 100644 --- a/output/stdout.go +++ b/output/stdout.go @@ -1,47 +1,39 @@ package output import ( + "context" "encoding/json" "fmt" "log/slog" "reflect" "strings" + "sync" "time" - "github.com/northeye/chissoku/options" "github.com/northeye/chissoku/types" ) // Stdout outputter for Stdout type Stdout struct { Base + + // close + close func() } // Initialize initialize outputter -func (s *Stdout) Initialize(_ *options.Options) error { - s.Base.Initialize(nil) - go func() { - var cur *types.Data - s.write(<-s.r) // ouput first data - tick := time.NewTicker(time.Second * time.Duration(s.Interval)) - for { - select { - case <-tick.C: - if cur == nil { - continue - } - s.write(cur) - cur = nil // dismiss - case d, more := <-s.r: - if !more { - slog.Debug("Output cannel has been closed", "outputter", s.Name()) - return - } - cur = d - } - } - }() - return nil +func (s *Stdout) Initialize(ctx context.Context) (_ error) { + s.Base.Initialize(ctx) + ctx, s.cancel = context.WithCancel(ctx) + + s.close = sync.OnceFunc(func() { + deactivate(ctx, s) + slog.Debug("Closing receiver channel", "outputter", s.Name()) + close(s.r) + }) + + go s.run(ctx) + return } // Name outputter interface method @@ -49,6 +41,37 @@ func (s *Stdout) Name() string { return strings.ToLower(reflect.TypeOf(s).Elem().Name()) } +// Close close channel +func (s *Stdout) Close() { + s.close() +} + +func (s *Stdout) run(ctx context.Context) { + var cur *types.Data + s.write(<-s.r) // output first data immediately + tick := time.NewTicker(time.Second * time.Duration(s.Interval)) + for { + select { + case <-ctx.Done(): + cur = nil + tick.Stop() + s.Close() + case <-tick.C: + if cur == nil { + continue + } + s.write(cur) + cur = nil // dismiss + case d, more := <-s.r: + if !more { + slog.Debug("Output channel has been closed", "outputter", s.Name()) + return + } + cur = d + } + } +} + func (s *Stdout) write(d *types.Data) { b, err := json.Marshal(d) if err != nil { From 990d80391e30b59073357b5d30ddf1ab0db10f55 Mon Sep 17 00:00:00 2001 From: Takuo Kitame Date: Mon, 8 Jan 2024 23:06:59 +0900 Subject: [PATCH 2/3] doc(output): update README.md Update document with new interface and add note about `context` --- output/README.md | 27 +++++++++++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/output/README.md b/output/README.md index 0e51970..764b74e 100644 --- a/output/README.md +++ b/output/README.md @@ -33,8 +33,8 @@ func (k *Kinesis) Name() string { } // Initialize initialize outputter -func (k *Kinesis) Initialize(_ *options.Options) (_ error) { - k.Base.Initialize(nil) +func (k *Kinesis) Initialize(ctx context.Context) (_ error) { + k.Base.Initialize(ctx) // データレシーバの初期化ルーチンを書く @@ -52,6 +52,29 @@ func (k *Kinesis) Initialize(_ *options.Options) (_ error) { `Output()` メソッドは `Base` に最低限で実装されているのでチャンネルで受け取る形で十分であれば実装する必要はありませんが、 `Interval` オプションが不要な場合など `Base` を埋め込まない場合は実装する必要があります。 +### context + +context には 以下のValueが埋め込まれています。 + +| Key | Value | 説明 | +|-----|----|----| +|`options.ContextKeyOptions{}`|`*options.Options{}`|グローバルオプション構造体のポインタ| + +### outputter 側から自身を無効化する + +`Initialize(ctx)` で受け取った `ctx` と自身のポインタを引数として `output.deactivate()` に渡します + +```go +func (o *foo) Initialize(ctx context.Context) error { + // ... + go func () { + defer deactivate(ctx, o) + for { + // ... + } + }() +} +``` ### プログラム本体に追加する From 84a00747a8fd22f8344b75d0a5323dfe69ad1edd Mon Sep 17 00:00:00 2001 From: Takuo Kitame Date: Mon, 8 Jan 2024 23:07:05 +0900 Subject: [PATCH 3/3] feat(output): add `--stdout.iterations` option --- README.md | 1 + output/stdout.go | 16 +++++++++++++++- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 6453be5..cbaa926 100644 --- a/README.md +++ b/README.md @@ -79,6 +79,7 @@ outputter のオプションは基本的に outputter の名前がプレフィ |オプション|意味| |----|----| |--stdout.interval=`INT`|データを出力する間隔(秒)(`default: 60`)| +|--stdout.iterations=`INT`|データを出力する回数(`default: 0(制限なし)`)| ### MQTT Outputter diff --git a/output/stdout.go b/output/stdout.go index 08893db..eefc855 100644 --- a/output/stdout.go +++ b/output/stdout.go @@ -8,6 +8,7 @@ import ( "reflect" "strings" "sync" + "sync/atomic" "time" "github.com/northeye/chissoku/types" @@ -16,6 +17,10 @@ import ( // Stdout outputter for Stdout type Stdout struct { Base + Iterations int64 `help:"Inavtive on maximum iterations INT"` + + // iteration counter + count atomic.Int64 // close close func() @@ -78,5 +83,14 @@ func (s *Stdout) write(d *types.Data) { slog.Error("json.Marshal", "error", err, "outputter", s.Name()) return } - fmt.Println(string(b)) + if s.Iterations <= 0 { + fmt.Println(string(b)) + return + } + if i := s.count.Add(1); i <= s.Iterations { + fmt.Println(string(b)) + if i == s.Iterations { + s.cancel() + } + } }