From 8d76432595e2325b65daa3686f6a88de59147098 Mon Sep 17 00:00:00 2001 From: tomsun28 Date: Sat, 23 Sep 2023 18:37:22 +0800 Subject: [PATCH 1/3] bugfix collector can not auto reconnect when channel idle --- .../dispatch/entrance/CollectServer.java | 16 ++++++---- .../manager/scheduler/netty/ManageServer.java | 29 ++++++++++++++----- .../process/CollectorOnlineProcessor.java | 1 + .../netty/process/HeartbeatProcessor.java | 5 ++-- .../remoting/netty/NettyRemotingServer.java | 2 +- 5 files changed, 35 insertions(+), 18 deletions(-) diff --git a/collector/src/main/java/org/dromara/hertzbeat/collector/dispatch/entrance/CollectServer.java b/collector/src/main/java/org/dromara/hertzbeat/collector/dispatch/entrance/CollectServer.java index 9f6876d0bfd..aec4576f046 100644 --- a/collector/src/main/java/org/dromara/hertzbeat/collector/dispatch/entrance/CollectServer.java +++ b/collector/src/main/java/org/dromara/hertzbeat/collector/dispatch/entrance/CollectServer.java @@ -132,12 +132,16 @@ public void onChannelActive(Channel channel) { scheduledExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory); // schedule send heartbeat message scheduledExecutor.scheduleAtFixedRate(() -> { - ClusterMsg.Message heartbeat = ClusterMsg.Message.newBuilder() - .setIdentity(identity) - .setType(ClusterMsg.MessageType.HEARTBEAT) - .build(); - CollectServer.this.sendMsg(heartbeat); - log.info("collector send cluster server heartbeat, time: {}.", System.currentTimeMillis()); + try { + ClusterMsg.Message heartbeat = ClusterMsg.Message.newBuilder() + .setIdentity(identity) + .setType(ClusterMsg.MessageType.HEARTBEAT) + .build(); + CollectServer.this.sendMsg(heartbeat); + log.info("collector send cluster server heartbeat, time: {}.", System.currentTimeMillis()); + } catch (Exception e) { + log.error(e.getMessage()); + } }, 5, 5, TimeUnit.SECONDS); } } diff --git a/manager/src/main/java/org/dromara/hertzbeat/manager/scheduler/netty/ManageServer.java b/manager/src/main/java/org/dromara/hertzbeat/manager/scheduler/netty/ManageServer.java index d52fe527396..6451d9b4963 100644 --- a/manager/src/main/java/org/dromara/hertzbeat/manager/scheduler/netty/ManageServer.java +++ b/manager/src/main/java/org/dromara/hertzbeat/manager/scheduler/netty/ManageServer.java @@ -66,7 +66,7 @@ private void init(final SchedulerProperties schedulerProperties, final CommonThr this.remotingServer.registerHook(Lists.newArrayList(new NettyHook() { @Override public void doBeforeRequest(ChannelHandlerContext ctx, ClusterMsg.Message message) { - ManageServer.this.clientChannelTable.put(message.getIdentity(), ctx.channel()); + // do something before processor list } })); @@ -84,13 +84,17 @@ public void start() { this.remotingServer.start(); this.channelSchedule.scheduleAtFixedRate(() -> { - ManageServer.this.clientChannelTable.forEach((collector, channel) -> { - if (!channel.isActive()) { - channel.closeFuture(); - ManageServer.this.clientChannelTable.remove(collector); - ManageServer.this.collectorAndJobScheduler.collectorGoOffline(collector); - } - }); + try { + this.clientChannelTable.forEach((collector, channel) -> { + if (!channel.isActive()) { + channel.closeFuture(); + this.clientChannelTable.remove(collector); + this.collectorAndJobScheduler.collectorGoOffline(collector); + } + }); + } catch (Exception e) { + log.error(e.getMessage(), e); + } }, 10, 3, TimeUnit.SECONDS); } @@ -113,6 +117,14 @@ public Channel getChannel(final String identity) { return channel; } + public void addChannel(final String identity, Channel channel) { + Channel preChannel = this.clientChannelTable.get(identity); + if (preChannel != null && channel.isActive()) { + preChannel.close(); + } + this.clientChannelTable.put(identity, channel); + } + public void closeChannel(final String identity) { Channel channel = this.getChannel(identity); if (channel != null) { @@ -165,6 +177,7 @@ public void onChannelIdle(Channel channel) { if (identity != null) { ManageServer.this.clientChannelTable.remove(identity); ManageServer.this.collectorAndJobScheduler.collectorGoOffline(identity); + channel.close(); log.info("handle idle event triggered. the client {} is going offline.", identity); } } diff --git a/manager/src/main/java/org/dromara/hertzbeat/manager/scheduler/netty/process/CollectorOnlineProcessor.java b/manager/src/main/java/org/dromara/hertzbeat/manager/scheduler/netty/process/CollectorOnlineProcessor.java index 7e2292654c9..cf58b3edb85 100644 --- a/manager/src/main/java/org/dromara/hertzbeat/manager/scheduler/netty/process/CollectorOnlineProcessor.java +++ b/manager/src/main/java/org/dromara/hertzbeat/manager/scheduler/netty/process/CollectorOnlineProcessor.java @@ -24,6 +24,7 @@ public ClusterMsg.Message handle(ChannelHandlerContext ctx, ClusterMsg.Message m String collector = message.getIdentity(); log.info("the collector {} actively requests to go online.", collector); CollectorInfo collectorInfo = JsonUtil.fromJson(message.getMsg(), CollectorInfo.class); + this.manageServer.addChannel(collector, ctx.channel()); this.manageServer.getCollectorAndJobScheduler().collectorGoOnline(collector, collectorInfo); return null; } diff --git a/manager/src/main/java/org/dromara/hertzbeat/manager/scheduler/netty/process/HeartbeatProcessor.java b/manager/src/main/java/org/dromara/hertzbeat/manager/scheduler/netty/process/HeartbeatProcessor.java index 67e3f1fb0ab..9c0a30116c6 100644 --- a/manager/src/main/java/org/dromara/hertzbeat/manager/scheduler/netty/process/HeartbeatProcessor.java +++ b/manager/src/main/java/org/dromara/hertzbeat/manager/scheduler/netty/process/HeartbeatProcessor.java @@ -23,11 +23,10 @@ public ClusterMsg.Message handle(ChannelHandlerContext ctx, ClusterMsg.Message m String identity = message.getIdentity(); boolean isChannelExist = this.manageServer.isChannelExist(identity); if (!isChannelExist) { - log.info("the collector {} has reconnected and to go online.", identity); - this.manageServer.getCollectorAndJobScheduler().collectorGoOnline(identity, null); + log.info("the collector {} is not online.", identity); } if (log.isDebugEnabled()) { - log.debug("server receive collector heartbeat"); + log.debug("server receive collector {} heartbeat", message.getIdentity()); } return ClusterMsg.Message.newBuilder() .setType(ClusterMsg.MessageType.HEARTBEAT) diff --git a/remoting/src/main/java/org/dromara/hertzbeat/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/dromara/hertzbeat/remoting/netty/NettyRemotingServer.java index 2f2948de834..ead6b11e2fe 100644 --- a/remoting/src/main/java/org/dromara/hertzbeat/remoting/netty/NettyRemotingServer.java +++ b/remoting/src/main/java/org/dromara/hertzbeat/remoting/netty/NettyRemotingServer.java @@ -138,7 +138,7 @@ private void initChannel(final SocketChannel channel) { pipeline.addLast(new ProtobufVarint32LengthFieldPrepender()); pipeline.addLast(new ProtobufEncoder()); // idle state - pipeline.addLast(new IdleStateHandler(0, 0, 30)); + pipeline.addLast(new IdleStateHandler(0, 0, 100)); pipeline.addLast(new NettyServerHandler()); } From 0206bcebc88069a8b5136b7a8086bf8e89969de2 Mon Sep 17 00:00:00 2001 From: tomsun28 Date: Sun, 24 Sep 2023 00:32:58 +0800 Subject: [PATCH 2/3] add log and remove hook --- .../collector/dispatch/entrance/CollectServer.java | 3 ++- .../manager/scheduler/netty/ManageServer.java | 13 +------------ 2 files changed, 3 insertions(+), 13 deletions(-) diff --git a/collector/src/main/java/org/dromara/hertzbeat/collector/dispatch/entrance/CollectServer.java b/collector/src/main/java/org/dromara/hertzbeat/collector/dispatch/entrance/CollectServer.java index aec4576f046..bb88fac962c 100644 --- a/collector/src/main/java/org/dromara/hertzbeat/collector/dispatch/entrance/CollectServer.java +++ b/collector/src/main/java/org/dromara/hertzbeat/collector/dispatch/entrance/CollectServer.java @@ -135,12 +135,13 @@ public void onChannelActive(Channel channel) { try { ClusterMsg.Message heartbeat = ClusterMsg.Message.newBuilder() .setIdentity(identity) + .setDirection(ClusterMsg.Direction.REQUEST) .setType(ClusterMsg.MessageType.HEARTBEAT) .build(); CollectServer.this.sendMsg(heartbeat); log.info("collector send cluster server heartbeat, time: {}.", System.currentTimeMillis()); } catch (Exception e) { - log.error(e.getMessage()); + log.error("schedule send heartbeat to server error.{}", e.getMessage()); } }, 5, 5, TimeUnit.SECONDS); } diff --git a/manager/src/main/java/org/dromara/hertzbeat/manager/scheduler/netty/ManageServer.java b/manager/src/main/java/org/dromara/hertzbeat/manager/scheduler/netty/ManageServer.java index 6451d9b4963..0652551dfd8 100644 --- a/manager/src/main/java/org/dromara/hertzbeat/manager/scheduler/netty/ManageServer.java +++ b/manager/src/main/java/org/dromara/hertzbeat/manager/scheduler/netty/ManageServer.java @@ -1,8 +1,6 @@ package org.dromara.hertzbeat.manager.scheduler.netty; -import com.google.common.collect.Lists; import io.netty.channel.Channel; -import io.netty.channel.ChannelHandlerContext; import lombok.extern.slf4j.Slf4j; import org.dromara.hertzbeat.common.entity.message.ClusterMsg; import org.dromara.hertzbeat.common.support.CommonThreadPool; @@ -15,7 +13,6 @@ import org.dromara.hertzbeat.manager.scheduler.SchedulerProperties; import org.dromara.hertzbeat.remoting.RemotingServer; import org.dromara.hertzbeat.remoting.event.NettyEventListener; -import org.dromara.hertzbeat.remoting.netty.NettyHook; import org.dromara.hertzbeat.remoting.netty.NettyRemotingServer; import org.dromara.hertzbeat.remoting.netty.NettyServerConfig; import org.springframework.boot.CommandLineRunner; @@ -61,15 +58,7 @@ private void init(final SchedulerProperties schedulerProperties, final CommonThr nettyServerConfig.setPort(schedulerProperties.getServer().getPort()); NettyEventListener nettyEventListener = new ManageNettyEventListener(); this.remotingServer = new NettyRemotingServer(nettyServerConfig, nettyEventListener, threadPool); - - // register hook - this.remotingServer.registerHook(Lists.newArrayList(new NettyHook() { - @Override - public void doBeforeRequest(ChannelHandlerContext ctx, ClusterMsg.Message message) { - // do something before processor list - } - })); - + // register processor this.remotingServer.registerProcessor(ClusterMsg.MessageType.HEARTBEAT, new HeartbeatProcessor(this)); this.remotingServer.registerProcessor(ClusterMsg.MessageType.GO_ONLINE, new CollectorOnlineProcessor(this)); From 9e4ee842f3aa429df794154033aea33e44d62019 Mon Sep 17 00:00:00 2001 From: tomsun28 Date: Sun, 24 Sep 2023 00:47:05 +0800 Subject: [PATCH 3/3] support config IdleStateEventTriggerTime --- .../manager/scheduler/SchedulerProperties.java | 15 +++++++++++++++ .../manager/scheduler/netty/ManageServer.java | 1 + .../remoting/netty/NettyRemotingServer.java | 2 +- .../remoting/netty/NettyServerConfig.java | 2 ++ 4 files changed, 19 insertions(+), 1 deletion(-) diff --git a/manager/src/main/java/org/dromara/hertzbeat/manager/scheduler/SchedulerProperties.java b/manager/src/main/java/org/dromara/hertzbeat/manager/scheduler/SchedulerProperties.java index 454155e845e..a094b3081df 100644 --- a/manager/src/main/java/org/dromara/hertzbeat/manager/scheduler/SchedulerProperties.java +++ b/manager/src/main/java/org/dromara/hertzbeat/manager/scheduler/SchedulerProperties.java @@ -26,6 +26,13 @@ public static class ServerProperties { private boolean enabled = true; private int port = 1158; + + /** + * an IdleStateEvent whose state is IdleState.ALL_IDLE will be triggered when neither read nor write + * was performed for the specified period of time. + * unit: s + */ + private int idleStateEventTriggerTime = 100; public boolean isEnabled() { return enabled; @@ -42,6 +49,14 @@ public int getPort() { public void setPort(int port) { this.port = port; } + + public int getIdleStateEventTriggerTime() { + return idleStateEventTriggerTime; + } + + public void setIdleStateEventTriggerTime(int idleStateEventTriggerTime) { + this.idleStateEventTriggerTime = idleStateEventTriggerTime; + } } } diff --git a/manager/src/main/java/org/dromara/hertzbeat/manager/scheduler/netty/ManageServer.java b/manager/src/main/java/org/dromara/hertzbeat/manager/scheduler/netty/ManageServer.java index 0652551dfd8..3111ed7ba9e 100644 --- a/manager/src/main/java/org/dromara/hertzbeat/manager/scheduler/netty/ManageServer.java +++ b/manager/src/main/java/org/dromara/hertzbeat/manager/scheduler/netty/ManageServer.java @@ -56,6 +56,7 @@ public ManageServer(final SchedulerProperties schedulerProperties, private void init(final SchedulerProperties schedulerProperties, final CommonThreadPool threadPool) { NettyServerConfig nettyServerConfig = new NettyServerConfig(); nettyServerConfig.setPort(schedulerProperties.getServer().getPort()); + nettyServerConfig.setIdleStateEventTriggerTime(schedulerProperties.getServer().getIdleStateEventTriggerTime()); NettyEventListener nettyEventListener = new ManageNettyEventListener(); this.remotingServer = new NettyRemotingServer(nettyServerConfig, nettyEventListener, threadPool); diff --git a/remoting/src/main/java/org/dromara/hertzbeat/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/dromara/hertzbeat/remoting/netty/NettyRemotingServer.java index ead6b11e2fe..a2723ac87d4 100644 --- a/remoting/src/main/java/org/dromara/hertzbeat/remoting/netty/NettyRemotingServer.java +++ b/remoting/src/main/java/org/dromara/hertzbeat/remoting/netty/NettyRemotingServer.java @@ -138,7 +138,7 @@ private void initChannel(final SocketChannel channel) { pipeline.addLast(new ProtobufVarint32LengthFieldPrepender()); pipeline.addLast(new ProtobufEncoder()); // idle state - pipeline.addLast(new IdleStateHandler(0, 0, 100)); + pipeline.addLast(new IdleStateHandler(0, 0, nettyServerConfig.getIdleStateEventTriggerTime())); pipeline.addLast(new NettyServerHandler()); } diff --git a/remoting/src/main/java/org/dromara/hertzbeat/remoting/netty/NettyServerConfig.java b/remoting/src/main/java/org/dromara/hertzbeat/remoting/netty/NettyServerConfig.java index 51c3160297a..369117b372e 100644 --- a/remoting/src/main/java/org/dromara/hertzbeat/remoting/netty/NettyServerConfig.java +++ b/remoting/src/main/java/org/dromara/hertzbeat/remoting/netty/NettyServerConfig.java @@ -27,4 +27,6 @@ public class NettyServerConfig { private Integer port; + private Integer idleStateEventTriggerTime = 100; + }