diff --git a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/netty/DeFramer.java b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/netty/DeFramer.java index b856df6ddb..88d14b54f3 100644 --- a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/netty/DeFramer.java +++ b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/netty/DeFramer.java @@ -14,11 +14,14 @@ import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection; +import tech.pegasys.pantheon.ethereum.p2p.netty.exceptions.BreachOfProtocolException; import tech.pegasys.pantheon.ethereum.p2p.netty.exceptions.IncompatiblePeerException; +import tech.pegasys.pantheon.ethereum.p2p.netty.exceptions.PeerDisconnectedException; import tech.pegasys.pantheon.ethereum.p2p.rlpx.framing.Framer; import tech.pegasys.pantheon.ethereum.p2p.rlpx.framing.FramingException; import tech.pegasys.pantheon.ethereum.p2p.wire.PeerInfo; import tech.pegasys.pantheon.ethereum.p2p.wire.SubProtocol; +import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage; import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason; import tech.pegasys.pantheon.ethereum.p2p.wire.messages.HelloMessage; import tech.pegasys.pantheon.ethereum.p2p.wire.messages.WireMessageCodes; @@ -73,7 +76,9 @@ protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final L MessageData message; while ((message = framer.deframe(in)) != null) { - if (!hellosExchanged && message.getCode() == WireMessageCodes.HELLO) { + if (hellosExchanged) { + out.add(message); + } else if (message.getCode() == WireMessageCodes.HELLO) { hellosExchanged = true; // Decode first hello and use the payload to modify pipeline final PeerInfo peerInfo; @@ -103,7 +108,6 @@ protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final L connectFuture.completeExceptionally( new IncompatiblePeerException("No shared capabilities")); connection.disconnect(DisconnectReason.USELESS_PEER); - return; } // Setup next stage @@ -116,8 +120,25 @@ protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final L new ApiHandler(capabilityMultiplexer, connection, callbacks, waitingForPong), new MessageFramer(capabilityMultiplexer, framer)); connectFuture.complete(connection); + } else if (message.getCode() == WireMessageCodes.DISCONNECT) { + DisconnectMessage disconnectMessage = DisconnectMessage.readFrom(message); + LOG.debug( + "Peer disconnected before sending HELLO. Reason: " + disconnectMessage.getReason()); + ctx.close(); + connectFuture.completeExceptionally( + new PeerDisconnectedException(disconnectMessage.getReason())); } else { - out.add(message); + // Unexpected message - disconnect + LOG.debug( + "Message received before HELLO's exchanged, disconnecting. Code: {}, Data: {}", + message.getCode(), + message.getData().toString()); + ctx.writeAndFlush( + new OutboundMessage( + null, DisconnectMessage.create(DisconnectReason.BREACH_OF_PROTOCOL))) + .addListener((f) -> ctx.close()); + connectFuture.completeExceptionally( + new BreachOfProtocolException("Message received before HELLO's exchanged")); } } } diff --git a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/netty/exceptions/BreachOfProtocolException.java b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/netty/exceptions/BreachOfProtocolException.java new file mode 100644 index 0000000000..f514d3ca98 --- /dev/null +++ b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/netty/exceptions/BreachOfProtocolException.java @@ -0,0 +1,20 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.p2p.netty.exceptions; + +public class BreachOfProtocolException extends RuntimeException { + + public BreachOfProtocolException(final String message) { + super(message); + } +} diff --git a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/netty/exceptions/PeerDisconnectedException.java b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/netty/exceptions/PeerDisconnectedException.java new file mode 100644 index 0000000000..0c34e113c9 --- /dev/null +++ b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/netty/exceptions/PeerDisconnectedException.java @@ -0,0 +1,22 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.p2p.netty.exceptions; + +import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason; + +public class PeerDisconnectedException extends RuntimeException { + + public PeerDisconnectedException(final DisconnectReason reason) { + super("Peer disconnected for reason: " + reason.toString()); + } +} diff --git a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/wire/RawMessage.java b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/wire/RawMessage.java index d371b7a9b1..b5c9be6d5b 100644 --- a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/wire/RawMessage.java +++ b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/wire/RawMessage.java @@ -27,4 +27,9 @@ public RawMessage(final int code, final BytesValue data) { public int getCode() { return code; } + + @Override + public String toString() { + return "RawMessage{" + "code=" + code + ", data=" + data + '}'; + } } diff --git a/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/netty/DeFramerTest.java b/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/netty/DeFramerTest.java index 62887ed8d8..37a40f249a 100644 --- a/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/netty/DeFramerTest.java +++ b/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/netty/DeFramerTest.java @@ -12,42 +12,100 @@ */ package tech.pegasys.pantheon.ethereum.p2p.netty; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection; +import tech.pegasys.pantheon.ethereum.p2p.netty.exceptions.BreachOfProtocolException; +import tech.pegasys.pantheon.ethereum.p2p.netty.exceptions.IncompatiblePeerException; +import tech.pegasys.pantheon.ethereum.p2p.netty.exceptions.PeerDisconnectedException; +import tech.pegasys.pantheon.ethereum.p2p.netty.testhelpers.NettyMocks; +import tech.pegasys.pantheon.ethereum.p2p.netty.testhelpers.SubProtocolMock; +import tech.pegasys.pantheon.ethereum.p2p.peers.Peer; import tech.pegasys.pantheon.ethereum.p2p.rlpx.framing.Framer; import tech.pegasys.pantheon.ethereum.p2p.rlpx.framing.FramingException; +import tech.pegasys.pantheon.ethereum.p2p.wire.Capability; import tech.pegasys.pantheon.ethereum.p2p.wire.PeerInfo; +import tech.pegasys.pantheon.ethereum.p2p.wire.RawMessage; +import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage; import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason; +import tech.pegasys.pantheon.ethereum.p2p.wire.messages.HelloMessage; +import tech.pegasys.pantheon.ethereum.p2p.wire.messages.PingMessage; +import tech.pegasys.pantheon.ethereum.p2p.wire.messages.WireMessageCodes; import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; import tech.pegasys.pantheon.util.bytes.BytesValue; -import java.util.Collections; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoop; import io.netty.handler.codec.DecoderException; +import io.netty.util.concurrent.ScheduledFuture; +import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentCaptor; public class DeFramerTest { private final ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); + private final Channel channel = mock(Channel.class); + private final ChannelPipeline pipeline = mock(ChannelPipeline.class); + private final EventLoop eventLoop = mock(EventLoop.class); private final Framer framer = mock(Framer.class); private final Callbacks callbacks = mock(Callbacks.class); private final PeerConnection peerConnection = mock(PeerConnection.class); private final CompletableFuture connectFuture = new CompletableFuture<>(); + private final PeerInfo peerInfo = + new PeerInfo( + 5, + "abc", + Arrays.asList(Capability.create("eth", 63)), + 0, + BytesValue.fromHexString("0x01")); private final DeFramer deFramer = new DeFramer( framer, - Collections.emptyList(), - new PeerInfo(5, "abc", Collections.emptyList(), 0, BytesValue.fromHexString("0x01")), + Arrays.asList(SubProtocolMock.create("eth")), + peerInfo, callbacks, connectFuture, NoOpMetricsSystem.NO_OP_LABELLED_3_COUNTER); + @Before + @SuppressWarnings("unchecked") + public void setup() { + when(ctx.channel()).thenReturn(channel); + + when(channel.pipeline()).thenReturn(pipeline); + when(pipeline.addLast(any())).thenReturn(pipeline); + when(pipeline.addFirst(any())).thenReturn(pipeline); + + when(channel.eventLoop()).thenReturn(eventLoop); + when(eventLoop.schedule(any(Callable.class), anyLong(), any())) + .thenReturn(mock(ScheduledFuture.class)); + } + @Test - public void shouldDisconnectForBreachOfProtocolWhenFramingExceptionThrown() throws Exception { + public void exceptionCaught_shouldDisconnectForBreachOfProtocolWhenFramingExceptionThrown() + throws Exception { connectFuture.complete(peerConnection); deFramer.exceptionCaught(ctx, new DecoderException(new FramingException("Test"))); @@ -56,7 +114,8 @@ public void shouldDisconnectForBreachOfProtocolWhenFramingExceptionThrown() thro } @Test - public void shouldHandleFramingExceptionWhenFutureCompletedExceptionally() throws Exception { + public void exceptionCaught_shouldHandleFramingExceptionWhenFutureCompletedExceptionally() + throws Exception { connectFuture.completeExceptionally(new Exception()); deFramer.exceptionCaught(ctx, new DecoderException(new FramingException("Test"))); @@ -65,11 +124,127 @@ public void shouldHandleFramingExceptionWhenFutureCompletedExceptionally() throw } @Test - public void shouldHandleGenericExceptionWhenFutureCompletedExceptionally() throws Exception { + public void exceptionCaught_shouldHandleGenericExceptionWhenFutureCompletedExceptionally() + throws Exception { connectFuture.completeExceptionally(new Exception()); deFramer.exceptionCaught(ctx, new DecoderException(new RuntimeException("Test"))); verify(ctx).close(); } + + @Test + public void decode_handlesHello() throws ExecutionException, InterruptedException { + ChannelFuture future = NettyMocks.channelFuture(false); + when(channel.closeFuture()).thenReturn(future); + + PeerInfo remotePeerInfo = + new PeerInfo( + peerInfo.getVersion(), + peerInfo.getClientId(), + peerInfo.getCapabilities(), + peerInfo.getPort(), + Peer.randomId()); + HelloMessage helloMessage = HelloMessage.create(remotePeerInfo); + ByteBuf data = Unpooled.wrappedBuffer(helloMessage.getData().extractArray()); + when(framer.deframe(eq(data))) + .thenReturn(new RawMessage(helloMessage.getCode(), helloMessage.getData())) + .thenReturn(null); + List out = new ArrayList<>(); + deFramer.decode(ctx, data, out); + + assertThat(connectFuture).isDone(); + assertThat(connectFuture).isNotCompletedExceptionally(); + PeerConnection peerConnection = connectFuture.get(); + assertThat(peerConnection.getPeer()).isEqualTo(remotePeerInfo); + assertThat(out).isEmpty(); + + // Next phase of pipeline should be setup + verify(pipeline, times(1)).addLast(any()); + + // Next message should be pushed out + PingMessage nextMessage = PingMessage.get(); + ByteBuf nextData = Unpooled.wrappedBuffer(nextMessage.getData().extractArray()); + when(framer.deframe(eq(nextData))) + .thenReturn(new RawMessage(nextMessage.getCode(), nextMessage.getData())) + .thenReturn(null); + verify(pipeline, times(1)).addLast(any()); + deFramer.decode(ctx, nextData, out); + assertThat(out.size()).isEqualTo(1); + } + + @Test + public void decode_handlesNoSharedCaps() throws ExecutionException, InterruptedException { + ChannelFuture future = NettyMocks.channelFuture(false); + when(channel.closeFuture()).thenReturn(future); + + PeerInfo remotePeerInfo = + new PeerInfo( + peerInfo.getVersion(), + peerInfo.getClientId(), + Arrays.asList(Capability.create("eth", 254)), + peerInfo.getPort(), + Peer.randomId()); + HelloMessage helloMessage = HelloMessage.create(remotePeerInfo); + ByteBuf data = Unpooled.wrappedBuffer(helloMessage.getData().extractArray()); + when(framer.deframe(eq(data))) + .thenReturn(new RawMessage(helloMessage.getCode(), helloMessage.getData())) + .thenReturn(null); + List out = new ArrayList<>(); + deFramer.decode(ctx, data, out); + + assertThat(connectFuture).isDone(); + assertThat(connectFuture).isCompletedExceptionally(); + assertThatThrownBy(connectFuture::get).hasCauseInstanceOf(IncompatiblePeerException.class); + assertThat(out).isEmpty(); + + // Next phase of pipeline should be setup + verify(pipeline, times(1)).addLast(any()); + } + + @Test + public void decode_shouldHandleImmediateDisconnectMessage() { + DisconnectMessage disconnectMessage = DisconnectMessage.create(DisconnectReason.TOO_MANY_PEERS); + ByteBuf disconnectData = Unpooled.wrappedBuffer(disconnectMessage.getData().extractArray()); + when(framer.deframe(eq(disconnectData))) + .thenReturn(new RawMessage(disconnectMessage.getCode(), disconnectMessage.getData())) + .thenReturn(null); + List out = new ArrayList<>(); + deFramer.decode(ctx, disconnectData, out); + + assertThat(connectFuture).isDone(); + assertThatThrownBy(connectFuture::get) + .hasCauseInstanceOf(PeerDisconnectedException.class) + .hasMessageContaining(disconnectMessage.getReason().toString()); + verify(ctx).close(); + assertThat(out).isEmpty(); + } + + @Test + public void decode_shouldHandleInvalidMessage() { + MessageData messageData = PingMessage.get(); + ByteBuf data = Unpooled.wrappedBuffer(messageData.getData().extractArray()); + when(framer.deframe(eq(data))) + .thenReturn(new RawMessage(messageData.getCode(), messageData.getData())) + .thenReturn(null); + ChannelFuture future = NettyMocks.channelFuture(true); + when(ctx.writeAndFlush(any())).thenReturn(future); + List out = new ArrayList<>(); + deFramer.decode(ctx, data, out); + + ArgumentCaptor outboundMessageArgumentCaptor = + ArgumentCaptor.forClass(OutboundMessage.class); + verify(ctx, times(1)).writeAndFlush(outboundMessageArgumentCaptor.capture()); + OutboundMessage outboundMessage = (OutboundMessage) outboundMessageArgumentCaptor.getValue(); + assertThat(outboundMessage.getCapability()).isNull(); + MessageData outboundMessageData = outboundMessage.getData(); + assertThat(outboundMessageData.getCode()).isEqualTo(WireMessageCodes.DISCONNECT); + assertThat(DisconnectMessage.readFrom(outboundMessageData).getReason()) + .isEqualTo(DisconnectReason.BREACH_OF_PROTOCOL); + + assertThat(connectFuture).isDone(); + assertThatThrownBy(connectFuture::get).hasCauseInstanceOf(BreachOfProtocolException.class); + verify(ctx).close(); + assertThat(out).isEmpty(); + } } diff --git a/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/netty/testhelpers/NettyMocks.java b/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/netty/testhelpers/NettyMocks.java new file mode 100644 index 0000000000..a5bf07c9d6 --- /dev/null +++ b/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/netty/testhelpers/NettyMocks.java @@ -0,0 +1,38 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.p2p.netty.testhelpers; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import io.netty.channel.ChannelFuture; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; + +public class NettyMocks { + public static ChannelFuture channelFuture(final boolean completeImmediately) { + ChannelFuture channelFuture = mock(ChannelFuture.class); + when(channelFuture.addListener(any())) + .then( + (invocation) -> { + if (completeImmediately) { + GenericFutureListener> listener = invocation.getArgument(0); + listener.operationComplete(mock(Future.class)); + } + return channelFuture; + }); + + return channelFuture; + } +} diff --git a/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/netty/testhelpers/SubProtocolMock.java b/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/netty/testhelpers/SubProtocolMock.java new file mode 100644 index 0000000000..ce25d707ce --- /dev/null +++ b/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/netty/testhelpers/SubProtocolMock.java @@ -0,0 +1,45 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.p2p.netty.testhelpers; + +import tech.pegasys.pantheon.ethereum.p2p.wire.SubProtocol; + +public class SubProtocolMock { + public static SubProtocol create() { + return create("eth"); + } + + public static SubProtocol create(final String name) { + return new SubProtocol() { + @Override + public String getName() { + return name; + } + + @Override + public int messageSpace(final int protocolVersion) { + return 8; + } + + @Override + public boolean isValidMessageCode(final int protocolVersion, final int code) { + return true; + } + + @Override + public String messageName(final int protocolVersion, final int code) { + return INVALID_MESSAGE_NAME; + } + }; + } +} diff --git a/gradle.properties b/gradle.properties index 4e45436780..7f2a19aa86 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,2 +1,2 @@ org.gradle.jvmargs=-Xmx1g -version=1.0.3-SNAPSHOT +version=1.1.0-SNAPSHOT