diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/BoltServerAddress.java b/driver/src/main/java/org/neo4j/driver/internal/BoltServerAddress.java
similarity index 99%
rename from driver/src/main/java/org/neo4j/driver/internal/async/BoltServerAddress.java
rename to driver/src/main/java/org/neo4j/driver/internal/BoltServerAddress.java
index 09319d24d4..28ab4c097d 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/BoltServerAddress.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/BoltServerAddress.java
@@ -16,7 +16,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.neo4j.driver.internal.async;
+package org.neo4j.driver.internal;
import java.net.InetAddress;
import java.net.InetSocketAddress;
diff --git a/driver/src/main/java/org/neo4j/driver/internal/DirectConnectionProvider.java b/driver/src/main/java/org/neo4j/driver/internal/DirectConnectionProvider.java
index 50eceaa612..e9ff1f5043 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/DirectConnectionProvider.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/DirectConnectionProvider.java
@@ -20,7 +20,6 @@
import java.util.concurrent.CompletionStage;
-import org.neo4j.driver.internal.async.BoltServerAddress;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.spi.ConnectionPool;
import org.neo4j.driver.internal.spi.ConnectionProvider;
diff --git a/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java b/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java
index e2e9cf5003..633abd2651 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java
@@ -25,7 +25,6 @@
import java.net.URI;
import java.security.GeneralSecurityException;
-import org.neo4j.driver.internal.async.BoltServerAddress;
import org.neo4j.driver.internal.async.BootstrapFactory;
import org.neo4j.driver.internal.async.ChannelConnector;
import org.neo4j.driver.internal.async.ChannelConnectorImpl;
@@ -143,11 +142,9 @@ private InternalDriver createDriver( URI uri, BoltServerAddress address,
protected InternalDriver createDirectDriver( BoltServerAddress address, Config config,
SecurityPlan securityPlan, RetryLogic retryLogic, ConnectionPool connectionPool )
{
- ConnectionProvider connectionProvider =
- new DirectConnectionProvider( address, connectionPool );
- SessionFactory sessionFactory =
- createSessionFactory( connectionProvider, retryLogic, config );
- return createDriver( config, securityPlan, sessionFactory );
+ ConnectionProvider connectionProvider = new DirectConnectionProvider( address, connectionPool );
+ SessionFactory sessionFactory = createSessionFactory( connectionProvider, retryLogic, config );
+ return createDriver( sessionFactory, securityPlan, config );
}
/**
@@ -166,7 +163,7 @@ protected InternalDriver createRoutingDriver( BoltServerAddress address, Connect
ConnectionProvider connectionProvider = createLoadBalancer( address, connectionPool, eventExecutorGroup,
config, routingSettings );
SessionFactory sessionFactory = createSessionFactory( connectionProvider, retryLogic, config );
- return createDriver( config, securityPlan, sessionFactory );
+ return createDriver( sessionFactory, securityPlan, config );
}
/**
@@ -174,9 +171,9 @@ protected InternalDriver createRoutingDriver( BoltServerAddress address, Connect
*
* This method is protected only for testing
*/
- protected InternalDriver createDriver( Config config, SecurityPlan securityPlan, SessionFactory sessionFactory )
+ protected InternalDriver createDriver( SessionFactory sessionFactory, SecurityPlan securityPlan, Config config )
{
- return new InternalDriver( securityPlan, sessionFactory );
+ return new InternalDriver( securityPlan, sessionFactory, config.logging() );
}
/**
diff --git a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java
index a9d2548c17..134db0401e 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java
@@ -25,7 +25,6 @@
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
-import org.neo4j.driver.internal.async.InternalStatementResultCursor;
import org.neo4j.driver.internal.async.QueryRunner;
import org.neo4j.driver.internal.async.ResultCursorsHolder;
import org.neo4j.driver.internal.handlers.BeginTxResponseHandler;
diff --git a/driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java b/driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java
index 6597a6467d..57c576c1e8 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java
@@ -24,6 +24,8 @@
import org.neo4j.driver.internal.security.SecurityPlan;
import org.neo4j.driver.v1.AccessMode;
import org.neo4j.driver.v1.Driver;
+import org.neo4j.driver.v1.Logger;
+import org.neo4j.driver.v1.Logging;
import org.neo4j.driver.v1.Session;
import static java.util.concurrent.CompletableFuture.completedFuture;
@@ -33,13 +35,15 @@ public class InternalDriver implements Driver
{
private final SecurityPlan securityPlan;
private final SessionFactory sessionFactory;
+ private final Logger log;
private AtomicBoolean closed = new AtomicBoolean( false );
- InternalDriver( SecurityPlan securityPlan, SessionFactory sessionFactory )
+ InternalDriver( SecurityPlan securityPlan, SessionFactory sessionFactory, Logging logging )
{
this.securityPlan = securityPlan;
this.sessionFactory = sessionFactory;
+ this.log = logging.getLog( Driver.class.getSimpleName() );
}
@Override
@@ -108,6 +112,7 @@ public CompletionStage closeAsync()
{
if ( closed.compareAndSet( false, true ) )
{
+ log.info( "Driver instance is closing" );
return sessionFactory.close();
}
return completedFuture( null );
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/InternalStatementResultCursor.java b/driver/src/main/java/org/neo4j/driver/internal/InternalStatementResultCursor.java
similarity index 99%
rename from driver/src/main/java/org/neo4j/driver/internal/async/InternalStatementResultCursor.java
rename to driver/src/main/java/org/neo4j/driver/internal/InternalStatementResultCursor.java
index 1f2e0ae6f2..877301b5f2 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/InternalStatementResultCursor.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/InternalStatementResultCursor.java
@@ -16,7 +16,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.neo4j.driver.internal.async;
+package org.neo4j.driver.internal;
import java.util.ArrayList;
import java.util.List;
diff --git a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java
index f40fa53819..7e4279d245 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java
@@ -24,9 +24,8 @@
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.neo4j.driver.internal.async.InternalStatementResultCursor;
import org.neo4j.driver.internal.async.QueryRunner;
-import org.neo4j.driver.internal.logging.DelegatingLogger;
+import org.neo4j.driver.internal.logging.PrefixedLogger;
import org.neo4j.driver.internal.retry.RetryLogic;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.spi.ConnectionProvider;
@@ -74,7 +73,7 @@ public NetworkSession( ConnectionProvider connectionProvider, AccessMode mode, R
this.connectionProvider = connectionProvider;
this.mode = mode;
this.retryLogic = retryLogic;
- this.logger = new DelegatingLogger( logging.getLog( LOG_NAME ), String.valueOf( hashCode() ) );
+ this.logger = new PrefixedLogger( "[" + hashCode() + "]", logging.getLog( LOG_NAME ) );
}
@Override
diff --git a/driver/src/main/java/org/neo4j/driver/internal/RoutingErrorHandler.java b/driver/src/main/java/org/neo4j/driver/internal/RoutingErrorHandler.java
index 0452474ea0..6f90373f84 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/RoutingErrorHandler.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/RoutingErrorHandler.java
@@ -18,8 +18,6 @@
*/
package org.neo4j.driver.internal;
-import org.neo4j.driver.internal.async.BoltServerAddress;
-
/**
* Interface used for tracking errors when connected to a cluster.
*/
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/ChannelAttributes.java b/driver/src/main/java/org/neo4j/driver/internal/async/ChannelAttributes.java
index 342253fdfd..d9f1dd83e4 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/ChannelAttributes.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/ChannelAttributes.java
@@ -21,6 +21,7 @@
import io.netty.channel.Channel;
import io.netty.util.AttributeKey;
+import org.neo4j.driver.internal.BoltServerAddress;
import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher;
import org.neo4j.driver.internal.util.ServerVersion;
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/ChannelConnectedListener.java b/driver/src/main/java/org/neo4j/driver/internal/async/ChannelConnectedListener.java
index d709c60aa7..d5997d3c17 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/ChannelConnectedListener.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/ChannelConnectedListener.java
@@ -24,6 +24,8 @@
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
+import org.neo4j.driver.internal.BoltServerAddress;
+import org.neo4j.driver.v1.Logger;
import org.neo4j.driver.v1.Logging;
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
@@ -36,6 +38,7 @@ public class ChannelConnectedListener implements ChannelFutureListener
private final ChannelPipelineBuilder pipelineBuilder;
private final ChannelPromise handshakeCompletedPromise;
private final Logging logging;
+ private final Logger log;
public ChannelConnectedListener( BoltServerAddress address, ChannelPipelineBuilder pipelineBuilder,
ChannelPromise handshakeCompletedPromise, Logging logging )
@@ -44,6 +47,7 @@ public ChannelConnectedListener( BoltServerAddress address, ChannelPipelineBuild
this.pipelineBuilder = pipelineBuilder;
this.handshakeCompletedPromise = handshakeCompletedPromise;
this.logging = logging;
+ this.log = logging.getLog( getClass().getSimpleName() );
}
@Override
@@ -53,8 +57,10 @@ public void operationComplete( ChannelFuture future )
if ( future.isSuccess() )
{
+ log.trace( "Channel %s connected, running bolt handshake", channel );
+
ChannelPipeline pipeline = channel.pipeline();
- pipeline.addLast( new HandshakeResponseHandler( pipelineBuilder, handshakeCompletedPromise, logging ) );
+ pipeline.addLast( new HandshakeHandler( pipelineBuilder, handshakeCompletedPromise, logging ) );
ChannelFuture handshakeFuture = channel.writeAndFlush( handshake() );
handshakeFuture.addListener( channelFuture ->
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/ChannelConnector.java b/driver/src/main/java/org/neo4j/driver/internal/async/ChannelConnector.java
index 4584f791d9..025264778b 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/ChannelConnector.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/ChannelConnector.java
@@ -21,6 +21,8 @@
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
+import org.neo4j.driver.internal.BoltServerAddress;
+
public interface ChannelConnector
{
ChannelFuture connect( BoltServerAddress address, Bootstrap bootstrap );
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/ChannelConnectorImpl.java b/driver/src/main/java/org/neo4j/driver/internal/async/ChannelConnectorImpl.java
index 0da426a1f3..bf44514d37 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/ChannelConnectorImpl.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/ChannelConnectorImpl.java
@@ -26,6 +26,7 @@
import java.util.Map;
+import org.neo4j.driver.internal.BoltServerAddress;
import org.neo4j.driver.internal.ConnectionSettings;
import org.neo4j.driver.internal.security.InternalAuthToken;
import org.neo4j.driver.internal.security.SecurityPlan;
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/ChannelPipelineBuilderImpl.java b/driver/src/main/java/org/neo4j/driver/internal/async/ChannelPipelineBuilderImpl.java
index 5201adca7f..7da5424e2e 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/ChannelPipelineBuilderImpl.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/ChannelPipelineBuilderImpl.java
@@ -20,6 +20,7 @@
import io.netty.channel.ChannelPipeline;
+import org.neo4j.driver.internal.async.inbound.ChannelErrorHandler;
import org.neo4j.driver.internal.async.inbound.ChunkDecoder;
import org.neo4j.driver.internal.async.inbound.InboundMessageHandler;
import org.neo4j.driver.internal.async.inbound.MessageDecoder;
@@ -33,7 +34,7 @@ public class ChannelPipelineBuilderImpl implements ChannelPipelineBuilder
public void build( MessageFormat messageFormat, ChannelPipeline pipeline, Logging logging )
{
// inbound handlers
- pipeline.addLast( new ChunkDecoder() );
+ pipeline.addLast( new ChunkDecoder( logging ) );
pipeline.addLast( new MessageDecoder() );
pipeline.addLast( new InboundMessageHandler( messageFormat, logging ) );
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/HandshakeResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/async/HandshakeHandler.java
similarity index 70%
rename from driver/src/main/java/org/neo4j/driver/internal/async/HandshakeResponseHandler.java
rename to driver/src/main/java/org/neo4j/driver/internal/async/HandshakeHandler.java
index 517f2af74c..91f1fe9fa3 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/HandshakeResponseHandler.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/HandshakeHandler.java
@@ -28,45 +28,83 @@
import java.util.List;
import javax.net.ssl.SSLHandshakeException;
+import org.neo4j.driver.internal.logging.PrefixedLogger;
import org.neo4j.driver.internal.messaging.MessageFormat;
import org.neo4j.driver.internal.messaging.PackStreamMessageFormatV1;
+import org.neo4j.driver.internal.util.ErrorUtil;
import org.neo4j.driver.v1.Logger;
import org.neo4j.driver.v1.Logging;
import org.neo4j.driver.v1.exceptions.ClientException;
import org.neo4j.driver.v1.exceptions.SecurityException;
+import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
import static org.neo4j.driver.internal.async.ProtocolUtil.HTTP;
import static org.neo4j.driver.internal.async.ProtocolUtil.NO_PROTOCOL_VERSION;
import static org.neo4j.driver.internal.async.ProtocolUtil.PROTOCOL_VERSION_1;
-public class HandshakeResponseHandler extends ReplayingDecoder
+public class HandshakeHandler extends ReplayingDecoder
{
private final ChannelPipelineBuilder pipelineBuilder;
private final ChannelPromise handshakeCompletedPromise;
private final Logging logging;
- private final Logger log;
- public HandshakeResponseHandler( ChannelPipelineBuilder pipelineBuilder, ChannelPromise handshakeCompletedPromise,
+ private boolean failed;
+ private Logger log;
+
+ public HandshakeHandler( ChannelPipelineBuilder pipelineBuilder, ChannelPromise handshakeCompletedPromise,
Logging logging )
{
this.pipelineBuilder = pipelineBuilder;
this.handshakeCompletedPromise = handshakeCompletedPromise;
this.logging = logging;
- this.log = logging.getLog( getClass().getSimpleName() );
+ }
+
+ @Override
+ public void handlerAdded( ChannelHandlerContext ctx )
+ {
+ log = new PrefixedLogger( ctx.channel().toString(), logging, getClass() );
+ }
+
+ @Override
+ protected void handlerRemoved0( ChannelHandlerContext ctx )
+ {
+ failed = false;
+ log = null;
+ }
+
+ @Override
+ public void channelInactive( ChannelHandlerContext ctx )
+ {
+ log.debug( "Channel is inactive" );
+
+ if ( !failed )
+ {
+ // channel became inactive while doing bolt handshake, not because of some previous error
+ ServiceUnavailableException error = ErrorUtil.newConnectionTerminatedError();
+ fail( ctx, error );
+ }
}
@Override
public void exceptionCaught( ChannelHandlerContext ctx, Throwable error )
{
- // todo: test this unwrapping and SSLHandshakeException propagation
- Throwable cause = error instanceof DecoderException ? error.getCause() : error;
- if ( cause instanceof SSLHandshakeException )
+ if ( failed )
{
- fail( ctx, new SecurityException( "Failed to establish secured connection with the server", cause ) );
+ log.warn( "Another fatal error occurred in the pipeline", error );
}
else
{
- fail( ctx, cause );
+ failed = true;
+
+ Throwable cause = error instanceof DecoderException ? error.getCause() : error;
+ if ( cause instanceof SSLHandshakeException )
+ {
+ fail( ctx, new SecurityException( "Failed to establish secured connection with the server", cause ) );
+ }
+ else
+ {
+ fail( ctx, cause );
+ }
}
}
@@ -74,7 +112,7 @@ public void exceptionCaught( ChannelHandlerContext ctx, Throwable error )
protected void decode( ChannelHandlerContext ctx, ByteBuf in, List