diff --git a/syncd/syncd.cpp b/syncd/syncd.cpp index 171ae66dffb8..ee5940c5f6e4 100644 --- a/syncd/syncd.cpp +++ b/syncd/syncd.cpp @@ -2358,9 +2358,10 @@ void processPfcWdEvent( const auto &key = kfvKey(kco); const auto &op = kfvOp(kco); - sai_object_id_t queueVid = SAI_NULL_OBJECT_ID; - sai_deserialize_object_id(key, queueVid); - sai_object_id_t queueId = translate_vid_to_rid(queueVid); + sai_object_id_t vid = SAI_NULL_OBJECT_ID; + sai_deserialize_object_id(key, vid); + sai_object_id_t rid = translate_vid_to_rid(vid); + sai_object_type_t objectType = sai_object_type_query(rid); const auto values = kfvFieldsValues(kco); for (const auto& valuePair : values) @@ -2370,33 +2371,94 @@ void processPfcWdEvent( if (op == DEL_COMMAND) { - PfcWatchdog::removeQueue(queueVid); - continue; - } - - auto idStrings = swss::tokenize(value, ','); - - if (field == PFC_WD_PORT_COUNTER_ID_LIST) - { - std::vector portCounterIds; - for (const auto &str : idStrings) + if (objectType == SAI_OBJECT_TYPE_PORT) + { + PfcWatchdog::removePort(vid); + } + else if (objectType == SAI_OBJECT_TYPE_QUEUE) { - sai_port_stat_t stat; - sai_deserialize_port_stat(str, stat); - portCounterIds.push_back(stat); + PfcWatchdog::removeQueue(vid); + } + else + { + SWSS_LOG_ERROR("Object type for removal not supported"); } - PfcWatchdog::setPortCounterList(queueVid, queueId, portCounterIds); } - else if (field == PFC_WD_QUEUE_COUNTER_ID_LIST) + else if (op == SET_COMMAND) { - std::vector queueCounterIds; - for (const auto &str : idStrings) + auto idStrings = swss::tokenize(value, ','); + + if (objectType == SAI_OBJECT_TYPE_PORT && field == PFC_WD_PORT_COUNTER_ID_LIST) { - sai_queue_stat_t stat; - sai_deserialize_queue_stat(str, stat); - queueCounterIds.push_back(stat); + std::vector portCounterIds; + for (const auto &str : idStrings) + { + sai_port_stat_t stat; + sai_deserialize_port_stat(str, stat); + portCounterIds.push_back(stat); + } + PfcWatchdog::setPortCounterList(vid, rid, portCounterIds); } - PfcWatchdog::setQueueCounterList(queueVid, queueId, queueCounterIds); + else if (objectType == SAI_OBJECT_TYPE_QUEUE && field == PFC_WD_QUEUE_COUNTER_ID_LIST) + { + std::vector queueCounterIds; + for (const auto &str : idStrings) + { + sai_queue_stat_t stat; + sai_deserialize_queue_stat(str, stat); + queueCounterIds.push_back(stat); + } + PfcWatchdog::setQueueCounterList(vid, rid, queueCounterIds); + } + else + { + SWSS_LOG_ERROR("Object type not supported"); + } + } + } +} + +void processPfcWdPluginEvent( + _In_ swss::ConsumerStateTable &consumer) +{ + std::lock_guard lock(g_mutex); + + SWSS_LOG_ENTER(); + + swss::KeyOpFieldsValuesTuple kco; + consumer.pop(kco); + + const auto &key = kfvKey(kco); + const auto &op = kfvOp(kco); + + if (op == DEL_COMMAND) + { + PfcWatchdog::removeCounterPlugin(key); + return; + } + + const auto values = kfvFieldsValues(kco); + for (const auto& valuePair : values) + { + const auto field = fvField(valuePair); + const auto value = fvValue(valuePair); + + if (field != SAI_OBJECT_TYPE) + { + continue; + } + + if (value == sai_serialize_object_type(SAI_OBJECT_TYPE_PORT)) + { + PfcWatchdog::addPortCounterPlugin(key); + } + else if (value == sai_serialize_object_type(SAI_OBJECT_TYPE_QUEUE)) + { + PfcWatchdog::addQueueCounterPlugin(key); + } + else + { + SWSS_LOG_ERROR("Plugin for %s is not supported", value.c_str()); } } } @@ -2981,6 +3043,7 @@ int main(int argc, char **argv) std::shared_ptr asicState = std::make_shared(dbAsic.get(), ASIC_STATE_TABLE); std::shared_ptr restartQuery = std::make_shared(dbAsic.get(), "RESTARTQUERY"); std::shared_ptr pfcWdState = std::make_shared(dbPfcWatchdog.get(), PFC_WD_STATE_TABLE); + std::shared_ptr pfcWdPlugin = std::make_shared(dbPfcWatchdog.get(), PLUGIN_TABLE); /* * At the end we cant use producer consumer concept since if one proces @@ -3077,6 +3140,7 @@ int main(int argc, char **argv) s.addSelectable(asicState.get()); s.addSelectable(restartQuery.get()); s.addSelectable(pfcWdState.get()); + s.addSelectable(pfcWdPlugin.get()); SWSS_LOG_NOTICE("starting main loop"); @@ -3105,6 +3169,10 @@ int main(int argc, char **argv) { processPfcWdEvent(*(swss::ConsumerStateTable*)sel); } + else if (sel == pfcWdPlugin.get()) + { + processPfcWdPluginEvent(*(swss::ConsumerStateTable*)sel); + } else if (result == swss::Select::OBJECT) { processEvent(*(swss::ConsumerTable*)sel); diff --git a/syncd/syncd_pfc_watchdog.cpp b/syncd/syncd_pfc_watchdog.cpp index f19198158f22..cc557ae4730c 100644 --- a/syncd/syncd_pfc_watchdog.cpp +++ b/syncd/syncd_pfc_watchdog.cpp @@ -1,44 +1,41 @@ #include "syncd_pfc_watchdog.h" #include "syncd.h" +#include "swss/redisapi.h" -#define PFC_WD_POLL_MSECS 50 +#define PFC_WD_POLL_MSECS 100 -PfcWatchdog::PfcCounterIds::PfcCounterIds( - _In_ sai_object_id_t queue, +PfcWatchdog::PortCounterIds::PortCounterIds( _In_ sai_object_id_t port, - _In_ const std::vector &portIds, + _In_ const std::vector &portIds): + portId(port), portCounterIds(portIds) +{ +} + +PfcWatchdog::QueueCounterIds::QueueCounterIds( + _In_ sai_object_id_t queue, _In_ const std::vector &queueIds): - queueId(queue), portId(port), portCounterIds(portIds), queueCounterIds(queueIds) + queueId(queue), queueCounterIds(queueIds) { } void PfcWatchdog::setPortCounterList( - _In_ sai_object_id_t queueVid, - _In_ sai_object_id_t queueId, + _In_ sai_object_id_t portVid, + _In_ sai_object_id_t portId, _In_ const std::vector &counterIds) { SWSS_LOG_ENTER(); PfcWatchdog &wd = getInstance(); - sai_object_id_t portId = queueIdToPortId(queueId); - if (portId == SAI_NULL_OBJECT_ID) - { - return; - } - - auto it = wd.m_counterIdsMap.find(queueVid); - if (it != wd.m_counterIdsMap.end()) + auto it = wd.m_portCounterIdsMap.find(portVid); + if (it != wd.m_portCounterIdsMap.end()) { (*it).second->portCounterIds = counterIds; return; } - auto pfcCounterIds = std::make_shared(queueId, - portId, - counterIds, - std::vector()); - wd.m_counterIdsMap.emplace(queueVid, pfcCounterIds); + auto portCounterIds = std::make_shared(portId, counterIds); + wd.m_portCounterIdsMap.emplace(portVid, portCounterIds); // Start watchdog thread in case it was not running due to empty counter IDs map wd.startWatchdogThread(); @@ -53,27 +50,41 @@ void PfcWatchdog::setQueueCounterList( PfcWatchdog &wd = getInstance(); - sai_object_id_t portId = queueIdToPortId(queueId); - if (portId == SAI_NULL_OBJECT_ID) + auto it = wd.m_queueCounterIdsMap.find(queueVid); + if (it != wd.m_queueCounterIdsMap.end()) { + (*it).second->queueCounterIds = counterIds; return; } - auto it = wd.m_counterIdsMap.find(queueVid); - if (it != wd.m_counterIdsMap.end()) + auto queueCounterIds = std::make_shared(queueId, counterIds); + wd.m_queueCounterIdsMap.emplace(queueVid, queueCounterIds); + + // Start watchdog thread in case it was not running due to empty counter IDs map + wd.startWatchdogThread(); +} + +void PfcWatchdog::removePort( + _In_ sai_object_id_t portVid) +{ + SWSS_LOG_ENTER(); + + PfcWatchdog &wd = getInstance(); + + auto it = wd.m_portCounterIdsMap.find(portVid); + if (it == wd.m_portCounterIdsMap.end()) { - (*it).second->queueCounterIds = counterIds; + SWSS_LOG_ERROR("Trying to remove nonexisting port counter Ids 0x%lx", portVid); return; } - auto pfcCounterIds = std::make_shared(queueId, - portId, - std::vector(), - counterIds); - wd.m_counterIdsMap.emplace(queueVid, pfcCounterIds); + wd.m_portCounterIdsMap.erase(it); - // Start watchdog thread in case it was not running due to empty counter IDs map - wd.startWatchdogThread(); + // Stop watchdog thread if counter IDs map is empty + if (wd.m_queueCounterIdsMap.empty() && wd.m_portCounterIdsMap.empty()) + { + wd.endWatchdogThread(); + } } void PfcWatchdog::removeQueue( @@ -83,60 +94,81 @@ void PfcWatchdog::removeQueue( PfcWatchdog &wd = getInstance(); - auto it = wd.m_counterIdsMap.find(queueVid); - if (it == wd.m_counterIdsMap.end()) + auto it = wd.m_queueCounterIdsMap.find(queueVid); + if (it == wd.m_queueCounterIdsMap.end()) { SWSS_LOG_ERROR("Trying to remove nonexisting queue counter Ids 0x%lx", queueVid); return; } - wd.m_counterIdsMap.erase(it); + wd.m_queueCounterIdsMap.erase(it); // Stop watchdog thread if counter IDs map is empty - if (wd.m_counterIdsMap.empty()) + if (wd.m_queueCounterIdsMap.empty() && wd.m_portCounterIdsMap.empty()) { wd.endWatchdogThread(); } } -PfcWatchdog::~PfcWatchdog(void) +void PfcWatchdog::addPortCounterPlugin( + _In_ std::string sha) { - endWatchdogThread(); + SWSS_LOG_ENTER(); + + PfcWatchdog &wd = getInstance(); + + if (wd.m_portPlugins.find(sha) != wd.m_portPlugins.end() || + wd.m_queuePlugins.find(sha) != wd.m_queuePlugins.end()) + { + SWSS_LOG_ERROR("Plugin %s already registered", sha.c_str()); + } + + wd.m_portPlugins.insert(sha); + SWSS_LOG_NOTICE("Port counters plugin %s registered", sha.c_str()); } -PfcWatchdog::PfcWatchdog(void) +void PfcWatchdog::addQueueCounterPlugin( + _In_ std::string sha) { + SWSS_LOG_ENTER(); + + PfcWatchdog &wd = getInstance(); + + if (wd.m_portPlugins.find(sha) != wd.m_portPlugins.end() || + wd.m_queuePlugins.find(sha) != wd.m_queuePlugins.end()) + { + SWSS_LOG_ERROR("Plugin %s already registered", sha.c_str()); + } + + wd.m_queuePlugins.insert(sha); + SWSS_LOG_NOTICE("Queue counters plugin %s registered", sha.c_str()); } -PfcWatchdog& PfcWatchdog::getInstance(void) +void PfcWatchdog::removeCounterPlugin( + _In_ std::string sha) { - static PfcWatchdog wd; + SWSS_LOG_ENTER(); - return wd; + PfcWatchdog &wd = getInstance(); + + wd.m_queuePlugins.erase(sha); + wd.m_portPlugins.erase(sha); } -sai_object_id_t PfcWatchdog::queueIdToPortId( - _In_ sai_object_id_t queueId) +PfcWatchdog::~PfcWatchdog(void) { - SWSS_LOG_ENTER(); + endWatchdogThread(); +} - sai_attribute_t attr = - { - .id = SAI_QUEUE_ATTR_PORT, - .value = - { - .oid = queueId, - } - }; +PfcWatchdog::PfcWatchdog(void) +{ +} - sai_status_t status = sai_metadata_sai_queue_api->get_queue_attribute(queueId, 1, &attr); - if (status != SAI_STATUS_SUCCESS) - { - SWSS_LOG_ERROR("Failed to get port Id of queue 0x%lx: %d", queueId, status); - return SAI_NULL_OBJECT_ID; - } +PfcWatchdog& PfcWatchdog::getInstance(void) +{ + static PfcWatchdog wd; - return attr.value.oid; + return wd; } void PfcWatchdog::collectCounters( @@ -146,17 +178,14 @@ void PfcWatchdog::collectCounters( std::lock_guard lock(g_mutex); - // Collect stats for every registered queue - for (const auto &kv: m_counterIdsMap) + // Collect stats for every registered port + for (const auto &kv: m_portCounterIdsMap) { - const auto &queueVid = kv.first; - const auto &queueId = kv.second->queueId; + const auto &portVid = kv.first; const auto &portId = kv.second->portId; const auto &portCounterIds = kv.second->portCounterIds; - const auto &queueCounterIds = kv.second->queueCounterIds; std::vector portStats(portCounterIds.size()); - std::vector queueStats(queueCounterIds.size()); // Get port stats for queue sai_status_t status = sai_metadata_sai_port_api->get_port_stats( @@ -170,8 +199,32 @@ void PfcWatchdog::collectCounters( continue; } + // Push all counter values to a single vector + std::vector values; + + for (size_t i = 0; i != portCounterIds.size(); i++) + { + const std::string &counterName = sai_serialize_port_stat(portCounterIds[i]); + values.emplace_back(counterName, std::to_string(portStats[i])); + } + + // Write counters to DB + std::string portVidStr = sai_serialize_object_id(portVid); + + countersTable.set(portVidStr, values, ""); + } + + // Collect stats for every registered queue + for (const auto &kv: m_queueCounterIdsMap) + { + const auto &queueVid = kv.first; + const auto &queueId = kv.second->queueId; + const auto &queueCounterIds = kv.second->queueCounterIds; + + std::vector queueStats(queueCounterIds.size()); + // Get queue stats - status = sai_metadata_sai_queue_api->get_queue_stats( + sai_status_t status = sai_metadata_sai_queue_api->get_queue_stats( queueId, static_cast(queueCounterIds.size()), queueCounterIds.data(), @@ -185,12 +238,6 @@ void PfcWatchdog::collectCounters( // Push all counter values to a single vector std::vector values; - for (size_t i = 0; i != portCounterIds.size(); i++) - { - const std::string &counterName = sai_serialize_port_stat(portCounterIds[i]); - values.emplace_back(counterName, std::to_string(portStats[i])); - } - for (size_t i = 0; i != queueCounterIds.size(); i++) { const std::string &counterName = sai_serialize_queue_stat(queueCounterIds[i]); @@ -204,6 +251,45 @@ void PfcWatchdog::collectCounters( } } +void PfcWatchdog::runPlugins( + _In_ swss::DBConnector& db) +{ + SWSS_LOG_ENTER(); + + std::lock_guard lock(g_mutex); + + const std::vector argv = + { + std::to_string(COUNTERS_DB), + COUNTERS_TABLE, + std::to_string(PFC_WD_POLL_MSECS * 1000) + }; + + std::vector portList; + portList.reserve(m_portCounterIdsMap.size()); + for (const auto& kv : m_portCounterIdsMap) + { + portList.push_back(sai_serialize_object_id(kv.first)); + } + + for (const auto& sha : m_portPlugins) + { + runRedisScript(db, sha, portList, argv); + } + + std::vector queueList; + queueList.reserve(m_queueCounterIdsMap.size()); + for (const auto& kv : m_queueCounterIdsMap) + { + queueList.push_back(sai_serialize_object_id(kv.first)); + } + + for (const auto& sha : m_queuePlugins) + { + runRedisScript(db, sha, queueList, argv); + } +} + void PfcWatchdog::pfcWatchdogThread(void) { SWSS_LOG_ENTER(); @@ -214,6 +300,7 @@ void PfcWatchdog::pfcWatchdogThread(void) while (m_runPfcWatchdogThread) { collectCounters(countersTable); + runPlugins(db); std::unique_lock lk(m_mtxSleep); m_cvSleep.wait_for(lk, std::chrono::milliseconds(PFC_WD_POLL_MSECS)); diff --git a/syncd/syncd_pfc_watchdog.h b/syncd/syncd_pfc_watchdog.h index 4d8c73fb8bed..bb6d2cdb40ab 100644 --- a/syncd/syncd_pfc_watchdog.h +++ b/syncd/syncd_pfc_watchdog.h @@ -7,6 +7,7 @@ extern "C" { #include #include +#include #include #include "swss/table.h" @@ -14,47 +15,67 @@ class PfcWatchdog { public: static void setPortCounterList( - _In_ sai_object_id_t queueVid, - _In_ sai_object_id_t queueId, + _In_ sai_object_id_t portVid, + _In_ sai_object_id_t portId, _In_ const std::vector &counterIds); static void setQueueCounterList( _In_ sai_object_id_t queueVid, _In_ sai_object_id_t queueId, _In_ const std::vector &counterIds); + static void removePort( + _In_ sai_object_id_t portVid); static void removeQueue( _In_ sai_object_id_t queueVid); + static void addPortCounterPlugin( + _In_ std::string sha); + static void addQueueCounterPlugin( + _In_ std::string sha); + static void removeCounterPlugin( + _In_ std::string sha); + PfcWatchdog( _In_ const PfcWatchdog&) = delete; ~PfcWatchdog(void); private: - struct PfcCounterIds + struct QueueCounterIds { - PfcCounterIds( + QueueCounterIds( _In_ sai_object_id_t queue, - _In_ sai_object_id_t port, - _In_ const std::vector &portIds, _In_ const std::vector &queueIds); sai_object_id_t queueId; + std::vector queueCounterIds; + }; + + struct PortCounterIds + { + PortCounterIds( + _In_ sai_object_id_t port, + _In_ const std::vector &portIds); + sai_object_id_t portId; std::vector portCounterIds; - std::vector queueCounterIds; }; PfcWatchdog(void); static PfcWatchdog& getInstance(void); - static sai_object_id_t queueIdToPortId( - _In_ sai_object_id_t queueVid); void collectCounters( _In_ swss::Table &countersTable); + void runPlugins( + _In_ swss::DBConnector& db); void pfcWatchdogThread(void); void startWatchdogThread(void); void endWatchdogThread(void); - // Key is a queue Virtual ID - std::map> m_counterIdsMap; + // Key is a Virtual ID + std::map> m_portCounterIdsMap; + std::map> m_queueCounterIdsMap; + + // Plugins + std::set m_queuePlugins; + std::set m_portPlugins; std::atomic_bool m_runPfcWatchdogThread = { false }; std::shared_ptr m_pfcWatchdogThread = nullptr;