diff --git a/cmd/bridge.go b/cmd/bridge.go index 589f4f3..eb1ee86 100644 --- a/cmd/bridge.go +++ b/cmd/bridge.go @@ -4,20 +4,20 @@ import ( "context" "errors" "fmt" - "net/url" "strconv" + "sync" "time" "github.com/sirupsen/logrus" "github.com/spf13/cobra" + "github.com/svenwltr/devilctl/pkg/bll/ticker" "github.com/svenwltr/devilctl/pkg/dal/homie" "github.com/svenwltr/devilctl/pkg/dal/raumfeld" "golang.org/x/sync/errgroup" ) type HomieBridgeRunner struct { - broker string - locations []string + broker string } func (r *HomieBridgeRunner) Bind(cmd *cobra.Command) error { @@ -34,40 +34,8 @@ func (r *HomieBridgeRunner) Run(ctx context.Context) error { } defer homieBroker.MustClose() - speakers := map[string]raumfeld.Speaker{} - if len(r.locations) == 0 { - speakers, err = raumfeld.Discover(ctx) - if err != nil { - return fmt.Errorf("discover speakers: %w", err) - } - logrus.Infof("discovered %d devices", len(speakers)) - - for _, speaker := range speakers { - logrus.Warnf("consider pinning %#v with the --location flag", - speaker.TryMDNSLocation().String()) - } - - } else { - for _, locationString := range r.locations { - location, err := url.Parse(locationString) - if err != nil { - return fmt.Errorf("parse %#v: %w", location, err) - } - - speaker, err := raumfeld.New(ctx, location) - if err != nil { - return fmt.Errorf("connect to speaker %#v: %w", locationString, err) - } - - logrus.Infof("connected to %#v", locationString) - - speakers[speaker.ID()] = speaker - } - } - bridge := HomieBridge{ - Broker: homieBroker, - Speakers: speakers, + Broker: homieBroker, } return bridge.Run(ctx) @@ -79,41 +47,49 @@ type HomieBridge struct { } func (b *HomieBridge) Run(ctx context.Context) error { - b.Broker.ActionHandler = b.HandleBrokerAction - sub, err := raumfeld.NewSubsciptionServer(b) if err != nil { return fmt.Errorf("create subscription server: %w", err) } + var enableHandler sync.Once + group, ctx := errgroup.WithContext(ctx) group.Go(func() error { - for ctx.Err() == nil { - err := b.PublishHomieDefinitions(ctx) + return sub.Run(ctx) + }) + + group.Go(func() error { + for range ticker.Every(ctx, 5*time.Minute) { + speakers, err := raumfeld.Discover(ctx) + if err != nil { + return fmt.Errorf("discover speakers: %w", err) + } + logrus.Infof("discovered %d devices", len(speakers)) + b.Speakers = speakers + + enableHandler.Do(func() { + b.Broker.ActionHandler = b.HandleBrokerAction + }) + + err = b.PublishHomieDefinitions(ctx) if err != nil { return fmt.Errorf("publish homie definitions: %w", err) } - select { - case <-ctx.Done(): - case <-time.After(time.Minute): + for _, speaker := range speakers { + err := sub.Subscribe(speaker) + if err != nil { + return fmt.Errorf("subscribe: %w", err) + } } + } - return nil - }) - group.Go(func() error { - return sub.Run(ctx) + return nil }) - for _, speaker := range b.Speakers { - err := sub.Subscribe(speaker) - if err != nil { - return fmt.Errorf("subscribe: %w", err) - } - } - return group.Wait() } @@ -202,16 +178,16 @@ func (b *HomieBridge) PublishHomieDefinitions(ctx context.Context) error { } func (b *HomieBridge) OnVolumeChange(id string, volume int, channel string) { - logrus.Infof("volume changed on speaker to %#v", volume) + logrus.Infof("volume changed on speaker %#v to %#v", id, volume) b.Broker.PublishValue(id, "volume", float64(volume)/100.) } func (b *HomieBridge) OnMuteChange(id string, muted bool, channel string) { - logrus.Infof("mute changed on speaker to %#v", muted) + logrus.Infof("mute changed on speaker %#v to %#v", id, muted) b.Broker.PublishValue(id, "mute", muted) } func (b *HomieBridge) OnPowerStateChange(id, state string) { - logrus.Infof("power state changed on speaker to %#v", state) + logrus.Infof("power state changed on speaker %#v to %#v", id, state) b.Broker.PublishValue(id, "onoff", state != "MANUAL_STANDBY") } diff --git a/pkg/bll/ticker/ticker.go b/pkg/bll/ticker/ticker.go new file mode 100644 index 0000000..b174897 --- /dev/null +++ b/pkg/bll/ticker/ticker.go @@ -0,0 +1,26 @@ +package ticker + +import ( + "context" + "time" +) + +func Every(ctx context.Context, d time.Duration) <-chan time.Time { + c := make(chan time.Time) + + go func() { + defer close(c) + for { + c <- time.Now() + + select { + case <-ctx.Done(): + return + case <-time.After(d): + } + } + }() + + return c + +}