diff --git a/Dockerfile b/Dockerfile deleted file mode 100644 index be70c29..0000000 --- a/Dockerfile +++ /dev/null @@ -1,15 +0,0 @@ -FROM gradle:7.6.1-jdk17 as builder - -COPY --chown=gradle:gradle . /home/gradle/src -WORKDIR /home/gradle/src -RUN gradle --no-daemon installDist - -FROM openjdk:17-jdk-slim -COPY --from=builder /home/gradle/src/build/install/esphome2influxdb /app -COPY src/main/resources/esphome2influxdb.yaml /app/conf/esphome2influxdb.yaml -RUN mkdir /app/logs - -VOLUME /app/conf /app/logs - -WORKDIR /app -CMD /app/bin/esphome2influxdb conf/esphome2influxdb.yaml diff --git a/build.gradle.kts b/build.gradle.kts index 15b90de..aadf5e5 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -1,11 +1,12 @@ plugins { java application - id("net.ltgt.errorprone") version "3.0.1" jacoco - id("org.sonarqube") version "4.0.0.2929" - id("com.google.cloud.tools.jib") version "3.3.1" - id("com.gorylenko.gradle-git-properties") version "2.4.1" + alias(libs.plugins.errorprone) + alias(libs.plugins.sonarqube) + alias(libs.plugins.git.properties) + alias(libs.plugins.gradle.versions) + alias(libs.plugins.jib) } repositories { @@ -14,24 +15,24 @@ repositories { } group = "com.homeclimatecontrol.esphome2influxdb" -version = "1.0.0" +version = "1.0.1" dependencies { - implementation("org.apache.logging.log4j:log4j-api:2.20.0") - implementation("org.apache.logging.log4j:log4j-core:2.20.0") - implementation("org.yaml:snakeyaml:1.33") + implementation(libs.log4j.api) + implementation(libs.log4j.core) + implementation(libs.jackson.databind) + implementation(libs.jackson.dataformat.yaml) - // https://mvnrepository.com/artifact/org.eclipse.paho/org.eclipse.paho.client.mqttv3 - implementation("org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5") + implementation(libs.hivemq.mqtt.client) - implementation("org.influxdb:influxdb-java:2.23") + implementation(libs.influxdb) - testImplementation("org.junit.jupiter:junit-jupiter-api:5.9.2") - testImplementation("org.junit.jupiter:junit-jupiter-params:5.9.3") - testImplementation("org.mockito:mockito-core:5.2.0") - testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:5.9.2") - testImplementation("org.assertj:assertj-core:3.24.2") - errorprone("com.google.errorprone:error_prone_core:2.18.0") + testImplementation(libs.junit5.api) + testImplementation(libs.junit5.params) + testImplementation(libs.mockito) + testRuntimeOnly(libs.junit5.engine) + testImplementation(libs.assertj.core) + errorprone(libs.errorprone) } application { diff --git a/docs/build/docker.md b/docs/build/docker.md new file mode 100644 index 0000000..b6f5b69 --- /dev/null +++ b/docs/build/docker.md @@ -0,0 +1,84 @@ +esphome2influxdb: Build for (not with) Docker +== + +## Pull + +Run `docker pull climategadgets/esphome2influxdb` and then proceed to [configuration section](#configure-and-run) below - don't forget to adjust the image name to `climategadgets/esphome2influxdb`. + +Just in case, the image is hosted at https://hub.docker.com/r/climategadgets/esphome2influxdb. + +## Build + +Simply run `./gradlew jibDockerBuild`, this will build the image into the local Docker container (might need `sudo` if Docker is only configured to run as `root`). +See [Jib Gradle Plugin](https://github.com/GoogleContainerTools/jib/tree/master/jib-gradle-plugin) docs for details. +Mind the image name, it will be different for different release trains. + +### Build on Raspberry Pi + +The above will work seamlessly only on `amd64` architectures. If you need to build the image on Raspberry Pi, apply the following diff: + +``` +diff --git a/build.gradle.kts b/build.gradle.kts +index e38dbe8..1eaebe3 100644 +--- a/build.gradle.kts ++++ b/build.gradle.kts +@@ -74,6 +74,16 @@ sonarqube { + + jib { + ++ from { ++ platforms { ++ platform { ++ architecture = "arm" ++ os = "linux" ++ } ++ } ++ } ++ + to { + image ="climategadgets/esphome2influxdb" + } +``` + +> NOTE: Ideally, this should be seamless, but it is not quite trivial and there's no demand. If it bothers you, please [submit a ticket](https://github.com/home-climate-control/esphome2influxdb/issues). + +## Configure and Run + +By default, the `esphome2influxdb` Docker image will be created with configuration found in `src/main/resources/esphome2influxdb.yaml` (specifically, source host of `mqtt-esphome` and target host of `influxdb-esphome`). + +To make it do something meaningful, either create DNS records for these hosts and pass them to Docker daemon when you create the container, and/or provide your own configuration and map Docker volumes +(this is the preferred option since you will have to provide your tags at some point anyway, see [Tagging](../tagging.md) for details). + +### Option 1 (default configuration, use this to make sure it works) + +``` +docker run \ + --name esphome2influxdb-defaultconf \ + --rm -it \ + -e TZ=${your-time-zone} \ + --dns ${your-dns-server-host} \ + --dns-search ${your-search-domain} \ + esphome2influxdb + +``` + +With this configuration, the logs will be only stored in the container - execute `docker exec -it esphome2influxdb-defaultconf /bin/bash` to connect to it and see what's going on. + +### Option 2 (custom configuration, logs exposed - this is what you'll have to do in the long run) + +``` +docker run \ + --name esphome2influxdb \ + --restart=unless-stopped \ + -e TZ=${your-time-zone} \ + --dns ${your-dns-server-host} \ + --dns-search ${your-search-domain} \ + -v ${your-esphome2influxdb-config-directory}:/app/conf \ + -v ${your-esphome2influxdb-log-directory}:/app/logs \ + esphome2influxdb +``` + +Config directory must have a `esphome2influxdb.yaml` file containing the configuration (see [Minimal Configuration](../minimal-configuration.md) for details). + +--- +[^^^ Index](../index.md) diff --git a/docs/build/gradle.md b/docs/build/gradle.md new file mode 100644 index 0000000..bde6cdf --- /dev/null +++ b/docs/build/gradle.md @@ -0,0 +1,10 @@ +esphome2influxdb: Build with Gradle +== + +Run `./gradlew installDist` (Java 17 or higher). + +This will generate a relocatable directory tree in `./build/install/esphome2influxdb`, +including a runnable Shell script at `./build/install/esphome2influxdb/bin/esphome2influxdb`. + +--- +[^^^ Index](../index.md) diff --git a/docs/index.md b/docs/index.md index 92d8f9c..50784f6 100644 --- a/docs/index.md +++ b/docs/index.md @@ -13,7 +13,11 @@ esphome2influxdb: Docs ## Current Limitations * Currently works only with [ESPHome Sensor](https://esphome.io/components/sensor/index.html) components, more to come. -* The project's just been open sourced, documentation is coming, keep checking this wiki for updates. +* The project's just been open sourced, documentation is coming, keep checking this document tree for updates. + +## Release Notes + +[Here](./release-notes.md). ## Further Down the Rabbit Hole diff --git a/docs/release-notes.md b/docs/release-notes.md new file mode 100644 index 0000000..242a59f --- /dev/null +++ b/docs/release-notes.md @@ -0,0 +1,7 @@ +esphome2influxdb: Release Notes +== + +## v1.0.1 (2024-08-22) + +* Eclipse Paho MQTT has been replaced with [HiveMQ](https://github.com/hivemq/hivemq-mqtt-client) ([#17](https://github.com/home-climate-control/esphome2influxdb/issues/17)) +* Quality of Life improvements diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml new file mode 100644 index 0000000..b5c3948 --- /dev/null +++ b/gradle/libs.versions.toml @@ -0,0 +1,34 @@ +[versions] + +assertj = "3.26.3" +errorprone = "2.30.0" +influxdb = "2.24" +jackson = "2.17.2" +jib = "3.4.3" +junit5 = "5.11.0" +log4j = "2.22.0" +mockito = "5.12.0" +hivemq-mqtt = "1.3.3" +sonarqube = "4.4.1.3373" + +[libraries] + +assertj-core = { module = "org.assertj:assertj-core", version.ref = "assertj" } +errorprone = { module = "com.google.errorprone:error_prone_core", version.ref = "errorprone" } +hivemq-mqtt-client = { module = "com.hivemq:hivemq-mqtt-client", version.ref = "hivemq-mqtt" } +influxdb = { module = "org.influxdb:influxdb-java", version.ref = "influxdb" } +jackson-databind = { module = "com.fasterxml.jackson.core:jackson-databind", version.ref = "jackson" } +jackson-dataformat-yaml = { module = "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml", version.ref = "jackson" } +junit5-api = { module = "org.junit.jupiter:junit-jupiter-api", version.ref = "junit5" } +junit5-engine = { module = "org.junit.jupiter:junit-jupiter-engine", version.ref = "junit5" } +junit5-params = { module = "org.junit.jupiter:junit-jupiter-params", version.ref = "junit5" } +log4j-api = { module = "org.apache.logging.log4j:log4j-api", version.ref = "log4j" } +log4j-core = { module = "org.apache.logging.log4j:log4j-core", version.ref = "log4j" } +mockito = { module = "org.mockito:mockito-core", version.ref = "mockito" } + +[plugins] +errorprone = { id = "net.ltgt.errorprone", version = "4.0.1" } +git-properties = { id = "com.gorylenko.gradle-git-properties", version = "2.4.2" } +gradle-versions = { id = "com.github.ben-manes.versions", version = "0.51.0"} +jib = { id = "com.google.cloud.tools.jib", version.ref = "jib" } +sonarqube = { id = "org.sonarqube", version.ref = "sonarqube" } diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar index 943f0cb..d64cd49 100644 Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 37aef8d..9355b41 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,7 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-8.1.1-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-8.10-bin.zip networkTimeout=10000 +validateDistributionUrl=true zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/gradlew b/gradlew index 65dcd68..1aa94a4 100755 --- a/gradlew +++ b/gradlew @@ -83,10 +83,8 @@ done # This is normally unused # shellcheck disable=SC2034 APP_BASE_NAME=${0##*/} -APP_HOME=$( cd "${APP_HOME:-./}" && pwd -P ) || exit - -# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. -DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' +# Discard cd standard output in case $CDPATH is set (https://github.com/gradle/gradle/issues/25036) +APP_HOME=$( cd "${APP_HOME:-./}" > /dev/null && pwd -P ) || exit # Use the maximum available, or set MAX_FD != -1 to use that value. MAX_FD=maximum @@ -133,10 +131,13 @@ location of your Java installation." fi else JAVACMD=java - which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. + if ! command -v java >/dev/null 2>&1 + then + die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. Please set the JAVA_HOME variable in your environment to match the location of your Java installation." + fi fi # Increase the maximum file descriptors if we can. @@ -144,7 +145,7 @@ if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then case $MAX_FD in #( max*) # In POSIX sh, ulimit -H is undefined. That's why the result is checked to see if it worked. - # shellcheck disable=SC3045 + # shellcheck disable=SC2039,SC3045 MAX_FD=$( ulimit -H -n ) || warn "Could not query maximum file descriptor limit" esac @@ -152,7 +153,7 @@ if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then '' | soft) :;; #( *) # In POSIX sh, ulimit -n is undefined. That's why the result is checked to see if it worked. - # shellcheck disable=SC3045 + # shellcheck disable=SC2039,SC3045 ulimit -n "$MAX_FD" || warn "Could not set maximum file descriptor limit to $MAX_FD" esac @@ -197,11 +198,15 @@ if "$cygwin" || "$msys" ; then done fi -# Collect all arguments for the java command; -# * $DEFAULT_JVM_OPTS, $JAVA_OPTS, and $GRADLE_OPTS can contain fragments of -# shell script including quotes and variable substitutions, so put them in -# double quotes to make sure that they get re-expanded; and -# * put everything else in single quotes, so that it's not re-expanded. + +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' + +# Collect all arguments for the java command: +# * DEFAULT_JVM_OPTS, JAVA_OPTS, JAVA_OPTS, and optsEnvironmentVar are not allowed to contain shell fragments, +# and any embedded shellness will be escaped. +# * For example: A user cannot expect ${Hostname} to be expanded, as it is an environment variable and will be +# treated as '${Hostname}' itself on the command line. set -- \ "-Dorg.gradle.appname=$APP_BASE_NAME" \ diff --git a/src/main/java/com/homeclimatecontrol/esphome2influxdb/Configuration.java b/src/main/java/com/homeclimatecontrol/esphome2influxdb/Configuration.java index 5e34505..48ce938 100644 --- a/src/main/java/com/homeclimatecontrol/esphome2influxdb/Configuration.java +++ b/src/main/java/com/homeclimatecontrol/esphome2influxdb/Configuration.java @@ -1,15 +1,15 @@ package com.homeclimatecontrol.esphome2influxdb; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.yaml.snakeyaml.Yaml; + import java.io.StringReader; import java.io.StringWriter; import java.util.LinkedHashSet; import java.util.Map; import java.util.Set; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.yaml.snakeyaml.Yaml; - /** * Main configuration class. */ @@ -22,7 +22,7 @@ public class Configuration implements Verifiable { public Set targets = new LinkedHashSet<>(); public Set devices = new LinkedHashSet<>(); - private Set parsed = new LinkedHashSet<>(); + private final Set parsed = new LinkedHashSet<>(); /** * Verify the currently loaded configuration. @@ -106,7 +106,7 @@ private void parseDevices() { logger.trace("{}: {}", o.getClass().getName(), o); @SuppressWarnings({ "unchecked" }) - Map m = (Map) o; + var m = (Map) o; String type = m.get("type"); if (type == null) { @@ -146,7 +146,6 @@ private void parseDevices() { } public Set getDevices() { - return parsed; } diff --git a/src/main/java/com/homeclimatecontrol/esphome2influxdb/Device.java b/src/main/java/com/homeclimatecontrol/esphome2influxdb/Device.java index 50baaaf..047252e 100644 --- a/src/main/java/com/homeclimatecontrol/esphome2influxdb/Device.java +++ b/src/main/java/com/homeclimatecontrol/esphome2influxdb/Device.java @@ -1,12 +1,12 @@ package com.homeclimatecontrol.esphome2influxdb; -import java.util.Map; -import java.util.TreeMap; - import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.ThreadContext; +import java.util.Map; +import java.util.TreeMap; + public abstract class Device implements Verifiable { protected final Logger logger = LogManager.getLogger(); @@ -75,7 +75,7 @@ public void verify() { // ...but if the topic doesn't contain the source, and the source is not specified, we blow up anyway, // ...and only set the name to default (being same as source) if it is not explicitly provided. - String[] result = resolve(topicPrefix); + var result = resolve(topicPrefix); // Topic prefix may mutate topicPrefix = result[0]; @@ -182,8 +182,8 @@ private String resolveSource(String topic) { throw new IllegalArgumentException("Can't accept null topic here"); } - String[] tokens = topic.split("/"); - String result = tokens[tokens.length - 1]; + var tokens = topic.split("/"); + var result = tokens[tokens.length - 1]; var deviceOffset = 2; if ("".equals(result)) { @@ -193,7 +193,7 @@ private String resolveSource(String topic) { } // Make sure that we're working with the right device - String type = tokens[tokens.length - deviceOffset]; + var type = tokens[tokens.length - deviceOffset]; if (!getType().literal.equals(type)) { diff --git a/src/main/java/com/homeclimatecontrol/esphome2influxdb/Endpoint.java b/src/main/java/com/homeclimatecontrol/esphome2influxdb/Endpoint.java index 9b75cc9..425a814 100644 --- a/src/main/java/com/homeclimatecontrol/esphome2influxdb/Endpoint.java +++ b/src/main/java/com/homeclimatecontrol/esphome2influxdb/Endpoint.java @@ -1,11 +1,10 @@ package com.homeclimatecontrol.esphome2influxdb; -import java.util.ArrayList; -import java.util.List; - import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.ArrayList; + /** * TCP endpoint. */ @@ -31,9 +30,9 @@ public void setPort(int port) { @Override public void verify() { - List messages = new ArrayList<>(); + var messages = new ArrayList(); - if (host == null || "".equals(host)) { + if (host == null || host.isEmpty()) { messages.add("host can't be null or empty"); } diff --git a/src/main/java/com/homeclimatecontrol/esphome2influxdb/Gateway.java b/src/main/java/com/homeclimatecontrol/esphome2influxdb/Gateway.java index 5f02ce9..71cd0cd 100644 --- a/src/main/java/com/homeclimatecontrol/esphome2influxdb/Gateway.java +++ b/src/main/java/com/homeclimatecontrol/esphome2influxdb/Gateway.java @@ -1,22 +1,28 @@ package com.homeclimatecontrol.esphome2influxdb; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.homeclimatecontrol.esphome2influxdb.runtime.GitProperties; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.ThreadContext; +import org.yaml.snakeyaml.scanner.ScannerException; + import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.net.URL; import java.util.LinkedHashSet; -import java.util.Set; import java.util.concurrent.CountDownLatch; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.ThreadContext; -import org.yaml.snakeyaml.Yaml; -import org.yaml.snakeyaml.scanner.ScannerException; - public class Gateway { private final Logger logger = LogManager.getLogger(); + private final ObjectMapper objectMapper; + + private Gateway() { + objectMapper = new ObjectMapper(new YAMLFactory()); + } /** * Run the application. @@ -50,6 +56,10 @@ private void run(String[] args) { } else { + // It would be nice to tell them which version is running BEFORE trying to parse the configuration, in case versions are incompatible + + reportGitProperties(); + cf = parseConfiguration(args[0]); } @@ -62,6 +72,16 @@ private void run(String[] args) { } } + private void reportGitProperties() throws IOException { + + var p = GitProperties.get(); + + logger.debug("git.branch={}", p.get("git.branch")); + logger.debug("git.commit.id={}", p.get("git.commit.id")); + logger.debug("git.commit.id.abbrev={}", p.get("git.commit.id.abbrev")); + logger.debug("git.commit.id.describe={}", p.get("git.commit.id.describe")); + logger.debug("git.build.version={}", p.get("git.build.version")); + } private Configuration parseConfiguration(String source) { ThreadContext.push("parseConfiguration"); @@ -69,9 +89,7 @@ private Configuration parseConfiguration(String source) { logger.info("Reading configuration from {}", source); - var yaml = new Yaml(); - - Configuration cf = yaml.loadAs(getStream(source), Configuration.class); + Configuration cf = objectMapper.readValue(getStream(source), Configuration.class); if (cf == null) { throw new IllegalArgumentException("No usable configuration at " + source + "?"); @@ -79,7 +97,9 @@ private Configuration parseConfiguration(String source) { cf.verify(); - logger.debug("configuration: {}", cf); + var yaml = objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(cf); + + logger.debug("configuration:\n{}", yaml); if (!cf.needToStart()) { logger.info("Terminating"); @@ -153,14 +173,14 @@ private void execute(Configuration cf) { var stopGate = new CountDownLatch(1); var stoppedGate = new CountDownLatch(cf.sources.size() + cf.targets.size()); - Set readers = new LinkedHashSet<>(); - Set writers = new LinkedHashSet<>(); + var readers = new LinkedHashSet(); + var writers = new LinkedHashSet(); - for (MqttEndpoint e : cf.sources) { + for (var e : cf.sources) { readers.add(new MqttReader(e, cf.getDevices(), cf.autodiscover, stopGate, stoppedGate)); } - for (InfluxDbEndpoint e : cf.targets) { + for (var e : cf.targets) { writers.add(new InfluxDbWriter(e, readers, stoppedGate)); } @@ -169,13 +189,13 @@ private void execute(Configuration cf) { var roffset = 0; var woffset = 0; - for (Runnable r : readers) { + for (var r : readers) { new Thread(r, "thread-reader" + roffset++).start(); } logger.info("Started {} reader[s]", readers.size()); - for (Runnable r : writers) { + for (var r : writers) { new Thread(r, "thread-writer" + woffset++).start(); } diff --git a/src/main/java/com/homeclimatecontrol/esphome2influxdb/InfluxDbWriter.java b/src/main/java/com/homeclimatecontrol/esphome2influxdb/InfluxDbWriter.java index a2fea32..7c4113d 100644 --- a/src/main/java/com/homeclimatecontrol/esphome2influxdb/InfluxDbWriter.java +++ b/src/main/java/com/homeclimatecontrol/esphome2influxdb/InfluxDbWriter.java @@ -1,5 +1,11 @@ package com.homeclimatecontrol.esphome2influxdb; +import org.apache.logging.log4j.ThreadContext; +import org.influxdb.InfluxDB; +import org.influxdb.InfluxDBFactory; +import org.influxdb.dto.Point; +import org.influxdb.dto.Query; + import java.math.BigDecimal; import java.time.Clock; import java.util.Queue; @@ -8,12 +14,6 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import org.apache.logging.log4j.ThreadContext; -import org.influxdb.InfluxDB; -import org.influxdb.InfluxDBFactory; -import org.influxdb.dto.Point; -import org.influxdb.dto.Query; - public class InfluxDbWriter extends Worker { private final Clock clock = Clock.systemUTC(); @@ -101,7 +101,7 @@ public void consume(long timestamp, Device device, String payload) { try { - logger.debug("payload: {}", payload); + logger.trace("payload: {}", payload); var s = new Sample(timestamp, device, payload); @@ -156,7 +156,7 @@ synchronized void flush(InfluxDB db, Queue queue) { // Known problem if ("nan".equalsIgnoreCase(sample.payload)) { - logger.warn("NaN payload, ignored: {}", sample); + logger.debug("NaN payload, ignored: {}", sample); queue.remove(); continue; } diff --git a/src/main/java/com/homeclimatecontrol/esphome2influxdb/MqttEndpoint.java b/src/main/java/com/homeclimatecontrol/esphome2influxdb/MqttEndpoint.java index 1940f37..74971ef 100644 --- a/src/main/java/com/homeclimatecontrol/esphome2influxdb/MqttEndpoint.java +++ b/src/main/java/com/homeclimatecontrol/esphome2influxdb/MqttEndpoint.java @@ -13,6 +13,14 @@ public class MqttEndpoint extends Endpoint { */ public String topic = "#"; + /** + * Whether to reconnect automatically. + * + * Automatic reconnect is disabled by default, here's why: + * https://github.com/hivemq/hivemq-mqtt-client/issues/496 + */ + public Boolean autoReconnect = false; + public MqttEndpoint() { setPort(1883); } diff --git a/src/main/java/com/homeclimatecontrol/esphome2influxdb/MqttReader.java b/src/main/java/com/homeclimatecontrol/esphome2influxdb/MqttReader.java index 7fa662e..6ca80b6 100644 --- a/src/main/java/com/homeclimatecontrol/esphome2influxdb/MqttReader.java +++ b/src/main/java/com/homeclimatecontrol/esphome2influxdb/MqttReader.java @@ -1,5 +1,12 @@ package com.homeclimatecontrol.esphome2influxdb; +import com.hivemq.client.mqtt.mqtt3.exceptions.Mqtt3ConnAckException; +import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient; +import com.hivemq.client.mqtt.mqtt5.Mqtt5Client; +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; +import org.apache.logging.log4j.ThreadContext; + +import java.nio.charset.StandardCharsets; import java.time.Clock; import java.util.Collection; import java.util.LinkedHashMap; @@ -9,19 +16,9 @@ import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.CountDownLatch; -import java.util.regex.Matcher; import java.util.regex.Pattern; -import org.apache.logging.log4j.ThreadContext; -import org.eclipse.paho.client.mqttv3.IMqttClient; -import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; -import org.eclipse.paho.client.mqttv3.MqttCallback; -import org.eclipse.paho.client.mqttv3.MqttClient; -import org.eclipse.paho.client.mqttv3.MqttConnectOptions; -import org.eclipse.paho.client.mqttv3.MqttException; -import org.eclipse.paho.client.mqttv3.MqttMessage; - -public class MqttReader extends Worker implements MqttCallback { +public class MqttReader extends Worker { private final Clock clock = Clock.systemUTC(); @@ -46,7 +43,7 @@ public class MqttReader extends Worker implements MqttCallback { */ public final String clientId = UUID.randomUUID().toString(); - private final IMqttClient client; + private final Mqtt5AsyncClient client; /** * Create an instance. @@ -64,25 +61,62 @@ public MqttReader(MqttEndpoint e, Collection devices, boolean autodiscov this.autodiscover = autodiscover; this.stopGate = stopGate; + client = createClient(e); + } + + private Mqtt5AsyncClient createClient(MqttEndpoint endpoint) { + + ThreadContext.push("createClient"); + try { - // Only authenticate if both credentials are present + + // VT: NOTE: Automatic reconnect is disabled by default, here's why: + // https://github.com/hivemq/hivemq-mqtt-client/issues/496 + + var prototype= Mqtt5Client.builder() + .identifier("esphome2influxdb-" + UUID.randomUUID()) + .serverHost(endpoint.host) + .serverPort(endpoint.getPort()); + + if (endpoint.autoReconnect) { + prototype = prototype.automaticReconnectWithDefaultConfig(); + } + + var result = prototype.buildAsync(); + + var instance = result.toBlocking().connectWith(); + if (endpoint.username != null && endpoint.password != null) { - client = new MqttClient("tcp://" + endpoint.username + ":" + endpoint.password + "@" + endpoint.host + ":" + endpoint.getPort(), clientId); - } else { - if (endpoint.username != null) { - // Bad idea to have no password - logger.warn("Missing MQTT password, connecting unauthenticated. This behavior will not be allowed in future releases."); - } - client = new MqttClient("tcp://" + endpoint.host + ":" + endpoint.getPort(), clientId); + instance = instance.simpleAuth() + .username(endpoint.username) + .password(endpoint.password.getBytes(StandardCharsets.UTF_8)) + .applySimpleAuth(); + } + + try { + + logger.info("{}{}: connecting", + endpoint, + endpoint.autoReconnect ? " (disable reconnect if this gets stuck)" : ""); + + var ack = instance.send(); + + // send() throws an exception upon failure, will this ever be anything other than SUCCESS? + logger.info("{}: connected: {}", endpoint, ack); + + } catch (Mqtt3ConnAckException ex) { + throw new IllegalStateException("Can't connect to " + endpoint, ex); } - } catch (MqttException ex) { - throw new IllegalStateException("Failed to create a client for " + endpoint); + + return result; + + } finally { + ThreadContext.pop(); } } - private Map parseTopic(Collection source) { - Map result = new LinkedHashMap<>(); + var result = new LinkedHashMap(); for (Device d : source) { result.put( @@ -101,7 +135,7 @@ public void run() { logger.info("Started"); - connect(); + subscribe(); stopGate.await(); @@ -110,8 +144,6 @@ public void run() { } catch (InterruptedException ex) { logger.error("Interrupted, terminating", ex); Thread.currentThread().interrupt(); - } catch (MqttException ex) { - logger.fatal("MQTT problem", ex); } finally { stoppedGate.countDown(); logger.info("Shut down"); @@ -119,46 +151,22 @@ public void run() { } } - private void connect() throws MqttException { - - var options = new MqttConnectOptions(); - options.setAutomaticReconnect(true); - options.setCleanSession(true); - options.setConnectionTimeout(10); - options.setUserName(endpoint.username); - - // https://github.com/eclipse/paho.mqtt.java/issues/804 - // https://github.com/home-climate-control/dz/issues/148 - - if (endpoint.password != null) { - options.setPassword(endpoint.password.toCharArray()); - } - - client.setCallback(this); - client.connect(options); + private void subscribe() { - client.subscribe(endpoint.topic, 0); + client + .subscribeWith() + .topicFilter(endpoint.topic) + .callback(this::callback) + .send(); } - @Override - public void connectionLost(Throwable cause) { - logger.error("Lost connection", cause); - logger.info("Attempting to reconnect"); - try { - // VT: NOTE: This may not be enough, let's see how reliable this is - connect(); - } catch (MqttException ex) { - logger.fatal("Reconnect failed, giving up", ex); - } - } - - @Override - public void messageArrived(String topic, MqttMessage message) throws Exception { + private void callback(Mqtt5Publish message) { ThreadContext.push("messageArrived"); try { - var payload = message.toString(); + var topic = message.getTopic().toString(); + var payload = new String(message.getPayloadAsBytes()); logger.debug("topic={}, message={}", topic, payload); @@ -166,7 +174,6 @@ public void messageArrived(String topic, MqttMessage message) throws Exception { autodiscover(topic, payload); } - } finally { ThreadContext.pop(); } @@ -182,7 +189,7 @@ public void messageArrived(String topic, MqttMessage message) throws Exception { */ private boolean consume(String topic, String payload) { - for (Map.Entry d : devices.entrySet()) { + for (var d : devices.entrySet()) { // Only the first match is considered, any other way doesn't make sense @@ -218,7 +225,7 @@ boolean consume(Map.Entry d, String topic, String payload, Set