Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[logger] Make map access thread safe and proper terminate thread #510

Merged
merged 3 commits into from
Aug 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions common/concurrentmap.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
#pragma once

#include <map>
#include <string>
#include <mutex>

namespace swss
{
template <typename K, typename V>
class ConcurrentMap
{
public:

ConcurrentMap() = default;

public:

size_t size()
{
std::lock_guard<std::mutex> _lock(m_mutex);

return m_map.size();
}

bool contains(const K& key)
{
std::lock_guard<std::mutex> _lock(m_mutex);

return m_map.find(key) != m_map.end();
}

void insert(const std::pair<K,V>& pair)
{
std::lock_guard<std::mutex> _lock(m_mutex);

m_map.insert(pair);
}

void set(const K& key, const V& value)
{
std::lock_guard<std::mutex> _lock(m_mutex);

m_map[key] = value;
}

// return copy
V get(const K& key)
{
std::lock_guard<std::mutex> _lock(m_mutex);

return m_map[key];
}

// return copy
std::map<K,V> getCopy()
{
std::lock_guard<std::mutex> _lock(m_mutex);

return m_map;
}

private:

ConcurrentMap(const ConcurrentMap&);
ConcurrentMap& operator=(const ConcurrentMap&);

private:

std::map<K,V> m_map;

std::mutex m_mutex;
};
}
109 changes: 70 additions & 39 deletions common/logger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@
#include "consumerstatetable.h"
#include "producerstatetable.h"

namespace swss {
using namespace swss;

void err_exit(const char *fn, int ln, int e, const char *fmt, ...)
#define MUTEX std::lock_guard<std::mutex> _lock(getInstance().m_mutex);

void swss::err_exit(const char *fn, int ln, int e, const char *fmt, ...)
{
va_list ap;
char buff[1024];
Expand All @@ -31,13 +33,34 @@ void err_exit(const char *fn, int ln, int e, const char *fmt, ...)
abort();
}

Logger::~Logger() {
if (m_settingThread) {
terminateSettingThread = true;
Logger::~Logger()
{
terminateSettingThread();
}

void Logger::terminateSettingThread()
{
// can't be executed under mutex, since it can cause deadlock

if (m_settingThread)
{
m_runSettingThread = false;

m_settingThread->join();

m_settingThread = nullptr;
}
}

void Logger::restartSettingThread()
{
terminateSettingThread();

m_runSettingThread = true;

m_settingThread.reset(new std::thread(&Logger::settingThread, this));
}

const Logger::PriorityStringMap Logger::priorityStringMap = {
{ "EMERG", SWSS_EMERG },
{ "ALERT", SWSS_ALERT },
Expand All @@ -49,7 +72,7 @@ const Logger::PriorityStringMap Logger::priorityStringMap = {
{ "DEBUG", SWSS_DEBUG }
};

void Logger::swssPrioNotify(const std::string &component, const std::string &prioStr)
void Logger::swssPrioNotify(const std::string& component, const std::string& prioStr)
{
auto& logger = getInstance();

Expand All @@ -70,7 +93,7 @@ const Logger::OutputStringMap Logger::outputStringMap = {
{ "STDERR", SWSS_STDERR }
};

void Logger::swssOutputNotify(const std::string &component, const std::string &outputStr)
void Logger::swssOutputNotify(const std::string& component, const std::string& outputStr)
{
auto& logger = getInstance();

Expand All @@ -85,22 +108,27 @@ void Logger::swssOutputNotify(const std::string &component, const std::string &o
}
}

void Logger::linkToDbWithOutput(const std::string &dbName, const PriorityChangeNotify& prioNotify, const std::string& defPrio, const OutputChangeNotify& outputNotify, const std::string& defOutput)
void Logger::linkToDbWithOutput(
const std::string& dbName,
const PriorityChangeNotify& prioNotify,
const std::string& defPrio,
const OutputChangeNotify& outputNotify,
const std::string& defOutput)
{
auto& logger = getInstance();

// Initialize internal DB with observer
logger.m_settingChangeObservers.insert(std::make_pair(dbName, std::make_pair(prioNotify, outputNotify)));

DBConnector db("LOGLEVEL_DB", 0);
auto keys = db.keys("*");

std::string key = dbName + ":" + dbName;
std::string prio, output;
bool doUpdate = false;
auto prioPtr = db.hget(key, DAEMON_LOGLEVEL);
auto outputPtr = db.hget(key, DAEMON_LOGOUTPUT);

if ( prioPtr == nullptr )
if (prioPtr == nullptr)
{
prio = defPrio;
doUpdate = true;
Expand All @@ -110,7 +138,7 @@ void Logger::linkToDbWithOutput(const std::string &dbName, const PriorityChangeN
prio = *prioPtr;
}

if ( outputPtr == nullptr )
if (outputPtr == nullptr)
{
output = defOutput;
doUpdate = true;
Expand All @@ -130,26 +158,26 @@ void Logger::linkToDbWithOutput(const std::string &dbName, const PriorityChangeN
table.set(dbName, fieldValues);
}

logger.m_currentPrios[dbName] = prio;
logger.m_currentOutputs[dbName] = output;
logger.m_currentPrios.set(dbName, prio);
logger.m_currentOutputs.set(dbName, output);

prioNotify(dbName, prio);
outputNotify(dbName, output);
}

void Logger::linkToDb(const std::string &dbName, const PriorityChangeNotify& prioNotify, const std::string& defPrio)
void Logger::linkToDb(const std::string& dbName, const PriorityChangeNotify& prioNotify, const std::string& defPrio)
{
linkToDbWithOutput(dbName, prioNotify, defPrio, swssOutputNotify, "SYSLOG");
}

void Logger::linkToDbNative(const std::string &dbName, const char * defPrio)
void Logger::linkToDbNative(const std::string& dbName, const char * defPrio)
{
auto& logger = getInstance();

linkToDb(dbName, swssPrioNotify, defPrio);
logger.m_settingThread.reset(new std::thread(&Logger::settingThread, &logger));

getInstance().restartSettingThread();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is restarting the thread actually required if the thread is already started and the data structures are thread-safe? Put another way: is there any reason to maintain a separate linkToDbNative and linkToDb vs consolidating them into one function which starts the thread if needed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thats a good question, im just fixing this instance here, since previously if you call linkToDbNative twice, it will crash, and whether we should have only 1 thread, not sure about that, this PR is just to make those maps thread safe + plus crahs bug fix not changing behavior, @qiluo-msft can you have opinion?

}

Logger &Logger::getInstance()
Logger& Logger::getInstance()
{
static Logger m_logger;
return m_logger;
Expand All @@ -171,13 +199,13 @@ void Logger::settingThread()
DBConnector db("LOGLEVEL_DB", 0);
std::map<std::string, std::shared_ptr<ConsumerStateTable>> selectables;

while (!terminateSettingThread)
while (m_runSettingThread)
{
if (selectables.size() < m_settingChangeObservers.size())
{
for (const auto& i : m_settingChangeObservers)
for (const auto& i : m_settingChangeObservers.getCopy())
{
const std::string &dbName = i.first;
const std::string& dbName = i.first;
if (selectables.find(dbName) == selectables.end())
{
auto table = std::make_shared<ConsumerStateTable>(&db, dbName);
Expand Down Expand Up @@ -208,27 +236,28 @@ void Logger::settingThread()
dynamic_cast<ConsumerStateTable *>(selectable)->pop(koValues);
std::string key = kfvKey(koValues), op = kfvOp(koValues);

if ((op != SET_COMMAND) || (m_settingChangeObservers.find(key) == m_settingChangeObservers.end()))
if (op != SET_COMMAND || !m_settingChangeObservers.contains(key))
{
continue;
}

auto values = kfvFieldsValues(koValues);
for (const auto& i : values)
const auto& values = kfvFieldsValues(koValues);

for (auto& i : values)
{
const std::string &field = fvField(i), &value = fvValue(i);
if ((field == DAEMON_LOGLEVEL) && (value != m_currentPrios[key]))
auto& field = fvField(i);
auto& value = fvValue(i);

if ((field == DAEMON_LOGLEVEL) && (value != m_currentPrios.get(key)))
{
m_currentPrios[key] = value;
m_settingChangeObservers[key].first(key, value);
m_currentPrios.set(key, value);
m_settingChangeObservers.get(key).first(key, value);
}
else if ((field == DAEMON_LOGOUTPUT) && (value != m_currentOutputs[key]))
else if ((field == DAEMON_LOGOUTPUT) && (value != m_currentOutputs.get(key)))
{
m_currentOutputs[key] = value;
m_settingChangeObservers[key].second(key, value);
m_currentOutputs.set(key, value);
m_settingChangeObservers.get(key).second(key, value);
}

break;
}
}
}
Expand All @@ -246,14 +275,16 @@ void Logger::write(Priority prio, const char *fmt, ...)

if (m_output == SWSS_SYSLOG)
{
vsyslog(prio, fmt, ap);
vsyslog(prio, fmt, ap);
}
else
{
std::stringstream ss;
ss << std::setw(6) << std::right << priorityToString(prio);
ss << fmt << std::endl;
std::lock_guard<std::mutex> lock(m_mutex);

MUTEX;

if (m_output == SWSS_STDOUT)
{
vprintf(ss.str().c_str(), ap);
Expand Down Expand Up @@ -283,7 +314,9 @@ void Logger::wthrow(Priority prio, const char *fmt, ...)
std::stringstream ss;
ss << std::setw(6) << std::right << priorityToString(prio);
ss << fmt << std::endl;
std::lock_guard<std::mutex> lock(m_mutex);

MUTEX;

if (m_output == SWSS_STDOUT)
{
vprintf(ss.str().c_str(), ap);
Expand Down Expand Up @@ -363,5 +396,3 @@ Logger::ScopeTimer::~ScopeTimer()

Logger::getInstance().write(swss::Logger::SWSS_NOTICE, ":- %s: %s took %lf sec", m_fun, m_msg.c_str(), duration);
}

};
Loading