Skip to content

Commit

Permalink
avoid aggressive socket connection error callback (#9)
Browse files Browse the repository at this point in the history
  • Loading branch information
jayjlu authored Apr 11, 2024
1 parent eaf54b7 commit 41294c7
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 13 deletions.
3 changes: 3 additions & 0 deletions src/main/java/com/hsbc/cranker/connector/ConnectorSocket.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ enum State {
}

/**
* completion state
* @return true if it's in end state
*/
public boolean isCompleted() {
Expand All @@ -68,11 +69,13 @@ public boolean isCompleted() {
}

/**
* connection state
* @return The current state of this connection
*/
State state();

/**
* cranker connection protocol version
* @return The connector socket's version, e.g. "cranker_3.0", "cranker_1.0"
*/
String version();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,14 @@ public interface CrankerConnector {
boolean stop(long timeout, TimeUnit timeUnit);

/**
* connectorId
* @return A unique ID assigned to this connector that is provided to the router for diagnostic reasons.
*/
String connectorId();

/**
* @return Meta data about the routers that this connector is connected to. Provided for diagnostic purposes.
* list of the router registration
* @return Metadata about the routers that this connector is connected to. Provided for diagnostic purposes.
*/
List<RouterRegistration> routers();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@
*/
public class CrankerConnectorBuilder {

/**
* prevent constructing CrankerConnectorBuilder, should use <code>CrankerConnectorBuilder.connector()</code> instead.
*/
private CrankerConnectorBuilder() {}

/**
* cranker protocol 1.0
*/
Expand Down Expand Up @@ -241,6 +246,7 @@ public static HttpClient.Builder createHttpClient(boolean trustAll) {
}

/**
* constructing a new connector builder
* @return A new connector builder
*/
public static CrankerConnectorBuilder connector() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@
*/
public class RegistrationUriSuppliers {

/**
* preventing constructing RegistrationUriSuppliers, as it's a util class.
*/
private RegistrationUriSuppliers() {}

/**
* Creates a supplier that always returns a fixed set of URIs
* @param uris The URIs to return, in the format <code>wss://crankerrouter.example.org</code>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,23 @@ class ChangeData {
private final List<RouterRegistration> unchanged;

/**
* The routers that have been newly registered
* @return The routers that have been newly registered
*/
public List<RouterRegistration> added() {
return added;
}

/**
* The routers that are no longer being connected to
* @return The routers that are no longer being connected to
*/
public List<RouterRegistration> removed() {
return removed;
}

/**
* The routers that remain unchanged
* @return The routers that remain unchanged
*/
public List<RouterRegistration> unchanged() {
Expand Down
37 changes: 25 additions & 12 deletions src/main/java/com/hsbc/cranker/connector/RouterRegistration.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,11 @@
import java.net.http.WebSocket;
import java.time.Duration;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
Expand All @@ -21,26 +19,31 @@
public interface RouterRegistration {

/**
* The number of expected idle connections to this router
* @return The number of expected idle connections to this router
*/
int expectedWindowSize();

/**
* The current number of idle connections
* @return The current number of idle connections
*/
int idleSocketSize();

/**
* The sockets that are currently connected to the router, ready to process a request
* @return The sockets that are currently connected to the router, ready to process a request
*/
Collection<ConnectorSocket> idleSockets();

/**
* The router's websocket registration URI
* @return The router's websocket registration URI
*/
URI registrationUri();

/**
* The current state of the connection to this router
* @return The current state of the connection to this router
*/
State state();
Expand All @@ -54,6 +57,7 @@ public interface RouterRegistration {
int currentUnsuccessfulConnectionAttempts();

/**
* last connection error
* @return If this socket is not connected due to an error, then this is the reason; otherwise this is null.
*/
Throwable lastConnectionError();
Expand Down Expand Up @@ -172,6 +176,16 @@ private static String[] getLessPreferredProtocol(List<String> protocols) {
return protocols.size() > 1 ? protocols.subList(1, protocols.size()).toArray(new String[0]) : new String[0];
}

private void addAnythingMissingWithBackoff() {
if (isAddMissingScheduled.compareAndSet(false, true)) {
int attempts = connectAttempts.incrementAndGet();
executor.schedule(() -> {
isAddMissingScheduled.set(false);
addAnyMissing();
}, retryAfterMillis(attempts), TimeUnit.MILLISECONDS);
}
}

private void addAnyMissing() {
while (state == State.ACTIVE && idleSockets.size() < windowSize) {

Expand Down Expand Up @@ -200,23 +214,18 @@ private void addAnyMissing() {
if (routerEventListener != null) {
routerEventListener.onSocketConnectionError(this, throwable);
}
if (isAddMissingScheduled.compareAndSet(false, true)) {
connectAttempts.incrementAndGet();
executor.schedule(() -> {
isAddMissingScheduled.set(false);
addAnyMissing();
}, retryAfterMillis(), TimeUnit.MILLISECONDS);
}
addAnythingMissingWithBackoff();
}
});
}
}

/**
* @return Milliseconds to wait until trying again, increasing exponentially, capped at 10 seconds
* @param retryAttempts retry attempts
*/
private int retryAfterMillis() {
return 500 + Math.min(10000, (int) Math.pow(2, connectAttempts.get()));
private static int retryAfterMillis(int retryAttempts) {
return 500 + Math.min(10000, (int) Math.pow(2, retryAttempts));
}

@Override
Expand All @@ -230,7 +239,11 @@ public void onConnectionAcquired(ConnectorSocket socket) {
public void onClose(ConnectorSocket socket, Throwable error) {
runningSockets.remove(socket);
idleSockets.remove(socket);
addAnyMissing();
if (error == null) {
addAnyMissing();
} else {
addAnythingMissingWithBackoff();
}
}

@Override
Expand Down

0 comments on commit 41294c7

Please sign in to comment.