Skip to content

Commit

Permalink
Improve connection release handling and improve flaky test (#1092)
Browse files Browse the repository at this point in the history
This update ensures connection release stages are linked. In addition, it improves stability of flaky `RoutingTableAndConnectionPoolTest.shouldHandleAddAndRemoveFromRoutingTableAndConnectionPool` test.
  • Loading branch information
injectives committed Feb 1, 2022
1 parent cae2b9d commit dc0fa7d
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import static java.util.Collections.emptyMap;
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.poolId;
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setTerminationReason;
import static org.neo4j.driver.internal.util.Futures.asCompletionStage;

/**
* This connection represents a simple network connection to a remote server. It wraps a channel obtained from a connection pool. The life cycle of this
Expand Down Expand Up @@ -189,10 +190,14 @@ public void terminateAndRelease( String reason )
if ( status.compareAndSet( Status.OPEN, Status.TERMINATED ) )
{
setTerminationReason( channel, reason );
channel.close();
channelPool.release( channel );
releaseFuture.complete( null );
metricsListener.afterConnectionReleased( poolId( this.channel ), this.inUseEvent );
asCompletionStage( channel.close() )
.exceptionally( throwable -> null )
.thenCompose( ignored -> channelPool.release( channel ) )
.whenComplete( ( ignored, throwable ) ->
{
releaseFuture.complete( null );
metricsListener.afterConnectionReleased( poolId( this.channel ), this.inUseEvent );
} );
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.neo4j.driver.internal.util.Clock;

import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setLastUsedTimestamp;
import static org.neo4j.driver.internal.util.Futures.asCompletionStage;
import static org.neo4j.driver.internal.util.Futures.completedWithNull;

public class ChannelReleasingResetResponseHandler extends ResetResponseHandler
{
Expand All @@ -47,18 +49,20 @@ public ChannelReleasingResetResponseHandler( Channel channel, ExtendedChannelPoo
@Override
protected void resetCompleted( CompletableFuture<Void> completionFuture, boolean success )
{
CompletionStage<Void> closureStage;
if ( success )
{
// update the last-used timestamp before returning the channel back to the pool
setLastUsedTimestamp( channel, clock.millis() );
closureStage = completedWithNull();
}
else
{
// close the channel before returning it back to the pool if RESET failed
channel.close();
closureStage = asCompletionStage( channel.close() );
}

CompletionStage<Void> released = pool.release( channel );
released.whenComplete( ( ignore, error ) -> completionFuture.complete( null ) );
closureStage.exceptionally( throwable -> null )
.thenCompose( ignored -> pool.release( channel ) )
.whenComplete( ( ignore, error ) -> completionFuture.complete( null ) );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand All @@ -43,6 +45,7 @@
import org.neo4j.driver.exceptions.FatalDiscoveryException;
import org.neo4j.driver.exceptions.ProtocolException;
import org.neo4j.driver.internal.BoltServerAddress;
import org.neo4j.driver.internal.DatabaseNameUtil;
import org.neo4j.driver.internal.async.connection.BootstrapFactory;
import org.neo4j.driver.internal.async.pool.NettyChannelHealthChecker;
import org.neo4j.driver.internal.async.pool.NettyChannelTracker;
Expand Down Expand Up @@ -85,7 +88,7 @@ class RoutingTableAndConnectionPoolTest
private static final BoltServerAddress D = new BoltServerAddress( "localhost:30003" );
private static final BoltServerAddress E = new BoltServerAddress( "localhost:30004" );
private static final BoltServerAddress F = new BoltServerAddress( "localhost:30005" );
private static final List<BoltServerAddress> SERVERS = new LinkedList<>( Arrays.asList( null, A, B, C, D, E, F ) );
private static final List<BoltServerAddress> SERVERS = Collections.synchronizedList( new LinkedList<>( Arrays.asList( null, A, B, C, D, E, F ) ) );

private static final String[] DATABASES = new String[]{"", SYSTEM_DATABASE_NAME, "my database"};

Expand All @@ -94,7 +97,7 @@ class RoutingTableAndConnectionPoolTest
private final Logging logging = none();

@Test
void shouldAddServerToRoutingTableAndConnectionPool() throws Throwable
void shouldAddServerToRoutingTableAndConnectionPool()
{
// Given
ConnectionPool connectionPool = newConnectionPool();
Expand All @@ -114,7 +117,7 @@ void shouldAddServerToRoutingTableAndConnectionPool() throws Throwable
}

@Test
void shouldNotAddToRoutingTableWhenFailedWithRoutingError() throws Throwable
void shouldNotAddToRoutingTableWhenFailedWithRoutingError()
{
// Given
ConnectionPool connectionPool = newConnectionPool();
Expand All @@ -133,7 +136,7 @@ void shouldNotAddToRoutingTableWhenFailedWithRoutingError() throws Throwable
}

@Test
void shouldNotAddToRoutingTableWhenFailedWithProtocolError() throws Throwable
void shouldNotAddToRoutingTableWhenFailedWithProtocolError()
{
// Given
ConnectionPool connectionPool = newConnectionPool();
Expand All @@ -152,7 +155,7 @@ void shouldNotAddToRoutingTableWhenFailedWithProtocolError() throws Throwable
}

@Test
void shouldNotAddToRoutingTableWhenFailedWithSecurityError() throws Throwable
void shouldNotAddToRoutingTableWhenFailedWithSecurityError()
{
// Given
ConnectionPool connectionPool = newConnectionPool();
Expand All @@ -171,7 +174,7 @@ void shouldNotAddToRoutingTableWhenFailedWithSecurityError() throws Throwable
}

@Test
void shouldNotRemoveNewlyAddedRoutingTableEvenIfItIsExpired() throws Throwable
void shouldNotRemoveNewlyAddedRoutingTableEvenIfItIsExpired()
{
// Given
ConnectionPool connectionPool = newConnectionPool();
Expand All @@ -194,7 +197,7 @@ void shouldNotRemoveNewlyAddedRoutingTableEvenIfItIsExpired() throws Throwable
}

@Test
void shouldRemoveExpiredRoutingTableAndServers() throws Throwable
void shouldRemoveExpiredRoutingTableAndServers()
{
// Given
ConnectionPool connectionPool = newConnectionPool();
Expand All @@ -219,7 +222,7 @@ void shouldRemoveExpiredRoutingTableAndServers() throws Throwable
}

@Test
void shouldRemoveExpiredRoutingTableButNotServer() throws Throwable
void shouldRemoveExpiredRoutingTableButNotServer()
{
// Given
ConnectionPool connectionPool = newConnectionPool();
Expand Down Expand Up @@ -256,7 +259,7 @@ void shouldHandleAddAndRemoveFromRoutingTableAndConnectionPool() throws Throwabl
acquireAndReleaseConnections( loadBalancer );
Set<BoltServerAddress> servers = routingTables.allServers();
BoltServerAddress openServer = null;
for( BoltServerAddress server: servers )
for ( BoltServerAddress server : servers )
{
if ( connectionPool.isOpen( server ) )
{
Expand All @@ -268,6 +271,8 @@ void shouldHandleAddAndRemoveFromRoutingTableAndConnectionPool() throws Throwabl

// if we remove the open server from servers, then the connection pool should remove the server from the pool.
SERVERS.remove( openServer );
// ensure rediscovery is necessary on subsequent interaction
Arrays.stream( DATABASES ).map( DatabaseNameUtil::database ).forEach( routingTables::remove );
acquireAndReleaseConnections( loadBalancer );

assertFalse( connectionPool.isOpen( openServer ) );
Expand Down Expand Up @@ -368,7 +373,11 @@ public CompletionStage<ClusterCompositionLookupResult> lookupClusterComposition(
}
if ( servers.size() == 0 )
{
servers.add( A );
BoltServerAddress address = SERVERS.stream()
.filter( Objects::nonNull )
.findFirst()
.orElseThrow( () -> new RuntimeException( "No non null server addresses are available" ) );
servers.add( address );
}
ClusterComposition composition = new ClusterComposition( clock.millis() + 1, servers, servers, servers );
return CompletableFuture.completedFuture( new ClusterCompositionLookupResult( composition ) );
Expand Down

0 comments on commit dc0fa7d

Please sign in to comment.