Skip to content

Commit

Permalink
Improvement/24035 artifical delay simulation adapter (#481)
Browse files Browse the repository at this point in the history
* simulation adapter > add optional sleep

* simulation adapter > add default case, fix default case

* simulation adapter > cleanup

* simulation adapter > make SimulationProtocolAdapter async

* simulation adapter > make SimulationProtocolAdapter async

* add license headers
  • Loading branch information
DC2-DanielKrueger authored Jul 26, 2024
1 parent b353796 commit 2cc44d7
Show file tree
Hide file tree
Showing 5 changed files with 211 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,20 @@ public class SimulationAdapterConfig implements ProtocolAdapterConfig {
defaultValue = "1000")
private int maxValue = 1000;

@JsonProperty("minDelay")
@ModuleConfigField(title = "Minimum of delay",
description = "Minimum of artificial delay before the polling method generates a value",
numberMin = 0,
defaultValue = "0")
private int minDelay = 0;

@JsonProperty("maxDelay")
@ModuleConfigField(title = "Maximum of delay",
description = "Maximum of artificial delay before the polling method generates a value",
numberMin = 0,
defaultValue = "0")
private int maxDelay = 0;

public SimulationAdapterConfig() {
}

Expand All @@ -100,4 +114,12 @@ public int getPollingIntervalMillis() {
public int getMaxPollingErrorsBeforeRemoval() {
return maxPollingErrorsBeforeRemoval;
}

public int getMaxDelay() {
return maxDelay;
}

public int getMinDelay() {
return minDelay;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import com.hivemq.extension.sdk.api.annotations.NotNull;

import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;

import static com.hivemq.adapter.sdk.api.state.ProtocolAdapterState.ConnectionStatus.STATELESS;
Expand All @@ -37,13 +39,17 @@ public class SimulationProtocolAdapter implements PollingProtocolAdapter<Simulat
private final @NotNull ProtocolAdapterInformation adapterInformation;
private final @NotNull SimulationAdapterConfig adapterConfig;
private final @NotNull ProtocolAdapterState protocolAdapterState;
private final @NotNull TimeWaiter timeWaiter;
private static final @NotNull Random RANDOM = new Random();

public SimulationProtocolAdapter(
final @NotNull ProtocolAdapterInformation adapterInformation,
final @NotNull ProtocolAdapterInput<SimulationAdapterConfig> protocolAdapterInput) {
final @NotNull ProtocolAdapterInput<SimulationAdapterConfig> protocolAdapterInput,
final @NotNull TimeWaiter timeWaiter) {
this.adapterInformation = adapterInformation;
this.adapterConfig = protocolAdapterInput.getConfig();
this.protocolAdapterState = protocolAdapterInput.getProtocolAdapterState();
this.timeWaiter = timeWaiter;
this.protocolAdapterState.setConnectionStatus(STATELESS);
}

Expand All @@ -70,12 +76,43 @@ public void stop(@NotNull final ProtocolAdapterStopInput input, @NotNull final P

@Override
public void poll(
final @NotNull PollingInput<SimulationPollingContext> pollingInput, final @NotNull PollingOutput pollingOutput) {
pollingOutput.addDataPoint("sample",
ThreadLocalRandom.current()
.nextDouble(Math.min(adapterConfig.getMinValue(), adapterConfig.getMaxValue()),
Math.max(adapterConfig.getMinValue() + 1, adapterConfig.getMaxValue())));
pollingOutput.finish();
final @NotNull PollingInput<SimulationPollingContext> pollingInput,
final @NotNull PollingOutput pollingOutput) {

final int minDelay = adapterConfig.getMinDelay();
final int maxDelay = adapterConfig.getMaxDelay();

CompletableFuture.runAsync(() -> {
if (minDelay > maxDelay) {
pollingOutput.fail(String.format(
"The configured min '%d' delay was bigger than the max delay '%d'. Simulator Adapter will not publish a value.",
minDelay,
maxDelay));
} else if (minDelay == maxDelay && maxDelay > 0) {
try {
timeWaiter.sleep(minDelay);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
pollingOutput.fail("Thread was interrupted");
return;
}
} else if (maxDelay > 0) {
final int sleepMS = minDelay + RANDOM.nextInt(maxDelay - minDelay);
try {
timeWaiter.sleep(sleepMS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
pollingOutput.fail("Thread was interrupted");
return;
}
}

pollingOutput.addDataPoint("sample",
ThreadLocalRandom.current()
.nextDouble(Math.min(adapterConfig.getMinValue(), adapterConfig.getMaxValue()),
Math.max(adapterConfig.getMinValue() + 1, adapterConfig.getMaxValue())));
pollingOutput.finish();
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class SimulationProtocolAdapterFactory implements ProtocolAdapterFactory<

@Override
public @NotNull ProtocolAdapter createAdapter(@NotNull final ProtocolAdapterInformation adapterInformation, @NotNull final ProtocolAdapterInput<SimulationAdapterConfig> input) {
return new SimulationProtocolAdapter(adapterInformation, input);
return new SimulationProtocolAdapter(adapterInformation, input, TimeWaiter.INSTANCE);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2019-present HiveMQ GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hivemq.edge.modules.adapters.simulation;

import com.hivemq.extension.sdk.api.annotations.NotNull;

import java.util.concurrent.TimeUnit;

public class TimeWaiter {
public static @NotNull TimeWaiter INSTANCE = new TimeWaiter();

public void sleep(int millis) throws InterruptedException {
TimeUnit.MILLISECONDS.sleep(millis);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright 2019-present HiveMQ GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hivemq.edge.modules.adapters.simulation;

import com.hivemq.adapter.sdk.api.model.ProtocolAdapterInput;
import com.hivemq.edge.modules.adapters.data.ProtocolAdapterDataSampleImpl;
import com.hivemq.edge.modules.adapters.impl.ProtocolAdapterStateImpl;
import com.hivemq.extension.sdk.api.annotations.NotNull;
import com.hivemq.protocols.PollingInputImpl;
import com.hivemq.protocols.PollingOutputImpl;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.mockito.ArgumentCaptor;

import java.util.concurrent.ExecutionException;

import static org.junit.Assert.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@SuppressWarnings({"rawtypes", "unchecked"})
class SimulationProtocolAdapterTest {

private final @NotNull ProtocolAdapterInput input = mock();
private final @NotNull SimulationAdapterConfig protocolAdapterConfig = mock();
private @NotNull SimulationProtocolAdapter simulationProtocolAdapter;
private final @NotNull SimulationPollingContext simulationPollingContext =
new SimulationPollingContext("test", 1, null);
private final @NotNull PollingInputImpl pollingInput = new PollingInputImpl(simulationPollingContext);
private final @NotNull PollingOutputImpl pollingOutput =
new PollingOutputImpl(new ProtocolAdapterDataSampleImpl(simulationPollingContext));
private final @NotNull TimeWaiter timeWaiter = mock();


@BeforeEach
void setUp() {
when(input.getProtocolAdapterState()).thenReturn(new ProtocolAdapterStateImpl(mock(),
"simulation",
"test-simulator"));
when(input.getConfig()).thenReturn(protocolAdapterConfig);
simulationProtocolAdapter =
new SimulationProtocolAdapter(SimulationProtocolAdapterInformation.INSTANCE, input, timeWaiter);
}

@Test
void test_poll_whenMinDelayIsBiggerThanMax_thenExecutionException() throws InterruptedException {
when(protocolAdapterConfig.getMinDelay()).thenReturn(2);
when(protocolAdapterConfig.getMaxDelay()).thenReturn(1);

simulationProtocolAdapter.poll(pollingInput, pollingOutput);

assertThrows(ExecutionException.class, () -> pollingOutput.getOutputFuture().get());
verify(timeWaiter, never()).sleep(anyInt());
}

@Test
@Timeout(2)
void test_poll_whenMinAndMaxIsTheSame_thenThreadWaitsExactlyTimeAmount()
throws InterruptedException, ExecutionException {
when(protocolAdapterConfig.getMinDelay()).thenReturn(1);
when(protocolAdapterConfig.getMaxDelay()).thenReturn(1);

simulationProtocolAdapter.poll(pollingInput, pollingOutput);
pollingOutput.getOutputFuture().get();

verify(timeWaiter, times(1)).sleep(eq(1));
}

@Test
@Timeout(2)
void test_poll_whenMaxBiggerMin_thenThreadWaitsBetweem() throws InterruptedException, ExecutionException {
when(protocolAdapterConfig.getMinDelay()).thenReturn(1);
when(protocolAdapterConfig.getMaxDelay()).thenReturn(3);

simulationProtocolAdapter.poll(pollingInput, pollingOutput);
pollingOutput.getOutputFuture().get();

ArgumentCaptor<Integer> argumentCaptor = ArgumentCaptor.forClass(Integer.class);
verify(timeWaiter, times(1)).sleep(argumentCaptor.capture());
final Integer sleepTimeMillis = argumentCaptor.getValue();
assertTrue(sleepTimeMillis >= 1);
assertTrue(sleepTimeMillis <= 3);
}

@Test
@Timeout(2)
void test_poll_whenMaxAndMinAreZero_thenDoNotSleep() throws InterruptedException, ExecutionException {
when(protocolAdapterConfig.getMinDelay()).thenReturn(0);
when(protocolAdapterConfig.getMaxDelay()).thenReturn(0);

simulationProtocolAdapter.poll(pollingInput, pollingOutput);
pollingOutput.getOutputFuture().get();

verify(timeWaiter, never()).sleep(anyInt());
}
}

0 comments on commit 2cc44d7

Please sign in to comment.