From d6037d56f076a080b9ac7398ffcdf7afbc7fd8df Mon Sep 17 00:00:00 2001 From: marc-adaptive Date: Thu, 19 Dec 2024 14:30:48 -0500 Subject: [PATCH] Add duty cycle tracking to replayer/gapfiller --- .../artio/engine/EngineContext.java | 12 ++---- .../artio/engine/logger/AbstractReplayer.java | 21 +++++++++- .../artio/engine/logger/GapFiller.java | 12 ++++-- .../logger/IndexerDutyCycleTracker.java | 41 ------------------- .../engine/logger/ReplayTimestamper.java | 3 +- .../artio/engine/logger/Replayer.java | 13 +++--- .../artio/engine/logger/ReplayerTest.java | 4 +- 7 files changed, 45 insertions(+), 61 deletions(-) delete mode 100644 artio-core/src/main/java/uk/co/real_logic/artio/engine/logger/IndexerDutyCycleTracker.java diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/engine/EngineContext.java b/artio-core/src/main/java/uk/co/real_logic/artio/engine/EngineContext.java index 7edff2b15f..a0fadce7f4 100644 --- a/artio-core/src/main/java/uk/co/real_logic/artio/engine/EngineContext.java +++ b/artio-core/src/main/java/uk/co/real_logic/artio/engine/EngineContext.java @@ -252,7 +252,8 @@ private Replayer newReplayer( configuration.maxConcurrentSessionReplays(), clock, configuration.supportedFixPProtocolType(), - configuration); + configuration, + fixCounters.getIndexerDutyCycleTracker(configuration.indexerCycleThresholdNs())); } private void newIndexers() @@ -372,19 +373,14 @@ private void newArchivingAgent() senderSequenceNumbers, replayerCommandQueue, new FixSessionCodecsFactory(clock, configuration.sessionEpochFractionFormat()), - clock); + clock, + fixCounters.getIndexerDutyCycleTracker(configuration.indexerCycleThresholdNs())); } - final Agent dutyCycleTrackingAgent = new IndexerDutyCycleTracker( - configuration.agentNamePrefix(), - clock, - fixCounters.getIndexerDutyCycleTracker(configuration.indexerCycleThresholdNs())); - final List agents = new ArrayList<>(); agents.add(inboundIndexer); agents.add(outboundIndexer); agents.add(replayer); - agents.add(dutyCycleTrackingAgent); indexingAgent = new CompositeAgent(agents); } diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/engine/logger/AbstractReplayer.java b/artio-core/src/main/java/uk/co/real_logic/artio/engine/logger/AbstractReplayer.java index 8ff612737b..01e380621b 100644 --- a/artio-core/src/main/java/uk/co/real_logic/artio/engine/logger/AbstractReplayer.java +++ b/artio-core/src/main/java/uk/co/real_logic/artio/engine/logger/AbstractReplayer.java @@ -16,10 +16,12 @@ package uk.co.real_logic.artio.engine.logger; import io.aeron.ExclusivePublication; +import io.aeron.driver.DutyCycleTracker; import io.aeron.logbuffer.BufferClaim; import io.aeron.logbuffer.ControlledFragmentHandler; import org.agrona.MutableDirectBuffer; import org.agrona.concurrent.Agent; +import org.agrona.concurrent.EpochNanoClock; import uk.co.real_logic.artio.DebugLogger; import uk.co.real_logic.artio.LogTag; import uk.co.real_logic.artio.Pressure; @@ -69,16 +71,23 @@ abstract class AbstractReplayer implements Agent, ControlledFragmentHandler boolean sendStartReplay = true; + protected final EpochNanoClock clock; + private final DutyCycleTracker dutyCycleTracker; + AbstractReplayer( final ExclusivePublication publication, final FixSessionCodecsFactory fixSessionCodecsFactory, final BufferClaim bufferClaim, - final SenderSequenceNumbers senderSequenceNumbers) + final SenderSequenceNumbers senderSequenceNumbers, + final EpochNanoClock clock, + final DutyCycleTracker dutyCycleTracker) { this.publication = publication; this.fixSessionCodecsFactory = fixSessionCodecsFactory; this.bufferClaim = bufferClaim; this.senderSequenceNumbers = senderSequenceNumbers; + this.clock = clock; + this.dutyCycleTracker = dutyCycleTracker; } boolean trySendStartReplay(final long sessionId, final long connectionId, final long correlationId) @@ -108,6 +117,16 @@ boolean trySendStartReplay(final long sessionId, final long connectionId, final return false; } + public void onStart() + { + dutyCycleTracker.update(clock.nanoTime()); + } + + protected void trackDutyCycleTime(final long timeInNs) + { + dutyCycleTracker.measureAndUpdate(timeInNs); + } + public void onClose() { publication.close(); diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/engine/logger/GapFiller.java b/artio-core/src/main/java/uk/co/real_logic/artio/engine/logger/GapFiller.java index 5e1741940b..8a3bc9a377 100644 --- a/artio-core/src/main/java/uk/co/real_logic/artio/engine/logger/GapFiller.java +++ b/artio-core/src/main/java/uk/co/real_logic/artio/engine/logger/GapFiller.java @@ -16,6 +16,7 @@ package uk.co.real_logic.artio.engine.logger; import io.aeron.Subscription; +import io.aeron.driver.DutyCycleTracker; import io.aeron.logbuffer.BufferClaim; import io.aeron.logbuffer.Header; import org.agrona.DirectBuffer; @@ -63,9 +64,11 @@ public GapFiller( final SenderSequenceNumbers senderSequenceNumbers, final ReplayerCommandQueue replayerCommandQueue, final FixSessionCodecsFactory fixSessionCodecsFactory, - final EpochNanoClock clock) + final EpochNanoClock clock, + final DutyCycleTracker dutyCycleTracker) { - super(publication.dataPublication(), fixSessionCodecsFactory, new BufferClaim(), senderSequenceNumbers); + super(publication.dataPublication(), fixSessionCodecsFactory, new BufferClaim(), senderSequenceNumbers, + clock, dutyCycleTracker); this.inboundSubscription = inboundSubscription; this.publication = publication; this.agentNamePrefix = agentNamePrefix; @@ -76,7 +79,10 @@ public GapFiller( public int doWork() { - timestamper.sendTimestampMessage(); + final long timeInNs = clock.nanoTime(); + + trackDutyCycleTime(timeInNs); + timestamper.sendTimestampMessage(timeInNs); return replayerCommandQueue.poll() + inboundSubscription.controlledPoll(this, POLL_LIMIT); } diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/engine/logger/IndexerDutyCycleTracker.java b/artio-core/src/main/java/uk/co/real_logic/artio/engine/logger/IndexerDutyCycleTracker.java deleted file mode 100644 index e058c3bdd4..0000000000 --- a/artio-core/src/main/java/uk/co/real_logic/artio/engine/logger/IndexerDutyCycleTracker.java +++ /dev/null @@ -1,41 +0,0 @@ -package uk.co.real_logic.artio.engine.logger; - -import io.aeron.driver.DutyCycleTracker; -import org.agrona.concurrent.Agent; -import org.agrona.concurrent.EpochNanoClock; - -public class IndexerDutyCycleTracker implements Agent -{ - final String agentNamePrefix; - final EpochNanoClock clock; - final DutyCycleTracker dutyCycleTracker; - - public IndexerDutyCycleTracker( - final String agentNamePrefix, - final EpochNanoClock clock, - final DutyCycleTracker dutyCycleTracker) - { - this.agentNamePrefix = agentNamePrefix; - this.clock = clock; - this.dutyCycleTracker = dutyCycleTracker; - } - - @Override - public void onStart() - { - dutyCycleTracker.update(clock.nanoTime()); - } - - @Override - public int doWork() throws Exception - { - dutyCycleTracker.measureAndUpdate(clock.nanoTime()); - return 0; - } - - @Override - public String roleName() - { - return agentNamePrefix + "IndexerDutyCycleTracker"; - } -} diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/engine/logger/ReplayTimestamper.java b/artio-core/src/main/java/uk/co/real_logic/artio/engine/logger/ReplayTimestamper.java index 6554759ceb..f6ed993dbf 100644 --- a/artio-core/src/main/java/uk/co/real_logic/artio/engine/logger/ReplayTimestamper.java +++ b/artio-core/src/main/java/uk/co/real_logic/artio/engine/logger/ReplayTimestamper.java @@ -48,9 +48,8 @@ class ReplayTimestamper replayerTimestampEncoder.wrapAndApplyHeader(timestampBuffer, 0, messageHeaderEncoder); } - void sendTimestampMessage() + void sendTimestampMessage(final long timeInNs) { - final long timeInNs = clock.nanoTime(); if (timeInNs > nextTimestampMessageInNs) { replayerTimestampEncoder.timestamp(timeInNs); diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/engine/logger/Replayer.java b/artio-core/src/main/java/uk/co/real_logic/artio/engine/logger/Replayer.java index e1dce081eb..c52b199ead 100644 --- a/artio-core/src/main/java/uk/co/real_logic/artio/engine/logger/Replayer.java +++ b/artio-core/src/main/java/uk/co/real_logic/artio/engine/logger/Replayer.java @@ -17,6 +17,7 @@ import io.aeron.ExclusivePublication; import io.aeron.Subscription; +import io.aeron.driver.DutyCycleTracker; import io.aeron.logbuffer.BufferClaim; import io.aeron.logbuffer.Header; import org.agrona.DirectBuffer; @@ -106,7 +107,6 @@ public class Replayer extends AbstractReplayer private final ReplayerCommandQueue replayerCommandQueue; private final AtomicCounter currentReplayCount; private final int maxConcurrentSessionReplays; - private final EpochNanoClock clock; private final EngineConfiguration configuration; private final ReplayQuery outboundReplayQuery; private final IdleStrategy idleStrategy; @@ -140,9 +140,10 @@ public Replayer( final int maxConcurrentSessionReplays, final EpochNanoClock clock, final FixPProtocolType fixPProtocolType, - final EngineConfiguration configuration) + final EngineConfiguration configuration, + final DutyCycleTracker dutyCycleTracker) { - super(publication, fixSessionCodecsFactory, bufferClaim, senderSequenceNumbers); + super(publication, fixSessionCodecsFactory, bufferClaim, senderSequenceNumbers, clock, dutyCycleTracker); this.outboundReplayQuery = outboundReplayQuery; this.idleStrategy = idleStrategy; this.errorHandler = errorHandler; @@ -157,7 +158,6 @@ public Replayer( this.replayerCommandQueue = replayerCommandQueue; this.currentReplayCount = currentReplayCount; this.maxConcurrentSessionReplays = maxConcurrentSessionReplays; - this.clock = clock; this.configuration = configuration; gapFillMessageTypes = packAllMessageTypes(gapfillOnReplayMessageTypes); @@ -468,7 +468,10 @@ private FixReplayerSession processFixResendRequest( public int doWork() { - timestamper.sendTimestampMessage(); + final long timeInNs = clock.nanoTime(); + + trackDutyCycleTime(timeInNs); + timestamper.sendTimestampMessage(timeInNs); int work = replayerCommandQueue.poll(); work += pollReplayerChannels(); diff --git a/artio-core/src/test/java/uk/co/real_logic/artio/engine/logger/ReplayerTest.java b/artio-core/src/test/java/uk/co/real_logic/artio/engine/logger/ReplayerTest.java index 39506e9deb..42bc819a14 100644 --- a/artio-core/src/test/java/uk/co/real_logic/artio/engine/logger/ReplayerTest.java +++ b/artio-core/src/test/java/uk/co/real_logic/artio/engine/logger/ReplayerTest.java @@ -17,6 +17,7 @@ import io.aeron.Subscription; import io.aeron.driver.Configuration; +import io.aeron.driver.DutyCycleTracker; import io.aeron.logbuffer.ControlledFragmentHandler; import io.aeron.logbuffer.ControlledFragmentHandler.Action; import io.aeron.logbuffer.Header; @@ -138,7 +139,8 @@ public void setUp() DEFAULT_MAX_CONCURRENT_SESSION_REPLAYS, clock, FixPProtocolType.ILINK_3, - mock(EngineConfiguration.class)); + mock(EngineConfiguration.class), + mock(DutyCycleTracker.class)); } private void setReplayedMessages(final int replayedMessages)