diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/CommonConfiguration.java b/artio-core/src/main/java/uk/co/real_logic/artio/CommonConfiguration.java index 92a160c56f..bf993c94ac 100644 --- a/artio-core/src/main/java/uk/co/real_logic/artio/CommonConfiguration.java +++ b/artio-core/src/main/java/uk/co/real_logic/artio/CommonConfiguration.java @@ -268,6 +268,8 @@ private static LogTag lookupLogTag(final String name) public static final long DEFAULT_MIN_FIXP_KEEPALIVE_TIMEOUT_IN_MS = 1; public static final long DEFAULT_ACCEPTOR_FIXP_KEEPALIVE_TIMEOUT_IN_MS = SECONDS.toMillis(30); + public static final long DEFAULT_CYCLE_THRESHOLD_NS = SECONDS.toNanos(1); + public static final boolean RUNNING_ON_WINDOWS = System.getProperty("os.name").startsWith("Windows"); private long reasonableTransmissionTimeInMs = DEFAULT_REASONABLE_TRANSMISSION_TIME_IN_MS; diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/FixCounters.java b/artio-core/src/main/java/uk/co/real_logic/artio/FixCounters.java index 27da11a9cb..bcc892c40f 100644 --- a/artio-core/src/main/java/uk/co/real_logic/artio/FixCounters.java +++ b/artio-core/src/main/java/uk/co/real_logic/artio/FixCounters.java @@ -17,6 +17,8 @@ import io.aeron.Aeron; import io.aeron.Counter; +import io.aeron.driver.DutyCycleTracker; +import io.aeron.driver.status.DutyCycleStallTracker; import org.agrona.collections.IntHashSet; import org.agrona.concurrent.status.AtomicCounter; import org.agrona.concurrent.status.CountersReader; @@ -47,7 +49,13 @@ public enum FixCountersId CURRENT_REPLAY_COUNT_TYPE_ID(10_008), NEGATIVE_TIMESTAMP_TYPE_ID(10_009), FAILED_ADMIN_TYPE_ID(10_010), - FAILED_ADMIN_REPLY_TYPE_ID(10_011); + FAILED_ADMIN_REPLY_TYPE_ID(10_011), + FRAMER_MAX_CYCLE_TIME_TYPE_ID(10_012), + FRAMER_CYCLE_TIME_THRESHOLD_EXCEEDED_TYPE_ID(10_013), + INDEXER_MAX_CYCLE_TIME_TYPE_ID(10_014), + INDEXER_CYCLE_TIME_THRESHOLD_EXCEEDED_TYPE_ID(10_015), + LIBRARY_MAX_CYCLE_TIME_TYPE_ID(10_016), + LIBRARY_CYCLE_TIME_THRESHOLD_EXCEEDED_TYPE_ID(10_017); final int id; @@ -149,6 +157,51 @@ public AtomicCounter negativeTimestamps() return negativeTimestamps; } + public DutyCycleTracker getFramerDutyCycleTracker(final long threshold) + { + if (threshold == 0) + { + return new DutyCycleTracker(); + } + + return new DutyCycleStallTracker( + newCounter(FRAMER_MAX_CYCLE_TIME_TYPE_ID.id(), "framer max cycle time in ns"), + newCounter(FRAMER_CYCLE_TIME_THRESHOLD_EXCEEDED_TYPE_ID.id(), + "framer work cycle time exceeded count: threshold=" + threshold), + threshold + ); + } + + public DutyCycleTracker getIndexerDutyCycleTracker(final long threshold) + { + if (threshold == 0) + { + return new DutyCycleTracker(); + } + + return new DutyCycleStallTracker( + newCounter(INDEXER_MAX_CYCLE_TIME_TYPE_ID.id(), "indexer max cycle time in ns"), + newCounter(INDEXER_CYCLE_TIME_THRESHOLD_EXCEEDED_TYPE_ID.id(), + "indexer work cycle time exceeded count: threshold=" + threshold), + threshold + ); + } + + public DutyCycleTracker getLibraryDutyCycleTracker(final int libraryId, final long threshold) + { + if (threshold == 0) + { + return new DutyCycleTracker(); + } + + return new DutyCycleStallTracker( + newCounter(LIBRARY_MAX_CYCLE_TIME_TYPE_ID.id(), "library " + libraryId + " max cycle time in ns"), + newCounter(LIBRARY_CYCLE_TIME_THRESHOLD_EXCEEDED_TYPE_ID.id(), + "library " + libraryId + " work cycle time exceeded count: threshold=" + threshold), + threshold + ); + } + public AtomicCounter messagesRead(final long connectionId, final String address) { return newCounter(FixCountersId.MESSAGES_READ_TYPE_ID.id(), diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/engine/EngineConfiguration.java b/artio-core/src/main/java/uk/co/real_logic/artio/engine/EngineConfiguration.java index 0f72db73f2..de71b37d56 100644 --- a/artio-core/src/main/java/uk/co/real_logic/artio/engine/EngineConfiguration.java +++ b/artio-core/src/main/java/uk/co/real_logic/artio/engine/EngineConfiguration.java @@ -324,6 +324,8 @@ public final class EngineConfiguration extends CommonConfiguration implements Au private long timeIndexReplayFlushIntervalInNs = DEFAULT_TIME_INDEX_FLUSH_INTERVAL_IN_NS; private CancelOnDisconnectOption cancelOnDisconnectOption = DO_NOT_CANCEL_ON_DISCONNECT_OR_LOGOUT; private int cancelOnDisconnectTimeoutWindowInMs = DEFAULT_CANCEL_ON_DISCONNECT_TIMEOUT_WINDOW_IN_MS; + private long framerCycleThresholdNs = DEFAULT_CYCLE_THRESHOLD_NS; + private long indexerCycleThresholdNs = DEFAULT_CYCLE_THRESHOLD_NS; private EngineReproductionConfiguration reproductionConfiguration; private ReproductionMessageHandler reproductionMessageHandler = (connectionId, bytes) -> @@ -1294,6 +1296,32 @@ public EngineConfiguration cancelOnDisconnectTimeoutWindowInMs(final int cancelO return this; } + /** + * Set a threshold for the framer work cycle time which when exceeded it will increment the + * framer cycle time exceeded count. + * + * @param framerCycleThresholdNs value in nanoseconds. The value 0 will disable duty cycle tracking on the framer. + * @return this for fluent API. + */ + public EngineConfiguration framerCycleThresholdNs(final long framerCycleThresholdNs) + { + this.framerCycleThresholdNs = framerCycleThresholdNs; + return this; + } + + /** + * Set a threshold for the indexer work cycle time which when exceeded it will increment the + * indexer cycle time exceeded count. + * + * @param indexerCycleThresholdNs value in nanoseconds. The value 0 will disable duty cycle tracking on the indexer. + * @return this for fluent API. + */ + public EngineConfiguration indexerCycleThresholdNs(final long indexerCycleThresholdNs) + { + this.indexerCycleThresholdNs = indexerCycleThresholdNs; + return this; + } + // --------------------- // END SETTERS // --------------------- @@ -2025,6 +2053,16 @@ public int cancelOnDisconnectTimeoutWindowInMs() return cancelOnDisconnectTimeoutWindowInMs; } + public long framerCycleThresholdNs() + { + return framerCycleThresholdNs; + } + + public long indexerCycleThresholdNs() + { + return indexerCycleThresholdNs; + } + public boolean indexChecksumEnabled() { return indexChecksumEnabled; diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/engine/EngineContext.java b/artio-core/src/main/java/uk/co/real_logic/artio/engine/EngineContext.java index 93bd7aa202..a0fadce7f4 100644 --- a/artio-core/src/main/java/uk/co/real_logic/artio/engine/EngineContext.java +++ b/artio-core/src/main/java/uk/co/real_logic/artio/engine/EngineContext.java @@ -252,7 +252,8 @@ private Replayer newReplayer( configuration.maxConcurrentSessionReplays(), clock, configuration.supportedFixPProtocolType(), - configuration); + configuration, + fixCounters.getIndexerDutyCycleTracker(configuration.indexerCycleThresholdNs())); } private void newIndexers() @@ -372,7 +373,8 @@ private void newArchivingAgent() senderSequenceNumbers, replayerCommandQueue, new FixSessionCodecsFactory(clock, configuration.sessionEpochFractionFormat()), - clock); + clock, + fixCounters.getIndexerDutyCycleTracker(configuration.indexerCycleThresholdNs())); } final List agents = new ArrayList<>(); diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/Framer.java b/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/Framer.java index e2a9a2a0ef..a59d38f7dd 100644 --- a/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/Framer.java +++ b/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/Framer.java @@ -16,6 +16,7 @@ package uk.co.real_logic.artio.engine.framer; import io.aeron.*; +import io.aeron.driver.DutyCycleTracker; import io.aeron.logbuffer.ControlledFragmentHandler; import io.aeron.logbuffer.ControlledFragmentHandler.Action; import io.aeron.logbuffer.Header; @@ -138,6 +139,7 @@ class Framer implements Agent, EngineEndPointHandler, ProtocolHandler private final EpochNanoClock clock; private final Timer outboundTimer; private final Timer sendTimer; + private final DutyCycleTracker dutyCycleTracker; private final ControlledFragmentHandler librarySubscriber; private final ControlledFragmentHandler replaySubscriber; @@ -278,6 +280,7 @@ class Framer implements Agent, EngineEndPointHandler, ProtocolHandler this.acceptsFixP = configuration.acceptsFixP(); this.fixPContexts = fixPContexts; this.fixCounters = fixCounters; + this.dutyCycleTracker = fixCounters.getFramerDutyCycleTracker(configuration.framerCycleThresholdNs()); replyTimeoutInNs = TimeUnit.MILLISECONDS.toNanos(configuration.replyTimeoutInMs()); timerEventHandler = new TimerEventHandler(errorHandler); @@ -376,11 +379,17 @@ public Action onFixPMessage(final long connectionId, final DirectBuffer buffer, MILLISECONDS, epochClock.time(), 128, 512); } + public void onStart() + { + dutyCycleTracker.update(clock.nanoTime()); + } + public int doWork() throws Exception { final long timeInNs = clock.nanoTime(); final long timeInMs = epochClock.time(); + dutyCycleTracker.measureAndUpdate(timeInNs); fixSenderEndPoints.timeInMs(timeInMs); checkOutboundTimestampSender(timeInNs); diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/engine/logger/AbstractReplayer.java b/artio-core/src/main/java/uk/co/real_logic/artio/engine/logger/AbstractReplayer.java index 8ff612737b..01e380621b 100644 --- a/artio-core/src/main/java/uk/co/real_logic/artio/engine/logger/AbstractReplayer.java +++ b/artio-core/src/main/java/uk/co/real_logic/artio/engine/logger/AbstractReplayer.java @@ -16,10 +16,12 @@ package uk.co.real_logic.artio.engine.logger; import io.aeron.ExclusivePublication; +import io.aeron.driver.DutyCycleTracker; import io.aeron.logbuffer.BufferClaim; import io.aeron.logbuffer.ControlledFragmentHandler; import org.agrona.MutableDirectBuffer; import org.agrona.concurrent.Agent; +import org.agrona.concurrent.EpochNanoClock; import uk.co.real_logic.artio.DebugLogger; import uk.co.real_logic.artio.LogTag; import uk.co.real_logic.artio.Pressure; @@ -69,16 +71,23 @@ abstract class AbstractReplayer implements Agent, ControlledFragmentHandler boolean sendStartReplay = true; + protected final EpochNanoClock clock; + private final DutyCycleTracker dutyCycleTracker; + AbstractReplayer( final ExclusivePublication publication, final FixSessionCodecsFactory fixSessionCodecsFactory, final BufferClaim bufferClaim, - final SenderSequenceNumbers senderSequenceNumbers) + final SenderSequenceNumbers senderSequenceNumbers, + final EpochNanoClock clock, + final DutyCycleTracker dutyCycleTracker) { this.publication = publication; this.fixSessionCodecsFactory = fixSessionCodecsFactory; this.bufferClaim = bufferClaim; this.senderSequenceNumbers = senderSequenceNumbers; + this.clock = clock; + this.dutyCycleTracker = dutyCycleTracker; } boolean trySendStartReplay(final long sessionId, final long connectionId, final long correlationId) @@ -108,6 +117,16 @@ boolean trySendStartReplay(final long sessionId, final long connectionId, final return false; } + public void onStart() + { + dutyCycleTracker.update(clock.nanoTime()); + } + + protected void trackDutyCycleTime(final long timeInNs) + { + dutyCycleTracker.measureAndUpdate(timeInNs); + } + public void onClose() { publication.close(); diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/engine/logger/GapFiller.java b/artio-core/src/main/java/uk/co/real_logic/artio/engine/logger/GapFiller.java index 5e1741940b..8a3bc9a377 100644 --- a/artio-core/src/main/java/uk/co/real_logic/artio/engine/logger/GapFiller.java +++ b/artio-core/src/main/java/uk/co/real_logic/artio/engine/logger/GapFiller.java @@ -16,6 +16,7 @@ package uk.co.real_logic.artio.engine.logger; import io.aeron.Subscription; +import io.aeron.driver.DutyCycleTracker; import io.aeron.logbuffer.BufferClaim; import io.aeron.logbuffer.Header; import org.agrona.DirectBuffer; @@ -63,9 +64,11 @@ public GapFiller( final SenderSequenceNumbers senderSequenceNumbers, final ReplayerCommandQueue replayerCommandQueue, final FixSessionCodecsFactory fixSessionCodecsFactory, - final EpochNanoClock clock) + final EpochNanoClock clock, + final DutyCycleTracker dutyCycleTracker) { - super(publication.dataPublication(), fixSessionCodecsFactory, new BufferClaim(), senderSequenceNumbers); + super(publication.dataPublication(), fixSessionCodecsFactory, new BufferClaim(), senderSequenceNumbers, + clock, dutyCycleTracker); this.inboundSubscription = inboundSubscription; this.publication = publication; this.agentNamePrefix = agentNamePrefix; @@ -76,7 +79,10 @@ public GapFiller( public int doWork() { - timestamper.sendTimestampMessage(); + final long timeInNs = clock.nanoTime(); + + trackDutyCycleTime(timeInNs); + timestamper.sendTimestampMessage(timeInNs); return replayerCommandQueue.poll() + inboundSubscription.controlledPoll(this, POLL_LIMIT); } diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/engine/logger/ReplayTimestamper.java b/artio-core/src/main/java/uk/co/real_logic/artio/engine/logger/ReplayTimestamper.java index 6554759ceb..f6ed993dbf 100644 --- a/artio-core/src/main/java/uk/co/real_logic/artio/engine/logger/ReplayTimestamper.java +++ b/artio-core/src/main/java/uk/co/real_logic/artio/engine/logger/ReplayTimestamper.java @@ -48,9 +48,8 @@ class ReplayTimestamper replayerTimestampEncoder.wrapAndApplyHeader(timestampBuffer, 0, messageHeaderEncoder); } - void sendTimestampMessage() + void sendTimestampMessage(final long timeInNs) { - final long timeInNs = clock.nanoTime(); if (timeInNs > nextTimestampMessageInNs) { replayerTimestampEncoder.timestamp(timeInNs); diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/engine/logger/Replayer.java b/artio-core/src/main/java/uk/co/real_logic/artio/engine/logger/Replayer.java index e1dce081eb..c52b199ead 100644 --- a/artio-core/src/main/java/uk/co/real_logic/artio/engine/logger/Replayer.java +++ b/artio-core/src/main/java/uk/co/real_logic/artio/engine/logger/Replayer.java @@ -17,6 +17,7 @@ import io.aeron.ExclusivePublication; import io.aeron.Subscription; +import io.aeron.driver.DutyCycleTracker; import io.aeron.logbuffer.BufferClaim; import io.aeron.logbuffer.Header; import org.agrona.DirectBuffer; @@ -106,7 +107,6 @@ public class Replayer extends AbstractReplayer private final ReplayerCommandQueue replayerCommandQueue; private final AtomicCounter currentReplayCount; private final int maxConcurrentSessionReplays; - private final EpochNanoClock clock; private final EngineConfiguration configuration; private final ReplayQuery outboundReplayQuery; private final IdleStrategy idleStrategy; @@ -140,9 +140,10 @@ public Replayer( final int maxConcurrentSessionReplays, final EpochNanoClock clock, final FixPProtocolType fixPProtocolType, - final EngineConfiguration configuration) + final EngineConfiguration configuration, + final DutyCycleTracker dutyCycleTracker) { - super(publication, fixSessionCodecsFactory, bufferClaim, senderSequenceNumbers); + super(publication, fixSessionCodecsFactory, bufferClaim, senderSequenceNumbers, clock, dutyCycleTracker); this.outboundReplayQuery = outboundReplayQuery; this.idleStrategy = idleStrategy; this.errorHandler = errorHandler; @@ -157,7 +158,6 @@ public Replayer( this.replayerCommandQueue = replayerCommandQueue; this.currentReplayCount = currentReplayCount; this.maxConcurrentSessionReplays = maxConcurrentSessionReplays; - this.clock = clock; this.configuration = configuration; gapFillMessageTypes = packAllMessageTypes(gapfillOnReplayMessageTypes); @@ -468,7 +468,10 @@ private FixReplayerSession processFixResendRequest( public int doWork() { - timestamper.sendTimestampMessage(); + final long timeInNs = clock.nanoTime(); + + trackDutyCycleTime(timeInNs); + timestamper.sendTimestampMessage(timeInNs); int work = replayerCommandQueue.poll(); work += pollReplayerChannels(); diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/library/FixLibrary.java b/artio-core/src/main/java/uk/co/real_logic/artio/library/FixLibrary.java index a6a885bb34..6e0723c490 100644 --- a/artio-core/src/main/java/uk/co/real_logic/artio/library/FixLibrary.java +++ b/artio-core/src/main/java/uk/co/real_logic/artio/library/FixLibrary.java @@ -123,6 +123,7 @@ private FixLibrary connect() { poller.startConnecting(); scheduler.launch(configuration, errorHandler, monitoringCompositeAgent, conductorAgent()); + poller.onStart(); return this; } diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/library/LibraryConfiguration.java b/artio-core/src/main/java/uk/co/real_logic/artio/library/LibraryConfiguration.java index d1e34a093a..827f708237 100644 --- a/artio-core/src/main/java/uk/co/real_logic/artio/library/LibraryConfiguration.java +++ b/artio-core/src/main/java/uk/co/real_logic/artio/library/LibraryConfiguration.java @@ -83,6 +83,7 @@ public void onDisconnect(final FixLibrary library) private FixPConnectionExistsHandler fixPConnectionExistsHandler; private FixPConnectionAcquiredHandler fixPConnectionAcquiredHandler; private LibraryReproductionConfiguration reproductionConfiguration; + private long libraryCycleThresholdNs = DEFAULT_CYCLE_THRESHOLD_NS; /** * When a new FIX session connects to the gateway you register a callback handler to find @@ -287,6 +288,19 @@ public LibraryConfiguration reproduceInbound( return this; } + /** + * Set a threshold for the library work cycle time which when exceeded it will increment the + * library cycle time exceeded count. + * + * @param libraryCycleThresholdNs value in nanoseconds. The value 0 will disable duty cycle tracking on the library. + * @return this for fluent API. + */ + public LibraryConfiguration libraryCycleThresholdNs(final long libraryCycleThresholdNs) + { + this.libraryCycleThresholdNs = libraryCycleThresholdNs; + return this; + } + // ------------------------ // BEGIN INHERITED SETTERS // ------------------------ @@ -430,6 +444,11 @@ boolean isReproductionEnabled() return reproductionConfiguration != null; } + public long libraryCycleThresholdNs() + { + return libraryCycleThresholdNs; + } + public LibraryConfiguration libraryName(final String libraryName) { this.libraryName = libraryName; diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/library/LibraryPoller.java b/artio-core/src/main/java/uk/co/real_logic/artio/library/LibraryPoller.java index b568fa8bd6..f1d0cd0789 100644 --- a/artio-core/src/main/java/uk/co/real_logic/artio/library/LibraryPoller.java +++ b/artio-core/src/main/java/uk/co/real_logic/artio/library/LibraryPoller.java @@ -19,6 +19,7 @@ import io.aeron.CommonContext; import io.aeron.ControlledFragmentAssembler; import io.aeron.Subscription; +import io.aeron.driver.DutyCycleTracker; import io.aeron.exceptions.RegistrationException; import io.aeron.logbuffer.ControlledFragmentHandler; import io.aeron.logbuffer.ControlledFragmentHandler.Action; @@ -143,6 +144,7 @@ final class LibraryPoller implements LibraryEndPointHandler, ProtocolHandler, Au private final boolean enginesAreClustered; private final ErrorHandler errorHandler; private final FixCounters fixCounters; + private final DutyCycleTracker dutyCycleTracker; private final boolean isReproductionEnabled; private final ReproductionClock reproductionClock; @@ -258,6 +260,9 @@ public Reply messageThrottle( epochClock, configuration.epochNanoClock(), configuration.sessionEpochFractionFormat()); this.isReproductionEnabled = configuration.isReproductionEnabled(); this.reproductionClock = isReproductionEnabled ? configuration.reproductionConfiguration().clock() : null; + + this.dutyCycleTracker = fixCounters.getLibraryDutyCycleTracker( + configuration.libraryId(), configuration.libraryCycleThresholdNs()); } boolean isConnected() @@ -589,29 +594,32 @@ long saveFollowerSessionRequest( int poll(final int fragmentLimit) { + final long timeInNs = epochNanoClock.nanoTime(); final long timeInMs = timeInMs(); + dutyCycleTracker.measureAndUpdate(timeInNs); + switch (state) { case CONNECTED: - return pollWithoutReconnect(timeInMs, fragmentLimit); + return pollWithoutReconnect(timeInNs, timeInMs, fragmentLimit); case ATTEMPT_CONNECT: - startConnecting(); - return pollWithoutReconnect(timeInMs, fragmentLimit); + startConnecting(timeInMs); + return pollWithoutReconnect(timeInNs, timeInMs, fragmentLimit); case CONNECTING: nextConnectingStep(timeInMs); - return pollWithoutReconnect(timeInMs, fragmentLimit); + return pollWithoutReconnect(timeInNs, timeInMs, fragmentLimit); case ATTEMPT_CURRENT_NODE: connectToNewEngine(timeInMs); state = CONNECTING; - return pollWithoutReconnect(timeInMs, fragmentLimit); + return pollWithoutReconnect(timeInNs, timeInMs, fragmentLimit); case ENGINE_DISCONNECT: attemptEngineCloseBasedLogout(); - return pollWithoutReconnect(timeInMs, fragmentLimit); + return pollWithoutReconnect(timeInNs, timeInMs, fragmentLimit); case CLOSED: default: @@ -619,9 +627,8 @@ int poll(final int fragmentLimit) } } - private int pollWithoutReconnect(final long timeInMs, final int fragmentLimit) + private int pollWithoutReconnect(final long timeInNs, final long timeInMs, final int fragmentLimit) { - final long timeInNs = epochNanoClock.nanoTime(); int operations = 0; operations += inboundSubscription.controlledPoll(outboundSubscription, fragmentLimit); operations += livenessDetector.poll(timeInMs); @@ -635,6 +642,11 @@ private int pollWithoutReconnect(final long timeInMs, final int fragmentLimit) // BEGIN CONNECTION LOGIC // ----------------------------------------------------------------------- + void onStart() + { + dutyCycleTracker.update(epochNanoClock.nanoTime()); + } + void startConnecting() { startConnecting(timeInMs()); diff --git a/artio-core/src/test/java/uk/co/real_logic/artio/engine/framer/FramerTest.java b/artio-core/src/test/java/uk/co/real_logic/artio/engine/framer/FramerTest.java index 0d2e86a1c0..4d3f24a09f 100644 --- a/artio-core/src/test/java/uk/co/real_logic/artio/engine/framer/FramerTest.java +++ b/artio-core/src/test/java/uk/co/real_logic/artio/engine/framer/FramerTest.java @@ -17,6 +17,7 @@ import io.aeron.Image; import io.aeron.Subscription; +import io.aeron.driver.DutyCycleTracker; import io.aeron.logbuffer.ControlledFragmentHandler.Action; import io.aeron.logbuffer.Header; import org.agrona.DirectBuffer; @@ -112,6 +113,7 @@ public class FramerTest private final SequenceNumberIndexReader sentSequenceNumberIndex = mock(SequenceNumberIndexReader.class); private final SequenceNumberIndexReader receivedSequenceNumberIndex = mock(SequenceNumberIndexReader.class); private final ReplayQuery replayQuery = mock(ReplayQuery.class); + private final FixCounters fixCounters = mock(FixCounters.class); private final FixContexts fixContexts = mock(FixContexts.class); private final FixGatewaySessions gatewaySessions = mock(FixGatewaySessions.class); private final FixGatewaySession gatewaySession = mock(FixGatewaySession.class); @@ -155,6 +157,9 @@ public void setUp() throws IOException clientBuffer.putInt(10, 5); + when(fixCounters.getFramerDutyCycleTracker(anyLong())).thenReturn(mock(DutyCycleTracker.class)); + when(fixCounters.getIndexerDutyCycleTracker(anyLong())).thenReturn(mock(DutyCycleTracker.class)); + when(outboundLibrarySubscription.imageBySessionId(anyInt())).thenReturn(normalImage); when(mockEndPointFactory.receiverEndPoint( @@ -208,7 +213,7 @@ public void setUp() throws IOException mock(CountersReader.class), 2, 1, - mock(FixCounters.class), + fixCounters, mock(SenderSequenceNumbers.class), mock(AgentInvoker.class), null); diff --git a/artio-core/src/test/java/uk/co/real_logic/artio/engine/logger/ReplayerTest.java b/artio-core/src/test/java/uk/co/real_logic/artio/engine/logger/ReplayerTest.java index 39506e9deb..42bc819a14 100644 --- a/artio-core/src/test/java/uk/co/real_logic/artio/engine/logger/ReplayerTest.java +++ b/artio-core/src/test/java/uk/co/real_logic/artio/engine/logger/ReplayerTest.java @@ -17,6 +17,7 @@ import io.aeron.Subscription; import io.aeron.driver.Configuration; +import io.aeron.driver.DutyCycleTracker; import io.aeron.logbuffer.ControlledFragmentHandler; import io.aeron.logbuffer.ControlledFragmentHandler.Action; import io.aeron.logbuffer.Header; @@ -138,7 +139,8 @@ public void setUp() DEFAULT_MAX_CONCURRENT_SESSION_REPLAYS, clock, FixPProtocolType.ILINK_3, - mock(EngineConfiguration.class)); + mock(EngineConfiguration.class), + mock(DutyCycleTracker.class)); } private void setReplayedMessages(final int replayedMessages) diff --git a/artio-core/src/test/java/uk/co/real_logic/artio/library/LibraryPollerTest.java b/artio-core/src/test/java/uk/co/real_logic/artio/library/LibraryPollerTest.java index fea269fc59..10354c9476 100644 --- a/artio-core/src/test/java/uk/co/real_logic/artio/library/LibraryPollerTest.java +++ b/artio-core/src/test/java/uk/co/real_logic/artio/library/LibraryPollerTest.java @@ -16,6 +16,7 @@ package uk.co.real_logic.artio.library; import io.aeron.Subscription; +import io.aeron.driver.DutyCycleTracker; import org.agrona.LangUtil; import org.agrona.concurrent.UnsafeBuffer; import org.agrona.concurrent.status.AtomicCounter; @@ -89,6 +90,7 @@ public void setUp() when(transport.outboundPublication()).thenReturn(outboundPublication); when(transport.inboundSubscription()).thenReturn(inboundSubscription); + when(counters.getLibraryDutyCycleTracker(anyInt(), anyLong())).thenReturn(mock(DutyCycleTracker.class)); when(counters.receivedMsgSeqNo(anyLong(), anyLong())).thenReturn(mock(AtomicCounter.class)); when(counters.sentMsgSeqNo(anyLong(), anyLong())).thenReturn(mock(AtomicCounter.class));