diff --git a/core/common/src/main/java/com/alipay/sofa/rpc/common/BatchExecutorQueue.java b/core/common/src/main/java/com/alipay/sofa/rpc/common/BatchExecutorQueue.java new file mode 100644 index 000000000..d8bdcd458 --- /dev/null +++ b/core/common/src/main/java/com/alipay/sofa/rpc/common/BatchExecutorQueue.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.common; + +import java.util.LinkedList; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * 批量执行队列 + * + * @author chengming + * @version BatchExecutorQueue.java, v 0.1 2024年02月28日 5:36 PM chengming + */ +public class BatchExecutorQueue { + + static final int DEFAULT_QUEUE_SIZE = 128; + + private final Queue queue; + + private final AtomicBoolean scheduled; + + private final int chunkSize; + + public BatchExecutorQueue() { + this(DEFAULT_QUEUE_SIZE); + } + + public BatchExecutorQueue(int chunkSize) { + this.queue = new ConcurrentLinkedQueue<>(); + this.scheduled = new AtomicBoolean(false); + this.chunkSize = chunkSize; + } + + public void enqueue(T message, Executor executor) { + queue.add(message); + scheduleFlush(executor); + } + + protected void scheduleFlush(Executor executor) { + if (scheduled.compareAndSet(false, true)) { + executor.execute(() -> this.run(executor)); + } + } + + void run(Executor executor) { + try { + Queue snapshot = new LinkedList<>(); + T item; + while ((item = queue.poll()) != null) { + snapshot.add(item); + } + int i = 0; + boolean flushedOnce = false; + while ((item = snapshot.poll()) != null) { + if (snapshot.size() == 0) { + flushedOnce = false; + break; + } + if (i == chunkSize) { + i = 0; + flush(item); + flushedOnce = true; + } else { + prepare(item); + i++; + } + } + if ((i != 0 || !flushedOnce) && item != null) { + flush(item); + } + } finally { + scheduled.set(false); + if (!queue.isEmpty()) { + scheduleFlush(executor); + } + } + } + + protected void prepare(T item) { + } + + protected void flush(T item) { + } + + /** + * UT only + * @return + */ + @Deprecated + public AtomicBoolean getScheduled() { + return scheduled; + } + + /** + * UT only + * @return + */ + @Deprecated + public Queue getQueue() { + return queue; + } +} \ No newline at end of file diff --git a/core/common/src/test/java/com/alipay/sofa/rpc/common/BatchExecutorQueueTest.java b/core/common/src/test/java/com/alipay/sofa/rpc/common/BatchExecutorQueueTest.java new file mode 100644 index 000000000..d77b08b3b --- /dev/null +++ b/core/common/src/test/java/com/alipay/sofa/rpc/common/BatchExecutorQueueTest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.common; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; + +/** + * @author chengming + * @version BatchExecutorQueueTest.java, v 0.1 2024年03月01日 10:55 AM chengming + */ +public class BatchExecutorQueueTest { + + private BatchExecutorQueue batchExecutorQueue; + + @Mock + private Executor mockExecutor; + + @Captor + private ArgumentCaptor runnableCaptor; + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + batchExecutorQueue = spy(new BatchExecutorQueue<>(2)); + } + + @Test + public void testEnqueueAndRun() { + Object message1 = new Object(); + Object message2 = new Object(); + Object message3 = new Object(); + + // 测试 enqueue 方法以及是否通过 executor 调度了 run 方法 + batchExecutorQueue.enqueue(message1, mockExecutor); + batchExecutorQueue.enqueue(message2, mockExecutor); + batchExecutorQueue.enqueue(message3, mockExecutor); + + verify(mockExecutor, atLeastOnce()).execute(runnableCaptor.capture()); + + Runnable scheduledRunnable = runnableCaptor.getValue(); + Assert.assertNotNull(scheduledRunnable); + scheduledRunnable.run(); + + // 验证队列是否为空 + Assert.assertTrue(batchExecutorQueue.getQueue().isEmpty()); + } + + @Test + public void testScheduleFlush() { + AtomicBoolean scheduled = batchExecutorQueue.getScheduled(); + Assert.assertFalse(scheduled.get()); + + batchExecutorQueue.scheduleFlush(mockExecutor); + Assert.assertTrue(scheduled.get()); + + // 验证是否有任务提交到 executor + verify(mockExecutor).execute(any(Runnable.class)); + } + +} diff --git a/remoting/remoting-http/src/main/java/com/alipay/sofa/rpc/transport/netty/NettyBatchWriteQueue.java b/remoting/remoting-http/src/main/java/com/alipay/sofa/rpc/transport/netty/NettyBatchWriteQueue.java new file mode 100644 index 000000000..2ad3dbd1a --- /dev/null +++ b/remoting/remoting-http/src/main/java/com/alipay/sofa/rpc/transport/netty/NettyBatchWriteQueue.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.transport.netty; + +import com.alipay.sofa.rpc.common.BatchExecutorQueue; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelPromise; +import io.netty.channel.EventLoop; + +/** + * @author chengming + * @version NettyBatchWriteQueue.java, v 0.1 2024年02月28日 5:42 PM chengming + */ +public class NettyBatchWriteQueue extends BatchExecutorQueue { + + private final Channel channel; + + private final EventLoop eventLoop; + + private NettyBatchWriteQueue(Channel channel) { + this.channel = channel; + this.eventLoop = channel.eventLoop(); + } + + public ChannelFuture enqueue(Object message) { + return enqueue(message, channel.newPromise()); + } + + public ChannelFuture enqueue(Object message, ChannelPromise channelPromise) { + MessageTuple messageTuple = new MessageTuple(message, channelPromise); + super.enqueue(messageTuple, eventLoop); + return messageTuple.channelPromise; + } + + @Override + protected void prepare(MessageTuple item) { + channel.write(item.originMessage, item.channelPromise); + } + + @Override + protected void flush(MessageTuple item) { + prepare(item); + channel.flush(); + } + + public static NettyBatchWriteQueue createWriteQueue(Channel channel) { + return new NettyBatchWriteQueue(channel); + } + + static class MessageTuple { + + private final Object originMessage; + + private final ChannelPromise channelPromise; + + public MessageTuple(Object originMessage, ChannelPromise channelPromise) { + this.originMessage = originMessage; + this.channelPromise = channelPromise; + } + } +} diff --git a/remoting/remoting-http/src/main/java/com/alipay/sofa/rpc/transport/netty/NettyChannel.java b/remoting/remoting-http/src/main/java/com/alipay/sofa/rpc/transport/netty/NettyChannel.java index 4b71a9ccb..590bfde39 100644 --- a/remoting/remoting-http/src/main/java/com/alipay/sofa/rpc/transport/netty/NettyChannel.java +++ b/remoting/remoting-http/src/main/java/com/alipay/sofa/rpc/transport/netty/NettyChannel.java @@ -48,8 +48,11 @@ public class NettyChannel extends AbstractChannel { + if (!future1.isSuccess()) { + Throwable throwable = future1.cause(); + LOGGER.error("Failed to send to " + + NetUtils.channelToString(localAddress(), remoteAddress()) + + " for msg : " + obj + + ", Cause by:", throwable); } }); } @@ -98,4 +98,14 @@ public void operationComplete(Future future1) throws Exception { public boolean isAvailable() { return channel.isOpen() && channel.isActive(); } + + /** + * UT only + * @param writeQueue + */ + @Deprecated + public void setWriteQueue(NettyBatchWriteQueue writeQueue) { + this.writeQueue = writeQueue; + } + } diff --git a/remoting/remoting-http/src/test/java/com/alipay/sofa/rpc/transport/netty/NettyBatchWriteQueueTest.java b/remoting/remoting-http/src/test/java/com/alipay/sofa/rpc/transport/netty/NettyBatchWriteQueueTest.java new file mode 100644 index 000000000..446544cfc --- /dev/null +++ b/remoting/remoting-http/src/test/java/com/alipay/sofa/rpc/transport/netty/NettyBatchWriteQueueTest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.transport.netty; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelPromise; +import io.netty.channel.EventLoop; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.when; + +/** + * @author chengming + * @version NettyBatchWriteQueueTest.java, v 0.1 2024年03月01日 11:06 AM chengming + */ +public class NettyBatchWriteQueueTest { + + @Mock + private Channel mockChannel; + + @Mock + private EventLoop mockEventLoop; + + @Mock + private ChannelPromise mockChannelPromise; + + private NettyBatchWriteQueue nettyBatchWriteQueue; + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + when(mockChannel.eventLoop()).thenReturn(mockEventLoop); + when(mockChannel.newPromise()).thenReturn(mockChannelPromise); + nettyBatchWriteQueue = NettyBatchWriteQueue.createWriteQueue(mockChannel); + } + + @Test + public void testEnqueue() { + Object message = new Object(); + ChannelFuture future = nettyBatchWriteQueue.enqueue(message); + Assert.assertNotNull(future); + + Mockito.verify(mockEventLoop).execute(any(Runnable.class)); + } + + @Test + public void testPrepare() { + Object message = new Object(); + NettyBatchWriteQueue.MessageTuple messageTuple = new NettyBatchWriteQueue.MessageTuple(message, + mockChannelPromise); + nettyBatchWriteQueue.prepare(messageTuple); + + Mockito.verify(mockChannel).write(eq(message), eq(mockChannelPromise)); + } + + @Test + public void testFlush() { + Object message = new Object(); + NettyBatchWriteQueue.MessageTuple messageTuple = new NettyBatchWriteQueue.MessageTuple(message, + mockChannelPromise); + nettyBatchWriteQueue.flush(messageTuple); + + Mockito.verify(mockChannel).write(eq(message), eq(mockChannelPromise)); + Mockito.verify(mockChannel).flush(); + } +} \ No newline at end of file diff --git a/remoting/remoting-http/src/test/java/com/alipay/sofa/rpc/transport/netty/NettyChannelTest.java b/remoting/remoting-http/src/test/java/com/alipay/sofa/rpc/transport/netty/NettyChannelTest.java new file mode 100644 index 000000000..e5be4aec6 --- /dev/null +++ b/remoting/remoting-http/src/test/java/com/alipay/sofa/rpc/transport/netty/NettyChannelTest.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.transport.netty; + +import com.alipay.sofa.rpc.common.utils.NetUtils; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.EventLoop; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.Mockito; + +import java.net.InetSocketAddress; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.when; + +/** + * @author chengming + * @version NettyChannelTest.java, v 0.1 2024年02月29日 3:18 PM chengming + */ +public class NettyChannelTest { + + @Mock + private Channel mockChannel = Mockito.mock(Channel.class); + + @Mock + private ChannelHandlerContext mockContext = Mockito.mock(ChannelHandlerContext.class); + + @Mock + private NettyBatchWriteQueue mockWriteQueue = Mockito.mock(NettyBatchWriteQueue.class); + + @Mock + private ChannelFuture mockFuture = Mockito.mock(ChannelFuture.class); + + private NettyChannel nettyChannel; + + @Before + public void setUp() { + Mockito.when(mockChannel.eventLoop()).thenReturn(Mockito.mock(EventLoop.class)); + Mockito.when(mockChannel.alloc()).thenReturn(PooledByteBufAllocator.DEFAULT); + when(mockContext.channel()).thenReturn(mockChannel); + when(mockWriteQueue.enqueue(any())).thenReturn(mockFuture); + nettyChannel = new NettyChannel(mockChannel); + nettyChannel.setWriteQueue(mockWriteQueue); + } + + @Test + public void testRunSuccess() throws Exception { + nettyChannel.writeAndFlush("111"); + + Mockito.verify(mockWriteQueue).enqueue("111"); + + ArgumentCaptor captor = ArgumentCaptor.forClass(GenericFutureListener.class); + Mockito.verify(mockFuture).addListener(captor.capture()); + + // 模拟 FutureListener 的回调 + GenericFutureListener> listener = captor.getValue(); + listener.operationComplete((Future) mockFuture); + + // 验证没有错误日志被记录(因为操作是成功的) + Mockito.verify(Mockito.mock(NetUtils.class), times(10)); + NetUtils.channelToString(any(InetSocketAddress.class), any(InetSocketAddress.class)); + } + +}