Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Persistent subscription - Connection Closed #71

Open
jselamy opened this issue Apr 19, 2021 · 3 comments
Open

Persistent subscription - Connection Closed #71

jselamy opened this issue Apr 19, 2021 · 3 comments

Comments

@jselamy
Copy link

jselamy commented Apr 19, 2021

We are using this library in production.

We have an event handler that dispatches to downstream apps subscribed to a stream. We have a stream in particular that has a huge load of incoming event (IoT)

After a long period of time, for some reasons looks like we're dealing with an issue whereas it's not responding anymore any idea why ?
tmp_1618855140738

We are facing this issue for quitte a time now.

@jselamy
Copy link
Author

jselamy commented May 8, 2021

Any news ?

This is the error we get after a period of time on an active Persistent Subscription streaming events constantly

com.github.msemys.esjc.ConnectionClosedException: Connection was closed.
	at com.github.msemys.esjc.subscription.AbstractSubscriptionOperation.connectionClosed(AbstractSubscriptionOperation.java:219) ~[esjc-2.3.0.jar:2.3.0]
	at com.github.msemys.esjc.subscription.manager.SubscriptionManager.lambda$purgeSubscribedAndDropped$2(SubscriptionManager.java:55) ~[esjc-2.3.0.jar:2.3.0]
	at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) ~[na:na]
	at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177) ~[na:na]
	at java.base/java.util.concurrent.ConcurrentHashMap$ValueSpliterator.forEachRemaining(ConcurrentHashMap.java:3612) ~[na:na]
	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) ~[na:na]
	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) ~[na:na]
	at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) ~[na:na]
	at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) ~[na:na]
	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[na:na]
	at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497) ~[na:na]
	at com.github.msemys.esjc.subscription.manager.SubscriptionManager.purgeSubscribedAndDropped(SubscriptionManager.java:54) ~[esjc-2.3.0.jar:2.3.0]
	at com.github.msemys.esjc.EventStoreTcp.onTcpConnectionClosed(EventStoreTcp.java:818) ~[esjc-2.3.0.jar:2.3.0]
	at com.github.msemys.esjc.EventStoreTcp.lambda$null$12(EventStoreTcp.java:870) ~[esjc-2.3.0.jar:2.3.0]
	at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511) ~[netty-common-4.1.33.Final.jar:4.1.33.Final]
	at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:485) ~[netty-common-4.1.33.Final.jar:4.1.33.Final]
	at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:424) ~[netty-common-4.1.33.Final.jar:4.1.33.Final]
	at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:103) ~[netty-common-4.1.33.Final.jar:4.1.33.Final]
	at io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:84) ~[netty-transport-4.1.33.Final.jar:4.1.33.Final]
	at io.netty.channel.AbstractChannel$CloseFuture.setClosed(AbstractChannel.java:1152) ~[netty-transport-4.1.33.Final.jar:4.1.33.Final]
	at io.netty.channel.AbstractChannel$AbstractUnsafe.doClose0(AbstractChannel.java:768) ~[netty-transport-4.1.33.Final.jar:4.1.33.Final]
	at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:744) ~[netty-transport-4.1.33.Final.jar:4.1.33.Final]
	at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:615) ~[netty-transport-4.1.33.Final.jar:4.1.33.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.close(DefaultChannelPipeline.java:1350) ~[netty-transport-4.1.33.Final.jar:4.1.33.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeClose(AbstractChannelHandlerContext.java:624) ~[netty-transport-4.1.33.Final.jar:4.1.33.Final]
	at io.netty.channel.AbstractChannelHandlerContext.close(AbstractChannelHandlerContext.java:608) ~[netty-transport-4.1.33.Final.jar:4.1.33.Final]
	at io.netty.channel.ChannelOutboundHandlerAdapter.close(ChannelOutboundHandlerAdapter.java:71) ~[netty-transport-4.1.33.Final.jar:4.1.33.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeClose(AbstractChannelHandlerContext.java:624) ~[netty-transport-4.1.33.Final.jar:4.1.33.Final]
	at io.netty.channel.AbstractChannelHandlerContext.close(AbstractChannelHandlerContext.java:608) ~[netty-transport-4.1.33.Final.jar:4.1.33.Final]
	at io.netty.channel.ChannelOutboundHandlerAdapter.close(ChannelOutboundHandlerAdapter.java:71) ~[netty-transport-4.1.33.Final.jar:4.1.33.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeClose(AbstractChannelHandlerContext.java:624) ~[netty-transport-4.1.33.Final.jar:4.1.33.Final]
	at io.netty.channel.AbstractChannelHandlerContext.close(AbstractChannelHandlerContext.java:608) ~[netty-transport-4.1.33.Final.jar:4.1.33.Final]
	at io.netty.channel.ChannelDuplexHandler.close(ChannelDuplexHandler.java:73) ~[netty-transport-4.1.33.Final.jar:4.1.33.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeClose(AbstractChannelHandlerContext.java:624) ~[netty-transport-4.1.33.Final.jar:4.1.33.Final]
	at io.netty.channel.AbstractChannelHandlerContext.close(AbstractChannelHandlerContext.java:608) ~[netty-transport-4.1.33.Final.jar:4.1.33.Final]
	at io.netty.channel.AbstractChannelHandlerContext.close(AbstractChannelHandlerContext.java:465) ~[netty-transport-4.1.33.Final.jar:4.1.33.Final]
	at com.github.msemys.esjc.tcp.handler.HeartbeatHandler.lambda$userEventTriggered$0(HeartbeatHandler.java:61) ~[esjc-2.3.0.jar:2.3.0]
	at io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38) ~[netty-common-4.1.33.Final.jar:4.1.33.Final]
	at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:127) ~[netty-common-4.1.33.Final.jar:4.1.33.Final]
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute$$$capture(AbstractEventExecutor.java:163) ~[netty-common-4.1.33.Final.jar:4.1.33.Final]
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java) ~[netty-common-4.1.33.Final.jar:4.1.33.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404) ~[netty-common-4.1.33.Final.jar:4.1.33.Final]
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:495) ~[netty-transport-4.1.33.Final.jar:4.1.33.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905) ~[netty-common-4.1.33.Final.jar:4.1.33.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.33.Final.jar:4.1.33.Final]
	at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]

@msemys
Copy link
Owner

msemys commented May 12, 2021

hi, looks like tcp connection was dropped due es server was not responded to ping, that esjc client was sent. if es server is on heavy load it is possible that responses from server side lags.

try to increase esjc heartbeat interval/timeout e.g.:

EventStoreBuilder.newBuilder()
    .heartbeatInterval(Duration.ofSeconds(2))
    .heartbeatTimeout(Duration.ofSeconds(5))
    .build();

@jselamy
Copy link
Author

jselamy commented May 12, 2021

Ok thanks,

Since there's an automatic reconnection set to -1 the client reconnect to the ES.

Perhaps in this same app there's connections to Persistent Subscription and it's not re-subscribing to Persistent Subscriptions. We've added a little mechanism by putting a listener to EventStore instance

es.addListener(event -> {
            if(event instanceof ClientConnected) {
                if (log.isDebugEnabled()) {
                    log.debug("Received EventStore Event: {}", event.getClass().getSimpleName());
                }

                if (log.isInfoEnabled()) {
                    log.info("ClientConnected event received from TCP client sending a ApplicationEvent to notify");
                }

                publisher.publishEvent(new ESClientConnectedEvent(this));
            }
        })
@EventListener
    public void onApplicationEvent(ESClientConnectedEvent event) {
        if(event.isInitialized()) {
            final Map<String, PersistentSubscriber> subscribers = ctx.getBeansOfType(PersistentSubscriber.class);

            subscribers
                    .values()
                    .stream()
                    .peek(ps -> {
                        if(log.isInfoEnabled()) {
                            log.info("Performing a subscribe for stream {} with subscription_name {}", ps.getStream(), ps.getGroupName());
                        }
                    })
                    .forEach(PersistentSubscriber::subscribe);
        }
    }

Would there be a better way to do such a thing ?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants