Skip to content

Commit

Permalink
Optimize performance for h2c protocol (#1400)
Browse files Browse the repository at this point in the history
* optimize performance for h2c protocol

* optimize performance for h2c protocol

---------

Co-authored-by: 呈铭 <beck.wcm@antgroup.com>
  • Loading branch information
wangchengming666 and 呈铭 authored May 14, 2024
1 parent 56ed259 commit 21acf28
Show file tree
Hide file tree
Showing 6 changed files with 479 additions and 11 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.rpc.common;

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* 批量执行队列
*
* @author chengming
* @version BatchExecutorQueue.java, v 0.1 2024年02月28日 5:36 PM chengming
*/
public class BatchExecutorQueue<T> {

static final int DEFAULT_QUEUE_SIZE = 128;

private final Queue<T> queue;

private final AtomicBoolean scheduled;

private final int chunkSize;

public BatchExecutorQueue() {
this(DEFAULT_QUEUE_SIZE);
}

public BatchExecutorQueue(int chunkSize) {
this.queue = new ConcurrentLinkedQueue<>();
this.scheduled = new AtomicBoolean(false);
this.chunkSize = chunkSize;
}

public void enqueue(T message, Executor executor) {
queue.add(message);
scheduleFlush(executor);
}

protected void scheduleFlush(Executor executor) {
if (scheduled.compareAndSet(false, true)) {
executor.execute(() -> this.run(executor));
}
}

void run(Executor executor) {
try {
Queue<T> snapshot = new LinkedList<>();
T item;
while ((item = queue.poll()) != null) {
snapshot.add(item);
}
int i = 0;
boolean flushedOnce = false;
while ((item = snapshot.poll()) != null) {
if (snapshot.size() == 0) {
flushedOnce = false;
break;
}
if (i == chunkSize) {
i = 0;
flush(item);
flushedOnce = true;
} else {
prepare(item);
i++;
}
}
if ((i != 0 || !flushedOnce) && item != null) {
flush(item);
}
} finally {
scheduled.set(false);
if (!queue.isEmpty()) {
scheduleFlush(executor);
}
}
}

protected void prepare(T item) {
}

protected void flush(T item) {
}

/**
* UT only
* @return
*/
@Deprecated
public AtomicBoolean getScheduled() {
return scheduled;
}

/**
* UT only
* @return
*/
@Deprecated
public Queue<T> getQueue() {
return queue;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.rpc.common;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;

/**
* @author chengming
* @version BatchExecutorQueueTest.java, v 0.1 2024年03月01日 10:55 AM chengming
*/
public class BatchExecutorQueueTest {

private BatchExecutorQueue<Object> batchExecutorQueue;

@Mock
private Executor mockExecutor;

@Captor
private ArgumentCaptor<Runnable> runnableCaptor;

@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
batchExecutorQueue = spy(new BatchExecutorQueue<>(2));
}

@Test
public void testEnqueueAndRun() {
Object message1 = new Object();
Object message2 = new Object();
Object message3 = new Object();

// 测试 enqueue 方法以及是否通过 executor 调度了 run 方法
batchExecutorQueue.enqueue(message1, mockExecutor);
batchExecutorQueue.enqueue(message2, mockExecutor);
batchExecutorQueue.enqueue(message3, mockExecutor);

verify(mockExecutor, atLeastOnce()).execute(runnableCaptor.capture());

Runnable scheduledRunnable = runnableCaptor.getValue();
Assert.assertNotNull(scheduledRunnable);
scheduledRunnable.run();

// 验证队列是否为空
Assert.assertTrue(batchExecutorQueue.getQueue().isEmpty());
}

@Test
public void testScheduleFlush() {
AtomicBoolean scheduled = batchExecutorQueue.getScheduled();
Assert.assertFalse(scheduled.get());

batchExecutorQueue.scheduleFlush(mockExecutor);
Assert.assertTrue(scheduled.get());

// 验证是否有任务提交到 executor
verify(mockExecutor).execute(any(Runnable.class));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.rpc.transport.netty;

import com.alipay.sofa.rpc.common.BatchExecutorQueue;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;

/**
* @author chengming
* @version NettyBatchWriteQueue.java, v 0.1 2024年02月28日 5:42 PM chengming
*/
public class NettyBatchWriteQueue extends BatchExecutorQueue<NettyBatchWriteQueue.MessageTuple> {

private final Channel channel;

private final EventLoop eventLoop;

private NettyBatchWriteQueue(Channel channel) {
this.channel = channel;
this.eventLoop = channel.eventLoop();
}

public ChannelFuture enqueue(Object message) {
return enqueue(message, channel.newPromise());
}

public ChannelFuture enqueue(Object message, ChannelPromise channelPromise) {
MessageTuple messageTuple = new MessageTuple(message, channelPromise);
super.enqueue(messageTuple, eventLoop);
return messageTuple.channelPromise;
}

@Override
protected void prepare(MessageTuple item) {
channel.write(item.originMessage, item.channelPromise);
}

@Override
protected void flush(MessageTuple item) {
prepare(item);
channel.flush();
}

public static NettyBatchWriteQueue createWriteQueue(Channel channel) {
return new NettyBatchWriteQueue(channel);
}

static class MessageTuple {

private final Object originMessage;

private final ChannelPromise channelPromise;

public MessageTuple(Object originMessage, ChannelPromise channelPromise) {
this.originMessage = originMessage;
this.channelPromise = channelPromise;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,11 @@ public class NettyChannel extends AbstractChannel<ChannelHandlerContext, Channel
*/
private Channel channel;

private NettyBatchWriteQueue writeQueue;

public NettyChannel(Channel channel) {
this.channel = channel;
this.writeQueue = NettyBatchWriteQueue.createWriteQueue(channel);
}

public NettyChannel(ChannelHandlerContext context) {
Expand Down Expand Up @@ -79,17 +82,14 @@ public InetSocketAddress localAddress() {

@Override
public void writeAndFlush(final Object obj) {
Future future = channel.writeAndFlush(obj);
future.addListener(new FutureListener() {
@Override
public void operationComplete(Future future1) throws Exception {
if (!future1.isSuccess()) {
Throwable throwable = future1.cause();
LOGGER.error("Failed to send to "
+ NetUtils.channelToString(localAddress(), remoteAddress())
+ " for msg : " + obj
+ ", Cause by:", throwable);
}
Future future = writeQueue.enqueue(obj);
future.addListener((FutureListener) future1 -> {
if (!future1.isSuccess()) {
Throwable throwable = future1.cause();
LOGGER.error("Failed to send to "
+ NetUtils.channelToString(localAddress(), remoteAddress())
+ " for msg : " + obj
+ ", Cause by:", throwable);
}
});
}
Expand All @@ -98,4 +98,14 @@ public void operationComplete(Future future1) throws Exception {
public boolean isAvailable() {
return channel.isOpen() && channel.isActive();
}

/**
* UT only
* @param writeQueue
*/
@Deprecated
public void setWriteQueue(NettyBatchWriteQueue writeQueue) {
this.writeQueue = writeQueue;
}

}
Loading

0 comments on commit 21acf28

Please sign in to comment.