Skip to content

Commit

Permalink
Check for fatal errors last in the transaction
Browse files Browse the repository at this point in the history
Fatal errors terminate the transaction. It will fail to commit when
terminated and rollback will be a no-op. All outstanding errors from
previous queries should still be propagated.

Previously transaction checked it's terminated status too early and
sometimes did not propagate pending errors. This commit fixes the
problem by moving the termination check to a place where all result
cursors are fully fetched. So errors from cursors will always get
propagated.

Also added a constant for default `RoutingSettings` for convenience.
  • Loading branch information
lutovich committed Apr 9, 2018
1 parent e5116e1 commit da01e2d
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -173,11 +173,6 @@ else if ( state == State.ROLLED_BACK )
{
return failedFuture( new ClientException( "Can't commit, transaction has been rolled back" ) );
}
else if ( state == State.TERMINATED )
{
transactionClosed( State.ROLLED_BACK );
return failedFuture( new ClientException( "Can't commit, transaction has been terminated" ) );
}
else
{
return resultCursors.retrieveNotConsumedError()
Expand All @@ -197,12 +192,6 @@ else if ( state == State.ROLLED_BACK )
{
return completedWithNull();
}
else if ( state == State.TERMINATED )
{
// no need for explicit rollback, transaction should've been rolled back by the database
transactionClosed( State.ROLLED_BACK );
return completedWithNull();
}
else
{
return resultCursors.retrieveNotConsumedError()
Expand Down Expand Up @@ -344,6 +333,11 @@ public void setBookmark( Bookmark bookmark )

private CompletionStage<Void> doCommitAsync()
{
if ( state == State.TERMINATED )
{
return failedFuture( new ClientException( "Can't commit, transaction has been terminated" ) );
}

CompletableFuture<Void> commitFuture = new CompletableFuture<>();
ResponseHandler pullAllHandler = new CommitTxResponseHandler( commitFuture, this );
connection.runAndFlush( COMMIT_QUERY, emptyMap(), NoOpResponseHandler.INSTANCE, pullAllHandler );
Expand All @@ -352,6 +346,11 @@ private CompletionStage<Void> doCommitAsync()

private CompletionStage<Void> doRollbackAsync()
{
if ( state == State.TERMINATED )
{
return completedWithNull();
}

CompletableFuture<Void> rollbackFuture = new CompletableFuture<>();
ResponseHandler pullAllHandler = new RollbackTxResponseHandler( rollbackFuture );
connection.runAndFlush( ROLLBACK_QUERY, emptyMap(), NoOpResponseHandler.INSTANCE, pullAllHandler );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@
*/
package org.neo4j.driver.internal.cluster;

import static java.util.concurrent.TimeUnit.SECONDS;

public class RoutingSettings
{
public static final RoutingSettings DEFAULT = new RoutingSettings( 1, SECONDS.toMillis( 5 ) );

private final int maxRoutingFailures;
private final long retryTimeoutDelay;
private final RoutingContext routingContext;
Expand Down
4 changes: 2 additions & 2 deletions driver/src/main/java/org/neo4j/driver/v1/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -267,8 +267,8 @@ public static class ConfigBuilder
private boolean encrypted = true;
private TrustStrategy trustStrategy = trustAllCertificates();
private LoadBalancingStrategy loadBalancingStrategy = LoadBalancingStrategy.LEAST_CONNECTED;
private int routingFailureLimit = 1;
private long routingRetryDelayMillis = TimeUnit.SECONDS.toMillis( 5 );
private int routingFailureLimit = RoutingSettings.DEFAULT.maxRoutingFailures();
private long routingRetryDelayMillis = RoutingSettings.DEFAULT.retryTimeoutDelay();
private int connectionTimeoutMillis = (int) TimeUnit.SECONDS.toMillis( 5 );
private RetrySettings retrySettings = RetrySettings.DEFAULT;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,7 @@ public void shouldNotAllowBeginTxIfResetFailureIsNotConsumed() throws Throwable
{
Transaction tx1 = session.beginTransaction();

tx1.run( "CALL test.driver.longRunningStatement({seconds})",
parameters( "seconds", 10 ) );
StatementResult result = tx1.run( "CALL test.driver.longRunningStatement({seconds})", parameters( "seconds", 10 ) );

awaitActiveQueriesToContain( "CALL test.driver.longRunningStatement" );
session.reset();
Expand All @@ -210,6 +209,17 @@ public void shouldNotAllowBeginTxIfResetFailureIsNotConsumed() throws Throwable
assertThat( e.getMessage(),
containsString( "Cannot run more statements in this transaction, it has been terminated" ) );
}

// Make sure failure from the terminated long running statement is propagated
try
{
result.consume();
fail( "Exception expected" );
}
catch ( Neo4jException e )
{
assertThat( e.getMessage(), containsString( "The transaction has been terminated" ) );
}
}
}

Expand Down Expand Up @@ -586,11 +596,20 @@ public void shouldHandleResetFromMultipleThreads() throws Throwable

executor.submit( () ->
{
try ( Transaction tx1 = session.beginTransaction() )
Transaction tx1 = session.beginTransaction();
tx1.run( "CREATE (n:FirstNode)" );
beforeCommit.countDown();
afterReset.await();

// session has been reset, there should be an unconsumed error from #run()
// this error should get propagated when transaction is closed
try
{
tx1.close();
fail( "Exception expected" );
}
catch ( Neo4jException ignore )
{
tx1.run( "CREATE (n:FirstNode)" );
beforeCommit.countDown();
afterReset.await();
}

try ( Transaction tx2 = session.beginTransaction() )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
*/
package org.neo4j.driver.v1.integration;

import io.netty.channel.Channel;
import io.netty.channel.ChannelPipeline;
import io.netty.util.concurrent.Future;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
Expand All @@ -37,6 +40,12 @@

import org.neo4j.driver.internal.ExplicitTransaction;
import org.neo4j.driver.internal.async.EventLoopGroupFactory;
import org.neo4j.driver.internal.cluster.RoutingSettings;
import org.neo4j.driver.internal.retry.RetrySettings;
import org.neo4j.driver.internal.util.ChannelTrackingDriverFactory;
import org.neo4j.driver.internal.util.Clock;
import org.neo4j.driver.v1.Config;
import org.neo4j.driver.v1.Driver;
import org.neo4j.driver.v1.Record;
import org.neo4j.driver.v1.Session;
import org.neo4j.driver.v1.Statement;
Expand Down Expand Up @@ -67,6 +76,7 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeTrue;
import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING;
import static org.neo4j.driver.internal.util.Iterables.single;
import static org.neo4j.driver.internal.util.Matchers.blockingOperationInEventLoopError;
import static org.neo4j.driver.internal.util.Matchers.containsResultAvailableAfterAndResultConsumedAfter;
Expand Down Expand Up @@ -1308,6 +1318,18 @@ public void shouldBePossibleToRunMoreTransactionsAfterOneIsTerminated()
assertEquals( 1, countNodes( 42 ) );
}

@Test
public void shouldPropagateCommitFailureAfterFatalError()
{
testCommitAndRollbackFailurePropagation( true );
}

@Test
public void shouldPropagateRollbackFailureAfterFatalError()
{
testCommitAndRollbackFailurePropagation( false );
}

private int countNodes( Object id )
{
StatementResult result = session.run( "MATCH (n:Node {id: $id}) RETURN count(n)", parameters( "id", id ) );
Expand Down Expand Up @@ -1356,6 +1378,45 @@ private void testConsume( String query )
assertNull( await( cursor.nextAsync() ) );
}

private void testCommitAndRollbackFailurePropagation( boolean commit )
{
ChannelTrackingDriverFactory driverFactory = new ChannelTrackingDriverFactory( 1, Clock.SYSTEM );
Config config = Config.build().withLogging( DEV_NULL_LOGGING ).toConfig();

try ( Driver driver = driverFactory.newInstance( neo4j.uri(), neo4j.authToken(), RoutingSettings.DEFAULT, RetrySettings.DEFAULT, config ) )
{
try ( Session session = driver.session() )
{
Transaction tx = session.beginTransaction();

// run query but do not consume the result
tx.run( "UNWIND range(0, 10000) AS x RETURN x + 1" );

IOException ioError = new IOException( "Connection reset by peer" );
for ( Channel channel : driverFactory.channels() )
{
// make channel experience a fatal network error
// run in the event loop thread and wait for the whole operation to complete
Future<ChannelPipeline> future = channel.eventLoop().submit( () -> channel.pipeline().fireExceptionCaught( ioError ) );
await( future );
}

CompletionStage<Void> commitOrRollback = commit ? tx.commitAsync() : tx.rollbackAsync();

// commit/rollback should fail and propagate the network error
try
{
await( commitOrRollback );
fail( "Exception expected" );
}
catch ( ServiceUnavailableException e )
{
assertEquals( ioError, e.getCause() );
}
}
}
}

private void assumeDatabaseSupportsBookmarks()
{
assumeTrue( "Neo4j " + neo4j.version() + " does not support bookmarks",
Expand Down

0 comments on commit da01e2d

Please sign in to comment.