From facdc39709286779e4a8ca2d65116a0494f79cc3 Mon Sep 17 00:00:00 2001 From: Andrea Selva Date: Tue, 26 Nov 2024 16:28:43 +0100 Subject: [PATCH] Implement topic alias management (#873) Implemented handling of topic alias received by publishers. Doesn't implement the alias remapping in forwarded publishes. Updates CONNACK to set topic alias maximum property if user explicitly configures topic_alias_maximum configuration setting. Adds a topic alias to topic name cache to MQTTConnection, named TopicAliasMapping. Updates PUBLISH processing to handled the incoming topic alias, checking the various error condition (respect to the specification), resolve the alias to topic name and forward processing of publish message with topic name and without topic alias. Added unit tests to MQTTConnection to cover the feature. --- ChangeLog.txt | 1 + .../java/io/moquette/BrokerConstants.java | 4 +- .../moquette/broker/BrokerConfiguration.java | 16 ++ .../io/moquette/broker/MQTTConnection.java | 115 ++++++++- .../io/moquette/broker/TopicAliasMapping.java | 49 ++++ .../moquette/broker/config/FluentConfig.java | 12 +- .../io/moquette/broker/config/IConfig.java | 1 + .../broker/MQTTConnectionPublishTest.java | 223 +++++++++++++++++- .../io/moquette/broker/RetainBufferTest.java | 49 ++++ .../broker/TopicAliasMappingTest.java | 77 ++++++ distribution/src/main/resources/moquette.conf | 11 + 11 files changed, 537 insertions(+), 21 deletions(-) create mode 100644 broker/src/main/java/io/moquette/broker/TopicAliasMapping.java create mode 100644 broker/src/test/java/io/moquette/broker/RetainBufferTest.java create mode 100644 broker/src/test/java/io/moquette/broker/TopicAliasMappingTest.java diff --git a/ChangeLog.txt b/ChangeLog.txt index a9ee74ba8..e9b985295 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -1,4 +1,5 @@ Version 0.18-SNAPSHOT: + [feature] Topic alias: implemented handling of topic alias received by publishers. (#873) [feature] Flow-control: Handled client's receive maximum property to configure the inflight window through the client. (#858) [feature] Generate correct MANIFEST.MF with bnd-maven-plugin. (#848) [feature] Flow-control: implemented publish's quota management on the server side. (#852) diff --git a/broker/src/main/java/io/moquette/BrokerConstants.java b/broker/src/main/java/io/moquette/BrokerConstants.java index 2b05980dd..210ba09e6 100644 --- a/broker/src/main/java/io/moquette/BrokerConstants.java +++ b/broker/src/main/java/io/moquette/BrokerConstants.java @@ -124,8 +124,6 @@ public final class BrokerConstants { public static final String BUGSNAG_ENABLE_PROPERTY_NAME = "use_bugsnag"; public static final String BUGSNAG_TOKEN_PROPERTY_NAME = "bugsnag.token"; - public static final String STORAGE_CLASS_NAME = "storage_class"; - @Deprecated public static final String PERSISTENT_CLIENT_EXPIRATION_PROPERTY_NAME = IConfig.PERSISTENT_CLIENT_EXPIRATION_PROPERTY_NAME; @@ -134,6 +132,8 @@ public final class BrokerConstants { public static final int INFINITE_SESSION_EXPIRY = 0xFFFFFFFF; public static final int RECEIVE_MAXIMUM = 65 * 1024; + public static final int DISABLED_TOPIC_ALIAS = 0; + private BrokerConstants() { } } diff --git a/broker/src/main/java/io/moquette/broker/BrokerConfiguration.java b/broker/src/main/java/io/moquette/broker/BrokerConfiguration.java index 2fcced636..421ad5fbc 100644 --- a/broker/src/main/java/io/moquette/broker/BrokerConfiguration.java +++ b/broker/src/main/java/io/moquette/broker/BrokerConfiguration.java @@ -27,6 +27,7 @@ class BrokerConfiguration { private final boolean allowZeroByteClientId; private final boolean reauthorizeSubscriptionsOnConnect; private final int bufferFlushMillis; + private final int topicAliasMaximum; // integer max value means that the property is unset private int receiveMaximum; @@ -67,6 +68,8 @@ class BrokerConfiguration { } receiveMaximum = props.intProp(IConfig.RECEIVE_MAXIMUM, BrokerConstants.RECEIVE_MAXIMUM); + + topicAliasMaximum = props.intProp(IConfig.TOPIC_ALIAS_MAXIMUM_PROPERTY_NAME, BrokerConstants.DISABLED_TOPIC_ALIAS); } // test method @@ -86,12 +89,21 @@ public BrokerConfiguration(boolean allowAnonymous, boolean peerCertificateAsUser // test method public BrokerConfiguration(boolean allowAnonymous, boolean peerCertificateAsUsername, boolean allowZeroByteClientId, boolean reauthorizeSubscriptionsOnConnect, int bufferFlushMillis, int receiveMaximum) { + this(allowAnonymous, peerCertificateAsUsername, allowZeroByteClientId, reauthorizeSubscriptionsOnConnect, + bufferFlushMillis, receiveMaximum, BrokerConstants.DISABLED_TOPIC_ALIAS); + } + + // test method + public BrokerConfiguration(boolean allowAnonymous, boolean peerCertificateAsUsername, boolean allowZeroByteClientId, + boolean reauthorizeSubscriptionsOnConnect, int bufferFlushMillis, int receiveMaximum, + int topicAliasMaximum) { this.allowAnonymous = allowAnonymous; this.peerCertificateAsUsername = peerCertificateAsUsername; this.allowZeroByteClientId = allowZeroByteClientId; this.reauthorizeSubscriptionsOnConnect = reauthorizeSubscriptionsOnConnect; this.bufferFlushMillis = bufferFlushMillis; this.receiveMaximum = receiveMaximum; + this.topicAliasMaximum = topicAliasMaximum; } public boolean isAllowAnonymous() { @@ -117,4 +129,8 @@ public int getBufferFlushMillis() { public int receiveMaximum() { return receiveMaximum; } + + public int topicAliasMaximum() { + return topicAliasMaximum; + } } diff --git a/broker/src/main/java/io/moquette/broker/MQTTConnection.java b/broker/src/main/java/io/moquette/broker/MQTTConnection.java index 469c4c06c..74085c400 100644 --- a/broker/src/main/java/io/moquette/broker/MQTTConnection.java +++ b/broker/src/main/java/io/moquette/broker/MQTTConnection.java @@ -42,6 +42,7 @@ import java.security.cert.CertificateEncodingException; import java.time.Instant; import java.util.List; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -67,12 +68,29 @@ final class MQTTConnection { private final IAuthenticator authenticator; private final SessionRegistry sessionRegistry; private final PostOffice postOffice; + private final int topicAliasMaximum; private volatile boolean connected; private final AtomicInteger lastPacketId = new AtomicInteger(0); private Session bindedSession; private int protocolVersion; private Quota receivedQuota; private Quota sendQuota; + private TopicAliasMapping aliasMappings; + + static final class ErrorCodeException extends Exception { + + private final String hexErrorCode; + private final MqttReasonCodes.Disconnect errorCode; + + public ErrorCodeException(MqttReasonCodes.Disconnect disconnectError) { + errorCode = disconnectError; + hexErrorCode = Integer.toHexString(disconnectError.byteValue()); + } + + public MqttReasonCodes.Disconnect getErrorCode() { + return errorCode; + } + } MQTTConnection(Channel channel, BrokerConfiguration brokerConfig, IAuthenticator authenticator, SessionRegistry sessionRegistry, PostOffice postOffice) { @@ -83,6 +101,7 @@ final class MQTTConnection { this.postOffice = postOffice; this.connected = false; this.protocolVersion = UNDEFINED_VERSION; + this.topicAliasMaximum = brokerConfig.topicAliasMaximum(); } void handleMessage(MqttMessage msg) { @@ -225,6 +244,12 @@ PostOffice.RouteResult processConnect(MqttConnectMessage msg) { channel.close().addListener(CLOSE_ON_FAILURE); return PostOffice.RouteResult.failed(clientId); } + + // initialize topic alias mapping + if (isProtocolVersion(msg, MqttVersion.MQTT_5)) { + aliasMappings = new TopicAliasMapping(); + } + receivedQuota = createQuota(brokerConfig.receiveMaximum()); sendQuota = retrieveSendQuota(msg); @@ -328,6 +353,11 @@ private void executeConnect(MqttConnectMessage msg, String clientId, boolean ser if (receivedQuota.hasLimit()) { connAckPropertiesBuilder.receiveMaximum(receivedQuota.getMaximum()); } + + if (topicAliasMaximum != BrokerConstants.DISABLED_TOPIC_ALIAS) { + connAckPropertiesBuilder.topicAliasMaximum(topicAliasMaximum); + } + final MqttProperties ackProperties = connAckPropertiesBuilder.build(); connAckBuilder.properties(ackProperties); } @@ -667,7 +697,26 @@ PostOffice.RouteResult processPublish(MqttPublishMessage msg) { final String clientId = getClientId(); final int messageID = msg.variableHeader().packetId(); LOG.trace("Processing PUBLISH message, topic: {}, messageId: {}, qos: {}", topicName, messageID, qos); - final Topic topic = new Topic(topicName); + Topic topic = new Topic(topicName); + + if (isProtocolVersion5()) { + MqttProperties.MqttProperty topicAlias = msg.variableHeader() + .properties().getProperty(MqttProperties.MqttPropertyType.TOPIC_ALIAS.value()); + if (topicAlias != null) { + try { + Optional mappedTopicName = updateAndMapTopicAlias((MqttProperties.IntegerProperty) topicAlias, topicName); + final String translatedTopicName = mappedTopicName.orElse(topicName); + msg = copyPublishMessageExceptTopicAlias(msg, translatedTopicName, messageID); + topic = new Topic(translatedTopicName); + } catch (ErrorCodeException e) { + brokerDisconnect(e.getErrorCode()); + disconnectSession(); + dropConnection(); + return PostOffice.RouteResult.failed(clientId); + } + } + } + if (!topic.isValid()) { LOG.debug("Drop connection because of invalid topic format"); dropConnection(); @@ -683,6 +732,7 @@ PostOffice.RouteResult processPublish(MqttPublishMessage msg) { // retain else msg is cleaned by the NewNettyMQTTHandler and is not available // in execution by SessionEventLoop Utils.retain(msg, PostOffice.BT_PUB_IN); + final MqttPublishMessage finalMsg = msg; switch (qos) { case AT_MOST_ONCE: return postOffice.routeCommand(clientId, "PUB QoS0", () -> { @@ -690,9 +740,9 @@ PostOffice.RouteResult processPublish(MqttPublishMessage msg) { if (!isBoundToSession()) { return null; } - postOffice.receivedPublishQos0(this, username, clientId, msg, expiry); + postOffice.receivedPublishQos0(this, username, clientId, finalMsg, expiry); return null; - }).ifFailed(() -> Utils.release(msg, PostOffice.BT_PUB_IN + " - failed")); + }).ifFailed(() -> Utils.release(finalMsg, PostOffice.BT_PUB_IN + " - failed")); case AT_LEAST_ONCE: if (!receivedQuota.hasFreeSlots()) { LOG.warn("Client {} exceeded the quota {} processing QoS1, disconnecting it", clientId, receivedQuota); @@ -708,16 +758,16 @@ PostOffice.RouteResult processPublish(MqttPublishMessage msg) { if (!isBoundToSession()) return null; receivedQuota.consumeSlot(); - postOffice.receivedPublishQos1(this, username, messageID, msg, expiry) + postOffice.receivedPublishQos1(this, username, messageID, finalMsg, expiry) .completableFuture().thenRun(() -> { receivedQuota.releaseSlot(); }); return null; - }).ifFailed(() -> Utils.release(msg, PostOffice.BT_PUB_IN + " - failed")); + }).ifFailed(() -> Utils.release(finalMsg, PostOffice.BT_PUB_IN + " - failed")); case EXACTLY_ONCE: { if (!receivedQuota.hasFreeSlots()) { LOG.warn("Client {} exceeded the quota {} processing QoS2, disconnecting it", clientId, receivedQuota); - Utils.release(msg, PostOffice.BT_PUB_IN + " - phase 1 QoS2 exceeded quota"); + Utils.release(finalMsg, PostOffice.BT_PUB_IN + " - phase 1 QoS2 exceeded quota"); brokerDisconnect(MqttReasonCodes.Disconnect.RECEIVE_MAXIMUM_EXCEEDED); disconnectSession(); dropConnection(); @@ -728,7 +778,7 @@ PostOffice.RouteResult processPublish(MqttPublishMessage msg) { checkMatchSessionLoop(clientId); if (!isBoundToSession()) return null; - bindedSession.receivedPublishQos2(messageID, msg); + bindedSession.receivedPublishQos2(messageID, finalMsg); receivedQuota.consumeSlot(); return null; }); @@ -738,7 +788,7 @@ PostOffice.RouteResult processPublish(MqttPublishMessage msg) { return firstStepResult; } firstStepResult.completableFuture().thenRun(() -> - postOffice.receivedPublishQos2(this, msg, username, expiry).completableFuture() + postOffice.receivedPublishQos2(this, finalMsg, username, expiry).completableFuture() ); return firstStepResult; } @@ -748,6 +798,55 @@ PostOffice.RouteResult processPublish(MqttPublishMessage msg) { } } + private Optional updateAndMapTopicAlias(MqttProperties.IntegerProperty topicAlias, String topicName) throws ErrorCodeException { + if (topicAliasMaximum == BrokerConstants.DISABLED_TOPIC_ALIAS) { + // client is sending a topic alias when the feature was disabled form the server + LOG.info("Dropping connection {}, received a PUBLISH with topic alias while the feature is not enaled on the broker", channel); + throw new ErrorCodeException(MqttReasonCodes.Disconnect.PROTOCOL_ERROR); + } + MqttProperties.IntegerProperty topicAliasTyped = topicAlias; + if (topicAliasTyped.value() == 0 || topicAliasTyped.value() > topicAliasMaximum) { + // invalid topic alias value + LOG.info("Dropping connection {}, received a topic alias ({}) outside of range (0..{}]", + channel, topicAliasTyped.value(), topicAliasMaximum); + throw new ErrorCodeException(MqttReasonCodes.Disconnect.TOPIC_ALIAS_INVALID); + } + + Optional mappedTopicName = aliasMappings.topicFromAlias(topicAliasTyped.value()); + if (mappedTopicName.isPresent()) { + // already established a mapping for the Topic Alias + if (topicName != null) { + // contains a topic name then update the mapping + aliasMappings.update(topicName, topicAliasTyped.value()); + } + } else { + // does not already have a mapping for this Topic Alias + if (topicName.isEmpty()) { + // protocol error, not present mapping, topic alias is present and no topic name + throw new ErrorCodeException(MqttReasonCodes.Disconnect.PROTOCOL_ERROR); + } + // update the mapping + aliasMappings.update(topicName, topicAliasTyped.value()); + } + return mappedTopicName; + } + + private static MqttPublishMessage copyPublishMessageExceptTopicAlias(MqttPublishMessage msg, String translatedTopicName, int messageID) { + // Replace the topicName with the one retrieved and remove the topic alias property + // to avoid to forward or store. + final MqttProperties rewrittenProps = new MqttProperties(); + for (MqttProperties.MqttProperty property : msg.variableHeader().properties().listAll()) { + if (property.propertyId() == MqttProperties.MqttPropertyType.TOPIC_ALIAS.value()) { + continue; + } + rewrittenProps.add(property); + } + + MqttPublishVariableHeader rewrittenVariableHeader = + new MqttPublishVariableHeader(translatedTopicName, messageID, rewrittenProps); + return new MqttPublishMessage(msg.fixedHeader(), rewrittenVariableHeader, msg.payload()); + } + private Instant extractExpiryFromProperty(MqttPublishMessage msg) { MqttProperties.MqttProperty expiryProp = msg.variableHeader() .properties() diff --git a/broker/src/main/java/io/moquette/broker/TopicAliasMapping.java b/broker/src/main/java/io/moquette/broker/TopicAliasMapping.java new file mode 100644 index 000000000..757c12cee --- /dev/null +++ b/broker/src/main/java/io/moquette/broker/TopicAliasMapping.java @@ -0,0 +1,49 @@ +/* + * + * Copyright (c) 2012-2024 The original author or authors + * ------------------------------------------------------ + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Apache License v2.0 which accompanies this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * The Apache License v2.0 is available at + * http://www.opensource.org/licenses/apache2.0.php + * + * You may elect to redistribute this code under either of these licenses. + * + */ + +package io.moquette.broker; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +/** + * Implements a mapping cache so that the binding key -> value is also reversed, + * it maintains the invariant to be a 1:1 mapping. + * */ +class TopicAliasMapping { + + private final Map direct = new HashMap<>(); + private final Map reversed = new HashMap<>(); + + void update(String topicName, int topicAlias) { + Integer previousAlias = direct.put(topicName, topicAlias); + if (previousAlias != null) { + reversed.remove(previousAlias); + } + reversed.put(topicAlias, topicName); + } + + Optional topicFromAlias(int topicAlias) { + return Optional.ofNullable(reversed.get(topicAlias)); + } + + int size() { + return reversed.size(); + } +} diff --git a/broker/src/main/java/io/moquette/broker/config/FluentConfig.java b/broker/src/main/java/io/moquette/broker/config/FluentConfig.java index 15041dc39..93d219fb1 100644 --- a/broker/src/main/java/io/moquette/broker/config/FluentConfig.java +++ b/broker/src/main/java/io/moquette/broker/config/FluentConfig.java @@ -17,22 +17,23 @@ import static io.moquette.broker.config.IConfig.DATA_PATH_PROPERTY_NAME; import static io.moquette.broker.config.IConfig.DEFAULT_NETTY_MAX_BYTES_IN_MESSAGE; import static io.moquette.broker.config.IConfig.ENABLE_TELEMETRY_NAME; -import static io.moquette.broker.config.IConfig.PEER_CERTIFICATE_AS_USERNAME; -import static io.moquette.broker.config.IConfig.PORT_PROPERTY_NAME; +import static io.moquette.broker.config.IConfig.HOST_PROPERTY_NAME; import static io.moquette.broker.config.IConfig.JKS_PATH_PROPERTY_NAME; import static io.moquette.broker.config.IConfig.KEY_MANAGER_PASSWORD_PROPERTY_NAME; import static io.moquette.broker.config.IConfig.KEY_STORE_PASSWORD_PROPERTY_NAME; import static io.moquette.broker.config.IConfig.KEY_STORE_TYPE; import static io.moquette.broker.config.IConfig.NETTY_MAX_BYTES_PROPERTY_NAME; import static io.moquette.broker.config.IConfig.PASSWORD_FILE_PROPERTY_NAME; +import static io.moquette.broker.config.IConfig.PEER_CERTIFICATE_AS_USERNAME; import static io.moquette.broker.config.IConfig.PERSISTENCE_ENABLED_PROPERTY_NAME; import static io.moquette.broker.config.IConfig.PERSISTENT_CLIENT_EXPIRATION_PROPERTY_NAME; import static io.moquette.broker.config.IConfig.PERSISTENT_QUEUE_TYPE_PROPERTY_NAME; +import static io.moquette.broker.config.IConfig.PORT_PROPERTY_NAME; import static io.moquette.broker.config.IConfig.RECEIVE_MAXIMUM; -import static io.moquette.broker.config.IConfig.HOST_PROPERTY_NAME; import static io.moquette.broker.config.IConfig.SESSION_QUEUE_SIZE; import static io.moquette.broker.config.IConfig.SSL_PORT_PROPERTY_NAME; import static io.moquette.broker.config.IConfig.SSL_PROVIDER; +import static io.moquette.broker.config.IConfig.TOPIC_ALIAS_MAXIMUM_PROPERTY_NAME; import static io.moquette.broker.config.IConfig.WEB_SOCKET_PORT_PROPERTY_NAME; /** @@ -209,6 +210,11 @@ public FluentConfig receiveMaximum(int receiveMaximum) { return this; } + public FluentConfig topicAliasMaximum(int topicAliasMaximum) { + configAccumulator.put(TOPIC_ALIAS_MAXIMUM_PROPERTY_NAME, Integer.valueOf(topicAliasMaximum).toString()); + return this; + } + public class TLSConfig { private SSLProvider providerType; diff --git a/broker/src/main/java/io/moquette/broker/config/IConfig.java b/broker/src/main/java/io/moquette/broker/config/IConfig.java index 366be7557..ca2ef181d 100644 --- a/broker/src/main/java/io/moquette/broker/config/IConfig.java +++ b/broker/src/main/java/io/moquette/broker/config/IConfig.java @@ -67,6 +67,7 @@ public abstract class IConfig { public static final String NETTY_MAX_BYTES_PROPERTY_NAME = "netty.mqtt.message_size"; public static final String MAX_SERVER_GRANTED_QOS_PROPERTY_NAME = "max_server_granted_qos"; public static final int DEFAULT_NETTY_MAX_BYTES_IN_MESSAGE = 8092; + public static final String TOPIC_ALIAS_MAXIMUM_PROPERTY_NAME = "topic_alias_maximum"; public abstract void setProperty(String name, String value); diff --git a/broker/src/test/java/io/moquette/broker/MQTTConnectionPublishTest.java b/broker/src/test/java/io/moquette/broker/MQTTConnectionPublishTest.java index 73e28fc08..e53e3dcfa 100644 --- a/broker/src/test/java/io/moquette/broker/MQTTConnectionPublishTest.java +++ b/broker/src/test/java/io/moquette/broker/MQTTConnectionPublishTest.java @@ -15,6 +15,7 @@ */ package io.moquette.broker; +import io.moquette.BrokerConstants; import io.moquette.broker.security.PermitAllAuthorizatorPolicy; import io.moquette.broker.subscriptions.CTrieSubscriptionDirectory; import io.moquette.broker.subscriptions.ISubscriptionsDirectory; @@ -25,27 +26,47 @@ import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.handler.codec.mqtt.MqttConnAckMessage; +import io.netty.handler.codec.mqtt.MqttConnectMessage; +import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttMessageBuilders; +import io.netty.handler.codec.mqtt.MqttMessageType; +import io.netty.handler.codec.mqtt.MqttProperties; import io.netty.handler.codec.mqtt.MqttPublishMessage; +import io.netty.handler.codec.mqtt.MqttPublishVariableHeader; import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.handler.codec.mqtt.MqttReasonCodeAndPropertiesVariableHeader; +import io.netty.handler.codec.mqtt.MqttReasonCodes; import io.netty.handler.codec.mqtt.MqttVersion; -//import org.jetbrains.annotations.NotNull; +import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.time.Duration; +import java.time.Instant; +import java.util.Optional; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import static io.moquette.BrokerConstants.DISABLED_TOPIC_ALIAS; import static io.moquette.BrokerConstants.NO_BUFFER_FLUSH; import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Collections.singleton; import static java.util.Collections.singletonMap; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; public class MQTTConnectionPublishTest { + public static final int DEFAULT_TOPIC_ALIAS_MAXIMUM = 16; private static final String FAKE_CLIENT_ID = "FAKE_123"; private static final String TEST_USER = "fakeuser"; private static final String TEST_PWD = "fakepwd"; @@ -53,23 +74,28 @@ public class MQTTConnectionPublishTest { private MQTTConnection sut; private EmbeddedChannel channel; private SessionRegistry sessionRegistry; - private MqttMessageBuilders.ConnectBuilder connMsg; private MemoryQueueRepository queueRepository; private ScheduledExecutorService scheduler; + private ByteBuf payload; + private BlockingQueue forwardedPublishes; @BeforeEach public void setUp() { - connMsg = MqttMessageBuilders.connect().protocolVersion(MqttVersion.MQTT_3_1).cleanSession(true); + forwardedPublishes = new LinkedBlockingQueue<>(); - BrokerConfiguration config = new BrokerConfiguration(true, true, false, NO_BUFFER_FLUSH); + BrokerConfiguration config = new BrokerConfiguration(true, false, true, + false, NO_BUFFER_FLUSH, BrokerConstants.INFLIGHT_WINDOW_SIZE, DEFAULT_TOPIC_ALIAS_MAXIMUM); scheduler = Executors.newScheduledThreadPool(1); createMQTTConnection(config); + + payload = Unpooled.copiedBuffer("Hello MQTT world!".getBytes(UTF_8)); } @AfterEach public void tearDown() { + payload.release(); scheduler.shutdown(); } @@ -94,11 +120,34 @@ private MQTTConnection createMQTTConnection(BrokerConfiguration config, Channel ISessionsRepository fakeSessionRepo = memorySessionsRepository(); sessionRegistry = new SessionRegistry(subscriptions, fakeSessionRepo, queueRepository, permitAll, scheduler, loopsGroup); final PostOffice postOffice = new PostOffice(subscriptions, - new MemoryRetainedRepository(), sessionRegistry, fakeSessionRepo, ConnectionTestUtils.NO_OBSERVERS_INTERCEPTOR, permitAll, loopsGroup); + new MemoryRetainedRepository(), sessionRegistry, fakeSessionRepo, ConnectionTestUtils.NO_OBSERVERS_INTERCEPTOR, permitAll, loopsGroup) { + + // mock the publish forwarder method + @Override + CompletableFuture receivedPublishQos0(MQTTConnection connection, String username, String clientID, + MqttPublishMessage msg, + Instant messageExpiry) { + forwardedPublishes.add(msg); + return null; + } + + @Override + RoutingResults receivedPublishQos1(MQTTConnection connection, String username, int messageID, + MqttPublishMessage msg, Instant messageExpiry) { + forwardedPublishes.add(msg); + return null; + } + + @Override + RoutingResults receivedPublishQos2(MQTTConnection connection, MqttPublishMessage msg, String username, + Instant messageExpiry) { + forwardedPublishes.add(msg); + return null; + } + }; return new MQTTConnection(channel, config, mockAuthenticator, sessionRegistry, postOffice); } -// @NotNull static ISessionsRepository memorySessionsRepository() { return new MemorySessionsRepository(); } @@ -106,7 +155,6 @@ static ISessionsRepository memorySessionsRepository() { @Test public void dropConnectionOnPublishWithInvalidTopicFormat() throws ExecutionException, InterruptedException { // Connect message with clean session set to true and client id is null. - final ByteBuf payload = Unpooled.copiedBuffer("Hello MQTT world!".getBytes(UTF_8)); MqttPublishMessage publish = MqttMessageBuilders.publish() .topicName("") .retained(false) @@ -117,7 +165,166 @@ public void dropConnectionOnPublishWithInvalidTopicFormat() throws ExecutionExce // Verify assertFalse(channel.isOpen(), "Connection should be closed by the broker"); - payload.release(); } + @Test + public void givenPublishMessageWithInvalidTopicAliasThenConnectionDisconnects() { + connectMqtt5AndVerifyAck(sut); + + MqttPublishMessage publish = createPublishWithTopicNameAndTopicAlias("kitchen/blinds", 0); + + // Exercise + PostOffice.RouteResult pubResult = sut.processPublish(publish); + + // Verify + verifyConnectionIsDropped(pubResult, MqttReasonCodes.Disconnect.TOPIC_ALIAS_INVALID); + } + + @Test + public void givenPublishMessageWithUnmappedTopicAliasThenPublishMessageIsForwardedWithoutAliasAndJustTopicName() throws InterruptedException { + connectMqtt5AndVerifyAck(sut); + + String topicName = "kitchen/blinds"; + MqttPublishMessage publish = createPublishWithTopicNameAndTopicAlias(topicName, 10); + + // Exercise + PostOffice.RouteResult pubResult = sut.processPublish(publish); + + // Verify + assertNotNull(pubResult); + assertTrue(pubResult.isSuccess()); + assertTrue(channel.isOpen(), "Connection should be open"); + + // Read the forwarded publish message + MqttPublishMessage reshapedPublish = forwardedPublishes.poll(1, TimeUnit.SECONDS); + assertNotNull(reshapedPublish, "Wait time expired on reading forwarded publish message"); + assertEquals(topicName, reshapedPublish.variableHeader().topicName()); + verifyNotContainsProperty(reshapedPublish.variableHeader(), MqttProperties.MqttPropertyType.TOPIC_ALIAS); + } + + @Test + public void givenPublishMessageWithAlreadyMappedTopicAliasThenPublishMessageIsForwardedWithoutAliasAndJustTopicName() throws InterruptedException { + connectMqtt5AndVerifyAck(sut); + + String topicName = "kitchen/blinds"; + MqttPublishMessage publish = createPublishWithTopicNameAndTopicAlias(topicName, 10); + + // setup the alias mapping with a first publish message + PostOffice.RouteResult pubResult = sut.processPublish(publish); + assertNotNull(pubResult); + assertTrue(pubResult.isSuccess()); + assertTrue(channel.isOpen(), "Connection should be open"); + MqttPublishMessage reshapedPublish = forwardedPublishes.poll(1, TimeUnit.SECONDS); + assertNotNull(reshapedPublish, "Wait time expired on reading forwarded publish message"); + + // Exercise, use the mapped alias + MqttPublishMessage publishWithJustAlias = createPublishWithTopicNameAndTopicAlias(10); + pubResult = sut.processPublish(publishWithJustAlias); + + // Verify + assertNotNull(pubResult); + assertTrue(pubResult.isSuccess()); + assertTrue(channel.isOpen(), "Connection should be open"); + + // Read the forwarded publish message + reshapedPublish = forwardedPublishes.poll(1, TimeUnit.SECONDS); + assertNotNull(reshapedPublish, "Wait time expired on reading forwarded publish message"); + assertEquals(topicName, reshapedPublish.variableHeader().topicName()); + verifyNotContainsProperty(reshapedPublish.variableHeader(), MqttProperties.MqttPropertyType.TOPIC_ALIAS); + } + + @Test + public void givenPublishMessageWithUnmappedTopicAliasAndEmptyTopicNameThenConnectionDisconnects() { + connectMqtt5AndVerifyAck(sut); + + MqttPublishMessage publish = createPublishWithTopicNameAndTopicAlias("", 10); + + // Exercise + PostOffice.RouteResult pubResult = sut.processPublish(publish); + + // Verify + verifyConnectionIsDropped(pubResult, MqttReasonCodes.Disconnect.PROTOCOL_ERROR); + } + + @Test + public void givenTopicAliasDisabledWhenPublishContainingTopicAliasIsReceivedThenConnectionIsDropped() { + // create a configuration with topic alias disabled + BrokerConfiguration config = new BrokerConfiguration(true, false, true, + false, NO_BUFFER_FLUSH, BrokerConstants.INFLIGHT_WINDOW_SIZE, + DISABLED_TOPIC_ALIAS); + // Overwrite the existing connection with new with topic alias disabled + createMQTTConnection(config); + + connectMqtt5AndVerifyAck(sut); + + MqttPublishMessage publish = createPublishWithTopicNameAndTopicAlias("kitchen/blinds", 10); + + // Exercise + PostOffice.RouteResult pubResult = sut.processPublish(publish); + + // Verify + verifyConnectionIsDropped(pubResult, MqttReasonCodes.Disconnect.PROTOCOL_ERROR); + } + + private void verifyConnectionIsDropped(PostOffice.RouteResult pubResult, MqttReasonCodes.Disconnect protocolError) { + assertNotNull(pubResult); + assertFalse(pubResult.isSuccess()); + assertFalse(channel.isOpen(), "Connection should be closed by the broker"); + // read last message sent + MqttMessage disconnectMsg = channel.readOutbound(); + assertEquals(MqttMessageType.DISCONNECT, disconnectMsg.fixedHeader().messageType()); + assertEquals(protocolError.byteValue(), ((MqttReasonCodeAndPropertiesVariableHeader) disconnectMsg.variableHeader()).reasonCode()); + } + + private static void verifyNotContainsProperty(MqttPublishVariableHeader header, MqttProperties.MqttPropertyType typeToVerify) { + Optional match = header.properties().listAll().stream() + .filter(mp -> mp.propertyId() == typeToVerify.value()) + .findFirst(); + assertFalse(match.isPresent(), "Found a property of type " + typeToVerify); + } + + private MqttPublishMessage createPublishWithTopicNameAndTopicAlias(String topicName, int topicAlias) { + final MqttProperties propertiesWithTopicAlias = new MqttProperties(); + propertiesWithTopicAlias.add( + new MqttProperties.IntegerProperty( + MqttProperties.MqttPropertyType.TOPIC_ALIAS.value(), topicAlias)); + + return MqttMessageBuilders.publish() + .topicName(topicName) + .properties(propertiesWithTopicAlias) + .qos(MqttQoS.AT_MOST_ONCE) + .payload(payload) + .build(); + } + + private MqttPublishMessage createPublishWithTopicNameAndTopicAlias(int topicAlias) { + final MqttProperties propertiesWithTopicAlias = new MqttProperties(); + propertiesWithTopicAlias.add( + new MqttProperties.IntegerProperty( + MqttProperties.MqttPropertyType.TOPIC_ALIAS.value(), topicAlias)); + + return MqttMessageBuilders.publish() + .properties(propertiesWithTopicAlias) + .qos(MqttQoS.AT_MOST_ONCE) + .payload(payload) + .build(); + } + + private void connectMqtt5AndVerifyAck(MQTTConnection mqttConnection) { + MqttConnectMessage connect = MqttMessageBuilders.connect() + .protocolVersion(MqttVersion.MQTT_5) + .clientId(null) + .cleanSession(true) + .build(); + PostOffice.RouteResult connectResult = mqttConnection.processConnect(connect); + assertNotNull(connectResult); + assertTrue(connectResult.isSuccess()); + // given that CONN is processed by session event loop and from that is sent also the CONNACK, wait for + // connection to be active. + Awaitility.await() + .atMost(Duration.ofSeconds(1)) + .until(mqttConnection::isConnected); + MqttConnAckMessage connAckMsg = channel.readOutbound(); + assertEquals(MqttMessageType.CONNACK, connAckMsg.fixedHeader().messageType()); + } } diff --git a/broker/src/test/java/io/moquette/broker/RetainBufferTest.java b/broker/src/test/java/io/moquette/broker/RetainBufferTest.java new file mode 100644 index 000000000..88ad1eba0 --- /dev/null +++ b/broker/src/test/java/io/moquette/broker/RetainBufferTest.java @@ -0,0 +1,49 @@ +/* + * + * * Copyright (c) 2012-2024 The original author or authors + * * ------------------------------------------------------ + * * All rights reserved. This program and the accompanying materials + * * are made available under the terms of the Eclipse Public License v1.0 + * * and Apache License v2.0 which accompanies this distribution. + * * + * * The Eclipse Public License is available at + * * http://www.eclipse.org/legal/epl-v10.html + * * + * * The Apache License v2.0 is available at + * * http://www.opensource.org/licenses/apache2.0.php + * * + * * You may elect to redistribute this code under either of these licenses. + * + */ + +package io.moquette.broker; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class RetainBufferTest { + + + @Test + public void testRetainedDuplicate() { + ByteBuf origin = Unpooled.buffer(10); + assertEquals(1, origin.refCnt()); + + ByteBuf retainedDup = origin.retainedDuplicate(); + assertEquals(2, origin.refCnt()); + assertEquals(2, retainedDup.refCnt()); + } + + @Test + public void testDuplicate() { + ByteBuf origin = Unpooled.buffer(10); + assertEquals(1, origin.refCnt()); + + ByteBuf duplicate = origin.duplicate(); + assertEquals(1, origin.refCnt()); + assertEquals(1, duplicate.refCnt()); + } +} diff --git a/broker/src/test/java/io/moquette/broker/TopicAliasMappingTest.java b/broker/src/test/java/io/moquette/broker/TopicAliasMappingTest.java new file mode 100644 index 000000000..e31db8dee --- /dev/null +++ b/broker/src/test/java/io/moquette/broker/TopicAliasMappingTest.java @@ -0,0 +1,77 @@ +/* + * + * Copyright (c) 2012-2024 The original author or authors + * ------------------------------------------------------ + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Apache License v2.0 which accompanies this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * The Apache License v2.0 is available at + * http://www.opensource.org/licenses/apache2.0.php + * + * You may elect to redistribute this code under either of these licenses. + * + */ + +package io.moquette.broker; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Optional; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class TopicAliasMappingTest { + + private TopicAliasMapping sut; + + @BeforeEach + void setUp() { + sut = new TopicAliasMapping(); + } + + private static void verifyTopicName(String expected, Optional topicName) { + assertTrue(topicName.isPresent()); + assertEquals(expected, topicName.get()); + } + + @Test + public void givenAnEmptyMappingWhenNewBindingIsRequestedThenShouldBeRetrievable() { + sut.update("sensors/temperature", 12); + + Optional topicName = sut.topicFromAlias(12); + verifyTopicName("sensors/temperature", topicName); + assertEquals(1, sut.size()); + } + + @Test + public void givenAnExistingBindingWhenNewBindingWithSameTopicNameIsRequestedThenAliasIsUpdated() { + // first binding + sut.update("sensors/temperature", 12); + + // second update with different alias + sut.update("sensors/temperature", 22); + assertEquals(1, sut.size()); + + Optional topicName = sut.topicFromAlias(22); + verifyTopicName("sensors/temperature", topicName); + } + + @Test + public void givenAnExistingBindingWhenNewBindingWithDifferentTopicNameAndSameTopicAliasIsRequestedThenAliasIsUpdated() { + // first binding + sut.update("sensors/temperature", 12); + + // second update with different name + sut.update("finance/quotes", 12); + assertEquals(1, sut.size()); + + Optional topicName = sut.topicFromAlias(12); + verifyTopicName("finance/quotes", topicName); + } +} diff --git a/distribution/src/main/resources/moquette.conf b/distribution/src/main/resources/moquette.conf index 0787ed06d..b863cced4 100644 --- a/distribution/src/main/resources/moquette.conf +++ b/distribution/src/main/resources/moquette.conf @@ -223,3 +223,14 @@ password_file config/password_file.conf # default: 2 (exactly_once) #********************************************************************* # max_server_granted_qos 2 + +#********************************************************************* +# Maximum value acceptable value by the broker as topic alias +# +# topic_alias_maximum: +# This option is used to enable and limit the number of topic alias entries on the local cache. +# By default is 0, which means that topic alias functionality is disabled. Any value in range 0..65535 is accepted, +# but given that's a cache size per connection, is better to limit in low tens. +# default: 0 (disabled) +#********************************************************************* +# topic_alias_maximum 16