Skip to content

Commit

Permalink
[logger] Make map access thread safe and proper terminate thread (#510)
Browse files Browse the repository at this point in the history
  • Loading branch information
kcudnik authored Aug 20, 2021
1 parent e4c3d0b commit 9fd7dbf
Show file tree
Hide file tree
Showing 3 changed files with 169 additions and 56 deletions.
73 changes: 73 additions & 0 deletions common/concurrentmap.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
#pragma once

#include <map>
#include <string>
#include <mutex>

namespace swss
{
template <typename K, typename V>
class ConcurrentMap
{
public:

ConcurrentMap() = default;

public:

size_t size()
{
std::lock_guard<std::mutex> _lock(m_mutex);

return m_map.size();
}

bool contains(const K& key)
{
std::lock_guard<std::mutex> _lock(m_mutex);

return m_map.find(key) != m_map.end();
}

void insert(const std::pair<K,V>& pair)
{
std::lock_guard<std::mutex> _lock(m_mutex);

m_map.insert(pair);
}

void set(const K& key, const V& value)
{
std::lock_guard<std::mutex> _lock(m_mutex);

m_map[key] = value;
}

// return copy
V get(const K& key)
{
std::lock_guard<std::mutex> _lock(m_mutex);

return m_map[key];
}

// return copy
std::map<K,V> getCopy()
{
std::lock_guard<std::mutex> _lock(m_mutex);

return m_map;
}

private:

ConcurrentMap(const ConcurrentMap&);
ConcurrentMap& operator=(const ConcurrentMap&);

private:

std::map<K,V> m_map;

std::mutex m_mutex;
};
}
109 changes: 70 additions & 39 deletions common/logger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@
#include "consumerstatetable.h"
#include "producerstatetable.h"

namespace swss {
using namespace swss;

void err_exit(const char *fn, int ln, int e, const char *fmt, ...)
#define MUTEX std::lock_guard<std::mutex> _lock(getInstance().m_mutex);

void swss::err_exit(const char *fn, int ln, int e, const char *fmt, ...)
{
va_list ap;
char buff[1024];
Expand All @@ -31,13 +33,34 @@ void err_exit(const char *fn, int ln, int e, const char *fmt, ...)
abort();
}

Logger::~Logger() {
if (m_settingThread) {
terminateSettingThread = true;
Logger::~Logger()
{
terminateSettingThread();
}

void Logger::terminateSettingThread()
{
// can't be executed under mutex, since it can cause deadlock

if (m_settingThread)
{
m_runSettingThread = false;

m_settingThread->join();

m_settingThread = nullptr;
}
}

void Logger::restartSettingThread()
{
terminateSettingThread();

m_runSettingThread = true;

m_settingThread.reset(new std::thread(&Logger::settingThread, this));
}

const Logger::PriorityStringMap Logger::priorityStringMap = {
{ "EMERG", SWSS_EMERG },
{ "ALERT", SWSS_ALERT },
Expand All @@ -49,7 +72,7 @@ const Logger::PriorityStringMap Logger::priorityStringMap = {
{ "DEBUG", SWSS_DEBUG }
};

void Logger::swssPrioNotify(const std::string &component, const std::string &prioStr)
void Logger::swssPrioNotify(const std::string& component, const std::string& prioStr)
{
auto& logger = getInstance();

Expand All @@ -70,7 +93,7 @@ const Logger::OutputStringMap Logger::outputStringMap = {
{ "STDERR", SWSS_STDERR }
};

void Logger::swssOutputNotify(const std::string &component, const std::string &outputStr)
void Logger::swssOutputNotify(const std::string& component, const std::string& outputStr)
{
auto& logger = getInstance();

Expand All @@ -85,22 +108,27 @@ void Logger::swssOutputNotify(const std::string &component, const std::string &o
}
}

void Logger::linkToDbWithOutput(const std::string &dbName, const PriorityChangeNotify& prioNotify, const std::string& defPrio, const OutputChangeNotify& outputNotify, const std::string& defOutput)
void Logger::linkToDbWithOutput(
const std::string& dbName,
const PriorityChangeNotify& prioNotify,
const std::string& defPrio,
const OutputChangeNotify& outputNotify,
const std::string& defOutput)
{
auto& logger = getInstance();

// Initialize internal DB with observer
logger.m_settingChangeObservers.insert(std::make_pair(dbName, std::make_pair(prioNotify, outputNotify)));

DBConnector db("LOGLEVEL_DB", 0);
auto keys = db.keys("*");

std::string key = dbName + ":" + dbName;
std::string prio, output;
bool doUpdate = false;
auto prioPtr = db.hget(key, DAEMON_LOGLEVEL);
auto outputPtr = db.hget(key, DAEMON_LOGOUTPUT);

if ( prioPtr == nullptr )
if (prioPtr == nullptr)
{
prio = defPrio;
doUpdate = true;
Expand All @@ -110,7 +138,7 @@ void Logger::linkToDbWithOutput(const std::string &dbName, const PriorityChangeN
prio = *prioPtr;
}

if ( outputPtr == nullptr )
if (outputPtr == nullptr)
{
output = defOutput;
doUpdate = true;
Expand All @@ -130,26 +158,26 @@ void Logger::linkToDbWithOutput(const std::string &dbName, const PriorityChangeN
table.set(dbName, fieldValues);
}

logger.m_currentPrios[dbName] = prio;
logger.m_currentOutputs[dbName] = output;
logger.m_currentPrios.set(dbName, prio);
logger.m_currentOutputs.set(dbName, output);

prioNotify(dbName, prio);
outputNotify(dbName, output);
}

void Logger::linkToDb(const std::string &dbName, const PriorityChangeNotify& prioNotify, const std::string& defPrio)
void Logger::linkToDb(const std::string& dbName, const PriorityChangeNotify& prioNotify, const std::string& defPrio)
{
linkToDbWithOutput(dbName, prioNotify, defPrio, swssOutputNotify, "SYSLOG");
}

void Logger::linkToDbNative(const std::string &dbName, const char * defPrio)
void Logger::linkToDbNative(const std::string& dbName, const char * defPrio)
{
auto& logger = getInstance();

linkToDb(dbName, swssPrioNotify, defPrio);
logger.m_settingThread.reset(new std::thread(&Logger::settingThread, &logger));

getInstance().restartSettingThread();
}

Logger &Logger::getInstance()
Logger& Logger::getInstance()
{
static Logger m_logger;
return m_logger;
Expand All @@ -171,13 +199,13 @@ void Logger::settingThread()
DBConnector db("LOGLEVEL_DB", 0);
std::map<std::string, std::shared_ptr<ConsumerStateTable>> selectables;

while (!terminateSettingThread)
while (m_runSettingThread)
{
if (selectables.size() < m_settingChangeObservers.size())
{
for (const auto& i : m_settingChangeObservers)
for (const auto& i : m_settingChangeObservers.getCopy())
{
const std::string &dbName = i.first;
const std::string& dbName = i.first;
if (selectables.find(dbName) == selectables.end())
{
auto table = std::make_shared<ConsumerStateTable>(&db, dbName);
Expand Down Expand Up @@ -208,27 +236,28 @@ void Logger::settingThread()
dynamic_cast<ConsumerStateTable *>(selectable)->pop(koValues);
std::string key = kfvKey(koValues), op = kfvOp(koValues);

if ((op != SET_COMMAND) || (m_settingChangeObservers.find(key) == m_settingChangeObservers.end()))
if (op != SET_COMMAND || !m_settingChangeObservers.contains(key))
{
continue;
}

auto values = kfvFieldsValues(koValues);
for (const auto& i : values)
const auto& values = kfvFieldsValues(koValues);

for (auto& i : values)
{
const std::string &field = fvField(i), &value = fvValue(i);
if ((field == DAEMON_LOGLEVEL) && (value != m_currentPrios[key]))
auto& field = fvField(i);
auto& value = fvValue(i);

if ((field == DAEMON_LOGLEVEL) && (value != m_currentPrios.get(key)))
{
m_currentPrios[key] = value;
m_settingChangeObservers[key].first(key, value);
m_currentPrios.set(key, value);
m_settingChangeObservers.get(key).first(key, value);
}
else if ((field == DAEMON_LOGOUTPUT) && (value != m_currentOutputs[key]))
else if ((field == DAEMON_LOGOUTPUT) && (value != m_currentOutputs.get(key)))
{
m_currentOutputs[key] = value;
m_settingChangeObservers[key].second(key, value);
m_currentOutputs.set(key, value);
m_settingChangeObservers.get(key).second(key, value);
}

break;
}
}
}
Expand All @@ -246,14 +275,16 @@ void Logger::write(Priority prio, const char *fmt, ...)

if (m_output == SWSS_SYSLOG)
{
vsyslog(prio, fmt, ap);
vsyslog(prio, fmt, ap);
}
else
{
std::stringstream ss;
ss << std::setw(6) << std::right << priorityToString(prio);
ss << fmt << std::endl;
std::lock_guard<std::mutex> lock(m_mutex);

MUTEX;

if (m_output == SWSS_STDOUT)
{
vprintf(ss.str().c_str(), ap);
Expand Down Expand Up @@ -283,7 +314,9 @@ void Logger::wthrow(Priority prio, const char *fmt, ...)
std::stringstream ss;
ss << std::setw(6) << std::right << priorityToString(prio);
ss << fmt << std::endl;
std::lock_guard<std::mutex> lock(m_mutex);

MUTEX;

if (m_output == SWSS_STDOUT)
{
vprintf(ss.str().c_str(), ap);
Expand Down Expand Up @@ -363,5 +396,3 @@ Logger::ScopeTimer::~ScopeTimer()

Logger::getInstance().write(swss::Logger::SWSS_NOTICE, ":- %s: %s took %lf sec", m_fun, m_msg.c_str(), duration);
}

};
Loading

0 comments on commit 9fd7dbf

Please sign in to comment.