Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added timeouts for example service discovery #197

Merged
merged 3 commits into from
Jan 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.net.SocketException;
import java.net.URL;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.BiPredicate;
import java.util.stream.Stream;

Expand All @@ -30,8 +31,11 @@ public class Discovery {
private static final ObjectMapper JSON = new ObjectMapper();

private final InetSocketAddress group;
private long advertiseTimeout = -1;
private MulticastSocket listenSocket;
private long listenTimeout = -1;
private volatile boolean shouldStop = false;
private volatile boolean listening = false;

/**
* @param group The multicast address on which to advertise and listen for
Expand All @@ -52,13 +56,37 @@ public void stop() {
}
}

/**
* Checks if we're still listening for dependencies
*
* @return <code>true</code> if there still a chance of finding new dependencies
*/
public boolean listening() {
return listening;
}

/**
* Controls how long our services will be advertised for
*
* @param count The number of units. Supply a negative number to advertise
* forever.
* @param unit The unit duration
* @return <code>this</code>
* @see #advertise(String, int, Set)
*/
public Discovery stoppingAfter( int count, TimeUnit unit ) {
advertiseTimeout = unit.toMillis( count );
return this;
}

/**
* Starts advertising the {@link Service}s offered by this {@link Instance}
*
* @param protocol The protocol to use for the advertised services
* @param port The port on which the advertised services are available
* @param services The set of {@link Service} class names to advertise
* @return <code>this</code>
* @see #stoppingAfter(int, TimeUnit)
*/
public Discovery advertise( String protocol, int port, Set<String> services ) {

Expand All @@ -76,7 +104,11 @@ public Discovery advertise( String protocol, int port, Set<String> services ) {
try( MulticastSocket socket = new MulticastSocket( group.getPort() ) ) {
socket.joinGroup( group.getAddress() );

while( !shouldStop ) {
long limit = System.currentTimeMillis() + advertiseTimeout;
boolean limitBreached = false;
while( !shouldStop && !limitBreached ) {
limitBreached = advertiseTimeout > 0 && System.currentTimeMillis() > limit;

if( LOG.isTraceEnabled() ) {
LOG.trace( "Sending {}", new String( advert.getData(), UTF_8 ) );
}
Expand All @@ -103,12 +135,26 @@ public Discovery advertise( String protocol, int port, Set<String> services ) {
return this;
}

/**
* Controls how long we'll wait for dependencies to be found before giving up.
*
* @param count The number of units. Supply a negative number to wait forever.
* @param unit The unit duration
* @return <code>this</code>
* @see #listen(BiPredicate)
*/
public Discovery abortingAfter( int count, TimeUnit unit ) {
listenTimeout = unit.toMillis( count );
return this;
}

/**
* Starts listening for services advertised on the cluster
*
* @param action What to do with discovered remote {@link Service}s. Return true
* if all dependencies are satisfied
* @return <code>this</code>
* @see #abortingAfter(int, TimeUnit)
*/
public Discovery listen( BiPredicate<String, URL> action ) {

Expand All @@ -120,7 +166,16 @@ public Discovery listen( BiPredicate<String, URL> action ) {
socket.joinGroup( group.getAddress() );
// this will turn to true when all of our dependencies have been satisfied
boolean satisfied = false;
long limit = System.currentTimeMillis() + listenTimeout;
if( listenTimeout > 0 ) {
socket.setSoTimeout( (int) listenTimeout );
}
while( !satisfied && !shouldStop ) {

if( listenTimeout > 0 && System.currentTimeMillis() > limit ) {
throw new IllegalStateException( "Missing dependencies after " + listenTimeout + "ms" );
}

socket.receive( pkt );
if( LOG.isTraceEnabled() ) {
LOG.trace( "Received {}", new String( data, pkt.getOffset(), pkt.getLength(), UTF_8 ) );
Expand All @@ -139,14 +194,18 @@ public Discovery listen( BiPredicate<String, URL> action ) {
}
catch( SocketException se ) {
if( !shouldStop ) {
listening = false;
throw new UncheckedIOException( se );
}
}
catch( IOException ioe ) {
listening = false;
throw new UncheckedIOException( ioe );
}
listening = false;
}, "Discovery listen" );
listen.setDaemon( true );
listening = true;
listen.start();

return this;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.mastercard.test.flow.example.framework;

import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.toCollection;

Expand Down Expand Up @@ -172,6 +173,7 @@ public Instance start() {
// the rest will have to come from our cluster peers.
// start advertising our own services...
Discovery discovery = new Discovery( cluster )
.stoppingAfter( 30, SECONDS )
.advertise(
"http", port(),
services.stream()
Expand All @@ -181,8 +183,9 @@ public Instance start() {

// ... and listening for those that we require
if( !required.isEmpty() ) {
discovery.listen(
( typeName, rmtUrl ) -> {
discovery
.abortingAfter( 10, SECONDS )
.listen( ( typeName, rmtUrl ) -> {
@SuppressWarnings("unchecked")
Class<? extends Service> type = Optional.of( typeName )
.map( n -> {
Expand Down Expand Up @@ -211,7 +214,7 @@ public Instance start() {
}

// now we wait until our dependencies have been satisfied
while( !required.isEmpty() ) {
while( discovery.listening() ) {
try {
synchronized( required ) {
if( !required.isEmpty() ) {
Expand All @@ -221,7 +224,7 @@ public Instance start() {
.sorted()
.collect( joining() ) );
}
required.wait();
required.wait( 1000 );
}
}
}
Expand All @@ -232,6 +235,14 @@ public Instance start() {
}
}

if( !required.isEmpty() ) {
String missing = required.stream()
.map( c -> "\n " + c.getName() )
.sorted()
.collect( joining() );
LOG.error( "Instance incomplete! Missing{}", missing );
throw new IllegalStateException( "Missing dependencies:" + missing );
}
LOG.info( "Instance complete" );
return this;
}
Expand Down