From 686057826d38228300def407a5e8b0841074f904 Mon Sep 17 00:00:00 2001 From: lutovich Date: Fri, 6 Apr 2018 15:07:04 +0200 Subject: [PATCH 1/6] Removed couple unused classes --- .../handlers/BookmarkResponseHandler.java | 56 ----------- .../driver/v1/util/RecordingByteChannel.java | 92 ------------------- 2 files changed, 148 deletions(-) delete mode 100644 driver/src/main/java/org/neo4j/driver/internal/handlers/BookmarkResponseHandler.java delete mode 100644 driver/src/test/java/org/neo4j/driver/v1/util/RecordingByteChannel.java diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/BookmarkResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/BookmarkResponseHandler.java deleted file mode 100644 index 4b6eb94593..0000000000 --- a/driver/src/main/java/org/neo4j/driver/internal/handlers/BookmarkResponseHandler.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright (c) 2002-2018 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.neo4j.driver.internal.handlers; - -import java.util.Map; - -import org.neo4j.driver.internal.Bookmark; -import org.neo4j.driver.internal.ExplicitTransaction; -import org.neo4j.driver.internal.spi.ResponseHandler; -import org.neo4j.driver.v1.Value; - -public class BookmarkResponseHandler implements ResponseHandler -{ - private final ExplicitTransaction tx; - - public BookmarkResponseHandler( ExplicitTransaction tx ) - { - this.tx = tx; - } - - @Override - public void onSuccess( Map metadata ) - { - Value bookmarkValue = metadata.get( "bookmark" ); - if ( bookmarkValue != null ) - { - tx.setBookmark( Bookmark.from( bookmarkValue.asString() ) ); - } - } - - @Override - public void onFailure( Throwable error ) - { - } - - @Override - public void onRecord( Value[] fields ) - { - } -} diff --git a/driver/src/test/java/org/neo4j/driver/v1/util/RecordingByteChannel.java b/driver/src/test/java/org/neo4j/driver/v1/util/RecordingByteChannel.java deleted file mode 100644 index 9ea5d45117..0000000000 --- a/driver/src/test/java/org/neo4j/driver/v1/util/RecordingByteChannel.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Copyright (c) 2002-2018 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.neo4j.driver.v1.util; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.ReadableByteChannel; -import java.nio.channels.WritableByteChannel; - -public class RecordingByteChannel implements WritableByteChannel, ReadableByteChannel -{ - private final ByteBuffer buffer = ByteBuffer.allocate( 16 * 1024 ); - private int writePosition = 0; - private int readPosition = 0; - private boolean eof; - - @Override - public boolean isOpen() - { - return true; - } - - @Override - public void close() throws IOException - { - - } - - @Override - public int write( ByteBuffer src ) throws IOException - { - buffer.position( writePosition ); - int originalPosition = writePosition; - - buffer.put( src ); - - writePosition = buffer.position(); - return writePosition - originalPosition; - } - - @Override - public int read( ByteBuffer dst ) throws IOException - { - if ( readPosition == writePosition ) - { - return eof ? -1 : 0; - } - buffer.position( readPosition ); - int originalPosition = readPosition; - int originalLimit = buffer.limit(); - - buffer.limit( Math.min( buffer.position() + (dst.limit() - dst.position()), writePosition ) ); - dst.put( buffer ); - - readPosition = buffer.position(); - buffer.limit( originalLimit ); - return readPosition - originalPosition; - } - - public byte[] getBytes() - { - byte[] bytes = new byte[buffer.position()]; - buffer.position( 0 ); - buffer.get( bytes ); - return bytes; - } - - /** - * Mark this buffer as ended. Once whatever is currently unread in it is consumed, - * it will start yielding -1 responses. - */ - public void eof() - { - eof = true; - } -} From dc9b272cc65e02d335fc43dc6398ceb25e300815 Mon Sep 17 00:00:00 2001 From: lutovich Date: Mon, 9 Apr 2018 17:46:11 +0200 Subject: [PATCH 2/6] Wrap IOException in ServiceUnavailableException When establishing connection it was possible for `IOException` to get propagated directly to the user without being wrapped in a `ServiceUnavailableException`. This happened because `HandshakeHandler` only wrapped security-related exceptions with driver exceptions. It was problematic because rest of the codebase expects to deal with `ServiceUnavailableException`s. For example rediscovery and retries code can deal nicely with `ServiceUnavailableException`. This commit fixes the problem by making `HandshakeHandler` wrap all non-security related exceptions in `ServiceUnavailableException`. --- .../internal/async/HandshakeHandler.java | 29 +++++++++----- .../async/inbound/ChannelErrorHandler.java | 2 +- .../async/ChannelConnectorImplIT.java | 39 +++++++++++++++++++ .../internal/async/HandshakeHandlerTest.java | 39 +++++++++++++++---- 4 files changed, 91 insertions(+), 18 deletions(-) diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/HandshakeHandler.java b/driver/src/main/java/org/neo4j/driver/internal/async/HandshakeHandler.java index 16afecb32d..2c2b34d849 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/HandshakeHandler.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/HandshakeHandler.java @@ -95,16 +95,8 @@ public void exceptionCaught( ChannelHandlerContext ctx, Throwable error ) else { failed = true; - - Throwable cause = error instanceof DecoderException ? error.getCause() : error; - if ( cause instanceof SSLHandshakeException ) - { - fail( ctx, new SecurityException( "Failed to establish secured connection with the server", cause ) ); - } - else - { - fail( ctx, cause ); - } + Throwable cause = transformError( error ); + fail( ctx, cause ); } } @@ -161,4 +153,21 @@ private static Throwable protocolNoSupportedByDriverError( int suggestedProtocol return new ClientException( "Protocol error, server suggested unexpected protocol version: " + suggestedProtocolVersion ); } + + private static Throwable transformError( Throwable error ) + { + Throwable cause = error instanceof DecoderException ? error.getCause() : error; + if ( cause instanceof ServiceUnavailableException ) + { + return cause; + } + else if ( cause instanceof SSLHandshakeException ) + { + return new SecurityException( "Failed to establish secured connection with the server", cause ); + } + else + { + return new ServiceUnavailableException( "Failed to establish connection with the server", cause ); + } + } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/inbound/ChannelErrorHandler.java b/driver/src/main/java/org/neo4j/driver/internal/async/inbound/ChannelErrorHandler.java index 4fda29998f..fbff09ba0a 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/inbound/ChannelErrorHandler.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/inbound/ChannelErrorHandler.java @@ -101,7 +101,7 @@ private void fail( ChannelHandlerContext ctx, Throwable error ) ctx.close(); } - private Throwable transformError( Throwable error ) + private static Throwable transformError( Throwable error ) { if ( error instanceof CodecException ) { diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/ChannelConnectorImplIT.java b/driver/src/test/java/org/neo4j/driver/internal/async/ChannelConnectorImplIT.java index 02cac4d285..f94a2da47c 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/ChannelConnectorImplIT.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/ChannelConnectorImplIT.java @@ -29,7 +29,9 @@ import org.junit.Test; import java.io.IOException; +import java.io.UncheckedIOException; import java.net.ServerSocket; +import java.net.Socket; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -44,6 +46,7 @@ import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; import org.neo4j.driver.v1.util.TestNeo4j; +import static java.util.concurrent.CompletableFuture.runAsync; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.startsWith; import static org.junit.Assert.assertEquals; @@ -186,6 +189,42 @@ public void shouldFailWhenTLSHandshakeTakesTooLong() throws Exception testReadTimeoutOnConnect( SecurityPlan.forAllCertificates() ); } + @Test + public void shouldThrowServiceUnavailableExceptionOnFailureDuringConnect() throws Exception + { + ServerSocket server = new ServerSocket( 0 ); + BoltServerAddress address = new BoltServerAddress( "localhost", server.getLocalPort() ); + + runAsync( () -> + { + try + { + // wait for a connection + Socket socket = server.accept(); + // and terminate it immediately so that client gets a "reset by peer" IOException + socket.close(); + server.close(); + } + catch ( IOException e ) + { + throw new UncheckedIOException( e ); + } + } ); + + ChannelConnector connector = newConnector( neo4j.authToken() ); + ChannelFuture channelFuture = connector.connect( address, bootstrap ); + + // connect operation should fail with ServiceUnavailableException + try + { + await( channelFuture ); + fail( "Exception expected" ); + } + catch ( ServiceUnavailableException ignore ) + { + } + } + private void testReadTimeoutOnConnect( SecurityPlan securityPlan ) throws IOException { try ( ServerSocket server = new ServerSocket( 0 ) ) // server that accepts connections but does not reply diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/HandshakeHandlerTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/HandshakeHandlerTest.java index b572f869c0..2aee56d048 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/HandshakeHandlerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/HandshakeHandlerTest.java @@ -46,10 +46,10 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; -import static org.neo4j.driver.internal.async.ChannelAttributes.setMessageDispatcher; import static org.neo4j.driver.internal.async.BoltProtocolV1Util.HTTP; import static org.neo4j.driver.internal.async.BoltProtocolV1Util.NO_PROTOCOL_VERSION; import static org.neo4j.driver.internal.async.BoltProtocolV1Util.PROTOCOL_VERSION_1; +import static org.neo4j.driver.internal.async.ChannelAttributes.setMessageDispatcher; import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING; import static org.neo4j.driver.v1.util.TestUtil.await; @@ -85,9 +85,34 @@ public void shouldFailGivenPromiseWhenExceptionCaught() await( handshakeCompletedPromise ); fail( "Exception expected" ); } - catch ( Exception e ) + catch ( ServiceUnavailableException e ) + { + assertEquals( cause, e.getCause() ); + } + + // channel should be closed + assertNull( await( channel.closeFuture() ) ); + } + + @Test + public void shouldFailGivenPromiseWhenServiceUnavailableExceptionCaught() + { + ChannelPromise handshakeCompletedPromise = channel.newPromise(); + HandshakeHandler handler = newHandler( handshakeCompletedPromise ); + channel.pipeline().addLast( handler ); + + ServiceUnavailableException error = new ServiceUnavailableException( "Bad error" ); + channel.pipeline().fireExceptionCaught( error ); + + try { - assertEquals( cause, e ); + // promise should fail + await( handshakeCompletedPromise ); + fail( "Exception expected" ); + } + catch ( ServiceUnavailableException e ) + { + assertEquals( error, e ); } // channel should be closed @@ -112,9 +137,9 @@ public void shouldFailGivenPromiseWhenMultipleExceptionsCaught() await( handshakeCompletedPromise ); fail( "Exception expected" ); } - catch ( RuntimeException e ) + catch ( ServiceUnavailableException e ) { - assertEquals( error1, e ); + assertEquals( error1, e.getCause() ); } // channel should be closed @@ -147,9 +172,9 @@ public void shouldUnwrapDecoderException() await( handshakeCompletedPromise ); fail( "Exception expected" ); } - catch ( Exception e ) + catch ( ServiceUnavailableException e ) { - assertEquals( cause, e ); + assertEquals( cause, e.getCause() ); } // channel should be closed From 87a46c8da79266a78b1f5600754a0ed35c462795 Mon Sep 17 00:00:00 2001 From: lutovich Date: Mon, 9 Apr 2018 19:24:21 +0200 Subject: [PATCH 3/6] Check for fatal errors last in the transaction Fatal errors terminate the transaction. It will fail to commit when terminated and rollback will be a no-op. All outstanding errors from previous queries should still be propagated. Previously transaction checked it's terminated status too early and sometimes did not propagate pending errors. This commit fixes the problem by moving the termination check to a place where all result cursors are fully fetched. So errors from cursors will always get propagated. Also added a constant for default `RoutingSettings` for convenience. --- .../driver/internal/ExplicitTransaction.java | 21 +++---- .../internal/cluster/RoutingSettings.java | 4 ++ .../main/java/org/neo4j/driver/v1/Config.java | 4 +- .../driver/v1/integration/SessionResetIT.java | 54 +++++++++++----- .../v1/integration/TransactionAsyncIT.java | 61 +++++++++++++++++++ 5 files changed, 116 insertions(+), 28 deletions(-) diff --git a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java index f2397c3460..7436745db2 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java @@ -173,11 +173,6 @@ else if ( state == State.ROLLED_BACK ) { return failedFuture( new ClientException( "Can't commit, transaction has been rolled back" ) ); } - else if ( state == State.TERMINATED ) - { - transactionClosed( State.ROLLED_BACK ); - return failedFuture( new ClientException( "Can't commit, transaction has been terminated" ) ); - } else { return resultCursors.retrieveNotConsumedError() @@ -197,12 +192,6 @@ else if ( state == State.ROLLED_BACK ) { return completedWithNull(); } - else if ( state == State.TERMINATED ) - { - // no need for explicit rollback, transaction should've been rolled back by the database - transactionClosed( State.ROLLED_BACK ); - return completedWithNull(); - } else { return resultCursors.retrieveNotConsumedError() @@ -344,6 +333,11 @@ public void setBookmark( Bookmark bookmark ) private CompletionStage doCommitAsync() { + if ( state == State.TERMINATED ) + { + return failedFuture( new ClientException( "Can't commit, transaction has been terminated" ) ); + } + CompletableFuture commitFuture = new CompletableFuture<>(); ResponseHandler pullAllHandler = new CommitTxResponseHandler( commitFuture, this ); connection.runAndFlush( COMMIT_QUERY, emptyMap(), NoOpResponseHandler.INSTANCE, pullAllHandler ); @@ -352,6 +346,11 @@ private CompletionStage doCommitAsync() private CompletionStage doRollbackAsync() { + if ( state == State.TERMINATED ) + { + return completedWithNull(); + } + CompletableFuture rollbackFuture = new CompletableFuture<>(); ResponseHandler pullAllHandler = new RollbackTxResponseHandler( rollbackFuture ); connection.runAndFlush( ROLLBACK_QUERY, emptyMap(), NoOpResponseHandler.INSTANCE, pullAllHandler ); diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingSettings.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingSettings.java index d20f58d622..80280a5edf 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingSettings.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingSettings.java @@ -18,8 +18,12 @@ */ package org.neo4j.driver.internal.cluster; +import static java.util.concurrent.TimeUnit.SECONDS; + public class RoutingSettings { + public static final RoutingSettings DEFAULT = new RoutingSettings( 1, SECONDS.toMillis( 5 ) ); + private final int maxRoutingFailures; private final long retryTimeoutDelay; private final RoutingContext routingContext; diff --git a/driver/src/main/java/org/neo4j/driver/v1/Config.java b/driver/src/main/java/org/neo4j/driver/v1/Config.java index 1971ec3300..b29f8a1374 100644 --- a/driver/src/main/java/org/neo4j/driver/v1/Config.java +++ b/driver/src/main/java/org/neo4j/driver/v1/Config.java @@ -267,8 +267,8 @@ public static class ConfigBuilder private boolean encrypted = true; private TrustStrategy trustStrategy = trustAllCertificates(); private LoadBalancingStrategy loadBalancingStrategy = LoadBalancingStrategy.LEAST_CONNECTED; - private int routingFailureLimit = 1; - private long routingRetryDelayMillis = TimeUnit.SECONDS.toMillis( 5 ); + private int routingFailureLimit = RoutingSettings.DEFAULT.maxRoutingFailures(); + private long routingRetryDelayMillis = RoutingSettings.DEFAULT.retryTimeoutDelay(); private int connectionTimeoutMillis = (int) TimeUnit.SECONDS.toMillis( 5 ); private RetrySettings retrySettings = RetrySettings.DEFAULT; diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionResetIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionResetIT.java index 8b66616e35..e56e4cd189 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionResetIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionResetIT.java @@ -183,8 +183,7 @@ public void shouldNotAllowBeginTxIfResetFailureIsNotConsumed() throws Throwable { Transaction tx1 = session.beginTransaction(); - tx1.run( "CALL test.driver.longRunningStatement({seconds})", - parameters( "seconds", 10 ) ); + StatementResult result = tx1.run( "CALL test.driver.longRunningStatement({seconds})", parameters( "seconds", 10 ) ); awaitActiveQueriesToContain( "CALL test.driver.longRunningStatement" ); session.reset(); @@ -210,6 +209,17 @@ public void shouldNotAllowBeginTxIfResetFailureIsNotConsumed() throws Throwable assertThat( e.getMessage(), containsString( "Cannot run more statements in this transaction, it has been terminated" ) ); } + + // Make sure failure from the terminated long running statement is propagated + try + { + result.consume(); + fail( "Exception expected" ); + } + catch ( Neo4jException e ) + { + assertThat( e.getMessage(), containsString( "The transaction has been terminated" ) ); + } } } @@ -584,25 +594,33 @@ public void shouldHandleResetFromMultipleThreads() throws Throwable CountDownLatch beforeCommit = new CountDownLatch( 1 ); CountDownLatch afterReset = new CountDownLatch( 1 ); - executor.submit( () -> + Future txFuture = executor.submit( () -> { - try ( Transaction tx1 = session.beginTransaction() ) + Transaction tx1 = session.beginTransaction(); + tx1.run( "CREATE (n:FirstNode)" ); + beforeCommit.countDown(); + afterReset.await(); + + // session has been reset, it should not be possible to commit the transaction + try + { + tx1.success(); + tx1.close(); + } + catch ( Neo4jException ignore ) { - tx1.run( "CREATE (n:FirstNode)" ); - beforeCommit.countDown(); - afterReset.await(); } try ( Transaction tx2 = session.beginTransaction() ) { - tx2.run( "CREATE (n:FirstNode)" ); + tx2.run( "CREATE (n:SecondNode)" ); tx2.success(); } return null; } ); - executor.submit( () -> + Future resetFuture = executor.submit( () -> { beforeCommit.await(); session.reset(); @@ -611,12 +629,13 @@ public void shouldHandleResetFromMultipleThreads() throws Throwable } ); executor.shutdown(); - executor.awaitTermination( 10, SECONDS ); + executor.awaitTermination( 20, SECONDS ); + + txFuture.get( 20, SECONDS ); + resetFuture.get( 20, SECONDS ); - // Then the outcome of both statements should be visible - StatementResult result = session.run( "MATCH (n) RETURN count(n)" ); - long nodes = result.single().get( "count(n)" ).asLong(); - assertThat( nodes, equalTo( 1L ) ); + assertEquals( 0, countNodes( "FirstNode" ) ); + assertEquals( 1, countNodes( "SecondNode" ) ); } private void testResetOfQueryWaitingForLock( NodeIdUpdater nodeIdUpdater ) throws Exception @@ -812,10 +831,15 @@ private void awaitActiveQueriesToContain( String value ) } private long countNodes() + { + return countNodes( null ); + } + + private long countNodes( String label ) { try ( Session session = neo4j.driver().session() ) { - StatementResult result = session.run( "MATCH (n) RETURN count(n) AS result" ); + StatementResult result = session.run( "MATCH (n" + (label == null ? "" : ":" + label) + ") RETURN count(n) AS result" ); return result.single().get( 0 ).asLong(); } } diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java index c9ebc981cd..bc5953226a 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java @@ -18,6 +18,9 @@ */ package org.neo4j.driver.v1.integration; +import io.netty.channel.Channel; +import io.netty.channel.ChannelPipeline; +import io.netty.util.concurrent.Future; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -37,6 +40,12 @@ import org.neo4j.driver.internal.ExplicitTransaction; import org.neo4j.driver.internal.async.EventLoopGroupFactory; +import org.neo4j.driver.internal.cluster.RoutingSettings; +import org.neo4j.driver.internal.retry.RetrySettings; +import org.neo4j.driver.internal.util.ChannelTrackingDriverFactory; +import org.neo4j.driver.internal.util.Clock; +import org.neo4j.driver.v1.Config; +import org.neo4j.driver.v1.Driver; import org.neo4j.driver.v1.Record; import org.neo4j.driver.v1.Session; import org.neo4j.driver.v1.Statement; @@ -67,6 +76,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.junit.Assume.assumeTrue; +import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING; import static org.neo4j.driver.internal.util.Iterables.single; import static org.neo4j.driver.internal.util.Matchers.blockingOperationInEventLoopError; import static org.neo4j.driver.internal.util.Matchers.containsResultAvailableAfterAndResultConsumedAfter; @@ -1308,6 +1318,18 @@ public void shouldBePossibleToRunMoreTransactionsAfterOneIsTerminated() assertEquals( 1, countNodes( 42 ) ); } + @Test + public void shouldPropagateCommitFailureAfterFatalError() + { + testCommitAndRollbackFailurePropagation( true ); + } + + @Test + public void shouldPropagateRollbackFailureAfterFatalError() + { + testCommitAndRollbackFailurePropagation( false ); + } + private int countNodes( Object id ) { StatementResult result = session.run( "MATCH (n:Node {id: $id}) RETURN count(n)", parameters( "id", id ) ); @@ -1356,6 +1378,45 @@ private void testConsume( String query ) assertNull( await( cursor.nextAsync() ) ); } + private void testCommitAndRollbackFailurePropagation( boolean commit ) + { + ChannelTrackingDriverFactory driverFactory = new ChannelTrackingDriverFactory( 1, Clock.SYSTEM ); + Config config = Config.build().withLogging( DEV_NULL_LOGGING ).toConfig(); + + try ( Driver driver = driverFactory.newInstance( neo4j.uri(), neo4j.authToken(), RoutingSettings.DEFAULT, RetrySettings.DEFAULT, config ) ) + { + try ( Session session = driver.session() ) + { + Transaction tx = session.beginTransaction(); + + // run query but do not consume the result + tx.run( "UNWIND range(0, 10000) AS x RETURN x + 1" ); + + IOException ioError = new IOException( "Connection reset by peer" ); + for ( Channel channel : driverFactory.channels() ) + { + // make channel experience a fatal network error + // run in the event loop thread and wait for the whole operation to complete + Future future = channel.eventLoop().submit( () -> channel.pipeline().fireExceptionCaught( ioError ) ); + await( future ); + } + + CompletionStage commitOrRollback = commit ? tx.commitAsync() : tx.rollbackAsync(); + + // commit/rollback should fail and propagate the network error + try + { + await( commitOrRollback ); + fail( "Exception expected" ); + } + catch ( ServiceUnavailableException e ) + { + assertEquals( ioError, e.getCause() ); + } + } + } + } + private void assumeDatabaseSupportsBookmarks() { assumeTrue( "Neo4j " + neo4j.version() + " does not support bookmarks", From bf253bd1b7dd2153eedfbb394e533aba816d65e2 Mon Sep 17 00:00:00 2001 From: lutovich Date: Tue, 10 Apr 2018 17:45:28 +0200 Subject: [PATCH 4/6] Improve waiting for leader to step down in CC tests Use direct drivers to fetch cluster overview from cores, instead of routing driver and write session. Wait for at most 60 seconds. --- .../v1/integration/CausalClusteringIT.java | 123 +++++++++++++----- .../org/neo4j/driver/v1/util/cc/Cluster.java | 8 ++ .../neo4j/driver/v1/util/cc/ClusterRule.java | 5 +- 3 files changed, 101 insertions(+), 35 deletions(-) diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/CausalClusteringIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/CausalClusteringIT.java index e13dd2b8fd..10c735a775 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/CausalClusteringIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/CausalClusteringIT.java @@ -28,6 +28,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.CompletionStage; import java.util.concurrent.CountDownLatch; @@ -58,6 +59,7 @@ import org.neo4j.driver.v1.TransactionWork; import org.neo4j.driver.v1.Values; import org.neo4j.driver.v1.exceptions.ClientException; +import org.neo4j.driver.v1.exceptions.Neo4jException; import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; import org.neo4j.driver.v1.exceptions.SessionExpiredException; import org.neo4j.driver.v1.exceptions.TransientException; @@ -413,11 +415,12 @@ public void shouldNotServeWritesWhenMajorityOfCoresAreDead() throws Exception try ( Driver driver = createDriver( leader.getRoutingUri() ) ) { + Set cores = cluster.cores(); for ( ClusterMember follower : cluster.followers() ) { cluster.kill( follower ); } - awaitLeaderToStepDown( driver ); + awaitLeaderToStepDown( cores ); // now we should be unable to write because majority of cores is down for ( int i = 0; i < 10; i++ ) @@ -462,11 +465,12 @@ public Integer execute( Transaction tx ) ensureNodeVisible( cluster, "Star Lord", bookmark ); + Set cores = cluster.cores(); for ( ClusterMember follower : cluster.followers() ) { cluster.kill( follower ); } - awaitLeaderToStepDown( driver ); + awaitLeaderToStepDown( cores ); // now we should be unable to write because majority of cores is down try ( Session session = driver.session( AccessMode.WRITE ) ) @@ -913,44 +917,27 @@ public Integer execute( Transaction tx ) } } - private void awaitLeaderToStepDown( Driver driver ) + private void awaitLeaderToStepDown( Set cores ) { - int leadersCount; - int followersCount; - int readReplicasCount; + long deadline = System.currentTimeMillis() + DEFAULT_TIMEOUT_MS; + ClusterOverview overview = null; do { - try ( Session session = driver.session() ) + for ( ClusterMember core : cores ) { - int newLeadersCount = 0; - int newFollowersCount = 0; - int newReadReplicasCount = 0; - for ( Record record : session.run( "CALL dbms.cluster.overview()" ).list() ) + overview = fetchClusterOverview( core ); + if ( overview != null ) { - ClusterMemberRole role = ClusterMemberRole.valueOf( record.get( "role" ).asString() ); - if ( role == ClusterMemberRole.LEADER ) - { - newLeadersCount++; - } - else if ( role == ClusterMemberRole.FOLLOWER ) - { - newFollowersCount++; - } - else if ( role == ClusterMemberRole.READ_REPLICA ) - { - newReadReplicasCount++; - } - else - { - throw new AssertionError( "Unknown role: " + role ); - } + break; } - leadersCount = newLeadersCount; - followersCount = newFollowersCount; - readReplicasCount = newReadReplicasCount; } } - while ( !(leadersCount == 0 && followersCount == 1 && readReplicasCount == 2) ); + while ( !isSingleFollowerWithReadReplicas( overview ) && System.currentTimeMillis() <= deadline ); + + if ( System.currentTimeMillis() > deadline ) + { + throw new IllegalStateException( "Leader did not step down in " + DEFAULT_TIMEOUT_MS + "ms. Last seen cluster overview: " + overview ); + } } private Driver createDriver( URI boltUri ) @@ -968,6 +955,43 @@ private Driver discoverDriver( List routingUris ) return GraphDatabase.routingDriver( routingUris, clusterRule.getDefaultAuthToken(), configWithoutLogging() ); } + private ClusterOverview fetchClusterOverview( ClusterMember member ) + { + int leaderCount = 0; + int followerCount = 0; + int readReplicaCount = 0; + + Driver driver = clusterRule.getCluster().getDirectDriver( member ); + try ( Session session = driver.session() ) + { + for ( Record record : session.run( "CALL dbms.cluster.overview()" ).list() ) + { + ClusterMemberRole role = ClusterMemberRole.valueOf( record.get( "role" ).asString() ); + if ( role == ClusterMemberRole.LEADER ) + { + leaderCount++; + } + else if ( role == ClusterMemberRole.FOLLOWER ) + { + followerCount++; + } + else if ( role == ClusterMemberRole.READ_REPLICA ) + { + readReplicaCount++; + } + else + { + throw new AssertionError( "Unknown role: " + role ); + } + } + return new ClusterOverview( leaderCount, followerCount, readReplicaCount ); + } + catch ( Neo4jException ignore ) + { + return null; + } + } + private static void createNodesInDifferentThreads( int count, final Driver driver ) throws Exception { final CountDownLatch beforeRunLatch = new CountDownLatch( count ); @@ -1133,6 +1157,17 @@ private static ExecutorService newExecutor() return Executors.newCachedThreadPool( daemon( CausalClusteringIT.class.getSimpleName() + "-thread-" ) ); } + private static boolean isSingleFollowerWithReadReplicas( ClusterOverview overview ) + { + if ( overview == null ) + { + return false; + } + return overview.leaderCount == 0 && + overview.followerCount == 1 && + overview.readReplicaCount == ClusterRule.READ_REPLICA_COUNT; + } + private static class RecordAndSummary { final Record record; @@ -1144,4 +1179,28 @@ private static class RecordAndSummary this.summary = summary; } } + + private static class ClusterOverview + { + final int leaderCount; + final int followerCount; + final int readReplicaCount; + + ClusterOverview( int leaderCount, int followerCount, int readReplicaCount ) + { + this.leaderCount = leaderCount; + this.followerCount = followerCount; + this.readReplicaCount = readReplicaCount; + } + + @Override + public String toString() + { + return "ClusterOverview{" + + "leaderCount=" + leaderCount + + ", followerCount=" + followerCount + + ", readReplicaCount=" + readReplicaCount + + '}'; + } + } } diff --git a/driver/src/test/java/org/neo4j/driver/v1/util/cc/Cluster.java b/driver/src/test/java/org/neo4j/driver/v1/util/cc/Cluster.java index 034fb7bee9..10485b1da0 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/util/cc/Cluster.java +++ b/driver/src/test/java/org/neo4j/driver/v1/util/cc/Cluster.java @@ -123,6 +123,14 @@ public ClusterMember anyReadReplica() return randomOf( readReplicas() ); } + public Set cores() + { + Set readReplicas = membersWithRole( ClusterMemberRole.READ_REPLICA ); + Set cores = new HashSet<>( members ); + cores.removeAll( readReplicas ); + return cores; + } + public Set readReplicas() { return membersWithRole( ClusterMemberRole.READ_REPLICA ); diff --git a/driver/src/test/java/org/neo4j/driver/v1/util/cc/ClusterRule.java b/driver/src/test/java/org/neo4j/driver/v1/util/cc/ClusterRule.java index b4f8782644..8e56f4eb9f 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/util/cc/ClusterRule.java +++ b/driver/src/test/java/org/neo4j/driver/v1/util/cc/ClusterRule.java @@ -39,9 +39,8 @@ public class ClusterRule extends ExternalResource private static final String PASSWORD = "test"; private static final int INITIAL_PORT = 20_000; - // todo: should be possible to configure (dynamically add/remove) cores and read replicas - private static final int CORE_COUNT = 3; - private static final int READ_REPLICA_COUNT = 2; + public static final int CORE_COUNT = 3; + public static final int READ_REPLICA_COUNT = 2; public Cluster getCluster() { From 19fc1e027e904ee1999f18fceeeb812d6f167051 Mon Sep 17 00:00:00 2001 From: lutovich Date: Tue, 10 Apr 2018 17:46:47 +0200 Subject: [PATCH 5/6] Use same password for test single instance and cluster --- .../test/java/org/neo4j/driver/v1/util/cc/ClusterRule.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/driver/src/test/java/org/neo4j/driver/v1/util/cc/ClusterRule.java b/driver/src/test/java/org/neo4j/driver/v1/util/cc/ClusterRule.java index 8e56f4eb9f..06f05c9a53 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/util/cc/ClusterRule.java +++ b/driver/src/test/java/org/neo4j/driver/v1/util/cc/ClusterRule.java @@ -30,13 +30,14 @@ import org.neo4j.driver.v1.util.Neo4jRunner; import static org.junit.Assume.assumeTrue; +import static org.neo4j.driver.v1.util.Neo4jRunner.PASSWORD; import static org.neo4j.driver.v1.util.Neo4jRunner.TARGET_DIR; +import static org.neo4j.driver.v1.util.Neo4jRunner.USER; import static org.neo4j.driver.v1.util.cc.CommandLineUtil.boltKitAvailable; public class ClusterRule extends ExternalResource { private static final Path CLUSTER_DIR = Paths.get( TARGET_DIR, "test-cluster" ).toAbsolutePath(); - private static final String PASSWORD = "test"; private static final int INITIAL_PORT = 20_000; public static final int CORE_COUNT = 3; @@ -49,7 +50,7 @@ public Cluster getCluster() public AuthToken getDefaultAuthToken() { - return AuthTokens.basic( "neo4j", PASSWORD ); + return AuthTokens.basic( USER, PASSWORD ); } public static void stopSharedCluster() From f2eec3f656c8adb5753281fe1d687ef89a3ab6fc Mon Sep 17 00:00:00 2001 From: lutovich Date: Fri, 13 Apr 2018 14:00:37 +0200 Subject: [PATCH 6/6] Temporarily ignore couple CC integration tests --- .../org/neo4j/driver/v1/integration/CausalClusteringIT.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/CausalClusteringIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/CausalClusteringIT.java index 10c735a775..4c99252ac0 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/CausalClusteringIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/CausalClusteringIT.java @@ -21,6 +21,7 @@ import io.netty.channel.Channel; import org.junit.After; import org.junit.AfterClass; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; @@ -408,6 +409,7 @@ public String apply( Session session ) } @Test + @Ignore public void shouldNotServeWritesWhenMajorityOfCoresAreDead() throws Exception { Cluster cluster = clusterRule.getCluster(); @@ -439,6 +441,7 @@ public void shouldNotServeWritesWhenMajorityOfCoresAreDead() throws Exception } @Test + @Ignore public void shouldServeReadsWhenMajorityOfCoresAreDead() throws Exception { Cluster cluster = clusterRule.getCluster();