From 7879721016a01510a210c9dbac9c345fafda1cf1 Mon Sep 17 00:00:00 2001 From: kcudnik Date: Fri, 23 Jul 2021 18:13:05 +0200 Subject: [PATCH 1/3] [logger] Make map access thread safe and proper terminate thread Signed-off-by: kcudnik --- common/concurentmap.h | 73 +++++++++++++++++++++++++++ common/logger.cpp | 111 +++++++++++++++++++++++++++--------------- common/logger.h | 43 +++++++++------- 3 files changed, 171 insertions(+), 56 deletions(-) create mode 100644 common/concurentmap.h diff --git a/common/concurentmap.h b/common/concurentmap.h new file mode 100644 index 000000000..cebb6ae25 --- /dev/null +++ b/common/concurentmap.h @@ -0,0 +1,73 @@ +#pragma once + +#include +#include +#include + +namespace swss +{ + template + class ConcurentMap + { + public: + + ConcurentMap() = default; + + public: + + size_t size() + { + std::lock_guard _lock(m_mutex); + + return m_map.size(); + } + + bool contains(const K& key) + { + std::lock_guard _lock(m_mutex); + + return m_map.find(key) != m_map.end(); + } + + void insert(const std::pair& pair) + { + std::lock_guard _lock(m_mutex); + + m_map.insert(pair); + } + + void set(const K& key, const V& value) + { + std::lock_guard _lock(m_mutex); + + m_map[key] = value; + } + + // return copy + V get(const K& key) + { + std::lock_guard _lock(m_mutex); + + return m_map[key]; + } + + // return copy + std::map getCopy() + { + std::lock_guard _lock(m_mutex); + + return m_map; + } + + private: + + ConcurentMap(const ConcurentMap&); + ConcurentMap& operator=(const ConcurentMap&); + + private: + + std::map m_map; + + std::mutex m_mutex; + }; +} diff --git a/common/logger.cpp b/common/logger.cpp index 82ed21540..0711753e8 100644 --- a/common/logger.cpp +++ b/common/logger.cpp @@ -14,9 +14,11 @@ #include "consumerstatetable.h" #include "producerstatetable.h" -namespace swss { +using namespace swss; -void err_exit(const char *fn, int ln, int e, const char *fmt, ...) +#define MUTEX std::lock_guard _lock(getInstance().m_mutex); + +void swss::err_exit(const char *fn, int ln, int e, const char *fmt, ...) { va_list ap; char buff[1024]; @@ -31,13 +33,34 @@ void err_exit(const char *fn, int ln, int e, const char *fmt, ...) abort(); } -Logger::~Logger() { - if (m_settingThread) { - terminateSettingThread = true; +Logger::~Logger() +{ + terminateSettingThread(); +} + +void Logger::terminateSettingThread() +{ + // can't be executed under mutex, since it can cause deadlock + + if (m_settingThread) + { + m_runSettingThread = false; + m_settingThread->join(); + + m_settingThread = nullptr; + + m_runSettingThread = true; } } +void Logger::restartSettingThread() +{ + terminateSettingThread(); + + m_settingThread.reset(new std::thread(&Logger::settingThread, this)); +} + const Logger::PriorityStringMap Logger::priorityStringMap = { { "EMERG", SWSS_EMERG }, { "ALERT", SWSS_ALERT }, @@ -49,7 +72,7 @@ const Logger::PriorityStringMap Logger::priorityStringMap = { { "DEBUG", SWSS_DEBUG } }; -void Logger::swssPrioNotify(const std::string &component, const std::string &prioStr) +void Logger::swssPrioNotify(const std::string& component, const std::string& prioStr) { auto& logger = getInstance(); @@ -70,7 +93,7 @@ const Logger::OutputStringMap Logger::outputStringMap = { { "STDERR", SWSS_STDERR } }; -void Logger::swssOutputNotify(const std::string &component, const std::string &outputStr) +void Logger::swssOutputNotify(const std::string& component, const std::string& outputStr) { auto& logger = getInstance(); @@ -85,14 +108,19 @@ void Logger::swssOutputNotify(const std::string &component, const std::string &o } } -void Logger::linkToDbWithOutput(const std::string &dbName, const PriorityChangeNotify& prioNotify, const std::string& defPrio, const OutputChangeNotify& outputNotify, const std::string& defOutput) +void Logger::linkToDbWithOutput( + const std::string& dbName, + const PriorityChangeNotify& prioNotify, + const std::string& defPrio, + const OutputChangeNotify& outputNotify, + const std::string& defOutput) { auto& logger = getInstance(); // Initialize internal DB with observer logger.m_settingChangeObservers.insert(std::make_pair(dbName, std::make_pair(prioNotify, outputNotify))); + DBConnector db("LOGLEVEL_DB", 0); - auto keys = db.keys("*"); std::string key = dbName + ":" + dbName; std::string prio, output; @@ -100,7 +128,7 @@ void Logger::linkToDbWithOutput(const std::string &dbName, const PriorityChangeN auto prioPtr = db.hget(key, DAEMON_LOGLEVEL); auto outputPtr = db.hget(key, DAEMON_LOGOUTPUT); - if ( prioPtr == nullptr ) + if (prioPtr == nullptr) { prio = defPrio; doUpdate = true; @@ -110,7 +138,7 @@ void Logger::linkToDbWithOutput(const std::string &dbName, const PriorityChangeN prio = *prioPtr; } - if ( outputPtr == nullptr ) + if (outputPtr == nullptr) { output = defOutput; doUpdate = true; @@ -130,26 +158,26 @@ void Logger::linkToDbWithOutput(const std::string &dbName, const PriorityChangeN table.set(dbName, fieldValues); } - logger.m_currentPrios[dbName] = prio; - logger.m_currentOutputs[dbName] = output; + logger.m_currentPrios.set(dbName, prio); + logger.m_currentOutputs.set(dbName, output); + prioNotify(dbName, prio); outputNotify(dbName, output); } -void Logger::linkToDb(const std::string &dbName, const PriorityChangeNotify& prioNotify, const std::string& defPrio) +void Logger::linkToDb(const std::string& dbName, const PriorityChangeNotify& prioNotify, const std::string& defPrio) { linkToDbWithOutput(dbName, prioNotify, defPrio, swssOutputNotify, "SYSLOG"); } -void Logger::linkToDbNative(const std::string &dbName, const char * defPrio) +void Logger::linkToDbNative(const std::string& dbName, const char * defPrio) { - auto& logger = getInstance(); - linkToDb(dbName, swssPrioNotify, defPrio); - logger.m_settingThread.reset(new std::thread(&Logger::settingThread, &logger)); + + getInstance().restartSettingThread(); } -Logger &Logger::getInstance() +Logger& Logger::getInstance() { static Logger m_logger; return m_logger; @@ -171,13 +199,13 @@ void Logger::settingThread() DBConnector db("LOGLEVEL_DB", 0); std::map> selectables; - while (!terminateSettingThread) + while (m_runSettingThread) { if (selectables.size() < m_settingChangeObservers.size()) { - for (const auto& i : m_settingChangeObservers) + for (const auto& i : m_settingChangeObservers.getCopy()) { - const std::string &dbName = i.first; + const std::string& dbName = i.first; if (selectables.find(dbName) == selectables.end()) { auto table = std::make_shared(&db, dbName); @@ -208,27 +236,30 @@ void Logger::settingThread() dynamic_cast(selectable)->pop(koValues); std::string key = kfvKey(koValues), op = kfvOp(koValues); - if ((op != SET_COMMAND) || (m_settingChangeObservers.find(key) == m_settingChangeObservers.end())) + if (op != SET_COMMAND || !m_settingChangeObservers.contains(key)) { continue; } - auto values = kfvFieldsValues(koValues); - for (const auto& i : values) + const auto& values = kfvFieldsValues(koValues); + + for (auto& i : values) { - const std::string &field = fvField(i), &value = fvValue(i); - if ((field == DAEMON_LOGLEVEL) && (value != m_currentPrios[key])) + auto& field = fvField(i); + auto& value = fvValue(i); + + if ((field == DAEMON_LOGLEVEL) && (value != m_currentPrios.get(key))) { - m_currentPrios[key] = value; - m_settingChangeObservers[key].first(key, value); + m_currentPrios.set(key, value); + + m_settingChangeObservers.get(key).first(key, value); } - else if ((field == DAEMON_LOGOUTPUT) && (value != m_currentOutputs[key])) + else if ((field == DAEMON_LOGOUTPUT) && (value != m_currentOutputs.get(key))) { - m_currentOutputs[key] = value; - m_settingChangeObservers[key].second(key, value); - } + m_currentOutputs.set(key, value); - break; + m_settingChangeObservers.get(key).second(key, value); + } } } } @@ -246,14 +277,16 @@ void Logger::write(Priority prio, const char *fmt, ...) if (m_output == SWSS_SYSLOG) { - vsyslog(prio, fmt, ap); + vsyslog(prio, fmt, ap); } else { std::stringstream ss; ss << std::setw(6) << std::right << priorityToString(prio); ss << fmt << std::endl; - std::lock_guard lock(m_mutex); + + MUTEX; + if (m_output == SWSS_STDOUT) { vprintf(ss.str().c_str(), ap); @@ -283,7 +316,9 @@ void Logger::wthrow(Priority prio, const char *fmt, ...) std::stringstream ss; ss << std::setw(6) << std::right << priorityToString(prio); ss << fmt << std::endl; - std::lock_guard lock(m_mutex); + + MUTEX; + if (m_output == SWSS_STDOUT) { vprintf(ss.str().c_str(), ap); @@ -363,5 +398,3 @@ Logger::ScopeTimer::~ScopeTimer() Logger::getInstance().write(swss::Logger::SWSS_NOTICE, ":- %s: %s took %lf sec", m_fun, m_msg.c_str(), duration); } - -}; diff --git a/common/logger.h b/common/logger.h index 63f4b83c9..503de99fa 100644 --- a/common/logger.h +++ b/common/logger.h @@ -1,5 +1,4 @@ -#ifndef SWSS_COMMON_LOGGER_H -#define SWSS_COMMON_LOGGER_H +#pragma once #include #include @@ -10,6 +9,8 @@ #include #include +#include "concurentmap.h" + namespace swss { #define SWSS_LOG_ERROR(MSG, ...) swss::Logger::getInstance().write(swss::Logger::SWSS_ERROR, ":- %s: " MSG, __FUNCTION__, ##__VA_ARGS__) @@ -30,8 +31,7 @@ void err_exit(const char *fn, int ln, int e, const char *fmt, ...) #endif ; - -#define ABORT_IF_NOT(x, fmt, args...) \ +#define ABORT_IF_NOT(x, fmt, args...) \ if (!(x)) { \ int e = errno; \ err_exit(__FUNCTION__, __LINE__, e, (fmt), ##args); \ @@ -68,18 +68,25 @@ class Logger SWSS_STDOUT, SWSS_STDERR }; + typedef std::map OutputStringMap; static const OutputStringMap outputStringMap; typedef std::function OutputChangeNotify; - typedef std::map> LogSettingChangeObservers; - static Logger &getInstance(); + static Logger& getInstance(); static void setMinPrio(Priority prio); static Priority getMinPrio(); - static void linkToDbWithOutput(const std::string &dbName, const PriorityChangeNotify& prioNotify, const std::string& defPrio, const OutputChangeNotify& outputNotify, const std::string& defOutput); - static void linkToDb(const std::string &dbName, const PriorityChangeNotify& notify, const std::string& defPrio); + + static void linkToDbWithOutput( + const std::string& dbName, + const PriorityChangeNotify& prioNotify, + const std::string& defPrio, + const OutputChangeNotify& outputNotify, + const std::string& defOutput); + + static void linkToDb(const std::string& dbName, const PriorityChangeNotify& notify, const std::string& defPrio); // Must be called after all linkToDb to start select from DB - static void linkToDbNative(const std::string &dbName, const char * defPrio="NOTICE"); + static void linkToDbNative(const std::string& dbName, const char * defPrio="NOTICE"); void write(Priority prio, const char *fmt, ...) #ifdef __GNUC__ __attribute__ ((format (printf, 3, 4))) @@ -131,23 +138,25 @@ class Logger Logger() = default; ~Logger(); Logger(const Logger&); - Logger &operator=(const Logger&); + Logger& operator=(const Logger&); - static void swssPrioNotify(const std::string &component, const std::string &prioStr); - static void swssOutputNotify(const std::string &component, const std::string &outputStr); + static void swssPrioNotify(const std::string& component, const std::string& prioStr); + static void swssOutputNotify(const std::string& component, const std::string& outputStr); void settingThread(); + void terminateSettingThread(); + void restartSettingThread(); + + typedef ConcurentMap> LogSettingChangeObservers; LogSettingChangeObservers m_settingChangeObservers; - std::map m_currentPrios; + ConcurentMap m_currentPrios; std::atomic m_minPrio = { SWSS_NOTICE }; - std::map m_currentOutputs; + ConcurentMap m_currentOutputs; std::atomic m_output = { SWSS_SYSLOG }; std::unique_ptr m_settingThread; std::mutex m_mutex; - volatile bool terminateSettingThread = false; + volatile bool m_runSettingThread = true; }; } - -#endif /* SWSS_COMMON_LOGGER_H */ From 7f230e5d244f5a6c88abe96ab3e386f5dbe9338b Mon Sep 17 00:00:00 2001 From: kcudnik Date: Tue, 27 Jul 2021 13:13:02 +0200 Subject: [PATCH 2/3] Address comments --- common/logger.cpp | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/common/logger.cpp b/common/logger.cpp index 0711753e8..b27cbed93 100644 --- a/common/logger.cpp +++ b/common/logger.cpp @@ -49,8 +49,6 @@ void Logger::terminateSettingThread() m_settingThread->join(); m_settingThread = nullptr; - - m_runSettingThread = true; } } @@ -58,6 +56,8 @@ void Logger::restartSettingThread() { terminateSettingThread(); + m_runSettingThread = true; + m_settingThread.reset(new std::thread(&Logger::settingThread, this)); } @@ -251,13 +251,11 @@ void Logger::settingThread() if ((field == DAEMON_LOGLEVEL) && (value != m_currentPrios.get(key))) { m_currentPrios.set(key, value); - m_settingChangeObservers.get(key).first(key, value); } else if ((field == DAEMON_LOGOUTPUT) && (value != m_currentOutputs.get(key))) { m_currentOutputs.set(key, value); - m_settingChangeObservers.get(key).second(key, value); } } From dac68cc94e9e7973771626e4a3c25c25a9dfa505 Mon Sep 17 00:00:00 2001 From: kcudnik Date: Fri, 6 Aug 2021 12:04:48 +0200 Subject: [PATCH 3/3] Fix spelling --- common/{concurentmap.h => concurrentmap.h} | 8 ++++---- common/logger.h | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) rename common/{concurentmap.h => concurrentmap.h} (88%) diff --git a/common/concurentmap.h b/common/concurrentmap.h similarity index 88% rename from common/concurentmap.h rename to common/concurrentmap.h index cebb6ae25..56d1411ff 100644 --- a/common/concurentmap.h +++ b/common/concurrentmap.h @@ -7,11 +7,11 @@ namespace swss { template - class ConcurentMap + class ConcurrentMap { public: - ConcurentMap() = default; + ConcurrentMap() = default; public: @@ -61,8 +61,8 @@ namespace swss private: - ConcurentMap(const ConcurentMap&); - ConcurentMap& operator=(const ConcurentMap&); + ConcurrentMap(const ConcurrentMap&); + ConcurrentMap& operator=(const ConcurrentMap&); private: diff --git a/common/logger.h b/common/logger.h index 503de99fa..1f541bc81 100644 --- a/common/logger.h +++ b/common/logger.h @@ -9,7 +9,7 @@ #include #include -#include "concurentmap.h" +#include "concurrentmap.h" namespace swss { @@ -147,12 +147,12 @@ class Logger void terminateSettingThread(); void restartSettingThread(); - typedef ConcurentMap> LogSettingChangeObservers; + typedef ConcurrentMap> LogSettingChangeObservers; LogSettingChangeObservers m_settingChangeObservers; - ConcurentMap m_currentPrios; + ConcurrentMap m_currentPrios; std::atomic m_minPrio = { SWSS_NOTICE }; - ConcurentMap m_currentOutputs; + ConcurrentMap m_currentOutputs; std::atomic m_output = { SWSS_SYSLOG }; std::unique_ptr m_settingThread; std::mutex m_mutex;