Skip to content

Commit

Permalink
Implement topic alias management (moquette-io#873)
Browse files Browse the repository at this point in the history
Implemented handling of topic alias received by publishers. Doesn't implement the alias remapping in forwarded publishes.

Updates CONNACK to set topic alias maximum property if user explicitly configures topic_alias_maximum configuration setting.
Adds a topic alias to topic name cache to MQTTConnection, named TopicAliasMapping.
Updates PUBLISH processing to handled the incoming topic alias, checking the various error condition (respect to the specification), resolve the alias to topic name and forward processing of publish message with topic name and without topic alias.
Added unit tests to MQTTConnection to cover the feature.
  • Loading branch information
andsel authored Nov 26, 2024
1 parent 9bb4d90 commit facdc39
Show file tree
Hide file tree
Showing 11 changed files with 537 additions and 21 deletions.
1 change: 1 addition & 0 deletions ChangeLog.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
Version 0.18-SNAPSHOT:
[feature] Topic alias: implemented handling of topic alias received by publishers. (#873)
[feature] Flow-control: Handled client's receive maximum property to configure the inflight window through the client. (#858)
[feature] Generate correct MANIFEST.MF with bnd-maven-plugin. (#848)
[feature] Flow-control: implemented publish's quota management on the server side. (#852)
Expand Down
4 changes: 2 additions & 2 deletions broker/src/main/java/io/moquette/BrokerConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,6 @@ public final class BrokerConstants {
public static final String BUGSNAG_ENABLE_PROPERTY_NAME = "use_bugsnag";
public static final String BUGSNAG_TOKEN_PROPERTY_NAME = "bugsnag.token";

public static final String STORAGE_CLASS_NAME = "storage_class";

@Deprecated
public static final String PERSISTENT_CLIENT_EXPIRATION_PROPERTY_NAME = IConfig.PERSISTENT_CLIENT_EXPIRATION_PROPERTY_NAME;

Expand All @@ -134,6 +132,8 @@ public final class BrokerConstants {
public static final int INFINITE_SESSION_EXPIRY = 0xFFFFFFFF;
public static final int RECEIVE_MAXIMUM = 65 * 1024;

public static final int DISABLED_TOPIC_ALIAS = 0;

private BrokerConstants() {
}
}
16 changes: 16 additions & 0 deletions broker/src/main/java/io/moquette/broker/BrokerConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class BrokerConfiguration {
private final boolean allowZeroByteClientId;
private final boolean reauthorizeSubscriptionsOnConnect;
private final int bufferFlushMillis;
private final int topicAliasMaximum;
// integer max value means that the property is unset
private int receiveMaximum;

Expand Down Expand Up @@ -67,6 +68,8 @@ class BrokerConfiguration {
}

receiveMaximum = props.intProp(IConfig.RECEIVE_MAXIMUM, BrokerConstants.RECEIVE_MAXIMUM);

topicAliasMaximum = props.intProp(IConfig.TOPIC_ALIAS_MAXIMUM_PROPERTY_NAME, BrokerConstants.DISABLED_TOPIC_ALIAS);
}

// test method
Expand All @@ -86,12 +89,21 @@ public BrokerConfiguration(boolean allowAnonymous, boolean peerCertificateAsUser
// test method
public BrokerConfiguration(boolean allowAnonymous, boolean peerCertificateAsUsername, boolean allowZeroByteClientId,
boolean reauthorizeSubscriptionsOnConnect, int bufferFlushMillis, int receiveMaximum) {
this(allowAnonymous, peerCertificateAsUsername, allowZeroByteClientId, reauthorizeSubscriptionsOnConnect,
bufferFlushMillis, receiveMaximum, BrokerConstants.DISABLED_TOPIC_ALIAS);
}

// test method
public BrokerConfiguration(boolean allowAnonymous, boolean peerCertificateAsUsername, boolean allowZeroByteClientId,
boolean reauthorizeSubscriptionsOnConnect, int bufferFlushMillis, int receiveMaximum,
int topicAliasMaximum) {
this.allowAnonymous = allowAnonymous;
this.peerCertificateAsUsername = peerCertificateAsUsername;
this.allowZeroByteClientId = allowZeroByteClientId;
this.reauthorizeSubscriptionsOnConnect = reauthorizeSubscriptionsOnConnect;
this.bufferFlushMillis = bufferFlushMillis;
this.receiveMaximum = receiveMaximum;
this.topicAliasMaximum = topicAliasMaximum;
}

public boolean isAllowAnonymous() {
Expand All @@ -117,4 +129,8 @@ public int getBufferFlushMillis() {
public int receiveMaximum() {
return receiveMaximum;
}

public int topicAliasMaximum() {
return topicAliasMaximum;
}
}
115 changes: 107 additions & 8 deletions broker/src/main/java/io/moquette/broker/MQTTConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.security.cert.CertificateEncodingException;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
Expand All @@ -67,12 +68,29 @@ final class MQTTConnection {
private final IAuthenticator authenticator;
private final SessionRegistry sessionRegistry;
private final PostOffice postOffice;
private final int topicAliasMaximum;
private volatile boolean connected;
private final AtomicInteger lastPacketId = new AtomicInteger(0);
private Session bindedSession;
private int protocolVersion;
private Quota receivedQuota;
private Quota sendQuota;
private TopicAliasMapping aliasMappings;

static final class ErrorCodeException extends Exception {

private final String hexErrorCode;
private final MqttReasonCodes.Disconnect errorCode;

public ErrorCodeException(MqttReasonCodes.Disconnect disconnectError) {
errorCode = disconnectError;
hexErrorCode = Integer.toHexString(disconnectError.byteValue());
}

public MqttReasonCodes.Disconnect getErrorCode() {
return errorCode;
}
}

MQTTConnection(Channel channel, BrokerConfiguration brokerConfig, IAuthenticator authenticator,
SessionRegistry sessionRegistry, PostOffice postOffice) {
Expand All @@ -83,6 +101,7 @@ final class MQTTConnection {
this.postOffice = postOffice;
this.connected = false;
this.protocolVersion = UNDEFINED_VERSION;
this.topicAliasMaximum = brokerConfig.topicAliasMaximum();
}

void handleMessage(MqttMessage msg) {
Expand Down Expand Up @@ -225,6 +244,12 @@ PostOffice.RouteResult processConnect(MqttConnectMessage msg) {
channel.close().addListener(CLOSE_ON_FAILURE);
return PostOffice.RouteResult.failed(clientId);
}

// initialize topic alias mapping
if (isProtocolVersion(msg, MqttVersion.MQTT_5)) {
aliasMappings = new TopicAliasMapping();
}

receivedQuota = createQuota(brokerConfig.receiveMaximum());

sendQuota = retrieveSendQuota(msg);
Expand Down Expand Up @@ -328,6 +353,11 @@ private void executeConnect(MqttConnectMessage msg, String clientId, boolean ser
if (receivedQuota.hasLimit()) {
connAckPropertiesBuilder.receiveMaximum(receivedQuota.getMaximum());
}

if (topicAliasMaximum != BrokerConstants.DISABLED_TOPIC_ALIAS) {
connAckPropertiesBuilder.topicAliasMaximum(topicAliasMaximum);
}

final MqttProperties ackProperties = connAckPropertiesBuilder.build();
connAckBuilder.properties(ackProperties);
}
Expand Down Expand Up @@ -667,7 +697,26 @@ PostOffice.RouteResult processPublish(MqttPublishMessage msg) {
final String clientId = getClientId();
final int messageID = msg.variableHeader().packetId();
LOG.trace("Processing PUBLISH message, topic: {}, messageId: {}, qos: {}", topicName, messageID, qos);
final Topic topic = new Topic(topicName);
Topic topic = new Topic(topicName);

if (isProtocolVersion5()) {
MqttProperties.MqttProperty topicAlias = msg.variableHeader()
.properties().getProperty(MqttProperties.MqttPropertyType.TOPIC_ALIAS.value());
if (topicAlias != null) {
try {
Optional<String> mappedTopicName = updateAndMapTopicAlias((MqttProperties.IntegerProperty) topicAlias, topicName);
final String translatedTopicName = mappedTopicName.orElse(topicName);
msg = copyPublishMessageExceptTopicAlias(msg, translatedTopicName, messageID);
topic = new Topic(translatedTopicName);
} catch (ErrorCodeException e) {
brokerDisconnect(e.getErrorCode());
disconnectSession();
dropConnection();
return PostOffice.RouteResult.failed(clientId);
}
}
}

if (!topic.isValid()) {
LOG.debug("Drop connection because of invalid topic format");
dropConnection();
Expand All @@ -683,16 +732,17 @@ PostOffice.RouteResult processPublish(MqttPublishMessage msg) {
// retain else msg is cleaned by the NewNettyMQTTHandler and is not available
// in execution by SessionEventLoop
Utils.retain(msg, PostOffice.BT_PUB_IN);
final MqttPublishMessage finalMsg = msg;
switch (qos) {
case AT_MOST_ONCE:
return postOffice.routeCommand(clientId, "PUB QoS0", () -> {
checkMatchSessionLoop(clientId);
if (!isBoundToSession()) {
return null;
}
postOffice.receivedPublishQos0(this, username, clientId, msg, expiry);
postOffice.receivedPublishQos0(this, username, clientId, finalMsg, expiry);
return null;
}).ifFailed(() -> Utils.release(msg, PostOffice.BT_PUB_IN + " - failed"));
}).ifFailed(() -> Utils.release(finalMsg, PostOffice.BT_PUB_IN + " - failed"));
case AT_LEAST_ONCE:
if (!receivedQuota.hasFreeSlots()) {
LOG.warn("Client {} exceeded the quota {} processing QoS1, disconnecting it", clientId, receivedQuota);
Expand All @@ -708,16 +758,16 @@ PostOffice.RouteResult processPublish(MqttPublishMessage msg) {
if (!isBoundToSession())
return null;
receivedQuota.consumeSlot();
postOffice.receivedPublishQos1(this, username, messageID, msg, expiry)
postOffice.receivedPublishQos1(this, username, messageID, finalMsg, expiry)
.completableFuture().thenRun(() -> {
receivedQuota.releaseSlot();
});
return null;
}).ifFailed(() -> Utils.release(msg, PostOffice.BT_PUB_IN + " - failed"));
}).ifFailed(() -> Utils.release(finalMsg, PostOffice.BT_PUB_IN + " - failed"));
case EXACTLY_ONCE: {
if (!receivedQuota.hasFreeSlots()) {
LOG.warn("Client {} exceeded the quota {} processing QoS2, disconnecting it", clientId, receivedQuota);
Utils.release(msg, PostOffice.BT_PUB_IN + " - phase 1 QoS2 exceeded quota");
Utils.release(finalMsg, PostOffice.BT_PUB_IN + " - phase 1 QoS2 exceeded quota");
brokerDisconnect(MqttReasonCodes.Disconnect.RECEIVE_MAXIMUM_EXCEEDED);
disconnectSession();
dropConnection();
Expand All @@ -728,7 +778,7 @@ PostOffice.RouteResult processPublish(MqttPublishMessage msg) {
checkMatchSessionLoop(clientId);
if (!isBoundToSession())
return null;
bindedSession.receivedPublishQos2(messageID, msg);
bindedSession.receivedPublishQos2(messageID, finalMsg);
receivedQuota.consumeSlot();
return null;
});
Expand All @@ -738,7 +788,7 @@ PostOffice.RouteResult processPublish(MqttPublishMessage msg) {
return firstStepResult;
}
firstStepResult.completableFuture().thenRun(() ->
postOffice.receivedPublishQos2(this, msg, username, expiry).completableFuture()
postOffice.receivedPublishQos2(this, finalMsg, username, expiry).completableFuture()
);
return firstStepResult;
}
Expand All @@ -748,6 +798,55 @@ PostOffice.RouteResult processPublish(MqttPublishMessage msg) {
}
}

private Optional<String> updateAndMapTopicAlias(MqttProperties.IntegerProperty topicAlias, String topicName) throws ErrorCodeException {
if (topicAliasMaximum == BrokerConstants.DISABLED_TOPIC_ALIAS) {
// client is sending a topic alias when the feature was disabled form the server
LOG.info("Dropping connection {}, received a PUBLISH with topic alias while the feature is not enaled on the broker", channel);
throw new ErrorCodeException(MqttReasonCodes.Disconnect.PROTOCOL_ERROR);
}
MqttProperties.IntegerProperty topicAliasTyped = topicAlias;
if (topicAliasTyped.value() == 0 || topicAliasTyped.value() > topicAliasMaximum) {
// invalid topic alias value
LOG.info("Dropping connection {}, received a topic alias ({}) outside of range (0..{}]",
channel, topicAliasTyped.value(), topicAliasMaximum);
throw new ErrorCodeException(MqttReasonCodes.Disconnect.TOPIC_ALIAS_INVALID);
}

Optional<String> mappedTopicName = aliasMappings.topicFromAlias(topicAliasTyped.value());
if (mappedTopicName.isPresent()) {
// already established a mapping for the Topic Alias
if (topicName != null) {
// contains a topic name then update the mapping
aliasMappings.update(topicName, topicAliasTyped.value());
}
} else {
// does not already have a mapping for this Topic Alias
if (topicName.isEmpty()) {
// protocol error, not present mapping, topic alias is present and no topic name
throw new ErrorCodeException(MqttReasonCodes.Disconnect.PROTOCOL_ERROR);
}
// update the mapping
aliasMappings.update(topicName, topicAliasTyped.value());
}
return mappedTopicName;
}

private static MqttPublishMessage copyPublishMessageExceptTopicAlias(MqttPublishMessage msg, String translatedTopicName, int messageID) {
// Replace the topicName with the one retrieved and remove the topic alias property
// to avoid to forward or store.
final MqttProperties rewrittenProps = new MqttProperties();
for (MqttProperties.MqttProperty property : msg.variableHeader().properties().listAll()) {
if (property.propertyId() == MqttProperties.MqttPropertyType.TOPIC_ALIAS.value()) {
continue;
}
rewrittenProps.add(property);
}

MqttPublishVariableHeader rewrittenVariableHeader =
new MqttPublishVariableHeader(translatedTopicName, messageID, rewrittenProps);
return new MqttPublishMessage(msg.fixedHeader(), rewrittenVariableHeader, msg.payload());
}

private Instant extractExpiryFromProperty(MqttPublishMessage msg) {
MqttProperties.MqttProperty expiryProp = msg.variableHeader()
.properties()
Expand Down
49 changes: 49 additions & 0 deletions broker/src/main/java/io/moquette/broker/TopicAliasMapping.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
*
* Copyright (c) 2012-2024 The original author or authors
* ------------------------------------------------------
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Apache License v2.0 which accompanies this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* The Apache License v2.0 is available at
* http://www.opensource.org/licenses/apache2.0.php
*
* You may elect to redistribute this code under either of these licenses.
*
*/

package io.moquette.broker;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
* Implements a mapping cache so that the binding key -> value is also reversed,
* it maintains the invariant to be a 1:1 mapping.
* */
class TopicAliasMapping {

private final Map<String, Integer> direct = new HashMap<>();
private final Map<Integer, String> reversed = new HashMap<>();

void update(String topicName, int topicAlias) {
Integer previousAlias = direct.put(topicName, topicAlias);
if (previousAlias != null) {
reversed.remove(previousAlias);
}
reversed.put(topicAlias, topicName);
}

Optional<String> topicFromAlias(int topicAlias) {
return Optional.ofNullable(reversed.get(topicAlias));
}

int size() {
return reversed.size();
}
}
12 changes: 9 additions & 3 deletions broker/src/main/java/io/moquette/broker/config/FluentConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,23 @@
import static io.moquette.broker.config.IConfig.DATA_PATH_PROPERTY_NAME;
import static io.moquette.broker.config.IConfig.DEFAULT_NETTY_MAX_BYTES_IN_MESSAGE;
import static io.moquette.broker.config.IConfig.ENABLE_TELEMETRY_NAME;
import static io.moquette.broker.config.IConfig.PEER_CERTIFICATE_AS_USERNAME;
import static io.moquette.broker.config.IConfig.PORT_PROPERTY_NAME;
import static io.moquette.broker.config.IConfig.HOST_PROPERTY_NAME;
import static io.moquette.broker.config.IConfig.JKS_PATH_PROPERTY_NAME;
import static io.moquette.broker.config.IConfig.KEY_MANAGER_PASSWORD_PROPERTY_NAME;
import static io.moquette.broker.config.IConfig.KEY_STORE_PASSWORD_PROPERTY_NAME;
import static io.moquette.broker.config.IConfig.KEY_STORE_TYPE;
import static io.moquette.broker.config.IConfig.NETTY_MAX_BYTES_PROPERTY_NAME;
import static io.moquette.broker.config.IConfig.PASSWORD_FILE_PROPERTY_NAME;
import static io.moquette.broker.config.IConfig.PEER_CERTIFICATE_AS_USERNAME;
import static io.moquette.broker.config.IConfig.PERSISTENCE_ENABLED_PROPERTY_NAME;
import static io.moquette.broker.config.IConfig.PERSISTENT_CLIENT_EXPIRATION_PROPERTY_NAME;
import static io.moquette.broker.config.IConfig.PERSISTENT_QUEUE_TYPE_PROPERTY_NAME;
import static io.moquette.broker.config.IConfig.PORT_PROPERTY_NAME;
import static io.moquette.broker.config.IConfig.RECEIVE_MAXIMUM;
import static io.moquette.broker.config.IConfig.HOST_PROPERTY_NAME;
import static io.moquette.broker.config.IConfig.SESSION_QUEUE_SIZE;
import static io.moquette.broker.config.IConfig.SSL_PORT_PROPERTY_NAME;
import static io.moquette.broker.config.IConfig.SSL_PROVIDER;
import static io.moquette.broker.config.IConfig.TOPIC_ALIAS_MAXIMUM_PROPERTY_NAME;
import static io.moquette.broker.config.IConfig.WEB_SOCKET_PORT_PROPERTY_NAME;

/**
Expand Down Expand Up @@ -209,6 +210,11 @@ public FluentConfig receiveMaximum(int receiveMaximum) {
return this;
}

public FluentConfig topicAliasMaximum(int topicAliasMaximum) {
configAccumulator.put(TOPIC_ALIAS_MAXIMUM_PROPERTY_NAME, Integer.valueOf(topicAliasMaximum).toString());
return this;
}

public class TLSConfig {

private SSLProvider providerType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public abstract class IConfig {
public static final String NETTY_MAX_BYTES_PROPERTY_NAME = "netty.mqtt.message_size";
public static final String MAX_SERVER_GRANTED_QOS_PROPERTY_NAME = "max_server_granted_qos";
public static final int DEFAULT_NETTY_MAX_BYTES_IN_MESSAGE = 8092;
public static final String TOPIC_ALIAS_MAXIMUM_PROPERTY_NAME = "topic_alias_maximum";

public abstract void setProperty(String name, String value);

Expand Down
Loading

0 comments on commit facdc39

Please sign in to comment.