diff --git a/ChangeLog.txt b/ChangeLog.txt index a9ee74ba8..e9b985295 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -1,4 +1,5 @@ Version 0.18-SNAPSHOT: + [feature] Topic alias: implemented handling of topic alias received by publishers. (#873) [feature] Flow-control: Handled client's receive maximum property to configure the inflight window through the client. (#858) [feature] Generate correct MANIFEST.MF with bnd-maven-plugin. (#848) [feature] Flow-control: implemented publish's quota management on the server side. (#852) diff --git a/broker/src/main/java/io/moquette/BrokerConstants.java b/broker/src/main/java/io/moquette/BrokerConstants.java index 2b05980dd..210ba09e6 100644 --- a/broker/src/main/java/io/moquette/BrokerConstants.java +++ b/broker/src/main/java/io/moquette/BrokerConstants.java @@ -124,8 +124,6 @@ public final class BrokerConstants { public static final String BUGSNAG_ENABLE_PROPERTY_NAME = "use_bugsnag"; public static final String BUGSNAG_TOKEN_PROPERTY_NAME = "bugsnag.token"; - public static final String STORAGE_CLASS_NAME = "storage_class"; - @Deprecated public static final String PERSISTENT_CLIENT_EXPIRATION_PROPERTY_NAME = IConfig.PERSISTENT_CLIENT_EXPIRATION_PROPERTY_NAME; @@ -134,6 +132,8 @@ public final class BrokerConstants { public static final int INFINITE_SESSION_EXPIRY = 0xFFFFFFFF; public static final int RECEIVE_MAXIMUM = 65 * 1024; + public static final int DISABLED_TOPIC_ALIAS = 0; + private BrokerConstants() { } } diff --git a/broker/src/main/java/io/moquette/broker/BrokerConfiguration.java b/broker/src/main/java/io/moquette/broker/BrokerConfiguration.java index 2fcced636..421ad5fbc 100644 --- a/broker/src/main/java/io/moquette/broker/BrokerConfiguration.java +++ b/broker/src/main/java/io/moquette/broker/BrokerConfiguration.java @@ -27,6 +27,7 @@ class BrokerConfiguration { private final boolean allowZeroByteClientId; private final boolean reauthorizeSubscriptionsOnConnect; private final int bufferFlushMillis; + private final int topicAliasMaximum; // integer max value means that the property is unset private int receiveMaximum; @@ -67,6 +68,8 @@ class BrokerConfiguration { } receiveMaximum = props.intProp(IConfig.RECEIVE_MAXIMUM, BrokerConstants.RECEIVE_MAXIMUM); + + topicAliasMaximum = props.intProp(IConfig.TOPIC_ALIAS_MAXIMUM_PROPERTY_NAME, BrokerConstants.DISABLED_TOPIC_ALIAS); } // test method @@ -86,12 +89,21 @@ public BrokerConfiguration(boolean allowAnonymous, boolean peerCertificateAsUser // test method public BrokerConfiguration(boolean allowAnonymous, boolean peerCertificateAsUsername, boolean allowZeroByteClientId, boolean reauthorizeSubscriptionsOnConnect, int bufferFlushMillis, int receiveMaximum) { + this(allowAnonymous, peerCertificateAsUsername, allowZeroByteClientId, reauthorizeSubscriptionsOnConnect, + bufferFlushMillis, receiveMaximum, BrokerConstants.DISABLED_TOPIC_ALIAS); + } + + // test method + public BrokerConfiguration(boolean allowAnonymous, boolean peerCertificateAsUsername, boolean allowZeroByteClientId, + boolean reauthorizeSubscriptionsOnConnect, int bufferFlushMillis, int receiveMaximum, + int topicAliasMaximum) { this.allowAnonymous = allowAnonymous; this.peerCertificateAsUsername = peerCertificateAsUsername; this.allowZeroByteClientId = allowZeroByteClientId; this.reauthorizeSubscriptionsOnConnect = reauthorizeSubscriptionsOnConnect; this.bufferFlushMillis = bufferFlushMillis; this.receiveMaximum = receiveMaximum; + this.topicAliasMaximum = topicAliasMaximum; } public boolean isAllowAnonymous() { @@ -117,4 +129,8 @@ public int getBufferFlushMillis() { public int receiveMaximum() { return receiveMaximum; } + + public int topicAliasMaximum() { + return topicAliasMaximum; + } } diff --git a/broker/src/main/java/io/moquette/broker/MQTTConnection.java b/broker/src/main/java/io/moquette/broker/MQTTConnection.java index 469c4c06c..74085c400 100644 --- a/broker/src/main/java/io/moquette/broker/MQTTConnection.java +++ b/broker/src/main/java/io/moquette/broker/MQTTConnection.java @@ -42,6 +42,7 @@ import java.security.cert.CertificateEncodingException; import java.time.Instant; import java.util.List; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -67,12 +68,29 @@ final class MQTTConnection { private final IAuthenticator authenticator; private final SessionRegistry sessionRegistry; private final PostOffice postOffice; + private final int topicAliasMaximum; private volatile boolean connected; private final AtomicInteger lastPacketId = new AtomicInteger(0); private Session bindedSession; private int protocolVersion; private Quota receivedQuota; private Quota sendQuota; + private TopicAliasMapping aliasMappings; + + static final class ErrorCodeException extends Exception { + + private final String hexErrorCode; + private final MqttReasonCodes.Disconnect errorCode; + + public ErrorCodeException(MqttReasonCodes.Disconnect disconnectError) { + errorCode = disconnectError; + hexErrorCode = Integer.toHexString(disconnectError.byteValue()); + } + + public MqttReasonCodes.Disconnect getErrorCode() { + return errorCode; + } + } MQTTConnection(Channel channel, BrokerConfiguration brokerConfig, IAuthenticator authenticator, SessionRegistry sessionRegistry, PostOffice postOffice) { @@ -83,6 +101,7 @@ final class MQTTConnection { this.postOffice = postOffice; this.connected = false; this.protocolVersion = UNDEFINED_VERSION; + this.topicAliasMaximum = brokerConfig.topicAliasMaximum(); } void handleMessage(MqttMessage msg) { @@ -225,6 +244,12 @@ PostOffice.RouteResult processConnect(MqttConnectMessage msg) { channel.close().addListener(CLOSE_ON_FAILURE); return PostOffice.RouteResult.failed(clientId); } + + // initialize topic alias mapping + if (isProtocolVersion(msg, MqttVersion.MQTT_5)) { + aliasMappings = new TopicAliasMapping(); + } + receivedQuota = createQuota(brokerConfig.receiveMaximum()); sendQuota = retrieveSendQuota(msg); @@ -328,6 +353,11 @@ private void executeConnect(MqttConnectMessage msg, String clientId, boolean ser if (receivedQuota.hasLimit()) { connAckPropertiesBuilder.receiveMaximum(receivedQuota.getMaximum()); } + + if (topicAliasMaximum != BrokerConstants.DISABLED_TOPIC_ALIAS) { + connAckPropertiesBuilder.topicAliasMaximum(topicAliasMaximum); + } + final MqttProperties ackProperties = connAckPropertiesBuilder.build(); connAckBuilder.properties(ackProperties); } @@ -667,7 +697,26 @@ PostOffice.RouteResult processPublish(MqttPublishMessage msg) { final String clientId = getClientId(); final int messageID = msg.variableHeader().packetId(); LOG.trace("Processing PUBLISH message, topic: {}, messageId: {}, qos: {}", topicName, messageID, qos); - final Topic topic = new Topic(topicName); + Topic topic = new Topic(topicName); + + if (isProtocolVersion5()) { + MqttProperties.MqttProperty topicAlias = msg.variableHeader() + .properties().getProperty(MqttProperties.MqttPropertyType.TOPIC_ALIAS.value()); + if (topicAlias != null) { + try { + Optional mappedTopicName = updateAndMapTopicAlias((MqttProperties.IntegerProperty) topicAlias, topicName); + final String translatedTopicName = mappedTopicName.orElse(topicName); + msg = copyPublishMessageExceptTopicAlias(msg, translatedTopicName, messageID); + topic = new Topic(translatedTopicName); + } catch (ErrorCodeException e) { + brokerDisconnect(e.getErrorCode()); + disconnectSession(); + dropConnection(); + return PostOffice.RouteResult.failed(clientId); + } + } + } + if (!topic.isValid()) { LOG.debug("Drop connection because of invalid topic format"); dropConnection(); @@ -683,6 +732,7 @@ PostOffice.RouteResult processPublish(MqttPublishMessage msg) { // retain else msg is cleaned by the NewNettyMQTTHandler and is not available // in execution by SessionEventLoop Utils.retain(msg, PostOffice.BT_PUB_IN); + final MqttPublishMessage finalMsg = msg; switch (qos) { case AT_MOST_ONCE: return postOffice.routeCommand(clientId, "PUB QoS0", () -> { @@ -690,9 +740,9 @@ PostOffice.RouteResult processPublish(MqttPublishMessage msg) { if (!isBoundToSession()) { return null; } - postOffice.receivedPublishQos0(this, username, clientId, msg, expiry); + postOffice.receivedPublishQos0(this, username, clientId, finalMsg, expiry); return null; - }).ifFailed(() -> Utils.release(msg, PostOffice.BT_PUB_IN + " - failed")); + }).ifFailed(() -> Utils.release(finalMsg, PostOffice.BT_PUB_IN + " - failed")); case AT_LEAST_ONCE: if (!receivedQuota.hasFreeSlots()) { LOG.warn("Client {} exceeded the quota {} processing QoS1, disconnecting it", clientId, receivedQuota); @@ -708,16 +758,16 @@ PostOffice.RouteResult processPublish(MqttPublishMessage msg) { if (!isBoundToSession()) return null; receivedQuota.consumeSlot(); - postOffice.receivedPublishQos1(this, username, messageID, msg, expiry) + postOffice.receivedPublishQos1(this, username, messageID, finalMsg, expiry) .completableFuture().thenRun(() -> { receivedQuota.releaseSlot(); }); return null; - }).ifFailed(() -> Utils.release(msg, PostOffice.BT_PUB_IN + " - failed")); + }).ifFailed(() -> Utils.release(finalMsg, PostOffice.BT_PUB_IN + " - failed")); case EXACTLY_ONCE: { if (!receivedQuota.hasFreeSlots()) { LOG.warn("Client {} exceeded the quota {} processing QoS2, disconnecting it", clientId, receivedQuota); - Utils.release(msg, PostOffice.BT_PUB_IN + " - phase 1 QoS2 exceeded quota"); + Utils.release(finalMsg, PostOffice.BT_PUB_IN + " - phase 1 QoS2 exceeded quota"); brokerDisconnect(MqttReasonCodes.Disconnect.RECEIVE_MAXIMUM_EXCEEDED); disconnectSession(); dropConnection(); @@ -728,7 +778,7 @@ PostOffice.RouteResult processPublish(MqttPublishMessage msg) { checkMatchSessionLoop(clientId); if (!isBoundToSession()) return null; - bindedSession.receivedPublishQos2(messageID, msg); + bindedSession.receivedPublishQos2(messageID, finalMsg); receivedQuota.consumeSlot(); return null; }); @@ -738,7 +788,7 @@ PostOffice.RouteResult processPublish(MqttPublishMessage msg) { return firstStepResult; } firstStepResult.completableFuture().thenRun(() -> - postOffice.receivedPublishQos2(this, msg, username, expiry).completableFuture() + postOffice.receivedPublishQos2(this, finalMsg, username, expiry).completableFuture() ); return firstStepResult; } @@ -748,6 +798,55 @@ PostOffice.RouteResult processPublish(MqttPublishMessage msg) { } } + private Optional updateAndMapTopicAlias(MqttProperties.IntegerProperty topicAlias, String topicName) throws ErrorCodeException { + if (topicAliasMaximum == BrokerConstants.DISABLED_TOPIC_ALIAS) { + // client is sending a topic alias when the feature was disabled form the server + LOG.info("Dropping connection {}, received a PUBLISH with topic alias while the feature is not enaled on the broker", channel); + throw new ErrorCodeException(MqttReasonCodes.Disconnect.PROTOCOL_ERROR); + } + MqttProperties.IntegerProperty topicAliasTyped = topicAlias; + if (topicAliasTyped.value() == 0 || topicAliasTyped.value() > topicAliasMaximum) { + // invalid topic alias value + LOG.info("Dropping connection {}, received a topic alias ({}) outside of range (0..{}]", + channel, topicAliasTyped.value(), topicAliasMaximum); + throw new ErrorCodeException(MqttReasonCodes.Disconnect.TOPIC_ALIAS_INVALID); + } + + Optional mappedTopicName = aliasMappings.topicFromAlias(topicAliasTyped.value()); + if (mappedTopicName.isPresent()) { + // already established a mapping for the Topic Alias + if (topicName != null) { + // contains a topic name then update the mapping + aliasMappings.update(topicName, topicAliasTyped.value()); + } + } else { + // does not already have a mapping for this Topic Alias + if (topicName.isEmpty()) { + // protocol error, not present mapping, topic alias is present and no topic name + throw new ErrorCodeException(MqttReasonCodes.Disconnect.PROTOCOL_ERROR); + } + // update the mapping + aliasMappings.update(topicName, topicAliasTyped.value()); + } + return mappedTopicName; + } + + private static MqttPublishMessage copyPublishMessageExceptTopicAlias(MqttPublishMessage msg, String translatedTopicName, int messageID) { + // Replace the topicName with the one retrieved and remove the topic alias property + // to avoid to forward or store. + final MqttProperties rewrittenProps = new MqttProperties(); + for (MqttProperties.MqttProperty property : msg.variableHeader().properties().listAll()) { + if (property.propertyId() == MqttProperties.MqttPropertyType.TOPIC_ALIAS.value()) { + continue; + } + rewrittenProps.add(property); + } + + MqttPublishVariableHeader rewrittenVariableHeader = + new MqttPublishVariableHeader(translatedTopicName, messageID, rewrittenProps); + return new MqttPublishMessage(msg.fixedHeader(), rewrittenVariableHeader, msg.payload()); + } + private Instant extractExpiryFromProperty(MqttPublishMessage msg) { MqttProperties.MqttProperty expiryProp = msg.variableHeader() .properties() diff --git a/broker/src/main/java/io/moquette/broker/TopicAliasMapping.java b/broker/src/main/java/io/moquette/broker/TopicAliasMapping.java new file mode 100644 index 000000000..757c12cee --- /dev/null +++ b/broker/src/main/java/io/moquette/broker/TopicAliasMapping.java @@ -0,0 +1,49 @@ +/* + * + * Copyright (c) 2012-2024 The original author or authors + * ------------------------------------------------------ + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Apache License v2.0 which accompanies this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * The Apache License v2.0 is available at + * http://www.opensource.org/licenses/apache2.0.php + * + * You may elect to redistribute this code under either of these licenses. + * + */ + +package io.moquette.broker; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +/** + * Implements a mapping cache so that the binding key -> value is also reversed, + * it maintains the invariant to be a 1:1 mapping. + * */ +class TopicAliasMapping { + + private final Map direct = new HashMap<>(); + private final Map reversed = new HashMap<>(); + + void update(String topicName, int topicAlias) { + Integer previousAlias = direct.put(topicName, topicAlias); + if (previousAlias != null) { + reversed.remove(previousAlias); + } + reversed.put(topicAlias, topicName); + } + + Optional topicFromAlias(int topicAlias) { + return Optional.ofNullable(reversed.get(topicAlias)); + } + + int size() { + return reversed.size(); + } +} diff --git a/broker/src/main/java/io/moquette/broker/config/FluentConfig.java b/broker/src/main/java/io/moquette/broker/config/FluentConfig.java index 15041dc39..93d219fb1 100644 --- a/broker/src/main/java/io/moquette/broker/config/FluentConfig.java +++ b/broker/src/main/java/io/moquette/broker/config/FluentConfig.java @@ -17,22 +17,23 @@ import static io.moquette.broker.config.IConfig.DATA_PATH_PROPERTY_NAME; import static io.moquette.broker.config.IConfig.DEFAULT_NETTY_MAX_BYTES_IN_MESSAGE; import static io.moquette.broker.config.IConfig.ENABLE_TELEMETRY_NAME; -import static io.moquette.broker.config.IConfig.PEER_CERTIFICATE_AS_USERNAME; -import static io.moquette.broker.config.IConfig.PORT_PROPERTY_NAME; +import static io.moquette.broker.config.IConfig.HOST_PROPERTY_NAME; import static io.moquette.broker.config.IConfig.JKS_PATH_PROPERTY_NAME; import static io.moquette.broker.config.IConfig.KEY_MANAGER_PASSWORD_PROPERTY_NAME; import static io.moquette.broker.config.IConfig.KEY_STORE_PASSWORD_PROPERTY_NAME; import static io.moquette.broker.config.IConfig.KEY_STORE_TYPE; import static io.moquette.broker.config.IConfig.NETTY_MAX_BYTES_PROPERTY_NAME; import static io.moquette.broker.config.IConfig.PASSWORD_FILE_PROPERTY_NAME; +import static io.moquette.broker.config.IConfig.PEER_CERTIFICATE_AS_USERNAME; import static io.moquette.broker.config.IConfig.PERSISTENCE_ENABLED_PROPERTY_NAME; import static io.moquette.broker.config.IConfig.PERSISTENT_CLIENT_EXPIRATION_PROPERTY_NAME; import static io.moquette.broker.config.IConfig.PERSISTENT_QUEUE_TYPE_PROPERTY_NAME; +import static io.moquette.broker.config.IConfig.PORT_PROPERTY_NAME; import static io.moquette.broker.config.IConfig.RECEIVE_MAXIMUM; -import static io.moquette.broker.config.IConfig.HOST_PROPERTY_NAME; import static io.moquette.broker.config.IConfig.SESSION_QUEUE_SIZE; import static io.moquette.broker.config.IConfig.SSL_PORT_PROPERTY_NAME; import static io.moquette.broker.config.IConfig.SSL_PROVIDER; +import static io.moquette.broker.config.IConfig.TOPIC_ALIAS_MAXIMUM_PROPERTY_NAME; import static io.moquette.broker.config.IConfig.WEB_SOCKET_PORT_PROPERTY_NAME; /** @@ -209,6 +210,11 @@ public FluentConfig receiveMaximum(int receiveMaximum) { return this; } + public FluentConfig topicAliasMaximum(int topicAliasMaximum) { + configAccumulator.put(TOPIC_ALIAS_MAXIMUM_PROPERTY_NAME, Integer.valueOf(topicAliasMaximum).toString()); + return this; + } + public class TLSConfig { private SSLProvider providerType; diff --git a/broker/src/main/java/io/moquette/broker/config/IConfig.java b/broker/src/main/java/io/moquette/broker/config/IConfig.java index 366be7557..ca2ef181d 100644 --- a/broker/src/main/java/io/moquette/broker/config/IConfig.java +++ b/broker/src/main/java/io/moquette/broker/config/IConfig.java @@ -67,6 +67,7 @@ public abstract class IConfig { public static final String NETTY_MAX_BYTES_PROPERTY_NAME = "netty.mqtt.message_size"; public static final String MAX_SERVER_GRANTED_QOS_PROPERTY_NAME = "max_server_granted_qos"; public static final int DEFAULT_NETTY_MAX_BYTES_IN_MESSAGE = 8092; + public static final String TOPIC_ALIAS_MAXIMUM_PROPERTY_NAME = "topic_alias_maximum"; public abstract void setProperty(String name, String value); diff --git a/broker/src/test/java/io/moquette/broker/MQTTConnectionPublishTest.java b/broker/src/test/java/io/moquette/broker/MQTTConnectionPublishTest.java index 73e28fc08..e53e3dcfa 100644 --- a/broker/src/test/java/io/moquette/broker/MQTTConnectionPublishTest.java +++ b/broker/src/test/java/io/moquette/broker/MQTTConnectionPublishTest.java @@ -15,6 +15,7 @@ */ package io.moquette.broker; +import io.moquette.BrokerConstants; import io.moquette.broker.security.PermitAllAuthorizatorPolicy; import io.moquette.broker.subscriptions.CTrieSubscriptionDirectory; import io.moquette.broker.subscriptions.ISubscriptionsDirectory; @@ -25,27 +26,47 @@ import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.handler.codec.mqtt.MqttConnAckMessage; +import io.netty.handler.codec.mqtt.MqttConnectMessage; +import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttMessageBuilders; +import io.netty.handler.codec.mqtt.MqttMessageType; +import io.netty.handler.codec.mqtt.MqttProperties; import io.netty.handler.codec.mqtt.MqttPublishMessage; +import io.netty.handler.codec.mqtt.MqttPublishVariableHeader; import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.handler.codec.mqtt.MqttReasonCodeAndPropertiesVariableHeader; +import io.netty.handler.codec.mqtt.MqttReasonCodes; import io.netty.handler.codec.mqtt.MqttVersion; -//import org.jetbrains.annotations.NotNull; +import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.time.Duration; +import java.time.Instant; +import java.util.Optional; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import static io.moquette.BrokerConstants.DISABLED_TOPIC_ALIAS; import static io.moquette.BrokerConstants.NO_BUFFER_FLUSH; import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Collections.singleton; import static java.util.Collections.singletonMap; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; public class MQTTConnectionPublishTest { + public static final int DEFAULT_TOPIC_ALIAS_MAXIMUM = 16; private static final String FAKE_CLIENT_ID = "FAKE_123"; private static final String TEST_USER = "fakeuser"; private static final String TEST_PWD = "fakepwd"; @@ -53,23 +74,28 @@ public class MQTTConnectionPublishTest { private MQTTConnection sut; private EmbeddedChannel channel; private SessionRegistry sessionRegistry; - private MqttMessageBuilders.ConnectBuilder connMsg; private MemoryQueueRepository queueRepository; private ScheduledExecutorService scheduler; + private ByteBuf payload; + private BlockingQueue forwardedPublishes; @BeforeEach public void setUp() { - connMsg = MqttMessageBuilders.connect().protocolVersion(MqttVersion.MQTT_3_1).cleanSession(true); + forwardedPublishes = new LinkedBlockingQueue<>(); - BrokerConfiguration config = new BrokerConfiguration(true, true, false, NO_BUFFER_FLUSH); + BrokerConfiguration config = new BrokerConfiguration(true, false, true, + false, NO_BUFFER_FLUSH, BrokerConstants.INFLIGHT_WINDOW_SIZE, DEFAULT_TOPIC_ALIAS_MAXIMUM); scheduler = Executors.newScheduledThreadPool(1); createMQTTConnection(config); + + payload = Unpooled.copiedBuffer("Hello MQTT world!".getBytes(UTF_8)); } @AfterEach public void tearDown() { + payload.release(); scheduler.shutdown(); } @@ -94,11 +120,34 @@ private MQTTConnection createMQTTConnection(BrokerConfiguration config, Channel ISessionsRepository fakeSessionRepo = memorySessionsRepository(); sessionRegistry = new SessionRegistry(subscriptions, fakeSessionRepo, queueRepository, permitAll, scheduler, loopsGroup); final PostOffice postOffice = new PostOffice(subscriptions, - new MemoryRetainedRepository(), sessionRegistry, fakeSessionRepo, ConnectionTestUtils.NO_OBSERVERS_INTERCEPTOR, permitAll, loopsGroup); + new MemoryRetainedRepository(), sessionRegistry, fakeSessionRepo, ConnectionTestUtils.NO_OBSERVERS_INTERCEPTOR, permitAll, loopsGroup) { + + // mock the publish forwarder method + @Override + CompletableFuture receivedPublishQos0(MQTTConnection connection, String username, String clientID, + MqttPublishMessage msg, + Instant messageExpiry) { + forwardedPublishes.add(msg); + return null; + } + + @Override + RoutingResults receivedPublishQos1(MQTTConnection connection, String username, int messageID, + MqttPublishMessage msg, Instant messageExpiry) { + forwardedPublishes.add(msg); + return null; + } + + @Override + RoutingResults receivedPublishQos2(MQTTConnection connection, MqttPublishMessage msg, String username, + Instant messageExpiry) { + forwardedPublishes.add(msg); + return null; + } + }; return new MQTTConnection(channel, config, mockAuthenticator, sessionRegistry, postOffice); } -// @NotNull static ISessionsRepository memorySessionsRepository() { return new MemorySessionsRepository(); } @@ -106,7 +155,6 @@ static ISessionsRepository memorySessionsRepository() { @Test public void dropConnectionOnPublishWithInvalidTopicFormat() throws ExecutionException, InterruptedException { // Connect message with clean session set to true and client id is null. - final ByteBuf payload = Unpooled.copiedBuffer("Hello MQTT world!".getBytes(UTF_8)); MqttPublishMessage publish = MqttMessageBuilders.publish() .topicName("") .retained(false) @@ -117,7 +165,166 @@ public void dropConnectionOnPublishWithInvalidTopicFormat() throws ExecutionExce // Verify assertFalse(channel.isOpen(), "Connection should be closed by the broker"); - payload.release(); } + @Test + public void givenPublishMessageWithInvalidTopicAliasThenConnectionDisconnects() { + connectMqtt5AndVerifyAck(sut); + + MqttPublishMessage publish = createPublishWithTopicNameAndTopicAlias("kitchen/blinds", 0); + + // Exercise + PostOffice.RouteResult pubResult = sut.processPublish(publish); + + // Verify + verifyConnectionIsDropped(pubResult, MqttReasonCodes.Disconnect.TOPIC_ALIAS_INVALID); + } + + @Test + public void givenPublishMessageWithUnmappedTopicAliasThenPublishMessageIsForwardedWithoutAliasAndJustTopicName() throws InterruptedException { + connectMqtt5AndVerifyAck(sut); + + String topicName = "kitchen/blinds"; + MqttPublishMessage publish = createPublishWithTopicNameAndTopicAlias(topicName, 10); + + // Exercise + PostOffice.RouteResult pubResult = sut.processPublish(publish); + + // Verify + assertNotNull(pubResult); + assertTrue(pubResult.isSuccess()); + assertTrue(channel.isOpen(), "Connection should be open"); + + // Read the forwarded publish message + MqttPublishMessage reshapedPublish = forwardedPublishes.poll(1, TimeUnit.SECONDS); + assertNotNull(reshapedPublish, "Wait time expired on reading forwarded publish message"); + assertEquals(topicName, reshapedPublish.variableHeader().topicName()); + verifyNotContainsProperty(reshapedPublish.variableHeader(), MqttProperties.MqttPropertyType.TOPIC_ALIAS); + } + + @Test + public void givenPublishMessageWithAlreadyMappedTopicAliasThenPublishMessageIsForwardedWithoutAliasAndJustTopicName() throws InterruptedException { + connectMqtt5AndVerifyAck(sut); + + String topicName = "kitchen/blinds"; + MqttPublishMessage publish = createPublishWithTopicNameAndTopicAlias(topicName, 10); + + // setup the alias mapping with a first publish message + PostOffice.RouteResult pubResult = sut.processPublish(publish); + assertNotNull(pubResult); + assertTrue(pubResult.isSuccess()); + assertTrue(channel.isOpen(), "Connection should be open"); + MqttPublishMessage reshapedPublish = forwardedPublishes.poll(1, TimeUnit.SECONDS); + assertNotNull(reshapedPublish, "Wait time expired on reading forwarded publish message"); + + // Exercise, use the mapped alias + MqttPublishMessage publishWithJustAlias = createPublishWithTopicNameAndTopicAlias(10); + pubResult = sut.processPublish(publishWithJustAlias); + + // Verify + assertNotNull(pubResult); + assertTrue(pubResult.isSuccess()); + assertTrue(channel.isOpen(), "Connection should be open"); + + // Read the forwarded publish message + reshapedPublish = forwardedPublishes.poll(1, TimeUnit.SECONDS); + assertNotNull(reshapedPublish, "Wait time expired on reading forwarded publish message"); + assertEquals(topicName, reshapedPublish.variableHeader().topicName()); + verifyNotContainsProperty(reshapedPublish.variableHeader(), MqttProperties.MqttPropertyType.TOPIC_ALIAS); + } + + @Test + public void givenPublishMessageWithUnmappedTopicAliasAndEmptyTopicNameThenConnectionDisconnects() { + connectMqtt5AndVerifyAck(sut); + + MqttPublishMessage publish = createPublishWithTopicNameAndTopicAlias("", 10); + + // Exercise + PostOffice.RouteResult pubResult = sut.processPublish(publish); + + // Verify + verifyConnectionIsDropped(pubResult, MqttReasonCodes.Disconnect.PROTOCOL_ERROR); + } + + @Test + public void givenTopicAliasDisabledWhenPublishContainingTopicAliasIsReceivedThenConnectionIsDropped() { + // create a configuration with topic alias disabled + BrokerConfiguration config = new BrokerConfiguration(true, false, true, + false, NO_BUFFER_FLUSH, BrokerConstants.INFLIGHT_WINDOW_SIZE, + DISABLED_TOPIC_ALIAS); + // Overwrite the existing connection with new with topic alias disabled + createMQTTConnection(config); + + connectMqtt5AndVerifyAck(sut); + + MqttPublishMessage publish = createPublishWithTopicNameAndTopicAlias("kitchen/blinds", 10); + + // Exercise + PostOffice.RouteResult pubResult = sut.processPublish(publish); + + // Verify + verifyConnectionIsDropped(pubResult, MqttReasonCodes.Disconnect.PROTOCOL_ERROR); + } + + private void verifyConnectionIsDropped(PostOffice.RouteResult pubResult, MqttReasonCodes.Disconnect protocolError) { + assertNotNull(pubResult); + assertFalse(pubResult.isSuccess()); + assertFalse(channel.isOpen(), "Connection should be closed by the broker"); + // read last message sent + MqttMessage disconnectMsg = channel.readOutbound(); + assertEquals(MqttMessageType.DISCONNECT, disconnectMsg.fixedHeader().messageType()); + assertEquals(protocolError.byteValue(), ((MqttReasonCodeAndPropertiesVariableHeader) disconnectMsg.variableHeader()).reasonCode()); + } + + private static void verifyNotContainsProperty(MqttPublishVariableHeader header, MqttProperties.MqttPropertyType typeToVerify) { + Optional match = header.properties().listAll().stream() + .filter(mp -> mp.propertyId() == typeToVerify.value()) + .findFirst(); + assertFalse(match.isPresent(), "Found a property of type " + typeToVerify); + } + + private MqttPublishMessage createPublishWithTopicNameAndTopicAlias(String topicName, int topicAlias) { + final MqttProperties propertiesWithTopicAlias = new MqttProperties(); + propertiesWithTopicAlias.add( + new MqttProperties.IntegerProperty( + MqttProperties.MqttPropertyType.TOPIC_ALIAS.value(), topicAlias)); + + return MqttMessageBuilders.publish() + .topicName(topicName) + .properties(propertiesWithTopicAlias) + .qos(MqttQoS.AT_MOST_ONCE) + .payload(payload) + .build(); + } + + private MqttPublishMessage createPublishWithTopicNameAndTopicAlias(int topicAlias) { + final MqttProperties propertiesWithTopicAlias = new MqttProperties(); + propertiesWithTopicAlias.add( + new MqttProperties.IntegerProperty( + MqttProperties.MqttPropertyType.TOPIC_ALIAS.value(), topicAlias)); + + return MqttMessageBuilders.publish() + .properties(propertiesWithTopicAlias) + .qos(MqttQoS.AT_MOST_ONCE) + .payload(payload) + .build(); + } + + private void connectMqtt5AndVerifyAck(MQTTConnection mqttConnection) { + MqttConnectMessage connect = MqttMessageBuilders.connect() + .protocolVersion(MqttVersion.MQTT_5) + .clientId(null) + .cleanSession(true) + .build(); + PostOffice.RouteResult connectResult = mqttConnection.processConnect(connect); + assertNotNull(connectResult); + assertTrue(connectResult.isSuccess()); + // given that CONN is processed by session event loop and from that is sent also the CONNACK, wait for + // connection to be active. + Awaitility.await() + .atMost(Duration.ofSeconds(1)) + .until(mqttConnection::isConnected); + MqttConnAckMessage connAckMsg = channel.readOutbound(); + assertEquals(MqttMessageType.CONNACK, connAckMsg.fixedHeader().messageType()); + } } diff --git a/broker/src/test/java/io/moquette/broker/RetainBufferTest.java b/broker/src/test/java/io/moquette/broker/RetainBufferTest.java new file mode 100644 index 000000000..88ad1eba0 --- /dev/null +++ b/broker/src/test/java/io/moquette/broker/RetainBufferTest.java @@ -0,0 +1,49 @@ +/* + * + * * Copyright (c) 2012-2024 The original author or authors + * * ------------------------------------------------------ + * * All rights reserved. This program and the accompanying materials + * * are made available under the terms of the Eclipse Public License v1.0 + * * and Apache License v2.0 which accompanies this distribution. + * * + * * The Eclipse Public License is available at + * * http://www.eclipse.org/legal/epl-v10.html + * * + * * The Apache License v2.0 is available at + * * http://www.opensource.org/licenses/apache2.0.php + * * + * * You may elect to redistribute this code under either of these licenses. + * + */ + +package io.moquette.broker; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class RetainBufferTest { + + + @Test + public void testRetainedDuplicate() { + ByteBuf origin = Unpooled.buffer(10); + assertEquals(1, origin.refCnt()); + + ByteBuf retainedDup = origin.retainedDuplicate(); + assertEquals(2, origin.refCnt()); + assertEquals(2, retainedDup.refCnt()); + } + + @Test + public void testDuplicate() { + ByteBuf origin = Unpooled.buffer(10); + assertEquals(1, origin.refCnt()); + + ByteBuf duplicate = origin.duplicate(); + assertEquals(1, origin.refCnt()); + assertEquals(1, duplicate.refCnt()); + } +} diff --git a/broker/src/test/java/io/moquette/broker/TopicAliasMappingTest.java b/broker/src/test/java/io/moquette/broker/TopicAliasMappingTest.java new file mode 100644 index 000000000..e31db8dee --- /dev/null +++ b/broker/src/test/java/io/moquette/broker/TopicAliasMappingTest.java @@ -0,0 +1,77 @@ +/* + * + * Copyright (c) 2012-2024 The original author or authors + * ------------------------------------------------------ + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Apache License v2.0 which accompanies this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * The Apache License v2.0 is available at + * http://www.opensource.org/licenses/apache2.0.php + * + * You may elect to redistribute this code under either of these licenses. + * + */ + +package io.moquette.broker; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Optional; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class TopicAliasMappingTest { + + private TopicAliasMapping sut; + + @BeforeEach + void setUp() { + sut = new TopicAliasMapping(); + } + + private static void verifyTopicName(String expected, Optional topicName) { + assertTrue(topicName.isPresent()); + assertEquals(expected, topicName.get()); + } + + @Test + public void givenAnEmptyMappingWhenNewBindingIsRequestedThenShouldBeRetrievable() { + sut.update("sensors/temperature", 12); + + Optional topicName = sut.topicFromAlias(12); + verifyTopicName("sensors/temperature", topicName); + assertEquals(1, sut.size()); + } + + @Test + public void givenAnExistingBindingWhenNewBindingWithSameTopicNameIsRequestedThenAliasIsUpdated() { + // first binding + sut.update("sensors/temperature", 12); + + // second update with different alias + sut.update("sensors/temperature", 22); + assertEquals(1, sut.size()); + + Optional topicName = sut.topicFromAlias(22); + verifyTopicName("sensors/temperature", topicName); + } + + @Test + public void givenAnExistingBindingWhenNewBindingWithDifferentTopicNameAndSameTopicAliasIsRequestedThenAliasIsUpdated() { + // first binding + sut.update("sensors/temperature", 12); + + // second update with different name + sut.update("finance/quotes", 12); + assertEquals(1, sut.size()); + + Optional topicName = sut.topicFromAlias(12); + verifyTopicName("finance/quotes", topicName); + } +} diff --git a/distribution/src/main/resources/moquette.conf b/distribution/src/main/resources/moquette.conf index 0787ed06d..b863cced4 100644 --- a/distribution/src/main/resources/moquette.conf +++ b/distribution/src/main/resources/moquette.conf @@ -223,3 +223,14 @@ password_file config/password_file.conf # default: 2 (exactly_once) #********************************************************************* # max_server_granted_qos 2 + +#********************************************************************* +# Maximum value acceptable value by the broker as topic alias +# +# topic_alias_maximum: +# This option is used to enable and limit the number of topic alias entries on the local cache. +# By default is 0, which means that topic alias functionality is disabled. Any value in range 0..65535 is accepted, +# but given that's a cache size per connection, is better to limit in low tens. +# default: 0 (disabled) +#********************************************************************* +# topic_alias_maximum 16