Skip to content

Commit

Permalink
periodically discover new speakers (#5)
Browse files Browse the repository at this point in the history
  • Loading branch information
svenwltr authored Aug 14, 2023
1 parent 496b225 commit d394414
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 57 deletions.
90 changes: 33 additions & 57 deletions cmd/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,20 @@ import (
"context"
"errors"
"fmt"
"net/url"
"strconv"
"sync"
"time"

"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/svenwltr/devilctl/pkg/bll/ticker"
"github.com/svenwltr/devilctl/pkg/dal/homie"
"github.com/svenwltr/devilctl/pkg/dal/raumfeld"
"golang.org/x/sync/errgroup"
)

type HomieBridgeRunner struct {
broker string
locations []string
broker string
}

func (r *HomieBridgeRunner) Bind(cmd *cobra.Command) error {
Expand All @@ -34,40 +34,8 @@ func (r *HomieBridgeRunner) Run(ctx context.Context) error {
}
defer homieBroker.MustClose()

speakers := map[string]raumfeld.Speaker{}
if len(r.locations) == 0 {
speakers, err = raumfeld.Discover(ctx)
if err != nil {
return fmt.Errorf("discover speakers: %w", err)
}
logrus.Infof("discovered %d devices", len(speakers))

for _, speaker := range speakers {
logrus.Warnf("consider pinning %#v with the --location flag",
speaker.TryMDNSLocation().String())
}

} else {
for _, locationString := range r.locations {
location, err := url.Parse(locationString)
if err != nil {
return fmt.Errorf("parse %#v: %w", location, err)
}

speaker, err := raumfeld.New(ctx, location)
if err != nil {
return fmt.Errorf("connect to speaker %#v: %w", locationString, err)
}

logrus.Infof("connected to %#v", locationString)

speakers[speaker.ID()] = speaker
}
}

bridge := HomieBridge{
Broker: homieBroker,
Speakers: speakers,
Broker: homieBroker,
}

return bridge.Run(ctx)
Expand All @@ -79,41 +47,49 @@ type HomieBridge struct {
}

func (b *HomieBridge) Run(ctx context.Context) error {
b.Broker.ActionHandler = b.HandleBrokerAction

sub, err := raumfeld.NewSubsciptionServer(b)
if err != nil {
return fmt.Errorf("create subscription server: %w", err)
}

var enableHandler sync.Once

group, ctx := errgroup.WithContext(ctx)

group.Go(func() error {
for ctx.Err() == nil {
err := b.PublishHomieDefinitions(ctx)
return sub.Run(ctx)
})

group.Go(func() error {
for range ticker.Every(ctx, 5*time.Minute) {
speakers, err := raumfeld.Discover(ctx)
if err != nil {
return fmt.Errorf("discover speakers: %w", err)
}
logrus.Infof("discovered %d devices", len(speakers))
b.Speakers = speakers

enableHandler.Do(func() {
b.Broker.ActionHandler = b.HandleBrokerAction
})

err = b.PublishHomieDefinitions(ctx)
if err != nil {
return fmt.Errorf("publish homie definitions: %w", err)
}

select {
case <-ctx.Done():
case <-time.After(time.Minute):
for _, speaker := range speakers {
err := sub.Subscribe(speaker)
if err != nil {
return fmt.Errorf("subscribe: %w", err)
}
}

}
return nil
})

group.Go(func() error {
return sub.Run(ctx)
return nil
})

for _, speaker := range b.Speakers {
err := sub.Subscribe(speaker)
if err != nil {
return fmt.Errorf("subscribe: %w", err)
}
}

return group.Wait()
}

Expand Down Expand Up @@ -202,16 +178,16 @@ func (b *HomieBridge) PublishHomieDefinitions(ctx context.Context) error {
}

func (b *HomieBridge) OnVolumeChange(id string, volume int, channel string) {
logrus.Infof("volume changed on speaker to %#v", volume)
logrus.Infof("volume changed on speaker %#v to %#v", id, volume)
b.Broker.PublishValue(id, "volume", float64(volume)/100.)
}

func (b *HomieBridge) OnMuteChange(id string, muted bool, channel string) {
logrus.Infof("mute changed on speaker to %#v", muted)
logrus.Infof("mute changed on speaker %#v to %#v", id, muted)
b.Broker.PublishValue(id, "mute", muted)
}

func (b *HomieBridge) OnPowerStateChange(id, state string) {
logrus.Infof("power state changed on speaker to %#v", state)
logrus.Infof("power state changed on speaker %#v to %#v", id, state)
b.Broker.PublishValue(id, "onoff", state != "MANUAL_STANDBY")
}
26 changes: 26 additions & 0 deletions pkg/bll/ticker/ticker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package ticker

import (
"context"
"time"
)

func Every(ctx context.Context, d time.Duration) <-chan time.Time {
c := make(chan time.Time)

go func() {
defer close(c)
for {
c <- time.Now()

select {
case <-ctx.Done():
return
case <-time.After(d):
}
}
}()

return c

}

0 comments on commit d394414

Please sign in to comment.