From e6163552c75bb34aacfffde062972a1e021c8e6b Mon Sep 17 00:00:00 2001 From: billowqiu Date: Fri, 21 Jan 2022 17:52:25 +0800 Subject: [PATCH 1/8] Fix closed connection leak --- pulsar/internal/connection_pool.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pulsar/internal/connection_pool.go b/pulsar/internal/connection_pool.go index cadeed1fa7..4787ba158d 100644 --- a/pulsar/internal/connection_pool.go +++ b/pulsar/internal/connection_pool.go @@ -81,6 +81,7 @@ func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.U // remove stale/failed connection if conn.closed() { delete(p.connections, key) + conn.Close() p.log.Infof("Removed connection from pool key=%s logical_addr=%+v physical_addr=%+v", key, conn.logicalAddr, conn.physicalAddr) conn = nil // set to nil so we create a new one From b949459de7c8155609b245cae9c16c82990a0bec Mon Sep 17 00:00:00 2001 From: billowqiu Date: Fri, 21 Jan 2022 17:52:25 +0800 Subject: [PATCH 2/8] Fix closed connection leak --- pulsar/internal/connection_pool.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pulsar/internal/connection_pool.go b/pulsar/internal/connection_pool.go index cadeed1fa7..4787ba158d 100644 --- a/pulsar/internal/connection_pool.go +++ b/pulsar/internal/connection_pool.go @@ -81,6 +81,7 @@ func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.U // remove stale/failed connection if conn.closed() { delete(p.connections, key) + conn.Close() p.log.Infof("Removed connection from pool key=%s logical_addr=%+v physical_addr=%+v", key, conn.logicalAddr, conn.physicalAddr) conn = nil // set to nil so we create a new one From 566623b235d5eb8bd42d4cacc9d8c76ded4b9317 Mon Sep 17 00:00:00 2001 From: billowqiu Date: Sat, 5 Feb 2022 17:17:00 +0800 Subject: [PATCH 3/8] bugfix: runEventsLoop for reconnect early exit --- pulsar/producer_partition.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 3f1e54b6a5..b5c4e7e9fa 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -352,12 +352,14 @@ func (p *partitionProducer) reconnectToBroker() { func (p *partitionProducer) runEventsLoop() { go func() { - select { - case <-p.closeCh: - return - case <-p.connectClosedCh: - p.log.Info("runEventsLoop will reconnect in producer") - p.reconnectToBroker() + for { + select { + case <-p.closeCh: + return + case <-p.connectClosedCh: + p.log.Info("runEventsLoop will reconnect in producer") + p.reconnectToBroker() + } } }() From 6a1b11b9e935181fc6c808e13061f710d8a06e22 Mon Sep 17 00:00:00 2001 From: billowqiu Date: Sat, 5 Feb 2022 17:26:04 +0800 Subject: [PATCH 4/8] [optimize] add log when reconnect --- pulsar/consumer_partition.go | 3 +++ pulsar/internal/connection_pool.go | 6 +++--- pulsar/producer_partition.go | 3 +++ 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 9bd4a94556..04a39c5b1b 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -894,6 +894,7 @@ func (pc *partitionConsumer) runEventsLoop() { for { select { case <-pc.closeCh: + pc.log.Info("close consumer, exit reconnect") return case <-pc.connectClosedCh: pc.log.Debug("runEventsLoop will reconnect") @@ -992,6 +993,7 @@ func (pc *partitionConsumer) reconnectToBroker() { for maxRetry != 0 { if pc.getConsumerState() != consumerReady { // Consumer is already closing + pc.log.Info("consumer state not ready, exit reconnect") return } @@ -1005,6 +1007,7 @@ func (pc *partitionConsumer) reconnectToBroker() { pc.log.Info("Reconnected consumer to broker") return } + pc.log.WithError(err).Error("Failed to create consumer at reconnect") errMsg := err.Error() if strings.Contains(errMsg, errTopicNotFount) { // when topic is deleted, we should give up reconnection. diff --git a/pulsar/internal/connection_pool.go b/pulsar/internal/connection_pool.go index 4787ba158d..ceaa7deec4 100644 --- a/pulsar/internal/connection_pool.go +++ b/pulsar/internal/connection_pool.go @@ -75,15 +75,15 @@ func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.U p.Lock() conn, ok := p.connections[key] if ok { - p.log.Infof("Found connection in pool key=%s logical_addr=%+v physical_addr=%+v", + p.log.Debugf("Found connection in pool key=%s logical_addr=%+v physical_addr=%+v", key, conn.logicalAddr, conn.physicalAddr) // remove stale/failed connection if conn.closed() { delete(p.connections, key) conn.Close() - p.log.Infof("Removed connection from pool key=%s logical_addr=%+v physical_addr=%+v", - key, conn.logicalAddr, conn.physicalAddr) + p.log.Infof("Removed connection from pool key=%s logical_addr=%+v physical_addr=%+v, conn ID=%s", + key, conn.logicalAddr, conn.physicalAddr, conn.ID()) conn = nil // set to nil so we create a new one } } diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index b5c4e7e9fa..84703991b7 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -324,6 +324,7 @@ func (p *partitionProducer) reconnectToBroker() { for maxRetry != 0 { if p.getProducerState() != producerReady { // Producer is already closing + p.log.Info("producer state not ready, exit reconnect") return } @@ -337,6 +338,7 @@ func (p *partitionProducer) reconnectToBroker() { p.log.WithField("cnx", p._getConn().ID()).Info("Reconnected producer to broker") return } + p.log.WithError(err).Error("Failed to create producer at reconnect") errMsg := err.Error() if strings.Contains(errMsg, errTopicNotFount) { // when topic is deleted, we should give up reconnection. @@ -355,6 +357,7 @@ func (p *partitionProducer) runEventsLoop() { for { select { case <-p.closeCh: + p.log.Info("close producer, exit reconnect") return case <-p.connectClosedCh: p.log.Info("runEventsLoop will reconnect in producer") From bb283a57a1b40ca82e21660b77a59febd354723b Mon Sep 17 00:00:00 2001 From: billowqiu Date: Sat, 5 Feb 2022 17:45:49 +0800 Subject: [PATCH 5/8] [Bugfix]fix panic --- pulsar/internal/connection_pool.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar/internal/connection_pool.go b/pulsar/internal/connection_pool.go index ceaa7deec4..e717a228dd 100644 --- a/pulsar/internal/connection_pool.go +++ b/pulsar/internal/connection_pool.go @@ -80,10 +80,10 @@ func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.U // remove stale/failed connection if conn.closed() { - delete(p.connections, key) - conn.Close() p.log.Infof("Removed connection from pool key=%s logical_addr=%+v physical_addr=%+v, conn ID=%s", key, conn.logicalAddr, conn.physicalAddr, conn.ID()) + delete(p.connections, key) + conn.Close() conn = nil // set to nil so we create a new one } } From 15d2b07ffa43d5db5139be07d3c7ce114129e143 Mon Sep 17 00:00:00 2001 From: billowqiu Date: Sat, 5 Feb 2022 17:57:04 +0800 Subject: [PATCH 6/8] [Bugfix]remove log conn ID --- pulsar/internal/connection_pool.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar/internal/connection_pool.go b/pulsar/internal/connection_pool.go index e717a228dd..db67c25cd6 100644 --- a/pulsar/internal/connection_pool.go +++ b/pulsar/internal/connection_pool.go @@ -80,8 +80,8 @@ func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.U // remove stale/failed connection if conn.closed() { - p.log.Infof("Removed connection from pool key=%s logical_addr=%+v physical_addr=%+v, conn ID=%s", - key, conn.logicalAddr, conn.physicalAddr, conn.ID()) + p.log.Infof("Removed connection from pool key=%s logical_addr=%+v physical_addr=%+v", + key, conn.logicalAddr, conn.physicalAddr) delete(p.connections, key) conn.Close() conn = nil // set to nil so we create a new one From cca8635e7d6b05af4c3bb5d8c4c9677a28c5d464 Mon Sep 17 00:00:00 2001 From: billowqiu Date: Mon, 7 Feb 2022 19:34:41 +0800 Subject: [PATCH 7/8] [optimize]Distinguish failed create producer log --- pulsar/producer_partition.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 84703991b7..913c33c68a 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -142,7 +142,7 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions } err := p.grabCnx() if err != nil { - logger.WithError(err).Error("Failed to create producer") + logger.WithError(err).Error("Failed to create producer at newPartitionProducer") return nil, err } @@ -209,7 +209,7 @@ func (p *partitionProducer) grabCnx() error { } res, err := p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, id, pb.BaseCommand_PRODUCER, cmdProducer) if err != nil { - p.log.WithError(err).Error("Failed to create producer") + p.log.WithError(err).Error("Failed to create producer at send PRODUCER request") return err } From 3862295c708019f7d871ebc8e5fc9a6fdca82205 Mon Sep 17 00:00:00 2001 From: billowqiu Date: Thu, 22 Sep 2022 00:23:59 +0800 Subject: [PATCH 8/8] fix ack/nack use closed consumer --- pulsar/consumer_impl.go | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 2328ca882b..7c57416b5d 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -459,7 +459,9 @@ func (c *consumer) AckID(msgID MessageID) error { } if mid.consumer != nil { - return mid.Ack() + if pc, ok := (mid.consumer).(*partitionConsumer); ok && pc.getConsumerState() == consumerReady { + return mid.Ack() + } } return c.consumers[mid.partitionIdx].AckID(mid) @@ -522,8 +524,10 @@ func (c *consumer) Nack(msg Message) { } if mid.consumer != nil { - mid.Nack() - return + if pc, ok := (mid.consumer).(*partitionConsumer); ok && pc.getConsumerState() == consumerReady { + mid.Nack() + return + } } c.consumers[mid.partitionIdx].NackMsg(msg) return @@ -539,8 +543,10 @@ func (c *consumer) NackID(msgID MessageID) { } if mid.consumer != nil { - mid.Nack() - return + if pc, ok := (mid.consumer).(*partitionConsumer); ok && pc.getConsumerState() == consumerReady { + mid.Nack() + return + } } c.consumers[mid.partitionIdx].NackID(mid)