diff --git a/Server/CMakeLists.txt b/Server/CMakeLists.txt index 963b6f5..4c6bd3f 100644 --- a/Server/CMakeLists.txt +++ b/Server/CMakeLists.txt @@ -33,7 +33,6 @@ find_library(LIBYAML_CPP_LIBRARY lib64 lib) - find_path(LIBSWSSCOMMON_INCLUDE_DIR sonic-swss-common/common/dbconnector.h HINTS @@ -41,6 +40,16 @@ find_path(LIBSWSSCOMMON_INCLUDE_DIR PATH_SUFFIXES include) + +find_library(LIBHIREDIS_LIBRARY + NAMES + libhiredis.a hiredis + HINTS + ${HINT_ROOT_DIR} + PATH_SUFFIXES + lib64 + lib) + find_library(LIBSWSSCOMMON_LIBRARY NAMES libswsscommon.a libswsscommon @@ -60,13 +69,6 @@ find_library(LIBRT_LIBRARY lib) -if (NOT LIBSWSSCOMMON_INCLUDE_DIR OR NOT LIBSWSSCOMMON_LIBRARY) - Message (FATAL_ERROR "swsscommon was not found, cannot proceed. Visit https://github.com/sonic-net/sonic-swss-common for details on how to install it.") -#else () -# Message ("lib = " ${LIBSWSSCOMMON_LIBRARY}) -endif() - - if (NOT LIBYAML_CPP_INCLUDE_DIR OR NOT LIBYAML_CPP_LIBRARY) Message (FATAL_ERROR "Libyaml-cpp was not found, cannot proceed. Visit https://github.com/jbeder/yaml-cpp for install details.") endif() @@ -75,11 +77,21 @@ if (NOT LIBRT_LIBRARY AND NOT MACOSX) Message (FATAL_ERROR "librt was not found, cannot proceed.") endif() +if (NOT LIBHIREDIS_LIBRARY) + Message (FATAL_ERROR "Libhiredis was not found, cannot proceed. Visit https://github.com/redis/hiredis for details on how to install it.") +#else () +# Message ("lib = " ${LIBHIREDIS_LIBRARY}) +endif() +if (NOT LIBSWSSCOMMON_INCLUDE_DIR OR NOT LIBSWSSCOMMON_LIBRARY) + Message (FATAL_ERROR "swsscommon was not found, cannot proceed. Visit https://github.com/sonic-net/sonic-swss-common for details on how to install it.") +#else () +# Message ("lib = " ${LIBSWSSCOMMON_LIBRARY}) +endif() # Update the include dir -include_directories(${LIBSWSSCOMMON_INCLUDE_DIR} ${LIBYAML_CPP_INCLUDE_DIR} src/ src/bmp src/bgp src/bgp/linkstate ) - +include_directories(${LIBSWSSCOMMON_INCLUDE_DIR} ${LIBYAML_CPP_INCLUDE_DIR} src/ src/bmp src/bgp src/bgp/linkstate) +#link_directories(${LIBRDKAFKA_LIBRARY}) # Define the source files to compile set (SRC_FILES @@ -90,6 +102,7 @@ set (SRC_FILES src/md5.cpp src/Logger.cpp src/Config.cpp + src/RedisManager.cpp src/client_thread.cpp src/bgp/parseBGP.cpp src/bgp/NotificationMsg.cpp @@ -102,6 +115,7 @@ set (SRC_FILES src/bgp/EVPN.cpp src/bgp/linkstate/MPLinkState.cpp src/bgp/linkstate/MPLinkStateAttr.cpp + src/redis/MsgBusImpl_redis.cpp ) # Disable warnings @@ -124,7 +138,7 @@ if ("${CMAKE_CXX_COMPILER_ID}" STREQUAL "Clang" OR CMAKE_COMPILER_IS_GNUCXX) endif() # Set the libs to link -set (LIBS pthread ${LIBYAML_CPP_LIBRARY} ${LIBSWSSCOMMON_LIBRARY} z ${SSL_LIBS} dl) +set (LIBS pthread ${LIBYAML_CPP_LIBRARY} ${LIBHIREDIS_LIBRARY} ${LIBSWSSCOMMON_LIBRARY} z ${SSL_LIBS} dl) # Set the binary add_executable (openbmpd ${SRC_FILES}) @@ -138,4 +152,4 @@ endif() # Install the binary and configs install(TARGETS openbmpd DESTINATION bin COMPONENT binaries) -install(FILES openbmpd.conf DESTINATION etc/openbmp/ COMPONENT config) \ No newline at end of file +install(FILES openbmpd.conf DESTINATION etc/openbmp/ COMPONENT config) diff --git a/Server/src/Config.cpp b/Server/src/Config.cpp index 8f4a785..27819fa 100644 --- a/Server/src/Config.cpp +++ b/Server/src/Config.cpp @@ -57,6 +57,25 @@ Config::Config() { pat_enabled = false; bzero(admin_id, sizeof(admin_id)); + /* + * Initialized the kafka topic names + * The keys match the configuration node/vars. Topic name nodes will be ignored if + * not initialized here. + */ + /* + topic_names_map[MSGBUS_TOPIC_VAR_COLLECTOR] = MSGBUS_TOPIC_COLLECTOR; + topic_names_map[MSGBUS_TOPIC_VAR_ROUTER] = MSGBUS_TOPIC_ROUTER; + topic_names_map[MSGBUS_TOPIC_VAR_PEER] = MSGBUS_TOPIC_PEER; + topic_names_map[MSGBUS_TOPIC_VAR_BMP_STAT] = MSGBUS_TOPIC_BMP_STAT; + topic_names_map[MSGBUS_TOPIC_VAR_BMP_RAW] = MSGBUS_TOPIC_BMP_RAW; + topic_names_map[MSGBUS_TOPIC_VAR_BASE_ATTRIBUTE] = MSGBUS_TOPIC_BASE_ATTRIBUTE; + topic_names_map[MSGBUS_TOPIC_VAR_UNICAST_PREFIX] = MSGBUS_TOPIC_UNICAST_PREFIX; + topic_names_map[MSGBUS_TOPIC_VAR_LS_NODE] = MSGBUS_TOPIC_LS_NODE; + topic_names_map[MSGBUS_TOPIC_VAR_LS_LINK] = MSGBUS_TOPIC_LS_LINK; + topic_names_map[MSGBUS_TOPIC_VAR_LS_PREFIX] = MSGBUS_TOPIC_LS_PREFIX; + topic_names_map[MSGBUS_TOPIC_VAR_L3VPN] = MSGBUS_TOPIC_L3VPN; + topic_names_map[MSGBUS_TOPIC_VAR_EVPN] = MSGBUS_TOPIC_EVPN; + */ } /*********************************************************************//** diff --git a/Server/src/RedisManager.cpp b/Server/src/RedisManager.cpp new file mode 100644 index 0000000..b899e00 --- /dev/null +++ b/Server/src/RedisManager.cpp @@ -0,0 +1,236 @@ +/* + * Copyright (c) 2024 Microsoft, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + * + */ + +#include "RedisManager.h" + + +/*********************************************************************//** + * Constructor for class + ***********************************************************************/ +RedisManager::RedisManager() : stateDb_(BMP_DB_NAME, 0, true) { + swss::SonicDBConfig::initialize(); + swss::SonicDBConfig::initializeGlobalConfig(); + separator_ = swss::SonicDBConfig::getSeparator(BMP_DB_NAME); + exit_ = false; +} + +/*********************************************************************//** + * Constructor for class + ***********************************************************************/ +RedisManager::~RedisManager() { + if (!exit_) { + exit_ = true; + for (auto& threadPtr : threadList_) { + threadPtr->join(); + } + } +} + + +/********************************************************************* + * Setup logger for this class + * + * \param [in] logPtr logger pointer + ***********************************************************************/ +void RedisManager::Setup(Logger *logPtr, BMPListener::ClientInfo *client) { + logger = logPtr; + client_ = client; +} + + + +/** + * Get Key separator for deletion + * + * \param [in] N/A + */ +std::string RedisManager::GetKeySeparator() { + return separator_; +} + + +/** + * WriteBMPTable + * + * \param [in] table Reference to table name + * \param [in] key Reference to various keys list + * \param [in] fieldValues Reference to field-value pairs + */ +bool RedisManager::WriteBMPTable(const std::string& table, const std::vector& keys, const std::vector fieldValues) { + + if (enabledTables_.find(table) == enabledTables_.end()) { + LOG_INFO("RedisManager %s is disabled", table.c_str()); + return false; + } + + swss::Table stateBMPTable(&stateDb_, table); + std::string fullKey; + for (const auto& key : keys) { + fullKey += key; + fullKey += separator_; + } + fullKey.erase(fullKey.size() - 1); + + LOG_INFO("RedisManager WriteBMPTable key = %s", fullKey.c_str()); + + stateBMPTable.set(fullKey, fieldValues); + return true; +} + + +/** + * RemoveBMPTable + * + * \param [in] keys Reference to various keys + */ +bool RedisManager::RemoveBMPTable(const std::vector& keys) { + + stateDb_.del(keys); + return true; +} + +/** + * DisconnectBMP + * + * \param [in] N/A + */ +void RedisManager::DisconnectBMP() { + LOG_INFO("RedisManager DisconnectBMP"); + close(client_->c_sock); + client_->c_sock = 0; +} + +/** + * ExitRedisManager + * + * \param [in] N/A + */ +void RedisManager::ExitRedisManager() { + exit_ = true; + for (auto& threadPtr : threadList_) { + threadPtr->join(); + } +} + +/** + * ReadBMPTable, there will be dedicated thread be launched inside and monitor corresponding redis table. + * + * \param [in] tables table names to be subscribed. + */ +void RedisManager::SubscriberWorker(const std::string& table) { + try { + swss::DBConnector cfgDb("CONFIG_DB", 0, true); + + swss::SubscriberStateTable conf_table(&cfgDb, table); + swss::Select s; + s.addSelectable(&conf_table); + + while (!exit_) { + swss::Selectable *sel; + int ret; + + ret = s.select(&sel, BMP_CFG_TABLE_SELECT_TIMEOUT); + if (ret == swss::Select::ERROR) { + SWSS_LOG_NOTICE("Error: %s!", strerror(errno)); + continue; + } + if (ret == swss::Select::TIMEOUT) { + continue; + } + + swss::KeyOpFieldsValuesTuple kco; + conf_table.pop(kco); + + if (std::get<0>(kco) == "SET") { + if (std::get<1>(kco) == "true") { + EnableTable(table); + } + else { + DisableTable(table); + DisconnectBMP(); + } + } + else if (std::get<0>(kco) == "DEL") + { + LOG_ERR("Config should not be deleted"); + } + } + } + catch (const exception &e) { + LOG_ERR("Runtime error: %s", e.what()); + } +} + + + +/** + * ReadBMPTable, there will be dedicated thread be launched inside and monitor corresponding redis table. + * + * \param [in] tables table names to be subscribed. + */ +bool RedisManager::ReadBMPTable(const std::vector& tables) { + for (const auto& table : tables) { + std::shared_ptr threadPtr = std::make_shared( + std::bind(&RedisManager::SubscriberWorker, this, table)); + threadList_.push_back(threadPtr); + } + return true; +} + + +/** + * Enable specific Table + * + * \param [in] table Reference to table name, like BGP_NEIGHBOR_TABLE/BGP_RIB_OUT_TABLE/BGP_RIB_IN_TABLE + */ +bool RedisManager::EnableTable(const std::string & table) { + enabledTables_.insert(table); + return true; +} + +/** + * Enable BGP_Neighbor* Table + * + * \param [in] table Reference to table name BGP_NEIGHBOR_TABLE/BGP_RIB_OUT_TABLE/BGP_RIB_IN_TABLE + */ +bool RedisManager::DisableTable(const std::string & table) { + enabledTables_.erase(table); + return ResetBMPTable(table); +} + + +/** + * Reset ResetBMPTable, this will flush redis + * + * \param [in] table Reference to table name BGP_NEIGHBOR_TABLE/BGP_RIB_OUT_TABLE/BGP_RIB_IN_TABLE + */ +bool RedisManager::ResetBMPTable(const std::string & table) { + + LOG_INFO("RedisManager ResetBMPTable %s", table.c_str()); + swss::Table stateBMPTable(&stateDb_, table); + std::vector keys; + stateBMPTable.getKeys(keys); + stateDb_.del(keys); + + return true; +} + + + +/** + * Reset all Tables once FRR reconnects to BMP, this will not disable table population + * + * \param [in] N/A + */ +void RedisManager::ResetAllTables() { + LOG_INFO("RedisManager ResetAllTables"); + for (const auto& enabledTable : enabledTables_) { + ResetBMPTable(enabledTable); + } +} \ No newline at end of file diff --git a/Server/src/RedisManager.h b/Server/src/RedisManager.h new file mode 100644 index 0000000..245f9c8 --- /dev/null +++ b/Server/src/RedisManager.h @@ -0,0 +1,170 @@ +/* + * Copyright (c) 2024 Microsoft, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + * + */ + +#ifndef REDISMANAGER_H_ +#define REDISMANAGER_H_ + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include "Logger.h" +#include "BMPListener.h" + + +/** + * BMP_TABLE_* defines the default table name prefix + */ +#define BMP_DB_NAME "STATE_DB" +#define BMP_TABLE_NEI "BGP_NEIGHBOR_TABLE" +#define BMP_TABLE_RIB_IN "BGP_RIB_IN_TABLE" +#define BMP_TABLE_RIB_OUT "BGP_RIB_OUT_TABLE" +#define BMP_TABLE_NEI_PREFIX "BGP_NEIGHBOR" +#define BMP_TABLE_NEI_KEYS "KEYS BGP_NEIGHBOR*" +#define BMP_TABLE_RIB_IN_KEYS "KEYS BGP_RIB_IN_TABLE*" +#define BMP_TABLE_RIB_OUT_KEYS "KEYS BGP_RIB_OUT_TABLE*" + +/** + * BMP_CFG_TABLE_* defines config db tables. + */ +#define BMP_CFG_DB_NAME "CONFIG_DB" +#define BMP_CFG_TABLE_NEI "bgp_neighbor_table" +#define BMP_CFG_TABLE_RIB_IN "bgp-rib-in-table" +#define BMP_CFG_TABLE_RIB_OUT "bgp-rib-out-table" +#define BMP_CFG_TABLE_SELECT_TIMEOUT 1000 + +/** + * \class RedisManager + * + * \brief RedisManager class for openbmpd + * \details + * Encapsulate redis operation in this class instance. + */ +class RedisManager { + +public: + /*********************************************************************** + * Constructor for class + ***********************************************************************/ + RedisManager(); + + /*********************************************************************//** + * Destructor for class + ***********************************************************************/ + ~RedisManager(); + + /*********************************************************************** + * Setup logger for this class + * + * \param [in] logPtr logger pointer + * \param [in] client client pointer + ***********************************************************************/ + void Setup(Logger *logPtr, BMPListener::ClientInfo *client); + + /** + * DisconnectBMP + * + * \param [in] N/A + */ + void DisconnectBMP(); + + /** + * ExitRedisManager + * + * \param [in] N/A + */ + void ExitRedisManager(); + + /** + * Reset all Tables once FRR reconnects to BMP, this will not disable table population + * + * \param [in] N/A + */ + void ResetAllTables(); + + /** + * Reset ResetBMPTable, this will flush redis + * + * \param [in] table Reference to table name BGP_NEIGHBOR_TABLE/BGP_RIB_OUT_TABLE/BGP_RIB_IN_TABLE + */ + bool ResetBMPTable(const std::string & table); + + /** + * WriteBMPTable + * + * \param [in] table Reference to table name + * \param [in] key Reference to various keys list + * \param [in] fieldValues Reference to field-value pairs + */ + bool WriteBMPTable(const std::string& table, const std::vector& keys, const std::vector fieldValues); + + /** + * SubscriberWorker, thread main for redis table subcriber. + * + * \param [in] table table name to be subscribed. + */ + void SubscriberWorker(const std::string& table); + + /** + * ReadBMPTable, there will be dedicated thread be launched inside and monitor corresponding redis table. + * + * \param [in] tables table names to be subscribed. + */ + bool ReadBMPTable(const std::vector& tables); + + /** + * RemoveBMPTable + * + * \param [in] table Reference to table name + * \param [in] args Reference to various keys + */ + bool RemoveBMPTable(const std::vector& keys); + + /** + * Enable specific Table + * + * \param [in] table Reference to table name + */ + bool EnableTable(const std::string & table); + + /** + * Enable BGP_Neighbor* Table + * + * \param [in] table Reference to table name + */ + bool DisableTable(const std::string & table); + + + /** + * Get Key separator for deletion + * + * \param [in] N/A + */ + std::string GetKeySeparator(); + +private: + swss::DBConnector stateDb_; + std::string separator_; + Logger *logger; + std::unordered_set enabledTables_; + std::vector> threadList_; + bool exit_; + BMPListener::ClientInfo *client_; +}; + + +#endif /* RedisManager_H_ */ diff --git a/Server/src/bgp/OpenMsg.cpp b/Server/src/bgp/OpenMsg.cpp index 7fbbcfa..e6069ac 100644 --- a/Server/src/bgp/OpenMsg.cpp +++ b/Server/src/bgp/OpenMsg.cpp @@ -9,6 +9,7 @@ #include "OpenMsg.h" #include "AddPathDataContainer.h" #include "BMPReader.h" +#include "RedisManager.h" #include #include diff --git a/Server/src/bgp/OpenMsg.h b/Server/src/bgp/OpenMsg.h index bc9e59f..f073a64 100644 --- a/Server/src/bgp/OpenMsg.h +++ b/Server/src/bgp/OpenMsg.h @@ -28,7 +28,7 @@ namespace bgp_msg { */ class OpenMsg { public: - /** + /** * Defines the BGP capabilities * http://www.iana.org/assignments/capability-codes/capability-codes.xhtml */ @@ -47,6 +47,8 @@ class OpenMsg { BGP_CAP_MULTI_SESSION, BGP_CAP_ADD_PATH, BGP_CAP_ROUTE_REFRESH_ENHANCED, + BGP_CAP_LONG_LIVE_GRACEFUL_RESTART=71, + BGP_CAP_FQDN=73, BGP_CAP_ROUTE_REFRESH_OLD=128 }; diff --git a/Server/src/bgp/parseBGP.cpp b/Server/src/bgp/parseBGP.cpp index 0c4c459..2b9d8d0 100644 --- a/Server/src/bgp/parseBGP.cpp +++ b/Server/src/bgp/parseBGP.cpp @@ -347,28 +347,11 @@ u_char parseBGP::parseBgpHeader(u_char *data, size_t size) { * \param parsed_data Reference to the parsed update data */ void parseBGP::UpdateDB(bgp_msg::UpdateMsg::parsed_update_data &parsed_data) { - /* - * Update the path attributes - */ - UpdateDBAttrs(parsed_data.attrs); - - /* - * Update the bgp-ls data - */ - UpdateDbBgpLs(false, parsed_data.ls, parsed_data.ls_attrs); - UpdateDbBgpLs(true, parsed_data.ls_withdrawn, parsed_data.ls_attrs); - /* * Update the advertised prefixes (both ipv4 and ipv6) */ UpdateDBAdvPrefixes(parsed_data.advertised, parsed_data.attrs); - UpdateDBL3Vpn(false,parsed_data.vpn, parsed_data.attrs); - UpdateDBL3Vpn(true,parsed_data.vpn_withdrawn, parsed_data.attrs); - - UpdateDBeVPN(false, parsed_data.evpn, parsed_data.attrs); - UpdateDBeVPN(true, parsed_data.evpn_withdrawn, parsed_data.attrs); - /* * Update withdraws (both ipv4 and ipv6) */ diff --git a/Server/src/bmp/BMPReader.cpp b/Server/src/bmp/BMPReader.cpp index b8eac44..4b86100 100644 --- a/Server/src/bmp/BMPReader.cpp +++ b/Server/src/bmp/BMPReader.cpp @@ -69,11 +69,11 @@ BMPReader::~BMPReader() { * * \throw (char const *str) message indicate error */ -void BMPReader::readerThreadLoop(bool &run, BMPListener::ClientInfo *client) { +void BMPReader::readerThreadLoop(bool &run, BMPListener::ClientInfo *client, MsgBusInterface *mbus_ptr) { while (run) { try { - if (not ReadIncomingMsg(client)) + if (not ReadIncomingMsg(client, mbus_ptr)) break; } catch (char const *str) { @@ -95,7 +95,7 @@ void BMPReader::readerThreadLoop(bool &run, BMPListener::ClientInfo *client) { * * \throw (char const *str) message indicate error */ -bool BMPReader::ReadIncomingMsg(BMPListener::ClientInfo *client) { +bool BMPReader::ReadIncomingMsg(BMPListener::ClientInfo *client, MsgBusInterface *mbus_ptr) { bool rval = true; string peer_info_key; @@ -127,6 +127,14 @@ bool BMPReader::ReadIncomingMsg(BMPListener::ClientInfo *client) { try { bmp_type = pBMP->handleMessage(read_fd); + /* + * Now that we have parsed the BMP message... + * add record to the database + */ + + if (bmp_type != parseBMP::TYPE_INIT_MSG) + mbus_ptr->update_Router(r_object, mbus_ptr->ROUTER_ACTION_FIRST); // add the router entry + // only process the peering info if the message includes it if (bmp_type < 4) { // Update p_entry hash_id now that add_Router updated it. @@ -134,7 +142,8 @@ bool BMPReader::ReadIncomingMsg(BMPListener::ClientInfo *client) { peer_info_key = p_entry.peer_addr; peer_info_key += p_entry.peer_rd; - + if (bmp_type != parseBMP::TYPE_PEER_UP) + mbus_ptr->update_Peer(p_entry, NULL, NULL, mbus_ptr->PEER_ACTION_FIRST); // add the peer entry if (not peer_info_map[peer_info_key].using_2_octet_asn and p_entry.isTwoOctet) { peer_info_map[peer_info_key].using_2_octet_asn = true; @@ -155,7 +164,7 @@ bool BMPReader::ReadIncomingMsg(BMPListener::ClientInfo *client) { // Prepare the BGP parser - pBGP = new parseBGP(logger, NULL, &p_entry, (char *)r_object.ip_addr, + pBGP = new parseBGP(logger, mbus_ptr, &p_entry, (char *)r_object.ip_addr, &peer_info_map[peer_info_key]); if (cfg->debug_bgp) @@ -194,6 +203,8 @@ bool BMPReader::ReadIncomingMsg(BMPListener::ClientInfo *client) { delete pBGP; // Free the bgp parser after each use. + // Add event to the database + mbus_ptr->update_Peer(p_entry, NULL, &down_event, mbus_ptr->PEER_ACTION_DOWN); } else { LOG_ERR("Error with client socket %d", read_fd); @@ -214,7 +225,7 @@ bool BMPReader::ReadIncomingMsg(BMPListener::ClientInfo *client) { pBMP->bufferBMPMessage(read_fd); // Prepare the BGP parser - pBGP = new parseBGP(logger, NULL, &p_entry, (char *)r_object.ip_addr, + pBGP = new parseBGP(logger, mbus_ptr, &p_entry, (char *)r_object.ip_addr, &peer_info_map[peer_info_key]); if (cfg->debug_bgp) @@ -232,6 +243,8 @@ bool BMPReader::ReadIncomingMsg(BMPListener::ClientInfo *client) { pBMP->parsePeerUpInfo(pBMP->bmp_data + read, (int)pBMP->bmp_data_len - read); } + // Add the up event to the DB + mbus_ptr->update_Peer(p_entry, &up_event, NULL, mbus_ptr->PEER_ACTION_UP); } else { LOG_NOTICE("%s: PEER UP Received but failed to parse the BMP header.", client->c_ip); @@ -246,7 +259,7 @@ bool BMPReader::ReadIncomingMsg(BMPListener::ClientInfo *client) { * Read and parse the the BGP message from the client. * parseBGP will update mysql directly */ - pBGP = new parseBGP(logger, NULL, &p_entry, (char *)r_object.ip_addr, + pBGP = new parseBGP(logger, mbus_ptr, &p_entry, (char *)r_object.ip_addr, &peer_info_map[peer_info_key]); if (cfg->debug_bgp) @@ -262,7 +275,7 @@ bool BMPReader::ReadIncomingMsg(BMPListener::ClientInfo *client) { while (it != peer_info_map.end() && it->second.endOfRIB) ++it; - if (it == peer_info_map.end() || checkRIBdumpRate(p_entry.timestamp_secs,1)) { //End-Of-RIBs are received for all peers. + if (it == peer_info_map.end() || checkRIBdumpRate(p_entry.timestamp_secs,mbus_ptr->ribSeq)) { //End-Of-RIBs are received for all peers. timeval now; gettimeofday(&now, NULL); cfg->router_baseline_time[str] = 1.2 * (now.tv_sec - client->startTime.tv_sec); //20% buffer for baseline time @@ -274,6 +287,11 @@ bool BMPReader::ReadIncomingMsg(BMPListener::ClientInfo *client) { } case parseBMP::TYPE_STATS_REPORT : { // Stats Report + MsgBusInterface::obj_stats_report stats = {}; + if (! pBMP->handleStatsReport(read_fd, stats)) + // Add to mysql + mbus_ptr->add_StatReport(p_entry, stats); + break; } @@ -286,6 +304,7 @@ bool BMPReader::ReadIncomingMsg(BMPListener::ClientInfo *client) { hashRouter(client, r_object); LOG_INFO("Router ID hashed with hash_type: %d", r_object.hash_type); // Update the router entry with the details + mbus_ptr->update_Router(r_object, mbus_ptr->ROUTER_ACTION_INIT); break; } @@ -297,6 +316,7 @@ bool BMPReader::ReadIncomingMsg(BMPListener::ClientInfo *client) { pBMP->handleTermMsg(read_fd, r_object); LOG_INFO("Proceeding to disconnect router"); + mbus_ptr->update_Router(r_object, mbus_ptr->ROUTER_ACTION_TERM); close(client->c_sock); rval = false; // Indicate connection is closed @@ -307,12 +327,15 @@ bool BMPReader::ReadIncomingMsg(BMPListener::ClientInfo *client) { } catch (char const *str) { // Mark the router as disconnected and update the error to be a local disconnect (no term message received) LOG_INFO("%s: Caught: %s", client->c_ip, str); - disconnect(client, NULL, parseBMP::TERM_REASON_OPENBMP_CONN_ERR, str); + disconnect(client, mbus_ptr, parseBMP::TERM_REASON_OPENBMP_CONN_ERR, str); delete pBMP; // Make sure to free the resource throw str; } + // Send BMP RAW packet data + mbus_ptr->send_bmp_raw(router_hash_id, p_entry, pBMP->bmp_packet, pBMP->bmp_packet_len); + // Free the bmp parser delete pBMP; @@ -369,6 +392,7 @@ void BMPReader::disconnect(BMPListener::ClientInfo *client, MsgBusInterface *mbu if (reason_text != NULL) snprintf(r_object.term_reason_text, sizeof(r_object.term_reason_text), "%s", reason_text); + mbus_ptr->update_Router(r_object, mbus_ptr->ROUTER_ACTION_TERM); close(client->c_sock); client->c_sock = 0; diff --git a/Server/src/bmp/BMPReader.h b/Server/src/bmp/BMPReader.h index 82abdf3..a0dabc4 100644 --- a/Server/src/bmp/BMPReader.h +++ b/Server/src/bmp/BMPReader.h @@ -64,7 +64,7 @@ class BMPReader { * \param [in] mbus_ptr The database pointer referencer - DB should be already initialized * \return true if more to read, false if the connection is done/closed */ - bool ReadIncomingMsg(BMPListener::ClientInfo *client); + bool ReadIncomingMsg(BMPListener::ClientInfo *client, MsgBusInterface *mbus_ptr); /** * Checks if End-of-RIB is reached for all peers by checking the rate of RIB dumps @@ -86,7 +86,7 @@ class BMPReader { * * \throw (char const *str) message indicate error */ - void readerThreadLoop(bool &run, BMPListener::ClientInfo *client); + void readerThreadLoop(bool &run, BMPListener::ClientInfo *client, MsgBusInterface *mbus_ptr); /** * disconnect/close bmp stream diff --git a/Server/src/bmp/parseBMP.cpp b/Server/src/bmp/parseBMP.cpp index 9051028..39d0416 100644 --- a/Server/src/bmp/parseBMP.cpp +++ b/Server/src/bmp/parseBMP.cpp @@ -19,6 +19,7 @@ #include #include #include "bgp_common.h" +#include "RedisManager.h" /** * Constructor for class @@ -332,6 +333,9 @@ void parseBMP::parseBMPv3(int sock) { break; case TYPE_INIT_MSG: + { + break; + } case TYPE_TERM_MSG: // Allowed break; diff --git a/Server/src/client_thread.cpp b/Server/src/client_thread.cpp index 3b6a183..abe3cd7 100644 --- a/Server/src/client_thread.cpp +++ b/Server/src/client_thread.cpp @@ -13,6 +13,7 @@ #include #include #include +#include #include "client_thread.h" #include "BMPReader.h" @@ -73,7 +74,6 @@ void *ClientThread(void *arg) { // Setup the client thread info struct ClientThreadInfo cInfo; - cInfo.client = &thr->client; cInfo.log = thr->log; cInfo.closing = false; @@ -89,6 +89,9 @@ void *ClientThread(void *arg) { pthread_cleanup_push(ClientThread_cancel, &cInfo); try { + // connect to redis + cInfo.redis = std::make_shared(logger, thr->cfg, cInfo.client); + BMPReader rBMP(logger, thr->cfg); LOG_INFO("Thread started to monitor BMP from router %s using socket %d buffer in bytes = %u", cInfo.client->c_ip, cInfo.client->c_sock, thr->cfg->bmp_buffer_size); @@ -103,8 +106,8 @@ void *ClientThread(void *arg) { */ bool bmp_run = true; //cInfo.bmp_reader_thread = new std::thread([&] {rBMP.readerThreadLoop(bmp_run,cInfo.client, - cInfo.bmp_reader_thread = new std::thread(&BMPReader::readerThreadLoop, &rBMP, std::ref(bmp_run), cInfo.client - ); + cInfo.bmp_reader_thread = new std::thread(&BMPReader::readerThreadLoop, &rBMP, std::ref(bmp_run), cInfo.client, + (MsgBusInterface *)cInfo.redis.get()); // Variables to handle circular buffer sock_buf = new unsigned char[thr->cfg->bmp_buffer_size]; @@ -264,7 +267,6 @@ void *ClientThread(void *arg) { cInfo.bmp_reader_thread = NULL; } - } // Exit the thread diff --git a/Server/src/client_thread.h b/Server/src/client_thread.h index d0f105f..f7f8a73 100644 --- a/Server/src/client_thread.h +++ b/Server/src/client_thread.h @@ -13,7 +13,9 @@ #include "BMPListener.h" #include "Logger.h" #include "Config.h" +#include "redis/MsgBusImpl_redis.h" #include +#include #define CLIENT_WRITE_BUFFER_BLOCK_SIZE 8192 // Number of bytes to write to BMP reader from buffer @@ -27,6 +29,7 @@ struct ThreadMgmt { }; struct ClientThreadInfo { + std::shared_ptr redis; BMPListener::ClientInfo *client; Logger *log; diff --git a/Server/src/openbmp.cpp b/Server/src/openbmp.cpp index 6fb225e..d12f8ab 100644 --- a/Server/src/openbmp.cpp +++ b/Server/src/openbmp.cpp @@ -16,13 +16,10 @@ */ #include "BMPListener.h" -//#include "MsgBusImpl_kafka.h" #include "MsgBusInterface.hpp" #include "client_thread.h" #include "openbmpd_version.h" #include "Config.h" -#include -#include #include #include @@ -363,11 +360,10 @@ bool ReadCmdArgs(int argc, char **argv, Config &cfg) { /** * Collector Update Message * - * \param [in] kafka Pointer to kafka instance * \param [in] cfg Reference to configuration * \param [in] code reason code for the update */ -void collector_update_msg( Config &cfg, +void collector_update_msg(Config &cfg, MsgBusInterface::collector_action_code code) { MsgBusInterface::obj_collector oc; @@ -391,8 +387,6 @@ void collector_update_msg( Config &cfg, gettimeofday(&tv, NULL); oc.timestamp_secs = tv.tv_sec; oc.timestamp_us = tv.tv_usec; - - // kafka->update_Collector(oc, code); } /** @@ -401,7 +395,6 @@ void collector_update_msg( Config &cfg, * \param [in] cfg Reference to the config options */ void runServer(Config &cfg) { - // msgBus_kafka *kafka; int active_connections = 0; // Number of active connections/threads int concurrent_routers = 0; // Number of concurrent routers time_t last_heartbeat_time = 0; @@ -419,13 +412,10 @@ void runServer(Config &cfg) { memcpy(cfg.c_hash_id, hash_raw, 16); delete[] hash_raw; - // Kafka connection - // kafka = new msgBus_kafka(logger, &cfg, cfg.c_hash_id); - // allocate and start a new bmp server BMPListener *bmp_svr = new BMPListener(logger, &cfg); - collector_update_msg( cfg, MsgBusInterface::COLLECTOR_ACTION_STARTED); + collector_update_msg(cfg, MsgBusInterface::COLLECTOR_ACTION_STARTED); last_heartbeat_time = time(NULL); LOG_INFO("Ready. Waiting for connections"); @@ -451,7 +441,7 @@ void runServer(Config &cfg) { delete thr_list.at(i); thr_list.erase(thr_list.begin() + i); - collector_update_msg( cfg, + collector_update_msg(cfg, MsgBusInterface::COLLECTOR_ACTION_CHANGE); } @@ -520,7 +510,7 @@ void runServer(Config &cfg) { // Free attribute pthread_attr_destroy(&thr_attr); - collector_update_msg( cfg, + collector_update_msg(cfg, MsgBusInterface::COLLECTOR_ACTION_CHANGE); last_heartbeat_time = time(NULL); @@ -530,7 +520,7 @@ void runServer(Config &cfg) { // Send heartbeat if needed if ( (time(NULL) - last_heartbeat_time) >= cfg.heartbeat_interval) { - collector_update_msg( cfg, MsgBusInterface::COLLECTOR_ACTION_HEARTBEAT); + collector_update_msg(cfg, MsgBusInterface::COLLECTOR_ACTION_HEARTBEAT); last_heartbeat_time = time(NULL); } @@ -544,8 +534,7 @@ void runServer(Config &cfg) { } } - collector_update_msg( cfg, MsgBusInterface::COLLECTOR_ACTION_STOPPED); - + collector_update_msg(cfg, MsgBusInterface::COLLECTOR_ACTION_STOPPED); } catch (char const *str) { LOG_WARN(str); @@ -557,7 +546,7 @@ void runServer(Config &cfg) { */ int main(int argc, char **argv) { Config cfg; - swss::DBConnector stateDb("STATE_DB", 0, true); + // Process the command line args if (ReadCmdArgs(argc, argv, cfg)) { return 1; diff --git a/Server/src/redis/MsgBusImpl_redis.cpp b/Server/src/redis/MsgBusImpl_redis.cpp new file mode 100644 index 0000000..1636885 --- /dev/null +++ b/Server/src/redis/MsgBusImpl_redis.cpp @@ -0,0 +1,243 @@ +#include +#include +#include + +#include + +#include +#include + +#include +#include + +#include "MsgBusImpl_redis.h" +#include "RedisManager.h" + + + + + +using namespace std; + +/******************************************************************//** + * \brief This function will initialize and connect to Kafka. + * + * \details It is expected that this class will start off with a new connection. + * + * \param [in] logPtr Pointer to Logger instance + * \param [in] cfg Pointer to the config instance + ********************************************************************/ +MsgBusImpl_redis::MsgBusImpl_redis(Logger *logPtr, Config *cfg, BMPListener::ClientInfo *client) { + logger = logPtr; + this->cfg = cfg; + redisMgr_.Setup(logPtr, client); + std::vector tables; + tables.emplace_back(BMP_CFG_TABLE_NEI); + tables.emplace_back(BMP_CFG_TABLE_RIB_IN); + tables.emplace_back(BMP_CFG_TABLE_RIB_OUT); + + redisMgr_.ReadBMPTable(tables); +} + +/** + * Destructor + */ +MsgBusImpl_redis::~MsgBusImpl_redis() { + redisMgr_.ExitRedisManager(); +} + + +/** + * Abstract method Implementation - See MsgBusInterface.hpp for details + */ +void MsgBusImpl_redis::update_Peer(obj_bgp_peer &peer, obj_peer_up_event *up, obj_peer_down_event *down, peer_action_code code) { + + // Below attributes will be populated if exists, and no matter bgp neighbor is up or down + std::vector fieldValues; + fieldValues.reserve(30); + std::vector keys; + keys.emplace_back(peer.peer_addr); + + fieldValues.emplace_back(std::make_pair("peer_addr", peer.peer_addr)); + fieldValues.emplace_back(std::make_pair("peer_asn", std::to_string(peer.peer_as))); + fieldValues.emplace_back(std::make_pair("peer_rd", peer.peer_rd)); + fieldValues.emplace_back(std::make_pair("remote_port", std::to_string(up->remote_port))); + fieldValues.emplace_back(std::make_pair("local_asn", std::to_string(up->local_asn))); + fieldValues.emplace_back(std::make_pair("local_ip", up->local_ip)); + fieldValues.emplace_back(std::make_pair("local_port", std::to_string(up->local_port))); + fieldValues.emplace_back(std::make_pair("sent_cap", up->sent_cap)); + fieldValues.emplace_back(std::make_pair("recv_cap", up->recv_cap)); + + switch (code) { + case PEER_ACTION_UP : + { + } + break; + + case PEER_ACTION_DOWN: + { + // PEER DOWN only + fieldValues.emplace_back(std::make_pair("bgp_err_code", std::to_string(down->bgp_err_code))); + fieldValues.emplace_back(std::make_pair("bgp_err_subcode", std::to_string(down->bgp_err_subcode))); + fieldValues.emplace_back(std::make_pair("error_text", down->error_text)); + + } + break; + } + redisMgr_.WriteBMPTable(BMP_TABLE_NEI, keys, fieldValues); +} + + +/** + * Abstract method Implementation - See MsgBusInterface.hpp for details + */ +void MsgBusImpl_redis::update_unicastPrefix(obj_bgp_peer &peer, std::vector &rib, + obj_path_attr *attr, unicast_prefix_action_code code) { + if (attr == NULL) + return; + + // Loop through the vector array of rib entries + std::vector addFieldValues; + addFieldValues.reserve(30); + std::vector del_keys; + string neigh = peer.peer_addr; + + for (size_t i = 0; i < rib.size(); i++) { + std::vector keys; + std::string redisMgr_pfx = rib[i].prefix; + redisMgr_pfx += "/"; + redisMgr_pfx += std::to_string(rib[i].prefix_len); + keys.reserve(10); + keys.emplace_back(redisMgr_pfx); + + switch (code) { + + case UNICAST_PREFIX_ACTION_ADD: + { + addFieldValues.emplace_back(std::make_pair("origin", attr->origin)); + addFieldValues.emplace_back(std::make_pair("as_path", attr->as_path)); + addFieldValues.emplace_back(std::make_pair("as_path_count", std::to_string(attr->as_path_count))); + addFieldValues.emplace_back(std::make_pair("origin_as", std::to_string(attr->origin_as))); + addFieldValues.emplace_back(std::make_pair("next_hop", attr->next_hop)); + addFieldValues.emplace_back(std::make_pair("local_pref", std::to_string(attr->local_pref))); + addFieldValues.emplace_back(std::make_pair("community_list", attr->community_list)); + addFieldValues.emplace_back(std::make_pair("ext_community_list", attr->ext_community_list)); + addFieldValues.emplace_back(std::make_pair("large_community_list", attr->large_community_list)); + addFieldValues.emplace_back(std::make_pair("originator_id", attr->originator_id)); + + keys.emplace_back(BMP_TABLE_NEI_PREFIX); + keys.emplace_back(peer.peer_addr); + + if(peer.isAdjIn) + { + redisMgr_.WriteBMPTable(BMP_TABLE_RIB_IN, keys, addFieldValues); + } + else + { + redisMgr_.WriteBMPTable(BMP_TABLE_RIB_OUT, keys, addFieldValues); + } + } + break; + + case UNICAST_PREFIX_ACTION_DEL: + { + std::string com_key; + if(peer.isAdjIn) + { + com_key = BMP_TABLE_RIB_IN; + } + else + { + com_key = BMP_TABLE_RIB_OUT; + } + com_key += redisMgr_.GetKeySeparator(); + com_key += redisMgr_pfx; + com_key += redisMgr_.GetKeySeparator(); + com_key += BMP_TABLE_NEI_PREFIX; + com_key += neigh; + del_keys.push_back(com_key); + } + break; + } + } + + if (!del_keys.empty()) { + redisMgr_.RemoveBMPTable(del_keys); + } +} + + +/** + * Abstract method Implementation - See MsgBusInterface.hpp for details + */ +void MsgBusImpl_redis::update_Collector(obj_collector &c_object, collector_action_code action_code) { +} + +/** + * Abstract method Implementation - See MsgBusInterface.hpp for details + */ +void MsgBusImpl_redis::update_Router(obj_router &r_object, router_action_code code) { + if (code == ROUTER_ACTION_INIT) { + redisMgr_.ResetAllTables(); + } +} + + +/** + * Abstract method Implementation - See MsgBusInterface.hpp for details + */ +void MsgBusImpl_redis::update_baseAttribute(obj_bgp_peer &peer, obj_path_attr &attr, base_attr_action_code code) { + +} + +/** + * Abstract method Implementation - See MsgBusInterface.hpp for details + */ +void MsgBusImpl_redis::update_L3Vpn(obj_bgp_peer &peer, std::vector &vpn, + obj_path_attr *attr, vpn_action_code code) { + +} + + +/** + * Abstract method Implementation - See MsgBusInterface.hpp for details + */ +void MsgBusImpl_redis::update_eVPN(obj_bgp_peer &peer, std::vector &vpn, + obj_path_attr *attr, vpn_action_code code) { +} + + +/** + * Abstract method Implementation - See MsgBusInterface.hpp for details + */ +void MsgBusImpl_redis::add_StatReport(obj_bgp_peer &peer, obj_stats_report &stats) { +} + +/** + * Abstract method Implementation - See MsgBusInterface.hpp for details + */ +void MsgBusImpl_redis::update_LsNode(obj_bgp_peer &peer, obj_path_attr &attr, std::list &nodes, + ls_action_code code) { +} + +/** + * Abstract method Implementation - See MsgBusInterface.hpp for details + */ +void MsgBusImpl_redis::update_LsLink(obj_bgp_peer &peer, obj_path_attr &attr, std::list &links, + ls_action_code code) { +} + +/** + * Abstract method Implementation - See MsgBusInterface.hpp for details + */ +void MsgBusImpl_redis::update_LsPrefix(obj_bgp_peer &peer, obj_path_attr &attr, std::list &prefixes, + ls_action_code code) { +} + +/** + * Abstract method Implementation - See MsgBusInterface.hpp for details + * + * TODO: Consolidate this to single produce method + */ +void MsgBusImpl_redis::send_bmp_raw(u_char *r_hash, obj_bgp_peer &peer, u_char *data, size_t data_len) { +} \ No newline at end of file diff --git a/Server/src/redis/MsgBusImpl_redis.h b/Server/src/redis/MsgBusImpl_redis.h new file mode 100644 index 0000000..d18a55c --- /dev/null +++ b/Server/src/redis/MsgBusImpl_redis.h @@ -0,0 +1,69 @@ +#ifndef MSGBUSIMPL_REDIS_H_ +#define MSGBUSIMPL_REDIS_H_ + +#define HASH_SIZE 16 + +#include "MsgBusInterface.hpp" +#include "RedisManager.h" +#include "BMPListener.h" + +#include "Logger.h" +#include +#include +#include +#include + + + +#include "Config.h" + +/** + * \class MsgBusImpl_redis + * + * \brief Kafka message bus implementation + */ +class MsgBusImpl_redis: public MsgBusInterface { +public: + + /******************************************************************//** + * \brief This function will initialize and connect to Kafka. + * + * \details It is expected that this class will start off with a new connection. + * + * \param [in] logPtr Pointer to Logger instance + * \param [in] cfg Pointer to the config instance + ********************************************************************/ + MsgBusImpl_redis(Logger *logPtr, Config *cfg, BMPListener::ClientInfo *client); + ~MsgBusImpl_redis(); + + /* + * abstract methods implemented + * See MsgBusInterface.hpp for method details + */ + void update_Collector(struct obj_collector &c_obj, collector_action_code action_code); + void update_Router(struct obj_router &r_entry, router_action_code code); + void update_Peer(obj_bgp_peer &peer, obj_peer_up_event *up, obj_peer_down_event *down, peer_action_code code); + void update_baseAttribute(obj_bgp_peer &peer, obj_path_attr &attr, base_attr_action_code code); + void update_unicastPrefix(obj_bgp_peer &peer, std::vector &rib, obj_path_attr *attr, unicast_prefix_action_code code); + void add_StatReport(obj_bgp_peer &peer, obj_stats_report &stats); + + void update_LsNode(obj_bgp_peer &peer, obj_path_attr &attr, std::list &nodes, + ls_action_code code); + void update_LsLink(obj_bgp_peer &peer, obj_path_attr &attr, std::list &links, + ls_action_code code); + void update_LsPrefix(obj_bgp_peer &peer, obj_path_attr &attr, std::list &prefixes, + ls_action_code code); + + void update_L3Vpn(obj_bgp_peer &peer, std::vector &vpn, obj_path_attr *attr, vpn_action_code code); + + void update_eVPN(obj_bgp_peer &peer, std::vector &vpn, obj_path_attr *attr, vpn_action_code code); + + void send_bmp_raw(u_char *r_hash, obj_bgp_peer &peer, u_char *data, size_t data_len); + +private: + Logger *logger; ///< Logging class pointer + Config *cfg; ///< Pointer to config instance + RedisManager redisMgr_; +}; + +#endif /* MSGBUSIMPL_REDIS_H_ */ diff --git a/azure-pipelines.yml b/azure-pipelines.yml index 94916e9..ed6c5a3 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -100,9 +100,14 @@ jobs: git clone https://github.com/FengPan-Frank/sonic-bmp.git cd sonic-bmp - git checkout fenpan_swsscommon + sudo cp /usr/lib/x86_64-linux-gnu/libhiredis.* /usr/local/lib + sudo ls /usr/local/lib + + git checkout fenpan_redis + mkdir build cd build cmake -DCMAKE_INSTALL_PREFIX:PATH=/usr ../ make - + sudo ldconfig -p | grep libsasl2 +