Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix/dont send ack on new connection #428

Merged
merged 7 commits into from
Aug 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,13 @@ public interface InternalLogger {

void error(@NotNull String message, @NotNull Throwable throwable);

void error(@NotNull String format, @NotNull Object arg);

void error(@NotNull String format, @NotNull Object arg1, @NotNull Object arg2);

void warn(@NotNull String message);

void warn(@NotNull String format, @NotNull Object... args);
void warn(@NotNull String format, @NotNull Object arg);

void warn(@NotNull String format, @NotNull Object arg1, @NotNull Object arg2);
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public final class InternalLoggerFactory {
if (SLF4J_AVAILABLE) {
return new InternalSlf4jLogger(clazz);
}
return new InternalNoopLogger(clazz);
return InternalNoopLogger.INSTANCE;
}

private InternalLoggerFactory() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,28 @@
*/
class InternalNoopLogger implements InternalLogger {

InternalNoopLogger(@SuppressWarnings("unused") final @NotNull Class<?> clazz) {}
static final @NotNull InternalLogger INSTANCE = new InternalNoopLogger();

private InternalNoopLogger() {}

@Override
public void error(final @NotNull String message) {}

@Override
public void error(final @NotNull String message, final @NotNull Throwable throwable) {}

@Override
public void error(final @NotNull String format, final @NotNull Object arg) {}

@Override
public void error(final @NotNull String format, final @NotNull Object arg1, final @NotNull Object arg2) {}

@Override
public void warn(final @NotNull String message) {}

@Override
public void warn(final @NotNull String format, final @NotNull Object... args) {}
public void warn(final @NotNull String format, final @NotNull Object arg) {}

@Override
public void warn(final @NotNull String format, final @NotNull Object arg1, final @NotNull Object arg2) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,28 @@ public void error(final @NotNull String message, final @NotNull Throwable throwa
delegate.error(message, throwable);
}

@Override
public void error(final @NotNull String format, final @NotNull Object arg) {
delegate.error(format, arg);
}

@Override
public void error(final @NotNull String format, final @NotNull Object arg1, final @NotNull Object arg2) {
delegate.error(format, arg1, arg2);
}

@Override
public void warn(final @NotNull String message) {
delegate.warn(message);
}

@Override
public void warn(final @NotNull String format, final @NotNull Object... args) {
delegate.warn(format, args);
public void warn(final @NotNull String format, final @NotNull Object arg) {
delegate.warn(format, arg);
}

@Override
public void warn(final @NotNull String format, final @NotNull Object arg1, final @NotNull Object arg2) {
delegate.warn(format, arg1, arg2);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,20 @@
*/
class MqttIncomingPublishConfirmable implements Confirmable, Runnable {

private final long id;
private final @NotNull MqttIncomingPublishFlow flow;
private final @NotNull MqttMatchingPublishFlows flows;
private final @NotNull MqttStatefulPublishWithFlows publishWithFlows;
private final @NotNull AtomicBoolean confirmed = new AtomicBoolean(false);

MqttIncomingPublishConfirmable(
final long id, final @NotNull MqttIncomingPublishFlow flow, final @NotNull MqttMatchingPublishFlows flows) {
final @NotNull MqttIncomingPublishFlow flow, final @NotNull MqttStatefulPublishWithFlows publishWithFlows) {

this.id = id;
this.flow = flow;
this.flows = flows;
this.publishWithFlows = publishWithFlows;
}

@Override
public long getId() {
return id;
return publishWithFlows.id;
}

@Override
Expand All @@ -55,7 +53,7 @@ public boolean confirm() {

@Override
public void run() {
flows.acknowledge(flow);
publishWithFlows.acknowledge(flow);
}

static class Qos0 implements Confirmable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.hivemq.client.internal.annotations.NotThreadSafe;
import com.hivemq.client.internal.mqtt.datatypes.MqttTopicFilterImpl;
import com.hivemq.client.internal.mqtt.ioc.ClientScope;
import com.hivemq.client.internal.mqtt.message.publish.MqttStatefulPublish;
import com.hivemq.client.internal.mqtt.message.subscribe.MqttSubscribe;
import com.hivemq.client.internal.mqtt.message.subscribe.MqttSubscription;
import com.hivemq.client.internal.mqtt.message.unsubscribe.MqttUnsubscribe;
Expand Down Expand Up @@ -118,34 +117,26 @@ void cancelGlobal(final @NotNull MqttGlobalIncomingPublishFlow flow) {
}
}

@NotNull MqttMatchingPublishFlows findMatching(final @NotNull MqttStatefulPublish publish) {
final MqttMatchingPublishFlows matchingFlows = new MqttMatchingPublishFlows();
findMatching(publish, matchingFlows);
return matchingFlows;
}

void findMatching(
final @NotNull MqttStatefulPublish publish, final @NotNull MqttMatchingPublishFlows matchingFlows) {

subscribedFlows.findMatching(publish.stateless().getTopic(), matchingFlows);
if (matchingFlows.subscriptionFound) {
add(matchingFlows, globalFlows[MqttGlobalPublishFilter.SUBSCRIBED.ordinal()]);
void findMatching(final @NotNull MqttStatefulPublishWithFlows publishWithFlows) {
subscribedFlows.findMatching(publishWithFlows);
if (publishWithFlows.subscriptionFound) {
add(publishWithFlows, globalFlows[MqttGlobalPublishFilter.SUBSCRIBED.ordinal()]);
} else {
add(matchingFlows, globalFlows[MqttGlobalPublishFilter.UNSOLICITED.ordinal()]);
add(publishWithFlows, globalFlows[MqttGlobalPublishFilter.UNSOLICITED.ordinal()]);
}
add(matchingFlows, globalFlows[MqttGlobalPublishFilter.ALL.ordinal()]);
if (matchingFlows.isEmpty()) {
add(matchingFlows, globalFlows[MqttGlobalPublishFilter.REMAINING.ordinal()]);
add(publishWithFlows, globalFlows[MqttGlobalPublishFilter.ALL.ordinal()]);
if (publishWithFlows.isEmpty()) {
add(publishWithFlows, globalFlows[MqttGlobalPublishFilter.REMAINING.ordinal()]);
}
}

private static void add(
final @NotNull MqttMatchingPublishFlows matchingPublishFlows,
final @NotNull MqttStatefulPublishWithFlows publishWithFlows,
final @Nullable HandleList<MqttGlobalIncomingPublishFlow> globalFlows) {

if (globalFlows != null) {
for (Handle<MqttGlobalIncomingPublishFlow> h = globalFlows.getFirst(); h != null; h = h.getNext()) {
matchingPublishFlows.add(h.getElement());
publishWithFlows.add(h.getElement());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.hivemq.client.internal.logging.InternalLoggerFactory;
import com.hivemq.client.internal.mqtt.ioc.ClientScope;
import com.hivemq.client.internal.mqtt.message.publish.MqttPublish;
import com.hivemq.client.internal.mqtt.message.publish.MqttStatefulPublish;
import com.hivemq.client.internal.util.collections.ChunkedArrayQueue;
import com.hivemq.client.internal.util.collections.HandleList.Handle;
import com.hivemq.client.mqtt.datatypes.MqttQos;
Expand All @@ -41,10 +40,10 @@ class MqttIncomingPublishService {
private final @NotNull MqttIncomingQosHandler incomingQosHandler;
final @NotNull MqttIncomingPublishFlows incomingPublishFlows;

private final @NotNull ChunkedArrayQueue<Object> qos0Queue = new ChunkedArrayQueue<>(32);
private final ChunkedArrayQueue<Object>.@NotNull Iterator qos0It = qos0Queue.iterator();
private final @NotNull ChunkedArrayQueue<Object> qos1Or2Queue = new ChunkedArrayQueue<>(32);
private final ChunkedArrayQueue<Object>.@NotNull Iterator qos1Or2It = qos1Or2Queue.iterator();
private final @NotNull ChunkedArrayQueue<MqttStatefulPublishWithFlows> qos0Queue = new ChunkedArrayQueue<>(32);
private final ChunkedArrayQueue<MqttStatefulPublishWithFlows>.@NotNull Iterator qos0It = qos0Queue.iterator();
private final @NotNull ChunkedArrayQueue<MqttStatefulPublishWithFlows> qos1Or2Queue = new ChunkedArrayQueue<>(32);
private final ChunkedArrayQueue<MqttStatefulPublishWithFlows>.@NotNull Iterator qos1Or2It = qos1Or2Queue.iterator();

private long nextQoS1Or2PublishId = 1;

Expand All @@ -61,13 +60,12 @@ class MqttIncomingPublishService {
}

@CallByThread("Netty EventLoop")
void onPublishQos0(final @NotNull MqttStatefulPublish publish, final int receiveMaximum) {
if (qos0Queue.size() >= (2 * receiveMaximum)) { // TODO receiveMaximum
void onPublishQos0(final @NotNull MqttStatefulPublishWithFlows publishWithFlows, final int receiveMaximum) {
if (qos0Queue.size() >= receiveMaximum) { // TODO receiveMaximum
SgtSilvio marked this conversation as resolved.
Show resolved Hide resolved
LOGGER.warn("QoS 0 publish message dropped.");
if (QOS_0_DROP_OLDEST) {
qos0It.reset();
qos0It.next();
final MqttMatchingPublishFlows flows = (MqttMatchingPublishFlows) qos0It.next();
final MqttStatefulPublishWithFlows flows = qos0It.next();
qos0It.remove();
for (Handle<MqttIncomingPublishFlow> h = flows.getFirst(); h != null; h = h.getNext()) {
if (h.getElement().dereference() == 0) {
Expand All @@ -78,43 +76,40 @@ void onPublishQos0(final @NotNull MqttStatefulPublish publish, final int receive
return;
}
}
final MqttMatchingPublishFlows flows = onPublish(publish);
if (!flows.isEmpty()) {
qos0Queue.offer(publish);
qos0Queue.offer(flows);
onPublish(publishWithFlows);
if (!publishWithFlows.isEmpty()) {
qos0Queue.offer(publishWithFlows);
}
}

@CallByThread("Netty EventLoop")
boolean onPublishQos1Or2(final @NotNull MqttStatefulPublish publish, final int receiveMaximum) {
if (qos1Or2Queue.size() >= (2 * receiveMaximum)) {
boolean onPublishQos1Or2(final @NotNull MqttStatefulPublishWithFlows publishWithFlows, final int receiveMaximum) {
if (qos1Or2Queue.size() >= receiveMaximum) {
return false; // flow control error
}
publish.setId(nextQoS1Or2PublishId++);
final MqttMatchingPublishFlows flows = onPublish(publish);
if (qos1Or2Queue.isEmpty() && flows.isEmpty() && flows.areAcknowledged()) {
incomingQosHandler.ack(publish);
publishWithFlows.id = nextQoS1Or2PublishId++;
onPublish(publishWithFlows);
if (qos1Or2Queue.isEmpty() && publishWithFlows.isEmpty() && publishWithFlows.areAcknowledged()) {
incomingQosHandler.ack(publishWithFlows);
} else {
qos1Or2Queue.offer(publish);
qos1Or2Queue.offer(flows);
qos1Or2Queue.offer(publishWithFlows);
}
return true;
}

@CallByThread("Netty EventLoop")
private @NotNull MqttMatchingPublishFlows onPublish(final @NotNull MqttStatefulPublish publish) {
final MqttMatchingPublishFlows flows = incomingPublishFlows.findMatching(publish);
if (flows.isEmpty()) {
LOGGER.warn("No publish flow registered for {}.", publish);
private void onPublish(final @NotNull MqttStatefulPublishWithFlows publishWithFlows) {
incomingPublishFlows.findMatching(publishWithFlows);
if (publishWithFlows.isEmpty()) {
LOGGER.warn("No publish flow registered for {}.", publishWithFlows.publish);
}
drain();
for (Handle<MqttIncomingPublishFlow> h = flows.getFirst(); h != null; h = h.getNext()) {
for (Handle<MqttIncomingPublishFlow> h = publishWithFlows.getFirst(); h != null; h = h.getNext()) {
if (h.getElement().reference() == 1) {
referencedFlowCount++;
}
}
emit(publish, flows);
return flows;
emit(publishWithFlows);
}

@CallByThread("Netty EventLoop")
Expand All @@ -124,22 +119,20 @@ void drain() {

qos1Or2It.reset();
while (qos1Or2It.hasNext()) {
final MqttStatefulPublish publish = (MqttStatefulPublish) qos1Or2It.next();
final MqttMatchingPublishFlows flows = (MqttMatchingPublishFlows) qos1Or2It.next();
emit(publish, flows);
if ((qos1Or2It.getIterated() == 2) && flows.isEmpty() && flows.areAcknowledged()) {
final MqttStatefulPublishWithFlows publishWithFlows = qos1Or2It.next();
emit(publishWithFlows);
if ((qos1Or2It.getIterated() == 1) && publishWithFlows.isEmpty() && publishWithFlows.areAcknowledged()) {
qos1Or2It.remove();
incomingQosHandler.ack(publish);
incomingQosHandler.ack(publishWithFlows);
} else if (blockingFlowCount == referencedFlowCount) {
return;
}
}
qos0It.reset();
while (qos0It.hasNext()) {
final MqttStatefulPublish publish = (MqttStatefulPublish) qos0It.next();
final MqttMatchingPublishFlows flows = (MqttMatchingPublishFlows) qos0It.next();
emit(publish, flows);
if ((qos0It.getIterated() == 2) && flows.isEmpty()) {
final MqttStatefulPublishWithFlows publishWithFlows = qos0It.next();
emit(publishWithFlows);
if ((qos0It.getIterated() == 1) && publishWithFlows.isEmpty()) {
qos0It.remove();
} else if (blockingFlowCount == referencedFlowCount) {
return;
Expand All @@ -148,32 +141,30 @@ void drain() {
}

@CallByThread("Netty EventLoop")
private void emit(
final @NotNull MqttStatefulPublish statefulPublish, final @NotNull MqttMatchingPublishFlows flows) {

for (Handle<MqttIncomingPublishFlow> h = flows.getFirst(); h != null; h = h.getNext()) {
private void emit(final @NotNull MqttStatefulPublishWithFlows publishWithFlows) {
for (Handle<MqttIncomingPublishFlow> h = publishWithFlows.getFirst(); h != null; h = h.getNext()) {
final MqttIncomingPublishFlow flow = h.getElement();

if (flow.isCancelled()) {
flows.remove(h);
publishWithFlows.remove(h);
if (flow.dereference() == 0) {
referencedFlowCount--;
}
} else {
final long requested = flow.requested(runIndex);
if (requested > 0) {
MqttPublish publish = statefulPublish.stateless();
MqttPublish publish = publishWithFlows.publish.stateless();
if (flow.manualAcknowledgement) {
final Confirmable confirmable;
if (publish.getQos() == MqttQos.AT_MOST_ONCE) {
confirmable = new MqttIncomingPublishConfirmable.Qos0();
} else {
confirmable = new MqttIncomingPublishConfirmable(statefulPublish.getId(), flow, flows);
confirmable = new MqttIncomingPublishConfirmable(flow, publishWithFlows);
}
publish = publish.withConfirmable(confirmable);
}
flow.onNext(publish);
flows.remove(h);
publishWithFlows.remove(h);
if (flow.dereference() == 0) {
referencedFlowCount--;
flow.checkDone();
Expand Down
Loading