From 6004ed9892f891684f58e855e0c11008bc221a06 Mon Sep 17 00:00:00 2001 From: Lawrence Lee Date: Wed, 11 Dec 2024 22:50:37 +0000 Subject: [PATCH 1/7] Recreate notification consumer queue when empty Signed-off-by: Lawrence Lee --- common/notificationconsumer.cpp | 34 +++++++++++++++++++++++---------- common/notificationconsumer.h | 2 +- 2 files changed, 25 insertions(+), 11 deletions(-) diff --git a/common/notificationconsumer.cpp b/common/notificationconsumer.cpp index 4edfbfee4..501d7a54c 100644 --- a/common/notificationconsumer.cpp +++ b/common/notificationconsumer.cpp @@ -17,6 +17,7 @@ swss::NotificationConsumer::NotificationConsumer(swss::DBConnector *db, const st { SWSS_LOG_ENTER(); + m_queue = std::make_shared>(); while (true) { try @@ -105,12 +106,12 @@ uint64_t swss::NotificationConsumer::readData() bool swss::NotificationConsumer::hasData() { - return m_queue.size() > 0; + return m_queue->size() > 0; } bool swss::NotificationConsumer::hasCachedData() { - return m_queue.size() > 1; + return m_queue->size() > 1; } void swss::NotificationConsumer::processReply(redisReply *reply) @@ -138,21 +139,34 @@ void swss::NotificationConsumer::processReply(redisReply *reply) SWSS_LOG_DEBUG("got message: %s", msg.c_str()); - m_queue.push(msg); + m_queue->push(msg); } void swss::NotificationConsumer::pop(std::string &op, std::string &data, std::vector &values) { SWSS_LOG_ENTER(); - if (m_queue.empty()) + if (m_queue->empty()) { SWSS_LOG_ERROR("notification queue is empty, can't pop"); throw std::runtime_error("notification queue is empty, can't pop"); } - std::string msg = m_queue.front(); - m_queue.pop(); + std::string msg = m_queue->front(); + m_queue->pop(); + + if (m_queue->empty()) + { + /*** + * If there is a burst of notifications that causes the queue to grow in size, + * memory allocated by the queue will not be released even after the all items + * have been popped. + * + * Force the memory to be released by destroying existing queue and creating a new one. + */ + m_queue = nullptr; + m_queue = std::make_shared>(); + } values.clear(); JSon::readJson(msg, values); @@ -170,9 +184,9 @@ void swss::NotificationConsumer::pops(std::deque &vkco) SWSS_LOG_ENTER(); vkco.clear(); - while(!m_queue.empty()) + while(!m_queue->empty()) { - while(!m_queue.empty()) + while(!m_queue->empty()) { std::string op; std::string data; @@ -198,7 +212,7 @@ void swss::NotificationConsumer::pops(std::deque &vkco) int swss::NotificationConsumer::peek() { SWSS_LOG_ENTER(); - if (m_queue.empty()) + if (m_queue->empty()) { // Peek for more data in redis socket int rc = swss::peekRedisContext(m_subscribe->getContext()); @@ -209,5 +223,5 @@ int swss::NotificationConsumer::peek() readData(); } - return m_queue.empty() ? 0 : 1; + return m_queue->empty() ? 0 : 1; } diff --git a/common/notificationconsumer.h b/common/notificationconsumer.h index f5011ffc3..5300ef9be 100644 --- a/common/notificationconsumer.h +++ b/common/notificationconsumer.h @@ -55,7 +55,7 @@ class NotificationConsumer : public Selectable swss::DBConnector *m_db; swss::DBConnector *m_subscribe; std::string m_channel; - std::queue m_queue; + std::shared_ptr> m_queue; }; } From 8091aca41fd4f445c8921a1d3460874cb132d690 Mon Sep 17 00:00:00 2001 From: Lawrence Lee Date: Fri, 10 Jan 2025 20:19:21 +0000 Subject: [PATCH 2/7] add logging Signed-off-by: Lawrence Lee --- common/notificationconsumer.cpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/common/notificationconsumer.cpp b/common/notificationconsumer.cpp index 501d7a54c..5a31aa193 100644 --- a/common/notificationconsumer.cpp +++ b/common/notificationconsumer.cpp @@ -17,6 +17,7 @@ swss::NotificationConsumer::NotificationConsumer(swss::DBConnector *db, const st { SWSS_LOG_ENTER(); + SWSS_LOG_NOTICE("Creating notification consumer"); m_queue = std::make_shared>(); while (true) { @@ -140,6 +141,7 @@ void swss::NotificationConsumer::processReply(redisReply *reply) SWSS_LOG_DEBUG("got message: %s", msg.c_str()); m_queue->push(msg); + SWSS_LOG_INFO("got message: %s, queue size is %zu", msg.c_str(), m_queue->size()); } void swss::NotificationConsumer::pop(std::string &op, std::string &data, std::vector &values) @@ -164,6 +166,7 @@ void swss::NotificationConsumer::pop(std::string &op, std::string &data, std::ve * * Force the memory to be released by destroying existing queue and creating a new one. */ + SWSS_LOG_INFO("Queue is empty, recreating"); m_queue = nullptr; m_queue = std::make_shared>(); } From 729426da1d5063c89cb960905fa4dcf4f242f844 Mon Sep 17 00:00:00 2001 From: Lawrence Lee Date: Fri, 10 Jan 2025 21:50:59 +0000 Subject: [PATCH 3/7] explicitly delete old queue Signed-off-by: Lawrence Lee --- common/notificationconsumer.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/common/notificationconsumer.cpp b/common/notificationconsumer.cpp index 5a31aa193..a7ee94a7d 100644 --- a/common/notificationconsumer.cpp +++ b/common/notificationconsumer.cpp @@ -167,6 +167,7 @@ void swss::NotificationConsumer::pop(std::string &op, std::string &data, std::ve * Force the memory to be released by destroying existing queue and creating a new one. */ SWSS_LOG_INFO("Queue is empty, recreating"); + m_queue.reset(); m_queue = nullptr; m_queue = std::make_shared>(); } From b0a1021f541759d84db341e865267a7515eb9d5c Mon Sep 17 00:00:00 2001 From: Lawrence Lee Date: Sat, 11 Jan 2025 02:51:25 +0000 Subject: [PATCH 4/7] improve logging Signed-off-by: Lawrence Lee --- common/notificationconsumer.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/common/notificationconsumer.cpp b/common/notificationconsumer.cpp index a7ee94a7d..7b117d691 100644 --- a/common/notificationconsumer.cpp +++ b/common/notificationconsumer.cpp @@ -17,7 +17,7 @@ swss::NotificationConsumer::NotificationConsumer(swss::DBConnector *db, const st { SWSS_LOG_ENTER(); - SWSS_LOG_NOTICE("Creating notification consumer"); + SWSS_LOG_NOTICE("Creating notification consumer for %s", m_channel.c_str()); m_queue = std::make_shared>(); while (true) { @@ -141,7 +141,7 @@ void swss::NotificationConsumer::processReply(redisReply *reply) SWSS_LOG_DEBUG("got message: %s", msg.c_str()); m_queue->push(msg); - SWSS_LOG_INFO("got message: %s, queue size is %zu", msg.c_str(), m_queue->size()); + SWSS_LOG_INFO("%s queue size is %zu", m_channel.c_str(), m_queue->size()); } void swss::NotificationConsumer::pop(std::string &op, std::string &data, std::vector &values) @@ -166,7 +166,7 @@ void swss::NotificationConsumer::pop(std::string &op, std::string &data, std::ve * * Force the memory to be released by destroying existing queue and creating a new one. */ - SWSS_LOG_INFO("Queue is empty, recreating"); + SWSS_LOG_INFO("%s queue is empty, recreating", m_channel.c_str()); m_queue.reset(); m_queue = nullptr; m_queue = std::make_shared>(); From 54519551bd4da7c54d15f38a6fdc7c4dbadb8759 Mon Sep 17 00:00:00 2001 From: Lawrence Lee Date: Mon, 13 Jan 2025 20:29:05 +0000 Subject: [PATCH 5/7] call malloc trim Signed-off-by: Lawrence Lee --- common/notificationconsumer.cpp | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/common/notificationconsumer.cpp b/common/notificationconsumer.cpp index 7b117d691..c88f201c3 100644 --- a/common/notificationconsumer.cpp +++ b/common/notificationconsumer.cpp @@ -2,6 +2,7 @@ #include #include +#include #include "redisapi.h" #define NOTIFICATION_SUBSCRIBE_TIMEOUT (1000) @@ -167,6 +168,15 @@ void swss::NotificationConsumer::pop(std::string &op, std::string &data, std::ve * Force the memory to be released by destroying existing queue and creating a new one. */ SWSS_LOG_INFO("%s queue is empty, recreating", m_channel.c_str()); + int rv = malloc_trim(0); + if (rv == 1) + { + SWSS_LOG_INFO("Memory released successfully"); + } + else + { + SWSS_LOG_INFO("No memory released by malloc_trim"); + } m_queue.reset(); m_queue = nullptr; m_queue = std::make_shared>(); From a060b1c925898013d1aef1950d1c5633b64350fe Mon Sep 17 00:00:00 2001 From: Lawrence Lee Date: Mon, 13 Jan 2025 22:05:01 +0000 Subject: [PATCH 6/7] revert queue changes Signed-off-by: Lawrence Lee --- common/notificationconsumer.cpp | 34 ++++++++++++++------------------- common/notificationconsumer.h | 2 +- 2 files changed, 15 insertions(+), 21 deletions(-) diff --git a/common/notificationconsumer.cpp b/common/notificationconsumer.cpp index c88f201c3..7b324aba7 100644 --- a/common/notificationconsumer.cpp +++ b/common/notificationconsumer.cpp @@ -18,8 +18,6 @@ swss::NotificationConsumer::NotificationConsumer(swss::DBConnector *db, const st { SWSS_LOG_ENTER(); - SWSS_LOG_NOTICE("Creating notification consumer for %s", m_channel.c_str()); - m_queue = std::make_shared>(); while (true) { try @@ -108,12 +106,12 @@ uint64_t swss::NotificationConsumer::readData() bool swss::NotificationConsumer::hasData() { - return m_queue->size() > 0; + return m_queue.size() > 0; } bool swss::NotificationConsumer::hasCachedData() { - return m_queue->size() > 1; + return m_queue.size() > 1; } void swss::NotificationConsumer::processReply(redisReply *reply) @@ -141,24 +139,23 @@ void swss::NotificationConsumer::processReply(redisReply *reply) SWSS_LOG_DEBUG("got message: %s", msg.c_str()); - m_queue->push(msg); - SWSS_LOG_INFO("%s queue size is %zu", m_channel.c_str(), m_queue->size()); + m_queue.push(msg); } void swss::NotificationConsumer::pop(std::string &op, std::string &data, std::vector &values) { SWSS_LOG_ENTER(); - if (m_queue->empty()) + if (m_queue.empty()) { SWSS_LOG_ERROR("notification queue is empty, can't pop"); throw std::runtime_error("notification queue is empty, can't pop"); } - std::string msg = m_queue->front(); - m_queue->pop(); + std::string msg = m_queue.front(); + m_queue.pop(); - if (m_queue->empty()) + if (m_queue.empty()) { /*** * If there is a burst of notifications that causes the queue to grow in size, @@ -167,19 +164,16 @@ void swss::NotificationConsumer::pop(std::string &op, std::string &data, std::ve * * Force the memory to be released by destroying existing queue and creating a new one. */ - SWSS_LOG_INFO("%s queue is empty, recreating", m_channel.c_str()); + SWSS_LOG_DEBUG("%s queue is empty, calling malloc_trim()", m_channel.c_str()); int rv = malloc_trim(0); if (rv == 1) { - SWSS_LOG_INFO("Memory released successfully"); + SWSS_LOG_DEBUG("Memory released successfully"); } else { - SWSS_LOG_INFO("No memory released by malloc_trim"); + SWSS_LOG_DEBUG("No memory released by malloc_trim"); } - m_queue.reset(); - m_queue = nullptr; - m_queue = std::make_shared>(); } values.clear(); @@ -198,9 +192,9 @@ void swss::NotificationConsumer::pops(std::deque &vkco) SWSS_LOG_ENTER(); vkco.clear(); - while(!m_queue->empty()) + while(!m_queue.empty()) { - while(!m_queue->empty()) + while(!m_queue.empty()) { std::string op; std::string data; @@ -226,7 +220,7 @@ void swss::NotificationConsumer::pops(std::deque &vkco) int swss::NotificationConsumer::peek() { SWSS_LOG_ENTER(); - if (m_queue->empty()) + if (m_queue.empty()) { // Peek for more data in redis socket int rc = swss::peekRedisContext(m_subscribe->getContext()); @@ -237,5 +231,5 @@ int swss::NotificationConsumer::peek() readData(); } - return m_queue->empty() ? 0 : 1; + return m_queue.empty() ? 0 : 1; } diff --git a/common/notificationconsumer.h b/common/notificationconsumer.h index 5300ef9be..f5011ffc3 100644 --- a/common/notificationconsumer.h +++ b/common/notificationconsumer.h @@ -55,7 +55,7 @@ class NotificationConsumer : public Selectable swss::DBConnector *m_db; swss::DBConnector *m_subscribe; std::string m_channel; - std::shared_ptr> m_queue; + std::queue m_queue; }; } From 290cbfc0b43cf0aff1f145e700994c1ebf247dec Mon Sep 17 00:00:00 2001 From: Lawrence Lee Date: Mon, 13 Jan 2025 22:19:53 +0000 Subject: [PATCH 7/7] change log level Signed-off-by: Lawrence Lee --- common/notificationconsumer.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/notificationconsumer.cpp b/common/notificationconsumer.cpp index 7b324aba7..edfe7f3be 100644 --- a/common/notificationconsumer.cpp +++ b/common/notificationconsumer.cpp @@ -164,7 +164,7 @@ void swss::NotificationConsumer::pop(std::string &op, std::string &data, std::ve * * Force the memory to be released by destroying existing queue and creating a new one. */ - SWSS_LOG_DEBUG("%s queue is empty, calling malloc_trim()", m_channel.c_str()); + SWSS_LOG_INFO("%s queue is empty, calling malloc_trim()", m_channel.c_str()); int rv = malloc_trim(0); if (rv == 1) {