diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java index c88d4083c4..8c7254440e 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java @@ -64,7 +64,8 @@ public boolean isStaleFor( AccessMode mode ) mode == AccessMode.WRITE && writers.size() == 0; } - private Set servers() + @Override + public Set servers() { Set servers = new HashSet<>(); servers.addAll( readers.servers() ); diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/LoadBalancer.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/LoadBalancer.java index 5679bf2ebf..273e6f0d3b 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/LoadBalancer.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/LoadBalancer.java @@ -110,16 +110,13 @@ private PooledConnection acquireConnection( AccessMode mode, RoundRobinAddressSe private synchronized void forget( BoltServerAddress address ) { - // First remove from the load balancer, to prevent concurrent threads from making connections to them. + // remove from the routing table, to prevent concurrent threads from making connections to this address routingTable.forget( address ); + if ( PURGE_ON_ERROR ) { connections.purge( address ); } - else - { - connections.deactivate( address ); - } } synchronized void ensureRouting( AccessMode mode ) @@ -153,15 +150,7 @@ private void updateConnectionPool( RoutingTableChange routingTableChange ) } else { - for ( BoltServerAddress addedAddress : routingTableChange.added() ) - { - connections.activate( addedAddress ); - } - for ( BoltServerAddress removedAddress : routingTableChange.removed() ) - { - connections.deactivate( removedAddress ); - } - connections.compact(); + connections.retainAll( routingTable.servers() ); } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTable.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTable.java index 0e60500011..c12c8668d0 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTable.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTable.java @@ -18,6 +18,8 @@ */ package org.neo4j.driver.internal.cluster; +import java.util.Set; + import org.neo4j.driver.internal.net.BoltServerAddress; import org.neo4j.driver.v1.AccessMode; @@ -37,5 +39,7 @@ public interface RoutingTable int routerSize(); + Set servers(); + void removeWriter( BoltServerAddress toRemove ); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueue.java b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueue.java index dfe5a72042..7241dda4b5 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueue.java +++ b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueue.java @@ -23,7 +23,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicBoolean; import org.neo4j.driver.internal.logging.DelegatingLogger; import org.neo4j.driver.internal.net.BoltServerAddress; @@ -40,15 +40,11 @@ public class BlockingPooledConnectionQueue { public static final String LOG_NAME = "ConnectionQueue"; - private static final int ACTIVE = 1; - private static final int INACTIVE = 2; - private static final int TERMINATED = 3; - /** The backing queue, keeps track of connections currently in queue */ private final BlockingQueue queue; private final Logger logger; - private final AtomicInteger state = new AtomicInteger( ACTIVE ); + private final AtomicBoolean terminated = new AtomicBoolean(); /** Keeps track of acquired connections */ private final Set acquiredConnections = @@ -75,7 +71,7 @@ public boolean offer( PooledConnection pooledConnection ) { disposeSafely( pooledConnection ); } - if ( state.get() != ACTIVE ) + if ( terminated.get() ) { terminateIdleConnections(); } @@ -96,13 +92,11 @@ public PooledConnection acquire( Supplier supplier ) } acquiredConnections.add( connection ); - int poolState = state.get(); - if ( poolState != ACTIVE ) + if ( terminated.get() ) { acquiredConnections.remove( connection ); disposeSafely( connection ); - throw new IllegalStateException( "Pool is " + (poolState == INACTIVE ? "deactivated" : "terminated") + - ", new connections can't be acquired" ); + throw new IllegalStateException( "Pool is terminated, new connections can't be acquired" ); } else { @@ -131,24 +125,6 @@ public boolean contains( PooledConnection pooledConnection ) return queue.contains( pooledConnection ); } - public void activate() - { - state.compareAndSet( INACTIVE, ACTIVE ); - } - - public void deactivate() - { - if ( state.compareAndSet( ACTIVE, INACTIVE ) ) - { - terminateIdleConnections(); - } - } - - public boolean isActive() - { - return state.get() == ACTIVE; - } - /** * Terminates all connections, both those that are currently in the queue as well * as those that have been acquired. @@ -157,7 +133,7 @@ public boolean isActive() */ public void terminate() { - if ( state.getAndSet( TERMINATED ) != TERMINATED ) + if ( terminated.compareAndSet( false, true ) ) { terminateIdleConnections(); terminateAcquiredConnections(); diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java index c197c2ccf3..d9187c38e4 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java +++ b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java @@ -18,7 +18,6 @@ */ package org.neo4j.driver.internal.net.pooling; -import java.util.Collections; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -33,6 +32,7 @@ import org.neo4j.driver.internal.spi.PooledConnection; import org.neo4j.driver.internal.util.Clock; import org.neo4j.driver.internal.util.Supplier; +import org.neo4j.driver.v1.Logger; import org.neo4j.driver.v1.Logging; /** @@ -61,6 +61,7 @@ public class SocketConnectionPool implements ConnectionPool private final ConnectionValidator connectionValidator; private final Clock clock; private final Logging logging; + private final Logger log; public SocketConnectionPool( PoolSettings poolSettings, Connector connector, Clock clock, Logging logging ) { @@ -69,6 +70,7 @@ public SocketConnectionPool( PoolSettings poolSettings, Connector connector, Clo this.connectionValidator = new PooledConnectionValidator( this ); this.clock = clock; this.logging = logging; + this.log = logging.getLog( getClass().getSimpleName() ); } @Override @@ -93,37 +95,25 @@ public void purge( BoltServerAddress address ) } @Override - public void activate( BoltServerAddress address ) - { - BlockingPooledConnectionQueue connectionQueue = pools.get( address ); - if ( connectionQueue != null ) - { - connectionQueue.activate(); - } - } - - @Override - public void deactivate( BoltServerAddress address ) - { - BlockingPooledConnectionQueue connections = pools.get( address ); - if ( connections != null ) - { - connections.deactivate(); - } - } - - @Override - public void compact() + public void retainAll( Set addressesToRetain ) { for ( Map.Entry entry : pools.entrySet() ) { BoltServerAddress address = entry.getKey(); - BlockingPooledConnectionQueue queue = entry.getValue(); + BlockingPooledConnectionQueue pool = entry.getValue(); - if ( !queue.isActive() && queue.activeConnections() == 0 ) + if ( !addressesToRetain.contains( address ) && pool.activeConnections() == 0 ) { - // queue has been in deactivated state and has no open connections by now - pools.remove( address ); + // address is not present in the updated routing table and has no active connections + // it's now safe to terminate corresponding connection pool and forget about it + + BlockingPooledConnectionQueue removedPool = pools.remove( address ); + if ( removedPool != null ) + { + log.info( "Closing connection pool towards %s, it has no active connections " + + "and is not in the routing table", address ); + removedPool.terminate(); + } } } } @@ -131,8 +121,7 @@ public void compact() @Override public boolean hasAddress( BoltServerAddress address ) { - BlockingPooledConnectionQueue connectionQueue = pools.get( address ); - return connectionQueue != null && connectionQueue.isActive(); + return pools.containsKey( address ); } @Override @@ -152,17 +141,7 @@ public void close() public int activeConnections( BoltServerAddress address ) { BlockingPooledConnectionQueue connectionQueue = pools.get( address ); - if ( connectionQueue == null || !connectionQueue.isActive() ) - { - return 0; - } - return connectionQueue.activeConnections(); - } - - // test-only accessor - Set addresses() - { - return Collections.unmodifiableSet( pools.keySet() ); + return connectionQueue == null ? 0 : connectionQueue.activeConnections(); } private BlockingPooledConnectionQueue pool( BoltServerAddress address ) diff --git a/driver/src/main/java/org/neo4j/driver/internal/security/TLSSocketChannel.java b/driver/src/main/java/org/neo4j/driver/internal/security/TLSSocketChannel.java index 6886d45fe8..abfca9fe02 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/security/TLSSocketChannel.java +++ b/driver/src/main/java/org/neo4j/driver/internal/security/TLSSocketChannel.java @@ -372,6 +372,8 @@ private HandshakeStatus wrap( ByteBuffer buffer ) throws IOException, ClientExce cipherOut.compact(); } break; + case CLOSED: + throw new IOException( "TLS socket channel is closed" ); default: throw new ClientException( "Got unexpected status " + status ); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/spi/ConnectionPool.java b/driver/src/main/java/org/neo4j/driver/internal/spi/ConnectionPool.java index 2efd55e9d4..3481d0d677 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/spi/ConnectionPool.java +++ b/driver/src/main/java/org/neo4j/driver/internal/spi/ConnectionPool.java @@ -18,6 +18,8 @@ */ package org.neo4j.driver.internal.spi; +import java.util.Set; + import org.neo4j.driver.internal.net.BoltServerAddress; public interface ConnectionPool extends AutoCloseable @@ -36,11 +38,7 @@ public interface ConnectionPool extends AutoCloseable */ void purge( BoltServerAddress address ); - void activate( BoltServerAddress address ); - - void deactivate( BoltServerAddress address ); - - void compact(); + void retainAll( Set addressesToRetain ); boolean hasAddress( BoltServerAddress address ); } diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterCompositionUtil.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterCompositionUtil.java index 9cc091ce7a..6d5afa1a37 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterCompositionUtil.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterCompositionUtil.java @@ -18,7 +18,7 @@ */ package org.neo4j.driver.internal.cluster; -import java.util.ArrayList; +import java.util.Collections; import java.util.LinkedHashSet; import java.util.List; import java.util.Set; @@ -41,7 +41,7 @@ private ClusterCompositionUtil() {} public static final BoltServerAddress E = new BoltServerAddress( "5555:55" ); public static final BoltServerAddress F = new BoltServerAddress( "6666:66" ); - public static final List EMPTY = new ArrayList<>(); + public static final List EMPTY = Collections.emptyList(); public static final ClusterComposition VALID_CLUSTER_COMPOSITION = createClusterComposition( asList( A, B ), asList( C ), asList( D, E ) ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterRoutingTableTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterRoutingTableTest.java index 0c39d63a97..b51ca8cc82 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterRoutingTableTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterRoutingTableTest.java @@ -20,7 +20,9 @@ import org.junit.Test; +import java.util.HashSet; import java.util.List; +import java.util.Set; import org.neo4j.driver.internal.net.BoltServerAddress; import org.neo4j.driver.internal.util.FakeClock; @@ -231,4 +233,25 @@ public void shouldNotRemoveServerIfPreWriterNowReader() assertEquals( 2, change.removed().size() ); assertThat( change.removed(), containsInAnyOrder( A, C ) ); } + + @Test + public void shouldReturnNoServersWhenEmpty() + { + ClusterRoutingTable routingTable = new ClusterRoutingTable( new FakeClock() ); + + Set servers = routingTable.servers(); + + assertEquals( 0, servers.size() ); + } + + @Test + public void shouldReturnAllServers() + { + ClusterRoutingTable routingTable = new ClusterRoutingTable( new FakeClock() ); + routingTable.update( createClusterComposition( asList( A, B, C ), asList( B, C, D ), asList( C, D, E, F ) ) ); + + Set servers = routingTable.servers(); + + assertEquals( new HashSet<>( asList( A, B, C, D, E, F ) ), servers ); + } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/LoadBalancerTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/LoadBalancerTest.java index cab63b966a..1db51b93dd 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/LoadBalancerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/LoadBalancerTest.java @@ -36,6 +36,7 @@ import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.spi.ConnectionPool; import org.neo4j.driver.internal.spi.PooledConnection; +import org.neo4j.driver.internal.util.FakeClock; import org.neo4j.driver.internal.util.SleeplessClock; import org.neo4j.driver.v1.AccessMode; import org.neo4j.driver.v1.Session; @@ -44,6 +45,7 @@ import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; import org.neo4j.driver.v1.exceptions.SessionExpiredException; +import static java.util.Arrays.asList; import static java.util.Collections.singleton; import static java.util.Collections.singletonList; import static org.hamcrest.Matchers.instanceOf; @@ -62,6 +64,14 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.neo4j.driver.internal.cluster.ClusterCompositionUtil.A; +import static org.neo4j.driver.internal.cluster.ClusterCompositionUtil.B; +import static org.neo4j.driver.internal.cluster.ClusterCompositionUtil.C; +import static org.neo4j.driver.internal.cluster.ClusterCompositionUtil.D; +import static org.neo4j.driver.internal.cluster.ClusterCompositionUtil.E; +import static org.neo4j.driver.internal.cluster.ClusterCompositionUtil.EMPTY; +import static org.neo4j.driver.internal.cluster.ClusterCompositionUtil.F; +import static org.neo4j.driver.internal.cluster.ClusterCompositionUtil.createClusterComposition; import static org.neo4j.driver.internal.logging.DevNullLogger.DEV_NULL_LOGGER; import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING; import static org.neo4j.driver.internal.net.BoltServerAddress.LOCAL_DEFAULT; @@ -71,7 +81,7 @@ public class LoadBalancerTest { @Test - public void ensureRoutingShouldUpdateRoutingTableAndDeactivateConnectionPoolWhenStale() throws Exception + public void ensureRoutingShouldUpdateRoutingTableWhenStale() throws Exception { // given ConnectionPool conns = mock( ConnectionPool.class ); @@ -89,7 +99,6 @@ public void ensureRoutingShouldUpdateRoutingTableAndDeactivateConnectionPoolWhen InOrder inOrder = inOrder( rediscovery, routingTable, conns ); inOrder.verify( rediscovery ).lookupClusterComposition( routingTable, conns ); inOrder.verify( routingTable ).update( any( ClusterComposition.class ) ); - inOrder.verify( conns ).deactivate( new BoltServerAddress( "abc", 12 ) ); } @Test @@ -147,7 +156,7 @@ public void shouldAcquireReaderOrWriterConn() throws Exception } @Test - public void shouldForgetAddressAndItsConnectionsOnServiceUnavailableWhileClosingTx() + public void shouldForgetAddressOnServiceUnavailableWhileClosingTx() { RoutingTable routingTable = mock( RoutingTable.class ); when( routingTable.update( any( ClusterComposition.class ) ) ).thenReturn( RoutingTableChange.EMPTY ); @@ -172,11 +181,10 @@ public void shouldForgetAddressAndItsConnectionsOnServiceUnavailableWhileClosing } verify( routingTable ).forget( address ); - verify( connectionPool ).deactivate( address ); } @Test - public void shouldForgetAddressAndItsIdleConnectionsOnServiceUnavailableWhileClosingSession() + public void shouldForgetAddressOnServiceUnavailableWhileClosingSession() { RoutingTable routingTable = mock( RoutingTable.class, RETURNS_MOCKS ); ConnectionPool connectionPool = mock( ConnectionPool.class ); @@ -193,7 +201,6 @@ public void shouldForgetAddressAndItsIdleConnectionsOnServiceUnavailableWhileClo session.close(); verify( routingTable ).forget( address ); - verify( connectionPool ).deactivate( address ); } @Test @@ -256,6 +263,27 @@ public void shouldThrowWhenRediscoveryReturnsNoSuitableServers() } } + @Test + public void shouldRetainConnectionsToAllAddressesFromReceivedRoutingTable() + { + ConnectionPool connections = mock( ConnectionPool.class ); + when( connections.acquire( LOCAL_DEFAULT ) ).thenReturn( mock( PooledConnection.class ) ); + + ClusterRoutingTable routingTable = new ClusterRoutingTable( new FakeClock(), A ); + // initally a stale routing table without readers + ClusterComposition initalComposition = createClusterComposition( asList( A, B ), asList( B, C ), EMPTY ); + // then valid routing table with everything + ClusterComposition nextComposition = createClusterComposition( asList( A, B, C ), asList( B, C, D ), asList( C, D, E, F ) ); + Rediscovery rediscovery = newRediscoveryMock( initalComposition, nextComposition ); + + LoadBalancer loadBalancer = new LoadBalancer( connections, routingTable, rediscovery, DEV_NULL_LOGGER ); + loadBalancer.acquireConnection( READ ); // requires rediscovery + + // rediscovery should be performed when load balancer is created and on read connection acquisition + verify( rediscovery, times( 2 ) ).lookupClusterComposition( routingTable, connections ); + verify( connections ).retainAll( new HashSet( asList( A, B, C, D, E, F ) ) ); + } + private void testRediscoveryWhenStale( AccessMode mode ) { ConnectionPool connections = mock( ConnectionPool.class ); @@ -350,11 +378,16 @@ private static RoutingTable newStaleRoutingTableMock( AccessMode mode ) private static Rediscovery newRediscoveryMock() { - Rediscovery rediscovery = mock( Rediscovery.class ); - Set noServers = Collections.emptySet(); + Set noServers = Collections.emptySet(); ClusterComposition clusterComposition = new ClusterComposition( 1, noServers, noServers, noServers ); + return newRediscoveryMock( clusterComposition ); + } + + private static Rediscovery newRediscoveryMock( ClusterComposition initialComposition, ClusterComposition... otherCompositions ) + { + Rediscovery rediscovery = mock( Rediscovery.class ); when( rediscovery.lookupClusterComposition( any( RoutingTable.class ), any( ConnectionPool.class ) ) ) - .thenReturn( clusterComposition ); + .thenReturn( initialComposition, otherCompositions ); return rediscovery; } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingPooledConnectionErrorHandlingTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingPooledConnectionErrorHandlingTest.java index 932359fe0e..14d65be2cb 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingPooledConnectionErrorHandlingTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingPooledConnectionErrorHandlingTest.java @@ -46,7 +46,6 @@ import static java.util.Collections.singletonMap; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.not; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -232,7 +231,7 @@ private void verifyServiceUnavailableHandling( Connection connection, RoutingTab assertThat( routingTable, not( containsRouter( address ) ) ); assertThat( routingTable, not( containsReader( address ) ) ); assertThat( routingTable, not( containsWriter( address ) ) ); - assertFalse( connectionPool.hasAddress( address ) ); + assertTrue( connectionPool.hasAddress( address ) ); } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueueTest.java b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueueTest.java index 665b24a69f..264785f602 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueueTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueueTest.java @@ -28,14 +28,11 @@ import org.neo4j.driver.v1.Logging; import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.CoreMatchers.startsWith; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.RETURNS_MOCKS; @@ -302,139 +299,6 @@ public void shouldDisposeBrokenConnections() verify( connection ).dispose(); } - @Test - public void shouldFailToAcquireConnectionWhenDeactivated() - { - Supplier connectionSupplier = connectionSupplierMock(); - when( connectionSupplier.get() ).thenReturn( mock( PooledConnection.class ) ); - BlockingPooledConnectionQueue queue = newConnectionQueue( 3 ); - queue.deactivate(); - - try - { - queue.acquire( connectionSupplier ); - fail( "Exception expected" ); - } - catch ( IllegalStateException e ) - { - assertThat( e.getMessage(), startsWith( "Pool is deactivated" ) ); - } - } - - @Test - public void shouldTerminateOfferedConnectionWhenDeactivated() - { - BlockingPooledConnectionQueue queue = newConnectionQueue( 3 ); - queue.deactivate(); - - PooledConnection connection = mock( PooledConnection.class ); - queue.offer( connection ); - - verify( connection ).dispose(); - } - - @Test - public void shouldBeActiveWhenNotDeactivatedAndNotTerminated() - { - BlockingPooledConnectionQueue queue = newConnectionQueue( 1 ); - assertTrue( queue.isActive() ); - } - - @Test - public void shouldNotBeActiveWhenDeactivated() - { - BlockingPooledConnectionQueue queue = newConnectionQueue( 1 ); - assertTrue( queue.isActive() ); - queue.deactivate(); - assertFalse( queue.isActive() ); - } - - @Test - public void shouldNotBeActiveWhenTerminated() - { - BlockingPooledConnectionQueue queue = newConnectionQueue( 1 ); - assertTrue( queue.isActive() ); - queue.terminate(); - assertFalse( queue.isActive() ); - } - - @Test - public void shouldBeActiveAfterDeactivationAndActivation() - { - BlockingPooledConnectionQueue queue = newConnectionQueue( 1 ); - assertTrue( queue.isActive() ); - queue.deactivate(); - assertFalse( queue.isActive() ); - queue.activate(); - assertTrue( queue.isActive() ); - } - - @Test - public void shouldNotBeActiveAfterTerminationAndActivation() - { - BlockingPooledConnectionQueue queue = newConnectionQueue( 1 ); - assertTrue( queue.isActive() ); - queue.terminate(); - assertFalse( queue.isActive() ); - queue.activate(); - assertFalse( queue.isActive() ); - } - - @Test - public void shouldBePossibleToAcquireFromActivatedQueue() - { - Supplier connectionSupplier = connectionSupplierMock(); - when( connectionSupplier.get() ).thenReturn( mock( PooledConnection.class ) ); - BlockingPooledConnectionQueue queue = newConnectionQueue( 3 ); - queue.deactivate(); - - try - { - queue.acquire( connectionSupplier ); - fail( "Exception expected" ); - } - catch ( IllegalStateException e ) - { - assertThat( e.getMessage(), startsWith( "Pool is deactivated" ) ); - } - - queue.activate(); - - assertNotNull( queue.acquire( connectionSupplier ) ); - } - - @Test - public void shouldNotBePossibleToActivateTerminatedQueue() - { - Supplier connectionSupplier = connectionSupplierMock(); - when( connectionSupplier.get() ).thenReturn( mock( PooledConnection.class ) ); - BlockingPooledConnectionQueue queue = newConnectionQueue( 3 ); - queue.terminate(); - - try - { - queue.acquire( connectionSupplier ); - fail( "Exception expected" ); - } - catch ( IllegalStateException e ) - { - assertThat( e.getMessage(), startsWith( "Pool is terminated" ) ); - } - - queue.activate(); - - try - { - queue.acquire( connectionSupplier ); - fail( "Exception expected" ); - } - catch ( IllegalStateException e ) - { - assertThat( e.getMessage(), startsWith( "Pool is terminated" ) ); - } - assertFalse( queue.isActive() ); - } - private static BlockingPooledConnectionQueue newConnectionQueue( int capacity ) { return newConnectionQueue( capacity, mock( Logging.class, RETURNS_MOCKS ) ); @@ -444,10 +308,4 @@ private static BlockingPooledConnectionQueue newConnectionQueue( int capacity, L { return new BlockingPooledConnectionQueue( LOCAL_DEFAULT, capacity, logging ); } - - @SuppressWarnings( "unchecked" ) - private static Supplier connectionSupplierMock() - { - return mock( Supplier.class ); - } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPoolTest.java b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPoolTest.java index 7144cf9538..9fe6ff7d18 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPoolTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPoolTest.java @@ -23,11 +23,14 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import java.util.ArrayDeque; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; @@ -47,7 +50,6 @@ import org.neo4j.driver.v1.Logging; import org.neo4j.driver.v1.Value; -import static java.util.Arrays.asList; import static java.util.Collections.newSetFromMap; import static java.util.Collections.singleton; import static org.hamcrest.Matchers.instanceOf; @@ -79,6 +81,7 @@ public class SocketConnectionPoolTest private static final BoltServerAddress ADDRESS_1 = LOCAL_DEFAULT; private static final BoltServerAddress ADDRESS_2 = new BoltServerAddress( "localhost", DEFAULT_PORT + 42 ); private static final BoltServerAddress ADDRESS_3 = new BoltServerAddress( "localhost", DEFAULT_PORT + 4242 ); + private static final BoltServerAddress ADDRESS_4 = new BoltServerAddress( "localhost", DEFAULT_PORT + 424242 ); @Test public void acquireCreatesNewConnectionWhenPoolIsEmpty() @@ -110,55 +113,6 @@ public void acquireUsesExistingConnectionIfPresent() verify( connector ).connect( ADDRESS_1 ); } - @Test - public void deactivateDoesNothingForNonExistingAddress() - { - Connection connection = newConnectionMock( ADDRESS_1 ); - SocketConnectionPool pool = newPool( newMockConnector( connection ) ); - - pool.acquire( ADDRESS_1 ).close(); - - assertTrue( pool.hasAddress( ADDRESS_1 ) ); - pool.deactivate( ADDRESS_2 ); - assertTrue( pool.hasAddress( ADDRESS_1 ) ); - } - - @Test - public void deactivateRemovesAddress() - { - Connection connection = newConnectionMock( ADDRESS_1 ); - SocketConnectionPool pool = newPool( newMockConnector( connection ) ); - - pool.acquire( ADDRESS_1 ).close(); - - assertTrue( pool.hasAddress( ADDRESS_1 ) ); - pool.deactivate( ADDRESS_1 ); - assertFalse( pool.hasAddress( ADDRESS_1 ) ); - } - - @Test - public void deactivateTerminatesIdleConnectionsInThePoolCorrespondingToTheAddress() - { - Connection connection1 = newConnectionMock( ADDRESS_1 ); - Connection connection2 = newConnectionMock( ADDRESS_1 ); - Connection connection3 = newConnectionMock( ADDRESS_1 ); - SocketConnectionPool pool = newPool( newMockConnector( connection1, connection2, connection3 ) ); - - Connection pooledConnection1 = pool.acquire( ADDRESS_1 ); - Connection pooledConnection2 = pool.acquire( ADDRESS_1 ); - pool.acquire( ADDRESS_1 ); - - // return two connections to the pool - pooledConnection1.close(); - pooledConnection2.close(); - - pool.deactivate( ADDRESS_1 ); - - verify( connection1 ).close(); - verify( connection2 ).close(); - verify( connection3, never() ).close(); - } - @Test public void hasAddressReturnsFalseWhenPoolIsEmpty() { @@ -591,85 +545,37 @@ public void shouldForgetIdleConnection() } @Test - public void shouldDeactivateExistingPool() + public void shouldRetainAllGivenAddresses() { - SocketConnectionPool pool = newPool( newMockConnector() ); - - assertNotNull( pool.acquire( ADDRESS_1 ) ); - assertTrue( pool.hasAddress( ADDRESS_1 ) ); - assertEquals( 1, pool.activeConnections( ADDRESS_1 ) ); - - pool.deactivate( ADDRESS_1 ); - assertFalse( pool.hasAddress( ADDRESS_1 ) ); - assertEquals( 0, pool.activeConnections( ADDRESS_1 ) ); - } - - @Test - public void shouldDeactivateNothingWhenPoolDoesNotExist() - { - SocketConnectionPool pool = newPool( newMockConnector() ); - - assertFalse( pool.hasAddress( ADDRESS_1 ) ); - assertEquals( 0, pool.activeConnections( ADDRESS_1 ) ); - - pool.deactivate( ADDRESS_1 ); - - assertFalse( pool.hasAddress( ADDRESS_1 ) ); - assertEquals( 0, pool.activeConnections( ADDRESS_1 ) ); - } + SocketConnectionPool pool = newPool( newMockConnector(), new FakeClock(), 42 ); - @Test - public void shouldActivateExistingPool() - { - SocketConnectionPool pool = newPool( newMockConnector() ); - assertNotNull( pool.acquire( ADDRESS_1 ) ); + pool.acquire( ADDRESS_1 ).close(); + pool.acquire( ADDRESS_2 ).close(); + pool.acquire( ADDRESS_3 ); - pool.deactivate( ADDRESS_1 ); - pool.activate( ADDRESS_1 ); + pool.retainAll( new HashSet( Arrays.asList( ADDRESS_1, ADDRESS_2, ADDRESS_3 ) ) ); assertTrue( pool.hasAddress( ADDRESS_1 ) ); - assertEquals( 1, pool.activeConnections( ADDRESS_1 ) ); - } - - @Test - public void shouldActivateNothingWhenPoolDoesNotExist() - { - SocketConnectionPool pool = newPool( newMockConnector() ); - - assertFalse( pool.hasAddress( ADDRESS_1 ) ); - assertEquals( 0, pool.activeConnections( ADDRESS_1 ) ); - - pool.activate( ADDRESS_1 ); - - assertFalse( pool.hasAddress( ADDRESS_1 ) ); - assertEquals( 0, pool.activeConnections( ADDRESS_1 ) ); + assertTrue( pool.hasAddress( ADDRESS_2 ) ); + assertTrue( pool.hasAddress( ADDRESS_3 ) ); } @Test - public void shouldRemoveDeactivatedPoolsWithoutConnectionsWhenCompacting() + public void shouldRemoveAllNonRetainedAddressesWithoutActiveConnections() { - SocketConnectionPool pool = newPool( newMockConnector( newConnectionMock( ADDRESS_1 ), - newConnectionMock( ADDRESS_1 ), newConnectionMock( ADDRESS_2 ), newConnectionMock( ADDRESS_3 ) ) ); - - PooledConnection connection1 = pool.acquire( ADDRESS_1 ); - PooledConnection connection2 = pool.acquire( ADDRESS_1 ); - PooledConnection connection3 = pool.acquire( ADDRESS_2 ); - PooledConnection connection4 = pool.acquire( ADDRESS_3 ); + SocketConnectionPool pool = newPool( newMockConnector(), new FakeClock(), 42 ); - assertEquals( new HashSet<>( asList( ADDRESS_1, ADDRESS_2, ADDRESS_3 ) ), pool.addresses() ); - - pool.deactivate( ADDRESS_1 ); - pool.deactivate( ADDRESS_3 ); - - connection1.close(); - connection2.close(); - connection4.close(); - - assertEquals( new HashSet<>( asList( ADDRESS_1, ADDRESS_2, ADDRESS_3 ) ), pool.addresses() ); + pool.acquire( ADDRESS_1 ).close(); + pool.acquire( ADDRESS_2 ).close(); + pool.acquire( ADDRESS_3 ); + pool.acquire( ADDRESS_4 ).close(); - pool.compact(); + pool.retainAll( singleton( ADDRESS_1 ) ); - assertEquals( singleton( ADDRESS_2 ), pool.addresses() ); + assertTrue( pool.hasAddress( ADDRESS_1 ) ); + assertFalse( pool.hasAddress( ADDRESS_2 ) ); + assertTrue( pool.hasAddress( ADDRESS_3 ) ); + assertFalse( pool.hasAddress( ADDRESS_4 ) ); } private static Answer createConnectionAnswer( final Set createdConnections ) @@ -711,7 +617,23 @@ private static Connector newMockConnector() private static Connector newMockConnector( Connection connection, Connection... otherConnections ) { Connector connector = mock( Connector.class ); - when( connector.connect( any( BoltServerAddress.class ) ) ).thenReturn( connection, otherConnections ); + + final Queue connectionsToReturn = new ArrayDeque<>(); + connectionsToReturn.add( connection ); + Collections.addAll( connectionsToReturn, otherConnections ); + + when( connector.connect( any( BoltServerAddress.class ) ) ).thenAnswer( new Answer() + { + @Override + public Connection answer( InvocationOnMock invocation ) throws Throwable + { + BoltServerAddress address = invocation.getArgumentAt( 0, BoltServerAddress.class ); + Connection connectionToReturn = connectionsToReturn.size() == 1 ? connectionsToReturn.peek() : connectionsToReturn.poll(); + when( connectionToReturn.boltServerAddress() ).thenReturn( address ); + return connectionToReturn; + } + } ); + return connector; } diff --git a/driver/src/test/java/org/neo4j/driver/internal/security/TrustOnFirstUseTrustManagerTest.java b/driver/src/test/java/org/neo4j/driver/internal/security/TrustOnFirstUseTrustManagerTest.java index 7369434906..2c98d225f2 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/security/TrustOnFirstUseTrustManagerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/security/TrustOnFirstUseTrustManagerTest.java @@ -54,7 +54,7 @@ public class TrustOnFirstUseTrustManagerTest private String knownServer; @Rule - public TemporaryFolder testDir = new TemporaryFolder(); + public TemporaryFolder testDir = new TemporaryFolder( new File( "target" ) ); private X509Certificate knownCertificate; @Before diff --git a/driver/src/test/java/org/neo4j/driver/internal/util/ConnectionTrackingDriverFactory.java b/driver/src/test/java/org/neo4j/driver/internal/util/ConnectionTrackingDriverFactory.java index da20f1aecb..c093cd177f 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/util/ConnectionTrackingDriverFactory.java +++ b/driver/src/test/java/org/neo4j/driver/internal/util/ConnectionTrackingDriverFactory.java @@ -19,6 +19,7 @@ package org.neo4j.driver.internal.util; import java.util.Collections; +import java.util.HashSet; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -30,8 +31,7 @@ public class ConnectionTrackingDriverFactory extends DriverFactoryWithClock { - private final Set connections = - Collections.newSetFromMap( new ConcurrentHashMap() ); + private final Set connections = Collections.newSetFromMap( new ConcurrentHashMap() ); public ConnectionTrackingDriverFactory( Clock clock ) { @@ -39,8 +39,7 @@ public ConnectionTrackingDriverFactory( Clock clock ) } @Override - protected Connector createConnector( ConnectionSettings connectionSettings, SecurityPlan securityPlan, - Logging logging ) + protected Connector createConnector( ConnectionSettings connectionSettings, SecurityPlan securityPlan, Logging logging ) { Connector connector = super.createConnector( connectionSettings, securityPlan, logging ); return new ConnectionTrackingConnector( connector, connections ); @@ -48,10 +47,11 @@ protected Connector createConnector( ConnectionSettings connectionSettings, Secu public void closeConnections() { - for ( Connection connection : connections ) + Set connectionsSnapshot = new HashSet<>( connections ); + connections.clear(); + for ( Connection connection : connectionsSnapshot ) { connection.close(); } - connections.clear(); } } diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/CausalClusteringIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/CausalClusteringIT.java index c8e0e256af..e04f52d023 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/CausalClusteringIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/CausalClusteringIT.java @@ -18,6 +18,7 @@ */ package org.neo4j.driver.v1.integration; +import org.junit.After; import org.junit.Rule; import org.junit.Test; @@ -29,12 +30,12 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import org.neo4j.driver.internal.cluster.RoutingSettings; -import org.neo4j.driver.internal.logging.DevNullLogger; import org.neo4j.driver.internal.retry.RetrySettings; +import org.neo4j.driver.internal.util.Clock; import org.neo4j.driver.internal.util.ConnectionTrackingDriverFactory; import org.neo4j.driver.internal.util.FakeClock; import org.neo4j.driver.internal.util.ThrowingConnection; @@ -44,11 +45,10 @@ import org.neo4j.driver.v1.Config; import org.neo4j.driver.v1.Driver; import org.neo4j.driver.v1.GraphDatabase; -import org.neo4j.driver.v1.Logger; -import org.neo4j.driver.v1.Logging; import org.neo4j.driver.v1.Record; import org.neo4j.driver.v1.Session; import org.neo4j.driver.v1.StatementResult; +import org.neo4j.driver.v1.StatementRunner; import org.neo4j.driver.v1.Transaction; import org.neo4j.driver.v1.TransactionWork; import org.neo4j.driver.v1.Values; @@ -62,17 +62,22 @@ import org.neo4j.driver.v1.util.cc.ClusterMemberRole; import org.neo4j.driver.v1.util.cc.ClusterRule; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.SECONDS; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.startsWith; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING; import static org.neo4j.driver.v1.Values.parameters; +import static org.neo4j.driver.v1.util.DaemonThreadFactory.daemon; public class CausalClusteringIT { @@ -81,6 +86,17 @@ public class CausalClusteringIT @Rule public final ClusterRule clusterRule = new ClusterRule(); + private ExecutorService executor; + + @After + public void tearDown() + { + if ( executor != null ) + { + executor.shutdownNow(); + } + } + @Test public void shouldExecuteReadAndWritesWhenDriverSuppliedWithAddressOfLeader() throws Exception { @@ -228,7 +244,7 @@ public void shouldDropBrokenOldSessions() throws Exception int livenessCheckTimeoutMinutes = 2; Config config = Config.build() - .withConnectionLivenessCheckTimeout( livenessCheckTimeoutMinutes, TimeUnit.MINUTES ) + .withConnectionLivenessCheckTimeout( livenessCheckTimeoutMinutes, MINUTES ) .withoutEncryption() .toConfig(); @@ -237,10 +253,9 @@ public void shouldDropBrokenOldSessions() throws Exception URI routingUri = cluster.leader().getRoutingUri(); AuthToken auth = clusterRule.getDefaultAuthToken(); - RoutingSettings routingSettings = new RoutingSettings( 1, SECONDS.toMillis( 5 ), null ); RetrySettings retrySettings = RetrySettings.DEFAULT; - try ( Driver driver = driverFactory.newInstance( routingUri, auth, routingSettings, retrySettings, config ) ) + try ( Driver driver = driverFactory.newInstance( routingUri, auth, defaultRoutingSettings(), retrySettings, config ) ) { // create nodes in different threads using different sessions createNodesInDifferentThreads( concurrentSessionsCount, driver ); @@ -248,7 +263,7 @@ public void shouldDropBrokenOldSessions() throws Exception // now pool contains many sessions, make them all invalid driverFactory.closeConnections(); // move clock forward more than configured liveness check timeout - clock.progress( TimeUnit.MINUTES.toMillis( livenessCheckTimeoutMinutes + 1 ) ); + clock.progress( MINUTES.toMillis( livenessCheckTimeoutMinutes + 1 ) ); // now all idle connections should be considered too old and will be verified during acquisition // they will appear broken because they were closed and new valid connection will be created @@ -464,7 +479,7 @@ public void shouldAcceptMultipleBookmarks() throws Exception Cluster cluster = clusterRule.getCluster(); ClusterMember leader = cluster.leader(); - ExecutorService executor = Executors.newCachedThreadPool(); + executor = newExecutor(); try ( Driver driver = createDriver( leader.getRoutingUri() ) ) { @@ -486,7 +501,7 @@ public void shouldAcceptMultipleBookmarks() throws Exception try ( Session session = driver.session( AccessMode.READ, bookmarks ) ) { int count = countNodes( session, label, property, value ); - assertEquals( count, threadCount ); + assertEquals( threadCount, count ); } } } @@ -498,11 +513,8 @@ public void shouldAllowExistingTransactionToCompleteAfterDifferentConnectionBrea ClusterMember leader = cluster.leader(); ThrowingConnectionDriverFactory driverFactory = new ThrowingConnectionDriverFactory(); - RoutingSettings routingSettings = new RoutingSettings( 1, SECONDS.toMillis( 5 ), null ); - Config config = Config.build().toConfig(); - try ( Driver driver = driverFactory.newInstance( leader.getRoutingUri(), clusterRule.getDefaultAuthToken(), - routingSettings, RetrySettings.DEFAULT, config ) ) + defaultRoutingSettings(), RetrySettings.DEFAULT, configWithoutLogging() ) ) { Session session1 = driver.session(); Transaction tx1 = session1.beginTransaction(); @@ -536,6 +548,111 @@ public void shouldAllowExistingTransactionToCompleteAfterDifferentConnectionBrea } } + @Test + public void shouldRediscoverWhenConnectionsToAllCoresBreak() + { + Cluster cluster = clusterRule.getCluster(); + ClusterMember leader = cluster.leader(); + + ThrowingConnectionDriverFactory driverFactory = new ThrowingConnectionDriverFactory(); + try ( Driver driver = driverFactory.newInstance( leader.getRoutingUri(), clusterRule.getDefaultAuthToken(), + defaultRoutingSettings(), RetrySettings.DEFAULT, configWithoutLogging() ) ) + { + try ( Session session = driver.session() ) + { + createNode( session, "Person", "name", "Vision" ); + + // force driver to connect to every cluster member + for ( int i = 0; i < cluster.members().size(); i++ ) + { + assertEquals( 1, countNodes( session, "Person", "name", "Vision" ) ); + } + } + + // now driver should have connection pools towards every cluster member + // make all those connections throw and seem broken + for ( ThrowingConnection connection : driverFactory.getConnections() ) + { + connection.setNextRunFailure( new ServiceUnavailableException( "Disconnected" ) ); + } + + // observe that connection towards writer is broken + try ( Session session = driver.session( AccessMode.WRITE ) ) + { + try + { + runCreateNode( session, "Person", "name", "Vision" ); + fail( "Exception expected" ); + } + catch ( SessionExpiredException e ) + { + assertEquals( "Disconnected", e.getCause().getMessage() ); + } + } + + // probe connections to all readers + int readersCount = cluster.followers().size() + cluster.readReplicas().size(); + for ( int i = 0; i < readersCount; i++ ) + { + try ( Session session = driver.session( AccessMode.READ ) ) + { + runCountNodes( session, "Person", "name", "Vision" ); + } + catch ( Throwable ignore ) + { + } + } + + try ( Session session = driver.session() ) + { + updateNode( session, "Person", "name", "Vision", "Thanos" ); + assertEquals( 0, countNodes( session, "Person", "name", "Vision" ) ); + assertEquals( 1, countNodes( session, "Person", "name", "Thanos" ) ); + } + } + } + + @Test + public void shouldKeepOperatingWhenConnectionsBreak() throws Exception + { + String label = "Person"; + String property = "name"; + String value = "Tony Stark"; + Cluster cluster = clusterRule.getCluster(); + + ConnectionTrackingDriverFactory driverFactory = new ConnectionTrackingDriverFactory( Clock.SYSTEM ); + AtomicBoolean stop = new AtomicBoolean(); + executor = newExecutor(); + + try ( Driver driver = driverFactory.newInstance( cluster.leader().getRoutingUri(), clusterRule.getDefaultAuthToken(), + defaultRoutingSettings(), RetrySettings.DEFAULT, configWithoutLogging() ) ) + { + List> results = new ArrayList<>(); + + // launch writers and readers that use transaction functions and thus should never fail + for ( int i = 0; i < 3; i++ ) + { + results.add( executor.submit( countNodesCallable( driver, label, property, value, stop ) ) ); + } + for ( int i = 0; i < 2; i++ ) + { + results.add( executor.submit( createNodesCallable( driver, label, property, value, stop ) ) ); + } + + // terminate connections while reads and writes are in progress + long deadline = System.currentTimeMillis() + MINUTES.toMillis( 1 ); + while ( System.currentTimeMillis() < deadline && !stop.get() ) + { + driverFactory.closeConnections(); + SECONDS.sleep( 5 ); // sleep a bit to allow readers and writers to progress + } + stop.set( true ); + + awaitAll( results ); // readers and writers should stop + assertThat( countNodes( driver.session(), label, property, value ), greaterThan( 0 ) ); // some nodes should be created + } + } + private static void closeTx( Transaction tx ) { tx.success(); @@ -713,36 +830,19 @@ else if ( role == ClusterMemberRole.READ_REPLICA ) private Driver createDriver( URI boltUri ) { - Logging devNullLogging = new Logging() - { - @Override - public Logger getLog( String name ) - { - return DevNullLogger.DEV_NULL_LOGGER; - } - }; - - Config config = Config.build() - .withLogging( devNullLogging ) - .toConfig(); - - return GraphDatabase.driver( boltUri, clusterRule.getDefaultAuthToken(), config ); + return GraphDatabase.driver( boltUri, clusterRule.getDefaultAuthToken(), configWithoutLogging() ); } private Driver discoverDriver( List routingUris ) { - Config config = Config.build() - .withLogging( DEV_NULL_LOGGING ) - .toConfig(); - - return GraphDatabase.routingDriver( routingUris, clusterRule.getDefaultAuthToken(), config ); + return GraphDatabase.routingDriver( routingUris, clusterRule.getDefaultAuthToken(), configWithoutLogging() ); } private static void createNodesInDifferentThreads( int count, final Driver driver ) throws Exception { final CountDownLatch beforeRunLatch = new CountDownLatch( count ); final CountDownLatch runQueryLatch = new CountDownLatch( 1 ); - final ExecutorService executor = Executors.newCachedThreadPool(); + final ExecutorService executor = newExecutor(); for ( int i = 0; i < count; i++ ) { @@ -766,7 +866,7 @@ public Void call() throws Exception runQueryLatch.countDown(); executor.shutdown(); - assertTrue( executor.awaitTermination( 1, TimeUnit.MINUTES ) ); + assertTrue( executor.awaitTermination( 1, MINUTES ) ); } private static void closeAndExpectException( AutoCloseable closeable, Class exceptionClass ) @@ -789,9 +889,7 @@ private static int countNodes( Session session, final String label, final String @Override public Integer execute( Transaction tx ) { - StatementResult result = tx.run( "MATCH (n:" + label + " {" + property + ": $value}) RETURN count(n)", - parameters( "value", value ) ); - return result.single().get( 0 ).asInt(); + return runCountNodes( tx, label, property, value ); } } ); } @@ -827,4 +925,115 @@ public Void execute( Transaction tx ) return localSession.lastBookmark(); } } + + private static Callable createNodesCallable( final Driver driver, final String label, final String property, final String value, + final AtomicBoolean stop ) + { + return new Callable() + { + @Override + public Void call() throws Exception + { + while ( !stop.get() ) + { + try ( Session session = driver.session( AccessMode.WRITE ) ) + { + createNode( session, label, property, value ); + } + catch ( Throwable t ) + { + stop.set( true ); + throw t; + } + } + return null; + } + }; + } + + private static Callable countNodesCallable( final Driver driver, final String label, final String property, final String value, + final AtomicBoolean stop ) + { + return new Callable() + { + @Override + public Void call() throws Exception + { + while ( !stop.get() ) + { + try ( Session session = driver.session( AccessMode.READ ) ) + { + countNodes( session, label, property, value ); + } + catch ( Throwable t ) + { + stop.set( true ); + throw t; + } + } + return null; + } + }; + } + + private static void createNode( final Session session, final String label, final String property, final String value ) + { + session.writeTransaction( new TransactionWork() + { + @Override + public Void execute( Transaction tx ) + { + runCreateNode( tx, label, property, value ); + return null; + } + } ); + } + + private static void updateNode( final Session session, final String label, final String property, final String oldValue, final String newValue ) + { + session.writeTransaction( new TransactionWork() + { + @Override + public Void execute( Transaction tx ) + { + tx.run( "MATCH (n: " + label + '{' + property + ": $oldValue}) SET n." + property + " = $newValue", + parameters( "oldValue", oldValue, "newValue", newValue ) ); + return null; + } + } ); + } + + private static void runCreateNode( StatementRunner statementRunner, String label, String property, String value ) + { + statementRunner.run( "CREATE (n:" + label + ") SET n." + property + " = $value", parameters( "value", value ) ); + } + + private static int runCountNodes( StatementRunner statementRunner, String label, String property, String value ) + { + StatementResult result = statementRunner.run( "MATCH (n:" + label + " {" + property + ": $value}) RETURN count(n)", parameters( "value", value ) ); + return result.single().get( 0 ).asInt(); + } + + private static RoutingSettings defaultRoutingSettings() + { + return new RoutingSettings( 1, SECONDS.toMillis( 1 ), null ); + } + + private static Config configWithoutLogging() + { + return Config.build().withLogging( DEV_NULL_LOGGING ).toConfig(); + } + + private static ExecutorService newExecutor() + { + return Executors.newCachedThreadPool( daemon( CausalClusteringIT.class.getSimpleName() + "-thread-" ) ); + } + + private static void awaitAll( List> results ) throws Exception + { + for ( Future result : results ) + { + assertNull( result.get( DEFAULT_TIMEOUT_MS, MILLISECONDS ) ); + } + } } diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/CredentialsIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/CredentialsIT.java index 2297c4013f..c8992cdd73 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/CredentialsIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/CredentialsIT.java @@ -23,6 +23,7 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; +import java.io.File; import java.util.HashMap; import org.neo4j.driver.internal.security.InternalAuthToken; @@ -52,7 +53,7 @@ public class CredentialsIT { @ClassRule - public static TemporaryFolder tempDir = new TemporaryFolder(); + public static TemporaryFolder tempDir = new TemporaryFolder( new File( "target" ) ); @ClassRule public static TestNeo4j neo4j = new TestNeo4j(); diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/TLSSocketChannelIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/TLSSocketChannelIT.java index 64ad23eed5..4b9c691be0 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/TLSSocketChannelIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/TLSSocketChannelIT.java @@ -66,7 +66,7 @@ public class TLSSocketChannelIT public TestNeo4j neo4j = new TestNeo4j(); @Rule - public TemporaryFolder folder = new TemporaryFolder(); + public TemporaryFolder folder = new TemporaryFolder( new File( "target" ) ); @BeforeClass public static void setup() throws IOException, InterruptedException diff --git a/driver/src/test/java/org/neo4j/driver/v1/tck/DriverComplianceIT.java b/driver/src/test/java/org/neo4j/driver/v1/tck/DriverComplianceIT.java index 3d573cb4ca..d7fd766931 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/tck/DriverComplianceIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/tck/DriverComplianceIT.java @@ -20,8 +20,6 @@ import cucumber.api.CucumberOptions; import org.junit.ClassRule; -import org.junit.Rule; -import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import java.io.IOException; @@ -36,9 +34,6 @@ format = {"default_summary"}) public class DriverComplianceIT { - @Rule - TemporaryFolder folder = new TemporaryFolder(); - @ClassRule public static TestNeo4j neo4j = new TestNeo4j(); diff --git a/driver/src/test/java/org/neo4j/driver/v1/util/cc/LocalOrRemoteClusterRule.java b/driver/src/test/java/org/neo4j/driver/v1/util/cc/LocalOrRemoteClusterRule.java index bc21acfb81..e41a4b20f8 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/util/cc/LocalOrRemoteClusterRule.java +++ b/driver/src/test/java/org/neo4j/driver/v1/util/cc/LocalOrRemoteClusterRule.java @@ -92,7 +92,7 @@ private static void assertValidSystemPropertiesDefined() } if ( uri != null && !BOLT_ROUTING_URI_SCHEME.equals( uri.getScheme() ) ) { - throw new IllegalStateException( "CLuster uri should have bolt+routing scheme: '" + uri + "'" ); + throw new IllegalStateException( "Cluster uri should have bolt+routing scheme: '" + uri + "'" ); } }