Skip to content

Commit

Permalink
Merge pull request #431 from lutovich/1.5-improve-logging
Browse files Browse the repository at this point in the history
Improve logging in channel pipeline
  • Loading branch information
zhenlineo authored Nov 23, 2017
2 parents 9f27b11 + 05f5915 commit 0bfa545
Show file tree
Hide file tree
Showing 104 changed files with 665 additions and 1,008 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.driver.internal.async;
package org.neo4j.driver.internal;

import java.net.InetAddress;
import java.net.InetSocketAddress;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import java.util.concurrent.CompletionStage;

import org.neo4j.driver.internal.async.BoltServerAddress;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.spi.ConnectionPool;
import org.neo4j.driver.internal.spi.ConnectionProvider;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.net.URI;
import java.security.GeneralSecurityException;

import org.neo4j.driver.internal.async.BoltServerAddress;
import org.neo4j.driver.internal.async.BootstrapFactory;
import org.neo4j.driver.internal.async.ChannelConnector;
import org.neo4j.driver.internal.async.ChannelConnectorImpl;
Expand Down Expand Up @@ -143,11 +142,9 @@ private InternalDriver createDriver( URI uri, BoltServerAddress address,
protected InternalDriver createDirectDriver( BoltServerAddress address, Config config,
SecurityPlan securityPlan, RetryLogic retryLogic, ConnectionPool connectionPool )
{
ConnectionProvider connectionProvider =
new DirectConnectionProvider( address, connectionPool );
SessionFactory sessionFactory =
createSessionFactory( connectionProvider, retryLogic, config );
return createDriver( config, securityPlan, sessionFactory );
ConnectionProvider connectionProvider = new DirectConnectionProvider( address, connectionPool );
SessionFactory sessionFactory = createSessionFactory( connectionProvider, retryLogic, config );
return createDriver( sessionFactory, securityPlan, config );
}

/**
Expand All @@ -166,17 +163,17 @@ protected InternalDriver createRoutingDriver( BoltServerAddress address, Connect
ConnectionProvider connectionProvider = createLoadBalancer( address, connectionPool, eventExecutorGroup,
config, routingSettings );
SessionFactory sessionFactory = createSessionFactory( connectionProvider, retryLogic, config );
return createDriver( config, securityPlan, sessionFactory );
return createDriver( sessionFactory, securityPlan, config );
}

/**
* Creates new {@link Driver}.
* <p>
* <b>This method is protected only for testing</b>
*/
protected InternalDriver createDriver( Config config, SecurityPlan securityPlan, SessionFactory sessionFactory )
protected InternalDriver createDriver( SessionFactory sessionFactory, SecurityPlan securityPlan, Config config )
{
return new InternalDriver( securityPlan, sessionFactory );
return new InternalDriver( securityPlan, sessionFactory, config.logging() );
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.util.function.BiConsumer;
import java.util.function.BiFunction;

import org.neo4j.driver.internal.async.InternalStatementResultCursor;
import org.neo4j.driver.internal.async.QueryRunner;
import org.neo4j.driver.internal.async.ResultCursorsHolder;
import org.neo4j.driver.internal.handlers.BeginTxResponseHandler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.neo4j.driver.internal.security.SecurityPlan;
import org.neo4j.driver.v1.AccessMode;
import org.neo4j.driver.v1.Driver;
import org.neo4j.driver.v1.Logger;
import org.neo4j.driver.v1.Logging;
import org.neo4j.driver.v1.Session;

import static java.util.concurrent.CompletableFuture.completedFuture;
Expand All @@ -33,13 +35,15 @@ public class InternalDriver implements Driver
{
private final SecurityPlan securityPlan;
private final SessionFactory sessionFactory;
private final Logger log;

private AtomicBoolean closed = new AtomicBoolean( false );

InternalDriver( SecurityPlan securityPlan, SessionFactory sessionFactory )
InternalDriver( SecurityPlan securityPlan, SessionFactory sessionFactory, Logging logging )
{
this.securityPlan = securityPlan;
this.sessionFactory = sessionFactory;
this.log = logging.getLog( Driver.class.getSimpleName() );
}

@Override
Expand Down Expand Up @@ -108,6 +112,7 @@ public CompletionStage<Void> closeAsync()
{
if ( closed.compareAndSet( false, true ) )
{
log.info( "Driver instance is closing" );
return sessionFactory.close();
}
return completedFuture( null );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.driver.internal.async;
package org.neo4j.driver.internal;

import java.util.ArrayList;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;

import org.neo4j.driver.internal.async.InternalStatementResultCursor;
import org.neo4j.driver.internal.async.QueryRunner;
import org.neo4j.driver.internal.logging.DelegatingLogger;
import org.neo4j.driver.internal.logging.PrefixedLogger;
import org.neo4j.driver.internal.retry.RetryLogic;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.spi.ConnectionProvider;
Expand Down Expand Up @@ -74,7 +73,7 @@ public NetworkSession( ConnectionProvider connectionProvider, AccessMode mode, R
this.connectionProvider = connectionProvider;
this.mode = mode;
this.retryLogic = retryLogic;
this.logger = new DelegatingLogger( logging.getLog( LOG_NAME ), String.valueOf( hashCode() ) );
this.logger = new PrefixedLogger( "[" + hashCode() + "]", logging.getLog( LOG_NAME ) );
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
*/
package org.neo4j.driver.internal;

import org.neo4j.driver.internal.async.BoltServerAddress;

/**
* Interface used for tracking errors when connected to a cluster.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.netty.channel.Channel;
import io.netty.util.AttributeKey;

import org.neo4j.driver.internal.BoltServerAddress;
import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher;
import org.neo4j.driver.internal.util.ServerVersion;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;

import org.neo4j.driver.internal.BoltServerAddress;
import org.neo4j.driver.v1.Logger;
import org.neo4j.driver.v1.Logging;
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;

Expand All @@ -36,6 +38,7 @@ public class ChannelConnectedListener implements ChannelFutureListener
private final ChannelPipelineBuilder pipelineBuilder;
private final ChannelPromise handshakeCompletedPromise;
private final Logging logging;
private final Logger log;

public ChannelConnectedListener( BoltServerAddress address, ChannelPipelineBuilder pipelineBuilder,
ChannelPromise handshakeCompletedPromise, Logging logging )
Expand All @@ -44,6 +47,7 @@ public ChannelConnectedListener( BoltServerAddress address, ChannelPipelineBuild
this.pipelineBuilder = pipelineBuilder;
this.handshakeCompletedPromise = handshakeCompletedPromise;
this.logging = logging;
this.log = logging.getLog( getClass().getSimpleName() );
}

@Override
Expand All @@ -53,8 +57,10 @@ public void operationComplete( ChannelFuture future )

if ( future.isSuccess() )
{
log.trace( "Channel %s connected, running bolt handshake", channel );

ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast( new HandshakeResponseHandler( pipelineBuilder, handshakeCompletedPromise, logging ) );
pipeline.addLast( new HandshakeHandler( pipelineBuilder, handshakeCompletedPromise, logging ) );
ChannelFuture handshakeFuture = channel.writeAndFlush( handshake() );

handshakeFuture.addListener( channelFuture ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;

import org.neo4j.driver.internal.BoltServerAddress;

public interface ChannelConnector
{
ChannelFuture connect( BoltServerAddress address, Bootstrap bootstrap );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import java.util.Map;

import org.neo4j.driver.internal.BoltServerAddress;
import org.neo4j.driver.internal.ConnectionSettings;
import org.neo4j.driver.internal.security.InternalAuthToken;
import org.neo4j.driver.internal.security.SecurityPlan;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import io.netty.channel.ChannelPipeline;

import org.neo4j.driver.internal.async.inbound.ChannelErrorHandler;
import org.neo4j.driver.internal.async.inbound.ChunkDecoder;
import org.neo4j.driver.internal.async.inbound.InboundMessageHandler;
import org.neo4j.driver.internal.async.inbound.MessageDecoder;
Expand All @@ -33,7 +34,7 @@ public class ChannelPipelineBuilderImpl implements ChannelPipelineBuilder
public void build( MessageFormat messageFormat, ChannelPipeline pipeline, Logging logging )
{
// inbound handlers
pipeline.addLast( new ChunkDecoder() );
pipeline.addLast( new ChunkDecoder( logging ) );
pipeline.addLast( new MessageDecoder() );
pipeline.addLast( new InboundMessageHandler( messageFormat, logging ) );

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,53 +28,91 @@
import java.util.List;
import javax.net.ssl.SSLHandshakeException;

import org.neo4j.driver.internal.logging.PrefixedLogger;
import org.neo4j.driver.internal.messaging.MessageFormat;
import org.neo4j.driver.internal.messaging.PackStreamMessageFormatV1;
import org.neo4j.driver.internal.util.ErrorUtil;
import org.neo4j.driver.v1.Logger;
import org.neo4j.driver.v1.Logging;
import org.neo4j.driver.v1.exceptions.ClientException;
import org.neo4j.driver.v1.exceptions.SecurityException;
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;

import static org.neo4j.driver.internal.async.ProtocolUtil.HTTP;
import static org.neo4j.driver.internal.async.ProtocolUtil.NO_PROTOCOL_VERSION;
import static org.neo4j.driver.internal.async.ProtocolUtil.PROTOCOL_VERSION_1;

public class HandshakeResponseHandler extends ReplayingDecoder<Void>
public class HandshakeHandler extends ReplayingDecoder<Void>
{
private final ChannelPipelineBuilder pipelineBuilder;
private final ChannelPromise handshakeCompletedPromise;
private final Logging logging;
private final Logger log;

public HandshakeResponseHandler( ChannelPipelineBuilder pipelineBuilder, ChannelPromise handshakeCompletedPromise,
private boolean failed;
private Logger log;

public HandshakeHandler( ChannelPipelineBuilder pipelineBuilder, ChannelPromise handshakeCompletedPromise,
Logging logging )
{
this.pipelineBuilder = pipelineBuilder;
this.handshakeCompletedPromise = handshakeCompletedPromise;
this.logging = logging;
this.log = logging.getLog( getClass().getSimpleName() );
}

@Override
public void handlerAdded( ChannelHandlerContext ctx )
{
log = new PrefixedLogger( ctx.channel().toString(), logging, getClass() );
}

@Override
protected void handlerRemoved0( ChannelHandlerContext ctx )
{
failed = false;
log = null;
}

@Override
public void channelInactive( ChannelHandlerContext ctx )
{
log.debug( "Channel is inactive" );

if ( !failed )
{
// channel became inactive while doing bolt handshake, not because of some previous error
ServiceUnavailableException error = ErrorUtil.newConnectionTerminatedError();
fail( ctx, error );
}
}

@Override
public void exceptionCaught( ChannelHandlerContext ctx, Throwable error )
{
// todo: test this unwrapping and SSLHandshakeException propagation
Throwable cause = error instanceof DecoderException ? error.getCause() : error;
if ( cause instanceof SSLHandshakeException )
if ( failed )
{
fail( ctx, new SecurityException( "Failed to establish secured connection with the server", cause ) );
log.warn( "Another fatal error occurred in the pipeline", error );
}
else
{
fail( ctx, cause );
failed = true;

Throwable cause = error instanceof DecoderException ? error.getCause() : error;
if ( cause instanceof SSLHandshakeException )
{
fail( ctx, new SecurityException( "Failed to establish secured connection with the server", cause ) );
}
else
{
fail( ctx, cause );
}
}
}

@Override
protected void decode( ChannelHandlerContext ctx, ByteBuf in, List<Object> out )
{
int serverSuggestedVersion = in.readInt();
log.debug( "Server suggested protocol version: %s", serverSuggestedVersion );
log.debug( "Server suggested protocol version %s during handshake", serverSuggestedVersion );

ChannelPipeline pipeline = ctx.pipeline();
// this is a one-time handler, remove it when protocol version has been read
Expand All @@ -101,7 +139,7 @@ protected void decode( ChannelHandlerContext ctx, ByteBuf in, List<Object> out )

private void fail( ChannelHandlerContext ctx, Throwable error )
{
ctx.close().addListener( future -> handshakeCompletedPromise.setFailure( error ) );
ctx.close().addListener( future -> handshakeCompletedPromise.tryFailure( error ) );
}

private static Throwable protocolNoSupportedByServerError()
Expand Down
Loading

0 comments on commit 0bfa545

Please sign in to comment.