Skip to content

Commit

Permalink
Merge pull request eclipse-paho#98 from bkneis/feature/implement-auto…
Browse files Browse the repository at this point in the history
…paho-rpc

Feature/implement autopaho rpc (also tweaks paho/rpc and a few other tweaks).
  • Loading branch information
MattBrittan authored Aug 26, 2022
2 parents 4df2dcd + c650e72 commit d63b3b2
Show file tree
Hide file tree
Showing 8 changed files with 525 additions and 11 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
bin/
autopaho/cmd/rpc/.env
39 changes: 35 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,9 +1,40 @@
.PHONY: test unittest

unittest:
go test -race -tags=unittest ./autopaho/ -v -count 1
go test -coverprofile /tmp/autopaho_coverage.out -race -tags=unittest ./autopaho/ -v -count 1

test: unittest
go test -race ./packets/ -v -count 1
go test -race ./paho/ -v -count 1

go test -coverprofile /tmp/packets_coverage.out -race ./packets/ -v -count 1
go test -coverprofile /tmp/paho_coverage.out -race ./paho/ -v -count 1

cover:
go tool cover -func=/tmp/autopaho_coverage.out
go tool cover -func=/tmp/packets_coverage.out
go tool cover -func=/tmp/paho_coverage.out

cover_browser:
go tool cover -html=/tmp/autopaho_coverage.out
go tool cover -html=/tmp/packets_coverage.out
go tool cover -html=/tmp/paho_coverage.out

.PHONY: download
download:
go mod download

build_chat:
go build -o ./bin/chat ./paho/cmd/chat

build_rpc:
go build -o ./bin/rpc ./paho/cmd/rpc

build_rpc_cm:
go build -o ./bin/rpc_auto ./autopaho/cmd/rpc

build_pub:
go build -o ./bin/stdinpub ./paho/cmd/stdinpub

build_sub:
go build -o ./bin/stdoutsub ./paho/cmd/stdoutsub

.PHONY: build
build: build_chat build_rpc build_pub build_sub build_rpc_cm
193 changes: 193 additions & 0 deletions autopaho/cmd/rpc/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
package main

import (
"context"
"fmt"
"log"
"net/url"
"os"
"strconv"
"strings"
"time"

"github.com/eclipse/paho.golang/autopaho"
"github.com/eclipse/paho.golang/paho"
)

// Retrieve config from environmental variables

// Configuration will be pulled from the environment using the following keys
const (
envServerURL = "subdemo_serverURL" // server URL
envClientID = "subdemo_clientID" // client id to connect with
envTopic = "subdemo_topic" // topic to publish on
envQos = "subdemo_qos" // qos to utilise when publishing

envKeepAlive = "subdemo_keepAlive" // seconds between keepalive packets
envConnectRetryDelay = "subdemo_connectRetryDelay" // milliseconds to delay between connection attempts

envWriteToStdOut = "subdemo_writeToStdout" // if "true" then received packets will be written stdout
envWriteToDisk = "subdemo_writeToDisk" // if "true" then received packets will be written to file
envOutputFile = "subdemo_OutputFile" // name of file to use if above is true

envDebug = "subdemo_debug" // if "true" then the libraries will be instructed to print debug info
)

// config holds the configuration
type config struct {
serverURL *url.URL // MQTT server URL
clientID string // Client ID to use when connecting to server
topic string // Topic on which to publish messaged
qos byte // QOS to use when publishing

keepAlive uint16 // seconds between keepalive packets
connectRetryDelay time.Duration // Period between connection attempts

writeToStdOut bool // If true received messages will be written to stdout
writeToDisk bool // if true received messages will be written to below file
outputFileName string // filename to save messages to

debug bool // autopaho and paho debug output requested
}

// getConfig - Retrieves the configuration from the environment
func getConfig() (config, error) {
var cfg config
var err error

srvURL, err := stringFromEnv(envServerURL)
if err != nil {
return config{}, err
}
cfg.serverURL, err = url.Parse(srvURL)
if err != nil {
return config{}, fmt.Errorf("environmental variable %s must be a valid URL (%w)", envServerURL, err)
}

if cfg.clientID, err = stringFromEnv(envClientID); err != nil {
return config{}, err
}
if cfg.topic, err = stringFromEnv(envTopic); err != nil {
return config{}, err
}

iQos, err := intFromEnv(envQos)
if err != nil {
return config{}, err
}
cfg.qos = byte(iQos)

iKa, err := intFromEnv(envKeepAlive)
if err != nil {
return config{}, err
}
cfg.keepAlive = uint16(iKa)

if cfg.connectRetryDelay, err = milliSecondsFromEnv(envConnectRetryDelay); err != nil {
return config{}, err
}

if cfg.writeToStdOut, err = booleanFromEnv(envWriteToStdOut); err != nil {
return config{}, err
}
if cfg.writeToDisk, err = booleanFromEnv(envWriteToDisk); err != nil {
return config{}, err
}
if cfg.outputFileName, err = stringFromEnv(envOutputFile); cfg.writeToDisk && err != nil {
return config{}, err
}

if cfg.debug, err = booleanFromEnv(envDebug); err != nil {
return config{}, err
}

return cfg, nil
}

func getCmConfig(cfg config) autopaho.ClientConfig {
return autopaho.ClientConfig{
BrokerUrls: []*url.URL{cfg.serverURL},
KeepAlive: cfg.keepAlive,
ConnectRetryDelay: cfg.connectRetryDelay,
ConnectTimeout: time.Duration(5 * time.Second),
OnConnectionUp: func(cm *autopaho.ConnectionManager, connAck *paho.Connack) {
fmt.Println("mqtt connection up")
ctx, _ := context.WithTimeout(context.Background(), time.Duration(5*time.Second))
if _, err := cm.Subscribe(ctx, &paho.Subscribe{
Subscriptions: map[string]paho.SubscribeOptions{
cfg.topic: {QoS: cfg.qos},
},
}); err != nil {
fmt.Printf("failed to subscribe (%s). This is likely to mean no messages will be received.", err)
return
}
fmt.Println("mqtt subscription made")
},
OnConnectError: func(err error) { fmt.Printf("error whilst attempting connection: %s\n", err) },
ClientConfig: paho.ClientConfig{
ClientID: cfg.clientID,
Router: paho.NewSingleHandlerRouter(func(m *paho.Publish) {
log.Printf("%v+", m)
}),
OnClientError: func(err error) { fmt.Printf("%s requested disconnect: %s\n", cfg.clientID, err) },
OnServerDisconnect: func(d *paho.Disconnect) {
if d.Properties != nil {
fmt.Printf("%s requested disconnect: %s\n", cfg.clientID, d.Properties.ReasonString)
} else {
fmt.Printf("%s requested disconnect; reason code: %d\n", cfg.clientID, d.ReasonCode)
}
},
},
}
}

// stringFromEnv - Retrieves a string from the environment and ensures it is not blank (ort non-existent)
func stringFromEnv(key string) (string, error) {
s := os.Getenv(key)
if len(s) == 0 {
return "", fmt.Errorf("environmental variable %s must not be blank", key)
}
return s, nil
}

// intFromEnv - Retrieves an integer from the environment (must be present and valid)
func intFromEnv(key string) (int, error) {
s := os.Getenv(key)
if len(s) == 0 {
return 0, fmt.Errorf("environmental variable %s must not be blank", key)
}
i, err := strconv.Atoi(s)
if err != nil {
return 0, fmt.Errorf("environmental variable %s must be an integer", key)
}
return i, nil
}

// milliSecondsFromEnv - Retrieves milliseconds (as time.Duration) from the environment (must be present and valid)
func milliSecondsFromEnv(key string) (time.Duration, error) {
s := os.Getenv(key)
if len(s) == 0 {
return 0, fmt.Errorf("environmental variable %s must not be blank", key)
}
i, err := strconv.Atoi(s)
if err != nil {
return 0, fmt.Errorf("environmental variable %s must be an integer", key)
}
return time.Duration(i) * time.Millisecond, nil
}

// booleanFromEnv - Retrieves boolean from the environment (must be present and valid)
func booleanFromEnv(key string) (bool, error) {
s := os.Getenv(key)
if len(s) == 0 {
return false, fmt.Errorf("environmental variable %s must not be blank", key)
}
switch strings.ToUpper(s) {
case "TRUE", "T", "1":
return true, nil
case "FALSE", "F", "0":
return false, nil
default:
return false, fmt.Errorf("environmental variable %s be a valid boolean option (is %s)", key, s)
}
}
Loading

0 comments on commit d63b3b2

Please sign in to comment.