Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Duty cycle tracker #536

Merged
merged 5 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,8 @@ private static LogTag lookupLogTag(final String name)
public static final long DEFAULT_MIN_FIXP_KEEPALIVE_TIMEOUT_IN_MS = 1;
public static final long DEFAULT_ACCEPTOR_FIXP_KEEPALIVE_TIMEOUT_IN_MS = SECONDS.toMillis(30);

/**
* Default work cycle time threshold in nanoseconds: the nanosecond equivalent of one second.
*/
public static final long DEFAULT_CYCLE_THRESHOLD_NS = SECONDS.toNanos(1);

public static final boolean RUNNING_ON_WINDOWS = System.getProperty("os.name").startsWith("Windows");

private long reasonableTransmissionTimeInMs = DEFAULT_REASONABLE_TRANSMISSION_TIME_IN_MS;
Expand Down
55 changes: 54 additions & 1 deletion artio-core/src/main/java/uk/co/real_logic/artio/FixCounters.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

import io.aeron.Aeron;
import io.aeron.Counter;
import io.aeron.driver.DutyCycleTracker;
import io.aeron.driver.status.DutyCycleStallTracker;
import org.agrona.collections.IntHashSet;
import org.agrona.concurrent.status.AtomicCounter;
import org.agrona.concurrent.status.CountersReader;
Expand Down Expand Up @@ -47,7 +49,13 @@ public enum FixCountersId
CURRENT_REPLAY_COUNT_TYPE_ID(10_008),
NEGATIVE_TIMESTAMP_TYPE_ID(10_009),
FAILED_ADMIN_TYPE_ID(10_010),
FAILED_ADMIN_REPLY_TYPE_ID(10_011);
FAILED_ADMIN_REPLY_TYPE_ID(10_011),
FRAMER_MAX_CYCLE_TIME_TYPE_ID(10_012),
FRAMER_CYCLE_TIME_THRESHOLD_EXCEEDED_TYPE_ID(10_013),
INDEXER_MAX_CYCLE_TIME_TYPE_ID(10_014),
INDEXER_CYCLE_TIME_THRESHOLD_EXCEEDED_TYPE_ID(10_015),
LIBRARY_MAX_CYCLE_TIME_TYPE_ID(10_016),
LIBRARY_CYCLE_TIME_THRESHOLD_EXCEEDED_TYPE_ID(10_017);

final int id;

Expand Down Expand Up @@ -149,6 +157,51 @@ public AtomicCounter negativeTimestamps()
return negativeTimestamps;
}

public DutyCycleTracker getFramerDutyCycleTracker(final long threshold)
{
if (threshold == 0)
{
return new DutyCycleTracker();
}

return new DutyCycleStallTracker(
marc-adaptive marked this conversation as resolved.
Show resolved Hide resolved
newCounter(FRAMER_MAX_CYCLE_TIME_TYPE_ID.id(), "framer max cycle time in ns"),
newCounter(FRAMER_CYCLE_TIME_THRESHOLD_EXCEEDED_TYPE_ID.id(),
"framer work cycle time exceeded count: threshold=" + threshold),
threshold
);
}

/**
 * Create the duty cycle tracker for the indexer.
 *
 * @param threshold work cycle time threshold in nanoseconds. When it is 0 a plain {@link DutyCycleTracker}
 *                  is returned and no counters are created through {@code newCounter}.
 * @return a {@link DutyCycleStallTracker} built from the indexer max cycle time counter, the indexer
 *         threshold exceeded counter and the threshold, or a plain {@link DutyCycleTracker} when the
 *         threshold is 0.
 */
public DutyCycleTracker getIndexerDutyCycleTracker(final long threshold)
{
    if (threshold != 0)
    {
        final AtomicCounter maxCycleTime = newCounter(
            INDEXER_MAX_CYCLE_TIME_TYPE_ID.id(), "indexer max cycle time in ns");
        // The threshold is embedded in the counter label so it is visible alongside the exceeded count.
        final AtomicCounter thresholdExceeded = newCounter(
            INDEXER_CYCLE_TIME_THRESHOLD_EXCEEDED_TYPE_ID.id(),
            "indexer work cycle time exceeded count: threshold=" + threshold);

        return new DutyCycleStallTracker(maxCycleTime, thresholdExceeded, threshold);
    }

    return new DutyCycleTracker();
}

/**
 * Create the duty cycle tracker for a library.
 *
 * @param libraryId id of the library, included in both counter labels.
 * @param threshold work cycle time threshold in nanoseconds. When it is 0 a plain {@link DutyCycleTracker}
 *                  is returned and no counters are created through {@code newCounter}.
 * @return a {@link DutyCycleStallTracker} built from the library max cycle time counter, the library
 *         threshold exceeded counter and the threshold, or a plain {@link DutyCycleTracker} when the
 *         threshold is 0.
 */
public DutyCycleTracker getLibraryDutyCycleTracker(final int libraryId, final long threshold)
{
    if (threshold != 0)
    {
        final String labelPrefix = "library " + libraryId;
        final AtomicCounter maxCycleTime = newCounter(
            LIBRARY_MAX_CYCLE_TIME_TYPE_ID.id(), labelPrefix + " max cycle time in ns");
        // The threshold is embedded in the counter label so it is visible alongside the exceeded count.
        final AtomicCounter thresholdExceeded = newCounter(
            LIBRARY_CYCLE_TIME_THRESHOLD_EXCEEDED_TYPE_ID.id(),
            labelPrefix + " work cycle time exceeded count: threshold=" + threshold);

        return new DutyCycleStallTracker(maxCycleTime, thresholdExceeded, threshold);
    }

    return new DutyCycleTracker();
}

public AtomicCounter messagesRead(final long connectionId, final String address)
{
return newCounter(FixCountersId.MESSAGES_READ_TYPE_ID.id(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,8 @@ public final class EngineConfiguration extends CommonConfiguration implements Au
private long timeIndexReplayFlushIntervalInNs = DEFAULT_TIME_INDEX_FLUSH_INTERVAL_IN_NS;
private CancelOnDisconnectOption cancelOnDisconnectOption = DO_NOT_CANCEL_ON_DISCONNECT_OR_LOGOUT;
private int cancelOnDisconnectTimeoutWindowInMs = DEFAULT_CANCEL_ON_DISCONNECT_TIMEOUT_WINDOW_IN_MS;
private long framerCycleThresholdNs = DEFAULT_CYCLE_THRESHOLD_NS;
private long indexerCycleThresholdNs = DEFAULT_CYCLE_THRESHOLD_NS;

private EngineReproductionConfiguration reproductionConfiguration;
private ReproductionMessageHandler reproductionMessageHandler = (connectionId, bytes) ->
Expand Down Expand Up @@ -1294,6 +1296,32 @@ public EngineConfiguration cancelOnDisconnectTimeoutWindowInMs(final int cancelO
return this;
}

/**
* Set the threshold for the framer work cycle time. Whenever a framer work cycle exceeds this threshold
* the framer cycle time exceeded count is incremented. Defaults to
* {@link #DEFAULT_CYCLE_THRESHOLD_NS}.
*
* @param framerCycleThresholdNs value in nanoseconds. The value 0 will disable duty cycle tracking on the framer.
* @return this for fluent API.
*/
public EngineConfiguration framerCycleThresholdNs(final long framerCycleThresholdNs)
{
this.framerCycleThresholdNs = framerCycleThresholdNs;
return this;
}

/**
* Set the threshold for the indexer work cycle time. Whenever an indexer work cycle exceeds this threshold
* the indexer cycle time exceeded count is incremented. Defaults to
* {@link #DEFAULT_CYCLE_THRESHOLD_NS}.
*
* @param indexerCycleThresholdNs value in nanoseconds. The value 0 will disable duty cycle tracking on the indexer.
* @return this for fluent API.
*/
public EngineConfiguration indexerCycleThresholdNs(final long indexerCycleThresholdNs)
{
this.indexerCycleThresholdNs = indexerCycleThresholdNs;
return this;
}

// ---------------------
// END SETTERS
// ---------------------
Expand Down Expand Up @@ -2025,6 +2053,16 @@ public int cancelOnDisconnectTimeoutWindowInMs()
return cancelOnDisconnectTimeoutWindowInMs;
}

/**
* Get the configured framer work cycle time threshold.
*
* @return the threshold in nanoseconds; 0 means duty cycle tracking on the framer is disabled.
*/
public long framerCycleThresholdNs()
{
return framerCycleThresholdNs;
}

/**
* Get the configured indexer work cycle time threshold.
*
* @return the threshold in nanoseconds; 0 means duty cycle tracking on the indexer is disabled.
*/
public long indexerCycleThresholdNs()
{
return indexerCycleThresholdNs;
}

public boolean indexChecksumEnabled()
{
return indexChecksumEnabled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,8 @@ private Replayer newReplayer(
configuration.maxConcurrentSessionReplays(),
clock,
configuration.supportedFixPProtocolType(),
configuration);
configuration,
fixCounters.getIndexerDutyCycleTracker(configuration.indexerCycleThresholdNs()));
}

private void newIndexers()
Expand Down Expand Up @@ -372,7 +373,8 @@ private void newArchivingAgent()
senderSequenceNumbers,
replayerCommandQueue,
new FixSessionCodecsFactory(clock, configuration.sessionEpochFractionFormat()),
clock);
clock,
fixCounters.getIndexerDutyCycleTracker(configuration.indexerCycleThresholdNs()));
}

final List<Agent> agents = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package uk.co.real_logic.artio.engine.framer;

import io.aeron.*;
import io.aeron.driver.DutyCycleTracker;
import io.aeron.logbuffer.ControlledFragmentHandler;
import io.aeron.logbuffer.ControlledFragmentHandler.Action;
import io.aeron.logbuffer.Header;
Expand Down Expand Up @@ -138,6 +139,7 @@ class Framer implements Agent, EngineEndPointHandler, ProtocolHandler
private final EpochNanoClock clock;
private final Timer outboundTimer;
private final Timer sendTimer;
private final DutyCycleTracker dutyCycleTracker;

private final ControlledFragmentHandler librarySubscriber;
private final ControlledFragmentHandler replaySubscriber;
Expand Down Expand Up @@ -278,6 +280,7 @@ class Framer implements Agent, EngineEndPointHandler, ProtocolHandler
this.acceptsFixP = configuration.acceptsFixP();
this.fixPContexts = fixPContexts;
this.fixCounters = fixCounters;
this.dutyCycleTracker = fixCounters.getFramerDutyCycleTracker(configuration.framerCycleThresholdNs());

replyTimeoutInNs = TimeUnit.MILLISECONDS.toNanos(configuration.replyTimeoutInMs());
timerEventHandler = new TimerEventHandler(errorHandler);
Expand Down Expand Up @@ -376,11 +379,17 @@ public Action onFixPMessage(final long connectionId, final DirectBuffer buffer,
MILLISECONDS, epochClock.time(), 128, 512);
}

/**
 * Hands the current {@code clock.nanoTime()} reading to the duty cycle tracker.
 */
public void onStart()
{
    // NOTE(review): presumably this gives the first measureAndUpdate call in doWork a baseline
    // timestamp - confirm against DutyCycleTracker.
    final long startTimeInNs = clock.nanoTime();
    dutyCycleTracker.update(startTimeInNs);
}

public int doWork() throws Exception
{
final long timeInNs = clock.nanoTime();
final long timeInMs = epochClock.time();

dutyCycleTracker.measureAndUpdate(timeInNs);
fixSenderEndPoints.timeInMs(timeInMs);

checkOutboundTimestampSender(timeInNs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
package uk.co.real_logic.artio.engine.logger;

import io.aeron.ExclusivePublication;
import io.aeron.driver.DutyCycleTracker;
import io.aeron.logbuffer.BufferClaim;
import io.aeron.logbuffer.ControlledFragmentHandler;
import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.Agent;
import org.agrona.concurrent.EpochNanoClock;
import uk.co.real_logic.artio.DebugLogger;
import uk.co.real_logic.artio.LogTag;
import uk.co.real_logic.artio.Pressure;
Expand Down Expand Up @@ -69,16 +71,23 @@ abstract class AbstractReplayer implements Agent, ControlledFragmentHandler

boolean sendStartReplay = true;

protected final EpochNanoClock clock;
private final DutyCycleTracker dutyCycleTracker;

/**
* Stores the collaborators shared by the replayer implementations; each argument is assigned to the
* field of the same name.
*
* @param publication publication kept by this replayer.
* @param fixSessionCodecsFactory factory kept for FIX session codecs.
* @param bufferClaim buffer claim kept by this replayer.
* @param senderSequenceNumbers sender sequence numbers kept by this replayer.
* @param clock nanosecond clock, read in onStart to update the duty cycle tracker.
* @param dutyCycleTracker tracker updated in onStart and fed by trackDutyCycleTime.
*/
AbstractReplayer(
final ExclusivePublication publication,
final FixSessionCodecsFactory fixSessionCodecsFactory,
final BufferClaim bufferClaim,
// NOTE(review): the next two lines look like the before and after versions of the same parameter
// carried over from a diff view - only the second can remain for this to compile; confirm.
final SenderSequenceNumbers senderSequenceNumbers)
final SenderSequenceNumbers senderSequenceNumbers,
final EpochNanoClock clock,
final DutyCycleTracker dutyCycleTracker)
{
this.publication = publication;
this.fixSessionCodecsFactory = fixSessionCodecsFactory;
this.bufferClaim = bufferClaim;
this.senderSequenceNumbers = senderSequenceNumbers;
this.clock = clock;
this.dutyCycleTracker = dutyCycleTracker;
}

boolean trySendStartReplay(final long sessionId, final long connectionId, final long correlationId)
Expand Down Expand Up @@ -108,6 +117,16 @@ boolean trySendStartReplay(final long sessionId, final long connectionId, final
return false;
}

/**
 * Hands the current {@code clock.nanoTime()} reading to the duty cycle tracker.
 */
public void onStart()
{
    // NOTE(review): presumably this gives the first trackDutyCycleTime call a baseline
    // timestamp - confirm against DutyCycleTracker.
    final long startTimeInNs = clock.nanoTime();
    dutyCycleTracker.update(startTimeInNs);
}

/**
* Forward a clock reading to the duty cycle tracker's {@code measureAndUpdate}.
*
* @param timeInNs current time in nanoseconds, as read by the caller from the clock.
*/
protected void trackDutyCycleTime(final long timeInNs)
{
dutyCycleTracker.measureAndUpdate(timeInNs);
}

public void onClose()
{
publication.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package uk.co.real_logic.artio.engine.logger;

import io.aeron.Subscription;
import io.aeron.driver.DutyCycleTracker;
import io.aeron.logbuffer.BufferClaim;
import io.aeron.logbuffer.Header;
import org.agrona.DirectBuffer;
Expand Down Expand Up @@ -63,9 +64,11 @@ public GapFiller(
final SenderSequenceNumbers senderSequenceNumbers,
final ReplayerCommandQueue replayerCommandQueue,
final FixSessionCodecsFactory fixSessionCodecsFactory,
final EpochNanoClock clock)
final EpochNanoClock clock,
final DutyCycleTracker dutyCycleTracker)
{
super(publication.dataPublication(), fixSessionCodecsFactory, new BufferClaim(), senderSequenceNumbers);
super(publication.dataPublication(), fixSessionCodecsFactory, new BufferClaim(), senderSequenceNumbers,
clock, dutyCycleTracker);
this.inboundSubscription = inboundSubscription;
this.publication = publication;
this.agentNamePrefix = agentNamePrefix;
Expand All @@ -76,7 +79,10 @@ public GapFiller(

/**
* Run one work cycle: read the clock once, pass that reading to the duty cycle tracking and to the
* timestamper, then poll the replayer command queue and the inbound subscription.
*
* @return the sum of the values returned by the command queue poll and the inbound subscription poll.
*/
public int doWork()
{
// NOTE(review): this no-argument call looks like the pre-change line carried over from a diff view;
// the call further down that passes timeInNs appears to be its replacement - confirm.
timestamper.sendTimestampMessage();
final long timeInNs = clock.nanoTime();

// The same clock reading is shared by the duty cycle tracking and the timestamp message.
trackDutyCycleTime(timeInNs);
timestamper.sendTimestampMessage(timeInNs);

return replayerCommandQueue.poll() + inboundSubscription.controlledPoll(this, POLL_LIMIT);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,8 @@ class ReplayTimestamper
replayerTimestampEncoder.wrapAndApplyHeader(timestampBuffer, 0, messageHeaderEncoder);
}

void sendTimestampMessage()
void sendTimestampMessage(final long timeInNs)
{
final long timeInNs = clock.nanoTime();
if (timeInNs > nextTimestampMessageInNs)
{
replayerTimestampEncoder.timestamp(timeInNs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.aeron.ExclusivePublication;
import io.aeron.Subscription;
import io.aeron.driver.DutyCycleTracker;
import io.aeron.logbuffer.BufferClaim;
import io.aeron.logbuffer.Header;
import org.agrona.DirectBuffer;
Expand Down Expand Up @@ -106,7 +107,6 @@ public class Replayer extends AbstractReplayer
private final ReplayerCommandQueue replayerCommandQueue;
private final AtomicCounter currentReplayCount;
private final int maxConcurrentSessionReplays;
private final EpochNanoClock clock;
private final EngineConfiguration configuration;
private final ReplayQuery outboundReplayQuery;
private final IdleStrategy idleStrategy;
Expand Down Expand Up @@ -140,9 +140,10 @@ public Replayer(
final int maxConcurrentSessionReplays,
final EpochNanoClock clock,
final FixPProtocolType fixPProtocolType,
final EngineConfiguration configuration)
final EngineConfiguration configuration,
final DutyCycleTracker dutyCycleTracker)
{
super(publication, fixSessionCodecsFactory, bufferClaim, senderSequenceNumbers);
super(publication, fixSessionCodecsFactory, bufferClaim, senderSequenceNumbers, clock, dutyCycleTracker);
this.outboundReplayQuery = outboundReplayQuery;
this.idleStrategy = idleStrategy;
this.errorHandler = errorHandler;
Expand All @@ -157,7 +158,6 @@ public Replayer(
this.replayerCommandQueue = replayerCommandQueue;
this.currentReplayCount = currentReplayCount;
this.maxConcurrentSessionReplays = maxConcurrentSessionReplays;
this.clock = clock;
this.configuration = configuration;

gapFillMessageTypes = packAllMessageTypes(gapfillOnReplayMessageTypes);
Expand Down Expand Up @@ -468,7 +468,10 @@ private FixReplayerSession processFixResendRequest(

public int doWork()
{
timestamper.sendTimestampMessage();
final long timeInNs = clock.nanoTime();

trackDutyCycleTime(timeInNs);
timestamper.sendTimestampMessage(timeInNs);

int work = replayerCommandQueue.poll();
work += pollReplayerChannels();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ private FixLibrary connect()
{
poller.startConnecting();
scheduler.launch(configuration, errorHandler, monitoringCompositeAgent, conductorAgent());
poller.onStart();
return this;
}

Expand Down
Loading
Loading