Skip to content

Commit

Permalink
Fix/resolve flaky error in tests (#872)
Browse files Browse the repository at this point in the history
Fixes some flaky test acting on 3 things:
- drop from custom low level client the callback mechanism and the queue, but resort just in the queued messages an wait blocking on it.
- updates the Publish collect based on Paho client to release the latch only after having copied the data, to avoid nulls due to read ordering
From 
```java
latch.countDown();
receivedTopic = topic;
receivedMessage = message;
```
to
```java
receivedTopic = topic;
receivedMessage = message;
latch.countDown();
```
- in Hive client usages, register the listener to subscribe before the verification and not during the verification. This makes the code less linear, but it's the intended use of the API.
  • Loading branch information
andsel authored Nov 23, 2024
1 parent c076193 commit 9bb4d90
Show file tree
Hide file tree
Showing 14 changed files with 312 additions and 321 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,7 @@ public void testWillMessageIsFiredOnClientKeepAliveExpiry() throws Exception {
@Test
public void testRejectConnectWithEmptyClientID() throws InterruptedException {
LOG.info("*** testRejectConnectWithEmptyClientID ***");
m_client.clientId("").connect();

this.receivedMsg = this.m_client.lastReceivedMessage();
this.receivedMsg = m_client.clientId("").connect();

assertTrue(receivedMsg instanceof MqttConnAckMessage);
MqttConnAckMessage connAck = (MqttConnAckMessage) receivedMsg;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@ public void tearDown() throws Exception {
super.tearDown();
}

void connectLowLevel() {
void connectLowLevel() throws InterruptedException {
MqttConnAckMessage connAck = lowLevelClient.connectV5();
assertConnectionAccepted(connAck, "Connection must be accepted");
}

void connectLowLevel(int keepAliveSecs) {
void connectLowLevel(int keepAliveSecs) throws InterruptedException {
MqttConnAckMessage connAck = lowLevelClient.connectV5(keepAliveSecs, BrokerConstants.INFLIGHT_WINDOW_SIZE);
assertConnectionAccepted(connAck, "Connection must be accepted");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,35 +148,39 @@ protected static void verifyNoPublish(Mqtt5BlockingClient subscriber, Consumer<V
}
}

protected static void verifyPublishedMessage(Mqtt5BlockingClient client, Consumer<Void> action, MqttQos expectedQos,
protected static void verifyNoPublish(Mqtt5BlockingClient.Mqtt5Publishes publishes, Consumer<Void> action, Duration timeout, String message) throws InterruptedException {
action.accept(null);
Optional<Mqtt5Publish> publishedMessage = publishes.receive(timeout.getSeconds(), TimeUnit.SECONDS);

// verify no published will in 10 seconds
assertFalse(publishedMessage.isPresent(), message);
}

protected static void verifyPublishedMessage(Mqtt5BlockingClient.Mqtt5Publishes publishes, Consumer<Void> action, MqttQos expectedQos,
String expectedPayload, String errorMessage, int timeoutSeconds) throws Exception {
try (Mqtt5BlockingClient.Mqtt5Publishes publishes = client.publishes(MqttGlobalPublishFilter.ALL)) {
action.accept(null);
Optional<Mqtt5Publish> publishMessage = publishes.receive(timeoutSeconds, TimeUnit.SECONDS);
if (!publishMessage.isPresent()) {
fail("Expected to receive a publish message");
return;
}
Mqtt5Publish msgPub = publishMessage.get();
final String payload = new String(msgPub.getPayloadAsBytes(), StandardCharsets.UTF_8);
assertEquals(expectedPayload, payload, errorMessage);
assertEquals(expectedQos, msgPub.getQos());
action.accept(null);
Optional<Mqtt5Publish> publishMessage = publishes.receive(timeoutSeconds, TimeUnit.SECONDS);
if (!publishMessage.isPresent()) {
fail("Expected to receive a publish message");
return;
}
Mqtt5Publish msgPub = publishMessage.get();
final String payload = new String(msgPub.getPayloadAsBytes(), StandardCharsets.UTF_8);
assertEquals(expectedPayload, payload, errorMessage);
assertEquals(expectedQos, msgPub.getQos());
}

static void verifyOfType(MqttMessage received, MqttMessageType mqttMessageType) {
assertEquals(mqttMessageType, received.fixedHeader().messageType());
}

static void verifyPublishMessage(Mqtt5BlockingClient subscriber, Consumer<Mqtt5Publish> assertion) throws InterruptedException {
try (Mqtt5BlockingClient.Mqtt5Publishes publishes = subscriber.publishes(MqttGlobalPublishFilter.ALL)) {
Optional<Mqtt5Publish> publishMessage = publishes.receive(1, TimeUnit.SECONDS);
if (!publishMessage.isPresent()) {
fail("Expected to receive a publish message");
return;
}
Mqtt5Publish msgPub = publishMessage.get();
assertion.accept(msgPub);
static void verifyPublishMessage(Mqtt5BlockingClient.Mqtt5Publishes publishListener, Consumer<Mqtt5Publish> assertion) throws InterruptedException {
Optional<Mqtt5Publish> publishMessage = publishListener.receive(1, TimeUnit.SECONDS);
if (!publishMessage.isPresent()) {
fail("Expected to receive a publish message");
return;
}
Mqtt5Publish msgPub = publishMessage.get();
assertion.accept(msgPub);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public void testAckResponseProperties() {
}

@Test
public void testAssignedClientIdentifier() {
public void testAssignedClientIdentifier() throws InterruptedException {
Client unnamedClient = new Client("localhost").clientId("");
connAck = unnamedClient.connectV5();
assertEquals(MqttConnectReturnCode.CONNECTION_ACCEPTED, connAck.variableHeader().connectReturnCode(), "Client connected");
Expand Down
76 changes: 41 additions & 35 deletions broker/src/test/java/io/moquette/integration/mqtt5/ConnectTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public void simpleConnect() {
}

@Test
public void sendConnectOnDisconnectedConnection() {
public void sendConnectOnDisconnectedConnection() throws InterruptedException {
MqttConnAckMessage connAck = lowLevelClient.connectV5();
TestUtils.assertConnectionAccepted(connAck, "Connection must be accepted");
lowLevelClient.disconnect();
Expand All @@ -85,7 +85,7 @@ public void sendConnectOnDisconnectedConnection() {
}

@Test
public void receiveInflightPublishesAfterAReconnect() {
public void receiveInflightPublishesAfterAReconnect() throws InterruptedException {
final Mqtt5BlockingClient publisher = MqttClient.builder()
.useMqttVersion5()
.identifier("publisher")
Expand Down Expand Up @@ -180,30 +180,32 @@ public void avoidToFirePreviouslyScheduledWillWhenSameClientIDReconnects() throw
final Mqtt5BlockingClient clientWithWill = createAndConnectClientWithWillTestament(clientId);

final Mqtt5BlockingClient testamentSubscriber = createAndConnectClientListeningToTestament();
try (Mqtt5BlockingClient.Mqtt5Publishes testamentListener = testamentSubscriber.publishes(MqttGlobalPublishFilter.ALL)) {

// client trigger a will message, disconnecting with bad reason code
final Mqtt5Disconnect malformedPacketReason = Mqtt5Disconnect.builder()
.reasonCode(Mqtt5DisconnectReasonCode.MALFORMED_PACKET)
.build();
clientWithWill.disconnect(malformedPacketReason);
// client trigger a will message, disconnecting with bad reason code
final Mqtt5Disconnect malformedPacketReason = Mqtt5Disconnect.builder()
.reasonCode(Mqtt5DisconnectReasonCode.MALFORMED_PACKET)
.build();
clientWithWill.disconnect(malformedPacketReason);

// wait no will is published
verifyNoTestamentIsPublished(testamentSubscriber, unused -> {
// reconnect another client with same clientId
final Mqtt5BlockingClient client = MqttClient.builder()
.useMqttVersion5()
.identifier(clientId)
.serverHost("localhost")
.serverPort(1883)
.buildBlocking();
Mqtt5ConnAck connectAck = client.connect();
assertEquals(Mqtt5ConnAckReasonCode.SUCCESS, connectAck.getReasonCode(), "Client connected");

}, Duration.ofSeconds(10));
// wait no will is published
verifyNoTestamentIsPublished(testamentListener, unused -> {
// reconnect another client with same clientId
final Mqtt5BlockingClient client = MqttClient.builder()
.useMqttVersion5()
.identifier(clientId)
.serverHost("localhost")
.serverPort(1883)
.buildBlocking();
Mqtt5ConnAck connectAck = client.connect();
assertEquals(Mqtt5ConnAckReasonCode.SUCCESS, connectAck.getReasonCode(), "Client connected");

}, Duration.ofSeconds(10));
}
}

private static void verifyNoTestamentIsPublished(Mqtt5BlockingClient testamentSubscriber, Consumer<Void> action, Duration timeout) throws InterruptedException {
verifyNoPublish(testamentSubscriber, action, timeout, "No will message should be published");
private static void verifyNoTestamentIsPublished(Mqtt5BlockingClient.Mqtt5Publishes testamentListener, Consumer<Void> action, Duration timeout) throws InterruptedException {
verifyNoPublish(testamentListener, action, timeout, "No will message should be published");
}

@Test
Expand All @@ -230,12 +232,14 @@ public void noWillMessageIsFiredOnNormalDisconnection() throws InterruptedExcept
final Mqtt5BlockingClient clientWithWill = createAndConnectClientWithWillTestament(clientId, 10, 60);

final Mqtt5BlockingClient testamentSubscriber = createAndConnectClientListeningToTestament();
try (Mqtt5BlockingClient.Mqtt5Publishes testamentListener = testamentSubscriber.publishes(MqttGlobalPublishFilter.ALL)) {

// wait no will is published
verifyNoTestamentIsPublished(testamentSubscriber, unused -> {
// normal session disconnection
clientWithWill.disconnect(Mqtt5Disconnect.builder().build());
}, Duration.ofSeconds(10));
// wait no will is published
verifyNoTestamentIsPublished(testamentListener, unused -> {
// normal session disconnection
clientWithWill.disconnect(Mqtt5Disconnect.builder().build());
}, Duration.ofSeconds(10));
}
}

@Test
Expand All @@ -245,14 +249,16 @@ public void givenClientWithWillThatCleanlyDisconnectsWithWillShouldTriggerTheTes
final Mqtt5BlockingClient clientWithWill = createAndConnectClientWithWillTestament(clientId, 10, 60);

final Mqtt5BlockingClient testamentSubscriber = createAndConnectClientListeningToTestament();

// wait no will is published
verifyNoTestamentIsPublished(testamentSubscriber, unused -> {
// normal session disconnection with will
clientWithWill.disconnect(Mqtt5Disconnect.builder()
.reasonCode(Mqtt5DisconnectReasonCode.DISCONNECT_WITH_WILL_MESSAGE)
.build());
}, Duration.ofSeconds(10));
try (Mqtt5BlockingClient.Mqtt5Publishes testamentListener = testamentSubscriber.publishes(MqttGlobalPublishFilter.ALL)) {

// wait no will is published
verifyNoTestamentIsPublished(testamentListener, unused -> {
// normal session disconnection with will
clientWithWill.disconnect(Mqtt5Disconnect.builder()
.reasonCode(Mqtt5DisconnectReasonCode.DISCONNECT_WITH_WILL_MESSAGE)
.build());
}, Duration.ofSeconds(10));
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package io.moquette.integration.mqtt5;

import com.hivemq.client.mqtt.MqttGlobalPublishFilter;
import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;
import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5PubAckException;
Expand Down Expand Up @@ -65,17 +66,19 @@ public void givenAPublishWithContentTypeWhenForwardedToSubscriberThenIsPresent()
.send();

Mqtt5BlockingClient publisher = createPublisherClient();
publisher.publishWith()
.topic("temperature/living")
.payload("{\"max\": 18}".getBytes(StandardCharsets.UTF_8))
.contentType("application/json")
.qos(MqttQos.AT_MOST_ONCE)
.send();
try (Mqtt5BlockingClient.Mqtt5Publishes publishes = subscriber.publishes(MqttGlobalPublishFilter.ALL)) {
publisher.publishWith()
.topic("temperature/living")
.payload("{\"max\": 18}".getBytes(StandardCharsets.UTF_8))
.contentType("application/json")
.qos(MqttQos.AT_MOST_ONCE)
.send();

verifyPublishMessage(subscriber, msgPub -> {
assertTrue(msgPub.getContentType().isPresent(), "Content-type MUST be present");
assertEquals("application/json", msgPub.getContentType().get().toString(), "Content-type MUST be untouched");
});
verifyPublishMessage(publishes, msgPub -> {
assertTrue(msgPub.getContentType().isPresent(), "Content-type MUST be present");
assertEquals("application/json", msgPub.getContentType().get().toString(), "Content-type MUST be untouched");
});
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.TimeUnit;

import static io.moquette.integration.mqtt5.TestUtils.assertConnectionAccepted;
import static io.netty.handler.codec.mqtt.MqttQoS.EXACTLY_ONCE;
Expand Down Expand Up @@ -73,7 +72,7 @@ public void givenServerWithReceiveMaximumWhenClientPassSendQuotaThenIsDisconnect
// sixth should exceed quota and the client should get a disconnect
sendQoS2Publish();

MqttMessage receivedMsg = lowLevelClient.lastReceivedMessage();
MqttMessage receivedMsg = lowLevelClient.receiveNextMessage(Duration.ofMillis(500));
assertEquals(MqttMessageType.DISCONNECT, receivedMsg.fixedHeader().messageType(),
"On sixth in flight message the send quota is exhausted and response should be DISCONNECT");
MqttReasonCodeAndPropertiesVariableHeader disconnectHeader = (MqttReasonCodeAndPropertiesVariableHeader) receivedMsg.variableHeader();
Expand All @@ -83,8 +82,8 @@ public void givenServerWithReceiveMaximumWhenClientPassSendQuotaThenIsDisconnect
assertTrue(lowLevelClient.isConnectionLost(), "Connection MUST be closed by the server");
}

private void verifyReceived(MqttMessageType expectedMessageType) {
MqttMessage receivedMsg = lowLevelClient.lastReceivedMessage();
private void verifyReceived(MqttMessageType expectedMessageType) throws InterruptedException {
MqttMessage receivedMsg = lowLevelClient.receiveNextMessage(Duration.ofMillis(500));
assertEquals(expectedMessageType, receivedMsg.fixedHeader().messageType());
}

Expand All @@ -94,7 +93,7 @@ private void sendQoS2Publish() {
MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader("temperature/living", 1, MqttProperties.NO_PROPERTIES);
ByteBuf payload = Unpooled.wrappedBuffer("18°C".getBytes(StandardCharsets.UTF_8));
MqttPublishMessage publishQoS2 = new MqttPublishMessage(fixedHeader, variableHeader, payload);
lowLevelClient.publish(publishQoS2, 500, TimeUnit.MILLISECONDS);
lowLevelClient.publish(publishQoS2);
}

@Override
Expand Down Expand Up @@ -159,6 +158,7 @@ public void givenClientThatReconnectWithSmallerReceiveMaximumThenForwardCorrectl
// subscribe with an identifier
MqttMessage received = lowLevelClient.subscribeWithIdentifier("temperature/living",
MqttQoS.AT_LEAST_ONCE, 123);

verifyOfType(received, MqttMessageType.SUBACK);

//lowlevel client doesn't ACK any pub, so the in flight window fills up
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package io.moquette.integration.mqtt5;

import com.hivemq.client.mqtt.MqttGlobalPublishFilter;
import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishBuilder;
Expand Down Expand Up @@ -68,13 +69,16 @@ public void givenPublishWithRetainedAndMessageExpiryWhenTimePassedThenRetainedIs

// subscribe to same topic and verify no message
Mqtt5BlockingClient subscriber = createSubscriberClient();
subscriber.subscribeWith()
.topicFilter("temperature/living")
.qos(MqttQos.AT_MOST_ONCE)
.send();

verifyNoPublish(subscriber, v -> {}, Duration.ofSeconds(2),
"Subscriber must not receive any retained message");
try (Mqtt5BlockingClient.Mqtt5Publishes publishes = subscriber.publishes(MqttGlobalPublishFilter.ALL)) {
subscriber.subscribeWith()
.topicFilter("temperature/living")
.qos(MqttQos.AT_MOST_ONCE)
.send();

verifyNoPublish(publishes, v -> {
}, Duration.ofSeconds(2),
"Subscriber must not receive any retained message");
}
}

// TODO verify the elapsed
Expand Down Expand Up @@ -153,7 +157,7 @@ public void givenPublishWithMessageExpiryPropertyWhenItsForwardedToSubscriberThe

// subscribe with an identifier
MqttMessage received = lowLevelClient.subscribeWithIdentifier("temperature/living",
MqttQoS.AT_LEAST_ONCE, 123, 500, TimeUnit.MILLISECONDS);
MqttQoS.AT_LEAST_ONCE, 123, Duration.ofMillis(500));
verifyOfType(received, MqttMessageType.SUBACK);

//lowlevel client doesn't ACK any pub, so the in flight window fills up
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package io.moquette.integration.mqtt5;

import com.hivemq.client.mqtt.MqttGlobalPublishFilter;
import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;
import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5PubAckException;
Expand Down Expand Up @@ -45,6 +46,7 @@
import org.junit.jupiter.api.Test;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.TimeUnit;

import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE;
Expand All @@ -69,18 +71,19 @@ public void givenAPublishWithPayloadFormatIndicatorWhenForwardedToSubscriberThen
.topicFilter("temperature/living")
.qos(MqttQos.AT_MOST_ONCE)
.send();
try (Mqtt5BlockingClient.Mqtt5Publishes publishes = subscriber.publishes(MqttGlobalPublishFilter.ALL)) {
Mqtt5BlockingClient publisher = createPublisherClient();
publisher.publishWith()
.topic("temperature/living")
.payload("18".getBytes(StandardCharsets.UTF_8))
.payloadFormatIndicator(Mqtt5PayloadFormatIndicator.UTF_8)
.qos(MqttQos.AT_MOST_ONCE)
.send();

Mqtt5BlockingClient publisher = createPublisherClient();
publisher.publishWith()
.topic("temperature/living")
.payload("18".getBytes(StandardCharsets.UTF_8))
.payloadFormatIndicator(Mqtt5PayloadFormatIndicator.UTF_8)
.qos(MqttQos.AT_MOST_ONCE)
.send();

verifyPublishMessage(subscriber, msgPub -> {
assertTrue(msgPub.getPayloadFormatIndicator().isPresent());
});
verifyPublishMessage(publishes, msgPub -> {
assertTrue(msgPub.getPayloadFormatIndicator().isPresent());
});
}
}

@Test
Expand Down Expand Up @@ -168,10 +171,10 @@ public void givenNotValidUTF8StringInPublishQoS0WhenPayloadFormatIndicatorIsSetT
MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader("temperature/living", 1, props);
MqttPublishMessage publishQoS0 = new MqttPublishMessage(fixedHeader, variableHeader, Unpooled.wrappedBuffer(INVALID_UTF_8_BYTES));
// in a reasonable amount of time (say 500 ms) it should receive a DISCONNECT
lowLevelClient.publish(publishQoS0, 500, TimeUnit.MILLISECONDS);
lowLevelClient.publish(publishQoS0);

// Verify a DISCONNECT is received with PAYLOAD_FORMAT_INVALID reason code and connection is closed
final MqttMessage receivedMessage = lowLevelClient.lastReceivedMessage();
final MqttMessage receivedMessage = lowLevelClient.receiveNextMessage(Duration.ofMillis(500));
assertEquals(MqttMessageType.DISCONNECT, receivedMessage.fixedHeader().messageType());
MqttReasonCodeAndPropertiesVariableHeader disconnectHeader = (MqttReasonCodeAndPropertiesVariableHeader) receivedMessage.variableHeader();
assertEquals(MqttReasonCodes.Disconnect.PAYLOAD_FORMAT_INVALID.byteValue(), disconnectHeader.reasonCode(),
Expand Down
Loading

0 comments on commit 9bb4d90

Please sign in to comment.