diff --git a/build.gradle b/build.gradle
index b3cc95c..cebb9d5 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1,5 +1,3 @@
-import com.sun.org.apache.xalan.internal.xsltc.compiler.Copy
-
apply plugin: 'java'
apply plugin: 'maven'
apply plugin: 'signing'
@@ -10,7 +8,7 @@ apply plugin: 'idea'
defaultTasks 'clean', 'build', 'install'
group = 'org.helios'
-version = '0.0.4-RC'
+version = '0.0.4'
ext.isReleaseVersion = !version.endsWith("-RC")
diff --git a/resources/Helios-MMB.xml b/resources/Helios-MMB.xml
index 3195a9c..f0f684a 100644
--- a/resources/Helios-MMB.xml
+++ b/resources/Helios-MMB.xml
@@ -31,9 +31,9 @@
-
- S
- G
+
+ 0
+ 1
@@ -45,24 +45,32 @@
-
+
-
+
-
+
-
+
-
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/src/org/helios/AeronStream.java b/src/org/helios/AeronStream.java
index b0ecafb..6b66816 100644
--- a/src/org/helios/AeronStream.java
+++ b/src/org/helios/AeronStream.java
@@ -2,6 +2,7 @@
import io.aeron.Aeron;
import io.aeron.Image;
+import org.helios.mmb.sbe.ComponentType;
import java.util.Objects;
@@ -24,20 +25,15 @@ static AeronStream fromImage(final Aeron aeron, final Image image)
public final Aeron aeron;
public final String channel;
public final int streamId;
+ public ComponentType componentType;
+ public short componentId;
private final String key;
@Override
public boolean equals(Object obj)
{
- if (obj != null && obj instanceof AeronStream)
- {
- return ((AeronStream)obj).key.equals(key);
- }
- else
- {
- return false;
- }
+ return obj != null && obj instanceof AeronStream && ((AeronStream) obj).key.equals(key);
}
@Override
diff --git a/src/org/helios/Helios.java b/src/org/helios/Helios.java
index a09cbfd..09256d9 100644
--- a/src/org/helios/Helios.java
+++ b/src/org/helios/Helios.java
@@ -23,7 +23,7 @@
import org.helios.gateway.GatewayHandler;
import org.helios.gateway.GatewayHandlerFactory;
import org.helios.infra.AvailableAssociationHandler;
-import org.helios.infra.RateReporter;
+import org.helios.infra.ReportProcessor;
import org.helios.infra.UnavailableAssociationHandler;
import org.helios.service.Service;
import org.helios.service.ServiceHandler;
@@ -46,7 +46,7 @@ public class Helios implements AutoCloseable, ErrorHandler, AvailableImageHandle
private final Long2ObjectHashMap> serviceSubscriptionRepository;
private final Long2ObjectHashMap> gatewaySubscriptionRepository;
- private RateReporter reporter;
+ private ReportProcessor reporter;
public Helios()
{
@@ -65,6 +65,7 @@ public Helios(final HeliosContext context, final HeliosDriver driver)
final Aeron.Context aeronContext = new Aeron.Context()
.errorHandler(this).availableImageHandler(this).unavailableImageHandler(this);
+
if (context.isMediaDriverEmbedded())
{
aeronContext.aeronDirectoryName(driver.mediaDriver().aeronDirectoryName());
@@ -78,7 +79,7 @@ public Helios(final HeliosContext context, final HeliosDriver driver)
serviceSubscriptionRepository = new Long2ObjectHashMap<>();
gatewaySubscriptionRepository = new Long2ObjectHashMap<>();
- reporter = context.isReportingEnabled() ? new RateReporter() : null;
+ reporter = context.isReportingEnabled() ? new ReportProcessor(1_000_000_000L, null) : null; // TODO: configure
}
public void start()
@@ -159,21 +160,22 @@ public AeronStream newStream(final String channel, final int streamId)
return new AeronStream(aeron, channel, streamId);
}
- public AeronStream newEmbeddedStream(final int streamId)
+ public AeronStream newIpcStream(final int streamId)
{
return newStream(CommonContext.IPC_CHANNEL, streamId);
}
- public Service addService(final ServiceHandlerFactory factory)
+ public Service addService(final ServiceHandlerFactory factory, final AeronStream reqStream)
{
Objects.requireNonNull(factory, "factory");
+ Objects.requireNonNull(reqStream, "reqStream");
- final HeliosService svc = new HeliosService<>(this, factory);
+ final HeliosService svc = new HeliosService<>(this, factory, reqStream);
serviceList.add(svc);
if (reporter != null)
{
- reporter.addAll(svc.reportList());
+ reporter.add(svc.report());
}
return svc;
@@ -182,15 +184,15 @@ public Service addService(final ServiceHandlerFact
public Service addService(final ServiceHandlerFactory factory,
final AeronStream reqStream, final AeronStream rspStream)
{
- final Service svc = addService(factory);
+ final Service svc = addService(factory, reqStream);
- return svc.addEndPoint(reqStream, rspStream);
+ return svc.addEndPoint(rspStream);
}
- public Service addService(final ServiceHandlerFactory factory,
+ public Service addService(final ServiceHandlerFactory factory, final AeronStream reqStream,
final AvailableAssociationHandler availableHandler, final UnavailableAssociationHandler unavailableHandler)
{
- final Service svc = addService(factory);
+ final Service svc = addService(factory, reqStream);
svc.availableAssociationHandler(availableHandler).unavailableAssociationHandler(unavailableHandler);
@@ -201,51 +203,49 @@ public Service addService(final ServiceHandlerFact
final AvailableAssociationHandler availableHandler, final UnavailableAssociationHandler unavailableHandler,
final AeronStream reqStream, final AeronStream rspStream)
{
- final Service svc = addService(factory, availableHandler, unavailableHandler);
+ final Service svc = addService(factory, reqStream, availableHandler, unavailableHandler);
- return svc.addEndPoint(reqStream, rspStream);
+ return svc.addEndPoint(rspStream);
}
- public Gateway addGateway(final GatewayHandlerFactory factory)
+ public Gateway addGateway()
{
- Objects.requireNonNull(factory, "factory");
-
- final HeliosGateway gw = new HeliosGateway<>(this, factory);
+ final HeliosGateway gw = new HeliosGateway<>(this);
gatewayList.add(gw);
if (reporter != null)
{
- reporter.addAll(gw.reportList());
+ reporter.add(gw.report());
}
return gw;
}
- public Gateway addGateway(final GatewayHandlerFactory factory,
+ public T addGateway(final GatewayHandlerFactory factory,
final AeronStream reqStream, final AeronStream rspStream)
{
- final Gateway gw = addGateway(factory);
+ final Gateway gw = addGateway();
- return gw.addEndPoint(reqStream, rspStream);
+ return gw.addEndPoint(reqStream, rspStream, factory);
}
- public Gateway addGateway(final GatewayHandlerFactory factory,
- final AvailableAssociationHandler availableHandler, final UnavailableAssociationHandler unavailableHandler)
+ public Gateway addGateway(final AvailableAssociationHandler availableHandler,
+ final UnavailableAssociationHandler unavailableHandler)
{
- final Gateway gw = addGateway(factory);
+ final Gateway gw = addGateway();
gw.availableAssociationHandler(availableHandler).unavailableAssociationHandler(unavailableHandler);
return gw;
}
- public Gateway addGateway(final GatewayHandlerFactory factory,
+ public T addGateway(final GatewayHandlerFactory factory,
final AvailableAssociationHandler availableHandler, final UnavailableAssociationHandler unavailableHandler,
final AeronStream reqStream, final AeronStream rspStream)
{
- final Gateway gw = addGateway(factory, availableHandler, unavailableHandler);
+ final Gateway gw = addGateway(availableHandler, unavailableHandler);
- return gw.addEndPoint(reqStream, rspStream);
+ return gw.addEndPoint(reqStream, rspStream, factory);
}
HeliosContext context()
diff --git a/src/org/helios/HeliosConfiguration.java b/src/org/helios/HeliosConfiguration.java
index 6c678e5..74687db 100644
--- a/src/org/helios/HeliosConfiguration.java
+++ b/src/org/helios/HeliosConfiguration.java
@@ -2,11 +2,11 @@
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.YieldingWaitStrategy;
-import org.helios.journal.Journalling;
-import org.helios.journal.strategy.PositionalJournalling;
import org.agrona.LangUtil;
-import org.agrona.concurrent.BackoffIdleStrategy;
import org.agrona.concurrent.IdleStrategy;
+import org.agrona.concurrent.SleepingIdleStrategy;
+import org.helios.journal.Journalling;
+import org.helios.journal.strategy.PositionalJournalling;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -49,6 +49,9 @@ public class HeliosConfiguration
public static final long MIN_PARK_NS = getLong("helios.core.back_off.idle.strategy.min_park_ns", 1000);
public static final long MAX_PARK_NS = getLong("helios.core.back_off.idle.strategy.max_park_ns", 100000);
+ public static final int HEARTBEAT_INTERVAL_MS = getInteger("helios.mmb.heartbeat_interval", 1000000); //1000
+ public static final int HEARTBEAT_LIVENESS = getInteger("helios.mmb.heartbeat_liveness", 3);
+
public static Journalling journalStrategy()
{
return newJournalStrategy(JOURNAL_STRATEGY);
@@ -117,7 +120,8 @@ private static IdleStrategy newIdleStrategy(final String strategyClassName)
if (strategyClassName == null)
{
- idleStrategy = new BackoffIdleStrategy(MAX_SPINS, MAX_YIELDS, MIN_PARK_NS, MAX_PARK_NS);
+ //idleStrategy = new BackoffIdleStrategy(MAX_SPINS, MAX_YIELDS, MIN_PARK_NS, MAX_PARK_NS);
+ idleStrategy = new SleepingIdleStrategy(MAX_PARK_NS);
}
else
{
diff --git a/src/org/helios/HeliosContext.java b/src/org/helios/HeliosContext.java
index 969322a..3f8b904 100644
--- a/src/org/helios/HeliosContext.java
+++ b/src/org/helios/HeliosContext.java
@@ -21,6 +21,9 @@ public class HeliosContext
private IdleStrategy publisherIdleStrategy;
private IdleStrategy subscriberIdleStrategy;
+ private int heartbeatInterval;
+ private int heartbeatLiveness;
+
public HeliosContext()
{
setMediaDriverConf(HeliosConfiguration.MEDIA_DRIVER_CONF);
@@ -38,6 +41,9 @@ public HeliosContext()
setWriteIdleStrategy(HeliosConfiguration.writeIdleStrategy());
setPublisherIdleStrategy(HeliosConfiguration.publisherIdleStrategy());
setSubscriberIdleStrategy(HeliosConfiguration.subscriberIdleStrategy());
+
+ setHeartbeatInterval(HeliosConfiguration.HEARTBEAT_INTERVAL_MS);
+ setHeartbeatLiveness(HeliosConfiguration.HEARTBEAT_LIVENESS);
}
public HeliosContext setMediaDriverConf(String mediaDriverConf)
@@ -118,6 +124,18 @@ public HeliosContext setSubscriberIdleStrategy(IdleStrategy subscriberIdleStrate
return this;
}
+ public HeliosContext setHeartbeatInterval(int heartbeatInterval)
+ {
+ this.heartbeatInterval = heartbeatInterval;
+ return this;
+ }
+
+ public HeliosContext setHeartbeatLiveness(int heartbeatLiveness)
+ {
+ this.heartbeatLiveness = heartbeatLiveness;
+ return this;
+ }
+
public String getMediaDriverConf()
{
return mediaDriverConf;
@@ -182,4 +200,14 @@ public IdleStrategy subscriberIdleStrategy()
{
return subscriberIdleStrategy;
}
+
+ public int heartbeatInterval()
+ {
+ return heartbeatInterval;
+ }
+
+ public int heartbeatLiveness()
+ {
+ return heartbeatLiveness;
+ }
}
diff --git a/src/org/helios/HeliosDriver.java b/src/org/helios/HeliosDriver.java
index 1688881..9cf08db 100644
--- a/src/org/helios/HeliosDriver.java
+++ b/src/org/helios/HeliosDriver.java
@@ -15,9 +15,15 @@ public class HeliosDriver implements Closeable
private final MediaDriver mediaDriver;
public HeliosDriver(final HeliosContext context)
+ {
+ this(context, AERON_DIR_NAME_DEFAULT);
+ }
+
+ public HeliosDriver(final HeliosContext context, final String aeronDirectoryName)
{
this(context, new MediaDriver.Context()
- .aeronDirectoryName(AERON_DIR_NAME_DEFAULT)
+ .aeronDirectoryName(aeronDirectoryName)
+ .termBufferSparseFile(false)
.threadingMode(ThreadingMode.SHARED)
.conductorIdleStrategy(new BackoffIdleStrategy(1, 1, 1, 1))
.receiverIdleStrategy(new NoOpIdleStrategy())
diff --git a/src/org/helios/HeliosGateway.java b/src/org/helios/HeliosGateway.java
index f81e903..13d7606 100644
--- a/src/org/helios/HeliosGateway.java
+++ b/src/org/helios/HeliosGateway.java
@@ -14,6 +14,7 @@
import org.helios.gateway.GatewayHandlerFactory;
import org.helios.gateway.GatewayReport;
import org.helios.infra.*;
+import org.helios.mmb.sbe.ComponentType;
import org.helios.util.DirectBufferAllocator;
import org.helios.util.ProcessorHelper;
import org.helios.util.RingBufferPool;
@@ -25,45 +26,46 @@
import static org.agrona.concurrent.ringbuffer.RingBufferDescriptor.TRAILER_LENGTH;
-public class HeliosGateway implements Gateway, AssociationHandler,
+class HeliosGateway implements Gateway, AssociationHandler,
AvailableImageHandler, UnavailableImageHandler
{
private static final int FRAME_COUNT_LIMIT = Integer.getInteger("helios.gateway.poll.frame_count_limit", 10); // TODO: from HeliosContext
+ private static short nextGatewayId = 0;
+ private final short gatewayId;
private final Helios helios;
- private final InputMessageProcessor svcResponseProcessor;
private final RingBufferPool ringBufferPool;
- private final List svcRequestProcessorList;
- private final RingBufferProcessor gatewayProcessor;
+ private final List gwOutputProcessorList;
+ private final List gwInputProcessorList;
+ private final List gatewayProcessorList;
private final List eventProcessorList;
- private final List reportList;
+ private final GatewayReport report;
private AvailableAssociationHandler availableAssociationHandler;
private UnavailableAssociationHandler unavailableAssociationHandler;
- public HeliosGateway(final Helios helios, final GatewayHandlerFactory factory)
+ HeliosGateway(final Helios helios)
{
this.helios = helios;
+ gatewayId = ++nextGatewayId;
ringBufferPool = new RingBufferPool();
- svcRequestProcessorList = new ArrayList<>();
+ gwOutputProcessorList = new ArrayList<>();
+ gwInputProcessorList = new ArrayList<>();
+ gatewayProcessorList = new ArrayList<>();
eventProcessorList = new ArrayList<>();
- reportList = new ArrayList<>();
- final ByteBuffer inputBuffer = DirectBufferAllocator.allocateCacheAligned((16 * 1024) + TRAILER_LENGTH); // TODO: configure
- final RingBuffer inputRingBuffer = new OneToOneRingBuffer(new UnsafeBuffer(inputBuffer));
-
- final T gatewayHandler = factory.createGatewayHandler(ringBufferPool);
- gatewayProcessor = new RingBufferProcessor<>(inputRingBuffer, gatewayHandler, new BusySpinIdleStrategy(), "gwProcessor");
-
- final IdleStrategy pollIdleStrategy = helios.context().subscriberIdleStrategy();
- svcResponseProcessor = new InputMessageProcessor(inputRingBuffer, pollIdleStrategy, FRAME_COUNT_LIMIT, "svcResponseProcessor");
+ report = new GatewayReport();
}
@Override
- public Gateway addEndPoint(final AeronStream reqStream, final AeronStream rspStream)
+ public T addEndPoint(final AeronStream reqStream, final AeronStream rspStream, final GatewayHandlerFactory factory)
{
Objects.requireNonNull(reqStream, "reqStream");
Objects.requireNonNull(rspStream, "rspStream");
+ Objects.requireNonNull(factory, "factory");
+
+ reqStream.componentType = ComponentType.Gateway;
+ reqStream.componentId = gatewayId;
final IdleStrategy idleStrategy = new BusySpinIdleStrategy();
@@ -72,17 +74,34 @@ public Gateway addEndPoint(final AeronStream reqStream, final AeronStream rsp
ringBufferPool.addOutputRingBuffer(reqStream, outputRingBuffer);
- final OutputMessageProcessor svcRequestProcessor =
- new OutputMessageProcessor(outputRingBuffer, reqStream, idleStrategy, "svcRequestProcessor");
+ final int heartbeatInterval = helios.context().heartbeatInterval();
+ final OutputMessageProcessor gwOutputProcessor =
+ new OutputMessageProcessor(outputRingBuffer, reqStream, idleStrategy, heartbeatInterval, "gwOutputProcessor");
+
+ gwOutputProcessorList.add(gwOutputProcessor);
+
+ final ByteBuffer inputBuffer = DirectBufferAllocator.allocateCacheAligned((16 * 1024) + TRAILER_LENGTH); // TODO: configure
+ final RingBuffer inputRingBuffer = new OneToOneRingBuffer(new UnsafeBuffer(inputBuffer));
- svcRequestProcessorList.add(svcRequestProcessor);
+ final T gatewayHandler = factory.createGatewayHandler(ringBufferPool);
+ final RingBufferProcessor gatewayProcessor = new RingBufferProcessor<>(inputRingBuffer, gatewayHandler, new BusySpinIdleStrategy(), "gwProcessor");
+ gatewayProcessorList.add(gatewayProcessor);
- final long subscriptionId = svcResponseProcessor.addSubscription(rspStream);
+ final IdleStrategy pollIdleStrategy = helios.context().subscriberIdleStrategy();
+ final int heartbeatLiveness = helios.context().heartbeatLiveness();
+ final InputMessageProcessor gwInputProcessor =
+ new InputMessageProcessor(inputRingBuffer, pollIdleStrategy, FRAME_COUNT_LIMIT, heartbeatLiveness,
+ rspStream, this, "gwInputProcessor");
+
+ gwInputProcessorList.add(gwInputProcessor);
+
+ final long subscriptionId = gwInputProcessor.subscriptionId();
helios.addGatewaySubscription(subscriptionId, this);
- reportList.add(new GatewayReport(svcRequestProcessor, svcResponseProcessor));
+ report.addRequestProcessor(gwOutputProcessor);
+ report.addResponseProcessor(gwInputProcessor);
- return this;
+ return gatewayHandler;
}
@Override
@@ -97,8 +116,10 @@ public Gateway addEventChannel(final AeronStream eventStream)
ringBufferPool.addEventRingBuffer(eventStream, eventRingBuffer);
+ final int heartbeatLiveness = helios.context().heartbeatLiveness();
final InputMessageProcessor eventProcessor =
- new InputMessageProcessor(eventRingBuffer, readIdleStrategy, FRAME_COUNT_LIMIT, "eventProcessor"); // FIXME: eventProcessor name
+ new InputMessageProcessor(eventRingBuffer, readIdleStrategy, FRAME_COUNT_LIMIT,
+ heartbeatLiveness, eventStream, null, "eventProcessor"); // FIXME: eventProcessor name
eventProcessorList.add(eventProcessor);
@@ -106,24 +127,18 @@ public Gateway addEventChannel(final AeronStream eventStream)
}
@Override
- public List reportList()
+ public Report report()
{
- return reportList;
- }
-
- @Override
- public T handler()
- {
- return gatewayProcessor.handler();
+ return report;
}
@Override
public void start()
{
- ProcessorHelper.start(gatewayProcessor);
+ gatewayProcessorList.forEach(ProcessorHelper::start);
eventProcessorList.forEach(ProcessorHelper::start);
- svcRequestProcessorList.forEach(ProcessorHelper::start);
- ProcessorHelper.start(svcResponseProcessor);
+ gwOutputProcessorList.forEach(ProcessorHelper::start);
+ gwInputProcessorList.forEach(ProcessorHelper::start);
}
@Override
@@ -143,19 +158,13 @@ public Gateway unavailableAssociationHandler(final UnavailableAssociationHand
@Override
public void onAvailableImage(final Image image)
{
- svcResponseProcessor.onAvailableImage(image);
-
- // TODO: send HEARTBEAT to service through GatewayHandler
-
- onAssociationEstablished(); // TODO: remove after HEARTBEAT handling
+ gwInputProcessorList.forEach((p) -> p.onAvailableImage(image));
}
@Override
public void onUnavailableImage(final Image image)
{
- svcResponseProcessor.onUnavailableImage(image);
-
- onAssociationBroken(); // TODO: remove after HEARTBEAT handling
+ gwInputProcessorList.forEach((p) -> p.onUnavailableImage(image));
}
@Override
@@ -179,9 +188,9 @@ public void onAssociationBroken()
@Override
public void close()
{
- CloseHelper.quietClose(svcResponseProcessor);
- svcRequestProcessorList.forEach(CloseHelper::quietClose);
+ gwInputProcessorList.forEach(CloseHelper::quietClose);
+ gwOutputProcessorList.forEach(CloseHelper::quietClose);
eventProcessorList.forEach(CloseHelper::quietClose);
- CloseHelper.quietClose(gatewayProcessor);
+ gatewayProcessorList.forEach(CloseHelper::quietClose);
}
}
diff --git a/src/org/helios/HeliosService.java b/src/org/helios/HeliosService.java
index 5ee2a13..ffc6ef5 100644
--- a/src/org/helios/HeliosService.java
+++ b/src/org/helios/HeliosService.java
@@ -15,6 +15,7 @@
import org.helios.journal.JournalProcessor;
import org.helios.journal.JournalWriter;
import org.helios.journal.Journalling;
+import org.helios.mmb.sbe.ComponentType;
import org.helios.replica.ReplicaHandler;
import org.helios.replica.ReplicaProcessor;
import org.helios.service.Service;
@@ -44,16 +45,18 @@ public class HeliosService implements Service, Asso
private final long TICK_DURATION_NS = TimeUnit.MICROSECONDS.toNanos(100); // TODO: configure
private final int TICKS_PER_WHEEL = 512; // TODO: configure
+ private static short nextServiceId = 0;
+ private final short serviceId;
private final Helios helios;
- private final InputMessageProcessor gwRequestProcessor;
+ private final InputMessageProcessor svcInputProcessor;
private final RingBufferPool ringBufferPool;
private final List gwResponseProcessorList;
private final JournalProcessor journalProcessor;
private final ReplicaProcessor replicaProcessor;
private final RingBufferProcessor serviceProcessor;
private final List eventProcessorList;
- private final List reportList;
+ private final ServiceReport report;
private final TimerWheel timerWheel;
private final SnapshotTimer snapshotTimer;
private final AtomicBoolean timerWheelRunning;
@@ -61,14 +64,14 @@ public class HeliosService implements Service, Asso
private AvailableAssociationHandler availableAssociationHandler;
private UnavailableAssociationHandler unavailableAssociationHandler;
- public HeliosService(final Helios helios, final ServiceHandlerFactory factory)
+ public HeliosService(final Helios helios, final ServiceHandlerFactory factory, final AeronStream reqStream)
{
this.helios = helios;
+ serviceId = ++nextServiceId;
ringBufferPool = new RingBufferPool();
gwResponseProcessorList = new ArrayList<>();
eventProcessorList = new ArrayList<>();
- reportList = new ArrayList<>();
final HeliosContext context = helios.context();
@@ -127,19 +130,25 @@ public HeliosService(final Helios helios, final ServiceHandlerFactory factory
final T serviceHandler = factory.createServiceHandler(ringBufferPool);
serviceProcessor = new RingBufferProcessor<>(
isJournalEnabled ? journalRingBuffer : (isReplicaEnabled ? replicaRingBuffer : inputRingBuffer),
- serviceHandler, new BusySpinIdleStrategy(), "serviceProcessor");
+ serviceHandler, new BusySpinIdleStrategy(), "svcProcessor");
final IdleStrategy pollIdleStrategy = context.subscriberIdleStrategy();
- gwRequestProcessor = new InputMessageProcessor(inputRingBuffer, pollIdleStrategy, FRAME_COUNT_LIMIT, "gwRequestProcessor");
+ final int heartbeatLiveness = context.heartbeatLiveness();
+ svcInputProcessor = new InputMessageProcessor(inputRingBuffer, pollIdleStrategy, FRAME_COUNT_LIMIT,
+ heartbeatLiveness, reqStream, this, "svcInputProcessor");
+
+ final long subscriptionId = svcInputProcessor.subscriptionId();
+ helios.addServiceSubscription(subscriptionId, this);
+
+ report = new ServiceReport(svcInputProcessor);
}
- public Service addEndPoint(final AeronStream reqStream, final AeronStream rspStream)
+ public Service addEndPoint(final AeronStream rspStream)
{
- Objects.requireNonNull(reqStream, "reqStream");
Objects.requireNonNull(rspStream, "rspStream");
- final long subscriptionId = gwRequestProcessor.addSubscription(reqStream);
- helios.addServiceSubscription(subscriptionId, this);
+ rspStream.componentType = ComponentType.Service;
+ rspStream.componentId = serviceId;
final IdleStrategy writeIdleStrategy = helios.context().writeIdleStrategy();
@@ -148,14 +157,13 @@ public Service addEndPoint(final AeronStream reqStream, final AeronStream rsp
ringBufferPool.addOutputRingBuffer(rspStream, outputRingBuffer);
- final OutputMessageProcessor gwResponseProcessor =
- new OutputMessageProcessor(outputRingBuffer, rspStream, writeIdleStrategy, "gwResponseProcessor"); // FIXME: gwResponseProcessor name
+ final int heartbeatInterval = helios.context().heartbeatInterval();
+ final OutputMessageProcessor svcOutputProcessor =
+ new OutputMessageProcessor(outputRingBuffer, rspStream, writeIdleStrategy, heartbeatInterval, "svcOutputProcessor"); // FIXME: gwResponseProcessor name
- gwResponseProcessorList.add(gwResponseProcessor);
+ gwResponseProcessorList.add(svcOutputProcessor);
- final long publicationId = gwResponseProcessor.handler().outputPublicationId();
-
- reportList.add(new ServiceReport(gwRequestProcessor, gwResponseProcessor));
+ report.addResponseProcessor(svcOutputProcessor);
return this;
}
@@ -165,6 +173,9 @@ public Service addEventChannel(final AeronStream eventStream)
{
Objects.requireNonNull(eventStream, "eventStream");
+ eventStream.componentType = ComponentType.Service;
+ eventStream.componentId = serviceId;
+
final IdleStrategy writeIdleStrategy = helios.context().writeIdleStrategy();
final ByteBuffer eventBuffer = DirectBufferAllocator.allocateCacheAligned((16 * 1024) + TRAILER_LENGTH); // TODO: configure
@@ -172,8 +183,9 @@ public Service addEventChannel(final AeronStream eventStream)
ringBufferPool.addEventRingBuffer(eventStream, eventRingBuffer);
+ final int heartbeatInterval = helios.context().heartbeatInterval();
final OutputMessageProcessor eventProcessor =
- new OutputMessageProcessor(eventRingBuffer, eventStream, writeIdleStrategy, "eventProcessor"); // FIXME: eventProcessor name
+ new OutputMessageProcessor(eventRingBuffer, eventStream, writeIdleStrategy, heartbeatInterval, "eventProcessor"); // FIXME: eventProcessor name
eventProcessorList.add(eventProcessor);
@@ -181,9 +193,9 @@ public Service addEventChannel(final AeronStream eventStream)
}
@Override
- public List reportList()
+ public Report report()
{
- return reportList;
+ return report;
}
@Override
@@ -200,7 +212,7 @@ public void start()
ProcessorHelper.start(journalProcessor);
eventProcessorList.forEach(ProcessorHelper::start);
gwResponseProcessorList.forEach(ProcessorHelper::start);
- ProcessorHelper.start(gwRequestProcessor);
+ ProcessorHelper.start(svcInputProcessor);
timerExecutor.execute(() -> {
while (timerWheelRunning.get()) {
@@ -227,17 +239,13 @@ public Service unavailableAssociationHandler(final UnavailableAssociationHand
@Override
public void onAvailableImage(final Image image)
{
- gwRequestProcessor.onAvailableImage(image);
-
- onAssociationEstablished();
+ svcInputProcessor.onAvailableImage(image);
}
@Override
public void onUnavailableImage(final Image image)
{
- gwRequestProcessor.onUnavailableImage(image);
-
- onAssociationBroken();
+ svcInputProcessor.onUnavailableImage(image);
}
@Override
@@ -265,7 +273,7 @@ public void close()
timerWheelRunning.set(false);
timerExecutor.shutdown();
- CloseHelper.quietClose(gwRequestProcessor);
+ CloseHelper.quietClose(svcInputProcessor);
gwResponseProcessorList.forEach(CloseHelper::quietClose);
eventProcessorList.forEach(CloseHelper::quietClose);
CloseHelper.quietClose(journalProcessor);
diff --git a/src/org/helios/archive/ArchiveProcessor.java b/src/org/helios/archive/ArchiveProcessor.java
index c3203ee..517bb68 100644
--- a/src/org/helios/archive/ArchiveProcessor.java
+++ b/src/org/helios/archive/ArchiveProcessor.java
@@ -5,12 +5,10 @@
import com.lmax.disruptor.util.DaemonThreadFactory;
import org.helios.infra.Processor;
-import java.util.concurrent.atomic.AtomicBoolean;
-
public class ArchiveProcessor implements Processor
{
private final Disruptor eventDisruptor;
- private final AtomicBoolean running;
+ private volatile boolean running;
private final Thread archiveThread;
@SuppressWarnings("unchecked")
@@ -20,21 +18,27 @@ public ArchiveProcessor(final EventFactory eventFactory, int ringBufferSize,
eventDisruptor = new Disruptor<>(eventFactory, ringBufferSize, DaemonThreadFactory.INSTANCE);
eventDisruptor.handleEventsWith(new ArchiveEventHandler<>(eventClass, batchSize, batchHandler));
- running = new AtomicBoolean(false);
+ running = false;
archiveThread = new Thread(this, "archiveProcessor");
}
+ @Override
+ public String name()
+ {
+ return archiveThread.getName();
+ }
+
@Override
public void start()
{
- running.set(true);
+ running = true;
archiveThread.start();
}
@Override
public void run()
{
- while (running.get())
+ while (running)
{
// TODO:
}
@@ -43,7 +47,7 @@ public void run()
@Override
public void close() throws Exception
{
- running.set(false);
+ running = false;
archiveThread.join();
}
}
diff --git a/src/org/helios/gateway/Gateway.java b/src/org/helios/gateway/Gateway.java
index 640836d..d169bec 100644
--- a/src/org/helios/gateway/Gateway.java
+++ b/src/org/helios/gateway/Gateway.java
@@ -2,14 +2,12 @@
import org.helios.AeronStream;
import org.helios.infra.AvailableAssociationHandler;
-import org.helios.infra.RateReport;
+import org.helios.infra.Report;
import org.helios.infra.UnavailableAssociationHandler;
-import java.util.List;
-
public interface Gateway extends AutoCloseable
{
- Gateway addEndPoint(final AeronStream reqStream, final AeronStream rspStream);
+ T addEndPoint(final AeronStream reqStream, final AeronStream rspStream, final GatewayHandlerFactory factory);
Gateway addEventChannel(final AeronStream eventStream);
@@ -17,9 +15,7 @@ public interface Gateway extends AutoCloseable
Gateway unavailableAssociationHandler(final UnavailableAssociationHandler handler);
- List reportList();
-
- T handler();
+ Report report();
void start();
}
diff --git a/src/org/helios/gateway/GatewayReport.java b/src/org/helios/gateway/GatewayReport.java
index e2cb9d3..2ab0ee7 100644
--- a/src/org/helios/gateway/GatewayReport.java
+++ b/src/org/helios/gateway/GatewayReport.java
@@ -1,51 +1,51 @@
package org.helios.gateway;
-import org.helios.infra.InputMessageProcessor;
-import org.helios.infra.OutputMessageProcessor;
-import org.helios.infra.RateReport;
+import org.helios.infra.*;
-import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
-public final class GatewayReport implements RateReport
+public final class GatewayReport implements Report
{
- private final OutputMessageProcessor requestProcessor;
- private final InputMessageProcessor responseProcessor;
+ private final List inputReportList;
+ private final List outputReportList;
- private long lastTimeStamp;
- private long lastSuccessfulWrites;
- private long lastBytesWritten;
- private long lastSuccessfulReads;
+ public GatewayReport()
+ {
+ inputReportList = new ArrayList<>();
+ outputReportList = new ArrayList<>();
+ }
+
+ public void addRequestProcessor(final OutputMessageProcessor requestProcessor)
+ {
+ Objects.requireNonNull(requestProcessor, "requestProcessor");
+
+ outputReportList.add(requestProcessor);
+ }
+
+ public void addResponseProcessor(final InputMessageProcessor responseProcessor)
+ {
+ Objects.requireNonNull(responseProcessor, "responseProcessor");
+
+ inputReportList.add(responseProcessor);
+ }
- public GatewayReport(final OutputMessageProcessor requestProcessor, final InputMessageProcessor responseProcessor)
+ @Override
+ public String name()
{
- this.requestProcessor = requestProcessor;
- this.responseProcessor = responseProcessor;
+ return "GatewayReport";
+ }
- lastTimeStamp = System.currentTimeMillis();
+ @Override
+ public List inputReports()
+ {
+ return inputReportList;
}
@Override
- public void print(final PrintStream stream)
+ public List outputReports()
{
- final long timeStamp = System.currentTimeMillis();
- long successfulWrites = requestProcessor.handler().successfulWrites();
- long bytesWritten = requestProcessor.handler().bytesWritten();
- long successfulReads = responseProcessor.successfulReads();
- long failedReads = responseProcessor.failedReads();
-
- final long duration = timeStamp - lastTimeStamp;
- final long successfulWritesDelta = successfulWrites - lastSuccessfulWrites;
- final long bytesTransferred = bytesWritten - lastBytesWritten;
- final long successfulReadsDelta = successfulReads - lastSuccessfulReads;
-
- final double failureRatio = failedReads / (double)(successfulReads + failedReads);
-
- stream.format("GatewayReport: T %dms OUT %,d messages - %,d bytes - IN %,d messages [read failure ratio: %f]\n",
- duration, successfulWritesDelta, bytesTransferred, successfulReadsDelta, failureRatio);
-
- lastTimeStamp = timeStamp;
- lastSuccessfulWrites = successfulWrites;
- lastBytesWritten = bytesWritten;
- lastSuccessfulReads = successfulReads;
+ return outputReportList;
}
}
diff --git a/src/org/helios/heartbeat/Heartbeat.java b/src/org/helios/heartbeat/Heartbeat.java
deleted file mode 100644
index b5b35c7..0000000
--- a/src/org/helios/heartbeat/Heartbeat.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package org.helios.heartbeat;
-
-import org.agrona.concurrent.IdleStrategy;
-import org.agrona.concurrent.ringbuffer.RingBuffer;
-import org.helios.infra.MessageTypes;
-
-import java.util.Objects;
-
-import static org.helios.mmb.HeartbeatMessageFactory.MESSAGE_LENGTH;
-import static org.helios.mmb.HeartbeatMessageFactory.heartbeatBuffer;
-
-public final class Heartbeat
-{
- public static void writeMessage(final RingBuffer outputRingBuffer, final IdleStrategy idleStrategy)
- {
- Objects.requireNonNull(idleStrategy, "idleStrategy");
-
- while (!outputRingBuffer.write(MessageTypes.ADMINISTRATIVE_MSG_ID, heartbeatBuffer, 0, MESSAGE_LENGTH))
- {
- idleStrategy.idle(0);
- }
- }
-}
diff --git a/src/org/helios/infra/ConsoleReporter.java b/src/org/helios/infra/ConsoleReporter.java
new file mode 100644
index 0000000..d4f0a6b
--- /dev/null
+++ b/src/org/helios/infra/ConsoleReporter.java
@@ -0,0 +1,51 @@
+package org.helios.infra;
+
+import java.io.PrintStream;
+
+public final class ConsoleReporter implements Reporter
+{
+ private final PrintStream stream = System.out;
+ private long lastTimestamp = System.nanoTime();
+
+ @Override
+ public void onReport(final Report report)
+ {
+ final long currentTimestamp = System.nanoTime();
+ final long duration = currentTimestamp - lastTimestamp;
+ lastTimestamp = currentTimestamp;
+
+ stream.format("%s: T %dms ", report.name(), duration);
+
+ for (final InputReport inputReport : report.inputReports())
+ {
+ final long successfulReads = inputReport.successfulReads();
+ final long failedReads = inputReport.failedReads();
+ final long administrativeMessages = inputReport.administrativeMessages();
+ final long heartbeatReceived = inputReport.heartbeatReceived();
+ final long applicationMessages = inputReport.applicationMessages();
+ final long bytesRead = inputReport.bytesRead();
+
+ final double failureRatio = failedReads / (double)(successfulReads + failedReads);
+
+ stream.format("IN %s: %,d messages (%,d ADMIN %,d HBT %,d APP) - %,d failed - %,d bytes [read failure ratio: %f]\n",
+ inputReport.name(), successfulReads, administrativeMessages, heartbeatReceived, applicationMessages,
+ failedReads, bytesRead, failureRatio);
+ }
+
+ for (final OutputReport outputReport : report.outputReports())
+ {
+ final long successfulWrites = outputReport.successfulWrites();
+ final long failedWrites = outputReport.failedWrites();
+ final long heartbeatSent = outputReport.heartbeatSent();
+ final long bytesWritten = outputReport.bytesWritten();
+ final long successfulBufferReads = outputReport.successfulBufferReads();
+ final long failedBufferReads = outputReport.failedBufferReads();
+
+ final double failureRatio = failedWrites / (double)(successfulWrites + failedWrites);
+
+ stream.format("OUT %s: %,d messages (%,d HBT) - %,d failed - %,d bytes [write failure ratio: %f] {buffer %,d reads %,d failed}\n",
+ outputReport.name(), successfulWrites, heartbeatSent, failedWrites, bytesWritten, failureRatio,
+ successfulBufferReads, failedBufferReads);
+ }
+ }
+}
diff --git a/src/org/helios/infra/Heartbeat.java b/src/org/helios/infra/Heartbeat.java
deleted file mode 100644
index f4703c4..0000000
--- a/src/org/helios/infra/Heartbeat.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package org.helios.infra;
-
-import org.agrona.concurrent.IdleStrategy;
-import org.agrona.concurrent.ringbuffer.RingBuffer;
-
-import java.util.Objects;
-
-import static org.helios.mmb.HeartbeatMessageFactory.heartbeatBuffer;
-
-public final class Heartbeat
-{
- private static final int MESSAGE_LENGTH = 128;
-
- public static void writeMessage(final RingBuffer inputRingBuffer, final IdleStrategy idleStrategy)
- {
- Objects.requireNonNull(idleStrategy, "idleStrategy");
-
- while (!inputRingBuffer.write(MessageTypes.ADMINISTRATIVE_MSG_ID, heartbeatBuffer, 0, MESSAGE_LENGTH))
- {
- idleStrategy.idle(0);
- }
- }
-}
diff --git a/src/org/helios/infra/InputMessageHandler.java b/src/org/helios/infra/InputMessageHandler.java
index 6fdf114..1cd1f28 100644
--- a/src/org/helios/infra/InputMessageHandler.java
+++ b/src/org/helios/infra/InputMessageHandler.java
@@ -5,29 +5,124 @@
import org.agrona.DirectBuffer;
import org.agrona.concurrent.IdleStrategy;
import org.agrona.concurrent.ringbuffer.RingBuffer;
+import org.helios.mmb.sbe.HeartbeatDecoder;
+import org.helios.mmb.sbe.MessageHeaderDecoder;
+import org.helios.mmb.sbe.ShutdownDecoder;
+import org.helios.mmb.sbe.StartupDecoder;
-public class InputMessageHandler implements FragmentHandler, AutoCloseable
+import static org.agrona.UnsafeAccess.UNSAFE;
+
+class InputMessageHandler implements FragmentHandler
{
+ private static final long HEARTBEAT_RECEIVED_OFFSET;
+ private static final long ADMINISTRATIVE_MESSAGES_OFFSET;
+ private static final long APPLICATION_MESSAGES_OFFSET;
+ private static final long BYTES_READ_OFFSET;
+
+ private volatile long heartbeatReceived = 0;
+ private volatile long administrativeMessages = 0;
+ private volatile long applicationMessages = 0;
+ private volatile long bytesRead = 0;
+
private final RingBuffer inputRingBuffer;
private final IdleStrategy idleStrategy;
+ private final AssociationHandler associationHandler;
+ private final MessageHeaderDecoder messageHeaderDecoder;
+ private final StartupDecoder startupDecoder;
+ private final ShutdownDecoder shutdownDecoder;
- public InputMessageHandler(final RingBuffer inputRingBuffer, final IdleStrategy idleStrategy)
+ InputMessageHandler(final RingBuffer inputRingBuffer, final IdleStrategy idleStrategy,
+ final AssociationHandler associationHandler)
{
this.inputRingBuffer = inputRingBuffer;
this.idleStrategy = idleStrategy;
+ this.associationHandler = associationHandler;
+
+ messageHeaderDecoder = new MessageHeaderDecoder();
+ startupDecoder = new StartupDecoder();
+ shutdownDecoder = new ShutdownDecoder();
}
@Override
public void onFragment(DirectBuffer buffer, int offset, int length, Header header)
{
- while (!inputRingBuffer.write(MessageTypes.APPLICATION_MSG_ID, buffer, offset, length))
+ messageHeaderDecoder.wrap(buffer, offset);
+
+ final int templateId = messageHeaderDecoder.templateId();
+ if (templateId == HeartbeatDecoder.TEMPLATE_ID)
+ {
+ UNSAFE.putOrderedLong(this, ADMINISTRATIVE_MESSAGES_OFFSET, administrativeMessages + 1);
+ UNSAFE.putOrderedLong(this, HEARTBEAT_RECEIVED_OFFSET, heartbeatReceived + 1);
+ }
+ else if (templateId == StartupDecoder.TEMPLATE_ID && associationHandler != null)
+ {
+ UNSAFE.putOrderedLong(this, ADMINISTRATIVE_MESSAGES_OFFSET, administrativeMessages + 1);
+
+ offset += messageHeaderDecoder.encodedLength();
+
+ startupDecoder.wrap(buffer, offset, messageHeaderDecoder.blockLength(), messageHeaderDecoder.version());
+
+ // TODO: add component type/identifier to AssociationHandler hooks
+
+ associationHandler.onAssociationEstablished();
+ }
+ else if (templateId == ShutdownDecoder.TEMPLATE_ID && associationHandler != null)
+ {
+ UNSAFE.putOrderedLong(this, ADMINISTRATIVE_MESSAGES_OFFSET, administrativeMessages + 1);
+
+ offset += messageHeaderDecoder.encodedLength();
+
+ shutdownDecoder.wrap(buffer, offset, messageHeaderDecoder.blockLength(), messageHeaderDecoder.version());
+
+ // TODO: add component type/identifier to AssociationHandler hooks
+
+ associationHandler.onAssociationBroken();
+ }
+ else
{
- idleStrategy.idle(0);
+ UNSAFE.putOrderedLong(this, APPLICATION_MESSAGES_OFFSET, applicationMessages + 1);
+
+ while (!inputRingBuffer.write(MessageTypes.APPLICATION_MSG_ID, buffer, offset, length))
+ {
+ idleStrategy.idle(0);
+ }
}
+
+ UNSAFE.putOrderedLong(this, BYTES_READ_OFFSET, bytesRead + length);
}
- @Override
- public void close() throws Exception
+ long heartbeatReceived()
+ {
+ return heartbeatReceived;
+ }
+
+ long administrativeMessages()
+ {
+ return administrativeMessages;
+ }
+
+ long applicationMessages()
+ {
+ return applicationMessages;
+ }
+
+ long bytesRead()
+ {
+ return bytesRead;
+ }
+
+ static
{
+ try
+ {
+ HEARTBEAT_RECEIVED_OFFSET = UNSAFE.objectFieldOffset(InputMessageHandler.class.getDeclaredField("heartbeatReceived"));
+ ADMINISTRATIVE_MESSAGES_OFFSET = UNSAFE.objectFieldOffset(InputMessageHandler.class.getDeclaredField("administrativeMessages"));
+ APPLICATION_MESSAGES_OFFSET = UNSAFE.objectFieldOffset(InputMessageHandler.class.getDeclaredField("applicationMessages"));
+ BYTES_READ_OFFSET = UNSAFE.objectFieldOffset(InputMessageHandler.class.getDeclaredField("bytesRead"));
+ }
+ catch (final Exception ex)
+ {
+ throw new RuntimeException(ex);
+ }
}
}
diff --git a/src/org/helios/infra/InputMessageProcessor.java b/src/org/helios/infra/InputMessageProcessor.java
index e0e5988..22cd256 100644
--- a/src/org/helios/infra/InputMessageProcessor.java
+++ b/src/org/helios/infra/InputMessageProcessor.java
@@ -5,15 +5,11 @@
import org.agrona.concurrent.IdleStrategy;
import org.agrona.concurrent.ringbuffer.RingBuffer;
import org.helios.AeronStream;
-import org.helios.snapshot.Snapshot;
-
-import java.util.LinkedHashSet;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
+import org.helios.mmb.SnapshotMessage;
import static org.agrona.UnsafeAccess.UNSAFE;
-public class InputMessageProcessor implements Processor, AvailableImageHandler, UnavailableImageHandler
+public class InputMessageProcessor implements Processor, AvailableImageHandler, UnavailableImageHandler, InputReport
{
private static final long SUCCESSFUL_READS_OFFSET;
private static final long FAILED_READS_OFFSET;
@@ -22,68 +18,93 @@ public class InputMessageProcessor implements Processor, AvailableImageHandler,
private volatile long failedReads = 0;
private final RingBuffer inputRingBuffer;
- private final InputMessageHandler handler;
- private final Set inputSubscriptionList;
+ private final Subscription inputSubscription;
+ private final InputMessageHandler inputMessageHandler;
+ private final FragmentAssembler dataHandler;
+ private final SnapshotMessage snapshotMessage;
private final int frameCountLimit;
private final IdleStrategy idleStrategy;
- private final AtomicBoolean running;
+ private final long maxHeartbeatLiveness;
+ private long heartbeatLiveness;
+ private volatile boolean running;
+ private volatile boolean polling;
private final Thread processorThread;
- public InputMessageProcessor(final RingBuffer inputRingBuffer, final IdleStrategy idleStrategy, final int frameCountLimit, final String threadName)
+ public InputMessageProcessor(final RingBuffer inputRingBuffer, final IdleStrategy idleStrategy,
+ final int frameCountLimit, final long maxHeartbeatLiveness, final AeronStream stream,
+ final AssociationHandler associationHandler, final String threadName)
{
this.inputRingBuffer = inputRingBuffer;
this.idleStrategy = idleStrategy;
this.frameCountLimit = frameCountLimit;
+ this.maxHeartbeatLiveness = maxHeartbeatLiveness;
+
+ inputSubscription = stream.aeron.addSubscription(stream.channel, stream.streamId);
+ inputMessageHandler = new InputMessageHandler(inputRingBuffer, idleStrategy, associationHandler);
+ dataHandler = new FragmentAssembler(inputMessageHandler);
+ snapshotMessage = new SnapshotMessage();
- handler = new InputMessageHandler(inputRingBuffer, idleStrategy);
- inputSubscriptionList = new LinkedHashSet<>();
+ heartbeatLiveness = maxHeartbeatLiveness;
- running = new AtomicBoolean(false);
+ running = false;
+ polling = false;
processorThread = new Thread(this, threadName);
}
- public long addSubscription(final AeronStream stream)
+ @Override
+ public String name()
{
- final Subscription inputSubscription = stream.aeron.addSubscription(stream.channel, stream.streamId);
- inputSubscriptionList.add(inputSubscription);
-
- return inputSubscription.registrationId();
+ return processorThread.getName();
}
@Override
public void start()
{
- running.set(true);
+ running = true;
processorThread.start();
}
@Override
public void run()
{
- // First of all, write the Load Data Snapshot message into the input pipeline.
- Snapshot.writeLoadMessage(inputRingBuffer, idleStrategy); // TODO: is this the right place?
-
- // Poll ALL the input subscriptions for incoming data until running.
- final FragmentAssembler dataHandler = new FragmentAssembler(handler);
+ // First of all, write the Load Data SnapshotMessage message into the input pipeline.
+ snapshotMessage.writeLoadMessage(inputRingBuffer, idleStrategy); // TODO: is this the right place?
+ // Poll the input subscription for incoming data until running.
int idleCount = 0;
- while (running.get())
+ while (running)
{
- int fragmentsRead = 0;
- for (final Subscription inputSubscription: inputSubscriptionList)
+ if (polling)
{
- fragmentsRead += inputSubscription.poll(dataHandler, frameCountLimit);
- }
-
- if (0 == fragmentsRead)
- {
- UNSAFE.putOrderedLong(this, FAILED_READS_OFFSET, failedReads + 1);
- idleStrategy.idle(idleCount++);
+ final int fragmentsRead = inputSubscription.poll(dataHandler, frameCountLimit);
+ if (0 == fragmentsRead)
+ {
+ // No incoming data from poll
+ heartbeatLiveness--;
+ if (heartbeatLiveness == 0)
+ {
+ // TODO: notify heartbeat lost
+
+ heartbeatLiveness = maxHeartbeatLiveness;
+ }
+
+ // Update statistics
+ UNSAFE.putOrderedLong(this, FAILED_READS_OFFSET, failedReads + 1);
+ idleStrategy.idle(idleCount++);
+ }
+ else
+ {
+ // Incoming data arrived from poll (it DOES NOT matter if heartbeat or not)
+ heartbeatLiveness = maxHeartbeatLiveness;
+
+ // Update statistics
+ UNSAFE.putOrderedLong(this, SUCCESSFUL_READS_OFFSET, successfulReads + 1);
+ idleCount = 0;
+ }
}
else
{
- UNSAFE.putOrderedLong(this, SUCCESSFUL_READS_OFFSET, successfulReads + 1);
- idleCount = 0;
+ idleStrategy.idle(idleCount++);
}
}
}
@@ -91,35 +112,75 @@ public void run()
@Override
public void onAvailableImage(final Image image)
{
- // TODO: when at least one image is present resume subscription polling
+ // When at least one image is present resume subscription polling
+ if (inputSubscription.imageCount() > 0)
+ {
+ polling = true;
+ }
}
@Override
public void onUnavailableImage(final Image image)
{
- // TODO: when no more images are present suspend subscription polling
+ dataHandler.freeSessionBuffer(image.sessionId());
+
+ // When no more images are present suspend subscription polling
+ if (inputSubscription.imageCount() == 0)
+ {
+ polling = false;
+ }
}
@Override
public void close() throws Exception
{
- running.set(false);
+ running = false;
processorThread.join();
- inputSubscriptionList.forEach(CloseHelper::quietClose);
- CloseHelper.quietClose(handler);
+ CloseHelper.quietClose(inputSubscription);
+ }
+
+ public long subscriptionId()
+ {
+ return inputSubscription.registrationId();
}
+ @Override
public long successfulReads()
{
return successfulReads;
}
+ @Override
public long failedReads()
{
return failedReads;
}
+ @Override
+ public long heartbeatReceived()
+ {
+ return inputMessageHandler.heartbeatReceived();
+ }
+
+ @Override
+ public long administrativeMessages()
+ {
+ return inputMessageHandler.administrativeMessages();
+ }
+
+ @Override
+ public long applicationMessages()
+ {
+ return inputMessageHandler.applicationMessages();
+ }
+
+ @Override
+ public long bytesRead()
+ {
+ return inputMessageHandler.bytesRead();
+ }
+
static
{
try
diff --git a/src/org/helios/infra/InputReport.java b/src/org/helios/infra/InputReport.java
new file mode 100644
index 0000000..4722638
--- /dev/null
+++ b/src/org/helios/infra/InputReport.java
@@ -0,0 +1,18 @@
+package org.helios.infra;
+
+public interface InputReport
+{
+ String name();
+
+ long successfulReads();
+
+ long failedReads();
+
+ long bytesRead();
+
+ long administrativeMessages();
+
+ long applicationMessages();
+
+ long heartbeatReceived();
+}
diff --git a/src/org/helios/infra/OutputMessageHandler.java b/src/org/helios/infra/OutputMessageHandler.java
index 5d1ac4e..e9355d4 100644
--- a/src/org/helios/infra/OutputMessageHandler.java
+++ b/src/org/helios/infra/OutputMessageHandler.java
@@ -1,12 +1,15 @@
package org.helios.infra;
import io.aeron.Publication;
+import io.aeron.logbuffer.BufferClaim;
import org.agrona.CloseHelper;
import org.agrona.LangUtil;
import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.IdleStrategy;
import org.agrona.concurrent.MessageHandler;
import org.helios.AeronStream;
+import org.helios.mmb.sbe.MessageHeaderDecoder;
+import org.helios.mmb.sbe.ShutdownDecoder;
import static org.agrona.UnsafeAccess.UNSAFE;
@@ -22,26 +25,39 @@ public class OutputMessageHandler implements MessageHandler, AutoCloseable
private final Publication outputPublication;
private final IdleStrategy idleStrategy;
+ private final BufferClaim bufferClaim;
- public OutputMessageHandler(final AeronStream outputStream, final IdleStrategy idleStrategy)
+ OutputMessageHandler(final AeronStream outputStream, final IdleStrategy idleStrategy)
{
this.idleStrategy = idleStrategy;
outputPublication = outputStream.aeron.addPublication(outputStream.channel, outputStream.streamId);
- }
+ bufferClaim = new BufferClaim();
+ }
+ private MessageHeaderDecoder messageHeaderDecoder = new MessageHeaderDecoder();
@Override
public void onMessage(int msgTypeId, MutableDirectBuffer buffer, int index, int length)
{
try
{
- while (outputPublication.offer(buffer, index, length) < 0L)
+ messageHeaderDecoder.wrap(buffer, index);
+ if (messageHeaderDecoder.templateId() == ShutdownDecoder.TEMPLATE_ID)
+ {
+ System.out.println("ShutdownDecoder.TEMPLATE_ID matched!");
+ }
+
+ while (outputPublication.tryClaim(length, bufferClaim) <= 0)
{
UNSAFE.putOrderedLong(this, FAILED_WRITES_OFFSET, failedWrites + 1);
idleStrategy.idle(0);
}
+ final int offset = bufferClaim.offset();
+ bufferClaim.buffer().putBytes(offset, buffer, index, length);
+ bufferClaim.commit();
+
UNSAFE.putOrderedLong(this, SUCCESSFUL_WRITES_OFFSET, successfulWrites + 1);
UNSAFE.putOrderedLong(this, BYTES_WRITTEN_OFFSET, bytesWritten + length);
}
@@ -62,17 +78,17 @@ public long outputPublicationId()
return this.outputPublication.registrationId();
}
- public long successfulWrites()
+ long successfulWrites()
{
return successfulWrites;
}
- public long failedWrites()
+ long failedWrites()
{
return failedWrites;
}
- public long bytesWritten()
+ long bytesWritten()
{
return bytesWritten;
}
diff --git a/src/org/helios/infra/OutputMessageProcessor.java b/src/org/helios/infra/OutputMessageProcessor.java
index b3c7760..adb85d9 100644
--- a/src/org/helios/infra/OutputMessageProcessor.java
+++ b/src/org/helios/infra/OutputMessageProcessor.java
@@ -1,14 +1,149 @@
package org.helios.infra;
+import org.agrona.CloseHelper;
import org.agrona.concurrent.IdleStrategy;
import org.agrona.concurrent.ringbuffer.RingBuffer;
import org.helios.AeronStream;
+import org.helios.mmb.HeartbeatMessage;
+import org.helios.mmb.StartupMessage;
-public class OutputMessageProcessor extends RingBufferProcessor
+import static org.agrona.UnsafeAccess.UNSAFE;
+
+public class OutputMessageProcessor implements Processor, OutputReport
{
+ private static final long SUCCESSFUL_READS_OFFSET;
+ private static final long FAILED_READS_OFFSET;
+ private static final long HEARTBEAT_SENT_OFFSET;
+
+ private volatile long successfulBufferReads = 0;
+ private volatile long failedBufferReads = 0;
+ private volatile long heartbeatSent = 0;
+
+ private final RingBuffer outputRingBuffer;
+ private final IdleStrategy idleStrategy;
+ private final OutputMessageHandler handler;
+ private final int heartbeatInterval;
+ private final StartupMessage startupMessage;
+ private final HeartbeatMessage heartbeatMessage;
+ private volatile boolean running;
+ private final Thread processorThread;
+
public OutputMessageProcessor(final RingBuffer outputRingBuffer, final AeronStream outputStream,
- final IdleStrategy idleStrategy, final String threadName)
+ final IdleStrategy idleStrategy, final int heartbeatInterval, final String threadName)
+ {
+ this.outputRingBuffer = outputRingBuffer;
+ this.idleStrategy = idleStrategy;
+ this.heartbeatInterval = heartbeatInterval;
+
+ handler = new OutputMessageHandler(outputStream, idleStrategy);
+ startupMessage = new StartupMessage(outputStream.componentType, outputStream.componentId);
+ heartbeatMessage = new HeartbeatMessage(outputStream.componentType, outputStream.componentId);
+
+ running = false;
+ processorThread = new Thread(this, threadName);
+ }
+
+ @Override
+ public String name()
+ {
+ return processorThread.getName();
+ }
+
+ @Override
+ public void start()
+ {
+ running = true;
+ processorThread.start();
+ }
+
+ @Override
+ public void run()
+ {
+ // Send StartupMessage MMB message
+ startupMessage.write(handler);
+
+ long heartbeatTimeMillis = System.currentTimeMillis() + heartbeatInterval;
+
+ while (running)
+ {
+ final int readCount = outputRingBuffer.read(handler);
+ if (0 == readCount)
+ {
+ UNSAFE.putOrderedLong(this, FAILED_READS_OFFSET, failedBufferReads + 1);
+
+ idleStrategy.idle(0);
+
+ if (System.currentTimeMillis() > heartbeatTimeMillis)
+ {
+ heartbeatTimeMillis = System.currentTimeMillis() + heartbeatInterval;
+ heartbeatMessage.write(handler);
+
+ UNSAFE.putOrderedLong(this, HEARTBEAT_SENT_OFFSET, heartbeatSent + 1);
+ }
+ }
+ else
+ {
+ UNSAFE.putOrderedLong(this, SUCCESSFUL_READS_OFFSET, successfulBufferReads + 1);
+ }
+ }
+ }
+
+ @Override
+ public void close() throws Exception
+ {
+ running = false;
+ processorThread.join();
+
+ CloseHelper.close(handler);
+ }
+
+ @Override
+ public long successfulWrites()
+ {
+ return handler.successfulWrites();
+ }
+
+ @Override
+ public long failedWrites()
+ {
+ return handler.failedWrites();
+ }
+
+ @Override
+ public long bytesWritten()
+ {
+ return handler.bytesWritten();
+ }
+
+ @Override
+ public long successfulBufferReads()
+ {
+ return successfulBufferReads;
+ }
+
+ @Override
+ public long failedBufferReads()
+ {
+ return failedBufferReads;
+ }
+
+ @Override
+ public long heartbeatSent()
+ {
+ return heartbeatSent;
+ }
+
+ static
{
- super(outputRingBuffer, new OutputMessageHandler(outputStream, idleStrategy), idleStrategy, threadName);
+ try
+ {
+ SUCCESSFUL_READS_OFFSET = UNSAFE.objectFieldOffset(OutputMessageProcessor.class.getDeclaredField("successfulBufferReads"));
+ FAILED_READS_OFFSET = UNSAFE.objectFieldOffset(OutputMessageProcessor.class.getDeclaredField("failedBufferReads"));
+ HEARTBEAT_SENT_OFFSET = UNSAFE.objectFieldOffset(OutputMessageProcessor.class.getDeclaredField("heartbeatSent"));
+ }
+ catch (final Exception ex)
+ {
+ throw new RuntimeException(ex);
+ }
}
}
diff --git a/src/org/helios/infra/OutputReport.java b/src/org/helios/infra/OutputReport.java
new file mode 100644
index 0000000..a56b7ed
--- /dev/null
+++ b/src/org/helios/infra/OutputReport.java
@@ -0,0 +1,18 @@
+package org.helios.infra;
+
+public interface OutputReport
+{
+ String name();
+
+ long successfulWrites();
+
+ long failedWrites();
+
+ long bytesWritten();
+
+ long successfulBufferReads();
+
+ long failedBufferReads();
+
+ long heartbeatSent();
+}
diff --git a/src/org/helios/infra/Processor.java b/src/org/helios/infra/Processor.java
index 3bef2ec..06a68e2 100644
--- a/src/org/helios/infra/Processor.java
+++ b/src/org/helios/infra/Processor.java
@@ -2,5 +2,7 @@
public interface Processor extends Runnable, AutoCloseable
{
+ String name();
+
void start();
}
diff --git a/src/org/helios/infra/RateReport.java b/src/org/helios/infra/RateReport.java
deleted file mode 100644
index 50e7417..0000000
--- a/src/org/helios/infra/RateReport.java
+++ /dev/null
@@ -1,8 +0,0 @@
-package org.helios.infra;
-
-import java.io.PrintStream;
-
-public interface RateReport
-{
- void print(final PrintStream stream);
-}
diff --git a/src/org/helios/infra/RateReporter.java b/src/org/helios/infra/RateReporter.java
deleted file mode 100644
index 0a45699..0000000
--- a/src/org/helios/infra/RateReporter.java
+++ /dev/null
@@ -1,51 +0,0 @@
-package org.helios.infra;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.LockSupport;
-
-public final class RateReporter implements Processor
-{
- private final List reportList;
- private final AtomicBoolean running;
- private final Thread reporterThread;
-
- public RateReporter()
- {
- reportList = new ArrayList<>();
-
- running = new AtomicBoolean(false);
- reporterThread = new Thread(this, "rateReporter");
- }
-
- public void addAll(final List reports)
- {
- reportList.addAll(reports);
- }
-
- @Override
- public void start()
- {
- running.set(true);
- reporterThread.start();
- }
-
- @Override
- public void run()
- {
- while (running.get())
- {
- LockSupport.parkNanos(1_000_000_000L); // TODO: configure
-
- reportList.forEach(report -> report.print(System.out)); // TODO: write to SHM, publish via Aeron, log
- }
- }
-
- @Override
- public void close() throws Exception
- {
- running.set(false);
- reporterThread.join();
- }
-}
diff --git a/src/org/helios/infra/Report.java b/src/org/helios/infra/Report.java
new file mode 100644
index 0000000..0ad6112
--- /dev/null
+++ b/src/org/helios/infra/Report.java
@@ -0,0 +1,12 @@
+package org.helios.infra;
+
+import java.util.List;
+
+public interface Report
+{
+ String name();
+
+ List inputReports();
+
+ List outputReports();
+}
diff --git a/src/org/helios/infra/ReportProcessor.java b/src/org/helios/infra/ReportProcessor.java
new file mode 100644
index 0000000..fc014ee
--- /dev/null
+++ b/src/org/helios/infra/ReportProcessor.java
@@ -0,0 +1,64 @@
+package org.helios.infra;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.locks.LockSupport;
+
+public final class ReportProcessor implements Processor
+{
+ private final long reportInterval;
+ private final Reporter reportingFunc;
+ private final List reportList;
+ private volatile boolean running;
+ private Thread reporterThread;
+
+ public ReportProcessor(final long reportInterval, final Reporter reportingFunc)
+ {
+ Objects.requireNonNull(reportingFunc, "reportingFunc");
+
+ this.reportInterval = reportInterval;
+ this.reportingFunc = reportingFunc;
+ reportList = new ArrayList<>();
+ }
+
+ public void add(final Report report)
+ {
+ Objects.requireNonNull(report, "report");
+
+ reportList.add(report);
+ }
+
+ @Override
+ public String name()
+ {
+ return reporterThread.getName();
+ }
+
+ @Override
+ public void start()
+ {
+ running = true;
+ reporterThread = new Thread(this, "reportProcessor");
+ reporterThread.start();
+ }
+
+ @Override
+ public void run()
+ {
+ do
+ {
+ LockSupport.parkNanos(reportInterval);
+
+ reportList.forEach(reportingFunc::onReport); // TODO: write to SHM, publish via Aeron, log
+ }
+ while (running);
+ }
+
+ @Override
+ public void close() throws Exception
+ {
+ running = false;
+ reporterThread.join();
+ }
+}
diff --git a/src/org/helios/infra/Reporter.java b/src/org/helios/infra/Reporter.java
new file mode 100644
index 0000000..90d3c75
--- /dev/null
+++ b/src/org/helios/infra/Reporter.java
@@ -0,0 +1,7 @@
+package org.helios.infra;
+
+@FunctionalInterface
+public interface Reporter
+{
+ void onReport(final Report report);
+}
diff --git a/src/org/helios/infra/RingBufferProcessor.java b/src/org/helios/infra/RingBufferProcessor.java
index dece950..a2b4fb7 100644
--- a/src/org/helios/infra/RingBufferProcessor.java
+++ b/src/org/helios/infra/RingBufferProcessor.java
@@ -6,8 +6,6 @@
import org.agrona.concurrent.MessageHandler;
import org.agrona.concurrent.ringbuffer.RingBuffer;
-import java.util.concurrent.atomic.AtomicBoolean;
-
import static org.agrona.UnsafeAccess.UNSAFE;
public class RingBufferProcessor implements Processor, MessageHandler
@@ -21,7 +19,7 @@ public class RingBufferProcessor imple
private final RingBuffer ringBuffer;
private final IdleStrategy idleStrategy;
private final T handler;
- private final AtomicBoolean running;
+ private volatile boolean running;
private final Thread processorThread;
public RingBufferProcessor(final RingBuffer ringBuffer, final T handler, final IdleStrategy idleStrategy, final String threadName)
@@ -30,21 +28,27 @@ public RingBufferProcessor(final RingBuffer ringBuffer, final T handler, final I
this.handler = handler;
this.idleStrategy = idleStrategy;
- running = new AtomicBoolean(false);
+ running = false;
processorThread = new Thread(this, threadName);
}
+ @Override
+ public String name()
+ {
+ return processorThread.getName();
+ }
+
@Override
public void start()
{
- running.set(true);
+ running = true;
processorThread.start();
}
@Override
public void run()
{
- while (running.get())
+ while (running)
{
final int readCount = ringBuffer.read(this);
if (0 == readCount)
@@ -68,7 +72,7 @@ public void onMessage(int msgTypeId, final MutableDirectBuffer buffer, int index
@Override
public void close() throws Exception
{
- running.set(false);
+ running = false;
processorThread.join();
CloseHelper.close(handler);
diff --git a/src/org/helios/journal/JournalHandler.java b/src/org/helios/journal/JournalHandler.java
index 2698d4c..8ee2676 100644
--- a/src/org/helios/journal/JournalHandler.java
+++ b/src/org/helios/journal/JournalHandler.java
@@ -5,7 +5,7 @@
import org.agrona.concurrent.IdleStrategy;
import org.agrona.concurrent.MessageHandler;
import org.agrona.concurrent.ringbuffer.RingBuffer;
-import org.helios.snapshot.Snapshot;
+import org.helios.mmb.SnapshotMessage;
import java.util.Objects;
@@ -14,12 +14,15 @@ public class JournalHandler implements MessageHandler, JournalDepletionHandler,
private final JournalWriter writer;
private final RingBuffer nextRingBuffer;
private final IdleStrategy idleStrategy;
+ private final SnapshotMessage snapshotMessage;
public JournalHandler(final JournalWriter writer, final RingBuffer nextRingBuffer, final IdleStrategy idleStrategy)
{
this.writer = writer.depletionHandler(this);
this.nextRingBuffer = Objects.requireNonNull(nextRingBuffer);
this.idleStrategy = Objects.requireNonNull(idleStrategy);
+
+ snapshotMessage = new SnapshotMessage();
}
@Override
@@ -41,7 +44,7 @@ public void onMessage(int msgTypeId, MutableDirectBuffer buffer, int index, int
@Override
public void onJournalDepletion(Journalling journalling)
{
- Snapshot.writeSaveMessage(nextRingBuffer, idleStrategy);
+ snapshotMessage.writeSaveMessage(nextRingBuffer, idleStrategy);
}
@Override
diff --git a/src/org/helios/journal/JournalProcessor.java b/src/org/helios/journal/JournalProcessor.java
index facf8ec..6bcf87a 100644
--- a/src/org/helios/journal/JournalProcessor.java
+++ b/src/org/helios/journal/JournalProcessor.java
@@ -25,6 +25,12 @@ public JournalProcessor(final RingBuffer inputRingBuffer, final IdleStrategy idl
journallerThread = new Thread(this, "journalProcessor");
}
+ @Override
+ public String name()
+ {
+ return journallerThread.getName();
+ }
+
@Override
public void start()
{
diff --git a/src/org/helios/mmb/DataMessage.java b/src/org/helios/mmb/DataMessage.java
new file mode 100644
index 0000000..45cd7a3
--- /dev/null
+++ b/src/org/helios/mmb/DataMessage.java
@@ -0,0 +1,88 @@
+package org.helios.mmb;
+
+import org.agrona.MutableDirectBuffer;
+import org.agrona.concurrent.UnsafeBuffer;
+import org.helios.mmb.sbe.*;
+import org.helios.util.Check;
+import org.helios.util.DirectBufferAllocator;
+
+public final class DataMessage
+{
+ private static final int MESSAGE_LENGTH = 152;
+
+ private final UnsafeBuffer dataBuffer;
+ private int dataBufferOffset;
+ private final MessageHeaderEncoder messageHeaderEncoder = new MessageHeaderEncoder();
+ private final MessageHeaderDecoder messageHeaderDecoder = new MessageHeaderDecoder();
+ private final DataEncoder dataEncoder = new DataEncoder();
+ private final DataDecoder dataDecoder = new DataDecoder();
+
+ public DataMessage()
+ {
+ dataBuffer = new UnsafeBuffer(0, 0);
+ dataBufferOffset = 0;
+ }
+
+ public DataEncoder allocate(final ComponentType componentType, final short componentId, final int dataLength)
+ {
+ Check.enforce(dataLength >= 0, "Negative dataBufferLength");
+
+ dataBuffer.wrap(DirectBufferAllocator.allocateCacheAligned(MESSAGE_LENGTH + dataLength));
+
+ int bufferOffset = 0;
+
+ messageHeaderEncoder.wrap(dataBuffer, bufferOffset)
+ .blockLength(dataEncoder.sbeBlockLength())
+ .templateId(dataEncoder.sbeTemplateId())
+ .schemaId(dataEncoder.sbeSchemaId())
+ .version(dataEncoder.sbeSchemaVersion());
+
+ bufferOffset += messageHeaderEncoder.encodedLength();
+
+ dataEncoder.wrap(dataBuffer, bufferOffset)
+ .mmbHeader()
+ .nodeId((short)0)
+ .component()
+ .componentId(componentId)
+ .componentType(componentType);
+
+ dataBufferOffset = bufferOffset;
+
+ return dataEncoder;
+ }
+
+ public DataDecoder wrap(final MutableDirectBuffer buffer, final int offset, final int length)
+ {
+ dataBuffer.wrap(buffer, offset, length);
+
+ int bufferOffset = 0;
+
+ messageHeaderDecoder.wrap(dataBuffer, bufferOffset);
+
+ final int actingBlockLength = messageHeaderDecoder.blockLength();
+ final int actingVersion = messageHeaderDecoder.version();
+
+ bufferOffset += messageHeaderDecoder.encodedLength();
+
+ dataDecoder.wrap(dataBuffer, bufferOffset, actingBlockLength, actingVersion);
+
+ dataBufferOffset = bufferOffset;
+
+ return dataDecoder;
+ }
+
+ public UnsafeBuffer dataBuffer()
+ {
+ return dataBuffer;
+ }
+
+ public int dataBufferOffset()
+ {
+ return dataBufferOffset;
+ }
+
+ public int dataBufferLength()
+ {
+ return dataBuffer.capacity() - MESSAGE_LENGTH;
+ }
+}
diff --git a/src/org/helios/mmb/HeartbeatMessage.java b/src/org/helios/mmb/HeartbeatMessage.java
new file mode 100644
index 0000000..a4d06f2
--- /dev/null
+++ b/src/org/helios/mmb/HeartbeatMessage.java
@@ -0,0 +1,55 @@
+package org.helios.mmb;
+
+import org.agrona.concurrent.UnsafeBuffer;
+import org.helios.infra.MessageTypes;
+import org.helios.infra.OutputMessageHandler;
+import org.helios.mmb.sbe.ComponentType;
+import org.helios.mmb.sbe.HeartbeatEncoder;
+import org.helios.mmb.sbe.MessageHeaderEncoder;
+import org.helios.util.DirectBufferAllocator;
+
+public final class HeartbeatMessage
+{
+ private static final int MESSAGE_LENGTH = 152;
+
+ private final UnsafeBuffer heartbeatBuffer;
+ private final int bufferOffset;
+ private final HeartbeatEncoder heartbeatEncoder;
+
+ public HeartbeatMessage(final ComponentType componentType, final short componentId)
+ {
+ heartbeatBuffer = new UnsafeBuffer(DirectBufferAllocator.allocateCacheAligned(MESSAGE_LENGTH));
+ heartbeatEncoder = new HeartbeatEncoder();
+
+ final MessageHeaderEncoder messageHeaderEncoder = new MessageHeaderEncoder();
+
+ // Encode the HeartbeatMessage message once and forever
+ int bufferOffset = 0;
+
+ messageHeaderEncoder.wrap(heartbeatBuffer, bufferOffset)
+ .blockLength(heartbeatEncoder.sbeBlockLength())
+ .templateId(heartbeatEncoder.sbeTemplateId())
+ .schemaId(heartbeatEncoder.sbeSchemaId())
+ .version(heartbeatEncoder.sbeSchemaVersion());
+
+ bufferOffset += messageHeaderEncoder.encodedLength();
+
+ heartbeatEncoder.wrap(heartbeatBuffer, bufferOffset)
+ .mmbHeader()
+ .nodeId((short)0)
+ .component()
+ .componentId(componentId)
+ .componentType(componentType);
+
+ this.bufferOffset = bufferOffset;
+ }
+
+ public void write(final OutputMessageHandler outputMessageHandler)
+ {
+ heartbeatEncoder.wrap(heartbeatBuffer, bufferOffset)
+ .mmbHeader()
+ .timestamp(System.nanoTime());
+
+ outputMessageHandler.onMessage(MessageTypes.ADMINISTRATIVE_MSG_ID, heartbeatBuffer, 0, MESSAGE_LENGTH);
+ }
+}
diff --git a/src/org/helios/mmb/HeartbeatMessageFactory.java b/src/org/helios/mmb/HeartbeatMessageFactory.java
deleted file mode 100644
index fb98600..0000000
--- a/src/org/helios/mmb/HeartbeatMessageFactory.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package org.helios.mmb;
-
-import org.agrona.concurrent.UnsafeBuffer;
-import org.helios.mmb.sbe.HeartbeatEncoder;
-import org.helios.mmb.sbe.MessageHeaderEncoder;
-import org.helios.util.DirectBufferAllocator;
-
-public final class HeartbeatMessageFactory
-{
- public static final int MESSAGE_LENGTH = 128;
-
- public static final UnsafeBuffer heartbeatBuffer;
-
- static
- {
- heartbeatBuffer = new UnsafeBuffer(DirectBufferAllocator.allocateCacheAligned(MESSAGE_LENGTH));
-
- final MessageHeaderEncoder messageHeaderEncoder = new MessageHeaderEncoder();
-
- // Encode the Heartbeat message once and forever
- final HeartbeatEncoder heartbeatEncoder = new HeartbeatEncoder();
- int bufferOffset = 0;
-
- messageHeaderEncoder.wrap(heartbeatBuffer, bufferOffset)
- .blockLength(heartbeatEncoder.sbeBlockLength())
- .templateId(heartbeatEncoder.sbeTemplateId())
- .schemaId(heartbeatEncoder.sbeSchemaId())
- .version(heartbeatEncoder.sbeSchemaVersion());
-
- bufferOffset += messageHeaderEncoder.encodedLength();
-
- heartbeatEncoder.wrap(heartbeatBuffer, bufferOffset)
- .mmbHeader()
- //.messageNumber(2L)
- .nodeId((short)0)
- .timestamp(System.nanoTime());
- }
-}
diff --git a/src/org/helios/mmb/SnapshotMessage.java b/src/org/helios/mmb/SnapshotMessage.java
new file mode 100644
index 0000000..344baee
--- /dev/null
+++ b/src/org/helios/mmb/SnapshotMessage.java
@@ -0,0 +1,98 @@
+package org.helios.mmb;
+
+import org.agrona.concurrent.IdleStrategy;
+import org.agrona.concurrent.UnsafeBuffer;
+import org.agrona.concurrent.ringbuffer.RingBuffer;
+import org.helios.infra.MessageTypes;
+import org.helios.mmb.sbe.LoadSnapshotEncoder;
+import org.helios.mmb.sbe.MessageHeaderEncoder;
+import org.helios.mmb.sbe.SaveSnapshotEncoder;
+import org.helios.util.DirectBufferAllocator;
+
+import java.util.Objects;
+
+public final class SnapshotMessage
+{
+ private static final int MESSAGE_LENGTH = 152;
+
+ private final UnsafeBuffer loadSnapshotBuffer;
+ private final UnsafeBuffer saveSnapshotBuffer;
+ private final LoadSnapshotEncoder loadSnapshotEncoder = new LoadSnapshotEncoder();
+ private final SaveSnapshotEncoder saveSnapshotEncoder = new SaveSnapshotEncoder();
+ private final int loadBufferOffset;
+ private final int saveBufferOffset;
+
+ public SnapshotMessage()
+ {
+ loadSnapshotBuffer = new UnsafeBuffer(DirectBufferAllocator.allocateCacheAligned(MESSAGE_LENGTH));
+ saveSnapshotBuffer = new UnsafeBuffer(DirectBufferAllocator.allocateCacheAligned(MESSAGE_LENGTH));
+
+ final MessageHeaderEncoder messageHeaderEncoder = new MessageHeaderEncoder();
+
+ // Encode the Load Data SnapshotMessage message once and forever
+ int bufferOffset = 0;
+
+ messageHeaderEncoder.wrap(loadSnapshotBuffer, bufferOffset)
+ .blockLength(loadSnapshotEncoder.sbeBlockLength())
+ .templateId(loadSnapshotEncoder.sbeTemplateId())
+ .schemaId(loadSnapshotEncoder.sbeSchemaId())
+ .version(loadSnapshotEncoder.sbeSchemaVersion());
+
+ bufferOffset += messageHeaderEncoder.encodedLength();
+
+ loadSnapshotEncoder.wrap(loadSnapshotBuffer, bufferOffset)
+ .mmbHeader()
+ .messageNumber(0L)
+ .nodeId((short)0)
+ .timestamp(System.nanoTime());
+
+ loadBufferOffset = bufferOffset;
+
+ // Encode the Save Data SnapshotMessage message once and forever
+ bufferOffset = 0;
+
+ messageHeaderEncoder.wrap(saveSnapshotBuffer, bufferOffset)
+ .blockLength(saveSnapshotEncoder.sbeBlockLength())
+ .templateId(saveSnapshotEncoder.sbeTemplateId())
+ .schemaId(saveSnapshotEncoder.sbeSchemaId())
+ .version(saveSnapshotEncoder.sbeSchemaVersion());
+
+ bufferOffset += messageHeaderEncoder.encodedLength();
+
+ saveSnapshotEncoder.wrap(saveSnapshotBuffer, bufferOffset)
+ .mmbHeader()
+ .messageNumber(0L)
+ .nodeId((short)0)
+ .timestamp(System.nanoTime());
+
+ saveBufferOffset = bufferOffset;
+ }
+
+ public void writeLoadMessage(final RingBuffer inputRingBuffer, final IdleStrategy idleStrategy)
+ {
+ Objects.requireNonNull(idleStrategy, "idleStrategy");
+
+ loadSnapshotEncoder.wrap(loadSnapshotBuffer, loadBufferOffset)
+ .mmbHeader()
+ .timestamp(System.nanoTime());
+
+ while (!inputRingBuffer.write(MessageTypes.ADMINISTRATIVE_MSG_ID, loadSnapshotBuffer, 0, MESSAGE_LENGTH))
+ {
+ idleStrategy.idle(0);
+ }
+ }
+
+ public void writeSaveMessage(final RingBuffer inputRingBuffer, final IdleStrategy idleStrategy)
+ {
+ Objects.requireNonNull(idleStrategy, "idleStrategy");
+
+ saveSnapshotEncoder.wrap(saveSnapshotBuffer, saveBufferOffset)
+ .mmbHeader()
+ .timestamp(System.nanoTime());
+
+ while (!inputRingBuffer.write(MessageTypes.ADMINISTRATIVE_MSG_ID, saveSnapshotBuffer, 0, MESSAGE_LENGTH))
+ {
+ idleStrategy.idle(0);
+ }
+ }
+}
diff --git a/src/org/helios/mmb/SnapshotMessageFactory.java b/src/org/helios/mmb/SnapshotMessageFactory.java
deleted file mode 100644
index 95d5a14..0000000
--- a/src/org/helios/mmb/SnapshotMessageFactory.java
+++ /dev/null
@@ -1,59 +0,0 @@
-package org.helios.mmb;
-
-import org.agrona.concurrent.UnsafeBuffer;
-import org.helios.mmb.sbe.LoadSnapshotEncoder;
-import org.helios.mmb.sbe.MessageHeaderEncoder;
-import org.helios.mmb.sbe.SaveSnapshotEncoder;
-import org.helios.util.DirectBufferAllocator;
-
-public final class SnapshotMessageFactory
-{
- public static final int MESSAGE_LENGTH = 128;
-
- public static final UnsafeBuffer saveSnapshotBuffer;
- public static final UnsafeBuffer loadSnapshotBuffer;
-
- static
- {
- saveSnapshotBuffer = new UnsafeBuffer(DirectBufferAllocator.allocateCacheAligned(MESSAGE_LENGTH));
- loadSnapshotBuffer = new UnsafeBuffer(DirectBufferAllocator.allocateCacheAligned(MESSAGE_LENGTH));
-
- final MessageHeaderEncoder messageHeaderEncoder = new MessageHeaderEncoder();
-
- // Encode the Load Data Snapshot message once and forever
- final LoadSnapshotEncoder loadSnapshotEncoder = new LoadSnapshotEncoder();
- int bufferOffset = 0;
-
- messageHeaderEncoder.wrap(loadSnapshotBuffer, bufferOffset)
- .blockLength(loadSnapshotEncoder.sbeBlockLength())
- .templateId(loadSnapshotEncoder.sbeTemplateId())
- .schemaId(loadSnapshotEncoder.sbeSchemaId())
- .version(loadSnapshotEncoder.sbeSchemaVersion());
-
- bufferOffset += messageHeaderEncoder.encodedLength();
-
- loadSnapshotEncoder.wrap(loadSnapshotBuffer, bufferOffset)
- .mmbHeader()
- .messageNumber(0L)
- .nodeId((short)0)
- .timestamp(System.nanoTime());
-
- // Encode the Save Data Snapshot message once and forever
- final SaveSnapshotEncoder saveSnapshotEncoder = new SaveSnapshotEncoder();
- bufferOffset = 0;
-
- messageHeaderEncoder.wrap(saveSnapshotBuffer, bufferOffset)
- .blockLength(saveSnapshotEncoder.sbeBlockLength())
- .templateId(saveSnapshotEncoder.sbeTemplateId())
- .schemaId(saveSnapshotEncoder.sbeSchemaId())
- .version(saveSnapshotEncoder.sbeSchemaVersion());
-
- bufferOffset += messageHeaderEncoder.encodedLength();
-
- saveSnapshotEncoder.wrap(saveSnapshotBuffer, bufferOffset)
- .mmbHeader()
- .messageNumber(0L)
- .nodeId((short)0)
- .timestamp(System.nanoTime());
- }
-}
diff --git a/src/org/helios/mmb/StartupMessage.java b/src/org/helios/mmb/StartupMessage.java
new file mode 100644
index 0000000..77fe9bf
--- /dev/null
+++ b/src/org/helios/mmb/StartupMessage.java
@@ -0,0 +1,48 @@
+package org.helios.mmb;
+
+import org.agrona.concurrent.UnsafeBuffer;
+import org.helios.infra.MessageTypes;
+import org.helios.infra.OutputMessageHandler;
+import org.helios.mmb.sbe.ComponentType;
+import org.helios.mmb.sbe.MessageHeaderEncoder;
+import org.helios.mmb.sbe.StartupEncoder;
+import org.helios.util.DirectBufferAllocator;
+
+public final class StartupMessage
+{
+ private static final int MESSAGE_LENGTH = 152;
+
+ private final UnsafeBuffer startupBuffer;
+
+ public StartupMessage(final ComponentType componentType, final short componentId)
+ {
+ startupBuffer = new UnsafeBuffer(DirectBufferAllocator.allocateCacheAligned(MESSAGE_LENGTH));
+
+ final MessageHeaderEncoder messageHeaderEncoder = new MessageHeaderEncoder();
+ final StartupEncoder startupEncoder = new StartupEncoder();
+
+ // Encode the StartupMessage message once and forever
+ int bufferOffset = 0;
+
+ messageHeaderEncoder.wrap(startupBuffer, bufferOffset)
+ .blockLength(startupEncoder.sbeBlockLength())
+ .templateId(startupEncoder.sbeTemplateId())
+ .schemaId(startupEncoder.sbeSchemaId())
+ .version(startupEncoder.sbeSchemaVersion());
+
+ bufferOffset += messageHeaderEncoder.encodedLength();
+
+ startupEncoder.wrap(startupBuffer, bufferOffset)
+ .mmbHeader()
+ .nodeId((short)0)
+ .timestamp(System.nanoTime())
+ .component()
+ .componentId(componentId)
+ .componentType(componentType);
+ }
+
+ public void write(final OutputMessageHandler outputMessageHandler)
+ {
+ outputMessageHandler.onMessage(MessageTypes.ADMINISTRATIVE_MSG_ID, startupBuffer, 0, MESSAGE_LENGTH);
+ }
+}
diff --git a/src/org/helios/replica/ReplicaProcessor.java b/src/org/helios/replica/ReplicaProcessor.java
index 3557f09..29c8841 100644
--- a/src/org/helios/replica/ReplicaProcessor.java
+++ b/src/org/helios/replica/ReplicaProcessor.java
@@ -24,6 +24,12 @@ public ReplicaProcessor(final RingBuffer inputRingBuffer, final IdleStrategy idl
replicatorThread = new Thread(this, "replicator");
}
+ @Override
+ public String name()
+ {
+ return replicatorThread.getName();
+ }
+
@Override
public void start()
{
diff --git a/src/org/helios/service/Service.java b/src/org/helios/service/Service.java
index b3916d8..6e05f4a 100644
--- a/src/org/helios/service/Service.java
+++ b/src/org/helios/service/Service.java
@@ -2,14 +2,12 @@
import org.helios.AeronStream;
import org.helios.infra.AvailableAssociationHandler;
-import org.helios.infra.RateReport;
+import org.helios.infra.Report;
import org.helios.infra.UnavailableAssociationHandler;
-import java.util.List;
-
public interface Service extends AutoCloseable
{
- Service addEndPoint(final AeronStream reqStream, final AeronStream rspStream);
+ Service addEndPoint(final AeronStream rspStream);
Service addEventChannel(final AeronStream eventStream);
@@ -17,7 +15,7 @@ public interface Service extends AutoCloseable
Service unavailableAssociationHandler(final UnavailableAssociationHandler handler);
- List reportList();
+ Report report();
T handler();
diff --git a/src/org/helios/service/ServiceReport.java b/src/org/helios/service/ServiceReport.java
index b4e2a6f..58edbf8 100644
--- a/src/org/helios/service/ServiceReport.java
+++ b/src/org/helios/service/ServiceReport.java
@@ -1,55 +1,49 @@
package org.helios.service;
-import org.helios.infra.InputMessageProcessor;
-import org.helios.infra.OutputMessageProcessor;
-import org.helios.infra.RateReport;
+import org.helios.infra.*;
import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Objects;
-public final class ServiceReport implements RateReport
+public final class ServiceReport implements Report
{
- private final InputMessageProcessor requestProcessor;
- private final OutputMessageProcessor responseProcessor;
+ private final List inputReportList;
+ private final List outputReportList;
- private long lastTimeStamp;
- private long lastSuccessfulReads;
- private long lastSuccessfulWrites;
- private long lastBytesWritten;
-
- public ServiceReport(final InputMessageProcessor requestProcessor, final OutputMessageProcessor responseProcessor)
+ public ServiceReport(final InputMessageProcessor requestProcessor)
{
Objects.requireNonNull(requestProcessor, "requestProcessor");
+
+ inputReportList = new ArrayList<>();
+ outputReportList = new ArrayList<>();
+
+ inputReportList.add(requestProcessor);
+ }
+
+ public void addResponseProcessor(final OutputMessageProcessor responseProcessor)
+ {
Objects.requireNonNull(responseProcessor, "responseProcessor");
- this.requestProcessor = requestProcessor;
- this.responseProcessor = responseProcessor;
+ outputReportList.add(responseProcessor);
+ }
- lastTimeStamp = System.currentTimeMillis();
+ @Override
+ public String name()
+ {
+ return "ServiceReport";
+ }
+
+ @Override
+ public List inputReports()
+ {
+ return inputReportList;
}
@Override
- public void print(final PrintStream stream)
+ public List outputReports()
{
- final long timeStamp = System.currentTimeMillis();
- long successfulReads = requestProcessor.successfulReads();
- long successfulWrites = responseProcessor.handler().successfulWrites();
- long bytesWritten = responseProcessor.handler().bytesWritten();
- long failedReads = requestProcessor.failedReads();
-
- final long duration = timeStamp - lastTimeStamp;
- final long successfulReadsDelta = successfulReads - lastSuccessfulReads;
- final long successfulWritesDelta = successfulWrites - lastSuccessfulWrites;
- final long bytesTransferred = bytesWritten - lastBytesWritten;
-
- final double failureRatio = failedReads / (double)(successfulReads + failedReads);
-
- stream.format("ServiceReport: T %dms IN %,d messages - OUT %,d messages - %,d bytes [read failure ratio: %f]\n",
- duration, successfulReadsDelta, successfulWritesDelta, bytesTransferred, failureRatio);
-
- lastTimeStamp = timeStamp;
- lastSuccessfulReads = successfulReads;
- lastSuccessfulWrites = successfulWrites;
- lastBytesWritten = bytesWritten;
+ return outputReportList;
}
}
diff --git a/src/org/helios/snapshot/Snapshot.java b/src/org/helios/snapshot/Snapshot.java
deleted file mode 100644
index 81da6bb..0000000
--- a/src/org/helios/snapshot/Snapshot.java
+++ /dev/null
@@ -1,32 +0,0 @@
-package org.helios.snapshot;
-
-import org.agrona.concurrent.IdleStrategy;
-import org.agrona.concurrent.ringbuffer.RingBuffer;
-import org.helios.infra.MessageTypes;
-
-import java.util.Objects;
-
-import static org.helios.mmb.SnapshotMessageFactory.*;
-
-public final class Snapshot
-{
- public static void writeLoadMessage(final RingBuffer inputRingBuffer, final IdleStrategy idleStrategy)
- {
- Objects.requireNonNull(idleStrategy, "idleStrategy");
-
- while (!inputRingBuffer.write(MessageTypes.ADMINISTRATIVE_MSG_ID, loadSnapshotBuffer, 0, MESSAGE_LENGTH))
- {
- idleStrategy.idle(0);
- }
- }
-
- public static void writeSaveMessage(final RingBuffer inputRingBuffer, final IdleStrategy idleStrategy)
- {
- Objects.requireNonNull(idleStrategy, "idleStrategy");
-
- while (!inputRingBuffer.write(MessageTypes.ADMINISTRATIVE_MSG_ID, saveSnapshotBuffer, 0, MESSAGE_LENGTH))
- {
- idleStrategy.idle(0);
- }
- }
-}
diff --git a/src/org/helios/snapshot/SnapshotTimer.java b/src/org/helios/snapshot/SnapshotTimer.java
index afa4ac8..ad640ca 100644
--- a/src/org/helios/snapshot/SnapshotTimer.java
+++ b/src/org/helios/snapshot/SnapshotTimer.java
@@ -4,6 +4,7 @@
import org.agrona.concurrent.BusySpinIdleStrategy;
import org.agrona.concurrent.IdleStrategy;
import org.agrona.concurrent.ringbuffer.RingBuffer;
+import org.helios.mmb.SnapshotMessage;
import org.helios.util.Check;
import java.util.Objects;
@@ -14,6 +15,7 @@ public final class SnapshotTimer implements Runnable, AutoCloseable
public static final long MAX_SNAPSHOT_PERIOD = TimeUnit.HOURS.toMillis(24);
public static final long DEFAULT_SNAPSHOT_PERIOD = TimeUnit.HOURS.toMillis(24);
+ private final SnapshotMessage snapshotMessage;
private final TimerWheel timerWheel;
private final RingBuffer inputRingBuffer;
private final long snapshotPeriod;
@@ -36,6 +38,8 @@ public SnapshotTimer(final TimerWheel timerWheel, final RingBuffer inputRingBuff
this.timerWheel = timerWheel;
this.inputRingBuffer = inputRingBuffer;
this.snapshotPeriod = snapshotPeriod;
+
+ snapshotMessage = new SnapshotMessage();
}
public void start()
@@ -47,8 +51,8 @@ public void start()
@Override
public void run()
{
- // Write the Save Data Snapshot message into the input pipeline.
- Snapshot.writeSaveMessage(inputRingBuffer, idleStrategy);
+ // Write the Save Data SnapshotMessage message into the input pipeline.
+ snapshotMessage.writeSaveMessage(inputRingBuffer, idleStrategy);
// Schedule the next data snapshot after periodic time interval.
timerWheel.rescheduleTimeout(snapshotPeriod, TimeUnit.MILLISECONDS, timer);
diff --git a/src/org/helios/status/StatusCounterDescriptor.java b/src/org/helios/status/StatusCounterDescriptor.java
new file mode 100644
index 0000000..3ccf9d9
--- /dev/null
+++ b/src/org/helios/status/StatusCounterDescriptor.java
@@ -0,0 +1,58 @@
+package org.helios.status;
+
+import org.agrona.collections.Int2ObjectHashMap;
+import org.agrona.concurrent.status.AtomicCounter;
+import org.agrona.concurrent.status.CountersManager;
+
+public enum StatusCounterDescriptor
+{
+ BYTES_SENT(0, "Bytes sent"),
+ BYTES_RECEIVED(1, "Bytes received"),
+ HEARTBEATS_SENT(2, "Heartbeats sent"),
+ HEARTBEATS_RECEIVED(3, "Heartbeats received"),
+ ERRORS(4, "Errors");
+
+ public static final int SYSTEM_COUNTER_TYPE_ID = 0;
+
+ private static final Int2ObjectHashMap DESCRIPTOR_BY_ID_MAP = new Int2ObjectHashMap<>();
+
+ static
+ {
+ for (final StatusCounterDescriptor descriptor : StatusCounterDescriptor.values())
+ {
+ if (null != DESCRIPTOR_BY_ID_MAP.put(descriptor.id, descriptor))
+ {
+ throw new IllegalStateException("Descriptor id already in use: " + descriptor.id);
+ }
+ }
+ }
+
+ public static StatusCounterDescriptor get(final int id)
+ {
+ return DESCRIPTOR_BY_ID_MAP.get(id);
+ }
+
+ private final int id;
+ private final String label;
+
+ StatusCounterDescriptor(final int id, final String label)
+ {
+ this.id = id;
+ this.label = label;
+ }
+
+ public int id()
+ {
+ return id;
+ }
+
+ public String label()
+ {
+ return label;
+ }
+
+ public AtomicCounter newCounter(final CountersManager countersManager)
+ {
+ return countersManager.newCounter(label, SYSTEM_COUNTER_TYPE_ID, (buffer) -> buffer.putInt(0, id));
+ }
+}
diff --git a/src/org/helios/status/StatusCounters.java b/src/org/helios/status/StatusCounters.java
new file mode 100644
index 0000000..814d062
--- /dev/null
+++ b/src/org/helios/status/StatusCounters.java
@@ -0,0 +1,30 @@
+package org.helios.status;
+
+import org.agrona.concurrent.status.AtomicCounter;
+import org.agrona.concurrent.status.CountersManager;
+
+import java.util.EnumMap;
+
+public class StatusCounters implements AutoCloseable
+{
+ private final EnumMap counterByDescriptorMap =
+ new EnumMap<>(StatusCounterDescriptor.class);
+
+ public StatusCounters(final CountersManager countersManager)
+ {
+ for (final StatusCounterDescriptor descriptor : StatusCounterDescriptor.values())
+ {
+ counterByDescriptorMap.put(descriptor, descriptor.newCounter(countersManager));
+ }
+ }
+
+ public AtomicCounter get(final StatusCounterDescriptor descriptor)
+ {
+ return counterByDescriptorMap.get(descriptor);
+ }
+
+ public void close()
+ {
+ counterByDescriptorMap.values().forEach(AtomicCounter::close);
+ }
+}
diff --git a/test-perf/echo/EchoConfiguration.java b/test-perf/echo/EchoConfiguration.java
index eba38fd..302271e 100644
--- a/test-perf/echo/EchoConfiguration.java
+++ b/test-perf/echo/EchoConfiguration.java
@@ -58,16 +58,16 @@ public class EchoConfiguration
SERVICE_INPUT_RING_SIZE = getInteger(SERVICE_INPUT_RING_SIZE_PROP, 512 * 1024);
SERVICE_OUTPUT_RING_SIZE = getInteger(SERVICE_OUTPUT_RING_SIZE_PROP, 512 * 1024);
- SERVICE_INPUT_CHANNEL = getProperty(SERVICE_INPUT_CHANNEL_PROP, "udp://localhost:40123");
+ SERVICE_INPUT_CHANNEL = getProperty(SERVICE_INPUT_CHANNEL_PROP, "aeron:udp?endpoint=localhost:40123");
SERVICE_INPUT_STREAM_ID = getInteger(SERVICE_INPUT_STREAM_ID_PROP, 10);
- SERVICE_OUTPUT_CHANNEL = getProperty(SERVICE_OUTPUT_CHANNEL_PROP, "udp://localhost:40124");
+ SERVICE_OUTPUT_CHANNEL = getProperty(SERVICE_OUTPUT_CHANNEL_PROP, "aeron:udp?endpoint=localhost:40124");
SERVICE_OUTPUT_STREAM_ID = getInteger(SERVICE_OUTPUT_STREAM_ID_PROP, 11);
MESSAGE_LENGTH = getInteger(MESSAGE_LENGTH_PROP, 256);
- NUMBER_OF_MESSAGES = getInteger(NUMBER_OF_MESSAGES_PROP, 1_000_000/*1_000_000*/);
- NUMBER_OF_ITERATIONS = getInteger(NUMBER_OF_ITERATIONS_PROP, 10/*5*/);
- WARMUP_NUMBER_OF_MESSAGES = getInteger(WARMUP_NUMBER_OF_MESSAGES_PROP, 10_000/*10_000*/);
- WARMUP_NUMBER_OF_ITERATIONS = getInteger(WARMUP_NUMBER_OF_ITERATIONS_PROP, 1/*5*/);
+ NUMBER_OF_MESSAGES = getInteger(NUMBER_OF_MESSAGES_PROP, 1_000_000/*1_000_000*/);//10
+ NUMBER_OF_ITERATIONS = getInteger(NUMBER_OF_ITERATIONS_PROP, 10/*10*/);//1
+ WARMUP_NUMBER_OF_MESSAGES = getInteger(WARMUP_NUMBER_OF_MESSAGES_PROP, 10_000/*10_000*/);//1
+ WARMUP_NUMBER_OF_ITERATIONS = getInteger(WARMUP_NUMBER_OF_ITERATIONS_PROP, 5/*5*/);
LINGER_TIMEOUT_MS = getLong(LINGER_TIMEOUT_MS_PROP, TimeUnit.SECONDS.toMillis(5));
}
}
diff --git a/test-perf/echo/EchoEmbedded.java b/test-perf/echo/EchoEmbedded.java
index d80412f..840da61 100644
--- a/test-perf/echo/EchoEmbedded.java
+++ b/test-perf/echo/EchoEmbedded.java
@@ -6,13 +6,21 @@
import org.helios.HeliosContext;
import org.helios.HeliosDriver;
import org.helios.gateway.Gateway;
+import org.helios.infra.ConsoleReporter;
+import org.helios.service.Service;
import java.util.concurrent.CountDownLatch;
public class EchoEmbedded
{
+ private static final String SERVICE_INPUT_CHANNEL = EchoConfiguration.SERVICE_INPUT_CHANNEL;
+ private static final String SERVICE_OUTPUT_CHANNEL = EchoConfiguration.SERVICE_OUTPUT_CHANNEL;
+ private static final String GATEWAY_INPUT_CHANNEL = EchoConfiguration.SERVICE_OUTPUT_CHANNEL;
+ private static final String GATEWAY_OUTPUT_CHANNEL = EchoConfiguration.SERVICE_INPUT_CHANNEL;
private static final int SERVICE_INPUT_STREAM_ID = EchoConfiguration.SERVICE_INPUT_STREAM_ID;
private static final int SERVICE_OUTPUT_STREAM_ID = EchoConfiguration.SERVICE_OUTPUT_STREAM_ID;
+ private static final int GATEWAY_INPUT_STREAM_ID = EchoConfiguration.SERVICE_OUTPUT_STREAM_ID;
+ private static final int GATEWAY_OUTPUT_STREAM_ID = EchoConfiguration.SERVICE_INPUT_STREAM_ID;
private static final CountDownLatch GW_ASSOCIATION_LATCH = new CountDownLatch(1);
private static final CountDownLatch SVC_ASSOCIATION_LATCH = new CountDownLatch(1);
@@ -36,17 +44,21 @@ public static void main(String[] args) throws Exception
System.out.print("done\nCreating Helios service...");
- final AeronStream svcEmbeddedInputStream = helios.newEmbeddedStream(SERVICE_INPUT_STREAM_ID);
- final AeronStream svcEmbeddedOutputStream = helios.newEmbeddedStream(SERVICE_OUTPUT_STREAM_ID);
- helios.addService(EchoServiceHandler::new,
+ final AeronStream svcInputStream = helios.newStream(SERVICE_INPUT_CHANNEL, SERVICE_INPUT_STREAM_ID);
+ final AeronStream svcOutputStream = helios.newStream(SERVICE_OUTPUT_CHANNEL, SERVICE_OUTPUT_STREAM_ID);
+ final Service svc = helios.addService(EchoServiceHandler::new,
EchoEmbedded::associationWithGatewayEstablished, EchoEmbedded::associationWithGatewayBroken,
- svcEmbeddedInputStream, svcEmbeddedOutputStream);
+ svcInputStream, svcOutputStream);
System.out.print("done\nCreating Helios gateway...");
- final Gateway gw = helios.addGateway(EchoGatewayHandler::new,
- EchoEmbedded::associationWithServiceEstablished, EchoEmbedded::associationWithServiceBroken,
- svcEmbeddedInputStream, svcEmbeddedOutputStream);
+ final AeronStream gwInputStream = helios.newStream(GATEWAY_INPUT_CHANNEL, GATEWAY_INPUT_STREAM_ID);
+ final AeronStream gwOutputStream = helios.newStream(GATEWAY_OUTPUT_CHANNEL, GATEWAY_OUTPUT_STREAM_ID);
+ final Gateway gw = helios.addGateway(
+ EchoEmbedded::associationWithServiceEstablished, EchoEmbedded::associationWithServiceBroken);
+ final EchoGatewayHandler gwHandler = gw.addEndPoint(gwOutputStream, gwInputStream, EchoGatewayHandler::new);
+
+ final ConsoleReporter reporter = new ConsoleReporter();
System.out.println("done\nEchoEmbedded is now running.");
@@ -62,7 +74,10 @@ public static void main(String[] args) throws Exception
System.out.println("done");
- EchoGateway.runTest(gw);
+ EchoGateway.runTest(gwHandler, () -> reporter.onReport(gw.report()));
+
+ reporter.onReport(gw.report());
+ reporter.onReport(svc.report());
}
System.out.println("EchoEmbedded is now terminated.");
diff --git a/test-perf/echo/EchoEmbeddedIpc.java b/test-perf/echo/EchoEmbeddedIpc.java
new file mode 100644
index 0000000..523c8c6
--- /dev/null
+++ b/test-perf/echo/EchoEmbeddedIpc.java
@@ -0,0 +1,103 @@
+package echo;
+
+import org.agrona.concurrent.BusySpinIdleStrategy;
+import org.helios.AeronStream;
+import org.helios.Helios;
+import org.helios.HeliosContext;
+import org.helios.HeliosDriver;
+import org.helios.gateway.Gateway;
+import org.helios.infra.ConsoleReporter;
+import org.helios.service.Service;
+
+import java.util.concurrent.CountDownLatch;
+
+public class EchoEmbeddedIpc
+{
+ private static final int SERVICE_INPUT_STREAM_ID = EchoConfiguration.SERVICE_INPUT_STREAM_ID;
+ private static final int SERVICE_OUTPUT_STREAM_ID = EchoConfiguration.SERVICE_OUTPUT_STREAM_ID;
+
+ private static final CountDownLatch GW_ASSOCIATION_LATCH = new CountDownLatch(1);
+ private static final CountDownLatch SVC_ASSOCIATION_LATCH = new CountDownLatch(1);
+
+ public static void main(String[] args) throws Exception
+ {
+ System.out.print("Starting Helios...");
+
+ final HeliosContext context = new HeliosContext()
+ //.setJournalEnabled(true)
+ .setReadIdleStrategy(new BusySpinIdleStrategy())
+ .setWriteIdleStrategy(new BusySpinIdleStrategy())
+ .setSubscriberIdleStrategy(new BusySpinIdleStrategy())
+ .setPublisherIdleStrategy(new BusySpinIdleStrategy());
+
+ final HeliosDriver driver = new HeliosDriver(context);
+
+ try(final Helios helios = new Helios(context, driver))
+ {
+ helios.errorHandler(EchoEmbeddedIpc::serviceError);
+
+ System.out.print("done\nCreating Helios service...");
+
+ final AeronStream ipcInputStream = helios.newIpcStream(SERVICE_INPUT_STREAM_ID);
+ final AeronStream ipcOutputStream = helios.newIpcStream(SERVICE_OUTPUT_STREAM_ID);
+ final Service svc = helios.addService(EchoServiceHandler::new,
+ EchoEmbeddedIpc::associationWithGatewayEstablished, EchoEmbeddedIpc::associationWithGatewayBroken,
+ ipcInputStream, ipcOutputStream);
+
+ System.out.print("done\nCreating Helios gateway...");
+
+ final Gateway gw = helios.addGateway(
+ EchoEmbeddedIpc::associationWithServiceEstablished, EchoEmbeddedIpc::associationWithServiceBroken);
+ final EchoGatewayHandler gwHandler = gw.addEndPoint(ipcInputStream, ipcOutputStream, EchoGatewayHandler::new);
+
+ final ConsoleReporter reporter = new ConsoleReporter();
+
+ System.out.println("done\nEchoEmbedded is now running.");
+
+ helios.start();
+
+ System.out.print("Waiting for Gateway to see association with Service...");
+
+ GW_ASSOCIATION_LATCH.await();
+
+ System.out.print("done\nWaiting for Service to see association with Gateway...");
+
+ SVC_ASSOCIATION_LATCH.await();
+
+ System.out.println("done");
+
+ EchoGateway.runTest(gwHandler, () -> reporter.onReport(gw.report()));
+
+ reporter.onReport(gw.report());
+ reporter.onReport(svc.report());
+ }
+
+ System.out.println("EchoEmbeddedIpc is now terminated.");
+ System.exit(0);
+ }
+
+ private static void serviceError(final Throwable th)
+ {
+ th.printStackTrace();
+ }
+
+ private static void associationWithServiceEstablished()
+ {
+ GW_ASSOCIATION_LATCH.countDown();
+ }
+
+ private static void associationWithServiceBroken()
+ {
+ System.out.println("Association with Service broken.");
+ }
+
+ private static void associationWithGatewayEstablished()
+ {
+ SVC_ASSOCIATION_LATCH.countDown();
+ }
+
+ private static void associationWithGatewayBroken()
+ {
+ System.out.println("Association with Gateway broken.");
+ }
+}
diff --git a/test-perf/echo/EchoGateway.java b/test-perf/echo/EchoGateway.java
index ba6ed25..36943d1 100644
--- a/test-perf/echo/EchoGateway.java
+++ b/test-perf/echo/EchoGateway.java
@@ -6,12 +6,15 @@
import org.helios.AeronStream;
import org.helios.Helios;
import org.helios.HeliosContext;
+import org.helios.HeliosDriver;
import org.helios.gateway.Gateway;
-import org.helios.infra.RateReport;
+import org.helios.infra.ConsoleReporter;
+import org.helios.infra.Report;
+import org.helios.util.ShutdownHelper;
-import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
import static java.lang.System.nanoTime;
@@ -32,11 +35,12 @@ public class EchoGateway
public static void main(String[] args) throws Exception
{
- System.out.print("Starting Helios service...");
+ System.out.print("Starting Helios...");
final HeliosContext context = new HeliosContext();
+ final HeliosDriver driver = new HeliosDriver(context, "./.aeronGateway");
- try(final Helios helios = new Helios(context))
+ try(final Helios helios = new Helios(context, driver))
{
helios.errorHandler(EchoGateway::serviceError);
@@ -44,34 +48,37 @@ public static void main(String[] args) throws Exception
final AeronStream inputStream = helios.newStream(INPUT_CHANNEL, INPUT_STREAM_ID);
final AeronStream outputStream = helios.newStream(OUTPUT_CHANNEL, OUTPUT_STREAM_ID);
- final Gateway gw = helios.addGateway(EchoGatewayHandler::new,
- EchoGateway::serviceAssociationEstablished, EchoGateway::serviceAssociationBroken,
- outputStream, inputStream);
+ final Gateway gw = helios.addGateway(
+ EchoGateway::serviceAssociationEstablished, EchoGateway::serviceAssociationBroken);
+ final EchoGatewayHandler gwHandler = gw.addEndPoint(outputStream, inputStream, EchoGatewayHandler::new);
+
+ final ConsoleReporter reporter = new ConsoleReporter();
+
+ ShutdownHelper.register(() -> reporter.onReport(gw.report()));
helios.start();
- System.out.print("Waiting for association with EchoService...");
+ System.out.print("done\nEchoGateway is now running\nWaiting for association with EchoService...");
ASSOCIATION_LATCH.await();
- System.out.println("done\nEchoGateway is now running.");
+ System.out.println("done");
- runTest(gw);
+ runTest(gwHandler, () -> reporter.onReport(gw.report()));
- System.out.println("EchoGateway is now terminated.");
+ reporter.onReport(gw.report());
}
+
+ System.out.println("EchoGateway is now terminated");
}
- static void runTest(final Gateway gw)
+ static void runTest(final EchoGatewayHandler gwHandler, final Runnable reportRunnable)
{
- final EchoGatewayHandler proxy = gw.handler();
- final List reportList = gw.reportList();
-
System.out.println("Warming up... " + WARMUP_NUMBER_OF_ITERATIONS + " iterations of " + WARMUP_NUMBER_OF_MESSAGES + " messages");
for (int i = 0; i < WARMUP_NUMBER_OF_ITERATIONS; i++)
{
- runIterations(proxy, WARMUP_NUMBER_OF_MESSAGES);
+ runIterations(gwHandler, WARMUP_NUMBER_OF_MESSAGES);
}
final ContinueBarrier barrier = new ContinueBarrier("Execute again?");
@@ -85,7 +92,7 @@ static void runTest(final Gateway gw)
System.out.println("Echoing " + NUMBER_OF_MESSAGES + " messages");
- final long elapsedTime = runIterations(proxy, NUMBER_OF_MESSAGES);
+ final long elapsedTime = runIterations(gwHandler, NUMBER_OF_MESSAGES);
System.out.println(
String.format("%d iteration, %d ops, %d ns, %d ms, rate %.02g ops/s",
@@ -95,7 +102,7 @@ static void runTest(final Gateway gw)
TimeUnit.NANOSECONDS.toMillis(elapsedTime),
((double)NUMBER_OF_MESSAGES / (double)elapsedTime) * 1_000_000_000));
- reportList.forEach(report -> report.print(System.out));
+ reportRunnable.run();
if (NUMBER_OF_ITERATIONS <= 0)
{
@@ -114,18 +121,18 @@ private static long runIterations(final EchoGatewayHandler gatewayHandler, final
{
try
{
- //final long echoTimestampSent = System.nanoTime();
+ final long echoTimestampSent = System.nanoTime();
- gatewayHandler.sendEcho(i);
+ gatewayHandler.sendEcho(echoTimestampSent);//i
/*long echoTimestampReceived;
do
{
echoTimestampReceived = gatewayHandler.getTimestamp();
}
- while (echoTimestampReceived != echoTimestampSent);
+ while (echoTimestampReceived != echoTimestampSent);*/
- final long echoRttNs = System.nanoTime() - echoTimestampReceived;
+ /*final long echoRttNs = System.nanoTime() - echoTimestampReceived;
HISTOGRAM.recordValue(echoRttNs);*/
}
diff --git a/test-perf/echo/EchoGatewayHandler.java b/test-perf/echo/EchoGatewayHandler.java
index a1bf6cb..15a6958 100644
--- a/test-perf/echo/EchoGatewayHandler.java
+++ b/test-perf/echo/EchoGatewayHandler.java
@@ -5,37 +5,52 @@
import org.agrona.concurrent.IdleStrategy;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.ringbuffer.RingBuffer;
-import org.helios.infra.MessageTypes;
import org.helios.gateway.GatewayHandler;
-import org.helios.util.DirectBufferAllocator;
+import org.helios.infra.MessageTypes;
+import org.helios.mmb.DataMessage;
+import org.helios.mmb.sbe.ComponentType;
import org.helios.util.RingBufferPool;
import static echo.EchoConfiguration.MESSAGE_LENGTH;
-import static org.agrona.UnsafeAccess.UNSAFE;
public class EchoGatewayHandler implements GatewayHandler
{
- private static final long TIMESTAMP_OFFSET;
+ //private static final long TIMESTAMP_OFFSET;
private final RingBufferPool outputBufferPool;
private final IdleStrategy idleStrategy = new BusySpinIdleStrategy();
- private final UnsafeBuffer echoBuffer = new UnsafeBuffer(DirectBufferAllocator.allocate(MESSAGE_LENGTH));
+ //private final UnsafeBuffer echoBuffer = new UnsafeBuffer(DirectBufferAllocator.allocate(MESSAGE_LENGTH));
+ private final DataMessage outgoingDataMessage = new DataMessage();
+ private final DataMessage incomingDataMessage = new DataMessage();
- private volatile long timestamp = 0;
+ //private volatile long timestamp = 0;
- public EchoGatewayHandler(final RingBufferPool outputBufferPool)
+ EchoGatewayHandler(final RingBufferPool outputBufferPool)
{
this.outputBufferPool = outputBufferPool;
+
+ outgoingDataMessage.allocate(ComponentType.Gateway, (short)1, MESSAGE_LENGTH);
}
@Override
public void onMessage(int msgTypeId, MutableDirectBuffer buffer, int index, int length)
{
- /* ECHO GATEWAY message processing: store last echo timestamp */
+ if (msgTypeId == MessageTypes.APPLICATION_MSG_ID)
+ {
+ incomingDataMessage.wrap(buffer, index, length);
+ final UnsafeBuffer dataBuffer = incomingDataMessage.dataBuffer();
+ final int dataOffset = incomingDataMessage.dataBufferOffset();
+
+ /* ECHO GATEWAY message processing: store last echo timestamp */
+ final long echoTimestamp = dataBuffer.getLong(dataOffset);
+
+ //final long echoTimestamp = buffer.getLong(index);
- final long echoTimestamp = buffer.getLong(index);
+ //UNSAFE.putOrderedLong(this, TIMESTAMP_OFFSET, echoTimestamp);
- UNSAFE.putOrderedLong(this, TIMESTAMP_OFFSET, echoTimestamp);
+ //final long timestamp = timestampQueue.remove();
+ //System.out.println("RECEIVED echoTimestamp=" + echoTimestamp);
+ }
}
@Override
@@ -43,21 +58,31 @@ public void close() throws Exception
{
}
- public void sendEcho(final long echoTimestamp)
+ void sendEcho(final long echoTimestamp)
{
final RingBuffer outputBuffer = outputBufferPool.outputRingBuffers().iterator().next(); // FIXME: refactoring to avoid this API
- echoBuffer.putLong(0, echoTimestamp);
+ final UnsafeBuffer dataBuffer = outgoingDataMessage.dataBuffer();
+ final int dataOffset = outgoingDataMessage.dataBufferOffset();
+
+ //echoBuffer.putLong(0, echoTimestamp);
+ dataBuffer.putLong(dataOffset, echoTimestamp);
- while (!outputBuffer.write(MessageTypes.APPLICATION_MSG_ID, echoBuffer, 0, MESSAGE_LENGTH))
+ /*while (!outputBuffer.write(MessageTypes.APPLICATION_MSG_ID, echoBuffer, 0, MESSAGE_LENGTH))
+ {
+ idleStrategy.idle(0);
+ }*/
+ while (!outputBuffer.write(MessageTypes.APPLICATION_MSG_ID, dataBuffer, 0, dataBuffer.capacity()))
{
idleStrategy.idle(0);
}
- UNSAFE.putOrderedLong(this, TIMESTAMP_OFFSET, 0);
+ //System.out.println("SENT echoTimestamp=" + echoTimestamp);
+ //UNSAFE.putOrderedLong(this, TIMESTAMP_OFFSET, 0);
+ //timestampQueue.add(echoTimestamp);
}
- public long getTimestamp()
+ /*public long getTimestamp()
{
return timestamp;
}
@@ -72,5 +97,5 @@ public long getTimestamp()
{
throw new RuntimeException(ex);
}
- }
+ }*/
}
diff --git a/test-perf/echo/EchoService.java b/test-perf/echo/EchoService.java
index 1a864e1..6111072 100644
--- a/test-perf/echo/EchoService.java
+++ b/test-perf/echo/EchoService.java
@@ -3,6 +3,8 @@
import org.helios.AeronStream;
import org.helios.Helios;
import org.helios.HeliosContext;
+import org.helios.HeliosDriver;
+import org.helios.infra.ConsoleReporter;
import org.helios.service.Service;
import org.helios.util.ShutdownHelper;
@@ -17,34 +19,37 @@ public class EchoService
public static void main(String[] args) throws Exception
{
- System.out.print("Starting Helios service...");
+ System.out.print("Starting Helios...");
final HeliosContext context = new HeliosContext()
.setJournalEnabled(true);
+ final HeliosDriver driver = new HeliosDriver(context, "./.aeronService");
- try(final Helios helios = new Helios(context))
+ try(final Helios helios = new Helios(context, driver))
{
System.out.print("done\nCreating Helios service...");
final AeronStream inputStream = helios.newStream(INPUT_CHANNEL, INPUT_STREAM_ID);
final AeronStream outputStream = helios.newStream(OUTPUT_CHANNEL, OUTPUT_STREAM_ID);
- final Service echoService = helios.addService(
+ final Service svc = helios.addService(
EchoServiceHandler::new, inputStream, outputStream);
- final EchoServiceHandler echoServiceHandler = echoService.handler();
+ final EchoServiceHandler echoServiceHandler = svc.handler();
final CountDownLatch runningLatch = new CountDownLatch(1);
ShutdownHelper.register(runningLatch::countDown);
- System.out.println("done\nEchoService is now running.");
-
helios.start();
+ System.out.println("done\nEchoService is now running");
+
runningLatch.await();
System.out.println("EchoService last snapshot: " + echoServiceHandler.lastSnapshotTimestamp());
+
+ new ConsoleReporter().onReport(svc.report());
}
- System.out.println("EchoService is now terminated.");
+ System.out.println("EchoService is now terminated");
}
}
diff --git a/test-perf/runEchoEmbeddedIpc.sh b/test-perf/runEchoEmbeddedIpc.sh
new file mode 100755
index 0000000..30f3a27
--- /dev/null
+++ b/test-perf/runEchoEmbeddedIpc.sh
@@ -0,0 +1,10 @@
+#!/usr/bin/env bash
+
+sudo -S sysctl net.core.rmem_max=2097152 > /dev/null
+sudo -S sysctl net.core.wmem_max=2097152 > /dev/null
+
+java -Xms2G -Xmx4G -XX:BiasedLockingStartupDelay=0 \
+-Daeron.mtu.length=16384 -Daeron.rcv.buffer.length=16384 -Daeron.socket.so_sndbuf=2097152 \
+-Daeron.socket.so_rcvbuf=2097152 -Daeron.rcv.initial.window.length=2097152 -Dagrona.disable.bounds.checks=true \
+-cp ".:../build/production/Helios:../build/test/Helios:../lib/*" \
+echo.EchoEmbeddedIpc
\ No newline at end of file
diff --git a/test/org/helios/HeliosTest.java b/test/org/helios/HeliosTest.java
index 291daef..583ee99 100644
--- a/test/org/helios/HeliosTest.java
+++ b/test/org/helios/HeliosTest.java
@@ -11,6 +11,10 @@
import org.helios.gateway.GatewayHandler;
import org.helios.gateway.GatewayHandlerFactory;
import org.helios.infra.MessageTypes;
+import org.helios.mmb.DataMessage;
+import org.helios.mmb.sbe.ComponentDecoder;
+import org.helios.mmb.sbe.ComponentType;
+import org.helios.mmb.sbe.DataDecoder;
import org.helios.service.Service;
import org.helios.service.ServiceHandler;
import org.helios.util.RingBufferPool;
@@ -26,9 +30,6 @@
public class HeliosTest
{
- private static final int EMBEDDED_INPUT_STREAM_ID = 10;
- private static final int EMBEDDED_OUTPUT_STREAM_ID = 11;
-
private static final String INPUT_CHANNEL = "aeron:udp?endpoint=localhost:40123";
private static final String OUTPUT_CHANNEL = "aeron:udp?endpoint=localhost:40124";
private static final int INPUT_STREAM_ID = 10;
@@ -37,20 +38,20 @@ public class HeliosTest
private static final int NUM_GATEWAYS = 2; //2
@Test
- public void shouldAssociateOneEmbeddedServiceWithOneEmbeddedGateway() throws Exception
+ public void shouldAssociateOneServiceWithOneGatewayIPC() throws Exception
{
try(final Helios helios = new Helios())
{
final CountDownLatch s2gAssociationLatch = new CountDownLatch(1);
final CountDownLatch g2sAssociationLatch = new CountDownLatch(1);
- final AeronStream embeddedInputStream = helios.newEmbeddedStream(EMBEDDED_INPUT_STREAM_ID);
- final AeronStream embeddedOutputStream = helios.newEmbeddedStream(EMBEDDED_OUTPUT_STREAM_ID);
+ final AeronStream ipcInputStream = helios.newIpcStream(INPUT_STREAM_ID);
+ final AeronStream ipcOutputStream = helios.newIpcStream(OUTPUT_STREAM_ID);
helios.addService(NullServiceHandler::new, s2gAssociationLatch::countDown, ()->{},
- embeddedInputStream, embeddedOutputStream);
+ ipcInputStream, ipcOutputStream);
helios.addGateway(NullGatewayHandler::new, g2sAssociationLatch::countDown, ()->{},
- embeddedInputStream, embeddedOutputStream);
+ ipcInputStream, ipcOutputStream);
helios.start();
@@ -71,15 +72,15 @@ public void shouldAssociateOneServiceWithOneGateway() throws Exception
final AeronStream svcInputStream = helios.newStream(INPUT_CHANNEL, INPUT_STREAM_ID);
final AeronStream svcOutputStream = helios.newStream(OUTPUT_CHANNEL, OUTPUT_STREAM_ID);
- helios.addService(NullServiceHandler::new)
+ helios.addService(NullServiceHandler::new, svcInputStream)
.availableAssociationHandler(s2gAssociationLatch::countDown)
- .addEndPoint(svcInputStream, svcOutputStream);
+ .addEndPoint(svcOutputStream);
final AeronStream gwOutputStream = helios.newStream(INPUT_CHANNEL, INPUT_STREAM_ID);
final AeronStream gwInputStream = helios.newStream(OUTPUT_CHANNEL, OUTPUT_STREAM_ID);
- helios.addGateway(NullGatewayHandler::new)
+ helios.addGateway()
.availableAssociationHandler(g2sAssociationLatch::countDown)
- .addEndPoint(gwOutputStream, gwInputStream);
+ .addEndPoint(gwOutputStream, gwInputStream, NullGatewayHandler::new);
helios.start();
@@ -91,26 +92,26 @@ public void shouldAssociateOneServiceWithOneGateway() throws Exception
}
@Test
- public void shouldAssociateOneEmbeddedServiceWithMultipleEmbeddedGateways() throws Exception
+ public void shouldAssociateOneServiceWithMultipleGatewaysIPC() throws Exception
{
try(final Helios helios = new Helios())
{
final CountDownLatch s2gAssociationLatch = new CountDownLatch(NUM_GATEWAYS);
final CountDownLatch g2sAssociationLatch = new CountDownLatch(NUM_GATEWAYS);
- final AeronStream embeddedInputStream = helios.newEmbeddedStream(EMBEDDED_INPUT_STREAM_ID);
+ final AeronStream ipcInputStream = helios.newIpcStream(INPUT_STREAM_ID);
final Service svc = helios.addService(NullServiceHandler::new,
- s2gAssociationLatch::countDown, () -> {});
+ ipcInputStream, s2gAssociationLatch::countDown, () -> {});
for (int i = 0; i < NUM_GATEWAYS; i++)
{
- final AeronStream embeddedOutputStream = helios.newEmbeddedStream(EMBEDDED_OUTPUT_STREAM_ID+i);
+ final AeronStream ipcOutputStream = helios.newIpcStream(OUTPUT_STREAM_ID+i);
- svc.addEndPoint(embeddedInputStream, embeddedOutputStream);
+ svc.addEndPoint(ipcOutputStream);
helios.addGateway(NullGatewayHandler::new, g2sAssociationLatch::countDown, () -> {},
- embeddedInputStream, embeddedOutputStream);
+ ipcInputStream, ipcOutputStream);
}
helios.start();
@@ -130,22 +131,23 @@ public void shouldAssociateOneServiceWithMultipleGateways() throws Exception
final CountDownLatch s2gAssociationLatch = new CountDownLatch(NUM_GATEWAYS);
final CountDownLatch g2sAssociationLatch = new CountDownLatch(NUM_GATEWAYS);
- final Service svc = helios.addService(NullServiceHandler::new)
+ final AeronStream svcInputStream = helios.newStream(INPUT_CHANNEL, INPUT_STREAM_ID);
+ final AeronStream gwOutputStream = helios.newStream(INPUT_CHANNEL, INPUT_STREAM_ID);
+
+ final Service svc = helios.addService(NullServiceHandler::new, svcInputStream)
.availableAssociationHandler(s2gAssociationLatch::countDown);
for (int i = 0; i < NUM_GATEWAYS; i++)
{
- final AeronStream svcInputStream = helios.newStream(INPUT_CHANNEL, INPUT_STREAM_ID);
- final AeronStream svcOutputStream = helios.newStream(OUTPUT_CHANNEL, OUTPUT_STREAM_ID+i);
- svc.addEndPoint(svcInputStream, svcOutputStream);
+ final AeronStream svcOutputStream = helios.newStream(OUTPUT_CHANNEL, OUTPUT_STREAM_ID+NUM_GATEWAYS+i);
+ svc.addEndPoint(svcOutputStream);
- final AeronStream gwOutputStream = helios.newStream(INPUT_CHANNEL, INPUT_STREAM_ID);
- final AeronStream gwInputStream = helios.newStream(OUTPUT_CHANNEL, OUTPUT_STREAM_ID+i);
+ final AeronStream gwInputStream = helios.newStream(OUTPUT_CHANNEL, OUTPUT_STREAM_ID+NUM_GATEWAYS+i);
helios.addGateway(NullGatewayHandler::new, g2sAssociationLatch::countDown, () -> {},
gwOutputStream, gwInputStream);
}
- assertTrue(helios.numServiceSubscriptions() == NUM_GATEWAYS);
+ assertTrue(helios.numServiceSubscriptions() == 1);
assertTrue(helios.numGatewaySubscriptions() == NUM_GATEWAYS);
helios.start();
@@ -158,7 +160,7 @@ public void shouldAssociateOneServiceWithMultipleGateways() throws Exception
}
@Test
- public void shouldPingOneServiceWithOneGateway() throws Exception
+ public void shouldPingOneServiceWithOneGatewayIPC() throws Exception
{
try(final Helios helios = new Helios())
{
@@ -166,44 +168,43 @@ public void shouldPingOneServiceWithOneGateway() throws Exception
final CountDownLatch g2sAssociationLatch = new CountDownLatch(1);
final CountDownLatch pingLatch = new CountDownLatch(1);
- final AeronStream svcInputStream = helios.newStream(INPUT_CHANNEL, INPUT_STREAM_ID);
- final AeronStream svcOutputStream = helios.newStream(OUTPUT_CHANNEL, OUTPUT_STREAM_ID);
- final Service svc = helios.addService(PingServiceHandler::new)
- .availableAssociationHandler(s2gAssociationLatch::countDown)
- .addEndPoint(svcInputStream, svcOutputStream);
+ final AeronStream svcInputStream = helios.newIpcStream(INPUT_STREAM_ID);
+ final AeronStream svcOutputStream = helios.newIpcStream(OUTPUT_STREAM_ID);
+
+ final Service svc = helios.addService(PingServiceHandler::new,
+ s2gAssociationLatch::countDown, () -> {},
+ svcInputStream, svcOutputStream);
final int gatewayId = 1;
svc.handler().addOutputStream(gatewayId, svcOutputStream);// FIXME: define Helios Gateway-Service (HGS) protocol
- final AeronStream gwOutputStream = helios.newStream(INPUT_CHANNEL, INPUT_STREAM_ID);
- final AeronStream gwInputStream = helios.newStream(OUTPUT_CHANNEL, OUTPUT_STREAM_ID);
- final Gateway gw =
- helios.addGateway(
- (outputBufferPool) -> new PingGatewayHandler(gatewayId,
- outputBufferPool, gwOutputStream,
- (msgTypeId, buffer, index, length) -> {
- if (msgTypeId == MessageTypes.APPLICATION_MSG_ID)
- {
- // APPLICATION message type, ignore */
- assertTrue(buffer.getInt(index) == gatewayId);
- assertTrue(length == PingGatewayHandler.MESSAGE_LENGTH);
- pingLatch.countDown();
- }
- /*else
- {
- // ADMINISTRATIVE message type, ignore
- }*/
- })
- );
+ final AeronStream gwOutputStream = helios.newIpcStream(INPUT_STREAM_ID);
+ final AeronStream gwInputStream = helios.newIpcStream(OUTPUT_STREAM_ID);
+ final Gateway gw = helios.addGateway();
gw.availableAssociationHandler(g2sAssociationLatch::countDown);
- gw.addEndPoint(gwOutputStream, gwInputStream);
+ final PingGatewayHandler pingHandler = gw.addEndPoint(gwOutputStream, gwInputStream,
+ (outputBufferPool) -> new PingGatewayHandler(gatewayId,
+ outputBufferPool, gwOutputStream,
+ (msgTypeId, buffer, index, length) -> {
+ if (msgTypeId == MessageTypes.APPLICATION_MSG_ID)
+ {
+ assertTrue(buffer.getInt(index) == gatewayId);
+ assertTrue(length == PingGatewayHandler.MESSAGE_LENGTH);
+ pingLatch.countDown();
+ }
+ /*else
+ {
+ // ADMINISTRATIVE message type, ignore
+ }*/
+ })
+ );
helios.start();
s2gAssociationLatch.await();
g2sAssociationLatch.await();
- gw.handler().sendPing();
+ pingHandler.sendPing();
pingLatch.await();
}
@@ -212,7 +213,7 @@ public void shouldPingOneServiceWithOneGateway() throws Exception
}
@Test
- public void shouldPingOneEmbeddedServiceWithOneEmbeddedGateway() throws Exception
+ public void shouldPingOneServiceWithOneGateway() throws Exception
{
try(final Helios helios = new Helios())
{
@@ -220,43 +221,43 @@ public void shouldPingOneEmbeddedServiceWithOneEmbeddedGateway() throws Exceptio
final CountDownLatch g2sAssociationLatch = new CountDownLatch(1);
final CountDownLatch pingLatch = new CountDownLatch(1);
- final AeronStream embeddedInputStream = helios.newEmbeddedStream(EMBEDDED_INPUT_STREAM_ID);
- final AeronStream embeddedOutputStream = helios.newEmbeddedStream(EMBEDDED_OUTPUT_STREAM_ID);
-
- final Service svc = helios.addService(PingServiceHandler::new,
- s2gAssociationLatch::countDown, () -> {},
- embeddedInputStream, embeddedOutputStream);
+ final AeronStream svcInputStream = helios.newStream(INPUT_CHANNEL, INPUT_STREAM_ID);
+ final AeronStream svcOutputStream = helios.newStream(OUTPUT_CHANNEL, OUTPUT_STREAM_ID);
+ final Service svc = helios.addService(PingServiceHandler::new, svcInputStream)
+ .availableAssociationHandler(s2gAssociationLatch::countDown)
+ .addEndPoint(svcOutputStream);
final int gatewayId = 1;
- svc.handler().addOutputStream(gatewayId, embeddedOutputStream);// FIXME: define Helios Gateway-Service (HGS) protocol
-
- final Gateway gw =
- helios.addGateway(
- (outputBufferPool) -> new PingGatewayHandler(gatewayId,
- outputBufferPool, embeddedInputStream,
- (msgTypeId, buffer, index, length) -> {
- if (msgTypeId == MessageTypes.APPLICATION_MSG_ID)
- {
- // APPLICATION message type, ignore */
- assertTrue(buffer.getInt(index) == gatewayId);
- assertTrue(length == PingGatewayHandler.MESSAGE_LENGTH);
- pingLatch.countDown();
- }
- /*else
- {
- // ADMINISTRATIVE message type, ignore
- }*/
- })
- );
+ svc.handler().addOutputStream(gatewayId, svcOutputStream);// FIXME: define Helios Gateway-Service (HGS) protocol
+
+ final AeronStream gwOutputStream = helios.newStream(INPUT_CHANNEL, INPUT_STREAM_ID);
+ final AeronStream gwInputStream = helios.newStream(OUTPUT_CHANNEL, OUTPUT_STREAM_ID);
+ final Gateway gw = helios.addGateway();
gw.availableAssociationHandler(g2sAssociationLatch::countDown);
- gw.addEndPoint(embeddedInputStream, embeddedOutputStream);
+ final PingGatewayHandler pingHandler = gw.addEndPoint(gwOutputStream, gwInputStream,
+ (outputBufferPool) -> new PingGatewayHandler(gatewayId,
+ outputBufferPool, gwOutputStream,
+ (msgTypeId, buffer, index, length) -> {
+ if (msgTypeId == MessageTypes.APPLICATION_MSG_ID)
+ {
+ // APPLICATION message type, ignore */
+ assertTrue(buffer.getInt(index) == gatewayId);
+ assertTrue(length == PingGatewayHandler.MESSAGE_LENGTH);
+ pingLatch.countDown();
+ }
+ /*else
+ {
+ // ADMINISTRATIVE message type, ignore
+ }*/
+ })
+ );
helios.start();
s2gAssociationLatch.await();
g2sAssociationLatch.await();
- gw.handler().sendPing();
+ pingHandler.sendPing();
pingLatch.await();
}
@@ -265,7 +266,7 @@ public void shouldPingOneEmbeddedServiceWithOneEmbeddedGateway() throws Exceptio
}
@Test
- public void shouldPingOneServiceWithMultipleGateways() throws Exception
+ public void shouldPingOneServiceWithMultipleGatewaysIPC() throws Exception
{
try(final Helios helios = new Helios())
{
@@ -273,28 +274,26 @@ public void shouldPingOneServiceWithMultipleGateways() throws Exception
final CountDownLatch g2sAssociationLatch = new CountDownLatch(NUM_GATEWAYS);
final CountDownLatch pingLatch = new CountDownLatch(NUM_GATEWAYS);
- final Service svc = helios.addService(PingServiceHandler::new,
+ final AeronStream ipcInputStream = helios.newIpcStream(INPUT_STREAM_ID);
+
+ final Service svc = helios.addService(PingServiceHandler::new, ipcInputStream,
s2gAssociationLatch::countDown, () -> {});
- final List> gateways = new ArrayList<>();
+ final List gatewayHandlers = new ArrayList<>();
for (int i = 0; i < NUM_GATEWAYS; i++)
{
- final AeronStream svcInputStream = helios.newStream(INPUT_CHANNEL, INPUT_STREAM_ID+i);
- final AeronStream svcOutputStream = helios.newStream(OUTPUT_CHANNEL, OUTPUT_STREAM_ID+NUM_GATEWAYS+i);
+ final AeronStream ipcOutputStream = helios.newIpcStream(OUTPUT_STREAM_ID+i);
- svc.addEndPoint(svcInputStream, svcOutputStream);
+ svc.addEndPoint(ipcOutputStream);
- svc.handler().addOutputStream(i, svcOutputStream); // FIXME: define Helios Gateway-Service (HGS) protocol
-
- final AeronStream gwOutputStream = helios.newStream(INPUT_CHANNEL, INPUT_STREAM_ID+i);
- final AeronStream gwInputStream = helios.newStream(OUTPUT_CHANNEL, OUTPUT_STREAM_ID+NUM_GATEWAYS+i);
+ svc.handler().addOutputStream(i, ipcOutputStream); // FIXME: define Helios Gateway-Service (HGS) protocol
- final Gateway gw = helios.addGateway(
- new PingGatewayHandlerFactory(i, gwOutputStream, new PingMessageHandler(i, pingLatch)));
+ final Gateway gw = helios.addGateway();
gw.availableAssociationHandler(g2sAssociationLatch::countDown);
- gw.addEndPoint(gwOutputStream, gwInputStream);
+ final PingGatewayHandler gwHandler = gw.addEndPoint(ipcInputStream, ipcOutputStream,
+ new PingGatewayHandlerFactory(i, ipcInputStream, new PingMessageHandler(i, pingLatch)));
- gateways.add(gw);
+ gatewayHandlers.add(gwHandler);
}
helios.start();
@@ -302,7 +301,7 @@ public void shouldPingOneServiceWithMultipleGateways() throws Exception
s2gAssociationLatch.await();
g2sAssociationLatch.await();
- gateways.forEach((gw) -> gw.handler().sendPing());
+ gatewayHandlers.forEach(PingGatewayHandler::sendPing);
pingLatch.await();
}
@@ -310,8 +309,8 @@ public void shouldPingOneServiceWithMultipleGateways() throws Exception
assertTrue(true);
}
- /*@Test
- public void shouldPingOneEmbeddedServiceWithMultipleEmbeddedGateways() throws Exception
+ @Test
+ public void shouldPingOneServiceWithMultipleGateways() throws Exception
{
try(final Helios helios = new Helios())
{
@@ -319,42 +318,29 @@ public void shouldPingOneEmbeddedServiceWithMultipleEmbeddedGateways() throws Ex
final CountDownLatch g2sAssociationLatch = new CountDownLatch(NUM_GATEWAYS);
final CountDownLatch pingLatch = new CountDownLatch(NUM_GATEWAYS);
- final AeronStream embeddedInputStream = helios.newEmbeddedStream(EMBEDDED_INPUT_STREAM_ID);
+ final AeronStream svcInputStream = helios.newStream(INPUT_CHANNEL, INPUT_STREAM_ID);
- final Service svc = helios.addService(PingServiceHandler::new,
+ final Service svc = helios.addService(PingServiceHandler::new, svcInputStream,
s2gAssociationLatch::countDown, () -> {});
- final List> gateways = new ArrayList<>();
+ final List gatewayHandlers = new ArrayList<>();
for (int i = 0; i < NUM_GATEWAYS; i++)
{
- final AeronStream embeddedOutputStream = helios.newEmbeddedStream(EMBEDDED_OUTPUT_STREAM_ID+i);
-
- svc.addEndPoint(embeddedInputStream, embeddedOutputStream);
-
- final int gatewayId = i;
- final Gateway gw = helios.addGateway(
- (outputBufferPool) -> new PingGatewayHandler(outputBufferPool,
- (msgTypeId, buffer, index, length) -> {
- if (msgTypeId == MessageTypes.APPLICATION_MSG_ID)
- {
- System.out.println("PingGatewayHandler::onMessage buffer.getInt(index)=" + buffer.getInt(index) + " gatewayId=" + gatewayId);
- boolean b = buffer.getInt(index) == gatewayId;
- System.out.println("PingGatewayHandler::onMessage b=" + b);
- // APPLICATION message type, ignore
- assertTrue(buffer.getInt(index) == gatewayId);
- assertTrue(length == PingGatewayHandler.MESSAGE_LENGTH);
- pingLatch.countDown();
- }
- //else
- //{
- // ADMINISTRATIVE message type, ignore
- //}
- },
- gatewayId));
+ final AeronStream svcOutputStream = helios.newStream(OUTPUT_CHANNEL, OUTPUT_STREAM_ID+i);
+
+ svc.addEndPoint(svcOutputStream);
+
+ svc.handler().addOutputStream(i, svcOutputStream); // FIXME: define Helios Gateway-Service (HGS) protocol
+
+ final AeronStream gwInputStream = helios.newStream(OUTPUT_CHANNEL, OUTPUT_STREAM_ID+i);
+ final AeronStream gwOutputStream = helios.newStream(INPUT_CHANNEL, INPUT_STREAM_ID);
+
+ final Gateway gw = helios.addGateway();
gw.availableAssociationHandler(g2sAssociationLatch::countDown);
- gw.addEndPoint(embeddedInputStream, embeddedOutputStream);
+ final PingGatewayHandler pingHandler = gw.addEndPoint(gwOutputStream, gwInputStream,
+ new PingGatewayHandlerFactory(i, gwOutputStream, new PingMessageHandler(i, pingLatch)));
- gateways.add(gw);
+ gatewayHandlers.add(pingHandler);
}
helios.start();
@@ -362,13 +348,13 @@ public void shouldPingOneEmbeddedServiceWithMultipleEmbeddedGateways() throws Ex
s2gAssociationLatch.await();
g2sAssociationLatch.await();
- gateways.forEach((gw) -> gw.handler().sendPing());
+ gatewayHandlers.forEach(PingGatewayHandler::sendPing);
pingLatch.await();
}
assertTrue(true);
- }*/
+ }
@Test(expected = NullPointerException.class)
public void shouldThrowExceptionWhenOnlyContextIsNull()
@@ -421,7 +407,7 @@ public void shouldThrowExceptionWhenServiceRequestStreamIsNull()
{
try(final Helios helios = new Helios())
{
- helios.addService((outputBuffers) -> null).addEndPoint(null, helios.newStream(OUTPUT_CHANNEL, 0));
+ helios.addService((outputBuffers) -> null, null).addEndPoint(helios.newStream(OUTPUT_CHANNEL, 0));
}
}
@@ -431,7 +417,7 @@ public void shouldThrowExceptionWhenServiceResponseStreamIsNull()
{
try(final Helios helios = new Helios())
{
- helios.addService((outputBuffers) -> null).addEndPoint(helios.newStream(INPUT_CHANNEL, 0), null);
+ helios.addService((outputBuffers) -> null, helios.newStream(INPUT_CHANNEL, 0)).addEndPoint(null);
}
}
@@ -440,7 +426,7 @@ public void shouldThrowExceptionWhenServiceFactoryIsNull()
{
try(final Helios helios = new Helios())
{
- helios.addService(null);
+ helios.addService(null, helios.newStream(INPUT_CHANNEL, 0));
}
}
@@ -449,7 +435,7 @@ public void shouldThrowExceptionWhenGatewayRequestStreamIsNull()
{
try(final Helios helios = new Helios())
{
- helios.addGateway((outputBuffers) -> null).addEndPoint(null, helios.newStream(INPUT_CHANNEL, 0));
+ helios.addGateway().addEndPoint(null, helios.newStream(INPUT_CHANNEL, 0), (outputBuffers) -> null);
}
}
@@ -458,7 +444,7 @@ public void shouldThrowExceptionWhenGatewayResponseStreamIsNull()
{
try(final Helios helios = new Helios())
{
- helios.addGateway((outputBuffers) -> null).addEndPoint(helios.newStream(INPUT_CHANNEL, 0), null);
+ helios.addGateway().addEndPoint(helios.newStream(OUTPUT_CHANNEL, 0), null, (outputBuffers) -> null);
}
}
@@ -467,7 +453,7 @@ public void shouldThrowExceptionWhenGatewayFactoryIsNull()
{
try(final Helios helios = new Helios())
{
- helios.addGateway(null);
+ helios.addGateway().addEndPoint(helios.newStream(OUTPUT_CHANNEL, 0), helios.newStream(INPUT_CHANNEL, 0), null);
}
}
@@ -512,6 +498,7 @@ private class PingServiceHandler implements ServiceHandler
private final RingBufferPool ringBufferPool;
private final IdleStrategy idleStrategy;
private Int2ObjectHashMap gatewayId2StreamMap; // FIXME: temporary workaround, use HGS protocol
+ private final DataMessage incomingDataMessage = new DataMessage();
PingServiceHandler(final RingBufferPool ringBufferPool)
{
@@ -530,7 +517,15 @@ public void onMessage(int msgTypeId, MutableDirectBuffer buffer, int index, int
{
if (msgTypeId == MessageTypes.APPLICATION_MSG_ID)
{
- final int gatewayId = buffer.getInt(index); // FIXME: temporary workaround, use HGS protocol header
+ final DataDecoder dataDecoder = incomingDataMessage.wrap(buffer, index, length);
+ final ComponentDecoder componentDecoder = dataDecoder.mmbHeader().component();
+
+ final ComponentType componentType = componentDecoder.componentType();
+ assertTrue(componentType == ComponentType.Gateway);
+
+ final int gatewayId = componentDecoder.componentId();
+
+ //final int gatewayId = buffer.getInt(index); // FIXME: temporary workaround, use HGS protocol header
final AeronStream outputStream = gatewayId2StreamMap.get(gatewayId);
final RingBuffer outputBuffer = ringBufferPool.getOutputRingBuffer(outputStream);// FIXME: refactoring to avoid this API
@@ -562,7 +557,9 @@ private class PingGatewayHandler implements GatewayHandler
private final AeronStream gwOutputStream;
private final MessageHandler delegate;
private final IdleStrategy idleStrategy;
- private final UnsafeBuffer echoBuffer;
+ //private final UnsafeBuffer echoBuffer;
+ private final DataMessage outgoingDataMessage = new DataMessage();
+ private final DataMessage incomingDataMessage = new DataMessage();
PingGatewayHandler(final int gatewayId, final RingBufferPool ringBufferPool, final AeronStream gwOutputStream,
final MessageHandler delegate)
@@ -572,16 +569,22 @@ private class PingGatewayHandler implements GatewayHandler
this.gwOutputStream = gwOutputStream;
this.delegate = delegate;
this.idleStrategy = new BusySpinIdleStrategy();
- this.echoBuffer = new UnsafeBuffer(ByteBuffer.allocate(MESSAGE_LENGTH));
+ //this.echoBuffer = new UnsafeBuffer(ByteBuffer.allocate(MESSAGE_LENGTH));
+
+ outgoingDataMessage.allocate(ComponentType.Gateway, (short)gatewayId, MESSAGE_LENGTH);
}
void sendPing()
{
final RingBuffer outputBuffer = ringBufferPool.getOutputRingBuffer(gwOutputStream); // FIXME: refactoring to avoid this API
- echoBuffer.putInt(0, gatewayId); // FIXME: temporary workaround, use HGS protocol header
+ //echoBuffer.putInt(0, gatewayId); // FIXME: temporary workaround, use HGS protocol header
+ final MutableDirectBuffer dataBuffer = outgoingDataMessage.dataBuffer();
+ final int dataBufferOffset = outgoingDataMessage.dataBufferOffset();
+
+ dataBuffer.putInt(dataBufferOffset, gatewayId);
- while (!outputBuffer.write(MessageTypes.APPLICATION_MSG_ID, echoBuffer, 0, MESSAGE_LENGTH))
+ while (!outputBuffer.write(MessageTypes.APPLICATION_MSG_ID, dataBuffer, 0, dataBuffer.capacity()))
{
idleStrategy.idle(0);
}
@@ -590,7 +593,23 @@ void sendPing()
@Override
public void onMessage(int msgTypeId, MutableDirectBuffer buffer, int index, int length)
{
- delegate.onMessage(msgTypeId, buffer, index, length);
+ if (msgTypeId == MessageTypes.APPLICATION_MSG_ID)
+ {
+ incomingDataMessage.wrap(buffer, index, length);
+
+ final MutableDirectBuffer dataBuffer = incomingDataMessage.dataBuffer();
+ final int dataBufferOffset = incomingDataMessage.dataBufferOffset();
+ final int dataBufferLength = incomingDataMessage.dataBufferLength();
+
+ //
+ delegate.onMessage(msgTypeId, dataBuffer, dataBufferOffset, dataBufferLength);
+ }
+ /*else
+ {
+ // ADMINISTRATIVE message type, ignore
+ }*/
+
+ //delegate.onMessage(msgTypeId, buffer, index, length);
}
@Override
diff --git a/test/org/helios/snapshot/SnapshotTest.java b/test/org/helios/mmb/SnapshotMessageTest.java
similarity index 87%
rename from test/org/helios/snapshot/SnapshotTest.java
rename to test/org/helios/mmb/SnapshotMessageTest.java
index eafa42e..fe2733c 100644
--- a/test/org/helios/snapshot/SnapshotTest.java
+++ b/test/org/helios/mmb/SnapshotMessageTest.java
@@ -1,10 +1,11 @@
-package org.helios.snapshot;
+package org.helios.mmb;
import org.agrona.concurrent.BusySpinIdleStrategy;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.ringbuffer.OneToOneRingBuffer;
import org.agrona.concurrent.ringbuffer.RingBuffer;
import org.helios.infra.MessageTypes;
+import org.helios.mmb.SnapshotMessage;
import org.helios.mmb.sbe.*;
import org.helios.util.DirectBufferAllocator;
import org.junit.Test;
@@ -12,13 +13,14 @@
import static org.agrona.concurrent.ringbuffer.RingBufferDescriptor.TRAILER_LENGTH;
import static org.junit.Assert.assertTrue;
-public class SnapshotTest
+public class SnapshotMessageTest
{
private final int BUFFER_SIZE = (16 * 1024) + TRAILER_LENGTH;
private final RingBuffer ringBuffer = new OneToOneRingBuffer(
new UnsafeBuffer(DirectBufferAllocator.allocateCacheAligned(BUFFER_SIZE)));
+ private final SnapshotMessage snapshotMessage = new SnapshotMessage();
private final MessageHeaderDecoder messageHeaderDecoder = new MessageHeaderDecoder();
private final LoadSnapshotDecoder loadSnapshotDecoder = new LoadSnapshotDecoder();
private final SaveSnapshotDecoder saveSnapshotDecoder = new SaveSnapshotDecoder();
@@ -26,7 +28,7 @@ public class SnapshotTest
@Test
public void shouldWriteLoadSnapshotMessage()
{
- Snapshot.writeLoadMessage(ringBuffer, new BusySpinIdleStrategy());
+ snapshotMessage.writeLoadMessage(ringBuffer, new BusySpinIdleStrategy());
int readBytes;
do
@@ -59,7 +61,7 @@ public void shouldWriteLoadSnapshotMessage()
@Test
public void shouldWriteSaveSnapshotMessage()
{
- Snapshot.writeSaveMessage(ringBuffer, new BusySpinIdleStrategy());
+ snapshotMessage.writeSaveMessage(ringBuffer, new BusySpinIdleStrategy());
int readBytes;
do
@@ -92,19 +94,19 @@ public void shouldWriteSaveSnapshotMessage()
@Test(expected = NullPointerException.class)
public void shouldThrowExceptionWhenRingBufferIsNullInLoad()
{
- Snapshot.writeLoadMessage(null, new BusySpinIdleStrategy());
+ snapshotMessage.writeLoadMessage(null, new BusySpinIdleStrategy());
}
@Test(expected = NullPointerException.class)
public void shouldThrowExceptionWhenRingBufferIsNullInSave()
{
- Snapshot.writeSaveMessage(null, new BusySpinIdleStrategy());
+ snapshotMessage.writeSaveMessage(null, new BusySpinIdleStrategy());
}
@Test(expected = NullPointerException.class)
public void shouldThrowExceptionWhenIdleStrategyIsNullInLoad()
{
- Snapshot.writeLoadMessage(
+ snapshotMessage.writeLoadMessage(
new OneToOneRingBuffer(new UnsafeBuffer(DirectBufferAllocator.allocateCacheAligned(BUFFER_SIZE))),
null);
}
@@ -112,7 +114,7 @@ public void shouldThrowExceptionWhenIdleStrategyIsNullInLoad()
@Test(expected = NullPointerException.class)
public void shouldThrowExceptionWhenIdleStrategyIsNullInSave()
{
- Snapshot.writeSaveMessage(
+ snapshotMessage.writeSaveMessage(
new OneToOneRingBuffer(new UnsafeBuffer(DirectBufferAllocator.allocateCacheAligned(BUFFER_SIZE))),
null);
}
diff --git a/test/org/helios/service/ServiceReportTest.java b/test/org/helios/service/ServiceReportTest.java
index 64044d1..07f1e14 100644
--- a/test/org/helios/service/ServiceReportTest.java
+++ b/test/org/helios/service/ServiceReportTest.java
@@ -5,6 +5,7 @@
import org.agrona.concurrent.ringbuffer.OneToOneRingBuffer;
import org.agrona.concurrent.ringbuffer.RingBuffer;
import org.helios.*;
+import org.helios.infra.AssociationHandler;
import org.helios.infra.InputMessageProcessor;
import org.helios.infra.OutputMessageProcessor;
import org.helios.util.DirectBufferAllocator;
@@ -26,16 +27,18 @@ public class ServiceReportTest
@Test(expected = NullPointerException.class)
public void shouldThrowExceptionWhenInputMessageProcessorIsNull()
{
- try (final Helios helios = new Helios())
- {
- new ServiceReport(null, new OutputMessageProcessor(ringBuffer, helios.newEmbeddedStream(0),
- new BusySpinIdleStrategy(), ""));
- }
+ new ServiceReport(null);
}
@Test(expected = NullPointerException.class)
public void shouldThrowExceptionWhenOutputMessageProcessorIsNull()
{
- new ServiceReport(new InputMessageProcessor(ringBuffer, new BusySpinIdleStrategy(), 0, ""), null);
+ try (final Helios helios = new Helios())
+ {
+ new ServiceReport(
+ new InputMessageProcessor(ringBuffer, new BusySpinIdleStrategy(), 0, 0, helios.newIpcStream(0),
+ null, "")
+ ).addResponseProcessor(null);
+ }
}
}
diff --git a/test/org/helios/status/StatusCounterTest.java b/test/org/helios/status/StatusCounterTest.java
new file mode 100644
index 0000000..6f46e46
--- /dev/null
+++ b/test/org/helios/status/StatusCounterTest.java
@@ -0,0 +1,60 @@
+package org.helios.status;
+
+import org.agrona.concurrent.AtomicBuffer;
+import org.agrona.concurrent.UnsafeBuffer;
+import org.agrona.concurrent.status.AtomicCounter;
+import org.agrona.concurrent.status.CountersManager;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class StatusCounterTest
+{
+ private static final int VALUES_BUFFER_SIZE = 1024;
+ private static final AtomicBuffer METADATA_BUFFER = new UnsafeBuffer(ByteBuffer.allocate(VALUES_BUFFER_SIZE * 2));
+ private static final AtomicBuffer VALUES_BUFFER = new UnsafeBuffer(ByteBuffer.allocate(VALUES_BUFFER_SIZE));
+ private static final CountersManager COUNTERS_MANAGER = new CountersManager(METADATA_BUFFER, VALUES_BUFFER);
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowExceptionWhenCountersManagerIsNull()
+ {
+ new StatusCounters(null);
+ }
+
+ @Test
+ public void shouldReturnNullCounterWhenStatusCounterDescriptorIsNull()
+ {
+ try(final StatusCounters statusCounters = new StatusCounters(COUNTERS_MANAGER))
+ {
+ final AtomicCounter counter = statusCounters.get(null);
+ assertNull(counter);
+ }
+ }
+
+ @Test
+ public void shouldReturnCorrectStatusCounterDescriptor()
+ {
+ try(final StatusCounters statusCounters = new StatusCounters(COUNTERS_MANAGER))
+ {
+ for (final StatusCounterDescriptor descriptor : StatusCounterDescriptor.values())
+ {
+ final AtomicCounter counter = statusCounters.get(descriptor);
+ assertNotNull(counter);
+
+ counter.add(descriptor.id());
+ }
+
+ for (final StatusCounterDescriptor descriptor : StatusCounterDescriptor.values())
+ {
+ final AtomicCounter counter = statusCounters.get(descriptor);
+ assertNotNull(counter);
+
+ assertTrue(counter.get() == descriptor.id());
+ }
+ }
+ }
+}
diff --git a/test/org/helios/util/ProcessorHelperTest.java b/test/org/helios/util/ProcessorHelperTest.java
index e3ebbe2..7938806 100644
--- a/test/org/helios/util/ProcessorHelperTest.java
+++ b/test/org/helios/util/ProcessorHelperTest.java
@@ -36,6 +36,12 @@ private class ProcessorStub implements Processor
this.runnableCode = runnableCode;
}
+ @Override
+ public String name()
+ {
+ return this.toString();
+ }
+
@Override
public void start()
{