Skip to content

Commit

Permalink
Added MMB Data message to correctly separate business and administrat…
Browse files Browse the repository at this point in the history
…ive data messages

Added MMB Startup message to address application-level association between GWs and SVCs
Handled MMB Startup/Shutdown messages to address application-level association between GWs and SVCs as per issues #9 and #15
Changed some thread running variables to use volatile instead of atomic primitives
Added initial status counters support
Refactored report classes and functions
Added/modified unit and perf tests
  • Loading branch information
canepat committed Dec 17, 2016
1 parent f757003 commit abab29a
Show file tree
Hide file tree
Showing 55 changed files with 1,680 additions and 731 deletions.
4 changes: 1 addition & 3 deletions build.gradle
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import com.sun.org.apache.xalan.internal.xsltc.compiler.Copy

apply plugin: 'java'
apply plugin: 'maven'
apply plugin: 'signing'
Expand All @@ -10,7 +8,7 @@ apply plugin: 'idea'
defaultTasks 'clean', 'build', 'install'

group = 'org.helios'
version = '0.0.4-RC'
version = '0.0.4'

ext.isReleaseVersion = !version.endsWith("-RC")

Expand Down
24 changes: 16 additions & 8 deletions resources/Helios-MMB.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@

<types>
<composite name="Component" description="MMB message identifiers">
<enum name="ComponentType" encodingType="char">
<validValue name="Service">S</validValue>
<validValue name="Gateway">G</validValue>
<enum name="ComponentType" encodingType="uint8">
<validValue name="Service">0</validValue>
<validValue name="Gateway">1</validValue>
</enum>
<type name="componentId" primitiveType="uint8"/>
</composite>
Expand All @@ -45,24 +45,32 @@
</composite>
</types>

<sbe:message name="Acknowledge" id="0" description="Shutdown message">
<sbe:message name="Acknowledge" id="0" description="Acknowledge message">
<field name="mmbHeader" id="1" type="MMBHeaderType" description="MMB header" offset="0"/>
<field name="isACK" id="2" type="Boolean" description="ACK/NACK flag"/>
</sbe:message>

<sbe:message name="Shutdown" id="1" description="Shutdown message">
<sbe:message name="Startup" id="1" description="Startup message">
<field name="mmbHeader" id="1" type="MMBHeaderType" description="MMB header" offset="0"/>
</sbe:message>

<sbe:message name="Heartbeat" id="2" description="Heartbeat message">
<sbe:message name="Shutdown" id="2" description="Shutdown message">
<field name="mmbHeader" id="1" type="MMBHeaderType" description="MMB header" offset="0"/>
</sbe:message>

<sbe:message name="SaveSnapshot" id="3" description="Save snapshot message">
<sbe:message name="Heartbeat" id="3" description="Heartbeat message">
<field name="mmbHeader" id="1" type="MMBHeaderType" description="MMB header" offset="0"/>
</sbe:message>

<sbe:message name="LoadSnapshot" id="4" description="Load snapshot message">
<sbe:message name="SaveSnapshot" id="4" description="Save snapshot message">
<field name="mmbHeader" id="1" type="MMBHeaderType" description="MMB header" offset="0"/>
</sbe:message>

<sbe:message name="LoadSnapshot" id="5" description="Load snapshot message">
<field name="mmbHeader" id="1" type="MMBHeaderType" description="MMB header" offset="0"/>
</sbe:message>

<sbe:message name="Data" id="6" description="Data message">
<field name="mmbHeader" id="1" type="MMBHeaderType" description="MMB header" offset="0"/>
</sbe:message>
</sbe:messageSchema>
12 changes: 4 additions & 8 deletions src/org/helios/AeronStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.aeron.Aeron;
import io.aeron.Image;
import org.helios.mmb.sbe.ComponentType;

import java.util.Objects;

Expand All @@ -24,20 +25,15 @@ static AeronStream fromImage(final Aeron aeron, final Image image)
public final Aeron aeron;
public final String channel;
public final int streamId;
public ComponentType componentType;
public short componentId;

private final String key;

@Override
public boolean equals(Object obj)
{
if (obj != null && obj instanceof AeronStream)
{
return ((AeronStream)obj).key.equals(key);
}
else
{
return false;
}
return obj != null && obj instanceof AeronStream && ((AeronStream) obj).key.equals(key);
}

@Override
Expand Down
54 changes: 27 additions & 27 deletions src/org/helios/Helios.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.helios.gateway.GatewayHandler;
import org.helios.gateway.GatewayHandlerFactory;
import org.helios.infra.AvailableAssociationHandler;
import org.helios.infra.RateReporter;
import org.helios.infra.ReportProcessor;
import org.helios.infra.UnavailableAssociationHandler;
import org.helios.service.Service;
import org.helios.service.ServiceHandler;
Expand All @@ -46,7 +46,7 @@ public class Helios implements AutoCloseable, ErrorHandler, AvailableImageHandle
private final Long2ObjectHashMap<HeliosService<?>> serviceSubscriptionRepository;
private final Long2ObjectHashMap<HeliosGateway<?>> gatewaySubscriptionRepository;

private RateReporter reporter;
private ReportProcessor reporter;

public Helios()
{
Expand All @@ -65,6 +65,7 @@ public Helios(final HeliosContext context, final HeliosDriver driver)

final Aeron.Context aeronContext = new Aeron.Context()
.errorHandler(this).availableImageHandler(this).unavailableImageHandler(this);

if (context.isMediaDriverEmbedded())
{
aeronContext.aeronDirectoryName(driver.mediaDriver().aeronDirectoryName());
Expand All @@ -78,7 +79,7 @@ public Helios(final HeliosContext context, final HeliosDriver driver)
serviceSubscriptionRepository = new Long2ObjectHashMap<>();
gatewaySubscriptionRepository = new Long2ObjectHashMap<>();

reporter = context.isReportingEnabled() ? new RateReporter() : null;
reporter = context.isReportingEnabled() ? new ReportProcessor(1_000_000_000L, null) : null; // TODO: configure
}

public void start()
Expand Down Expand Up @@ -159,21 +160,22 @@ public AeronStream newStream(final String channel, final int streamId)
return new AeronStream(aeron, channel, streamId);
}

public AeronStream newEmbeddedStream(final int streamId)
public AeronStream newIpcStream(final int streamId)
{
return newStream(CommonContext.IPC_CHANNEL, streamId);
}

public <T extends ServiceHandler> Service<T> addService(final ServiceHandlerFactory<T> factory)
public <T extends ServiceHandler> Service<T> addService(final ServiceHandlerFactory<T> factory, final AeronStream reqStream)
{
Objects.requireNonNull(factory, "factory");
Objects.requireNonNull(reqStream, "reqStream");

final HeliosService<T> svc = new HeliosService<>(this, factory);
final HeliosService<T> svc = new HeliosService<>(this, factory, reqStream);
serviceList.add(svc);

if (reporter != null)
{
reporter.addAll(svc.reportList());
reporter.add(svc.report());
}

return svc;
Expand All @@ -182,15 +184,15 @@ public <T extends ServiceHandler> Service<T> addService(final ServiceHandlerFact
public <T extends ServiceHandler> Service<T> addService(final ServiceHandlerFactory<T> factory,
final AeronStream reqStream, final AeronStream rspStream)
{
final Service<T> svc = addService(factory);
final Service<T> svc = addService(factory, reqStream);

return svc.addEndPoint(reqStream, rspStream);
return svc.addEndPoint(rspStream);
}

public <T extends ServiceHandler> Service<T> addService(final ServiceHandlerFactory<T> factory,
public <T extends ServiceHandler> Service<T> addService(final ServiceHandlerFactory<T> factory, final AeronStream reqStream,
final AvailableAssociationHandler availableHandler, final UnavailableAssociationHandler unavailableHandler)
{
final Service<T> svc = addService(factory);
final Service<T> svc = addService(factory, reqStream);

svc.availableAssociationHandler(availableHandler).unavailableAssociationHandler(unavailableHandler);

Expand All @@ -201,51 +203,49 @@ public <T extends ServiceHandler> Service<T> addService(final ServiceHandlerFact
final AvailableAssociationHandler availableHandler, final UnavailableAssociationHandler unavailableHandler,
final AeronStream reqStream, final AeronStream rspStream)
{
final Service<T> svc = addService(factory, availableHandler, unavailableHandler);
final Service<T> svc = addService(factory, reqStream, availableHandler, unavailableHandler);

return svc.addEndPoint(reqStream, rspStream);
return svc.addEndPoint(rspStream);
}

public <T extends GatewayHandler> Gateway<T> addGateway(final GatewayHandlerFactory<T> factory)
public <T extends GatewayHandler> Gateway<T> addGateway()
{
Objects.requireNonNull(factory, "factory");

final HeliosGateway<T> gw = new HeliosGateway<>(this, factory);
final HeliosGateway<T> gw = new HeliosGateway<>(this);
gatewayList.add(gw);

if (reporter != null)
{
reporter.addAll(gw.reportList());
reporter.add(gw.report());
}

return gw;
}

public <T extends GatewayHandler> Gateway<T> addGateway(final GatewayHandlerFactory<T> factory,
public <T extends GatewayHandler> T addGateway(final GatewayHandlerFactory<T> factory,
final AeronStream reqStream, final AeronStream rspStream)
{
final Gateway<T> gw = addGateway(factory);
final Gateway<T> gw = addGateway();

return gw.addEndPoint(reqStream, rspStream);
return gw.addEndPoint(reqStream, rspStream, factory);
}

public <T extends GatewayHandler> Gateway<T> addGateway(final GatewayHandlerFactory<T> factory,
final AvailableAssociationHandler availableHandler, final UnavailableAssociationHandler unavailableHandler)
public <T extends GatewayHandler> Gateway<T> addGateway(final AvailableAssociationHandler availableHandler,
final UnavailableAssociationHandler unavailableHandler)
{
final Gateway<T> gw = addGateway(factory);
final Gateway<T> gw = addGateway();

gw.availableAssociationHandler(availableHandler).unavailableAssociationHandler(unavailableHandler);

return gw;
}

public <T extends GatewayHandler> Gateway<T> addGateway(final GatewayHandlerFactory<T> factory,
public <T extends GatewayHandler> T addGateway(final GatewayHandlerFactory<T> factory,
final AvailableAssociationHandler availableHandler, final UnavailableAssociationHandler unavailableHandler,
final AeronStream reqStream, final AeronStream rspStream)
{
final Gateway<T> gw = addGateway(factory, availableHandler, unavailableHandler);
final Gateway<T> gw = addGateway(availableHandler, unavailableHandler);

return gw.addEndPoint(reqStream, rspStream);
return gw.addEndPoint(reqStream, rspStream, factory);
}

HeliosContext context()
Expand Down
12 changes: 8 additions & 4 deletions src/org/helios/HeliosConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@

import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.YieldingWaitStrategy;
import org.helios.journal.Journalling;
import org.helios.journal.strategy.PositionalJournalling;
import org.agrona.LangUtil;
import org.agrona.concurrent.BackoffIdleStrategy;
import org.agrona.concurrent.IdleStrategy;
import org.agrona.concurrent.SleepingIdleStrategy;
import org.helios.journal.Journalling;
import org.helios.journal.strategy.PositionalJournalling;

import java.nio.file.Path;
import java.nio.file.Paths;
Expand Down Expand Up @@ -49,6 +49,9 @@ public class HeliosConfiguration
public static final long MIN_PARK_NS = getLong("helios.core.back_off.idle.strategy.min_park_ns", 1000);
public static final long MAX_PARK_NS = getLong("helios.core.back_off.idle.strategy.max_park_ns", 100000);

public static final int HEARTBEAT_INTERVAL_MS = getInteger("helios.mmb.heartbeat_interval", 1000000); //1000
public static final int HEARTBEAT_LIVENESS = getInteger("helios.mmb.heartbeat_liveness", 3);

public static Journalling journalStrategy()
{
return newJournalStrategy(JOURNAL_STRATEGY);
Expand Down Expand Up @@ -117,7 +120,8 @@ private static IdleStrategy newIdleStrategy(final String strategyClassName)

if (strategyClassName == null)
{
idleStrategy = new BackoffIdleStrategy(MAX_SPINS, MAX_YIELDS, MIN_PARK_NS, MAX_PARK_NS);
//idleStrategy = new BackoffIdleStrategy(MAX_SPINS, MAX_YIELDS, MIN_PARK_NS, MAX_PARK_NS);
idleStrategy = new SleepingIdleStrategy(MAX_PARK_NS);
}
else
{
Expand Down
28 changes: 28 additions & 0 deletions src/org/helios/HeliosContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ public class HeliosContext
private IdleStrategy publisherIdleStrategy;
private IdleStrategy subscriberIdleStrategy;

private int heartbeatInterval;
private int heartbeatLiveness;

public HeliosContext()
{
setMediaDriverConf(HeliosConfiguration.MEDIA_DRIVER_CONF);
Expand All @@ -38,6 +41,9 @@ public HeliosContext()
setWriteIdleStrategy(HeliosConfiguration.writeIdleStrategy());
setPublisherIdleStrategy(HeliosConfiguration.publisherIdleStrategy());
setSubscriberIdleStrategy(HeliosConfiguration.subscriberIdleStrategy());

setHeartbeatInterval(HeliosConfiguration.HEARTBEAT_INTERVAL_MS);
setHeartbeatLiveness(HeliosConfiguration.HEARTBEAT_LIVENESS);
}

public HeliosContext setMediaDriverConf(String mediaDriverConf)
Expand Down Expand Up @@ -118,6 +124,18 @@ public HeliosContext setSubscriberIdleStrategy(IdleStrategy subscriberIdleStrate
return this;
}

public HeliosContext setHeartbeatInterval(int heartbeatInterval)
{
this.heartbeatInterval = heartbeatInterval;
return this;
}

public HeliosContext setHeartbeatLiveness(int heartbeatLiveness)
{
this.heartbeatLiveness = heartbeatLiveness;
return this;
}

public String getMediaDriverConf()
{
return mediaDriverConf;
Expand Down Expand Up @@ -182,4 +200,14 @@ public IdleStrategy subscriberIdleStrategy()
{
return subscriberIdleStrategy;
}

public int heartbeatInterval()
{
return heartbeatInterval;
}

public int heartbeatLiveness()
{
return heartbeatLiveness;
}
}
8 changes: 7 additions & 1 deletion src/org/helios/HeliosDriver.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,15 @@ public class HeliosDriver implements Closeable
private final MediaDriver mediaDriver;

public HeliosDriver(final HeliosContext context)
{
this(context, AERON_DIR_NAME_DEFAULT);
}

public HeliosDriver(final HeliosContext context, final String aeronDirectoryName)
{
this(context, new MediaDriver.Context()
.aeronDirectoryName(AERON_DIR_NAME_DEFAULT)
.aeronDirectoryName(aeronDirectoryName)
.termBufferSparseFile(false)
.threadingMode(ThreadingMode.SHARED)
.conductorIdleStrategy(new BackoffIdleStrategy(1, 1, 1, 1))
.receiverIdleStrategy(new NoOpIdleStrategy())
Expand Down
Loading

0 comments on commit abab29a

Please sign in to comment.