From 8f7336633618937ce81ab9ac687d0012c2a70650 Mon Sep 17 00:00:00 2001 From: skobow Date: Mon, 13 May 2024 15:43:20 +0200 Subject: [PATCH 1/6] Feature complete --- .gitignore | 3 ++ build.gradle.kts | 4 ++ gradle/libs.versions.toml | 2 +- .../conf/sparkplug.properties | 1 + .../SparkplugPublishInboundInterceptor.java | 42 ++++++++++++++++-- .../configuration/SparkplugConfiguration.java | 7 ++- .../sparkplug/aware/utils/MetricMessage.java | 44 +++++++++++++++++++ .../sparkplug/aware/utils/PayloadUtil.java | 21 ++++++++- ...parkplugPublishInboundInterceptorTest.java | 36 ++++++++++++--- ...arkplugPublishOutboundInterceptorTest.java | 7 --- .../SparkplugConfigurationTest.java | 2 +- 11 files changed, 149 insertions(+), 20 deletions(-) create mode 100644 src/main/java/com/hivemq/extensions/sparkplug/aware/utils/MetricMessage.java diff --git a/.gitignore b/.gitignore index a5b3e62..158f044 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,6 @@ out/ .java-version .DS_Store + +# HiveMQ +hivemq-4.*.0 \ No newline at end of file diff --git a/build.gradle.kts b/build.gradle.kts index 3a1b598..a79f8e6 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -22,6 +22,10 @@ hivemqExtension { } } +tasks.prepareHivemqHome { + hivemqHomeDirectory.set(file("hivemq-${libs.versions.hivemq.extensionSdk.get()}")) +} + task("checksum") { dependsOn("hivemqExtensionZip") checksumAlgorithm.set(Checksum.Algorithm.SHA256) diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index f22d127..2c3414b 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -1,7 +1,7 @@ [versions] commonsLang = "3.8.1" guava = "31.1-jre" -hivemq-extensionSdk = "4.24.0" +hivemq-extensionSdk = "4.28.0" jackson = "2.13.2" jacksonv1 = "1.9.13" jetbrains-annotations = "24.0.1" diff --git a/src/hivemq-extension/conf/sparkplug.properties b/src/hivemq-extension/conf/sparkplug.properties index 316bdce..20ba9b8 100644 --- a/src/hivemq-extension/conf/sparkplug.properties +++ b/src/hivemq-extension/conf/sparkplug.properties @@ -9,3 +9,4 @@ sparkplug.systopic=$sparkplug/certificates/ sparkplug.compression=false sparkplug.json.log=false sparkplug.systopic.msgExpiry=4294967296 +sparkplug.metrics2topic=true \ No newline at end of file diff --git a/src/main/java/com/hivemq/extensions/sparkplug/aware/SparkplugPublishInboundInterceptor.java b/src/main/java/com/hivemq/extensions/sparkplug/aware/SparkplugPublishInboundInterceptor.java index 84bfb0b..51a619d 100644 --- a/src/main/java/com/hivemq/extensions/sparkplug/aware/SparkplugPublishInboundInterceptor.java +++ b/src/main/java/com/hivemq/extensions/sparkplug/aware/SparkplugPublishInboundInterceptor.java @@ -33,10 +33,10 @@ import org.slf4j.LoggerFactory; import java.nio.ByteBuffer; +import java.util.Map; import java.util.concurrent.CompletableFuture; -import static com.hivemq.extensions.sparkplug.aware.utils.PayloadUtil.logFormattedPayload; -import static com.hivemq.extensions.sparkplug.aware.utils.PayloadUtil.modifySparkplugTimestamp; +import static com.hivemq.extensions.sparkplug.aware.utils.PayloadUtil.*; /** * {@link PublishInboundInterceptor}, @@ -51,6 +51,7 @@ public class SparkplugPublishInboundInterceptor implements PublishInboundInterce private final @NotNull String sysTopic; private final boolean useCompression; private final boolean jsonLogEnabled; + private final boolean metric2topicEnabled; private final Long messageExpiry; public SparkplugPublishInboundInterceptor(final @NotNull SparkplugConfiguration configuration, @@ -59,6 +60,7 @@ public SparkplugPublishInboundInterceptor(final @NotNull SparkplugConfiguration this.sysTopic = configuration.getSparkplugSysTopic(); this.useCompression = configuration.getCompression(); this.jsonLogEnabled = configuration.getJsonLogEnabled(); + this.metric2topicEnabled = configuration.getSparkplugMetric2topicEnabled(); this.publishService = publishService; this.messageExpiry = configuration.getSparkplugSystopicMsgexpiry(); } @@ -120,6 +122,8 @@ public void onInboundPublish( } else { log.warn("No payload present in the sparkplug message"); } + } else if (metric2topicEnabled && topicStructure.getMessageType() == MessageType.DDATA) { + publishMetrics2Topic(origin, publishPacket); } if (jsonLogEnabled) { @@ -136,8 +140,40 @@ private void publishToSysTopic(final @NotNull String origin, final @NotNull Publ log.trace("Published CLONE Msg from: {} to: {} ", origin, sysTopic + origin); } } else { - log.error("Publish to sysTopic: {} failed: {} ", sysTopic + origin, throwable.fillInStackTrace()); + log.error("Publish to sysTopic: {} failed.", sysTopic + origin, throwable.fillInStackTrace()); } }); } + + private void publishMetrics2Topic(final @NotNull String origin, final @NotNull PublishPacket publishPacket) { + publishPacket.getPayload().ifPresent(byteBuffer -> { + final Map jsonMessages = getMetricsAsMessages(origin, byteBuffer); + jsonMessages.forEach((key, value) -> doPublishMetric(origin, key, value)); + } + ); + } + + private void doPublishMetric(final String origin, final String metric, final String value) { + try { + final String newTopic = origin + "/" + metric; + // Build the publish + final PublishBuilder publishBuilder = Builders.publish(); + publishBuilder.topic(newTopic); + publishBuilder.qos(Qos.AT_LEAST_ONCE); + publishBuilder.messageExpiryInterval(messageExpiry); + publishBuilder.payload(ByteBuffer.wrap(value.getBytes())); + final CompletableFuture future = publishService.publish(publishBuilder.build()); + future.whenComplete((aVoid, throwable) -> { + if (throwable == null) { + if (log.isTraceEnabled()) { + log.trace("Published Msg from Metric: {} to: {} ", metric, newTopic); + } + } else { + log.error("Published Msg from Metric: {} failed.", metric, throwable.fillInStackTrace()); + } + }); + } catch (Exception all) { + log.error("Published Msg from Metric {} failed: {}", metric, all.getMessage()); + } + } } \ No newline at end of file diff --git a/src/main/java/com/hivemq/extensions/sparkplug/aware/configuration/SparkplugConfiguration.java b/src/main/java/com/hivemq/extensions/sparkplug/aware/configuration/SparkplugConfiguration.java index b7ec240..0e78b6b 100644 --- a/src/main/java/com/hivemq/extensions/sparkplug/aware/configuration/SparkplugConfiguration.java +++ b/src/main/java/com/hivemq/extensions/sparkplug/aware/configuration/SparkplugConfiguration.java @@ -44,7 +44,8 @@ public class SparkplugConfiguration extends PropertiesReader { private static final @NotNull String SPARKPLUG_JSON_LOG_ENABLED = "sparkplug.json.log"; private static final @NotNull String SPARKPLUG_JSON_LOG_DEFAULT = "false"; - + private static final @NotNull String SPARKPLUG_METRIC2TOPIC_ENABLED = "sparkplug.metrics2topic"; + private static final @NotNull String SPARKPLUG_METRIC2TOPIC_DEFAULT = "false"; private static final @NotNull String SPARKPLUG_SYSTOPIC_MSGEXPIRY = "sparkplug.systopic.msgExpiry"; private static final @NotNull Long SPARKPLUG_SYSTOPIC_MSGEXPIRY_DEFAULT = 4294967296L; @@ -74,6 +75,10 @@ public SparkplugConfiguration(@NotNull final File configFilePath) { return validateBooleanProperty(SPARKPLUG_JSON_LOG_ENABLED, SPARKPLUG_JSON_LOG_DEFAULT); } + public @NotNull Boolean getSparkplugMetric2topicEnabled() { + return validateBooleanProperty(SPARKPLUG_METRIC2TOPIC_ENABLED, SPARKPLUG_METRIC2TOPIC_DEFAULT); + } + private Boolean validateBooleanProperty(String key, String defaultValue) { checkNotNull(key, "Key to fetch property must not be null"); checkNotNull(defaultValue, "Default value for property must not be null"); diff --git a/src/main/java/com/hivemq/extensions/sparkplug/aware/utils/MetricMessage.java b/src/main/java/com/hivemq/extensions/sparkplug/aware/utils/MetricMessage.java new file mode 100644 index 0000000..ae78969 --- /dev/null +++ b/src/main/java/com/hivemq/extensions/sparkplug/aware/utils/MetricMessage.java @@ -0,0 +1,44 @@ +/* + * Copyright 2018-present HiveMQ GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.hivemq.extensions.sparkplug.aware.utils; + +import com.hivemq.extension.sdk.api.annotations.NotNull; + +import java.text.MessageFormat; +import java.util.Date; + +public class MetricMessage { + + private static final String JSON_MESSAGE_FORMAT = + "'{'\n" + + " \"name\": \"{0}\", \n" + + " \"timestamp\": {1}, \n" + + " \"dataType\": \"{2}\", \n" + + " \"value\": {3}\n" + + "'}'"; + + private MetricMessage() { + } + + public static String createJSON(final @NotNull String name, final @NotNull Date timestamp, final @NotNull String value, final @NotNull String dataType) { + final MessageFormat mf = new MessageFormat(JSON_MESSAGE_FORMAT); + return mf.format(new Object[]{ + name, + timestamp != null ? timestamp.toInstant().getEpochSecond() : 0, + dataType, + value}); + } +} diff --git a/src/main/java/com/hivemq/extensions/sparkplug/aware/utils/PayloadUtil.java b/src/main/java/com/hivemq/extensions/sparkplug/aware/utils/PayloadUtil.java index d3c17dc..573537f 100644 --- a/src/main/java/com/hivemq/extensions/sparkplug/aware/utils/PayloadUtil.java +++ b/src/main/java/com/hivemq/extensions/sparkplug/aware/utils/PayloadUtil.java @@ -23,6 +23,7 @@ import org.eclipse.tahu.message.PayloadDecoder; import org.eclipse.tahu.message.SparkplugBPayloadDecoder; import org.eclipse.tahu.message.SparkplugBPayloadEncoder; +import org.eclipse.tahu.message.model.Metric; import org.eclipse.tahu.message.model.SparkplugBPayload; import org.eclipse.tahu.util.CompressionAlgorithm; import org.jetbrains.annotations.VisibleForTesting; @@ -31,7 +32,7 @@ import java.io.IOException; import java.nio.ByteBuffer; -import java.util.Date; +import java.util.*; public final class PayloadUtil { private static final @NotNull Logger log = LoggerFactory.getLogger(PayloadUtil.class); @@ -100,6 +101,24 @@ public static String getPayloadAsJSON(@NotNull ByteBuffer payload) { return ""; } + public static Map getMetricsAsMessages(final String topic, final ByteBuffer byteBuffer) { + SparkplugBPayload inboundPayload = getSparkplugBPayload(byteBuffer); + if (inboundPayload == null) { + log.warn("No payload present in the sparkplug message"); + return Collections.emptyMap(); + } + TreeMap metricAsJSONMessage = new TreeMap<>(); + List metricList = inboundPayload.getMetrics(); + for (Metric m : metricList) { + if (m.getDataType().toIntValue() > 11) { + continue; + } + String message = MetricMessage.createJSON(m.getName(), m.getTimestamp(), m.getValue().toString(), m.getDataType().toString()); + metricAsJSONMessage.put(m.getName(), message); + } + return metricAsJSONMessage; + } + private static SparkplugBPayload getSparkplugBPayload(@NotNull ByteBuffer payload) { try { byte[] bytes = getBytesFromBuffer(payload); diff --git a/src/test/java/com/hivemq/extensions/sparkplug/aware/SparkplugPublishInboundInterceptorTest.java b/src/test/java/com/hivemq/extensions/sparkplug/aware/SparkplugPublishInboundInterceptorTest.java index e9104b1..3fb493d 100644 --- a/src/test/java/com/hivemq/extensions/sparkplug/aware/SparkplugPublishInboundInterceptorTest.java +++ b/src/test/java/com/hivemq/extensions/sparkplug/aware/SparkplugPublishInboundInterceptorTest.java @@ -40,32 +40,33 @@ */ class SparkplugPublishInboundInterceptorTest { - private @NotNull SparkplugPublishInboundInterceptor sparkplugPublishInboundInterceptor; private @NotNull PublishInboundInput publishInboundInput; private @NotNull PublishInboundOutput publishInboundOutput; private @NotNull ModifiablePublishPacket publishPacket; private @NotNull Path file; - String target = "$sparkplug/certificates/spBv1.0/group/NBIRTH/edgeItem/node"; private @NotNull ClientInformation clientInformation; private @NotNull PublishService publishService; + private String target = "$sparkplug/certificates/spBv1.0/group/NBIRTH/edgeItem/node"; + @BeforeEach void setUp(final @TempDir @NotNull Path tempDir) { file = tempDir.resolve("sparkplug.properties"); - SparkplugConfiguration configuration = new SparkplugConfiguration(file.toFile()); + publishService = mock(PublishService.class); - sparkplugPublishInboundInterceptor = new SparkplugPublishInboundInterceptor(configuration, publishService); publishInboundInput = mock(PublishInboundInput.class); publishInboundOutput = mock(PublishInboundOutput.class); publishPacket = mock(ModifiablePublishPacket.class); - when(publishInboundOutput.getPublishPacket()).thenReturn(publishPacket); clientInformation = mock(ClientInformation.class); + when(publishInboundOutput.getPublishPacket()).thenReturn(publishPacket); } @Test void topicSparkplug_published() throws IOException { - Files.write(file, List.of("sparkplug.version:spBv1.0")); + final SparkplugConfiguration configuration = getSparkplugConfiguration(List.of("sparkplug.version:spBv1.0")); + final SparkplugPublishInboundInterceptor sparkplugPublishInboundInterceptor = new SparkplugPublishInboundInterceptor(configuration, publishService); + when(publishPacket.getTopic()).thenReturn("spBv1.0/group/NBIRTH/edgeItem/node"); when(publishInboundInput.getClientInformation()).thenReturn(clientInformation); when(clientInformation.getClientId()).thenReturn("alf"); @@ -73,4 +74,27 @@ void topicSparkplug_published() throws IOException { assertEquals("$sparkplug/certificates/spBv1.0/group/NBIRTH/edgeItem/node", target); } + @Test + void metric2TopicSparkplug_published() throws IOException { + final SparkplugConfiguration configuration = getSparkplugConfiguration(List.of("sparkplug.version:spBv1.0", "sparkplug.metrics2topic:true")); + final SparkplugPublishInboundInterceptor sparkplugPublishInboundInterceptor = new SparkplugPublishInboundInterceptor(configuration, publishService); + + publishInboundInput = mock(PublishInboundInput.class); + publishInboundOutput = mock(PublishInboundOutput.class); + + when(publishPacket.getTopic()).thenReturn("spBv1.0/group/NDATA/edgeItem/node"); + when(publishInboundInput.getClientInformation()).thenReturn(clientInformation); + when(clientInformation.getClientId()).thenReturn("alf"); + + sparkplugPublishInboundInterceptor.onInboundPublish(publishInboundInput, publishInboundOutput); + } + + private SparkplugConfiguration getSparkplugConfiguration(final List properties) throws IOException { + Files.write(file, properties); + + final SparkplugConfiguration configuration = new SparkplugConfiguration(file.getParent().toFile()); + configuration.readPropertiesFromFile(); + + return configuration; + } } \ No newline at end of file diff --git a/src/test/java/com/hivemq/extensions/sparkplug/aware/SparkplugPublishOutboundInterceptorTest.java b/src/test/java/com/hivemq/extensions/sparkplug/aware/SparkplugPublishOutboundInterceptorTest.java index e4c67af..46f53bc 100644 --- a/src/test/java/com/hivemq/extensions/sparkplug/aware/SparkplugPublishOutboundInterceptorTest.java +++ b/src/test/java/com/hivemq/extensions/sparkplug/aware/SparkplugPublishOutboundInterceptorTest.java @@ -17,14 +17,10 @@ import com.hivemq.extension.sdk.api.annotations.NotNull; import com.hivemq.extension.sdk.api.client.parameter.ClientInformation; -import com.hivemq.extension.sdk.api.interceptor.publish.parameter.PublishInboundInput; -import com.hivemq.extension.sdk.api.interceptor.publish.parameter.PublishInboundOutput; import com.hivemq.extension.sdk.api.interceptor.publish.parameter.PublishOutboundInput; import com.hivemq.extension.sdk.api.interceptor.publish.parameter.PublishOutboundOutput; import com.hivemq.extension.sdk.api.packets.publish.ModifiableOutboundPublish; import com.hivemq.extension.sdk.api.packets.publish.ModifiablePublishPacket; -import com.hivemq.extension.sdk.api.services.builder.PublishBuilder; -import com.hivemq.extension.sdk.api.services.publish.PublishService; import com.hivemq.extensions.sparkplug.aware.configuration.SparkplugConfiguration; import org.eclipse.tahu.SparkplugInvalidTypeException; import org.eclipse.tahu.message.SparkplugBPayloadEncoder; @@ -33,11 +29,9 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; -import org.mockito.ArgumentCaptor; import java.io.IOException; import java.nio.ByteBuffer; -import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; import java.util.Date; @@ -45,7 +39,6 @@ import java.util.Optional; import static org.eclipse.tahu.message.model.MetricDataType.Int32; -import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.Mockito.*; /** diff --git a/src/test/java/com/hivemq/extensions/sparkplug/aware/configuration/SparkplugConfigurationTest.java b/src/test/java/com/hivemq/extensions/sparkplug/aware/configuration/SparkplugConfigurationTest.java index 5137dfc..a662d59 100644 --- a/src/test/java/com/hivemq/extensions/sparkplug/aware/configuration/SparkplugConfigurationTest.java +++ b/src/test/java/com/hivemq/extensions/sparkplug/aware/configuration/SparkplugConfigurationTest.java @@ -19,7 +19,7 @@ import java.io.File; -import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.assertTrue; class SparkplugConfigurationTest { From e5058aa576333105a6bd195006eaae5a087300bb Mon Sep 17 00:00:00 2001 From: skobow Date: Mon, 13 May 2024 15:55:53 +0200 Subject: [PATCH 2/6] Fix broken test --- .../sparkplug/aware/SparkplugPublishInboundInterceptor.java | 2 +- .../aware/SparkplugPublishInboundInterceptorTest.java | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/hivemq/extensions/sparkplug/aware/SparkplugPublishInboundInterceptor.java b/src/main/java/com/hivemq/extensions/sparkplug/aware/SparkplugPublishInboundInterceptor.java index 51a619d..7568783 100644 --- a/src/main/java/com/hivemq/extensions/sparkplug/aware/SparkplugPublishInboundInterceptor.java +++ b/src/main/java/com/hivemq/extensions/sparkplug/aware/SparkplugPublishInboundInterceptor.java @@ -73,7 +73,7 @@ public void onInboundPublish( final @NotNull String clientId = publishInboundInput.getClientInformation().getClientId(); final @NotNull PublishPacket publishPacket = publishInboundInput.getPublishPacket(); - final @NotNull String origin = publishInboundOutput.getPublishPacket().getTopic(); + final @NotNull String origin = publishPacket.getTopic(); final @NotNull TopicStructure topicStructure = new TopicStructure(origin); if (log.isTraceEnabled()) { diff --git a/src/test/java/com/hivemq/extensions/sparkplug/aware/SparkplugPublishInboundInterceptorTest.java b/src/test/java/com/hivemq/extensions/sparkplug/aware/SparkplugPublishInboundInterceptorTest.java index 3fb493d..a5af2e9 100644 --- a/src/test/java/com/hivemq/extensions/sparkplug/aware/SparkplugPublishInboundInterceptorTest.java +++ b/src/test/java/com/hivemq/extensions/sparkplug/aware/SparkplugPublishInboundInterceptorTest.java @@ -82,7 +82,8 @@ void metric2TopicSparkplug_published() throws IOException { publishInboundInput = mock(PublishInboundInput.class); publishInboundOutput = mock(PublishInboundOutput.class); - when(publishPacket.getTopic()).thenReturn("spBv1.0/group/NDATA/edgeItem/node"); + when(publishPacket.getTopic()).thenReturn("spBv1.0/group/DDATA/edgeItem/node"); + when(publishInboundInput.getPublishPacket()).thenReturn(publishPacket); when(publishInboundInput.getClientInformation()).thenReturn(clientInformation); when(clientInformation.getClientId()).thenReturn("alf"); From 7bedab352d659c8c73fb09f81a8101b49d53b8fe Mon Sep 17 00:00:00 2001 From: skobow Date: Tue, 14 May 2024 16:18:01 +0200 Subject: [PATCH 3/6] Add unit test for metrics2topic feature --- .../SparkplugPublishInboundInterceptor.java | 11 +++++-- ...parkplugPublishInboundInterceptorTest.java | 30 ++++++++++++++++--- 2 files changed, 35 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/hivemq/extensions/sparkplug/aware/SparkplugPublishInboundInterceptor.java b/src/main/java/com/hivemq/extensions/sparkplug/aware/SparkplugPublishInboundInterceptor.java index 7568783..1847a9d 100644 --- a/src/main/java/com/hivemq/extensions/sparkplug/aware/SparkplugPublishInboundInterceptor.java +++ b/src/main/java/com/hivemq/extensions/sparkplug/aware/SparkplugPublishInboundInterceptor.java @@ -29,6 +29,7 @@ import com.hivemq.extensions.sparkplug.aware.configuration.SparkplugConfiguration; import com.hivemq.extensions.sparkplug.aware.topics.MessageType; import com.hivemq.extensions.sparkplug.aware.topics.TopicStructure; +import org.jetbrains.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,6 +48,7 @@ public class SparkplugPublishInboundInterceptor implements PublishInboundInterceptor { private static final @NotNull Logger log = LoggerFactory.getLogger(SparkplugPublishInboundInterceptor.class); private final PublishService publishService; + private final PublishBuilder publishBuilder; private final @NotNull String sparkplugVersion; private final @NotNull String sysTopic; private final boolean useCompression; @@ -56,12 +58,19 @@ public class SparkplugPublishInboundInterceptor implements PublishInboundInterce public SparkplugPublishInboundInterceptor(final @NotNull SparkplugConfiguration configuration, final @NotNull PublishService publishService) { + this(configuration, publishService, Builders.publish()); + } + + @VisibleForTesting + SparkplugPublishInboundInterceptor(final @NotNull SparkplugConfiguration configuration, + final @NotNull PublishService publishService, final @NotNull PublishBuilder publishBuilder) { this.sparkplugVersion = configuration.getSparkplugVersion(); this.sysTopic = configuration.getSparkplugSysTopic(); this.useCompression = configuration.getCompression(); this.jsonLogEnabled = configuration.getJsonLogEnabled(); this.metric2topicEnabled = configuration.getSparkplugMetric2topicEnabled(); this.publishService = publishService; + this.publishBuilder = publishBuilder; this.messageExpiry = configuration.getSparkplugSystopicMsgexpiry(); } @@ -89,7 +98,6 @@ public void onInboundPublish( //it is a sparkplug publish try { // Build the publish - final PublishBuilder publishBuilder = Builders.publish(); publishBuilder.fromPublish(publishPacket); publishBuilder.topic(sysTopic + origin); publishBuilder.qos(Qos.AT_LEAST_ONCE); @@ -157,7 +165,6 @@ private void doPublishMetric(final String origin, final String metric, final Str try { final String newTopic = origin + "/" + metric; // Build the publish - final PublishBuilder publishBuilder = Builders.publish(); publishBuilder.topic(newTopic); publishBuilder.qos(Qos.AT_LEAST_ONCE); publishBuilder.messageExpiryInterval(messageExpiry); diff --git a/src/test/java/com/hivemq/extensions/sparkplug/aware/SparkplugPublishInboundInterceptorTest.java b/src/test/java/com/hivemq/extensions/sparkplug/aware/SparkplugPublishInboundInterceptorTest.java index a5af2e9..aab6b92 100644 --- a/src/test/java/com/hivemq/extensions/sparkplug/aware/SparkplugPublishInboundInterceptorTest.java +++ b/src/test/java/com/hivemq/extensions/sparkplug/aware/SparkplugPublishInboundInterceptorTest.java @@ -19,7 +19,10 @@ import com.hivemq.extension.sdk.api.client.parameter.ClientInformation; import com.hivemq.extension.sdk.api.interceptor.publish.parameter.PublishInboundInput; import com.hivemq.extension.sdk.api.interceptor.publish.parameter.PublishInboundOutput; +import com.hivemq.extension.sdk.api.packets.general.Qos; import com.hivemq.extension.sdk.api.packets.publish.ModifiablePublishPacket; +import com.hivemq.extension.sdk.api.services.builder.PublishBuilder; +import com.hivemq.extension.sdk.api.services.publish.Publish; import com.hivemq.extension.sdk.api.services.publish.PublishService; import com.hivemq.extensions.sparkplug.aware.configuration.SparkplugConfiguration; import org.junit.jupiter.api.BeforeEach; @@ -27,19 +30,22 @@ import org.junit.jupiter.api.io.TempDir; import java.io.IOException; +import java.nio.ByteBuffer; import java.nio.file.Files; import java.nio.file.Path; import java.util.List; +import java.util.Optional; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; /** * @author Anja Helmbrecht-Schaar */ class SparkplugPublishInboundInterceptorTest { + private byte DDATA_PUBLISH_PAYLOAD[] = {8, -25, -34, -59, -70, -9, 49, 18, 12, 10, 6, 87, 101, 105, 103, 104, 116, 32, 3, 80, 7, 18, 19, 10, 13, 65, 103, 105, 116, 97, 116, 111, 114, 83, 112, 101, 101, 100, 32, 3, 80, 2, 18, 19, 10, 6, 83, 116, 97, 116, 117, 115, 32, 12, 122, 7, 82, 117, 110, 110, 105, 110, 103, 18, 24, 10, 11, 84, 101, 109, 112, 101, 114, 97, 116, 117, 114, 101, 32, 10, 105, -113, -1, -35, 93, 108, 44, -95, 63, 18, 21, 10, 8, 80, 114, 101, 115, 115, 117, 114, 101, 32, 10, 105, 126, 24, 3, -13, 108, 11, -82, 63, 24, 4}; + private @NotNull PublishInboundInput publishInboundInput; private @NotNull PublishInboundOutput publishInboundOutput; private @NotNull ModifiablePublishPacket publishPacket; @@ -65,7 +71,8 @@ void setUp(final @TempDir @NotNull Path tempDir) { @Test void topicSparkplug_published() throws IOException { final SparkplugConfiguration configuration = getSparkplugConfiguration(List.of("sparkplug.version:spBv1.0")); - final SparkplugPublishInboundInterceptor sparkplugPublishInboundInterceptor = new SparkplugPublishInboundInterceptor(configuration, publishService); + final PublishBuilder publishBuilder = mock(PublishBuilder.class); + final SparkplugPublishInboundInterceptor sparkplugPublishInboundInterceptor = new SparkplugPublishInboundInterceptor(configuration, publishService, publishBuilder); when(publishPacket.getTopic()).thenReturn("spBv1.0/group/NBIRTH/edgeItem/node"); when(publishInboundInput.getClientInformation()).thenReturn(clientInformation); @@ -77,17 +84,32 @@ void topicSparkplug_published() throws IOException { @Test void metric2TopicSparkplug_published() throws IOException { final SparkplugConfiguration configuration = getSparkplugConfiguration(List.of("sparkplug.version:spBv1.0", "sparkplug.metrics2topic:true")); - final SparkplugPublishInboundInterceptor sparkplugPublishInboundInterceptor = new SparkplugPublishInboundInterceptor(configuration, publishService); + + final PublishBuilder publishBuilder = mock(PublishBuilder.class); + when(publishBuilder.fromPublish(any(Publish.class))).thenReturn(publishBuilder); + when(publishBuilder.topic(anyString())).thenReturn(publishBuilder); + when(publishBuilder.qos(any(Qos.class))).thenReturn(publishBuilder); + when(publishBuilder.payload(any(ByteBuffer.class))).thenReturn(publishBuilder); + when(publishBuilder.build()).thenReturn(mock(Publish.class)); + + final SparkplugPublishInboundInterceptor sparkplugPublishInboundInterceptor = new SparkplugPublishInboundInterceptor(configuration, publishService, publishBuilder); publishInboundInput = mock(PublishInboundInput.class); publishInboundOutput = mock(PublishInboundOutput.class); + when(publishPacket.getPayload()).thenReturn(Optional.of(ByteBuffer.wrap(DDATA_PUBLISH_PAYLOAD))); + when(publishPacket.getTopic()).thenReturn("spBv1.0/group/DDATA/edgeItem/node"); + when(publishInboundInput.getPublishPacket()).thenReturn(publishPacket); + when(publishInboundInput.getClientInformation()).thenReturn(clientInformation); + when(clientInformation.getClientId()).thenReturn("alf"); when(publishPacket.getTopic()).thenReturn("spBv1.0/group/DDATA/edgeItem/node"); when(publishInboundInput.getPublishPacket()).thenReturn(publishPacket); when(publishInboundInput.getClientInformation()).thenReturn(clientInformation); when(clientInformation.getClientId()).thenReturn("alf"); sparkplugPublishInboundInterceptor.onInboundPublish(publishInboundInput, publishInboundOutput); + + verify(publishService, times(4)).publish(any(Publish.class)); } private SparkplugConfiguration getSparkplugConfiguration(final List properties) throws IOException { From 2e85ebf97c96e18ff2c8aee6a8259b6d98d7ebec Mon Sep 17 00:00:00 2001 From: skobow Date: Wed, 15 May 2024 17:01:41 +0200 Subject: [PATCH 4/6] minor cleanup of gradle dependency catalog --- gradle/libs.versions.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 2c3414b..47510cd 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -3,7 +3,7 @@ commonsLang = "3.8.1" guava = "31.1-jre" hivemq-extensionSdk = "4.28.0" jackson = "2.13.2" -jacksonv1 = "1.9.13" +jackson-asl = "1.9.13" jetbrains-annotations = "24.0.1" junit-jupiter = "5.10.0" mockito = "5.6.0" @@ -15,7 +15,7 @@ commonsLang = { module = "org.apache.commons:commons-lang3", version.ref = "comm guava = { module = "com.google.guava:guava", version.ref = "guava" } jackson = { module = "com.fasterxml.jackson.core:jackson-core", version.ref = "jackson" } jackson-databind = { module = "com.fasterxml.jackson.core:jackson-databind", version.ref = "jackson" } -jackson-mapper-asl = { module = "org.codehaus.jackson:jackson-mapper-asl", version.ref = "jacksonv1" } +jackson-mapper-asl = { module = "org.codehaus.jackson:jackson-mapper-asl", version.ref = "jackson-asl" } jetbrains-annotations = { module = "org.jetbrains:annotations", version.ref = "jetbrains-annotations" } mockito = { module = "org.mockito:mockito-core", version.ref = "mockito" } protobuf = { module = "com.google.protobuf:protobuf-java", version.ref = "protobuf" } From 7697bc18f904f991d2cb71702d60bdc364c82d90 Mon Sep 17 00:00:00 2001 From: skobow Date: Wed, 15 May 2024 17:30:26 +0200 Subject: [PATCH 5/6] Integrate default release process --- .github/workflows/gradle-build.yml | 44 ++++++++ .github/workflows/gradle-release.yml | 153 +++++++++++++++++++++++++++ build.gradle.kts | 8 ++ gradle.properties | 2 +- gradle/libs.versions.toml | 1 + 5 files changed, 207 insertions(+), 1 deletion(-) create mode 100644 .github/workflows/gradle-build.yml create mode 100644 .github/workflows/gradle-release.yml diff --git a/.github/workflows/gradle-build.yml b/.github/workflows/gradle-build.yml new file mode 100644 index 0000000..f25dcbd --- /dev/null +++ b/.github/workflows/gradle-build.yml @@ -0,0 +1,44 @@ +# This workflow uses actions that are not certified by GitHub. +# They are provided by a third-party and are governed by +# separate terms of service, privacy policy, and support +# documentation. +# This workflow will build a Java project with Gradle and cache/restore any dependencies to improve the workflow execution time +# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-java-with-gradle + +name: Extension Build and Test +on: + push: + branches: + - feature/** + tags: + - '*' + pull_request: + branches: + - main + +jobs: + build: + + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v3 + + - name: Cache gradle + uses: actions/cache@v3.3.3 + with: + path: | + ~/.gradle + key: gradle-${{ hashFiles('**/build.gradle.kts') }} + + - name: Set up JDK 11 + uses: actions/setup-java@v4.0.0 + with: + java-version: '11' + distribution: 'temurin' + check-latest: true + cache: gradle + + - name: Build with Gradle + run: | + ./gradlew check test build \ No newline at end of file diff --git a/.github/workflows/gradle-release.yml b/.github/workflows/gradle-release.yml new file mode 100644 index 0000000..b640223 --- /dev/null +++ b/.github/workflows/gradle-release.yml @@ -0,0 +1,153 @@ +name: Create Release +on: + workflow_dispatch: + branches: + - main + - feature/** + inputs: + version: + description: 'Version to release (e.g. 0.0.0)' + required: true + default: '4.28.0' + nextVersion: + description: 'Next version for development (e.g. 0.0.0; do not include -SNAPSHOT)' + required: true + default: '4.29.0' + preRelease: + description: 'Is this a pre-release?' + required: true + default: false +# updateWorkflow: +# description: 'Update workflow default versions? NOTE: A personal access token must be available as secret WORKFLOW_TOKEN for this to work!' +# required: false +# default: false + +permissions: + contents: write + actions: write + +jobs: + release: + name: Release + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Validate versions + run: | + VERSION_REGEX="^(0|[1-9]\d*)\.(0|[1-9]\d*)\.(0|[1-9]\d*)(-(0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*)(\.(0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*))*)?(\+[0-9a-zA-Z-]+(\.[0-9a-zA-Z-]+)*)?$" + + echo "Checking if version and nextVersion comply with semantic versioning..." + if [[ ! "${{ github.event.inputs.version }}" =~ $VERSION_REGEX ]] || [[ ! "${{ github.event.inputs.nextVersion }}" =~ $VERSION_REGEX ]]; then + echo "Error: version does not comply with semantic versioning! Use MAJOR.MINOR.PATCH pattern." + exit 1 + fi + + echo "Checking if version and nextVersion are identical..." + if [[ ( "${{ github.event.inputs.version }}" == "${{ github.event.inputs.nextVersion }}" ) ]]; then + echo "Error: version and nextVersion may not be identical." + exit 1 + fi + + echo "Checking if nextVersion is higher than version..." + IFS='.' read -ra VERSION_ARR <<< "${{ github.event.inputs.version }}" + IFS='.' read -ra NEXT_VERSION_ARR <<< "${{ github.event.inputs.nextVersion }}" + + for i in "${!VERSION_ARR[@]}"; do + if (( NEXT_VERSION_ARR[i] < VERSION_ARR[i] )); then + echo "Error: next version needs to be higher than release version" + exit 1 + elif (( NEXT_VERSION_ARR[i] > VERSION_ARR[i] )); then + break + fi + done + + echo "Checking if tag v${{ github.event.inputs.version }} already exists..." + if git rev-parse "v${{ github.event.inputs.version }}" >/dev/null 2>&1; then + echo "Tag v${{ github.event.inputs.version }} already exists!" + exit 1 + fi + + echo "Creating release with version v${{ github.event.inputs.version }}." + echo "Next version will be v${{ github.event.inputs.nextVersion }}-SNAPSHOT." + + echo "VERSION=${{ github.event.inputs.version }}" >> $GITHUB_ENV + echo "NEXT_VERSION=${{ github.event.inputs.nextVersion }}" >> $GITHUB_ENV + echo "BRANCH_NAME=${GITHUB_REF#refs/heads/}" >> $GITHUB_ENV + - name: Check pre-release + run: | + echo "Creating pre-release: ${{ github.event.inputs.preRelease }}" + if [[ ${{ github.event.inputs.preRelease }} == "true" ]]; then + echo "PRE_RELEASE=true" >> $GITHUB_ENV + else + echo "Creating final release from ${{ env.BRANCH_NAME }}." + if [[ "${{ env.BRANCH_NAME }}" != "main" ]]; then + echo "Error: final releases can only be created from main branch." + exit 1 + fi + echo "PRE_RELEASE=false" >> $GITHUB_ENV + fi + + - name: Set up JDK 11 + uses: actions/setup-java@v4.0.0 + with: + java-version: '11' + distribution: 'temurin' + check-latest: true + cache: gradle + + - name: Setup Gradle + uses: gradle/gradle-build-action@v3.0.0 + + - name: Setup git config + run: | + git config --global user.name "GitHub Action" + git config --global user.email "<>" + + - name: Release with Gradle + run: ./gradlew release -Prelease.useAutomaticVersion=true -Prelease.releaseVersion=${{ env.VERSION }} -Prelease.newVersion=${{ env.NEXT_VERSION }}-SNAPSHOT + + - name: Generate changelog + id: changelog + run: | + CHANGELOG=$(git log $(git describe --tags --abbrev=0)..HEAD --pretty=format:"- %s") + echo "::set-output name=changelog::$CHANGELOG" + + - name: Create GitHub Release + uses: softprops/action-gh-release@v1 + with: + name: Release v${{ env.VERSION }} + tag_name: ${{ env.VERSION }} + prerelease: ${{ env.PRE_RELEASE }} + body: ${{ steps.changelog.outputs.changelog }} + files: | + ./build/hivemq-extension/hivemq-sparkplug-aware-extension-*.zip + ./build/hivemq-extension/hivemq-sparkplug-aware-extension-*.zip.sha256 + +# - name: Update workflow +# run: | +# +# if [[ ${{ github.event.inputs.updateWorkflow }} == "false" ]]; then +# echo "Skipping workflow update." +# exit 0 +# fi +# +# IFS='.' read -ra NEXT_VERSION_ARR <<< "${{ env.NEXT_VERSION }}" +# NEXT_VERSION_MINOR=$((NEXT_VERSION_ARR[1] + 1)) +# NEW_NEXT_VERSION="${NEXT_VERSION_ARR[0]}.${NEXT_VERSION_MINOR}.0" +# +# echo $NEW_NEXT_VERSION +# +# set -x +# VERSION=${{ env.NEXT_VERSION }} yq eval '.on.workflow_dispatch.inputs.version.default = env(VERSION)' -i .github/workflows/gradle-release.yml +# NEXT_VERSION=$NEW_NEXT_VERSION yq eval '.on.workflow_dispatch.inputs.nextVersion.default = env(NEXT_VERSION)' -i .github/workflows/gradle-release.yml +# set +x +# +# cat .github/workflows/gradle-release.yml +# +# git config --global user.name "GitHub Action" +# git config --global user.email "<>" +# git add .github/workflows/gradle-release.yml +# git commit -m "Update workflow default versions" +# git push diff --git a/build.gradle.kts b/build.gradle.kts index a79f8e6..2b0b706 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -1,3 +1,4 @@ +import net.researchgate.release.ReleaseExtension import org.gradle.crypto.checksum.Checksum plugins { @@ -5,6 +6,7 @@ plugins { alias(libs.plugins.defaults) alias(libs.plugins.license) alias(libs.plugins.checksum) + alias(libs.plugins.release) } group = "com.hivemq.extensions.sparkplug.aware" @@ -22,6 +24,12 @@ hivemqExtension { } } +configure { + ignoredSnapshotDependencies.set(listOf("net.researchgate:gradle-release")) + revertOnFail.set(true) + buildTasks.set(listOf("clean", "hivemqExtensionZip", "checksum")) +} + tasks.prepareHivemqHome { hivemqHomeDirectory.set(file("hivemq-${libs.versions.hivemq.extensionSdk.get()}")) } diff --git a/gradle.properties b/gradle.properties index 9e50a12..f314626 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1 +1 @@ -version=4.24.0 +version=4.28.0 diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 47510cd..fe34196 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -26,3 +26,4 @@ hivemq-extension = { id = "com.hivemq.extension", version = "3.1.0" } defaults = { id = "io.github.sgtsilvio.gradle.defaults", version = "0.2.0" } license = { id = "com.github.hierynomus.license", version = "0.16.1" } checksum = { id = "org.gradle.crypto.checksum", version = "1.4.0"} +release = { id = "net.researchgate.release", version = "3.0.2" } From a8d7d1a5e41d2da8c8f4cde39d33acb6b1f8b4ca Mon Sep 17 00:00:00 2001 From: skobow Date: Thu, 16 May 2024 09:21:55 +0200 Subject: [PATCH 6/6] Delete check workflow --- .github/workflows/check.yml | 18 ------------------ 1 file changed, 18 deletions(-) delete mode 100644 .github/workflows/check.yml diff --git a/.github/workflows/check.yml b/.github/workflows/check.yml deleted file mode 100644 index 99d3828..0000000 --- a/.github/workflows/check.yml +++ /dev/null @@ -1,18 +0,0 @@ -name: CI Check - -on: [ push ] - -jobs: - check: - runs-on: ubuntu-latest - steps: - - name: Checkout - uses: actions/checkout@v3 - - name: Setup Java - uses: actions/setup-java@v3 - with: - distribution: temurin - java-version: 11 - cache: gradle - - name: Check - run: ./gradlew check