From abab29ab67d08212a1184cac0af10f16d03b2f39 Mon Sep 17 00:00:00 2001 From: Tullio Date: Sat, 17 Dec 2016 11:00:40 +0100 Subject: [PATCH] Added MMB Data message to correctly separate business and administrative data messages Added MMB Startup message to address application-level association between GWs and SVCs Handled MMB Startup/Shutdown messages to address application-level association between GWs and SVCs as per issues #9 and #15 Changed some thread running variables to use volatile instead of atomic primitives Added initial status counters support Refactored report classes and functions Added/modified unit and perf tests --- build.gradle | 4 +- resources/Helios-MMB.xml | 24 +- src/org/helios/AeronStream.java | 12 +- src/org/helios/Helios.java | 54 ++-- src/org/helios/HeliosConfiguration.java | 12 +- src/org/helios/HeliosContext.java | 28 ++ src/org/helios/HeliosDriver.java | 8 +- src/org/helios/HeliosGateway.java | 101 +++--- src/org/helios/HeliosService.java | 62 ++-- src/org/helios/archive/ArchiveProcessor.java | 18 +- src/org/helios/gateway/Gateway.java | 10 +- src/org/helios/gateway/GatewayReport.java | 72 ++--- src/org/helios/heartbeat/Heartbeat.java | 23 -- src/org/helios/infra/ConsoleReporter.java | 51 +++ src/org/helios/infra/Heartbeat.java | 23 -- src/org/helios/infra/InputMessageHandler.java | 107 ++++++- .../helios/infra/InputMessageProcessor.java | 143 ++++++--- src/org/helios/infra/InputReport.java | 18 ++ .../helios/infra/OutputMessageHandler.java | 28 +- .../helios/infra/OutputMessageProcessor.java | 141 +++++++- src/org/helios/infra/OutputReport.java | 18 ++ src/org/helios/infra/Processor.java | 2 + src/org/helios/infra/RateReport.java | 8 - src/org/helios/infra/RateReporter.java | 51 --- src/org/helios/infra/Report.java | 12 + src/org/helios/infra/ReportProcessor.java | 64 ++++ src/org/helios/infra/Reporter.java | 7 + src/org/helios/infra/RingBufferProcessor.java | 18 +- src/org/helios/journal/JournalHandler.java | 7 +- src/org/helios/journal/JournalProcessor.java | 6 + src/org/helios/mmb/DataMessage.java | 88 +++++ src/org/helios/mmb/HeartbeatMessage.java | 55 ++++ .../helios/mmb/HeartbeatMessageFactory.java | 38 --- src/org/helios/mmb/SnapshotMessage.java | 98 ++++++ .../helios/mmb/SnapshotMessageFactory.java | 59 ---- src/org/helios/mmb/StartupMessage.java | 48 +++ src/org/helios/replica/ReplicaProcessor.java | 6 + src/org/helios/service/Service.java | 8 +- src/org/helios/service/ServiceReport.java | 66 ++-- src/org/helios/snapshot/Snapshot.java | 32 -- src/org/helios/snapshot/SnapshotTimer.java | 8 +- .../status/StatusCounterDescriptor.java | 58 ++++ src/org/helios/status/StatusCounters.java | 30 ++ test-perf/echo/EchoConfiguration.java | 12 +- test-perf/echo/EchoEmbedded.java | 31 +- test-perf/echo/EchoEmbeddedIpc.java | 103 ++++++ test-perf/echo/EchoGateway.java | 51 +-- test-perf/echo/EchoGatewayHandler.java | 57 +++- test-perf/echo/EchoService.java | 19 +- test-perf/runEchoEmbeddedIpc.sh | 10 + test/org/helios/HeliosTest.java | 303 ++++++++++-------- .../SnapshotMessageTest.java} | 18 +- .../org/helios/service/ServiceReportTest.java | 15 +- test/org/helios/status/StatusCounterTest.java | 60 ++++ test/org/helios/util/ProcessorHelperTest.java | 6 + 55 files changed, 1680 insertions(+), 731 deletions(-) delete mode 100644 src/org/helios/heartbeat/Heartbeat.java create mode 100644 src/org/helios/infra/ConsoleReporter.java delete mode 100644 src/org/helios/infra/Heartbeat.java create mode 100644 src/org/helios/infra/InputReport.java create mode 100644 src/org/helios/infra/OutputReport.java delete mode 100644 src/org/helios/infra/RateReport.java delete mode 100644 src/org/helios/infra/RateReporter.java create mode 100644 src/org/helios/infra/Report.java create mode 100644 src/org/helios/infra/ReportProcessor.java create mode 100644 src/org/helios/infra/Reporter.java create mode 100644 src/org/helios/mmb/DataMessage.java create mode 100644 src/org/helios/mmb/HeartbeatMessage.java delete mode 100644 src/org/helios/mmb/HeartbeatMessageFactory.java create mode 100644 src/org/helios/mmb/SnapshotMessage.java delete mode 100644 src/org/helios/mmb/SnapshotMessageFactory.java create mode 100644 src/org/helios/mmb/StartupMessage.java delete mode 100644 src/org/helios/snapshot/Snapshot.java create mode 100644 src/org/helios/status/StatusCounterDescriptor.java create mode 100644 src/org/helios/status/StatusCounters.java create mode 100644 test-perf/echo/EchoEmbeddedIpc.java create mode 100755 test-perf/runEchoEmbeddedIpc.sh rename test/org/helios/{snapshot/SnapshotTest.java => mmb/SnapshotMessageTest.java} (87%) create mode 100644 test/org/helios/status/StatusCounterTest.java diff --git a/build.gradle b/build.gradle index b3cc95c..cebb9d5 100644 --- a/build.gradle +++ b/build.gradle @@ -1,5 +1,3 @@ -import com.sun.org.apache.xalan.internal.xsltc.compiler.Copy - apply plugin: 'java' apply plugin: 'maven' apply plugin: 'signing' @@ -10,7 +8,7 @@ apply plugin: 'idea' defaultTasks 'clean', 'build', 'install' group = 'org.helios' -version = '0.0.4-RC' +version = '0.0.4' ext.isReleaseVersion = !version.endsWith("-RC") diff --git a/resources/Helios-MMB.xml b/resources/Helios-MMB.xml index 3195a9c..f0f684a 100644 --- a/resources/Helios-MMB.xml +++ b/resources/Helios-MMB.xml @@ -31,9 +31,9 @@ - - S - G + + 0 + 1 @@ -45,24 +45,32 @@ - + - + - + - + - + + + + + + + + + \ No newline at end of file diff --git a/src/org/helios/AeronStream.java b/src/org/helios/AeronStream.java index b0ecafb..6b66816 100644 --- a/src/org/helios/AeronStream.java +++ b/src/org/helios/AeronStream.java @@ -2,6 +2,7 @@ import io.aeron.Aeron; import io.aeron.Image; +import org.helios.mmb.sbe.ComponentType; import java.util.Objects; @@ -24,20 +25,15 @@ static AeronStream fromImage(final Aeron aeron, final Image image) public final Aeron aeron; public final String channel; public final int streamId; + public ComponentType componentType; + public short componentId; private final String key; @Override public boolean equals(Object obj) { - if (obj != null && obj instanceof AeronStream) - { - return ((AeronStream)obj).key.equals(key); - } - else - { - return false; - } + return obj != null && obj instanceof AeronStream && ((AeronStream) obj).key.equals(key); } @Override diff --git a/src/org/helios/Helios.java b/src/org/helios/Helios.java index a09cbfd..09256d9 100644 --- a/src/org/helios/Helios.java +++ b/src/org/helios/Helios.java @@ -23,7 +23,7 @@ import org.helios.gateway.GatewayHandler; import org.helios.gateway.GatewayHandlerFactory; import org.helios.infra.AvailableAssociationHandler; -import org.helios.infra.RateReporter; +import org.helios.infra.ReportProcessor; import org.helios.infra.UnavailableAssociationHandler; import org.helios.service.Service; import org.helios.service.ServiceHandler; @@ -46,7 +46,7 @@ public class Helios implements AutoCloseable, ErrorHandler, AvailableImageHandle private final Long2ObjectHashMap> serviceSubscriptionRepository; private final Long2ObjectHashMap> gatewaySubscriptionRepository; - private RateReporter reporter; + private ReportProcessor reporter; public Helios() { @@ -65,6 +65,7 @@ public Helios(final HeliosContext context, final HeliosDriver driver) final Aeron.Context aeronContext = new Aeron.Context() .errorHandler(this).availableImageHandler(this).unavailableImageHandler(this); + if (context.isMediaDriverEmbedded()) { aeronContext.aeronDirectoryName(driver.mediaDriver().aeronDirectoryName()); @@ -78,7 +79,7 @@ public Helios(final HeliosContext context, final HeliosDriver driver) serviceSubscriptionRepository = new Long2ObjectHashMap<>(); gatewaySubscriptionRepository = new Long2ObjectHashMap<>(); - reporter = context.isReportingEnabled() ? new RateReporter() : null; + reporter = context.isReportingEnabled() ? new ReportProcessor(1_000_000_000L, null) : null; // TODO: configure } public void start() @@ -159,21 +160,22 @@ public AeronStream newStream(final String channel, final int streamId) return new AeronStream(aeron, channel, streamId); } - public AeronStream newEmbeddedStream(final int streamId) + public AeronStream newIpcStream(final int streamId) { return newStream(CommonContext.IPC_CHANNEL, streamId); } - public Service addService(final ServiceHandlerFactory factory) + public Service addService(final ServiceHandlerFactory factory, final AeronStream reqStream) { Objects.requireNonNull(factory, "factory"); + Objects.requireNonNull(reqStream, "reqStream"); - final HeliosService svc = new HeliosService<>(this, factory); + final HeliosService svc = new HeliosService<>(this, factory, reqStream); serviceList.add(svc); if (reporter != null) { - reporter.addAll(svc.reportList()); + reporter.add(svc.report()); } return svc; @@ -182,15 +184,15 @@ public Service addService(final ServiceHandlerFact public Service addService(final ServiceHandlerFactory factory, final AeronStream reqStream, final AeronStream rspStream) { - final Service svc = addService(factory); + final Service svc = addService(factory, reqStream); - return svc.addEndPoint(reqStream, rspStream); + return svc.addEndPoint(rspStream); } - public Service addService(final ServiceHandlerFactory factory, + public Service addService(final ServiceHandlerFactory factory, final AeronStream reqStream, final AvailableAssociationHandler availableHandler, final UnavailableAssociationHandler unavailableHandler) { - final Service svc = addService(factory); + final Service svc = addService(factory, reqStream); svc.availableAssociationHandler(availableHandler).unavailableAssociationHandler(unavailableHandler); @@ -201,51 +203,49 @@ public Service addService(final ServiceHandlerFact final AvailableAssociationHandler availableHandler, final UnavailableAssociationHandler unavailableHandler, final AeronStream reqStream, final AeronStream rspStream) { - final Service svc = addService(factory, availableHandler, unavailableHandler); + final Service svc = addService(factory, reqStream, availableHandler, unavailableHandler); - return svc.addEndPoint(reqStream, rspStream); + return svc.addEndPoint(rspStream); } - public Gateway addGateway(final GatewayHandlerFactory factory) + public Gateway addGateway() { - Objects.requireNonNull(factory, "factory"); - - final HeliosGateway gw = new HeliosGateway<>(this, factory); + final HeliosGateway gw = new HeliosGateway<>(this); gatewayList.add(gw); if (reporter != null) { - reporter.addAll(gw.reportList()); + reporter.add(gw.report()); } return gw; } - public Gateway addGateway(final GatewayHandlerFactory factory, + public T addGateway(final GatewayHandlerFactory factory, final AeronStream reqStream, final AeronStream rspStream) { - final Gateway gw = addGateway(factory); + final Gateway gw = addGateway(); - return gw.addEndPoint(reqStream, rspStream); + return gw.addEndPoint(reqStream, rspStream, factory); } - public Gateway addGateway(final GatewayHandlerFactory factory, - final AvailableAssociationHandler availableHandler, final UnavailableAssociationHandler unavailableHandler) + public Gateway addGateway(final AvailableAssociationHandler availableHandler, + final UnavailableAssociationHandler unavailableHandler) { - final Gateway gw = addGateway(factory); + final Gateway gw = addGateway(); gw.availableAssociationHandler(availableHandler).unavailableAssociationHandler(unavailableHandler); return gw; } - public Gateway addGateway(final GatewayHandlerFactory factory, + public T addGateway(final GatewayHandlerFactory factory, final AvailableAssociationHandler availableHandler, final UnavailableAssociationHandler unavailableHandler, final AeronStream reqStream, final AeronStream rspStream) { - final Gateway gw = addGateway(factory, availableHandler, unavailableHandler); + final Gateway gw = addGateway(availableHandler, unavailableHandler); - return gw.addEndPoint(reqStream, rspStream); + return gw.addEndPoint(reqStream, rspStream, factory); } HeliosContext context() diff --git a/src/org/helios/HeliosConfiguration.java b/src/org/helios/HeliosConfiguration.java index 6c678e5..74687db 100644 --- a/src/org/helios/HeliosConfiguration.java +++ b/src/org/helios/HeliosConfiguration.java @@ -2,11 +2,11 @@ import com.lmax.disruptor.WaitStrategy; import com.lmax.disruptor.YieldingWaitStrategy; -import org.helios.journal.Journalling; -import org.helios.journal.strategy.PositionalJournalling; import org.agrona.LangUtil; -import org.agrona.concurrent.BackoffIdleStrategy; import org.agrona.concurrent.IdleStrategy; +import org.agrona.concurrent.SleepingIdleStrategy; +import org.helios.journal.Journalling; +import org.helios.journal.strategy.PositionalJournalling; import java.nio.file.Path; import java.nio.file.Paths; @@ -49,6 +49,9 @@ public class HeliosConfiguration public static final long MIN_PARK_NS = getLong("helios.core.back_off.idle.strategy.min_park_ns", 1000); public static final long MAX_PARK_NS = getLong("helios.core.back_off.idle.strategy.max_park_ns", 100000); + public static final int HEARTBEAT_INTERVAL_MS = getInteger("helios.mmb.heartbeat_interval", 1000000); //1000 + public static final int HEARTBEAT_LIVENESS = getInteger("helios.mmb.heartbeat_liveness", 3); + public static Journalling journalStrategy() { return newJournalStrategy(JOURNAL_STRATEGY); @@ -117,7 +120,8 @@ private static IdleStrategy newIdleStrategy(final String strategyClassName) if (strategyClassName == null) { - idleStrategy = new BackoffIdleStrategy(MAX_SPINS, MAX_YIELDS, MIN_PARK_NS, MAX_PARK_NS); + //idleStrategy = new BackoffIdleStrategy(MAX_SPINS, MAX_YIELDS, MIN_PARK_NS, MAX_PARK_NS); + idleStrategy = new SleepingIdleStrategy(MAX_PARK_NS); } else { diff --git a/src/org/helios/HeliosContext.java b/src/org/helios/HeliosContext.java index 969322a..3f8b904 100644 --- a/src/org/helios/HeliosContext.java +++ b/src/org/helios/HeliosContext.java @@ -21,6 +21,9 @@ public class HeliosContext private IdleStrategy publisherIdleStrategy; private IdleStrategy subscriberIdleStrategy; + private int heartbeatInterval; + private int heartbeatLiveness; + public HeliosContext() { setMediaDriverConf(HeliosConfiguration.MEDIA_DRIVER_CONF); @@ -38,6 +41,9 @@ public HeliosContext() setWriteIdleStrategy(HeliosConfiguration.writeIdleStrategy()); setPublisherIdleStrategy(HeliosConfiguration.publisherIdleStrategy()); setSubscriberIdleStrategy(HeliosConfiguration.subscriberIdleStrategy()); + + setHeartbeatInterval(HeliosConfiguration.HEARTBEAT_INTERVAL_MS); + setHeartbeatLiveness(HeliosConfiguration.HEARTBEAT_LIVENESS); } public HeliosContext setMediaDriverConf(String mediaDriverConf) @@ -118,6 +124,18 @@ public HeliosContext setSubscriberIdleStrategy(IdleStrategy subscriberIdleStrate return this; } + public HeliosContext setHeartbeatInterval(int heartbeatInterval) + { + this.heartbeatInterval = heartbeatInterval; + return this; + } + + public HeliosContext setHeartbeatLiveness(int heartbeatLiveness) + { + this.heartbeatLiveness = heartbeatLiveness; + return this; + } + public String getMediaDriverConf() { return mediaDriverConf; @@ -182,4 +200,14 @@ public IdleStrategy subscriberIdleStrategy() { return subscriberIdleStrategy; } + + public int heartbeatInterval() + { + return heartbeatInterval; + } + + public int heartbeatLiveness() + { + return heartbeatLiveness; + } } diff --git a/src/org/helios/HeliosDriver.java b/src/org/helios/HeliosDriver.java index 1688881..9cf08db 100644 --- a/src/org/helios/HeliosDriver.java +++ b/src/org/helios/HeliosDriver.java @@ -15,9 +15,15 @@ public class HeliosDriver implements Closeable private final MediaDriver mediaDriver; public HeliosDriver(final HeliosContext context) + { + this(context, AERON_DIR_NAME_DEFAULT); + } + + public HeliosDriver(final HeliosContext context, final String aeronDirectoryName) { this(context, new MediaDriver.Context() - .aeronDirectoryName(AERON_DIR_NAME_DEFAULT) + .aeronDirectoryName(aeronDirectoryName) + .termBufferSparseFile(false) .threadingMode(ThreadingMode.SHARED) .conductorIdleStrategy(new BackoffIdleStrategy(1, 1, 1, 1)) .receiverIdleStrategy(new NoOpIdleStrategy()) diff --git a/src/org/helios/HeliosGateway.java b/src/org/helios/HeliosGateway.java index f81e903..13d7606 100644 --- a/src/org/helios/HeliosGateway.java +++ b/src/org/helios/HeliosGateway.java @@ -14,6 +14,7 @@ import org.helios.gateway.GatewayHandlerFactory; import org.helios.gateway.GatewayReport; import org.helios.infra.*; +import org.helios.mmb.sbe.ComponentType; import org.helios.util.DirectBufferAllocator; import org.helios.util.ProcessorHelper; import org.helios.util.RingBufferPool; @@ -25,45 +26,46 @@ import static org.agrona.concurrent.ringbuffer.RingBufferDescriptor.TRAILER_LENGTH; -public class HeliosGateway implements Gateway, AssociationHandler, +class HeliosGateway implements Gateway, AssociationHandler, AvailableImageHandler, UnavailableImageHandler { private static final int FRAME_COUNT_LIMIT = Integer.getInteger("helios.gateway.poll.frame_count_limit", 10); // TODO: from HeliosContext + private static short nextGatewayId = 0; + private final short gatewayId; private final Helios helios; - private final InputMessageProcessor svcResponseProcessor; private final RingBufferPool ringBufferPool; - private final List svcRequestProcessorList; - private final RingBufferProcessor gatewayProcessor; + private final List gwOutputProcessorList; + private final List gwInputProcessorList; + private final List gatewayProcessorList; private final List eventProcessorList; - private final List reportList; + private final GatewayReport report; private AvailableAssociationHandler availableAssociationHandler; private UnavailableAssociationHandler unavailableAssociationHandler; - public HeliosGateway(final Helios helios, final GatewayHandlerFactory factory) + HeliosGateway(final Helios helios) { this.helios = helios; + gatewayId = ++nextGatewayId; ringBufferPool = new RingBufferPool(); - svcRequestProcessorList = new ArrayList<>(); + gwOutputProcessorList = new ArrayList<>(); + gwInputProcessorList = new ArrayList<>(); + gatewayProcessorList = new ArrayList<>(); eventProcessorList = new ArrayList<>(); - reportList = new ArrayList<>(); - final ByteBuffer inputBuffer = DirectBufferAllocator.allocateCacheAligned((16 * 1024) + TRAILER_LENGTH); // TODO: configure - final RingBuffer inputRingBuffer = new OneToOneRingBuffer(new UnsafeBuffer(inputBuffer)); - - final T gatewayHandler = factory.createGatewayHandler(ringBufferPool); - gatewayProcessor = new RingBufferProcessor<>(inputRingBuffer, gatewayHandler, new BusySpinIdleStrategy(), "gwProcessor"); - - final IdleStrategy pollIdleStrategy = helios.context().subscriberIdleStrategy(); - svcResponseProcessor = new InputMessageProcessor(inputRingBuffer, pollIdleStrategy, FRAME_COUNT_LIMIT, "svcResponseProcessor"); + report = new GatewayReport(); } @Override - public Gateway addEndPoint(final AeronStream reqStream, final AeronStream rspStream) + public T addEndPoint(final AeronStream reqStream, final AeronStream rspStream, final GatewayHandlerFactory factory) { Objects.requireNonNull(reqStream, "reqStream"); Objects.requireNonNull(rspStream, "rspStream"); + Objects.requireNonNull(factory, "factory"); + + reqStream.componentType = ComponentType.Gateway; + reqStream.componentId = gatewayId; final IdleStrategy idleStrategy = new BusySpinIdleStrategy(); @@ -72,17 +74,34 @@ public Gateway addEndPoint(final AeronStream reqStream, final AeronStream rsp ringBufferPool.addOutputRingBuffer(reqStream, outputRingBuffer); - final OutputMessageProcessor svcRequestProcessor = - new OutputMessageProcessor(outputRingBuffer, reqStream, idleStrategy, "svcRequestProcessor"); + final int heartbeatInterval = helios.context().heartbeatInterval(); + final OutputMessageProcessor gwOutputProcessor = + new OutputMessageProcessor(outputRingBuffer, reqStream, idleStrategy, heartbeatInterval, "gwOutputProcessor"); + + gwOutputProcessorList.add(gwOutputProcessor); + + final ByteBuffer inputBuffer = DirectBufferAllocator.allocateCacheAligned((16 * 1024) + TRAILER_LENGTH); // TODO: configure + final RingBuffer inputRingBuffer = new OneToOneRingBuffer(new UnsafeBuffer(inputBuffer)); - svcRequestProcessorList.add(svcRequestProcessor); + final T gatewayHandler = factory.createGatewayHandler(ringBufferPool); + final RingBufferProcessor gatewayProcessor = new RingBufferProcessor<>(inputRingBuffer, gatewayHandler, new BusySpinIdleStrategy(), "gwProcessor"); + gatewayProcessorList.add(gatewayProcessor); - final long subscriptionId = svcResponseProcessor.addSubscription(rspStream); + final IdleStrategy pollIdleStrategy = helios.context().subscriberIdleStrategy(); + final int heartbeatLiveness = helios.context().heartbeatLiveness(); + final InputMessageProcessor gwInputProcessor = + new InputMessageProcessor(inputRingBuffer, pollIdleStrategy, FRAME_COUNT_LIMIT, heartbeatLiveness, + rspStream, this, "gwInputProcessor"); + + gwInputProcessorList.add(gwInputProcessor); + + final long subscriptionId = gwInputProcessor.subscriptionId(); helios.addGatewaySubscription(subscriptionId, this); - reportList.add(new GatewayReport(svcRequestProcessor, svcResponseProcessor)); + report.addRequestProcessor(gwOutputProcessor); + report.addResponseProcessor(gwInputProcessor); - return this; + return gatewayHandler; } @Override @@ -97,8 +116,10 @@ public Gateway addEventChannel(final AeronStream eventStream) ringBufferPool.addEventRingBuffer(eventStream, eventRingBuffer); + final int heartbeatLiveness = helios.context().heartbeatLiveness(); final InputMessageProcessor eventProcessor = - new InputMessageProcessor(eventRingBuffer, readIdleStrategy, FRAME_COUNT_LIMIT, "eventProcessor"); // FIXME: eventProcessor name + new InputMessageProcessor(eventRingBuffer, readIdleStrategy, FRAME_COUNT_LIMIT, + heartbeatLiveness, eventStream, null, "eventProcessor"); // FIXME: eventProcessor name eventProcessorList.add(eventProcessor); @@ -106,24 +127,18 @@ public Gateway addEventChannel(final AeronStream eventStream) } @Override - public List reportList() + public Report report() { - return reportList; - } - - @Override - public T handler() - { - return gatewayProcessor.handler(); + return report; } @Override public void start() { - ProcessorHelper.start(gatewayProcessor); + gatewayProcessorList.forEach(ProcessorHelper::start); eventProcessorList.forEach(ProcessorHelper::start); - svcRequestProcessorList.forEach(ProcessorHelper::start); - ProcessorHelper.start(svcResponseProcessor); + gwOutputProcessorList.forEach(ProcessorHelper::start); + gwInputProcessorList.forEach(ProcessorHelper::start); } @Override @@ -143,19 +158,13 @@ public Gateway unavailableAssociationHandler(final UnavailableAssociationHand @Override public void onAvailableImage(final Image image) { - svcResponseProcessor.onAvailableImage(image); - - // TODO: send HEARTBEAT to service through GatewayHandler - - onAssociationEstablished(); // TODO: remove after HEARTBEAT handling + gwInputProcessorList.forEach((p) -> p.onAvailableImage(image)); } @Override public void onUnavailableImage(final Image image) { - svcResponseProcessor.onUnavailableImage(image); - - onAssociationBroken(); // TODO: remove after HEARTBEAT handling + gwInputProcessorList.forEach((p) -> p.onUnavailableImage(image)); } @Override @@ -179,9 +188,9 @@ public void onAssociationBroken() @Override public void close() { - CloseHelper.quietClose(svcResponseProcessor); - svcRequestProcessorList.forEach(CloseHelper::quietClose); + gwInputProcessorList.forEach(CloseHelper::quietClose); + gwOutputProcessorList.forEach(CloseHelper::quietClose); eventProcessorList.forEach(CloseHelper::quietClose); - CloseHelper.quietClose(gatewayProcessor); + gatewayProcessorList.forEach(CloseHelper::quietClose); } } diff --git a/src/org/helios/HeliosService.java b/src/org/helios/HeliosService.java index 5ee2a13..ffc6ef5 100644 --- a/src/org/helios/HeliosService.java +++ b/src/org/helios/HeliosService.java @@ -15,6 +15,7 @@ import org.helios.journal.JournalProcessor; import org.helios.journal.JournalWriter; import org.helios.journal.Journalling; +import org.helios.mmb.sbe.ComponentType; import org.helios.replica.ReplicaHandler; import org.helios.replica.ReplicaProcessor; import org.helios.service.Service; @@ -44,16 +45,18 @@ public class HeliosService implements Service, Asso private final long TICK_DURATION_NS = TimeUnit.MICROSECONDS.toNanos(100); // TODO: configure private final int TICKS_PER_WHEEL = 512; // TODO: configure + private static short nextServiceId = 0; + private final short serviceId; private final Helios helios; - private final InputMessageProcessor gwRequestProcessor; + private final InputMessageProcessor svcInputProcessor; private final RingBufferPool ringBufferPool; private final List gwResponseProcessorList; private final JournalProcessor journalProcessor; private final ReplicaProcessor replicaProcessor; private final RingBufferProcessor serviceProcessor; private final List eventProcessorList; - private final List reportList; + private final ServiceReport report; private final TimerWheel timerWheel; private final SnapshotTimer snapshotTimer; private final AtomicBoolean timerWheelRunning; @@ -61,14 +64,14 @@ public class HeliosService implements Service, Asso private AvailableAssociationHandler availableAssociationHandler; private UnavailableAssociationHandler unavailableAssociationHandler; - public HeliosService(final Helios helios, final ServiceHandlerFactory factory) + public HeliosService(final Helios helios, final ServiceHandlerFactory factory, final AeronStream reqStream) { this.helios = helios; + serviceId = ++nextServiceId; ringBufferPool = new RingBufferPool(); gwResponseProcessorList = new ArrayList<>(); eventProcessorList = new ArrayList<>(); - reportList = new ArrayList<>(); final HeliosContext context = helios.context(); @@ -127,19 +130,25 @@ public HeliosService(final Helios helios, final ServiceHandlerFactory factory final T serviceHandler = factory.createServiceHandler(ringBufferPool); serviceProcessor = new RingBufferProcessor<>( isJournalEnabled ? journalRingBuffer : (isReplicaEnabled ? replicaRingBuffer : inputRingBuffer), - serviceHandler, new BusySpinIdleStrategy(), "serviceProcessor"); + serviceHandler, new BusySpinIdleStrategy(), "svcProcessor"); final IdleStrategy pollIdleStrategy = context.subscriberIdleStrategy(); - gwRequestProcessor = new InputMessageProcessor(inputRingBuffer, pollIdleStrategy, FRAME_COUNT_LIMIT, "gwRequestProcessor"); + final int heartbeatLiveness = context.heartbeatLiveness(); + svcInputProcessor = new InputMessageProcessor(inputRingBuffer, pollIdleStrategy, FRAME_COUNT_LIMIT, + heartbeatLiveness, reqStream, this, "svcInputProcessor"); + + final long subscriptionId = svcInputProcessor.subscriptionId(); + helios.addServiceSubscription(subscriptionId, this); + + report = new ServiceReport(svcInputProcessor); } - public Service addEndPoint(final AeronStream reqStream, final AeronStream rspStream) + public Service addEndPoint(final AeronStream rspStream) { - Objects.requireNonNull(reqStream, "reqStream"); Objects.requireNonNull(rspStream, "rspStream"); - final long subscriptionId = gwRequestProcessor.addSubscription(reqStream); - helios.addServiceSubscription(subscriptionId, this); + rspStream.componentType = ComponentType.Service; + rspStream.componentId = serviceId; final IdleStrategy writeIdleStrategy = helios.context().writeIdleStrategy(); @@ -148,14 +157,13 @@ public Service addEndPoint(final AeronStream reqStream, final AeronStream rsp ringBufferPool.addOutputRingBuffer(rspStream, outputRingBuffer); - final OutputMessageProcessor gwResponseProcessor = - new OutputMessageProcessor(outputRingBuffer, rspStream, writeIdleStrategy, "gwResponseProcessor"); // FIXME: gwResponseProcessor name + final int heartbeatInterval = helios.context().heartbeatInterval(); + final OutputMessageProcessor svcOutputProcessor = + new OutputMessageProcessor(outputRingBuffer, rspStream, writeIdleStrategy, heartbeatInterval, "svcOutputProcessor"); // FIXME: gwResponseProcessor name - gwResponseProcessorList.add(gwResponseProcessor); + gwResponseProcessorList.add(svcOutputProcessor); - final long publicationId = gwResponseProcessor.handler().outputPublicationId(); - - reportList.add(new ServiceReport(gwRequestProcessor, gwResponseProcessor)); + report.addResponseProcessor(svcOutputProcessor); return this; } @@ -165,6 +173,9 @@ public Service addEventChannel(final AeronStream eventStream) { Objects.requireNonNull(eventStream, "eventStream"); + eventStream.componentType = ComponentType.Service; + eventStream.componentId = serviceId; + final IdleStrategy writeIdleStrategy = helios.context().writeIdleStrategy(); final ByteBuffer eventBuffer = DirectBufferAllocator.allocateCacheAligned((16 * 1024) + TRAILER_LENGTH); // TODO: configure @@ -172,8 +183,9 @@ public Service addEventChannel(final AeronStream eventStream) ringBufferPool.addEventRingBuffer(eventStream, eventRingBuffer); + final int heartbeatInterval = helios.context().heartbeatInterval(); final OutputMessageProcessor eventProcessor = - new OutputMessageProcessor(eventRingBuffer, eventStream, writeIdleStrategy, "eventProcessor"); // FIXME: eventProcessor name + new OutputMessageProcessor(eventRingBuffer, eventStream, writeIdleStrategy, heartbeatInterval, "eventProcessor"); // FIXME: eventProcessor name eventProcessorList.add(eventProcessor); @@ -181,9 +193,9 @@ public Service addEventChannel(final AeronStream eventStream) } @Override - public List reportList() + public Report report() { - return reportList; + return report; } @Override @@ -200,7 +212,7 @@ public void start() ProcessorHelper.start(journalProcessor); eventProcessorList.forEach(ProcessorHelper::start); gwResponseProcessorList.forEach(ProcessorHelper::start); - ProcessorHelper.start(gwRequestProcessor); + ProcessorHelper.start(svcInputProcessor); timerExecutor.execute(() -> { while (timerWheelRunning.get()) { @@ -227,17 +239,13 @@ public Service unavailableAssociationHandler(final UnavailableAssociationHand @Override public void onAvailableImage(final Image image) { - gwRequestProcessor.onAvailableImage(image); - - onAssociationEstablished(); + svcInputProcessor.onAvailableImage(image); } @Override public void onUnavailableImage(final Image image) { - gwRequestProcessor.onUnavailableImage(image); - - onAssociationBroken(); + svcInputProcessor.onUnavailableImage(image); } @Override @@ -265,7 +273,7 @@ public void close() timerWheelRunning.set(false); timerExecutor.shutdown(); - CloseHelper.quietClose(gwRequestProcessor); + CloseHelper.quietClose(svcInputProcessor); gwResponseProcessorList.forEach(CloseHelper::quietClose); eventProcessorList.forEach(CloseHelper::quietClose); CloseHelper.quietClose(journalProcessor); diff --git a/src/org/helios/archive/ArchiveProcessor.java b/src/org/helios/archive/ArchiveProcessor.java index c3203ee..517bb68 100644 --- a/src/org/helios/archive/ArchiveProcessor.java +++ b/src/org/helios/archive/ArchiveProcessor.java @@ -5,12 +5,10 @@ import com.lmax.disruptor.util.DaemonThreadFactory; import org.helios.infra.Processor; -import java.util.concurrent.atomic.AtomicBoolean; - public class ArchiveProcessor implements Processor { private final Disruptor eventDisruptor; - private final AtomicBoolean running; + private volatile boolean running; private final Thread archiveThread; @SuppressWarnings("unchecked") @@ -20,21 +18,27 @@ public ArchiveProcessor(final EventFactory eventFactory, int ringBufferSize, eventDisruptor = new Disruptor<>(eventFactory, ringBufferSize, DaemonThreadFactory.INSTANCE); eventDisruptor.handleEventsWith(new ArchiveEventHandler<>(eventClass, batchSize, batchHandler)); - running = new AtomicBoolean(false); + running = false; archiveThread = new Thread(this, "archiveProcessor"); } + @Override + public String name() + { + return archiveThread.getName(); + } + @Override public void start() { - running.set(true); + running = true; archiveThread.start(); } @Override public void run() { - while (running.get()) + while (running) { // TODO: } @@ -43,7 +47,7 @@ public void run() @Override public void close() throws Exception { - running.set(false); + running = false; archiveThread.join(); } } diff --git a/src/org/helios/gateway/Gateway.java b/src/org/helios/gateway/Gateway.java index 640836d..d169bec 100644 --- a/src/org/helios/gateway/Gateway.java +++ b/src/org/helios/gateway/Gateway.java @@ -2,14 +2,12 @@ import org.helios.AeronStream; import org.helios.infra.AvailableAssociationHandler; -import org.helios.infra.RateReport; +import org.helios.infra.Report; import org.helios.infra.UnavailableAssociationHandler; -import java.util.List; - public interface Gateway extends AutoCloseable { - Gateway addEndPoint(final AeronStream reqStream, final AeronStream rspStream); + T addEndPoint(final AeronStream reqStream, final AeronStream rspStream, final GatewayHandlerFactory factory); Gateway addEventChannel(final AeronStream eventStream); @@ -17,9 +15,7 @@ public interface Gateway extends AutoCloseable Gateway unavailableAssociationHandler(final UnavailableAssociationHandler handler); - List reportList(); - - T handler(); + Report report(); void start(); } diff --git a/src/org/helios/gateway/GatewayReport.java b/src/org/helios/gateway/GatewayReport.java index e2cb9d3..2ab0ee7 100644 --- a/src/org/helios/gateway/GatewayReport.java +++ b/src/org/helios/gateway/GatewayReport.java @@ -1,51 +1,51 @@ package org.helios.gateway; -import org.helios.infra.InputMessageProcessor; -import org.helios.infra.OutputMessageProcessor; -import org.helios.infra.RateReport; +import org.helios.infra.*; -import java.io.PrintStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; -public final class GatewayReport implements RateReport +public final class GatewayReport implements Report { - private final OutputMessageProcessor requestProcessor; - private final InputMessageProcessor responseProcessor; + private final List inputReportList; + private final List outputReportList; - private long lastTimeStamp; - private long lastSuccessfulWrites; - private long lastBytesWritten; - private long lastSuccessfulReads; + public GatewayReport() + { + inputReportList = new ArrayList<>(); + outputReportList = new ArrayList<>(); + } + + public void addRequestProcessor(final OutputMessageProcessor requestProcessor) + { + Objects.requireNonNull(requestProcessor, "requestProcessor"); + + outputReportList.add(requestProcessor); + } + + public void addResponseProcessor(final InputMessageProcessor responseProcessor) + { + Objects.requireNonNull(responseProcessor, "responseProcessor"); + + inputReportList.add(responseProcessor); + } - public GatewayReport(final OutputMessageProcessor requestProcessor, final InputMessageProcessor responseProcessor) + @Override + public String name() { - this.requestProcessor = requestProcessor; - this.responseProcessor = responseProcessor; + return "GatewayReport"; + } - lastTimeStamp = System.currentTimeMillis(); + @Override + public List inputReports() + { + return inputReportList; } @Override - public void print(final PrintStream stream) + public List outputReports() { - final long timeStamp = System.currentTimeMillis(); - long successfulWrites = requestProcessor.handler().successfulWrites(); - long bytesWritten = requestProcessor.handler().bytesWritten(); - long successfulReads = responseProcessor.successfulReads(); - long failedReads = responseProcessor.failedReads(); - - final long duration = timeStamp - lastTimeStamp; - final long successfulWritesDelta = successfulWrites - lastSuccessfulWrites; - final long bytesTransferred = bytesWritten - lastBytesWritten; - final long successfulReadsDelta = successfulReads - lastSuccessfulReads; - - final double failureRatio = failedReads / (double)(successfulReads + failedReads); - - stream.format("GatewayReport: T %dms OUT %,d messages - %,d bytes - IN %,d messages [read failure ratio: %f]\n", - duration, successfulWritesDelta, bytesTransferred, successfulReadsDelta, failureRatio); - - lastTimeStamp = timeStamp; - lastSuccessfulWrites = successfulWrites; - lastBytesWritten = bytesWritten; - lastSuccessfulReads = successfulReads; + return outputReportList; } } diff --git a/src/org/helios/heartbeat/Heartbeat.java b/src/org/helios/heartbeat/Heartbeat.java deleted file mode 100644 index b5b35c7..0000000 --- a/src/org/helios/heartbeat/Heartbeat.java +++ /dev/null @@ -1,23 +0,0 @@ -package org.helios.heartbeat; - -import org.agrona.concurrent.IdleStrategy; -import org.agrona.concurrent.ringbuffer.RingBuffer; -import org.helios.infra.MessageTypes; - -import java.util.Objects; - -import static org.helios.mmb.HeartbeatMessageFactory.MESSAGE_LENGTH; -import static org.helios.mmb.HeartbeatMessageFactory.heartbeatBuffer; - -public final class Heartbeat -{ - public static void writeMessage(final RingBuffer outputRingBuffer, final IdleStrategy idleStrategy) - { - Objects.requireNonNull(idleStrategy, "idleStrategy"); - - while (!outputRingBuffer.write(MessageTypes.ADMINISTRATIVE_MSG_ID, heartbeatBuffer, 0, MESSAGE_LENGTH)) - { - idleStrategy.idle(0); - } - } -} diff --git a/src/org/helios/infra/ConsoleReporter.java b/src/org/helios/infra/ConsoleReporter.java new file mode 100644 index 0000000..d4f0a6b --- /dev/null +++ b/src/org/helios/infra/ConsoleReporter.java @@ -0,0 +1,51 @@ +package org.helios.infra; + +import java.io.PrintStream; + +public final class ConsoleReporter implements Reporter +{ + private final PrintStream stream = System.out; + private long lastTimestamp = System.nanoTime(); + + @Override + public void onReport(final Report report) + { + final long currentTimestamp = System.nanoTime(); + final long duration = currentTimestamp - lastTimestamp; + lastTimestamp = currentTimestamp; + + stream.format("%s: T %dms ", report.name(), duration); + + for (final InputReport inputReport : report.inputReports()) + { + final long successfulReads = inputReport.successfulReads(); + final long failedReads = inputReport.failedReads(); + final long administrativeMessages = inputReport.administrativeMessages(); + final long heartbeatReceived = inputReport.heartbeatReceived(); + final long applicationMessages = inputReport.applicationMessages(); + final long bytesRead = inputReport.bytesRead(); + + final double failureRatio = failedReads / (double)(successfulReads + failedReads); + + stream.format("IN %s: %,d messages (%,d ADMIN %,d HBT %,d APP) - %,d failed - %,d bytes [read failure ratio: %f]\n", + inputReport.name(), successfulReads, administrativeMessages, heartbeatReceived, applicationMessages, + failedReads, bytesRead, failureRatio); + } + + for (final OutputReport outputReport : report.outputReports()) + { + final long successfulWrites = outputReport.successfulWrites(); + final long failedWrites = outputReport.failedWrites(); + final long heartbeatSent = outputReport.heartbeatSent(); + final long bytesWritten = outputReport.bytesWritten(); + final long successfulBufferReads = outputReport.successfulBufferReads(); + final long failedBufferReads = outputReport.failedBufferReads(); + + final double failureRatio = failedWrites / (double)(successfulWrites + failedWrites); + + stream.format("OUT %s: %,d messages (%,d HBT) - %,d failed - %,d bytes [write failure ratio: %f] {buffer %,d reads %,d failed}\n", + outputReport.name(), successfulWrites, heartbeatSent, failedWrites, bytesWritten, failureRatio, + successfulBufferReads, failedBufferReads); + } + } +} diff --git a/src/org/helios/infra/Heartbeat.java b/src/org/helios/infra/Heartbeat.java deleted file mode 100644 index f4703c4..0000000 --- a/src/org/helios/infra/Heartbeat.java +++ /dev/null @@ -1,23 +0,0 @@ -package org.helios.infra; - -import org.agrona.concurrent.IdleStrategy; -import org.agrona.concurrent.ringbuffer.RingBuffer; - -import java.util.Objects; - -import static org.helios.mmb.HeartbeatMessageFactory.heartbeatBuffer; - -public final class Heartbeat -{ - private static final int MESSAGE_LENGTH = 128; - - public static void writeMessage(final RingBuffer inputRingBuffer, final IdleStrategy idleStrategy) - { - Objects.requireNonNull(idleStrategy, "idleStrategy"); - - while (!inputRingBuffer.write(MessageTypes.ADMINISTRATIVE_MSG_ID, heartbeatBuffer, 0, MESSAGE_LENGTH)) - { - idleStrategy.idle(0); - } - } -} diff --git a/src/org/helios/infra/InputMessageHandler.java b/src/org/helios/infra/InputMessageHandler.java index 6fdf114..1cd1f28 100644 --- a/src/org/helios/infra/InputMessageHandler.java +++ b/src/org/helios/infra/InputMessageHandler.java @@ -5,29 +5,124 @@ import org.agrona.DirectBuffer; import org.agrona.concurrent.IdleStrategy; import org.agrona.concurrent.ringbuffer.RingBuffer; +import org.helios.mmb.sbe.HeartbeatDecoder; +import org.helios.mmb.sbe.MessageHeaderDecoder; +import org.helios.mmb.sbe.ShutdownDecoder; +import org.helios.mmb.sbe.StartupDecoder; -public class InputMessageHandler implements FragmentHandler, AutoCloseable +import static org.agrona.UnsafeAccess.UNSAFE; + +class InputMessageHandler implements FragmentHandler { + private static final long HEARTBEAT_RECEIVED_OFFSET; + private static final long ADMINISTRATIVE_MESSAGES_OFFSET; + private static final long APPLICATION_MESSAGES_OFFSET; + private static final long BYTES_READ_OFFSET; + + private volatile long heartbeatReceived = 0; + private volatile long administrativeMessages = 0; + private volatile long applicationMessages = 0; + private volatile long bytesRead = 0; + private final RingBuffer inputRingBuffer; private final IdleStrategy idleStrategy; + private final AssociationHandler associationHandler; + private final MessageHeaderDecoder messageHeaderDecoder; + private final StartupDecoder startupDecoder; + private final ShutdownDecoder shutdownDecoder; - public InputMessageHandler(final RingBuffer inputRingBuffer, final IdleStrategy idleStrategy) + InputMessageHandler(final RingBuffer inputRingBuffer, final IdleStrategy idleStrategy, + final AssociationHandler associationHandler) { this.inputRingBuffer = inputRingBuffer; this.idleStrategy = idleStrategy; + this.associationHandler = associationHandler; + + messageHeaderDecoder = new MessageHeaderDecoder(); + startupDecoder = new StartupDecoder(); + shutdownDecoder = new ShutdownDecoder(); } @Override public void onFragment(DirectBuffer buffer, int offset, int length, Header header) { - while (!inputRingBuffer.write(MessageTypes.APPLICATION_MSG_ID, buffer, offset, length)) + messageHeaderDecoder.wrap(buffer, offset); + + final int templateId = messageHeaderDecoder.templateId(); + if (templateId == HeartbeatDecoder.TEMPLATE_ID) + { + UNSAFE.putOrderedLong(this, ADMINISTRATIVE_MESSAGES_OFFSET, administrativeMessages + 1); + UNSAFE.putOrderedLong(this, HEARTBEAT_RECEIVED_OFFSET, heartbeatReceived + 1); + } + else if (templateId == StartupDecoder.TEMPLATE_ID && associationHandler != null) + { + UNSAFE.putOrderedLong(this, ADMINISTRATIVE_MESSAGES_OFFSET, administrativeMessages + 1); + + offset += messageHeaderDecoder.encodedLength(); + + startupDecoder.wrap(buffer, offset, messageHeaderDecoder.blockLength(), messageHeaderDecoder.version()); + + // TODO: add component type/identifier to AssociationHandler hooks + + associationHandler.onAssociationEstablished(); + } + else if (templateId == ShutdownDecoder.TEMPLATE_ID && associationHandler != null) + { + UNSAFE.putOrderedLong(this, ADMINISTRATIVE_MESSAGES_OFFSET, administrativeMessages + 1); + + offset += messageHeaderDecoder.encodedLength(); + + shutdownDecoder.wrap(buffer, offset, messageHeaderDecoder.blockLength(), messageHeaderDecoder.version()); + + // TODO: add component type/identifier to AssociationHandler hooks + + associationHandler.onAssociationBroken(); + } + else { - idleStrategy.idle(0); + UNSAFE.putOrderedLong(this, APPLICATION_MESSAGES_OFFSET, applicationMessages + 1); + + while (!inputRingBuffer.write(MessageTypes.APPLICATION_MSG_ID, buffer, offset, length)) + { + idleStrategy.idle(0); + } } + + UNSAFE.putOrderedLong(this, BYTES_READ_OFFSET, bytesRead + length); } - @Override - public void close() throws Exception + long heartbeatReceived() + { + return heartbeatReceived; + } + + long administrativeMessages() + { + return administrativeMessages; + } + + long applicationMessages() + { + return applicationMessages; + } + + long bytesRead() + { + return bytesRead; + } + + static { + try + { + HEARTBEAT_RECEIVED_OFFSET = UNSAFE.objectFieldOffset(InputMessageHandler.class.getDeclaredField("heartbeatReceived")); + ADMINISTRATIVE_MESSAGES_OFFSET = UNSAFE.objectFieldOffset(InputMessageHandler.class.getDeclaredField("administrativeMessages")); + APPLICATION_MESSAGES_OFFSET = UNSAFE.objectFieldOffset(InputMessageHandler.class.getDeclaredField("applicationMessages")); + BYTES_READ_OFFSET = UNSAFE.objectFieldOffset(InputMessageHandler.class.getDeclaredField("bytesRead")); + } + catch (final Exception ex) + { + throw new RuntimeException(ex); + } } } diff --git a/src/org/helios/infra/InputMessageProcessor.java b/src/org/helios/infra/InputMessageProcessor.java index e0e5988..22cd256 100644 --- a/src/org/helios/infra/InputMessageProcessor.java +++ b/src/org/helios/infra/InputMessageProcessor.java @@ -5,15 +5,11 @@ import org.agrona.concurrent.IdleStrategy; import org.agrona.concurrent.ringbuffer.RingBuffer; import org.helios.AeronStream; -import org.helios.snapshot.Snapshot; - -import java.util.LinkedHashSet; -import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; +import org.helios.mmb.SnapshotMessage; import static org.agrona.UnsafeAccess.UNSAFE; -public class InputMessageProcessor implements Processor, AvailableImageHandler, UnavailableImageHandler +public class InputMessageProcessor implements Processor, AvailableImageHandler, UnavailableImageHandler, InputReport { private static final long SUCCESSFUL_READS_OFFSET; private static final long FAILED_READS_OFFSET; @@ -22,68 +18,93 @@ public class InputMessageProcessor implements Processor, AvailableImageHandler, private volatile long failedReads = 0; private final RingBuffer inputRingBuffer; - private final InputMessageHandler handler; - private final Set inputSubscriptionList; + private final Subscription inputSubscription; + private final InputMessageHandler inputMessageHandler; + private final FragmentAssembler dataHandler; + private final SnapshotMessage snapshotMessage; private final int frameCountLimit; private final IdleStrategy idleStrategy; - private final AtomicBoolean running; + private final long maxHeartbeatLiveness; + private long heartbeatLiveness; + private volatile boolean running; + private volatile boolean polling; private final Thread processorThread; - public InputMessageProcessor(final RingBuffer inputRingBuffer, final IdleStrategy idleStrategy, final int frameCountLimit, final String threadName) + public InputMessageProcessor(final RingBuffer inputRingBuffer, final IdleStrategy idleStrategy, + final int frameCountLimit, final long maxHeartbeatLiveness, final AeronStream stream, + final AssociationHandler associationHandler, final String threadName) { this.inputRingBuffer = inputRingBuffer; this.idleStrategy = idleStrategy; this.frameCountLimit = frameCountLimit; + this.maxHeartbeatLiveness = maxHeartbeatLiveness; + + inputSubscription = stream.aeron.addSubscription(stream.channel, stream.streamId); + inputMessageHandler = new InputMessageHandler(inputRingBuffer, idleStrategy, associationHandler); + dataHandler = new FragmentAssembler(inputMessageHandler); + snapshotMessage = new SnapshotMessage(); - handler = new InputMessageHandler(inputRingBuffer, idleStrategy); - inputSubscriptionList = new LinkedHashSet<>(); + heartbeatLiveness = maxHeartbeatLiveness; - running = new AtomicBoolean(false); + running = false; + polling = false; processorThread = new Thread(this, threadName); } - public long addSubscription(final AeronStream stream) + @Override + public String name() { - final Subscription inputSubscription = stream.aeron.addSubscription(stream.channel, stream.streamId); - inputSubscriptionList.add(inputSubscription); - - return inputSubscription.registrationId(); + return processorThread.getName(); } @Override public void start() { - running.set(true); + running = true; processorThread.start(); } @Override public void run() { - // First of all, write the Load Data Snapshot message into the input pipeline. - Snapshot.writeLoadMessage(inputRingBuffer, idleStrategy); // TODO: is this the right place? - - // Poll ALL the input subscriptions for incoming data until running. - final FragmentAssembler dataHandler = new FragmentAssembler(handler); + // First of all, write the Load Data SnapshotMessage message into the input pipeline. + snapshotMessage.writeLoadMessage(inputRingBuffer, idleStrategy); // TODO: is this the right place? + // Poll the input subscription for incoming data until running. int idleCount = 0; - while (running.get()) + while (running) { - int fragmentsRead = 0; - for (final Subscription inputSubscription: inputSubscriptionList) + if (polling) { - fragmentsRead += inputSubscription.poll(dataHandler, frameCountLimit); - } - - if (0 == fragmentsRead) - { - UNSAFE.putOrderedLong(this, FAILED_READS_OFFSET, failedReads + 1); - idleStrategy.idle(idleCount++); + final int fragmentsRead = inputSubscription.poll(dataHandler, frameCountLimit); + if (0 == fragmentsRead) + { + // No incoming data from poll + heartbeatLiveness--; + if (heartbeatLiveness == 0) + { + // TODO: notify heartbeat lost + + heartbeatLiveness = maxHeartbeatLiveness; + } + + // Update statistics + UNSAFE.putOrderedLong(this, FAILED_READS_OFFSET, failedReads + 1); + idleStrategy.idle(idleCount++); + } + else + { + // Incoming data arrived from poll (it DOES NOT matter if heartbeat or not) + heartbeatLiveness = maxHeartbeatLiveness; + + // Update statistics + UNSAFE.putOrderedLong(this, SUCCESSFUL_READS_OFFSET, successfulReads + 1); + idleCount = 0; + } } else { - UNSAFE.putOrderedLong(this, SUCCESSFUL_READS_OFFSET, successfulReads + 1); - idleCount = 0; + idleStrategy.idle(idleCount++); } } } @@ -91,35 +112,75 @@ public void run() @Override public void onAvailableImage(final Image image) { - // TODO: when at least one image is present resume subscription polling + // When at least one image is present resume subscription polling + if (inputSubscription.imageCount() > 0) + { + polling = true; + } } @Override public void onUnavailableImage(final Image image) { - // TODO: when no more images are present suspend subscription polling + dataHandler.freeSessionBuffer(image.sessionId()); + + // When no more images are present suspend subscription polling + if (inputSubscription.imageCount() == 0) + { + polling = false; + } } @Override public void close() throws Exception { - running.set(false); + running = false; processorThread.join(); - inputSubscriptionList.forEach(CloseHelper::quietClose); - CloseHelper.quietClose(handler); + CloseHelper.quietClose(inputSubscription); + } + + public long subscriptionId() + { + return inputSubscription.registrationId(); } + @Override public long successfulReads() { return successfulReads; } + @Override public long failedReads() { return failedReads; } + @Override + public long heartbeatReceived() + { + return inputMessageHandler.heartbeatReceived(); + } + + @Override + public long administrativeMessages() + { + return inputMessageHandler.administrativeMessages(); + } + + @Override + public long applicationMessages() + { + return inputMessageHandler.applicationMessages(); + } + + @Override + public long bytesRead() + { + return inputMessageHandler.bytesRead(); + } + static { try diff --git a/src/org/helios/infra/InputReport.java b/src/org/helios/infra/InputReport.java new file mode 100644 index 0000000..4722638 --- /dev/null +++ b/src/org/helios/infra/InputReport.java @@ -0,0 +1,18 @@ +package org.helios.infra; + +public interface InputReport +{ + String name(); + + long successfulReads(); + + long failedReads(); + + long bytesRead(); + + long administrativeMessages(); + + long applicationMessages(); + + long heartbeatReceived(); +} diff --git a/src/org/helios/infra/OutputMessageHandler.java b/src/org/helios/infra/OutputMessageHandler.java index 5d1ac4e..e9355d4 100644 --- a/src/org/helios/infra/OutputMessageHandler.java +++ b/src/org/helios/infra/OutputMessageHandler.java @@ -1,12 +1,15 @@ package org.helios.infra; import io.aeron.Publication; +import io.aeron.logbuffer.BufferClaim; import org.agrona.CloseHelper; import org.agrona.LangUtil; import org.agrona.MutableDirectBuffer; import org.agrona.concurrent.IdleStrategy; import org.agrona.concurrent.MessageHandler; import org.helios.AeronStream; +import org.helios.mmb.sbe.MessageHeaderDecoder; +import org.helios.mmb.sbe.ShutdownDecoder; import static org.agrona.UnsafeAccess.UNSAFE; @@ -22,26 +25,39 @@ public class OutputMessageHandler implements MessageHandler, AutoCloseable private final Publication outputPublication; private final IdleStrategy idleStrategy; + private final BufferClaim bufferClaim; - public OutputMessageHandler(final AeronStream outputStream, final IdleStrategy idleStrategy) + OutputMessageHandler(final AeronStream outputStream, final IdleStrategy idleStrategy) { this.idleStrategy = idleStrategy; outputPublication = outputStream.aeron.addPublication(outputStream.channel, outputStream.streamId); - } + bufferClaim = new BufferClaim(); + } + private MessageHeaderDecoder messageHeaderDecoder = new MessageHeaderDecoder(); @Override public void onMessage(int msgTypeId, MutableDirectBuffer buffer, int index, int length) { try { - while (outputPublication.offer(buffer, index, length) < 0L) + messageHeaderDecoder.wrap(buffer, index); + if (messageHeaderDecoder.templateId() == ShutdownDecoder.TEMPLATE_ID) + { + System.out.println("ShutdownDecoder.TEMPLATE_ID matched!"); + } + + while (outputPublication.tryClaim(length, bufferClaim) <= 0) { UNSAFE.putOrderedLong(this, FAILED_WRITES_OFFSET, failedWrites + 1); idleStrategy.idle(0); } + final int offset = bufferClaim.offset(); + bufferClaim.buffer().putBytes(offset, buffer, index, length); + bufferClaim.commit(); + UNSAFE.putOrderedLong(this, SUCCESSFUL_WRITES_OFFSET, successfulWrites + 1); UNSAFE.putOrderedLong(this, BYTES_WRITTEN_OFFSET, bytesWritten + length); } @@ -62,17 +78,17 @@ public long outputPublicationId() return this.outputPublication.registrationId(); } - public long successfulWrites() + long successfulWrites() { return successfulWrites; } - public long failedWrites() + long failedWrites() { return failedWrites; } - public long bytesWritten() + long bytesWritten() { return bytesWritten; } diff --git a/src/org/helios/infra/OutputMessageProcessor.java b/src/org/helios/infra/OutputMessageProcessor.java index b3c7760..adb85d9 100644 --- a/src/org/helios/infra/OutputMessageProcessor.java +++ b/src/org/helios/infra/OutputMessageProcessor.java @@ -1,14 +1,149 @@ package org.helios.infra; +import org.agrona.CloseHelper; import org.agrona.concurrent.IdleStrategy; import org.agrona.concurrent.ringbuffer.RingBuffer; import org.helios.AeronStream; +import org.helios.mmb.HeartbeatMessage; +import org.helios.mmb.StartupMessage; -public class OutputMessageProcessor extends RingBufferProcessor +import static org.agrona.UnsafeAccess.UNSAFE; + +public class OutputMessageProcessor implements Processor, OutputReport { + private static final long SUCCESSFUL_READS_OFFSET; + private static final long FAILED_READS_OFFSET; + private static final long HEARTBEAT_SENT_OFFSET; + + private volatile long successfulBufferReads = 0; + private volatile long failedBufferReads = 0; + private volatile long heartbeatSent = 0; + + private final RingBuffer outputRingBuffer; + private final IdleStrategy idleStrategy; + private final OutputMessageHandler handler; + private final int heartbeatInterval; + private final StartupMessage startupMessage; + private final HeartbeatMessage heartbeatMessage; + private volatile boolean running; + private final Thread processorThread; + public OutputMessageProcessor(final RingBuffer outputRingBuffer, final AeronStream outputStream, - final IdleStrategy idleStrategy, final String threadName) + final IdleStrategy idleStrategy, final int heartbeatInterval, final String threadName) + { + this.outputRingBuffer = outputRingBuffer; + this.idleStrategy = idleStrategy; + this.heartbeatInterval = heartbeatInterval; + + handler = new OutputMessageHandler(outputStream, idleStrategy); + startupMessage = new StartupMessage(outputStream.componentType, outputStream.componentId); + heartbeatMessage = new HeartbeatMessage(outputStream.componentType, outputStream.componentId); + + running = false; + processorThread = new Thread(this, threadName); + } + + @Override + public String name() + { + return processorThread.getName(); + } + + @Override + public void start() + { + running = true; + processorThread.start(); + } + + @Override + public void run() + { + // Send StartupMessage MMB message + startupMessage.write(handler); + + long heartbeatTimeMillis = System.currentTimeMillis() + heartbeatInterval; + + while (running) + { + final int readCount = outputRingBuffer.read(handler); + if (0 == readCount) + { + UNSAFE.putOrderedLong(this, FAILED_READS_OFFSET, failedBufferReads + 1); + + idleStrategy.idle(0); + + if (System.currentTimeMillis() > heartbeatTimeMillis) + { + heartbeatTimeMillis = System.currentTimeMillis() + heartbeatInterval; + heartbeatMessage.write(handler); + + UNSAFE.putOrderedLong(this, HEARTBEAT_SENT_OFFSET, heartbeatSent + 1); + } + } + else + { + UNSAFE.putOrderedLong(this, SUCCESSFUL_READS_OFFSET, successfulBufferReads + 1); + } + } + } + + @Override + public void close() throws Exception + { + running = false; + processorThread.join(); + + CloseHelper.close(handler); + } + + @Override + public long successfulWrites() + { + return handler.successfulWrites(); + } + + @Override + public long failedWrites() + { + return handler.failedWrites(); + } + + @Override + public long bytesWritten() + { + return handler.bytesWritten(); + } + + @Override + public long successfulBufferReads() + { + return successfulBufferReads; + } + + @Override + public long failedBufferReads() + { + return failedBufferReads; + } + + @Override + public long heartbeatSent() + { + return heartbeatSent; + } + + static { - super(outputRingBuffer, new OutputMessageHandler(outputStream, idleStrategy), idleStrategy, threadName); + try + { + SUCCESSFUL_READS_OFFSET = UNSAFE.objectFieldOffset(OutputMessageProcessor.class.getDeclaredField("successfulBufferReads")); + FAILED_READS_OFFSET = UNSAFE.objectFieldOffset(OutputMessageProcessor.class.getDeclaredField("failedBufferReads")); + HEARTBEAT_SENT_OFFSET = UNSAFE.objectFieldOffset(OutputMessageProcessor.class.getDeclaredField("heartbeatSent")); + } + catch (final Exception ex) + { + throw new RuntimeException(ex); + } } } diff --git a/src/org/helios/infra/OutputReport.java b/src/org/helios/infra/OutputReport.java new file mode 100644 index 0000000..a56b7ed --- /dev/null +++ b/src/org/helios/infra/OutputReport.java @@ -0,0 +1,18 @@ +package org.helios.infra; + +public interface OutputReport +{ + String name(); + + long successfulWrites(); + + long failedWrites(); + + long bytesWritten(); + + long successfulBufferReads(); + + long failedBufferReads(); + + long heartbeatSent(); +} diff --git a/src/org/helios/infra/Processor.java b/src/org/helios/infra/Processor.java index 3bef2ec..06a68e2 100644 --- a/src/org/helios/infra/Processor.java +++ b/src/org/helios/infra/Processor.java @@ -2,5 +2,7 @@ public interface Processor extends Runnable, AutoCloseable { + String name(); + void start(); } diff --git a/src/org/helios/infra/RateReport.java b/src/org/helios/infra/RateReport.java deleted file mode 100644 index 50e7417..0000000 --- a/src/org/helios/infra/RateReport.java +++ /dev/null @@ -1,8 +0,0 @@ -package org.helios.infra; - -import java.io.PrintStream; - -public interface RateReport -{ - void print(final PrintStream stream); -} diff --git a/src/org/helios/infra/RateReporter.java b/src/org/helios/infra/RateReporter.java deleted file mode 100644 index 0a45699..0000000 --- a/src/org/helios/infra/RateReporter.java +++ /dev/null @@ -1,51 +0,0 @@ -package org.helios.infra; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.LockSupport; - -public final class RateReporter implements Processor -{ - private final List reportList; - private final AtomicBoolean running; - private final Thread reporterThread; - - public RateReporter() - { - reportList = new ArrayList<>(); - - running = new AtomicBoolean(false); - reporterThread = new Thread(this, "rateReporter"); - } - - public void addAll(final List reports) - { - reportList.addAll(reports); - } - - @Override - public void start() - { - running.set(true); - reporterThread.start(); - } - - @Override - public void run() - { - while (running.get()) - { - LockSupport.parkNanos(1_000_000_000L); // TODO: configure - - reportList.forEach(report -> report.print(System.out)); // TODO: write to SHM, publish via Aeron, log - } - } - - @Override - public void close() throws Exception - { - running.set(false); - reporterThread.join(); - } -} diff --git a/src/org/helios/infra/Report.java b/src/org/helios/infra/Report.java new file mode 100644 index 0000000..0ad6112 --- /dev/null +++ b/src/org/helios/infra/Report.java @@ -0,0 +1,12 @@ +package org.helios.infra; + +import java.util.List; + +public interface Report +{ + String name(); + + List inputReports(); + + List outputReports(); +} diff --git a/src/org/helios/infra/ReportProcessor.java b/src/org/helios/infra/ReportProcessor.java new file mode 100644 index 0000000..fc014ee --- /dev/null +++ b/src/org/helios/infra/ReportProcessor.java @@ -0,0 +1,64 @@ +package org.helios.infra; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.locks.LockSupport; + +public final class ReportProcessor implements Processor +{ + private final long reportInterval; + private final Reporter reportingFunc; + private final List reportList; + private volatile boolean running; + private Thread reporterThread; + + public ReportProcessor(final long reportInterval, final Reporter reportingFunc) + { + Objects.requireNonNull(reportingFunc, "reportingFunc"); + + this.reportInterval = reportInterval; + this.reportingFunc = reportingFunc; + reportList = new ArrayList<>(); + } + + public void add(final Report report) + { + Objects.requireNonNull(report, "report"); + + reportList.add(report); + } + + @Override + public String name() + { + return reporterThread.getName(); + } + + @Override + public void start() + { + running = true; + reporterThread = new Thread(this, "reportProcessor"); + reporterThread.start(); + } + + @Override + public void run() + { + do + { + LockSupport.parkNanos(reportInterval); + + reportList.forEach(reportingFunc::onReport); // TODO: write to SHM, publish via Aeron, log + } + while (running); + } + + @Override + public void close() throws Exception + { + running = false; + reporterThread.join(); + } +} diff --git a/src/org/helios/infra/Reporter.java b/src/org/helios/infra/Reporter.java new file mode 100644 index 0000000..90d3c75 --- /dev/null +++ b/src/org/helios/infra/Reporter.java @@ -0,0 +1,7 @@ +package org.helios.infra; + +@FunctionalInterface +public interface Reporter +{ + void onReport(final Report report); +} diff --git a/src/org/helios/infra/RingBufferProcessor.java b/src/org/helios/infra/RingBufferProcessor.java index dece950..a2b4fb7 100644 --- a/src/org/helios/infra/RingBufferProcessor.java +++ b/src/org/helios/infra/RingBufferProcessor.java @@ -6,8 +6,6 @@ import org.agrona.concurrent.MessageHandler; import org.agrona.concurrent.ringbuffer.RingBuffer; -import java.util.concurrent.atomic.AtomicBoolean; - import static org.agrona.UnsafeAccess.UNSAFE; public class RingBufferProcessor implements Processor, MessageHandler @@ -21,7 +19,7 @@ public class RingBufferProcessor imple private final RingBuffer ringBuffer; private final IdleStrategy idleStrategy; private final T handler; - private final AtomicBoolean running; + private volatile boolean running; private final Thread processorThread; public RingBufferProcessor(final RingBuffer ringBuffer, final T handler, final IdleStrategy idleStrategy, final String threadName) @@ -30,21 +28,27 @@ public RingBufferProcessor(final RingBuffer ringBuffer, final T handler, final I this.handler = handler; this.idleStrategy = idleStrategy; - running = new AtomicBoolean(false); + running = false; processorThread = new Thread(this, threadName); } + @Override + public String name() + { + return processorThread.getName(); + } + @Override public void start() { - running.set(true); + running = true; processorThread.start(); } @Override public void run() { - while (running.get()) + while (running) { final int readCount = ringBuffer.read(this); if (0 == readCount) @@ -68,7 +72,7 @@ public void onMessage(int msgTypeId, final MutableDirectBuffer buffer, int index @Override public void close() throws Exception { - running.set(false); + running = false; processorThread.join(); CloseHelper.close(handler); diff --git a/src/org/helios/journal/JournalHandler.java b/src/org/helios/journal/JournalHandler.java index 2698d4c..8ee2676 100644 --- a/src/org/helios/journal/JournalHandler.java +++ b/src/org/helios/journal/JournalHandler.java @@ -5,7 +5,7 @@ import org.agrona.concurrent.IdleStrategy; import org.agrona.concurrent.MessageHandler; import org.agrona.concurrent.ringbuffer.RingBuffer; -import org.helios.snapshot.Snapshot; +import org.helios.mmb.SnapshotMessage; import java.util.Objects; @@ -14,12 +14,15 @@ public class JournalHandler implements MessageHandler, JournalDepletionHandler, private final JournalWriter writer; private final RingBuffer nextRingBuffer; private final IdleStrategy idleStrategy; + private final SnapshotMessage snapshotMessage; public JournalHandler(final JournalWriter writer, final RingBuffer nextRingBuffer, final IdleStrategy idleStrategy) { this.writer = writer.depletionHandler(this); this.nextRingBuffer = Objects.requireNonNull(nextRingBuffer); this.idleStrategy = Objects.requireNonNull(idleStrategy); + + snapshotMessage = new SnapshotMessage(); } @Override @@ -41,7 +44,7 @@ public void onMessage(int msgTypeId, MutableDirectBuffer buffer, int index, int @Override public void onJournalDepletion(Journalling journalling) { - Snapshot.writeSaveMessage(nextRingBuffer, idleStrategy); + snapshotMessage.writeSaveMessage(nextRingBuffer, idleStrategy); } @Override diff --git a/src/org/helios/journal/JournalProcessor.java b/src/org/helios/journal/JournalProcessor.java index facf8ec..6bcf87a 100644 --- a/src/org/helios/journal/JournalProcessor.java +++ b/src/org/helios/journal/JournalProcessor.java @@ -25,6 +25,12 @@ public JournalProcessor(final RingBuffer inputRingBuffer, final IdleStrategy idl journallerThread = new Thread(this, "journalProcessor"); } + @Override + public String name() + { + return journallerThread.getName(); + } + @Override public void start() { diff --git a/src/org/helios/mmb/DataMessage.java b/src/org/helios/mmb/DataMessage.java new file mode 100644 index 0000000..45cd7a3 --- /dev/null +++ b/src/org/helios/mmb/DataMessage.java @@ -0,0 +1,88 @@ +package org.helios.mmb; + +import org.agrona.MutableDirectBuffer; +import org.agrona.concurrent.UnsafeBuffer; +import org.helios.mmb.sbe.*; +import org.helios.util.Check; +import org.helios.util.DirectBufferAllocator; + +public final class DataMessage +{ + private static final int MESSAGE_LENGTH = 152; + + private final UnsafeBuffer dataBuffer; + private int dataBufferOffset; + private final MessageHeaderEncoder messageHeaderEncoder = new MessageHeaderEncoder(); + private final MessageHeaderDecoder messageHeaderDecoder = new MessageHeaderDecoder(); + private final DataEncoder dataEncoder = new DataEncoder(); + private final DataDecoder dataDecoder = new DataDecoder(); + + public DataMessage() + { + dataBuffer = new UnsafeBuffer(0, 0); + dataBufferOffset = 0; + } + + public DataEncoder allocate(final ComponentType componentType, final short componentId, final int dataLength) + { + Check.enforce(dataLength >= 0, "Negative dataBufferLength"); + + dataBuffer.wrap(DirectBufferAllocator.allocateCacheAligned(MESSAGE_LENGTH + dataLength)); + + int bufferOffset = 0; + + messageHeaderEncoder.wrap(dataBuffer, bufferOffset) + .blockLength(dataEncoder.sbeBlockLength()) + .templateId(dataEncoder.sbeTemplateId()) + .schemaId(dataEncoder.sbeSchemaId()) + .version(dataEncoder.sbeSchemaVersion()); + + bufferOffset += messageHeaderEncoder.encodedLength(); + + dataEncoder.wrap(dataBuffer, bufferOffset) + .mmbHeader() + .nodeId((short)0) + .component() + .componentId(componentId) + .componentType(componentType); + + dataBufferOffset = bufferOffset; + + return dataEncoder; + } + + public DataDecoder wrap(final MutableDirectBuffer buffer, final int offset, final int length) + { + dataBuffer.wrap(buffer, offset, length); + + int bufferOffset = 0; + + messageHeaderDecoder.wrap(dataBuffer, bufferOffset); + + final int actingBlockLength = messageHeaderDecoder.blockLength(); + final int actingVersion = messageHeaderDecoder.version(); + + bufferOffset += messageHeaderDecoder.encodedLength(); + + dataDecoder.wrap(dataBuffer, bufferOffset, actingBlockLength, actingVersion); + + dataBufferOffset = bufferOffset; + + return dataDecoder; + } + + public UnsafeBuffer dataBuffer() + { + return dataBuffer; + } + + public int dataBufferOffset() + { + return dataBufferOffset; + } + + public int dataBufferLength() + { + return dataBuffer.capacity() - MESSAGE_LENGTH; + } +} diff --git a/src/org/helios/mmb/HeartbeatMessage.java b/src/org/helios/mmb/HeartbeatMessage.java new file mode 100644 index 0000000..a4d06f2 --- /dev/null +++ b/src/org/helios/mmb/HeartbeatMessage.java @@ -0,0 +1,55 @@ +package org.helios.mmb; + +import org.agrona.concurrent.UnsafeBuffer; +import org.helios.infra.MessageTypes; +import org.helios.infra.OutputMessageHandler; +import org.helios.mmb.sbe.ComponentType; +import org.helios.mmb.sbe.HeartbeatEncoder; +import org.helios.mmb.sbe.MessageHeaderEncoder; +import org.helios.util.DirectBufferAllocator; + +public final class HeartbeatMessage +{ + private static final int MESSAGE_LENGTH = 152; + + private final UnsafeBuffer heartbeatBuffer; + private final int bufferOffset; + private final HeartbeatEncoder heartbeatEncoder; + + public HeartbeatMessage(final ComponentType componentType, final short componentId) + { + heartbeatBuffer = new UnsafeBuffer(DirectBufferAllocator.allocateCacheAligned(MESSAGE_LENGTH)); + heartbeatEncoder = new HeartbeatEncoder(); + + final MessageHeaderEncoder messageHeaderEncoder = new MessageHeaderEncoder(); + + // Encode the HeartbeatMessage message once and forever + int bufferOffset = 0; + + messageHeaderEncoder.wrap(heartbeatBuffer, bufferOffset) + .blockLength(heartbeatEncoder.sbeBlockLength()) + .templateId(heartbeatEncoder.sbeTemplateId()) + .schemaId(heartbeatEncoder.sbeSchemaId()) + .version(heartbeatEncoder.sbeSchemaVersion()); + + bufferOffset += messageHeaderEncoder.encodedLength(); + + heartbeatEncoder.wrap(heartbeatBuffer, bufferOffset) + .mmbHeader() + .nodeId((short)0) + .component() + .componentId(componentId) + .componentType(componentType); + + this.bufferOffset = bufferOffset; + } + + public void write(final OutputMessageHandler outputMessageHandler) + { + heartbeatEncoder.wrap(heartbeatBuffer, bufferOffset) + .mmbHeader() + .timestamp(System.nanoTime()); + + outputMessageHandler.onMessage(MessageTypes.ADMINISTRATIVE_MSG_ID, heartbeatBuffer, 0, MESSAGE_LENGTH); + } +} diff --git a/src/org/helios/mmb/HeartbeatMessageFactory.java b/src/org/helios/mmb/HeartbeatMessageFactory.java deleted file mode 100644 index fb98600..0000000 --- a/src/org/helios/mmb/HeartbeatMessageFactory.java +++ /dev/null @@ -1,38 +0,0 @@ -package org.helios.mmb; - -import org.agrona.concurrent.UnsafeBuffer; -import org.helios.mmb.sbe.HeartbeatEncoder; -import org.helios.mmb.sbe.MessageHeaderEncoder; -import org.helios.util.DirectBufferAllocator; - -public final class HeartbeatMessageFactory -{ - public static final int MESSAGE_LENGTH = 128; - - public static final UnsafeBuffer heartbeatBuffer; - - static - { - heartbeatBuffer = new UnsafeBuffer(DirectBufferAllocator.allocateCacheAligned(MESSAGE_LENGTH)); - - final MessageHeaderEncoder messageHeaderEncoder = new MessageHeaderEncoder(); - - // Encode the Heartbeat message once and forever - final HeartbeatEncoder heartbeatEncoder = new HeartbeatEncoder(); - int bufferOffset = 0; - - messageHeaderEncoder.wrap(heartbeatBuffer, bufferOffset) - .blockLength(heartbeatEncoder.sbeBlockLength()) - .templateId(heartbeatEncoder.sbeTemplateId()) - .schemaId(heartbeatEncoder.sbeSchemaId()) - .version(heartbeatEncoder.sbeSchemaVersion()); - - bufferOffset += messageHeaderEncoder.encodedLength(); - - heartbeatEncoder.wrap(heartbeatBuffer, bufferOffset) - .mmbHeader() - //.messageNumber(2L) - .nodeId((short)0) - .timestamp(System.nanoTime()); - } -} diff --git a/src/org/helios/mmb/SnapshotMessage.java b/src/org/helios/mmb/SnapshotMessage.java new file mode 100644 index 0000000..344baee --- /dev/null +++ b/src/org/helios/mmb/SnapshotMessage.java @@ -0,0 +1,98 @@ +package org.helios.mmb; + +import org.agrona.concurrent.IdleStrategy; +import org.agrona.concurrent.UnsafeBuffer; +import org.agrona.concurrent.ringbuffer.RingBuffer; +import org.helios.infra.MessageTypes; +import org.helios.mmb.sbe.LoadSnapshotEncoder; +import org.helios.mmb.sbe.MessageHeaderEncoder; +import org.helios.mmb.sbe.SaveSnapshotEncoder; +import org.helios.util.DirectBufferAllocator; + +import java.util.Objects; + +public final class SnapshotMessage +{ + private static final int MESSAGE_LENGTH = 152; + + private final UnsafeBuffer loadSnapshotBuffer; + private final UnsafeBuffer saveSnapshotBuffer; + private final LoadSnapshotEncoder loadSnapshotEncoder = new LoadSnapshotEncoder(); + private final SaveSnapshotEncoder saveSnapshotEncoder = new SaveSnapshotEncoder(); + private final int loadBufferOffset; + private final int saveBufferOffset; + + public SnapshotMessage() + { + loadSnapshotBuffer = new UnsafeBuffer(DirectBufferAllocator.allocateCacheAligned(MESSAGE_LENGTH)); + saveSnapshotBuffer = new UnsafeBuffer(DirectBufferAllocator.allocateCacheAligned(MESSAGE_LENGTH)); + + final MessageHeaderEncoder messageHeaderEncoder = new MessageHeaderEncoder(); + + // Encode the Load Data SnapshotMessage message once and forever + int bufferOffset = 0; + + messageHeaderEncoder.wrap(loadSnapshotBuffer, bufferOffset) + .blockLength(loadSnapshotEncoder.sbeBlockLength()) + .templateId(loadSnapshotEncoder.sbeTemplateId()) + .schemaId(loadSnapshotEncoder.sbeSchemaId()) + .version(loadSnapshotEncoder.sbeSchemaVersion()); + + bufferOffset += messageHeaderEncoder.encodedLength(); + + loadSnapshotEncoder.wrap(loadSnapshotBuffer, bufferOffset) + .mmbHeader() + .messageNumber(0L) + .nodeId((short)0) + .timestamp(System.nanoTime()); + + loadBufferOffset = bufferOffset; + + // Encode the Save Data SnapshotMessage message once and forever + bufferOffset = 0; + + messageHeaderEncoder.wrap(saveSnapshotBuffer, bufferOffset) + .blockLength(saveSnapshotEncoder.sbeBlockLength()) + .templateId(saveSnapshotEncoder.sbeTemplateId()) + .schemaId(saveSnapshotEncoder.sbeSchemaId()) + .version(saveSnapshotEncoder.sbeSchemaVersion()); + + bufferOffset += messageHeaderEncoder.encodedLength(); + + saveSnapshotEncoder.wrap(saveSnapshotBuffer, bufferOffset) + .mmbHeader() + .messageNumber(0L) + .nodeId((short)0) + .timestamp(System.nanoTime()); + + saveBufferOffset = bufferOffset; + } + + public void writeLoadMessage(final RingBuffer inputRingBuffer, final IdleStrategy idleStrategy) + { + Objects.requireNonNull(idleStrategy, "idleStrategy"); + + loadSnapshotEncoder.wrap(loadSnapshotBuffer, loadBufferOffset) + .mmbHeader() + .timestamp(System.nanoTime()); + + while (!inputRingBuffer.write(MessageTypes.ADMINISTRATIVE_MSG_ID, loadSnapshotBuffer, 0, MESSAGE_LENGTH)) + { + idleStrategy.idle(0); + } + } + + public void writeSaveMessage(final RingBuffer inputRingBuffer, final IdleStrategy idleStrategy) + { + Objects.requireNonNull(idleStrategy, "idleStrategy"); + + saveSnapshotEncoder.wrap(saveSnapshotBuffer, saveBufferOffset) + .mmbHeader() + .timestamp(System.nanoTime()); + + while (!inputRingBuffer.write(MessageTypes.ADMINISTRATIVE_MSG_ID, saveSnapshotBuffer, 0, MESSAGE_LENGTH)) + { + idleStrategy.idle(0); + } + } +} diff --git a/src/org/helios/mmb/SnapshotMessageFactory.java b/src/org/helios/mmb/SnapshotMessageFactory.java deleted file mode 100644 index 95d5a14..0000000 --- a/src/org/helios/mmb/SnapshotMessageFactory.java +++ /dev/null @@ -1,59 +0,0 @@ -package org.helios.mmb; - -import org.agrona.concurrent.UnsafeBuffer; -import org.helios.mmb.sbe.LoadSnapshotEncoder; -import org.helios.mmb.sbe.MessageHeaderEncoder; -import org.helios.mmb.sbe.SaveSnapshotEncoder; -import org.helios.util.DirectBufferAllocator; - -public final class SnapshotMessageFactory -{ - public static final int MESSAGE_LENGTH = 128; - - public static final UnsafeBuffer saveSnapshotBuffer; - public static final UnsafeBuffer loadSnapshotBuffer; - - static - { - saveSnapshotBuffer = new UnsafeBuffer(DirectBufferAllocator.allocateCacheAligned(MESSAGE_LENGTH)); - loadSnapshotBuffer = new UnsafeBuffer(DirectBufferAllocator.allocateCacheAligned(MESSAGE_LENGTH)); - - final MessageHeaderEncoder messageHeaderEncoder = new MessageHeaderEncoder(); - - // Encode the Load Data Snapshot message once and forever - final LoadSnapshotEncoder loadSnapshotEncoder = new LoadSnapshotEncoder(); - int bufferOffset = 0; - - messageHeaderEncoder.wrap(loadSnapshotBuffer, bufferOffset) - .blockLength(loadSnapshotEncoder.sbeBlockLength()) - .templateId(loadSnapshotEncoder.sbeTemplateId()) - .schemaId(loadSnapshotEncoder.sbeSchemaId()) - .version(loadSnapshotEncoder.sbeSchemaVersion()); - - bufferOffset += messageHeaderEncoder.encodedLength(); - - loadSnapshotEncoder.wrap(loadSnapshotBuffer, bufferOffset) - .mmbHeader() - .messageNumber(0L) - .nodeId((short)0) - .timestamp(System.nanoTime()); - - // Encode the Save Data Snapshot message once and forever - final SaveSnapshotEncoder saveSnapshotEncoder = new SaveSnapshotEncoder(); - bufferOffset = 0; - - messageHeaderEncoder.wrap(saveSnapshotBuffer, bufferOffset) - .blockLength(saveSnapshotEncoder.sbeBlockLength()) - .templateId(saveSnapshotEncoder.sbeTemplateId()) - .schemaId(saveSnapshotEncoder.sbeSchemaId()) - .version(saveSnapshotEncoder.sbeSchemaVersion()); - - bufferOffset += messageHeaderEncoder.encodedLength(); - - saveSnapshotEncoder.wrap(saveSnapshotBuffer, bufferOffset) - .mmbHeader() - .messageNumber(0L) - .nodeId((short)0) - .timestamp(System.nanoTime()); - } -} diff --git a/src/org/helios/mmb/StartupMessage.java b/src/org/helios/mmb/StartupMessage.java new file mode 100644 index 0000000..77fe9bf --- /dev/null +++ b/src/org/helios/mmb/StartupMessage.java @@ -0,0 +1,48 @@ +package org.helios.mmb; + +import org.agrona.concurrent.UnsafeBuffer; +import org.helios.infra.MessageTypes; +import org.helios.infra.OutputMessageHandler; +import org.helios.mmb.sbe.ComponentType; +import org.helios.mmb.sbe.MessageHeaderEncoder; +import org.helios.mmb.sbe.StartupEncoder; +import org.helios.util.DirectBufferAllocator; + +public final class StartupMessage +{ + private static final int MESSAGE_LENGTH = 152; + + private final UnsafeBuffer startupBuffer; + + public StartupMessage(final ComponentType componentType, final short componentId) + { + startupBuffer = new UnsafeBuffer(DirectBufferAllocator.allocateCacheAligned(MESSAGE_LENGTH)); + + final MessageHeaderEncoder messageHeaderEncoder = new MessageHeaderEncoder(); + final StartupEncoder startupEncoder = new StartupEncoder(); + + // Encode the StartupMessage message once and forever + int bufferOffset = 0; + + messageHeaderEncoder.wrap(startupBuffer, bufferOffset) + .blockLength(startupEncoder.sbeBlockLength()) + .templateId(startupEncoder.sbeTemplateId()) + .schemaId(startupEncoder.sbeSchemaId()) + .version(startupEncoder.sbeSchemaVersion()); + + bufferOffset += messageHeaderEncoder.encodedLength(); + + startupEncoder.wrap(startupBuffer, bufferOffset) + .mmbHeader() + .nodeId((short)0) + .timestamp(System.nanoTime()) + .component() + .componentId(componentId) + .componentType(componentType); + } + + public void write(final OutputMessageHandler outputMessageHandler) + { + outputMessageHandler.onMessage(MessageTypes.ADMINISTRATIVE_MSG_ID, startupBuffer, 0, MESSAGE_LENGTH); + } +} diff --git a/src/org/helios/replica/ReplicaProcessor.java b/src/org/helios/replica/ReplicaProcessor.java index 3557f09..29c8841 100644 --- a/src/org/helios/replica/ReplicaProcessor.java +++ b/src/org/helios/replica/ReplicaProcessor.java @@ -24,6 +24,12 @@ public ReplicaProcessor(final RingBuffer inputRingBuffer, final IdleStrategy idl replicatorThread = new Thread(this, "replicator"); } + @Override + public String name() + { + return replicatorThread.getName(); + } + @Override public void start() { diff --git a/src/org/helios/service/Service.java b/src/org/helios/service/Service.java index b3916d8..6e05f4a 100644 --- a/src/org/helios/service/Service.java +++ b/src/org/helios/service/Service.java @@ -2,14 +2,12 @@ import org.helios.AeronStream; import org.helios.infra.AvailableAssociationHandler; -import org.helios.infra.RateReport; +import org.helios.infra.Report; import org.helios.infra.UnavailableAssociationHandler; -import java.util.List; - public interface Service extends AutoCloseable { - Service addEndPoint(final AeronStream reqStream, final AeronStream rspStream); + Service addEndPoint(final AeronStream rspStream); Service addEventChannel(final AeronStream eventStream); @@ -17,7 +15,7 @@ public interface Service extends AutoCloseable Service unavailableAssociationHandler(final UnavailableAssociationHandler handler); - List reportList(); + Report report(); T handler(); diff --git a/src/org/helios/service/ServiceReport.java b/src/org/helios/service/ServiceReport.java index b4e2a6f..58edbf8 100644 --- a/src/org/helios/service/ServiceReport.java +++ b/src/org/helios/service/ServiceReport.java @@ -1,55 +1,49 @@ package org.helios.service; -import org.helios.infra.InputMessageProcessor; -import org.helios.infra.OutputMessageProcessor; -import org.helios.infra.RateReport; +import org.helios.infra.*; import java.io.PrintStream; +import java.util.ArrayList; +import java.util.List; import java.util.Objects; -public final class ServiceReport implements RateReport +public final class ServiceReport implements Report { - private final InputMessageProcessor requestProcessor; - private final OutputMessageProcessor responseProcessor; + private final List inputReportList; + private final List outputReportList; - private long lastTimeStamp; - private long lastSuccessfulReads; - private long lastSuccessfulWrites; - private long lastBytesWritten; - - public ServiceReport(final InputMessageProcessor requestProcessor, final OutputMessageProcessor responseProcessor) + public ServiceReport(final InputMessageProcessor requestProcessor) { Objects.requireNonNull(requestProcessor, "requestProcessor"); + + inputReportList = new ArrayList<>(); + outputReportList = new ArrayList<>(); + + inputReportList.add(requestProcessor); + } + + public void addResponseProcessor(final OutputMessageProcessor responseProcessor) + { Objects.requireNonNull(responseProcessor, "responseProcessor"); - this.requestProcessor = requestProcessor; - this.responseProcessor = responseProcessor; + outputReportList.add(responseProcessor); + } - lastTimeStamp = System.currentTimeMillis(); + @Override + public String name() + { + return "ServiceReport"; + } + + @Override + public List inputReports() + { + return inputReportList; } @Override - public void print(final PrintStream stream) + public List outputReports() { - final long timeStamp = System.currentTimeMillis(); - long successfulReads = requestProcessor.successfulReads(); - long successfulWrites = responseProcessor.handler().successfulWrites(); - long bytesWritten = responseProcessor.handler().bytesWritten(); - long failedReads = requestProcessor.failedReads(); - - final long duration = timeStamp - lastTimeStamp; - final long successfulReadsDelta = successfulReads - lastSuccessfulReads; - final long successfulWritesDelta = successfulWrites - lastSuccessfulWrites; - final long bytesTransferred = bytesWritten - lastBytesWritten; - - final double failureRatio = failedReads / (double)(successfulReads + failedReads); - - stream.format("ServiceReport: T %dms IN %,d messages - OUT %,d messages - %,d bytes [read failure ratio: %f]\n", - duration, successfulReadsDelta, successfulWritesDelta, bytesTransferred, failureRatio); - - lastTimeStamp = timeStamp; - lastSuccessfulReads = successfulReads; - lastSuccessfulWrites = successfulWrites; - lastBytesWritten = bytesWritten; + return outputReportList; } } diff --git a/src/org/helios/snapshot/Snapshot.java b/src/org/helios/snapshot/Snapshot.java deleted file mode 100644 index 81da6bb..0000000 --- a/src/org/helios/snapshot/Snapshot.java +++ /dev/null @@ -1,32 +0,0 @@ -package org.helios.snapshot; - -import org.agrona.concurrent.IdleStrategy; -import org.agrona.concurrent.ringbuffer.RingBuffer; -import org.helios.infra.MessageTypes; - -import java.util.Objects; - -import static org.helios.mmb.SnapshotMessageFactory.*; - -public final class Snapshot -{ - public static void writeLoadMessage(final RingBuffer inputRingBuffer, final IdleStrategy idleStrategy) - { - Objects.requireNonNull(idleStrategy, "idleStrategy"); - - while (!inputRingBuffer.write(MessageTypes.ADMINISTRATIVE_MSG_ID, loadSnapshotBuffer, 0, MESSAGE_LENGTH)) - { - idleStrategy.idle(0); - } - } - - public static void writeSaveMessage(final RingBuffer inputRingBuffer, final IdleStrategy idleStrategy) - { - Objects.requireNonNull(idleStrategy, "idleStrategy"); - - while (!inputRingBuffer.write(MessageTypes.ADMINISTRATIVE_MSG_ID, saveSnapshotBuffer, 0, MESSAGE_LENGTH)) - { - idleStrategy.idle(0); - } - } -} diff --git a/src/org/helios/snapshot/SnapshotTimer.java b/src/org/helios/snapshot/SnapshotTimer.java index afa4ac8..ad640ca 100644 --- a/src/org/helios/snapshot/SnapshotTimer.java +++ b/src/org/helios/snapshot/SnapshotTimer.java @@ -4,6 +4,7 @@ import org.agrona.concurrent.BusySpinIdleStrategy; import org.agrona.concurrent.IdleStrategy; import org.agrona.concurrent.ringbuffer.RingBuffer; +import org.helios.mmb.SnapshotMessage; import org.helios.util.Check; import java.util.Objects; @@ -14,6 +15,7 @@ public final class SnapshotTimer implements Runnable, AutoCloseable public static final long MAX_SNAPSHOT_PERIOD = TimeUnit.HOURS.toMillis(24); public static final long DEFAULT_SNAPSHOT_PERIOD = TimeUnit.HOURS.toMillis(24); + private final SnapshotMessage snapshotMessage; private final TimerWheel timerWheel; private final RingBuffer inputRingBuffer; private final long snapshotPeriod; @@ -36,6 +38,8 @@ public SnapshotTimer(final TimerWheel timerWheel, final RingBuffer inputRingBuff this.timerWheel = timerWheel; this.inputRingBuffer = inputRingBuffer; this.snapshotPeriod = snapshotPeriod; + + snapshotMessage = new SnapshotMessage(); } public void start() @@ -47,8 +51,8 @@ public void start() @Override public void run() { - // Write the Save Data Snapshot message into the input pipeline. - Snapshot.writeSaveMessage(inputRingBuffer, idleStrategy); + // Write the Save Data SnapshotMessage message into the input pipeline. + snapshotMessage.writeSaveMessage(inputRingBuffer, idleStrategy); // Schedule the next data snapshot after periodic time interval. timerWheel.rescheduleTimeout(snapshotPeriod, TimeUnit.MILLISECONDS, timer); diff --git a/src/org/helios/status/StatusCounterDescriptor.java b/src/org/helios/status/StatusCounterDescriptor.java new file mode 100644 index 0000000..3ccf9d9 --- /dev/null +++ b/src/org/helios/status/StatusCounterDescriptor.java @@ -0,0 +1,58 @@ +package org.helios.status; + +import org.agrona.collections.Int2ObjectHashMap; +import org.agrona.concurrent.status.AtomicCounter; +import org.agrona.concurrent.status.CountersManager; + +public enum StatusCounterDescriptor +{ + BYTES_SENT(0, "Bytes sent"), + BYTES_RECEIVED(1, "Bytes received"), + HEARTBEATS_SENT(2, "Heartbeats sent"), + HEARTBEATS_RECEIVED(3, "Heartbeats received"), + ERRORS(4, "Errors"); + + public static final int SYSTEM_COUNTER_TYPE_ID = 0; + + private static final Int2ObjectHashMap DESCRIPTOR_BY_ID_MAP = new Int2ObjectHashMap<>(); + + static + { + for (final StatusCounterDescriptor descriptor : StatusCounterDescriptor.values()) + { + if (null != DESCRIPTOR_BY_ID_MAP.put(descriptor.id, descriptor)) + { + throw new IllegalStateException("Descriptor id already in use: " + descriptor.id); + } + } + } + + public static StatusCounterDescriptor get(final int id) + { + return DESCRIPTOR_BY_ID_MAP.get(id); + } + + private final int id; + private final String label; + + StatusCounterDescriptor(final int id, final String label) + { + this.id = id; + this.label = label; + } + + public int id() + { + return id; + } + + public String label() + { + return label; + } + + public AtomicCounter newCounter(final CountersManager countersManager) + { + return countersManager.newCounter(label, SYSTEM_COUNTER_TYPE_ID, (buffer) -> buffer.putInt(0, id)); + } +} diff --git a/src/org/helios/status/StatusCounters.java b/src/org/helios/status/StatusCounters.java new file mode 100644 index 0000000..814d062 --- /dev/null +++ b/src/org/helios/status/StatusCounters.java @@ -0,0 +1,30 @@ +package org.helios.status; + +import org.agrona.concurrent.status.AtomicCounter; +import org.agrona.concurrent.status.CountersManager; + +import java.util.EnumMap; + +public class StatusCounters implements AutoCloseable +{ + private final EnumMap counterByDescriptorMap = + new EnumMap<>(StatusCounterDescriptor.class); + + public StatusCounters(final CountersManager countersManager) + { + for (final StatusCounterDescriptor descriptor : StatusCounterDescriptor.values()) + { + counterByDescriptorMap.put(descriptor, descriptor.newCounter(countersManager)); + } + } + + public AtomicCounter get(final StatusCounterDescriptor descriptor) + { + return counterByDescriptorMap.get(descriptor); + } + + public void close() + { + counterByDescriptorMap.values().forEach(AtomicCounter::close); + } +} diff --git a/test-perf/echo/EchoConfiguration.java b/test-perf/echo/EchoConfiguration.java index eba38fd..302271e 100644 --- a/test-perf/echo/EchoConfiguration.java +++ b/test-perf/echo/EchoConfiguration.java @@ -58,16 +58,16 @@ public class EchoConfiguration SERVICE_INPUT_RING_SIZE = getInteger(SERVICE_INPUT_RING_SIZE_PROP, 512 * 1024); SERVICE_OUTPUT_RING_SIZE = getInteger(SERVICE_OUTPUT_RING_SIZE_PROP, 512 * 1024); - SERVICE_INPUT_CHANNEL = getProperty(SERVICE_INPUT_CHANNEL_PROP, "udp://localhost:40123"); + SERVICE_INPUT_CHANNEL = getProperty(SERVICE_INPUT_CHANNEL_PROP, "aeron:udp?endpoint=localhost:40123"); SERVICE_INPUT_STREAM_ID = getInteger(SERVICE_INPUT_STREAM_ID_PROP, 10); - SERVICE_OUTPUT_CHANNEL = getProperty(SERVICE_OUTPUT_CHANNEL_PROP, "udp://localhost:40124"); + SERVICE_OUTPUT_CHANNEL = getProperty(SERVICE_OUTPUT_CHANNEL_PROP, "aeron:udp?endpoint=localhost:40124"); SERVICE_OUTPUT_STREAM_ID = getInteger(SERVICE_OUTPUT_STREAM_ID_PROP, 11); MESSAGE_LENGTH = getInteger(MESSAGE_LENGTH_PROP, 256); - NUMBER_OF_MESSAGES = getInteger(NUMBER_OF_MESSAGES_PROP, 1_000_000/*1_000_000*/); - NUMBER_OF_ITERATIONS = getInteger(NUMBER_OF_ITERATIONS_PROP, 10/*5*/); - WARMUP_NUMBER_OF_MESSAGES = getInteger(WARMUP_NUMBER_OF_MESSAGES_PROP, 10_000/*10_000*/); - WARMUP_NUMBER_OF_ITERATIONS = getInteger(WARMUP_NUMBER_OF_ITERATIONS_PROP, 1/*5*/); + NUMBER_OF_MESSAGES = getInteger(NUMBER_OF_MESSAGES_PROP, 1_000_000/*1_000_000*/);//10 + NUMBER_OF_ITERATIONS = getInteger(NUMBER_OF_ITERATIONS_PROP, 10/*10*/);//1 + WARMUP_NUMBER_OF_MESSAGES = getInteger(WARMUP_NUMBER_OF_MESSAGES_PROP, 10_000/*10_000*/);//1 + WARMUP_NUMBER_OF_ITERATIONS = getInteger(WARMUP_NUMBER_OF_ITERATIONS_PROP, 5/*5*/); LINGER_TIMEOUT_MS = getLong(LINGER_TIMEOUT_MS_PROP, TimeUnit.SECONDS.toMillis(5)); } } diff --git a/test-perf/echo/EchoEmbedded.java b/test-perf/echo/EchoEmbedded.java index d80412f..840da61 100644 --- a/test-perf/echo/EchoEmbedded.java +++ b/test-perf/echo/EchoEmbedded.java @@ -6,13 +6,21 @@ import org.helios.HeliosContext; import org.helios.HeliosDriver; import org.helios.gateway.Gateway; +import org.helios.infra.ConsoleReporter; +import org.helios.service.Service; import java.util.concurrent.CountDownLatch; public class EchoEmbedded { + private static final String SERVICE_INPUT_CHANNEL = EchoConfiguration.SERVICE_INPUT_CHANNEL; + private static final String SERVICE_OUTPUT_CHANNEL = EchoConfiguration.SERVICE_OUTPUT_CHANNEL; + private static final String GATEWAY_INPUT_CHANNEL = EchoConfiguration.SERVICE_OUTPUT_CHANNEL; + private static final String GATEWAY_OUTPUT_CHANNEL = EchoConfiguration.SERVICE_INPUT_CHANNEL; private static final int SERVICE_INPUT_STREAM_ID = EchoConfiguration.SERVICE_INPUT_STREAM_ID; private static final int SERVICE_OUTPUT_STREAM_ID = EchoConfiguration.SERVICE_OUTPUT_STREAM_ID; + private static final int GATEWAY_INPUT_STREAM_ID = EchoConfiguration.SERVICE_OUTPUT_STREAM_ID; + private static final int GATEWAY_OUTPUT_STREAM_ID = EchoConfiguration.SERVICE_INPUT_STREAM_ID; private static final CountDownLatch GW_ASSOCIATION_LATCH = new CountDownLatch(1); private static final CountDownLatch SVC_ASSOCIATION_LATCH = new CountDownLatch(1); @@ -36,17 +44,21 @@ public static void main(String[] args) throws Exception System.out.print("done\nCreating Helios service..."); - final AeronStream svcEmbeddedInputStream = helios.newEmbeddedStream(SERVICE_INPUT_STREAM_ID); - final AeronStream svcEmbeddedOutputStream = helios.newEmbeddedStream(SERVICE_OUTPUT_STREAM_ID); - helios.addService(EchoServiceHandler::new, + final AeronStream svcInputStream = helios.newStream(SERVICE_INPUT_CHANNEL, SERVICE_INPUT_STREAM_ID); + final AeronStream svcOutputStream = helios.newStream(SERVICE_OUTPUT_CHANNEL, SERVICE_OUTPUT_STREAM_ID); + final Service svc = helios.addService(EchoServiceHandler::new, EchoEmbedded::associationWithGatewayEstablished, EchoEmbedded::associationWithGatewayBroken, - svcEmbeddedInputStream, svcEmbeddedOutputStream); + svcInputStream, svcOutputStream); System.out.print("done\nCreating Helios gateway..."); - final Gateway gw = helios.addGateway(EchoGatewayHandler::new, - EchoEmbedded::associationWithServiceEstablished, EchoEmbedded::associationWithServiceBroken, - svcEmbeddedInputStream, svcEmbeddedOutputStream); + final AeronStream gwInputStream = helios.newStream(GATEWAY_INPUT_CHANNEL, GATEWAY_INPUT_STREAM_ID); + final AeronStream gwOutputStream = helios.newStream(GATEWAY_OUTPUT_CHANNEL, GATEWAY_OUTPUT_STREAM_ID); + final Gateway gw = helios.addGateway( + EchoEmbedded::associationWithServiceEstablished, EchoEmbedded::associationWithServiceBroken); + final EchoGatewayHandler gwHandler = gw.addEndPoint(gwOutputStream, gwInputStream, EchoGatewayHandler::new); + + final ConsoleReporter reporter = new ConsoleReporter(); System.out.println("done\nEchoEmbedded is now running."); @@ -62,7 +74,10 @@ public static void main(String[] args) throws Exception System.out.println("done"); - EchoGateway.runTest(gw); + EchoGateway.runTest(gwHandler, () -> reporter.onReport(gw.report())); + + reporter.onReport(gw.report()); + reporter.onReport(svc.report()); } System.out.println("EchoEmbedded is now terminated."); diff --git a/test-perf/echo/EchoEmbeddedIpc.java b/test-perf/echo/EchoEmbeddedIpc.java new file mode 100644 index 0000000..523c8c6 --- /dev/null +++ b/test-perf/echo/EchoEmbeddedIpc.java @@ -0,0 +1,103 @@ +package echo; + +import org.agrona.concurrent.BusySpinIdleStrategy; +import org.helios.AeronStream; +import org.helios.Helios; +import org.helios.HeliosContext; +import org.helios.HeliosDriver; +import org.helios.gateway.Gateway; +import org.helios.infra.ConsoleReporter; +import org.helios.service.Service; + +import java.util.concurrent.CountDownLatch; + +public class EchoEmbeddedIpc +{ + private static final int SERVICE_INPUT_STREAM_ID = EchoConfiguration.SERVICE_INPUT_STREAM_ID; + private static final int SERVICE_OUTPUT_STREAM_ID = EchoConfiguration.SERVICE_OUTPUT_STREAM_ID; + + private static final CountDownLatch GW_ASSOCIATION_LATCH = new CountDownLatch(1); + private static final CountDownLatch SVC_ASSOCIATION_LATCH = new CountDownLatch(1); + + public static void main(String[] args) throws Exception + { + System.out.print("Starting Helios..."); + + final HeliosContext context = new HeliosContext() + //.setJournalEnabled(true) + .setReadIdleStrategy(new BusySpinIdleStrategy()) + .setWriteIdleStrategy(new BusySpinIdleStrategy()) + .setSubscriberIdleStrategy(new BusySpinIdleStrategy()) + .setPublisherIdleStrategy(new BusySpinIdleStrategy()); + + final HeliosDriver driver = new HeliosDriver(context); + + try(final Helios helios = new Helios(context, driver)) + { + helios.errorHandler(EchoEmbeddedIpc::serviceError); + + System.out.print("done\nCreating Helios service..."); + + final AeronStream ipcInputStream = helios.newIpcStream(SERVICE_INPUT_STREAM_ID); + final AeronStream ipcOutputStream = helios.newIpcStream(SERVICE_OUTPUT_STREAM_ID); + final Service svc = helios.addService(EchoServiceHandler::new, + EchoEmbeddedIpc::associationWithGatewayEstablished, EchoEmbeddedIpc::associationWithGatewayBroken, + ipcInputStream, ipcOutputStream); + + System.out.print("done\nCreating Helios gateway..."); + + final Gateway gw = helios.addGateway( + EchoEmbeddedIpc::associationWithServiceEstablished, EchoEmbeddedIpc::associationWithServiceBroken); + final EchoGatewayHandler gwHandler = gw.addEndPoint(ipcInputStream, ipcOutputStream, EchoGatewayHandler::new); + + final ConsoleReporter reporter = new ConsoleReporter(); + + System.out.println("done\nEchoEmbedded is now running."); + + helios.start(); + + System.out.print("Waiting for Gateway to see association with Service..."); + + GW_ASSOCIATION_LATCH.await(); + + System.out.print("done\nWaiting for Service to see association with Gateway..."); + + SVC_ASSOCIATION_LATCH.await(); + + System.out.println("done"); + + EchoGateway.runTest(gwHandler, () -> reporter.onReport(gw.report())); + + reporter.onReport(gw.report()); + reporter.onReport(svc.report()); + } + + System.out.println("EchoEmbeddedIpc is now terminated."); + System.exit(0); + } + + private static void serviceError(final Throwable th) + { + th.printStackTrace(); + } + + private static void associationWithServiceEstablished() + { + GW_ASSOCIATION_LATCH.countDown(); + } + + private static void associationWithServiceBroken() + { + System.out.println("Association with Service broken."); + } + + private static void associationWithGatewayEstablished() + { + SVC_ASSOCIATION_LATCH.countDown(); + } + + private static void associationWithGatewayBroken() + { + System.out.println("Association with Gateway broken."); + } +} diff --git a/test-perf/echo/EchoGateway.java b/test-perf/echo/EchoGateway.java index ba6ed25..36943d1 100644 --- a/test-perf/echo/EchoGateway.java +++ b/test-perf/echo/EchoGateway.java @@ -6,12 +6,15 @@ import org.helios.AeronStream; import org.helios.Helios; import org.helios.HeliosContext; +import org.helios.HeliosDriver; import org.helios.gateway.Gateway; -import org.helios.infra.RateReport; +import org.helios.infra.ConsoleReporter; +import org.helios.infra.Report; +import org.helios.util.ShutdownHelper; -import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import static java.lang.System.nanoTime; @@ -32,11 +35,12 @@ public class EchoGateway public static void main(String[] args) throws Exception { - System.out.print("Starting Helios service..."); + System.out.print("Starting Helios..."); final HeliosContext context = new HeliosContext(); + final HeliosDriver driver = new HeliosDriver(context, "./.aeronGateway"); - try(final Helios helios = new Helios(context)) + try(final Helios helios = new Helios(context, driver)) { helios.errorHandler(EchoGateway::serviceError); @@ -44,34 +48,37 @@ public static void main(String[] args) throws Exception final AeronStream inputStream = helios.newStream(INPUT_CHANNEL, INPUT_STREAM_ID); final AeronStream outputStream = helios.newStream(OUTPUT_CHANNEL, OUTPUT_STREAM_ID); - final Gateway gw = helios.addGateway(EchoGatewayHandler::new, - EchoGateway::serviceAssociationEstablished, EchoGateway::serviceAssociationBroken, - outputStream, inputStream); + final Gateway gw = helios.addGateway( + EchoGateway::serviceAssociationEstablished, EchoGateway::serviceAssociationBroken); + final EchoGatewayHandler gwHandler = gw.addEndPoint(outputStream, inputStream, EchoGatewayHandler::new); + + final ConsoleReporter reporter = new ConsoleReporter(); + + ShutdownHelper.register(() -> reporter.onReport(gw.report())); helios.start(); - System.out.print("Waiting for association with EchoService..."); + System.out.print("done\nEchoGateway is now running\nWaiting for association with EchoService..."); ASSOCIATION_LATCH.await(); - System.out.println("done\nEchoGateway is now running."); + System.out.println("done"); - runTest(gw); + runTest(gwHandler, () -> reporter.onReport(gw.report())); - System.out.println("EchoGateway is now terminated."); + reporter.onReport(gw.report()); } + + System.out.println("EchoGateway is now terminated"); } - static void runTest(final Gateway gw) + static void runTest(final EchoGatewayHandler gwHandler, final Runnable reportRunnable) { - final EchoGatewayHandler proxy = gw.handler(); - final List reportList = gw.reportList(); - System.out.println("Warming up... " + WARMUP_NUMBER_OF_ITERATIONS + " iterations of " + WARMUP_NUMBER_OF_MESSAGES + " messages"); for (int i = 0; i < WARMUP_NUMBER_OF_ITERATIONS; i++) { - runIterations(proxy, WARMUP_NUMBER_OF_MESSAGES); + runIterations(gwHandler, WARMUP_NUMBER_OF_MESSAGES); } final ContinueBarrier barrier = new ContinueBarrier("Execute again?"); @@ -85,7 +92,7 @@ static void runTest(final Gateway gw) System.out.println("Echoing " + NUMBER_OF_MESSAGES + " messages"); - final long elapsedTime = runIterations(proxy, NUMBER_OF_MESSAGES); + final long elapsedTime = runIterations(gwHandler, NUMBER_OF_MESSAGES); System.out.println( String.format("%d iteration, %d ops, %d ns, %d ms, rate %.02g ops/s", @@ -95,7 +102,7 @@ static void runTest(final Gateway gw) TimeUnit.NANOSECONDS.toMillis(elapsedTime), ((double)NUMBER_OF_MESSAGES / (double)elapsedTime) * 1_000_000_000)); - reportList.forEach(report -> report.print(System.out)); + reportRunnable.run(); if (NUMBER_OF_ITERATIONS <= 0) { @@ -114,18 +121,18 @@ private static long runIterations(final EchoGatewayHandler gatewayHandler, final { try { - //final long echoTimestampSent = System.nanoTime(); + final long echoTimestampSent = System.nanoTime(); - gatewayHandler.sendEcho(i); + gatewayHandler.sendEcho(echoTimestampSent);//i /*long echoTimestampReceived; do { echoTimestampReceived = gatewayHandler.getTimestamp(); } - while (echoTimestampReceived != echoTimestampSent); + while (echoTimestampReceived != echoTimestampSent);*/ - final long echoRttNs = System.nanoTime() - echoTimestampReceived; + /*final long echoRttNs = System.nanoTime() - echoTimestampReceived; HISTOGRAM.recordValue(echoRttNs);*/ } diff --git a/test-perf/echo/EchoGatewayHandler.java b/test-perf/echo/EchoGatewayHandler.java index a1bf6cb..15a6958 100644 --- a/test-perf/echo/EchoGatewayHandler.java +++ b/test-perf/echo/EchoGatewayHandler.java @@ -5,37 +5,52 @@ import org.agrona.concurrent.IdleStrategy; import org.agrona.concurrent.UnsafeBuffer; import org.agrona.concurrent.ringbuffer.RingBuffer; -import org.helios.infra.MessageTypes; import org.helios.gateway.GatewayHandler; -import org.helios.util.DirectBufferAllocator; +import org.helios.infra.MessageTypes; +import org.helios.mmb.DataMessage; +import org.helios.mmb.sbe.ComponentType; import org.helios.util.RingBufferPool; import static echo.EchoConfiguration.MESSAGE_LENGTH; -import static org.agrona.UnsafeAccess.UNSAFE; public class EchoGatewayHandler implements GatewayHandler { - private static final long TIMESTAMP_OFFSET; + //private static final long TIMESTAMP_OFFSET; private final RingBufferPool outputBufferPool; private final IdleStrategy idleStrategy = new BusySpinIdleStrategy(); - private final UnsafeBuffer echoBuffer = new UnsafeBuffer(DirectBufferAllocator.allocate(MESSAGE_LENGTH)); + //private final UnsafeBuffer echoBuffer = new UnsafeBuffer(DirectBufferAllocator.allocate(MESSAGE_LENGTH)); + private final DataMessage outgoingDataMessage = new DataMessage(); + private final DataMessage incomingDataMessage = new DataMessage(); - private volatile long timestamp = 0; + //private volatile long timestamp = 0; - public EchoGatewayHandler(final RingBufferPool outputBufferPool) + EchoGatewayHandler(final RingBufferPool outputBufferPool) { this.outputBufferPool = outputBufferPool; + + outgoingDataMessage.allocate(ComponentType.Gateway, (short)1, MESSAGE_LENGTH); } @Override public void onMessage(int msgTypeId, MutableDirectBuffer buffer, int index, int length) { - /* ECHO GATEWAY message processing: store last echo timestamp */ + if (msgTypeId == MessageTypes.APPLICATION_MSG_ID) + { + incomingDataMessage.wrap(buffer, index, length); + final UnsafeBuffer dataBuffer = incomingDataMessage.dataBuffer(); + final int dataOffset = incomingDataMessage.dataBufferOffset(); + + /* ECHO GATEWAY message processing: store last echo timestamp */ + final long echoTimestamp = dataBuffer.getLong(dataOffset); + + //final long echoTimestamp = buffer.getLong(index); - final long echoTimestamp = buffer.getLong(index); + //UNSAFE.putOrderedLong(this, TIMESTAMP_OFFSET, echoTimestamp); - UNSAFE.putOrderedLong(this, TIMESTAMP_OFFSET, echoTimestamp); + //final long timestamp = timestampQueue.remove(); + //System.out.println("RECEIVED echoTimestamp=" + echoTimestamp); + } } @Override @@ -43,21 +58,31 @@ public void close() throws Exception { } - public void sendEcho(final long echoTimestamp) + void sendEcho(final long echoTimestamp) { final RingBuffer outputBuffer = outputBufferPool.outputRingBuffers().iterator().next(); // FIXME: refactoring to avoid this API - echoBuffer.putLong(0, echoTimestamp); + final UnsafeBuffer dataBuffer = outgoingDataMessage.dataBuffer(); + final int dataOffset = outgoingDataMessage.dataBufferOffset(); + + //echoBuffer.putLong(0, echoTimestamp); + dataBuffer.putLong(dataOffset, echoTimestamp); - while (!outputBuffer.write(MessageTypes.APPLICATION_MSG_ID, echoBuffer, 0, MESSAGE_LENGTH)) + /*while (!outputBuffer.write(MessageTypes.APPLICATION_MSG_ID, echoBuffer, 0, MESSAGE_LENGTH)) + { + idleStrategy.idle(0); + }*/ + while (!outputBuffer.write(MessageTypes.APPLICATION_MSG_ID, dataBuffer, 0, dataBuffer.capacity())) { idleStrategy.idle(0); } - UNSAFE.putOrderedLong(this, TIMESTAMP_OFFSET, 0); + //System.out.println("SENT echoTimestamp=" + echoTimestamp); + //UNSAFE.putOrderedLong(this, TIMESTAMP_OFFSET, 0); + //timestampQueue.add(echoTimestamp); } - public long getTimestamp() + /*public long getTimestamp() { return timestamp; } @@ -72,5 +97,5 @@ public long getTimestamp() { throw new RuntimeException(ex); } - } + }*/ } diff --git a/test-perf/echo/EchoService.java b/test-perf/echo/EchoService.java index 1a864e1..6111072 100644 --- a/test-perf/echo/EchoService.java +++ b/test-perf/echo/EchoService.java @@ -3,6 +3,8 @@ import org.helios.AeronStream; import org.helios.Helios; import org.helios.HeliosContext; +import org.helios.HeliosDriver; +import org.helios.infra.ConsoleReporter; import org.helios.service.Service; import org.helios.util.ShutdownHelper; @@ -17,34 +19,37 @@ public class EchoService public static void main(String[] args) throws Exception { - System.out.print("Starting Helios service..."); + System.out.print("Starting Helios..."); final HeliosContext context = new HeliosContext() .setJournalEnabled(true); + final HeliosDriver driver = new HeliosDriver(context, "./.aeronService"); - try(final Helios helios = new Helios(context)) + try(final Helios helios = new Helios(context, driver)) { System.out.print("done\nCreating Helios service..."); final AeronStream inputStream = helios.newStream(INPUT_CHANNEL, INPUT_STREAM_ID); final AeronStream outputStream = helios.newStream(OUTPUT_CHANNEL, OUTPUT_STREAM_ID); - final Service echoService = helios.addService( + final Service svc = helios.addService( EchoServiceHandler::new, inputStream, outputStream); - final EchoServiceHandler echoServiceHandler = echoService.handler(); + final EchoServiceHandler echoServiceHandler = svc.handler(); final CountDownLatch runningLatch = new CountDownLatch(1); ShutdownHelper.register(runningLatch::countDown); - System.out.println("done\nEchoService is now running."); - helios.start(); + System.out.println("done\nEchoService is now running"); + runningLatch.await(); System.out.println("EchoService last snapshot: " + echoServiceHandler.lastSnapshotTimestamp()); + + new ConsoleReporter().onReport(svc.report()); } - System.out.println("EchoService is now terminated."); + System.out.println("EchoService is now terminated"); } } diff --git a/test-perf/runEchoEmbeddedIpc.sh b/test-perf/runEchoEmbeddedIpc.sh new file mode 100755 index 0000000..30f3a27 --- /dev/null +++ b/test-perf/runEchoEmbeddedIpc.sh @@ -0,0 +1,10 @@ +#!/usr/bin/env bash + +sudo -S sysctl net.core.rmem_max=2097152 > /dev/null +sudo -S sysctl net.core.wmem_max=2097152 > /dev/null + +java -Xms2G -Xmx4G -XX:BiasedLockingStartupDelay=0 \ +-Daeron.mtu.length=16384 -Daeron.rcv.buffer.length=16384 -Daeron.socket.so_sndbuf=2097152 \ +-Daeron.socket.so_rcvbuf=2097152 -Daeron.rcv.initial.window.length=2097152 -Dagrona.disable.bounds.checks=true \ +-cp ".:../build/production/Helios:../build/test/Helios:../lib/*" \ +echo.EchoEmbeddedIpc \ No newline at end of file diff --git a/test/org/helios/HeliosTest.java b/test/org/helios/HeliosTest.java index 291daef..583ee99 100644 --- a/test/org/helios/HeliosTest.java +++ b/test/org/helios/HeliosTest.java @@ -11,6 +11,10 @@ import org.helios.gateway.GatewayHandler; import org.helios.gateway.GatewayHandlerFactory; import org.helios.infra.MessageTypes; +import org.helios.mmb.DataMessage; +import org.helios.mmb.sbe.ComponentDecoder; +import org.helios.mmb.sbe.ComponentType; +import org.helios.mmb.sbe.DataDecoder; import org.helios.service.Service; import org.helios.service.ServiceHandler; import org.helios.util.RingBufferPool; @@ -26,9 +30,6 @@ public class HeliosTest { - private static final int EMBEDDED_INPUT_STREAM_ID = 10; - private static final int EMBEDDED_OUTPUT_STREAM_ID = 11; - private static final String INPUT_CHANNEL = "aeron:udp?endpoint=localhost:40123"; private static final String OUTPUT_CHANNEL = "aeron:udp?endpoint=localhost:40124"; private static final int INPUT_STREAM_ID = 10; @@ -37,20 +38,20 @@ public class HeliosTest private static final int NUM_GATEWAYS = 2; //2 @Test - public void shouldAssociateOneEmbeddedServiceWithOneEmbeddedGateway() throws Exception + public void shouldAssociateOneServiceWithOneGatewayIPC() throws Exception { try(final Helios helios = new Helios()) { final CountDownLatch s2gAssociationLatch = new CountDownLatch(1); final CountDownLatch g2sAssociationLatch = new CountDownLatch(1); - final AeronStream embeddedInputStream = helios.newEmbeddedStream(EMBEDDED_INPUT_STREAM_ID); - final AeronStream embeddedOutputStream = helios.newEmbeddedStream(EMBEDDED_OUTPUT_STREAM_ID); + final AeronStream ipcInputStream = helios.newIpcStream(INPUT_STREAM_ID); + final AeronStream ipcOutputStream = helios.newIpcStream(OUTPUT_STREAM_ID); helios.addService(NullServiceHandler::new, s2gAssociationLatch::countDown, ()->{}, - embeddedInputStream, embeddedOutputStream); + ipcInputStream, ipcOutputStream); helios.addGateway(NullGatewayHandler::new, g2sAssociationLatch::countDown, ()->{}, - embeddedInputStream, embeddedOutputStream); + ipcInputStream, ipcOutputStream); helios.start(); @@ -71,15 +72,15 @@ public void shouldAssociateOneServiceWithOneGateway() throws Exception final AeronStream svcInputStream = helios.newStream(INPUT_CHANNEL, INPUT_STREAM_ID); final AeronStream svcOutputStream = helios.newStream(OUTPUT_CHANNEL, OUTPUT_STREAM_ID); - helios.addService(NullServiceHandler::new) + helios.addService(NullServiceHandler::new, svcInputStream) .availableAssociationHandler(s2gAssociationLatch::countDown) - .addEndPoint(svcInputStream, svcOutputStream); + .addEndPoint(svcOutputStream); final AeronStream gwOutputStream = helios.newStream(INPUT_CHANNEL, INPUT_STREAM_ID); final AeronStream gwInputStream = helios.newStream(OUTPUT_CHANNEL, OUTPUT_STREAM_ID); - helios.addGateway(NullGatewayHandler::new) + helios.addGateway() .availableAssociationHandler(g2sAssociationLatch::countDown) - .addEndPoint(gwOutputStream, gwInputStream); + .addEndPoint(gwOutputStream, gwInputStream, NullGatewayHandler::new); helios.start(); @@ -91,26 +92,26 @@ public void shouldAssociateOneServiceWithOneGateway() throws Exception } @Test - public void shouldAssociateOneEmbeddedServiceWithMultipleEmbeddedGateways() throws Exception + public void shouldAssociateOneServiceWithMultipleGatewaysIPC() throws Exception { try(final Helios helios = new Helios()) { final CountDownLatch s2gAssociationLatch = new CountDownLatch(NUM_GATEWAYS); final CountDownLatch g2sAssociationLatch = new CountDownLatch(NUM_GATEWAYS); - final AeronStream embeddedInputStream = helios.newEmbeddedStream(EMBEDDED_INPUT_STREAM_ID); + final AeronStream ipcInputStream = helios.newIpcStream(INPUT_STREAM_ID); final Service svc = helios.addService(NullServiceHandler::new, - s2gAssociationLatch::countDown, () -> {}); + ipcInputStream, s2gAssociationLatch::countDown, () -> {}); for (int i = 0; i < NUM_GATEWAYS; i++) { - final AeronStream embeddedOutputStream = helios.newEmbeddedStream(EMBEDDED_OUTPUT_STREAM_ID+i); + final AeronStream ipcOutputStream = helios.newIpcStream(OUTPUT_STREAM_ID+i); - svc.addEndPoint(embeddedInputStream, embeddedOutputStream); + svc.addEndPoint(ipcOutputStream); helios.addGateway(NullGatewayHandler::new, g2sAssociationLatch::countDown, () -> {}, - embeddedInputStream, embeddedOutputStream); + ipcInputStream, ipcOutputStream); } helios.start(); @@ -130,22 +131,23 @@ public void shouldAssociateOneServiceWithMultipleGateways() throws Exception final CountDownLatch s2gAssociationLatch = new CountDownLatch(NUM_GATEWAYS); final CountDownLatch g2sAssociationLatch = new CountDownLatch(NUM_GATEWAYS); - final Service svc = helios.addService(NullServiceHandler::new) + final AeronStream svcInputStream = helios.newStream(INPUT_CHANNEL, INPUT_STREAM_ID); + final AeronStream gwOutputStream = helios.newStream(INPUT_CHANNEL, INPUT_STREAM_ID); + + final Service svc = helios.addService(NullServiceHandler::new, svcInputStream) .availableAssociationHandler(s2gAssociationLatch::countDown); for (int i = 0; i < NUM_GATEWAYS; i++) { - final AeronStream svcInputStream = helios.newStream(INPUT_CHANNEL, INPUT_STREAM_ID); - final AeronStream svcOutputStream = helios.newStream(OUTPUT_CHANNEL, OUTPUT_STREAM_ID+i); - svc.addEndPoint(svcInputStream, svcOutputStream); + final AeronStream svcOutputStream = helios.newStream(OUTPUT_CHANNEL, OUTPUT_STREAM_ID+NUM_GATEWAYS+i); + svc.addEndPoint(svcOutputStream); - final AeronStream gwOutputStream = helios.newStream(INPUT_CHANNEL, INPUT_STREAM_ID); - final AeronStream gwInputStream = helios.newStream(OUTPUT_CHANNEL, OUTPUT_STREAM_ID+i); + final AeronStream gwInputStream = helios.newStream(OUTPUT_CHANNEL, OUTPUT_STREAM_ID+NUM_GATEWAYS+i); helios.addGateway(NullGatewayHandler::new, g2sAssociationLatch::countDown, () -> {}, gwOutputStream, gwInputStream); } - assertTrue(helios.numServiceSubscriptions() == NUM_GATEWAYS); + assertTrue(helios.numServiceSubscriptions() == 1); assertTrue(helios.numGatewaySubscriptions() == NUM_GATEWAYS); helios.start(); @@ -158,7 +160,7 @@ public void shouldAssociateOneServiceWithMultipleGateways() throws Exception } @Test - public void shouldPingOneServiceWithOneGateway() throws Exception + public void shouldPingOneServiceWithOneGatewayIPC() throws Exception { try(final Helios helios = new Helios()) { @@ -166,44 +168,43 @@ public void shouldPingOneServiceWithOneGateway() throws Exception final CountDownLatch g2sAssociationLatch = new CountDownLatch(1); final CountDownLatch pingLatch = new CountDownLatch(1); - final AeronStream svcInputStream = helios.newStream(INPUT_CHANNEL, INPUT_STREAM_ID); - final AeronStream svcOutputStream = helios.newStream(OUTPUT_CHANNEL, OUTPUT_STREAM_ID); - final Service svc = helios.addService(PingServiceHandler::new) - .availableAssociationHandler(s2gAssociationLatch::countDown) - .addEndPoint(svcInputStream, svcOutputStream); + final AeronStream svcInputStream = helios.newIpcStream(INPUT_STREAM_ID); + final AeronStream svcOutputStream = helios.newIpcStream(OUTPUT_STREAM_ID); + + final Service svc = helios.addService(PingServiceHandler::new, + s2gAssociationLatch::countDown, () -> {}, + svcInputStream, svcOutputStream); final int gatewayId = 1; svc.handler().addOutputStream(gatewayId, svcOutputStream);// FIXME: define Helios Gateway-Service (HGS) protocol - final AeronStream gwOutputStream = helios.newStream(INPUT_CHANNEL, INPUT_STREAM_ID); - final AeronStream gwInputStream = helios.newStream(OUTPUT_CHANNEL, OUTPUT_STREAM_ID); - final Gateway gw = - helios.addGateway( - (outputBufferPool) -> new PingGatewayHandler(gatewayId, - outputBufferPool, gwOutputStream, - (msgTypeId, buffer, index, length) -> { - if (msgTypeId == MessageTypes.APPLICATION_MSG_ID) - { - // APPLICATION message type, ignore */ - assertTrue(buffer.getInt(index) == gatewayId); - assertTrue(length == PingGatewayHandler.MESSAGE_LENGTH); - pingLatch.countDown(); - } - /*else - { - // ADMINISTRATIVE message type, ignore - }*/ - }) - ); + final AeronStream gwOutputStream = helios.newIpcStream(INPUT_STREAM_ID); + final AeronStream gwInputStream = helios.newIpcStream(OUTPUT_STREAM_ID); + final Gateway gw = helios.addGateway(); gw.availableAssociationHandler(g2sAssociationLatch::countDown); - gw.addEndPoint(gwOutputStream, gwInputStream); + final PingGatewayHandler pingHandler = gw.addEndPoint(gwOutputStream, gwInputStream, + (outputBufferPool) -> new PingGatewayHandler(gatewayId, + outputBufferPool, gwOutputStream, + (msgTypeId, buffer, index, length) -> { + if (msgTypeId == MessageTypes.APPLICATION_MSG_ID) + { + assertTrue(buffer.getInt(index) == gatewayId); + assertTrue(length == PingGatewayHandler.MESSAGE_LENGTH); + pingLatch.countDown(); + } + /*else + { + // ADMINISTRATIVE message type, ignore + }*/ + }) + ); helios.start(); s2gAssociationLatch.await(); g2sAssociationLatch.await(); - gw.handler().sendPing(); + pingHandler.sendPing(); pingLatch.await(); } @@ -212,7 +213,7 @@ public void shouldPingOneServiceWithOneGateway() throws Exception } @Test - public void shouldPingOneEmbeddedServiceWithOneEmbeddedGateway() throws Exception + public void shouldPingOneServiceWithOneGateway() throws Exception { try(final Helios helios = new Helios()) { @@ -220,43 +221,43 @@ public void shouldPingOneEmbeddedServiceWithOneEmbeddedGateway() throws Exceptio final CountDownLatch g2sAssociationLatch = new CountDownLatch(1); final CountDownLatch pingLatch = new CountDownLatch(1); - final AeronStream embeddedInputStream = helios.newEmbeddedStream(EMBEDDED_INPUT_STREAM_ID); - final AeronStream embeddedOutputStream = helios.newEmbeddedStream(EMBEDDED_OUTPUT_STREAM_ID); - - final Service svc = helios.addService(PingServiceHandler::new, - s2gAssociationLatch::countDown, () -> {}, - embeddedInputStream, embeddedOutputStream); + final AeronStream svcInputStream = helios.newStream(INPUT_CHANNEL, INPUT_STREAM_ID); + final AeronStream svcOutputStream = helios.newStream(OUTPUT_CHANNEL, OUTPUT_STREAM_ID); + final Service svc = helios.addService(PingServiceHandler::new, svcInputStream) + .availableAssociationHandler(s2gAssociationLatch::countDown) + .addEndPoint(svcOutputStream); final int gatewayId = 1; - svc.handler().addOutputStream(gatewayId, embeddedOutputStream);// FIXME: define Helios Gateway-Service (HGS) protocol - - final Gateway gw = - helios.addGateway( - (outputBufferPool) -> new PingGatewayHandler(gatewayId, - outputBufferPool, embeddedInputStream, - (msgTypeId, buffer, index, length) -> { - if (msgTypeId == MessageTypes.APPLICATION_MSG_ID) - { - // APPLICATION message type, ignore */ - assertTrue(buffer.getInt(index) == gatewayId); - assertTrue(length == PingGatewayHandler.MESSAGE_LENGTH); - pingLatch.countDown(); - } - /*else - { - // ADMINISTRATIVE message type, ignore - }*/ - }) - ); + svc.handler().addOutputStream(gatewayId, svcOutputStream);// FIXME: define Helios Gateway-Service (HGS) protocol + + final AeronStream gwOutputStream = helios.newStream(INPUT_CHANNEL, INPUT_STREAM_ID); + final AeronStream gwInputStream = helios.newStream(OUTPUT_CHANNEL, OUTPUT_STREAM_ID); + final Gateway gw = helios.addGateway(); gw.availableAssociationHandler(g2sAssociationLatch::countDown); - gw.addEndPoint(embeddedInputStream, embeddedOutputStream); + final PingGatewayHandler pingHandler = gw.addEndPoint(gwOutputStream, gwInputStream, + (outputBufferPool) -> new PingGatewayHandler(gatewayId, + outputBufferPool, gwOutputStream, + (msgTypeId, buffer, index, length) -> { + if (msgTypeId == MessageTypes.APPLICATION_MSG_ID) + { + // APPLICATION message type, ignore */ + assertTrue(buffer.getInt(index) == gatewayId); + assertTrue(length == PingGatewayHandler.MESSAGE_LENGTH); + pingLatch.countDown(); + } + /*else + { + // ADMINISTRATIVE message type, ignore + }*/ + }) + ); helios.start(); s2gAssociationLatch.await(); g2sAssociationLatch.await(); - gw.handler().sendPing(); + pingHandler.sendPing(); pingLatch.await(); } @@ -265,7 +266,7 @@ public void shouldPingOneEmbeddedServiceWithOneEmbeddedGateway() throws Exceptio } @Test - public void shouldPingOneServiceWithMultipleGateways() throws Exception + public void shouldPingOneServiceWithMultipleGatewaysIPC() throws Exception { try(final Helios helios = new Helios()) { @@ -273,28 +274,26 @@ public void shouldPingOneServiceWithMultipleGateways() throws Exception final CountDownLatch g2sAssociationLatch = new CountDownLatch(NUM_GATEWAYS); final CountDownLatch pingLatch = new CountDownLatch(NUM_GATEWAYS); - final Service svc = helios.addService(PingServiceHandler::new, + final AeronStream ipcInputStream = helios.newIpcStream(INPUT_STREAM_ID); + + final Service svc = helios.addService(PingServiceHandler::new, ipcInputStream, s2gAssociationLatch::countDown, () -> {}); - final List> gateways = new ArrayList<>(); + final List gatewayHandlers = new ArrayList<>(); for (int i = 0; i < NUM_GATEWAYS; i++) { - final AeronStream svcInputStream = helios.newStream(INPUT_CHANNEL, INPUT_STREAM_ID+i); - final AeronStream svcOutputStream = helios.newStream(OUTPUT_CHANNEL, OUTPUT_STREAM_ID+NUM_GATEWAYS+i); + final AeronStream ipcOutputStream = helios.newIpcStream(OUTPUT_STREAM_ID+i); - svc.addEndPoint(svcInputStream, svcOutputStream); + svc.addEndPoint(ipcOutputStream); - svc.handler().addOutputStream(i, svcOutputStream); // FIXME: define Helios Gateway-Service (HGS) protocol - - final AeronStream gwOutputStream = helios.newStream(INPUT_CHANNEL, INPUT_STREAM_ID+i); - final AeronStream gwInputStream = helios.newStream(OUTPUT_CHANNEL, OUTPUT_STREAM_ID+NUM_GATEWAYS+i); + svc.handler().addOutputStream(i, ipcOutputStream); // FIXME: define Helios Gateway-Service (HGS) protocol - final Gateway gw = helios.addGateway( - new PingGatewayHandlerFactory(i, gwOutputStream, new PingMessageHandler(i, pingLatch))); + final Gateway gw = helios.addGateway(); gw.availableAssociationHandler(g2sAssociationLatch::countDown); - gw.addEndPoint(gwOutputStream, gwInputStream); + final PingGatewayHandler gwHandler = gw.addEndPoint(ipcInputStream, ipcOutputStream, + new PingGatewayHandlerFactory(i, ipcInputStream, new PingMessageHandler(i, pingLatch))); - gateways.add(gw); + gatewayHandlers.add(gwHandler); } helios.start(); @@ -302,7 +301,7 @@ public void shouldPingOneServiceWithMultipleGateways() throws Exception s2gAssociationLatch.await(); g2sAssociationLatch.await(); - gateways.forEach((gw) -> gw.handler().sendPing()); + gatewayHandlers.forEach(PingGatewayHandler::sendPing); pingLatch.await(); } @@ -310,8 +309,8 @@ public void shouldPingOneServiceWithMultipleGateways() throws Exception assertTrue(true); } - /*@Test - public void shouldPingOneEmbeddedServiceWithMultipleEmbeddedGateways() throws Exception + @Test + public void shouldPingOneServiceWithMultipleGateways() throws Exception { try(final Helios helios = new Helios()) { @@ -319,42 +318,29 @@ public void shouldPingOneEmbeddedServiceWithMultipleEmbeddedGateways() throws Ex final CountDownLatch g2sAssociationLatch = new CountDownLatch(NUM_GATEWAYS); final CountDownLatch pingLatch = new CountDownLatch(NUM_GATEWAYS); - final AeronStream embeddedInputStream = helios.newEmbeddedStream(EMBEDDED_INPUT_STREAM_ID); + final AeronStream svcInputStream = helios.newStream(INPUT_CHANNEL, INPUT_STREAM_ID); - final Service svc = helios.addService(PingServiceHandler::new, + final Service svc = helios.addService(PingServiceHandler::new, svcInputStream, s2gAssociationLatch::countDown, () -> {}); - final List> gateways = new ArrayList<>(); + final List gatewayHandlers = new ArrayList<>(); for (int i = 0; i < NUM_GATEWAYS; i++) { - final AeronStream embeddedOutputStream = helios.newEmbeddedStream(EMBEDDED_OUTPUT_STREAM_ID+i); - - svc.addEndPoint(embeddedInputStream, embeddedOutputStream); - - final int gatewayId = i; - final Gateway gw = helios.addGateway( - (outputBufferPool) -> new PingGatewayHandler(outputBufferPool, - (msgTypeId, buffer, index, length) -> { - if (msgTypeId == MessageTypes.APPLICATION_MSG_ID) - { - System.out.println("PingGatewayHandler::onMessage buffer.getInt(index)=" + buffer.getInt(index) + " gatewayId=" + gatewayId); - boolean b = buffer.getInt(index) == gatewayId; - System.out.println("PingGatewayHandler::onMessage b=" + b); - // APPLICATION message type, ignore - assertTrue(buffer.getInt(index) == gatewayId); - assertTrue(length == PingGatewayHandler.MESSAGE_LENGTH); - pingLatch.countDown(); - } - //else - //{ - // ADMINISTRATIVE message type, ignore - //} - }, - gatewayId)); + final AeronStream svcOutputStream = helios.newStream(OUTPUT_CHANNEL, OUTPUT_STREAM_ID+i); + + svc.addEndPoint(svcOutputStream); + + svc.handler().addOutputStream(i, svcOutputStream); // FIXME: define Helios Gateway-Service (HGS) protocol + + final AeronStream gwInputStream = helios.newStream(OUTPUT_CHANNEL, OUTPUT_STREAM_ID+i); + final AeronStream gwOutputStream = helios.newStream(INPUT_CHANNEL, INPUT_STREAM_ID); + + final Gateway gw = helios.addGateway(); gw.availableAssociationHandler(g2sAssociationLatch::countDown); - gw.addEndPoint(embeddedInputStream, embeddedOutputStream); + final PingGatewayHandler pingHandler = gw.addEndPoint(gwOutputStream, gwInputStream, + new PingGatewayHandlerFactory(i, gwOutputStream, new PingMessageHandler(i, pingLatch))); - gateways.add(gw); + gatewayHandlers.add(pingHandler); } helios.start(); @@ -362,13 +348,13 @@ public void shouldPingOneEmbeddedServiceWithMultipleEmbeddedGateways() throws Ex s2gAssociationLatch.await(); g2sAssociationLatch.await(); - gateways.forEach((gw) -> gw.handler().sendPing()); + gatewayHandlers.forEach(PingGatewayHandler::sendPing); pingLatch.await(); } assertTrue(true); - }*/ + } @Test(expected = NullPointerException.class) public void shouldThrowExceptionWhenOnlyContextIsNull() @@ -421,7 +407,7 @@ public void shouldThrowExceptionWhenServiceRequestStreamIsNull() { try(final Helios helios = new Helios()) { - helios.addService((outputBuffers) -> null).addEndPoint(null, helios.newStream(OUTPUT_CHANNEL, 0)); + helios.addService((outputBuffers) -> null, null).addEndPoint(helios.newStream(OUTPUT_CHANNEL, 0)); } } @@ -431,7 +417,7 @@ public void shouldThrowExceptionWhenServiceResponseStreamIsNull() { try(final Helios helios = new Helios()) { - helios.addService((outputBuffers) -> null).addEndPoint(helios.newStream(INPUT_CHANNEL, 0), null); + helios.addService((outputBuffers) -> null, helios.newStream(INPUT_CHANNEL, 0)).addEndPoint(null); } } @@ -440,7 +426,7 @@ public void shouldThrowExceptionWhenServiceFactoryIsNull() { try(final Helios helios = new Helios()) { - helios.addService(null); + helios.addService(null, helios.newStream(INPUT_CHANNEL, 0)); } } @@ -449,7 +435,7 @@ public void shouldThrowExceptionWhenGatewayRequestStreamIsNull() { try(final Helios helios = new Helios()) { - helios.addGateway((outputBuffers) -> null).addEndPoint(null, helios.newStream(INPUT_CHANNEL, 0)); + helios.addGateway().addEndPoint(null, helios.newStream(INPUT_CHANNEL, 0), (outputBuffers) -> null); } } @@ -458,7 +444,7 @@ public void shouldThrowExceptionWhenGatewayResponseStreamIsNull() { try(final Helios helios = new Helios()) { - helios.addGateway((outputBuffers) -> null).addEndPoint(helios.newStream(INPUT_CHANNEL, 0), null); + helios.addGateway().addEndPoint(helios.newStream(OUTPUT_CHANNEL, 0), null, (outputBuffers) -> null); } } @@ -467,7 +453,7 @@ public void shouldThrowExceptionWhenGatewayFactoryIsNull() { try(final Helios helios = new Helios()) { - helios.addGateway(null); + helios.addGateway().addEndPoint(helios.newStream(OUTPUT_CHANNEL, 0), helios.newStream(INPUT_CHANNEL, 0), null); } } @@ -512,6 +498,7 @@ private class PingServiceHandler implements ServiceHandler private final RingBufferPool ringBufferPool; private final IdleStrategy idleStrategy; private Int2ObjectHashMap gatewayId2StreamMap; // FIXME: temporary workaround, use HGS protocol + private final DataMessage incomingDataMessage = new DataMessage(); PingServiceHandler(final RingBufferPool ringBufferPool) { @@ -530,7 +517,15 @@ public void onMessage(int msgTypeId, MutableDirectBuffer buffer, int index, int { if (msgTypeId == MessageTypes.APPLICATION_MSG_ID) { - final int gatewayId = buffer.getInt(index); // FIXME: temporary workaround, use HGS protocol header + final DataDecoder dataDecoder = incomingDataMessage.wrap(buffer, index, length); + final ComponentDecoder componentDecoder = dataDecoder.mmbHeader().component(); + + final ComponentType componentType = componentDecoder.componentType(); + assertTrue(componentType == ComponentType.Gateway); + + final int gatewayId = componentDecoder.componentId(); + + //final int gatewayId = buffer.getInt(index); // FIXME: temporary workaround, use HGS protocol header final AeronStream outputStream = gatewayId2StreamMap.get(gatewayId); final RingBuffer outputBuffer = ringBufferPool.getOutputRingBuffer(outputStream);// FIXME: refactoring to avoid this API @@ -562,7 +557,9 @@ private class PingGatewayHandler implements GatewayHandler private final AeronStream gwOutputStream; private final MessageHandler delegate; private final IdleStrategy idleStrategy; - private final UnsafeBuffer echoBuffer; + //private final UnsafeBuffer echoBuffer; + private final DataMessage outgoingDataMessage = new DataMessage(); + private final DataMessage incomingDataMessage = new DataMessage(); PingGatewayHandler(final int gatewayId, final RingBufferPool ringBufferPool, final AeronStream gwOutputStream, final MessageHandler delegate) @@ -572,16 +569,22 @@ private class PingGatewayHandler implements GatewayHandler this.gwOutputStream = gwOutputStream; this.delegate = delegate; this.idleStrategy = new BusySpinIdleStrategy(); - this.echoBuffer = new UnsafeBuffer(ByteBuffer.allocate(MESSAGE_LENGTH)); + //this.echoBuffer = new UnsafeBuffer(ByteBuffer.allocate(MESSAGE_LENGTH)); + + outgoingDataMessage.allocate(ComponentType.Gateway, (short)gatewayId, MESSAGE_LENGTH); } void sendPing() { final RingBuffer outputBuffer = ringBufferPool.getOutputRingBuffer(gwOutputStream); // FIXME: refactoring to avoid this API - echoBuffer.putInt(0, gatewayId); // FIXME: temporary workaround, use HGS protocol header + //echoBuffer.putInt(0, gatewayId); // FIXME: temporary workaround, use HGS protocol header + final MutableDirectBuffer dataBuffer = outgoingDataMessage.dataBuffer(); + final int dataBufferOffset = outgoingDataMessage.dataBufferOffset(); + + dataBuffer.putInt(dataBufferOffset, gatewayId); - while (!outputBuffer.write(MessageTypes.APPLICATION_MSG_ID, echoBuffer, 0, MESSAGE_LENGTH)) + while (!outputBuffer.write(MessageTypes.APPLICATION_MSG_ID, dataBuffer, 0, dataBuffer.capacity())) { idleStrategy.idle(0); } @@ -590,7 +593,23 @@ void sendPing() @Override public void onMessage(int msgTypeId, MutableDirectBuffer buffer, int index, int length) { - delegate.onMessage(msgTypeId, buffer, index, length); + if (msgTypeId == MessageTypes.APPLICATION_MSG_ID) + { + incomingDataMessage.wrap(buffer, index, length); + + final MutableDirectBuffer dataBuffer = incomingDataMessage.dataBuffer(); + final int dataBufferOffset = incomingDataMessage.dataBufferOffset(); + final int dataBufferLength = incomingDataMessage.dataBufferLength(); + + // + delegate.onMessage(msgTypeId, dataBuffer, dataBufferOffset, dataBufferLength); + } + /*else + { + // ADMINISTRATIVE message type, ignore + }*/ + + //delegate.onMessage(msgTypeId, buffer, index, length); } @Override diff --git a/test/org/helios/snapshot/SnapshotTest.java b/test/org/helios/mmb/SnapshotMessageTest.java similarity index 87% rename from test/org/helios/snapshot/SnapshotTest.java rename to test/org/helios/mmb/SnapshotMessageTest.java index eafa42e..fe2733c 100644 --- a/test/org/helios/snapshot/SnapshotTest.java +++ b/test/org/helios/mmb/SnapshotMessageTest.java @@ -1,10 +1,11 @@ -package org.helios.snapshot; +package org.helios.mmb; import org.agrona.concurrent.BusySpinIdleStrategy; import org.agrona.concurrent.UnsafeBuffer; import org.agrona.concurrent.ringbuffer.OneToOneRingBuffer; import org.agrona.concurrent.ringbuffer.RingBuffer; import org.helios.infra.MessageTypes; +import org.helios.mmb.SnapshotMessage; import org.helios.mmb.sbe.*; import org.helios.util.DirectBufferAllocator; import org.junit.Test; @@ -12,13 +13,14 @@ import static org.agrona.concurrent.ringbuffer.RingBufferDescriptor.TRAILER_LENGTH; import static org.junit.Assert.assertTrue; -public class SnapshotTest +public class SnapshotMessageTest { private final int BUFFER_SIZE = (16 * 1024) + TRAILER_LENGTH; private final RingBuffer ringBuffer = new OneToOneRingBuffer( new UnsafeBuffer(DirectBufferAllocator.allocateCacheAligned(BUFFER_SIZE))); + private final SnapshotMessage snapshotMessage = new SnapshotMessage(); private final MessageHeaderDecoder messageHeaderDecoder = new MessageHeaderDecoder(); private final LoadSnapshotDecoder loadSnapshotDecoder = new LoadSnapshotDecoder(); private final SaveSnapshotDecoder saveSnapshotDecoder = new SaveSnapshotDecoder(); @@ -26,7 +28,7 @@ public class SnapshotTest @Test public void shouldWriteLoadSnapshotMessage() { - Snapshot.writeLoadMessage(ringBuffer, new BusySpinIdleStrategy()); + snapshotMessage.writeLoadMessage(ringBuffer, new BusySpinIdleStrategy()); int readBytes; do @@ -59,7 +61,7 @@ public void shouldWriteLoadSnapshotMessage() @Test public void shouldWriteSaveSnapshotMessage() { - Snapshot.writeSaveMessage(ringBuffer, new BusySpinIdleStrategy()); + snapshotMessage.writeSaveMessage(ringBuffer, new BusySpinIdleStrategy()); int readBytes; do @@ -92,19 +94,19 @@ public void shouldWriteSaveSnapshotMessage() @Test(expected = NullPointerException.class) public void shouldThrowExceptionWhenRingBufferIsNullInLoad() { - Snapshot.writeLoadMessage(null, new BusySpinIdleStrategy()); + snapshotMessage.writeLoadMessage(null, new BusySpinIdleStrategy()); } @Test(expected = NullPointerException.class) public void shouldThrowExceptionWhenRingBufferIsNullInSave() { - Snapshot.writeSaveMessage(null, new BusySpinIdleStrategy()); + snapshotMessage.writeSaveMessage(null, new BusySpinIdleStrategy()); } @Test(expected = NullPointerException.class) public void shouldThrowExceptionWhenIdleStrategyIsNullInLoad() { - Snapshot.writeLoadMessage( + snapshotMessage.writeLoadMessage( new OneToOneRingBuffer(new UnsafeBuffer(DirectBufferAllocator.allocateCacheAligned(BUFFER_SIZE))), null); } @@ -112,7 +114,7 @@ public void shouldThrowExceptionWhenIdleStrategyIsNullInLoad() @Test(expected = NullPointerException.class) public void shouldThrowExceptionWhenIdleStrategyIsNullInSave() { - Snapshot.writeSaveMessage( + snapshotMessage.writeSaveMessage( new OneToOneRingBuffer(new UnsafeBuffer(DirectBufferAllocator.allocateCacheAligned(BUFFER_SIZE))), null); } diff --git a/test/org/helios/service/ServiceReportTest.java b/test/org/helios/service/ServiceReportTest.java index 64044d1..07f1e14 100644 --- a/test/org/helios/service/ServiceReportTest.java +++ b/test/org/helios/service/ServiceReportTest.java @@ -5,6 +5,7 @@ import org.agrona.concurrent.ringbuffer.OneToOneRingBuffer; import org.agrona.concurrent.ringbuffer.RingBuffer; import org.helios.*; +import org.helios.infra.AssociationHandler; import org.helios.infra.InputMessageProcessor; import org.helios.infra.OutputMessageProcessor; import org.helios.util.DirectBufferAllocator; @@ -26,16 +27,18 @@ public class ServiceReportTest @Test(expected = NullPointerException.class) public void shouldThrowExceptionWhenInputMessageProcessorIsNull() { - try (final Helios helios = new Helios()) - { - new ServiceReport(null, new OutputMessageProcessor(ringBuffer, helios.newEmbeddedStream(0), - new BusySpinIdleStrategy(), "")); - } + new ServiceReport(null); } @Test(expected = NullPointerException.class) public void shouldThrowExceptionWhenOutputMessageProcessorIsNull() { - new ServiceReport(new InputMessageProcessor(ringBuffer, new BusySpinIdleStrategy(), 0, ""), null); + try (final Helios helios = new Helios()) + { + new ServiceReport( + new InputMessageProcessor(ringBuffer, new BusySpinIdleStrategy(), 0, 0, helios.newIpcStream(0), + null, "") + ).addResponseProcessor(null); + } } } diff --git a/test/org/helios/status/StatusCounterTest.java b/test/org/helios/status/StatusCounterTest.java new file mode 100644 index 0000000..6f46e46 --- /dev/null +++ b/test/org/helios/status/StatusCounterTest.java @@ -0,0 +1,60 @@ +package org.helios.status; + +import org.agrona.concurrent.AtomicBuffer; +import org.agrona.concurrent.UnsafeBuffer; +import org.agrona.concurrent.status.AtomicCounter; +import org.agrona.concurrent.status.CountersManager; +import org.junit.Test; + +import java.nio.ByteBuffer; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class StatusCounterTest +{ + private static final int VALUES_BUFFER_SIZE = 1024; + private static final AtomicBuffer METADATA_BUFFER = new UnsafeBuffer(ByteBuffer.allocate(VALUES_BUFFER_SIZE * 2)); + private static final AtomicBuffer VALUES_BUFFER = new UnsafeBuffer(ByteBuffer.allocate(VALUES_BUFFER_SIZE)); + private static final CountersManager COUNTERS_MANAGER = new CountersManager(METADATA_BUFFER, VALUES_BUFFER); + + @Test(expected = NullPointerException.class) + public void shouldThrowExceptionWhenCountersManagerIsNull() + { + new StatusCounters(null); + } + + @Test + public void shouldReturnNullCounterWhenStatusCounterDescriptorIsNull() + { + try(final StatusCounters statusCounters = new StatusCounters(COUNTERS_MANAGER)) + { + final AtomicCounter counter = statusCounters.get(null); + assertNull(counter); + } + } + + @Test + public void shouldReturnCorrectStatusCounterDescriptor() + { + try(final StatusCounters statusCounters = new StatusCounters(COUNTERS_MANAGER)) + { + for (final StatusCounterDescriptor descriptor : StatusCounterDescriptor.values()) + { + final AtomicCounter counter = statusCounters.get(descriptor); + assertNotNull(counter); + + counter.add(descriptor.id()); + } + + for (final StatusCounterDescriptor descriptor : StatusCounterDescriptor.values()) + { + final AtomicCounter counter = statusCounters.get(descriptor); + assertNotNull(counter); + + assertTrue(counter.get() == descriptor.id()); + } + } + } +} diff --git a/test/org/helios/util/ProcessorHelperTest.java b/test/org/helios/util/ProcessorHelperTest.java index e3ebbe2..7938806 100644 --- a/test/org/helios/util/ProcessorHelperTest.java +++ b/test/org/helios/util/ProcessorHelperTest.java @@ -36,6 +36,12 @@ private class ProcessorStub implements Processor this.runnableCode = runnableCode; } + @Override + public String name() + { + return this.toString(); + } + @Override public void start() {